diff options
author | rowanbeentje <rowan@beent.je> | 2009-08-20 00:41:30 +0000 |
---|---|---|
committer | rowanbeentje <rowan@beent.je> | 2009-08-20 00:41:30 +0000 |
commit | a3f0cd7a5c21c87f154956cb645cb41b1bb35821 (patch) | |
tree | 444ec8c61df6c41bf4ab7c67b68d1d6666ff244d /Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.m | |
parent | b30cc9d877f2dab24901ac9cf5c6e1f1835fb454 (diff) | |
download | sequelpro-a3f0cd7a5c21c87f154956cb645cb41b1bb35821.tar.gz sequelpro-a3f0cd7a5c21c87f154956cb645cb41b1bb35821.tar.bz2 sequelpro-a3f0cd7a5c21c87f154956cb645cb41b1bb35821.zip |
- Change MCPStreamingResult to use a safer streaming mode by default - download all results as fast as possible from the server, to avoid blocking, but do so in a background thread to allow results processing to start as soon as data is available. Many thanks to Hans-Jörg Bibiko for assistance with this.
- Add an option to the SQL export dialog to allow selection of the full-streaming method, with a warning that it may block table UPDATES/INSERTS.
Diffstat (limited to 'Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.m')
-rw-r--r-- | Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.m | 434 |
1 files changed, 314 insertions, 120 deletions
diff --git a/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.m b/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.m index 5eb147d3..63353056 100644 --- a/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.m +++ b/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.m @@ -1,5 +1,5 @@ // -// $Id$ +// $Id$ // // MCPStreamingResult.m // sequel-pro @@ -31,11 +31,24 @@ /** * IMPORTANT NOTE * - * MCPStreamingResult can produce fast and low-memory result reads, but should not - * be widely used for reads as it can result in MySQL thread or table blocking. + * MCPStreamingResult can operate in two modes. The default mode is a safe implementation, + * which operates in a multithreaded fashion - a worker thread is set up to download the results as + * fast as possible in the background, while the results are made available via a blocking (and so + * single-thread-compatible) fetchNextRowAsArray call. This provides the benefit of allowing a progress + * bar to be shown during downloads, and threaded processing, but still has reasonable memory usage for + * the downloaded result - and won't block the server. + * Alternatively, withFullStreaming: can be set to YES, in which case each row will be accessed on-demand; + * this can be dangerous as it means a SELECT will tie up the server for longer, as for MyISAM tables + * updates (and subsequent reads) must block while a SELECT is still running. However this can be useful + * for certain processes such as working with very large tables to keep memory usage low. */ +@interface MCPStreamingResult (PrivateAPI) +- (void) _downloadAllData; +- (void) _freeAllDataWhenDone; +@end + @implementation MCPStreamingResult : MCPResult #pragma mark - @@ -47,36 +60,64 @@ */ - (id)initWithMySQLPtr:(MYSQL *)mySQLPtr encoding:(NSStringEncoding)theEncoding timeZone:(NSTimeZone *)theTimeZone connection:(MCPConnection *)theConnection { - if ((self = [super init])) { - mEncoding = theEncoding; - mTimeZone = [theTimeZone retain]; - parentConnection = theConnection; - - if (mResult) { - mysql_free_result(mResult); - mResult = NULL; - } - - if (mNames) { - [mNames release]; - mNames = NULL; - } - - mResult = mysql_use_result(mySQLPtr); - - if (mResult) { - mNumOfFields = mysql_num_fields(mResult); - fieldDefinitions = mysql_fetch_fields(mResult); - } else { - mNumOfFields = 0; - } - - if (mMySQLLocales == NULL) { - mMySQLLocales = [[MCPConnection getMySQLLocales] retain]; - } - } - - return self; + return [self initWithMySQLPtr:mySQLPtr encoding:theEncoding timeZone:theTimeZone connection:theConnection withFullStreaming:NO]; + +} + +/** + * Master initialisation method, allowing selection of either full streaming or safe streaming + * (see "important note" above) + */ +- (id)initWithMySQLPtr:(MYSQL *)mySQLPtr encoding:(NSStringEncoding)theEncoding timeZone:(NSTimeZone *)theTimeZone connection:(MCPConnection *)theConnection withFullStreaming:(BOOL)useFullStreaming +{ + if ((self = [super init])) { + mEncoding = theEncoding; + mTimeZone = [theTimeZone retain]; + parentConnection = theConnection; + fullyStreaming = useFullStreaming; + + if (mResult) { + mysql_free_result(mResult); + mResult = NULL; + } + + if (mNames) { + [mNames release]; + mNames = NULL; + } + + mResult = mysql_use_result(mySQLPtr); + + if (mResult) { + mNumOfFields = mysql_num_fields(mResult); + fieldDefinitions = mysql_fetch_fields(mResult); + } else { + mNumOfFields = 0; + } + + // If the result is opened in download-data-fast safe mode, set up the additional variables + // and threads required. + if (!fullyStreaming) { + dataDownloaded = NO; + dataFreed = NO; + localDataStore = NULL; + currentDataStoreEntry = NULL; + localDataStoreLastEntry = NULL; + localDataRows = 0; + localDataAllocated = 0; + downloadedRowCount = 0; + processedRowCount = 0; + freedRowCount = 0; + + // Start the data download thread + [NSThread detachNewThreadSelector:@selector(_downloadAllData) toTarget:self withObject:nil]; + + // Start the data freeing thread + [NSThread detachNewThreadSelector:@selector(_freeAllDataWhenDone) toTarget:self withObject:nil]; + } + } + + return self; } /** @@ -84,9 +125,9 @@ */ - (void) dealloc { - [parentConnection unlockConnection]; - - [super dealloc]; + [parentConnection unlockConnection]; + + [super dealloc]; } #pragma mark - @@ -98,100 +139,149 @@ */ - (NSArray *)fetchNextRowAsArray { - MYSQL_ROW theRow; - unsigned long *fieldLengths; - int i; - NSMutableArray *returnArray; + MYSQL_ROW theRow; + char *theRowData; + unsigned long *fieldLengths; + int i, copiedDataLength; + NSMutableArray *returnArray; - // Retrieve the next row - theRow = mysql_fetch_row(mResult); + // Retrieve the next row according to the mode this result set is in. + // If fully streaming, retrieve the MYSQL_ROW + if (fullyStreaming) { + theRow = mysql_fetch_row(mResult); - // If no data was returned, we're at the end of the result set - return nil. - if (theRow == NULL) return nil; + // If no data was returned, we're at the end of the result set - return nil. + if (theRow == NULL) return nil; - // Retrieve the lengths of the returned data - fieldLengths = mysql_fetch_lengths(mResult); + // Retrieve the lengths of the returned data + fieldLengths = mysql_fetch_lengths(mResult); - // Initialise the array to return - returnArray = [NSMutableArray arrayWithCapacity:mNumOfFields]; - for (i = 0; i < mNumOfFields; i++) { - id cellData; + // If in cached-streaming/fast download mode, get a reference to the data for the current row + } else { + copiedDataLength = 0; + + // Check to see whether we need to wait for the data to be availabe + // - if so, wait 1ms before checking again + while (!dataDownloaded && processedRowCount == downloadedRowCount) { + usleep(1000); + } - // Use NSNulls for the NULL data type - if (theRow[i] == NULL) { - cellData = [NSNull null]; + // If all rows have been processed, we're at the end of the result set - return nil + // once all memory has been freed + if (processedRowCount == downloadedRowCount) { + while (!dataFreed) usleep(1000); + return nil; + } - // Otherwise, switch by data type - } else { + // Retrieve a reference to the data and the associated lengths + theRowData = currentDataStoreEntry->data; + fieldLengths = currentDataStoreEntry->dataLengths; + } - // Create a null-terminated data string for processing - char *theData = calloc(sizeof(char), fieldLengths[i]+1); - memcpy(theData, theRow[i], fieldLengths[i]); - theData[fieldLengths[i]] = '\0'; - - switch (fieldDefinitions[i].type) { - case FIELD_TYPE_TINY: - case FIELD_TYPE_SHORT: - case FIELD_TYPE_INT24: - case FIELD_TYPE_LONG: - case FIELD_TYPE_LONGLONG: - case FIELD_TYPE_DECIMAL: - case FIELD_TYPE_NEWDECIMAL: - case FIELD_TYPE_FLOAT: - case FIELD_TYPE_DOUBLE: - case FIELD_TYPE_TIMESTAMP: - case FIELD_TYPE_DATE: - case FIELD_TYPE_TIME: - case FIELD_TYPE_DATETIME: - case FIELD_TYPE_YEAR: - case FIELD_TYPE_VAR_STRING: - case FIELD_TYPE_STRING: - case FIELD_TYPE_SET: - case FIELD_TYPE_ENUM: - case FIELD_TYPE_NEWDATE: // Don't know what the format for this type is... - cellData = [NSString stringWithCString:theData encoding:mEncoding]; - break; - - case FIELD_TYPE_BIT: - cellData = [NSString stringWithFormat:@"%u", theData[0]]; - break; - - case FIELD_TYPE_TINY_BLOB: - case FIELD_TYPE_BLOB: - case FIELD_TYPE_MEDIUM_BLOB: - case FIELD_TYPE_LONG_BLOB: - - // For binary data, return the data - if (fieldDefinitions[i].flags & BINARY_FLAG) { - cellData = [NSData dataWithBytes:theData length:fieldLengths[i]]; - - // For string data, convert to text - } else { - cellData = [[NSString alloc] initWithBytes:theData length:fieldLengths[i] encoding:mEncoding]; - if (cellData) [cellData autorelease]; - } - break; - - case FIELD_TYPE_NULL: - cellData = [NSNull null]; - break; - - default: - NSLog(@"in fetchNextRowAsArray : Unknown type : %d for column %d, sending back a NSData object", (int)fieldDefinitions[i].type, (int)i); - cellData = [NSData dataWithBytes:theData length:fieldLengths[i]]; - break; + // Initialise the array to return + returnArray = [NSMutableArray arrayWithCapacity:mNumOfFields]; + for (i = 0; i < mNumOfFields; i++) { + id cellData = nil; + char *theData; + + // In fully streaming mode, copy across the data for the MYSQL_ROW + if (fullyStreaming) { + if (theRow[i] == NULL) { + cellData = [NSNull null]; + } else { + theData = calloc(sizeof(char), fieldLengths[i]+1); + memcpy(theData, theRow[i], fieldLengths[i]); + theData[fieldLengths[i]] = '\0'; } + + // In cached-streaming mode, use a reference to the downloaded data + } else { + if (fieldLengths[i] == NSNotFound) { + cellData = [NSNull null]; + } else { + theData = theRowData+copiedDataLength; + copiedDataLength += fieldLengths[i] + 1; + } + } - free(theData); + // If the data hasn't already been detected as NULL - in which case it will have been + // set to NSNull - process the data by type + if (cellData == nil) { - // If a creator returned a nil object, replace with NSNull - if (cellData == nil) cellData = [NSNull null]; - } + switch (fieldDefinitions[i].type) { + case FIELD_TYPE_TINY: + case FIELD_TYPE_SHORT: + case FIELD_TYPE_INT24: + case FIELD_TYPE_LONG: + case FIELD_TYPE_LONGLONG: + case FIELD_TYPE_DECIMAL: + case FIELD_TYPE_NEWDECIMAL: + case FIELD_TYPE_FLOAT: + case FIELD_TYPE_DOUBLE: + case FIELD_TYPE_TIMESTAMP: + case FIELD_TYPE_DATE: + case FIELD_TYPE_TIME: + case FIELD_TYPE_DATETIME: + case FIELD_TYPE_YEAR: + case FIELD_TYPE_VAR_STRING: + case FIELD_TYPE_STRING: + case FIELD_TYPE_SET: + case FIELD_TYPE_ENUM: + case FIELD_TYPE_NEWDATE: // Don't know what the format for this type is... + cellData = [NSString stringWithCString:theData encoding:mEncoding]; + break; + + case FIELD_TYPE_BIT: + cellData = [NSString stringWithFormat:@"%u", theData[0]]; + break; + + case FIELD_TYPE_TINY_BLOB: + case FIELD_TYPE_BLOB: + case FIELD_TYPE_MEDIUM_BLOB: + case FIELD_TYPE_LONG_BLOB: + + // For binary data, return the data + if (fieldDefinitions[i].flags & BINARY_FLAG) { + cellData = [NSData dataWithBytes:theData length:fieldLengths[i]]; + + // For string data, convert to text + } else { + cellData = [[NSString alloc] initWithBytes:theData length:fieldLengths[i] encoding:mEncoding]; + if (cellData) [cellData autorelease]; + } + break; + + case FIELD_TYPE_NULL: + cellData = [NSNull null]; + break; + + default: + NSLog(@"in fetchNextRowAsArray : Unknown type : %d for column %d, sending back a NSData object", (int)fieldDefinitions[i].type, (int)i); + cellData = [NSData dataWithBytes:theData length:fieldLengths[i]]; + break; + } + + // Free the data if it was originally allocated + if (fullyStreaming) free(theData); + + // If a creator returned a nil object, replace with NSNull + if (cellData == nil) cellData = [NSNull null]; + } + + [returnArray insertObject:cellData atIndex:i]; + } - [returnArray insertObject:cellData atIndex:i]; + // If in cached-streaming mode, update the current entry processed count + if (!fullyStreaming) { + + // Update the active-data pointer to the next item in the list, or set to NULL if no more items + currentDataStoreEntry = currentDataStoreEntry->nextRow; + + // Increment counter + processedRowCount++; } - return returnArray; + return returnArray; } #pragma mark - @@ -199,13 +289,117 @@ - (my_ulonglong) numOfRows { - NSLog(@"numOfRows cannot be used with streaming results"); - return 0; + NSLog(@"numOfRows cannot be used with streaming results"); + return 0; } - (void) dataSeek:(my_ulonglong) row { - NSLog(@"dataSeek cannot be used with streaming results"); + NSLog(@"dataSeek cannot be used with streaming results"); +} + +@end + +@implementation MCPStreamingResult (PrivateAPI) + +/** + * Used internally to download results in a background thread + */ +- (void) _downloadAllData +{ + NSAutoreleasePool *downloadPool = [[NSAutoreleasePool alloc] init]; + MYSQL_ROW theRow; + unsigned long *fieldLengths; + int i, dataCopiedLength, rowDataLength; + LOCAL_ROW_DATA *newRowStore; + + size_t sizeOfLocalRowData = sizeof(LOCAL_ROW_DATA); + size_t sizeOfDataLengths = (size_t)(sizeof(unsigned long) * mNumOfFields); + + // Loop through the rows until the end of the data is reached - indicated via a NULL + while (theRow = mysql_fetch_row(mResult)) { + + // Retrieve the lengths of the returned data + fieldLengths = mysql_fetch_lengths(mResult); + rowDataLength = 0; + dataCopiedLength = 0; + for (i = 0; i < mNumOfFields; i++) + rowDataLength += fieldLengths[i]; + + // Initialise memory for the row and set a NULL pointer for the next item + newRowStore = malloc(sizeOfLocalRowData); + newRowStore->nextRow = NULL; + + // Set up the row data store - a char* - and copy in the data if there is any, + // using a null terminator for each field boundary for easier data processing later + if (rowDataLength) { + newRowStore->data = malloc(sizeof(char) * (rowDataLength + mNumOfFields)); + for (i = 0; i < mNumOfFields; i++) { + if (theRow[i] != NULL) { + memcpy(newRowStore->data+dataCopiedLength, theRow[i], fieldLengths[i]); + newRowStore->data[dataCopiedLength+fieldLengths[i]] = '\0'; + dataCopiedLength += fieldLengths[i] + 1; + } else { + fieldLengths[i] = NSNotFound; + } + } + } else { + newRowStore->data = NULL; + } + + // Set up and copy in the field lengths + newRowStore->dataLengths = memcpy(malloc(sizeOfDataLengths), fieldLengths, sizeOfDataLengths); + + // Add the newly allocated row to end of the storage linked list + if (localDataStore) { + localDataStoreLastEntry->nextRow = newRowStore; + } else { + localDataStore = newRowStore; + } + localDataStoreLastEntry = newRowStore; + if (!currentDataStoreEntry) currentDataStoreEntry = newRowStore; + + // Update the downloaded row count + downloadedRowCount++; + } + + dataDownloaded = YES; + [downloadPool drain]; +} + +/** + * Used internally to free data which has been fully processed; done in a thread to allow + * fetchNextRowAsArray to be faster. + */ +- (void) _freeAllDataWhenDone +{ + NSAutoreleasePool *dataFreeingPool = [[NSAutoreleasePool alloc] init]; + + while (!dataDownloaded || freedRowCount != downloadedRowCount) { + + // If the freed row count matches the processed row count, wait before retrying + if (freedRowCount == processedRowCount) { + usleep(1000); + continue; + } + + // Free a single item off the bottom of the list + // Update the data pointer to the next item in the list, or set to NULL if no more items + LOCAL_ROW_DATA *rowToRemove = localDataStore; + localDataStore = localDataStore->nextRow; + + // Free memory for the first row + rowToRemove->nextRow = NULL; + free(rowToRemove->dataLengths); + if (rowToRemove->data != NULL) free(rowToRemove->data); + free(rowToRemove); + + // Increment the counter + freedRowCount++; + } + + dataFreed = YES; + [dataFreeingPool drain]; } @end
\ No newline at end of file |