diff options
author | rowanbeentje <rowan@beent.je> | 2009-08-20 00:41:30 +0000 |
---|---|---|
committer | rowanbeentje <rowan@beent.je> | 2009-08-20 00:41:30 +0000 |
commit | a3f0cd7a5c21c87f154956cb645cb41b1bb35821 (patch) | |
tree | 444ec8c61df6c41bf4ab7c67b68d1d6666ff244d /Frameworks/MCPKit/MCPFoundationKit | |
parent | b30cc9d877f2dab24901ac9cf5c6e1f1835fb454 (diff) | |
download | sequelpro-a3f0cd7a5c21c87f154956cb645cb41b1bb35821.tar.gz sequelpro-a3f0cd7a5c21c87f154956cb645cb41b1bb35821.tar.bz2 sequelpro-a3f0cd7a5c21c87f154956cb645cb41b1bb35821.zip |
- Change MCPStreamingResult to use a safer streaming mode by default - download all results as fast as possible from the server, to avoid blocking, but do so in a background thread to allow results processing to start as soon as data is available. Many thanks to Hans-Jörg Bibiko for assistance with this.
- Add an option to the SQL export dialog to allow selection of the full-streaming method, with a warning that it may block table UPDATES/INSERTS.
Diffstat (limited to 'Frameworks/MCPKit/MCPFoundationKit')
-rw-r--r-- | Frameworks/MCPKit/MCPFoundationKit/MCPConnection.h | 10 | ||||
-rw-r--r-- | Frameworks/MCPKit/MCPFoundationKit/MCPConnection.m | 34 | ||||
-rw-r--r-- | Frameworks/MCPKit/MCPFoundationKit/MCPResult.h | 1 | ||||
-rw-r--r-- | Frameworks/MCPKit/MCPFoundationKit/MCPResult.m | 18 | ||||
-rw-r--r-- | Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.h | 28 | ||||
-rw-r--r-- | Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.m | 434 |
6 files changed, 373 insertions, 152 deletions
diff --git a/Frameworks/MCPKit/MCPFoundationKit/MCPConnection.h b/Frameworks/MCPKit/MCPFoundationKit/MCPConnection.h index 932237d8..5b8aefba 100644 --- a/Frameworks/MCPKit/MCPFoundationKit/MCPConnection.h +++ b/Frameworks/MCPKit/MCPFoundationKit/MCPConnection.h @@ -32,6 +32,13 @@ #import "mysql.h" +enum mcp_query_streaming_types +{ + MCP_NO_STREAMING = 0, + MCP_FAST_STREAMING = 1, + MCP_LOWMEM_STREAMING = 2 +}; + @class MCPResult, MCPStreamingResult; @protocol MCPConnectionProxy; @@ -186,7 +193,8 @@ static inline NSData* NSStringDataUsingLossyEncoding(NSString* self, int encodin - (NSString *)quoteObject:(id)theObject; - (MCPResult *)queryString:(NSString *)query; - (MCPStreamingResult *)streamingQueryString:(NSString *)query; -- (id)queryString:(NSString *) query usingEncoding:(NSStringEncoding) encoding streamingResult:(BOOL) streamResult; +- (MCPStreamingResult *)streamingQueryString:(NSString *)query useLowMemoryBlockingStreaming:(BOOL)fullStream; +- (id)queryString:(NSString *) query usingEncoding:(NSStringEncoding) encoding streamingResult:(int) streamResult; - (double)lastQueryExecutionTime; - (my_ulonglong)affectedRows; - (my_ulonglong)insertId; diff --git a/Frameworks/MCPKit/MCPFoundationKit/MCPConnection.m b/Frameworks/MCPKit/MCPFoundationKit/MCPConnection.m index b311ac14..a61f3223 100644 --- a/Frameworks/MCPKit/MCPFoundationKit/MCPConnection.m +++ b/Frameworks/MCPKit/MCPFoundationKit/MCPConnection.m @@ -1231,17 +1231,31 @@ static void forcePingTimeout(int signalNumber) */ - (MCPResult *)queryString:(NSString *)query { - return [self queryString:query usingEncoding:mEncoding streamingResult:NO]; + return [self queryString:query usingEncoding:mEncoding streamingResult:MCP_NO_STREAMING]; } /** * Takes a query string and returns an MCPStreamingResult representing the result of the query. * The returned MCPStreamingResult is retained and the client is responsible for releasing it. * If no fields are present in the result, nil will be returned. + * Uses safe/fast mode, which may use more memory as results are downloaded. */ - (MCPStreamingResult *)streamingQueryString:(NSString *)query { - return [self queryString:query usingEncoding:mEncoding streamingResult:YES]; + return [self queryString:query usingEncoding:mEncoding streamingResult:MCP_FAST_STREAMING]; +} + +/** + * Takes a query string and returns an MCPStreamingResult representing the result of the query. + * The returned MCPStreamingResult is retained and the client is responsible for releasing it. + * If no fields are present in the result, nil will be returned. + * Can be used in either fast/safe mode, where data is downloaded as fast as possible to avoid + * blocking the server, or in full streaming mode for lowest memory usage but potentially blocking + * the table. + */ +- (MCPStreamingResult *)streamingQueryString:(NSString *)query useLowMemoryBlockingStreaming:(BOOL)fullStream +{ + return [self queryString:query usingEncoding:mEncoding streamingResult:(fullStream?MCP_LOWMEM_STREAMING:MCP_FAST_STREAMING)]; } /** @@ -1250,7 +1264,7 @@ static void forcePingTimeout(int signalNumber) * and the result can be returned or the connection and document have been closed. * If using streamingResult, the caller is responsible for releasing the result set. */ -- (id)queryString:(NSString *) query usingEncoding:(NSStringEncoding) encoding streamingResult:(BOOL) streamResult +- (id)queryString:(NSString *) query usingEncoding:(NSStringEncoding) encoding streamingResult:(int) streamResultType { MCPResult *theResult = nil; double queryStartTime, queryExecutionTime; @@ -1354,13 +1368,15 @@ static void forcePingTimeout(int signalNumber) if (mysql_field_count(mConnection) != 0) { // For normal result sets, fetch the results and unlock the connection - if (!streamResult) { + if (streamResultType == MCP_NO_STREAMING) { theResult = [[MCPResult alloc] initWithMySQLPtr:mConnection encoding:mEncoding timeZone:mTimeZone]; [queryLock unlock]; // For streaming result sets, fetch the result pointer and leave the connection locked - } else { - theResult = [[MCPStreamingResult alloc] initWithMySQLPtr:mConnection encoding:mEncoding timeZone:mTimeZone connection:self]; + } else if (streamResultType == MCP_FAST_STREAMING) { + theResult = [[MCPStreamingResult alloc] initWithMySQLPtr:mConnection encoding:mEncoding timeZone:mTimeZone connection:self withFullStreaming:NO]; + } else if (streamResultType == MCP_LOWMEM_STREAMING) { + theResult = [[MCPStreamingResult alloc] initWithMySQLPtr:mConnection encoding:mEncoding timeZone:mTimeZone connection:self withFullStreaming:YES]; } // Ensure no problem occurred during the result fetch @@ -1375,7 +1391,7 @@ static void forcePingTimeout(int signalNumber) queryErrorMessage = [[NSString alloc] initWithString:@""]; queryErrorId = 0; - if (!streamResult) { + if (streamResultType == MCP_NO_STREAMING) { queryAffectedRows = mysql_affected_rows(mConnection); } else { queryAffectedRows = 0; @@ -1398,7 +1414,7 @@ static void forcePingTimeout(int signalNumber) break; } - if (!streamResult) { + if (streamResultType == MCP_NO_STREAMING) { // If the mysql thread id has changed as a result of a connection error, // ensure connection details are still correct @@ -1423,7 +1439,7 @@ static void forcePingTimeout(int signalNumber) (void)(*startKeepAliveTimerResettingStatePtr)(self, startKeepAliveTimerResettingStateSEL, YES); if (!theResult) return nil; - if (streamResult) return theResult; + if (streamResultType != MCP_NO_STREAMING) return theResult; return [theResult autorelease]; } diff --git a/Frameworks/MCPKit/MCPFoundationKit/MCPResult.h b/Frameworks/MCPKit/MCPFoundationKit/MCPResult.h index 1dd7404f..cb66a6cf 100644 --- a/Frameworks/MCPKit/MCPFoundationKit/MCPResult.h +++ b/Frameworks/MCPKit/MCPFoundationKit/MCPResult.h @@ -37,7 +37,6 @@ { MYSQL_RES *mResult; /* The MYSQL_RES structure of the C API. */ NSArray *mNames; /* An NSArray holding the name of the columns. */ - NSDictionary *mMySQLLocales; /* A Locales dictionary to define the locales of MySQL. */ NSStringEncoding mEncoding; /* The encoding used by MySQL server, to ISO-1 default. */ unsigned int mNumOfFields; /* The number of fields in the result. */ NSTimeZone *mTimeZone; /* The time zone of the connection when the query was made. */ diff --git a/Frameworks/MCPKit/MCPFoundationKit/MCPResult.m b/Frameworks/MCPKit/MCPFoundationKit/MCPResult.m index 158d01c8..f3edd9a5 100644 --- a/Frameworks/MCPKit/MCPFoundationKit/MCPResult.m +++ b/Frameworks/MCPKit/MCPFoundationKit/MCPResult.m @@ -253,11 +253,7 @@ const OUR_CHARSET our_charsets60[] = [mNames release]; mNames = NULL; } - - if (mMySQLLocales == NULL) { - mMySQLLocales = [[MCPConnection getMySQLLocales] retain]; - } - + mNumOfFields = 0; } @@ -292,10 +288,6 @@ const OUR_CHARSET our_charsets60[] = else { mNumOfFields = 0; } - - if (mMySQLLocales == NULL) { - mMySQLLocales = [[MCPConnection getMySQLLocales] retain]; - } } return self; @@ -330,10 +322,6 @@ const OUR_CHARSET our_charsets60[] = else { mNumOfFields = 0; } - - if (mMySQLLocales == NULL) { - mMySQLLocales = [[MCPConnection getMySQLLocales] retain]; - } } return self; @@ -1341,10 +1329,6 @@ const OUR_CHARSET our_charsets60[] = [mNames autorelease]; } - if (mMySQLLocales) { - [mMySQLLocales autorelease]; - } - [super dealloc]; } diff --git a/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.h b/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.h index 168f85d3..ae296192 100644 --- a/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.h +++ b/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.h @@ -1,5 +1,5 @@ // -// $Id$ +// $Id$ // // MCPStreamingResult.h // sequel-pro @@ -28,14 +28,34 @@ @class MCPConnection; +typedef struct SP_MYSQL_ROWS { + char *data; + unsigned long *dataLengths; + struct SP_MYSQL_ROWS *nextRow; +} LOCAL_ROW_DATA; + @interface MCPStreamingResult : MCPResult { - MCPConnection *parentConnection; + MCPConnection *parentConnection; - MYSQL_FIELD *fieldDefinitions; + MYSQL_FIELD *fieldDefinitions; + BOOL fullyStreaming; + BOOL dataDownloaded; + BOOL dataFreed; + LOCAL_ROW_DATA *localDataStore; + LOCAL_ROW_DATA *currentDataStoreEntry; + LOCAL_ROW_DATA *localDataStoreLastEntry; + unsigned long localDataRows; + unsigned long localDataAllocated; + unsigned long downloadedRowCount; + unsigned long processedRowCount; + unsigned long freedRowCount; } - (id)initWithMySQLPtr:(MYSQL *)mySQLPtr encoding:(NSStringEncoding)theEncoding timeZone:(NSTimeZone *)theTimeZone connection:(MCPConnection *)theConnection; +- (id)initWithMySQLPtr:(MYSQL *)mySQLPtr encoding:(NSStringEncoding)theEncoding timeZone:(NSTimeZone *)theTimeZone connection:(MCPConnection *)theConnection withFullStreaming:(BOOL)useFullStreaming; + +// Results fetching - (NSArray *)fetchNextRowAsArray; -@end +@end
\ No newline at end of file diff --git a/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.m b/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.m index 5eb147d3..63353056 100644 --- a/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.m +++ b/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.m @@ -1,5 +1,5 @@ // -// $Id$ +// $Id$ // // MCPStreamingResult.m // sequel-pro @@ -31,11 +31,24 @@ /** * IMPORTANT NOTE * - * MCPStreamingResult can produce fast and low-memory result reads, but should not - * be widely used for reads as it can result in MySQL thread or table blocking. + * MCPStreamingResult can operate in two modes. The default mode is a safe implementation, + * which operates in a multithreaded fashion - a worker thread is set up to download the results as + * fast as possible in the background, while the results are made available via a blocking (and so + * single-thread-compatible) fetchNextRowAsArray call. This provides the benefit of allowing a progress + * bar to be shown during downloads, and threaded processing, but still has reasonable memory usage for + * the downloaded result - and won't block the server. + * Alternatively, withFullStreaming: can be set to YES, in which case each row will be accessed on-demand; + * this can be dangerous as it means a SELECT will tie up the server for longer, as for MyISAM tables + * updates (and subsequent reads) must block while a SELECT is still running. However this can be useful + * for certain processes such as working with very large tables to keep memory usage low. */ +@interface MCPStreamingResult (PrivateAPI) +- (void) _downloadAllData; +- (void) _freeAllDataWhenDone; +@end + @implementation MCPStreamingResult : MCPResult #pragma mark - @@ -47,36 +60,64 @@ */ - (id)initWithMySQLPtr:(MYSQL *)mySQLPtr encoding:(NSStringEncoding)theEncoding timeZone:(NSTimeZone *)theTimeZone connection:(MCPConnection *)theConnection { - if ((self = [super init])) { - mEncoding = theEncoding; - mTimeZone = [theTimeZone retain]; - parentConnection = theConnection; - - if (mResult) { - mysql_free_result(mResult); - mResult = NULL; - } - - if (mNames) { - [mNames release]; - mNames = NULL; - } - - mResult = mysql_use_result(mySQLPtr); - - if (mResult) { - mNumOfFields = mysql_num_fields(mResult); - fieldDefinitions = mysql_fetch_fields(mResult); - } else { - mNumOfFields = 0; - } - - if (mMySQLLocales == NULL) { - mMySQLLocales = [[MCPConnection getMySQLLocales] retain]; - } - } - - return self; + return [self initWithMySQLPtr:mySQLPtr encoding:theEncoding timeZone:theTimeZone connection:theConnection withFullStreaming:NO]; + +} + +/** + * Master initialisation method, allowing selection of either full streaming or safe streaming + * (see "important note" above) + */ +- (id)initWithMySQLPtr:(MYSQL *)mySQLPtr encoding:(NSStringEncoding)theEncoding timeZone:(NSTimeZone *)theTimeZone connection:(MCPConnection *)theConnection withFullStreaming:(BOOL)useFullStreaming +{ + if ((self = [super init])) { + mEncoding = theEncoding; + mTimeZone = [theTimeZone retain]; + parentConnection = theConnection; + fullyStreaming = useFullStreaming; + + if (mResult) { + mysql_free_result(mResult); + mResult = NULL; + } + + if (mNames) { + [mNames release]; + mNames = NULL; + } + + mResult = mysql_use_result(mySQLPtr); + + if (mResult) { + mNumOfFields = mysql_num_fields(mResult); + fieldDefinitions = mysql_fetch_fields(mResult); + } else { + mNumOfFields = 0; + } + + // If the result is opened in download-data-fast safe mode, set up the additional variables + // and threads required. + if (!fullyStreaming) { + dataDownloaded = NO; + dataFreed = NO; + localDataStore = NULL; + currentDataStoreEntry = NULL; + localDataStoreLastEntry = NULL; + localDataRows = 0; + localDataAllocated = 0; + downloadedRowCount = 0; + processedRowCount = 0; + freedRowCount = 0; + + // Start the data download thread + [NSThread detachNewThreadSelector:@selector(_downloadAllData) toTarget:self withObject:nil]; + + // Start the data freeing thread + [NSThread detachNewThreadSelector:@selector(_freeAllDataWhenDone) toTarget:self withObject:nil]; + } + } + + return self; } /** @@ -84,9 +125,9 @@ */ - (void) dealloc { - [parentConnection unlockConnection]; - - [super dealloc]; + [parentConnection unlockConnection]; + + [super dealloc]; } #pragma mark - @@ -98,100 +139,149 @@ */ - (NSArray *)fetchNextRowAsArray { - MYSQL_ROW theRow; - unsigned long *fieldLengths; - int i; - NSMutableArray *returnArray; + MYSQL_ROW theRow; + char *theRowData; + unsigned long *fieldLengths; + int i, copiedDataLength; + NSMutableArray *returnArray; - // Retrieve the next row - theRow = mysql_fetch_row(mResult); + // Retrieve the next row according to the mode this result set is in. + // If fully streaming, retrieve the MYSQL_ROW + if (fullyStreaming) { + theRow = mysql_fetch_row(mResult); - // If no data was returned, we're at the end of the result set - return nil. - if (theRow == NULL) return nil; + // If no data was returned, we're at the end of the result set - return nil. + if (theRow == NULL) return nil; - // Retrieve the lengths of the returned data - fieldLengths = mysql_fetch_lengths(mResult); + // Retrieve the lengths of the returned data + fieldLengths = mysql_fetch_lengths(mResult); - // Initialise the array to return - returnArray = [NSMutableArray arrayWithCapacity:mNumOfFields]; - for (i = 0; i < mNumOfFields; i++) { - id cellData; + // If in cached-streaming/fast download mode, get a reference to the data for the current row + } else { + copiedDataLength = 0; + + // Check to see whether we need to wait for the data to be availabe + // - if so, wait 1ms before checking again + while (!dataDownloaded && processedRowCount == downloadedRowCount) { + usleep(1000); + } - // Use NSNulls for the NULL data type - if (theRow[i] == NULL) { - cellData = [NSNull null]; + // If all rows have been processed, we're at the end of the result set - return nil + // once all memory has been freed + if (processedRowCount == downloadedRowCount) { + while (!dataFreed) usleep(1000); + return nil; + } - // Otherwise, switch by data type - } else { + // Retrieve a reference to the data and the associated lengths + theRowData = currentDataStoreEntry->data; + fieldLengths = currentDataStoreEntry->dataLengths; + } - // Create a null-terminated data string for processing - char *theData = calloc(sizeof(char), fieldLengths[i]+1); - memcpy(theData, theRow[i], fieldLengths[i]); - theData[fieldLengths[i]] = '\0'; - - switch (fieldDefinitions[i].type) { - case FIELD_TYPE_TINY: - case FIELD_TYPE_SHORT: - case FIELD_TYPE_INT24: - case FIELD_TYPE_LONG: - case FIELD_TYPE_LONGLONG: - case FIELD_TYPE_DECIMAL: - case FIELD_TYPE_NEWDECIMAL: - case FIELD_TYPE_FLOAT: - case FIELD_TYPE_DOUBLE: - case FIELD_TYPE_TIMESTAMP: - case FIELD_TYPE_DATE: - case FIELD_TYPE_TIME: - case FIELD_TYPE_DATETIME: - case FIELD_TYPE_YEAR: - case FIELD_TYPE_VAR_STRING: - case FIELD_TYPE_STRING: - case FIELD_TYPE_SET: - case FIELD_TYPE_ENUM: - case FIELD_TYPE_NEWDATE: // Don't know what the format for this type is... - cellData = [NSString stringWithCString:theData encoding:mEncoding]; - break; - - case FIELD_TYPE_BIT: - cellData = [NSString stringWithFormat:@"%u", theData[0]]; - break; - - case FIELD_TYPE_TINY_BLOB: - case FIELD_TYPE_BLOB: - case FIELD_TYPE_MEDIUM_BLOB: - case FIELD_TYPE_LONG_BLOB: - - // For binary data, return the data - if (fieldDefinitions[i].flags & BINARY_FLAG) { - cellData = [NSData dataWithBytes:theData length:fieldLengths[i]]; - - // For string data, convert to text - } else { - cellData = [[NSString alloc] initWithBytes:theData length:fieldLengths[i] encoding:mEncoding]; - if (cellData) [cellData autorelease]; - } - break; - - case FIELD_TYPE_NULL: - cellData = [NSNull null]; - break; - - default: - NSLog(@"in fetchNextRowAsArray : Unknown type : %d for column %d, sending back a NSData object", (int)fieldDefinitions[i].type, (int)i); - cellData = [NSData dataWithBytes:theData length:fieldLengths[i]]; - break; + // Initialise the array to return + returnArray = [NSMutableArray arrayWithCapacity:mNumOfFields]; + for (i = 0; i < mNumOfFields; i++) { + id cellData = nil; + char *theData; + + // In fully streaming mode, copy across the data for the MYSQL_ROW + if (fullyStreaming) { + if (theRow[i] == NULL) { + cellData = [NSNull null]; + } else { + theData = calloc(sizeof(char), fieldLengths[i]+1); + memcpy(theData, theRow[i], fieldLengths[i]); + theData[fieldLengths[i]] = '\0'; } + + // In cached-streaming mode, use a reference to the downloaded data + } else { + if (fieldLengths[i] == NSNotFound) { + cellData = [NSNull null]; + } else { + theData = theRowData+copiedDataLength; + copiedDataLength += fieldLengths[i] + 1; + } + } - free(theData); + // If the data hasn't already been detected as NULL - in which case it will have been + // set to NSNull - process the data by type + if (cellData == nil) { - // If a creator returned a nil object, replace with NSNull - if (cellData == nil) cellData = [NSNull null]; - } + switch (fieldDefinitions[i].type) { + case FIELD_TYPE_TINY: + case FIELD_TYPE_SHORT: + case FIELD_TYPE_INT24: + case FIELD_TYPE_LONG: + case FIELD_TYPE_LONGLONG: + case FIELD_TYPE_DECIMAL: + case FIELD_TYPE_NEWDECIMAL: + case FIELD_TYPE_FLOAT: + case FIELD_TYPE_DOUBLE: + case FIELD_TYPE_TIMESTAMP: + case FIELD_TYPE_DATE: + case FIELD_TYPE_TIME: + case FIELD_TYPE_DATETIME: + case FIELD_TYPE_YEAR: + case FIELD_TYPE_VAR_STRING: + case FIELD_TYPE_STRING: + case FIELD_TYPE_SET: + case FIELD_TYPE_ENUM: + case FIELD_TYPE_NEWDATE: // Don't know what the format for this type is... + cellData = [NSString stringWithCString:theData encoding:mEncoding]; + break; + + case FIELD_TYPE_BIT: + cellData = [NSString stringWithFormat:@"%u", theData[0]]; + break; + + case FIELD_TYPE_TINY_BLOB: + case FIELD_TYPE_BLOB: + case FIELD_TYPE_MEDIUM_BLOB: + case FIELD_TYPE_LONG_BLOB: + + // For binary data, return the data + if (fieldDefinitions[i].flags & BINARY_FLAG) { + cellData = [NSData dataWithBytes:theData length:fieldLengths[i]]; + + // For string data, convert to text + } else { + cellData = [[NSString alloc] initWithBytes:theData length:fieldLengths[i] encoding:mEncoding]; + if (cellData) [cellData autorelease]; + } + break; + + case FIELD_TYPE_NULL: + cellData = [NSNull null]; + break; + + default: + NSLog(@"in fetchNextRowAsArray : Unknown type : %d for column %d, sending back a NSData object", (int)fieldDefinitions[i].type, (int)i); + cellData = [NSData dataWithBytes:theData length:fieldLengths[i]]; + break; + } + + // Free the data if it was originally allocated + if (fullyStreaming) free(theData); + + // If a creator returned a nil object, replace with NSNull + if (cellData == nil) cellData = [NSNull null]; + } + + [returnArray insertObject:cellData atIndex:i]; + } - [returnArray insertObject:cellData atIndex:i]; + // If in cached-streaming mode, update the current entry processed count + if (!fullyStreaming) { + + // Update the active-data pointer to the next item in the list, or set to NULL if no more items + currentDataStoreEntry = currentDataStoreEntry->nextRow; + + // Increment counter + processedRowCount++; } - return returnArray; + return returnArray; } #pragma mark - @@ -199,13 +289,117 @@ - (my_ulonglong) numOfRows { - NSLog(@"numOfRows cannot be used with streaming results"); - return 0; + NSLog(@"numOfRows cannot be used with streaming results"); + return 0; } - (void) dataSeek:(my_ulonglong) row { - NSLog(@"dataSeek cannot be used with streaming results"); + NSLog(@"dataSeek cannot be used with streaming results"); +} + +@end + +@implementation MCPStreamingResult (PrivateAPI) + +/** + * Used internally to download results in a background thread + */ +- (void) _downloadAllData +{ + NSAutoreleasePool *downloadPool = [[NSAutoreleasePool alloc] init]; + MYSQL_ROW theRow; + unsigned long *fieldLengths; + int i, dataCopiedLength, rowDataLength; + LOCAL_ROW_DATA *newRowStore; + + size_t sizeOfLocalRowData = sizeof(LOCAL_ROW_DATA); + size_t sizeOfDataLengths = (size_t)(sizeof(unsigned long) * mNumOfFields); + + // Loop through the rows until the end of the data is reached - indicated via a NULL + while (theRow = mysql_fetch_row(mResult)) { + + // Retrieve the lengths of the returned data + fieldLengths = mysql_fetch_lengths(mResult); + rowDataLength = 0; + dataCopiedLength = 0; + for (i = 0; i < mNumOfFields; i++) + rowDataLength += fieldLengths[i]; + + // Initialise memory for the row and set a NULL pointer for the next item + newRowStore = malloc(sizeOfLocalRowData); + newRowStore->nextRow = NULL; + + // Set up the row data store - a char* - and copy in the data if there is any, + // using a null terminator for each field boundary for easier data processing later + if (rowDataLength) { + newRowStore->data = malloc(sizeof(char) * (rowDataLength + mNumOfFields)); + for (i = 0; i < mNumOfFields; i++) { + if (theRow[i] != NULL) { + memcpy(newRowStore->data+dataCopiedLength, theRow[i], fieldLengths[i]); + newRowStore->data[dataCopiedLength+fieldLengths[i]] = '\0'; + dataCopiedLength += fieldLengths[i] + 1; + } else { + fieldLengths[i] = NSNotFound; + } + } + } else { + newRowStore->data = NULL; + } + + // Set up and copy in the field lengths + newRowStore->dataLengths = memcpy(malloc(sizeOfDataLengths), fieldLengths, sizeOfDataLengths); + + // Add the newly allocated row to end of the storage linked list + if (localDataStore) { + localDataStoreLastEntry->nextRow = newRowStore; + } else { + localDataStore = newRowStore; + } + localDataStoreLastEntry = newRowStore; + if (!currentDataStoreEntry) currentDataStoreEntry = newRowStore; + + // Update the downloaded row count + downloadedRowCount++; + } + + dataDownloaded = YES; + [downloadPool drain]; +} + +/** + * Used internally to free data which has been fully processed; done in a thread to allow + * fetchNextRowAsArray to be faster. + */ +- (void) _freeAllDataWhenDone +{ + NSAutoreleasePool *dataFreeingPool = [[NSAutoreleasePool alloc] init]; + + while (!dataDownloaded || freedRowCount != downloadedRowCount) { + + // If the freed row count matches the processed row count, wait before retrying + if (freedRowCount == processedRowCount) { + usleep(1000); + continue; + } + + // Free a single item off the bottom of the list + // Update the data pointer to the next item in the list, or set to NULL if no more items + LOCAL_ROW_DATA *rowToRemove = localDataStore; + localDataStore = localDataStore->nextRow; + + // Free memory for the first row + rowToRemove->nextRow = NULL; + free(rowToRemove->dataLengths); + if (rowToRemove->data != NULL) free(rowToRemove->data); + free(rowToRemove); + + // Increment the counter + freedRowCount++; + } + + dataFreed = YES; + [dataFreeingPool drain]; } @end
\ No newline at end of file |