diff options
Diffstat (limited to 'Frameworks/SPMySQLFramework/Source/SPMySQLFastStreamingResult.m')
-rw-r--r-- | Frameworks/SPMySQLFramework/Source/SPMySQLFastStreamingResult.m | 417 |
1 files changed, 417 insertions, 0 deletions
diff --git a/Frameworks/SPMySQLFramework/Source/SPMySQLFastStreamingResult.m b/Frameworks/SPMySQLFramework/Source/SPMySQLFastStreamingResult.m new file mode 100644 index 00000000..f084dee0 --- /dev/null +++ b/Frameworks/SPMySQLFramework/Source/SPMySQLFastStreamingResult.m @@ -0,0 +1,417 @@ +// +// $Id$ +// +// SPMySQLFastStreamingResult.m +// SPMySQLFramework +// +// Created by Rowan Beentje (rowan.beent.je) on February 2, 2012 +// Copyright (c) 2012 Rowan Beentje. All rights reserved. +// +// Permission is hereby granted, free of charge, to any person +// obtaining a copy of this software and associated documentation +// files (the "Software"), to deal in the Software without +// restriction, including without limitation the rights to use, +// copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice shall be +// included in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +// OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +// WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +// OTHER DEALINGS IN THE SOFTWARE. +// +// More info at <http://code.google.com/p/sequel-pro/> + +#import "SPMySQLFastStreamingResult.h" +#import "SPMySQL Private APIs.h" +#include <pthread.h> + +/** + * This type of streaming result operates in a multithreaded fashion - a worker + * thread is set up to download the results as fast as possible in the background, + * while the results are made available via blocking (and so single-thread-compatible) + * calls. This provides the benefit of allowing a progress bar to be shown during + * downloads, and threaded processing, but still has reasonable memory usage for the + * downloaded result - and won't block the server. + */ + +typedef struct st_spmysqlstreamingrowdata { + char *data; + NSUInteger *dataLengths; + struct st_spmysqlstreamingrowdata *nextRow; +} SPMySQLStreamingRowData; + +@interface SPMySQLFastStreamingResult (Private_API) + +- (void) _downloadAllData; + +@end + +#pragma mark - + +@implementation SPMySQLFastStreamingResult + +#pragma mark - + +/** + * Standard init method, constructing the SPMySQLStreamingResult around a MySQL + * result pointer and the encoding to use when working with the data. + * As opposed to SPMySQLResult, defaults to returning rows as arrays, as the result + * sets are likely to be larger and processed in loops. + */ +- (id)initWithMySQLResult:(void *)theResult stringEncoding:(NSStringEncoding)theStringEncoding connection:(SPMySQLConnection *)theConnection +{ + + // If no result set was passed in, return nil. + if (!theResult) return nil; + + if ((self = [super initWithMySQLResult:theResult stringEncoding:theStringEncoding connection:theConnection])) { + + // Initialise the extra streaming result counts and tracking + processedRowCount = 0; + + // Initialise the linked list pointers + currentDataStoreEntry = NULL; + lastDataStoreEntry = NULL; + + // Set up the linked list lock + pthread_mutex_init(&dataLock, NULL); + + // Start the data download thread + [NSThread detachNewThreadSelector:@selector(_downloadAllData) toTarget:self withObject:nil]; + } + + return self; +} + +/** + * Deallocate the result and ensure the parent connection is unlocked for further use. + */ +- (void)dealloc +{ + + // Ensure all data is processed and the parent connection is unlocked + [self cancelResultLoad]; + + // Destroy the linked list lock + pthread_mutex_destroy(&dataLock); + + // Call dealloc on super to clean up everything else, and to throw an exception if + // the parent connection hasn't been cleaned up correctly. + [super dealloc]; +} + +#pragma mark - +#pragma mark Data retrieval + +/** + * Override the convenience selectors so that forwarding works correctly. + */ +- (id)getRow +{ + return SPMySQLResultGetRow(self, SPMySQLResultRowAsDefault); +} +- (NSArray *)getRowAsArray +{ + return SPMySQLResultGetRow(self, SPMySQLResultRowAsArray); +} +- (NSDictionary *)getRowAsDictionary +{ + return SPMySQLResultGetRow(self, SPMySQLResultRowAsDictionary); +} + +/** + * Retrieve the next row in the result set, using the internal pointer, in the specified + * return format. + * If there are no rows remaining in the current iteration, returns nil. + */ +- (id)getRowAsType:(SPMySQLResultRowType)theType +{ + NSUInteger copiedDataLength = 0; + char *theRowData; + NSUInteger *fieldLengths; + id theReturnData; + + // Lock the data mutex for safe access of variables and counters + pthread_mutex_lock(&dataLock); + + // Determine whether any data is available; if not, wait 1ms before trying again + while (!dataDownloaded && processedRowCount == downloadedRowCount) { + pthread_mutex_unlock(&dataLock); + usleep(1000); + pthread_mutex_lock(&dataLock); + } + + // If all rows have been processed, the end of the result set has been reached; return nil. + if (processedRowCount == downloadedRowCount) { + pthread_mutex_unlock(&dataLock); + return nil; + } + + // Unlock the data mutex now checks are complete + pthread_mutex_unlock(&dataLock); + + // Get a reference to the data for the current row; this is safe to do outside the lock + // as the pointer won't change until markers are changed at the end of this process + theRowData = currentDataStoreEntry->data; + fieldLengths = currentDataStoreEntry->dataLengths; + + // If the target type was unspecified, use the instance default + if (theType == SPMySQLResultRowAsDefault) theType = defaultRowReturnType; + + // Set up the return data as appropriate + if (theType == SPMySQLResultRowAsArray) { + theReturnData = [NSMutableArray arrayWithCapacity:numberOfFields]; + } else { + theReturnData = [NSMutableDictionary dictionaryWithCapacity:numberOfFields]; + } + + // Convert each of the cells in the row in turn + for (NSUInteger i = 0; i < numberOfFields; i++) { + char *rawCellData; + NSUInteger fieldLength = fieldLengths[i]; + + // If the length of this cell is NSNotFound, it's a null reference + if (fieldLength == NSNotFound) { + rawCellData = NULL; + + // Otherwise grab a reference to that data using pointer arithmetic + } else { + rawCellData = theRowData + copiedDataLength; + copiedDataLength += fieldLength; + } + + // Convert to the correct object type + id cellData = SPMySQLResultGetObject(self, rawCellData, fieldLength, fieldTypes[i], i); + + // If object creation failed, display a null + if (!cellData) cellData = [NSNull null]; + + // Add to the result array/dictionary + if (theType == SPMySQLResultRowAsArray) { + [(NSMutableArray *)theReturnData addObject:cellData]; + } else { + [(NSMutableDictionary *)theReturnData setObject:cellData forKey:fieldNames[i]]; + } + } + + // Get a reference to the current item + SPMySQLStreamingRowData *previousDataStoreEntry = currentDataStoreEntry; + + // Lock the mutex before updating counters and linked lists + pthread_mutex_lock(&dataLock); + + // Update the active-data pointer to the next item in the list (which may be NULL) + currentDataStoreEntry = currentDataStoreEntry->nextRow; + + // Increment the processed counter and row index + processedRowCount++; + currentRowIndex++; + if (dataDownloaded && processedRowCount == downloadedRowCount) currentRowIndex = NSNotFound; + + // Unlock the mutex + pthread_mutex_unlock(&dataLock); + + // Free the memory for the processed row + previousDataStoreEntry->nextRow = NULL; + free(previousDataStoreEntry->dataLengths); + if (previousDataStoreEntry->data != NULL) free(previousDataStoreEntry->data); + free(previousDataStoreEntry); + + return theReturnData; +} + +/* + * Ensure the result set is fully processed and freed without any processing + * This method ensures that the connection is unlocked. + */ +- (void)cancelResultLoad +{ + + // If data has already been downloaded successfully, no further action is required + if (dataDownloaded && processedRowCount == downloadedRowCount) return; + + // Loop until all data is fetched and freed + while (1) { + + // Check to see whether we need to wait for the data to be available + // - if so, wait 1ms before checking again + while (!dataDownloaded && processedRowCount == downloadedRowCount) usleep(1000); + + // If all rows have been processed, we're at the end of the result set - return + if (processedRowCount == downloadedRowCount) { + + // We don't need to unlock the connection because the data loading thread + // has already taken care of that + return; + } + + // Mark the row entry as processed without performing any actions + pthread_mutex_lock(&dataLock); + SPMySQLStreamingRowData *previousDataStoreEntry = currentDataStoreEntry; + + // Update the active-data pointer to the next item in the list (which may be NULL) + currentDataStoreEntry = currentDataStoreEntry->nextRow; + + processedRowCount++; + currentRowIndex++; + if (dataDownloaded && processedRowCount == downloadedRowCount) currentRowIndex = NSNotFound; + + // Unlock the mutex + pthread_mutex_unlock(&dataLock); + + // Free the memory for the processed row + previousDataStoreEntry->nextRow = NULL; + free(previousDataStoreEntry->dataLengths); + if (previousDataStoreEntry->data != NULL) free(previousDataStoreEntry->data); + free(previousDataStoreEntry); + } +} + +#pragma mark - +#pragma mark Data retrieval for fast enumeration + +/** + * Implement the fast enumeration endpoint. Rows for fast enumeration are retrieved in + * the instance default, as specified in setDefaultRowReturnType: or defaulting to + * NSDictionary. + */ +- (NSUInteger)countByEnumeratingWithState:(NSFastEnumerationState *)state objects:(id *)stackbuf count:(NSUInteger)len +{ + + // If all rows have already been processed, return 0 to stop iteration. + if (dataDownloaded && processedRowCount == downloadedRowCount) return 0; + + // If the MySQL row pointer does not match the requested state, throw an exception + if (state->state != currentRowIndex) { + [NSException raise:NSRangeException format:@"SPMySQLFastStreamingResult results can only be accessed linearly"]; + } + + // Determine how many objects to return. Default to 128 items, or the number of items requested + NSUInteger itemsToReturn = 128; + if (len < 128) itemsToReturn = len; + + // If there are fewer items available in the downloaded-but-processed queue, limit to that + if (downloadedRowCount - processedRowCount < itemsToReturn) { + itemsToReturn = downloadedRowCount - processedRowCount; + } + + // If no rows are available to be processed, wait for a single item to be readied. + if (!itemsToReturn) itemsToReturn = 1; + + // Retrieve rows and add them to the result stack + NSUInteger i, itemsRetrieved = 0; + id eachRow; + for (i = 0; i < itemsToReturn; i++) { + eachRow = SPMySQLResultGetRow(self, SPMySQLResultRowAsDefault); + + // If nil was returned the end of the result resource has been reached + if (!eachRow) { + if (!itemsRetrieved) return 0; + break; + } + + stackbuf[i] = eachRow; + itemsRetrieved++; + } + + state->state += itemsRetrieved; + state->itemsPtr = stackbuf; + state->mutationsPtr = (unsigned long *)self; + + return itemsRetrieved; +} + +@end + +#pragma mark - +#pragma mark Result set internals + +@implementation SPMySQLFastStreamingResult (Private_API) + +/** + * Used internally to download results in a background thread + */ +- (void)_downloadAllData +{ + NSAutoreleasePool *downloadPool = [[NSAutoreleasePool alloc] init]; + MYSQL_ROW theRow; + unsigned long *fieldLengths; + NSUInteger i, dataCopiedLength, rowDataLength; + SPMySQLStreamingRowData *newRowStore; + + size_t sizeOfStreamingRowData = sizeof(SPMySQLStreamingRowData); + size_t sizeOfDataLengths = (size_t)(sizeof(NSUInteger) * numberOfFields); + size_t sizeOfChar = sizeof(char); + + // Loop through the rows until the end of the data is reached - indicated via a NULL + while ( + (*isConnectedPtr)(parentConnection, isConnectedSelector) + && (theRow = mysql_fetch_row(resultSet)) + ) + { + + // Retrieve the lengths of the returned data + fieldLengths = mysql_fetch_lengths(resultSet); + rowDataLength = 0; + dataCopiedLength = 0; + for (i = 0; i < numberOfFields; i++) { + rowDataLength += fieldLengths[i]; + } + + // Initialise memory for the row and set a NULL pointer for the next item + newRowStore = malloc(sizeOfStreamingRowData); + newRowStore->nextRow = NULL; + + // Set up the row data store - a char* - and copy in the data if there is any. + newRowStore->data = malloc(sizeOfChar * rowDataLength); + for (i = 0; i < numberOfFields; i++) { + if (theRow[i] != NULL) { + memcpy(newRowStore->data+dataCopiedLength, theRow[i], fieldLengths[i]); + dataCopiedLength += fieldLengths[i]; + } else { + fieldLengths[i] = NSNotFound; + } + } + + // Set up the memory for, and copy in, the field lengths + newRowStore->dataLengths = memcpy(malloc(sizeOfDataLengths), fieldLengths, sizeOfDataLengths); + + // Lock the data mutex + pthread_mutex_lock(&dataLock); + + // Add the newly allocated row to end of the storage linked list + if (lastDataStoreEntry) { + lastDataStoreEntry->nextRow = newRowStore; + } + lastDataStoreEntry = newRowStore; + if (!currentDataStoreEntry) currentDataStoreEntry = newRowStore; + + // Update the downloaded row count + downloadedRowCount++; + + // Unlock the mutex + pthread_mutex_unlock(&dataLock); + } + + // Update the connection's error statuses to reflect any errors during the content download + [parentConnection _updateLastErrorID:NSNotFound]; + [parentConnection _updateLastErrorMessage:nil]; + + // Unlock the parent connection now all data has been retrieved + [parentConnection _unlockConnection]; + connectionUnlocked = YES; + + dataDownloaded = YES; + [downloadPool drain]; +} + +@end |