aboutsummaryrefslogtreecommitdiffstats
path: root/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.m
diff options
context:
space:
mode:
Diffstat (limited to 'Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.m')
-rw-r--r--Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.m434
1 files changed, 314 insertions, 120 deletions
diff --git a/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.m b/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.m
index 5eb147d3..63353056 100644
--- a/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.m
+++ b/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.m
@@ -1,5 +1,5 @@
//
-// $Id$
+// $Id$
//
// MCPStreamingResult.m
// sequel-pro
@@ -31,11 +31,24 @@
/**
* IMPORTANT NOTE
*
- * MCPStreamingResult can produce fast and low-memory result reads, but should not
- * be widely used for reads as it can result in MySQL thread or table blocking.
+ * MCPStreamingResult can operate in two modes. The default mode is a safe implementation,
+ * which operates in a multithreaded fashion - a worker thread is set up to download the results as
+ * fast as possible in the background, while the results are made available via a blocking (and so
+ * single-thread-compatible) fetchNextRowAsArray call. This provides the benefit of allowing a progress
+ * bar to be shown during downloads, and threaded processing, but still has reasonable memory usage for
+ * the downloaded result - and won't block the server.
+ * Alternatively, withFullStreaming: can be set to YES, in which case each row will be accessed on-demand;
+ * this can be dangerous as it means a SELECT will tie up the server for longer, as for MyISAM tables
+ * updates (and subsequent reads) must block while a SELECT is still running. However this can be useful
+ * for certain processes such as working with very large tables to keep memory usage low.
*/
+@interface MCPStreamingResult (PrivateAPI)
+- (void) _downloadAllData;
+- (void) _freeAllDataWhenDone;
+@end
+
@implementation MCPStreamingResult : MCPResult
#pragma mark -
@@ -47,36 +60,64 @@
*/
- (id)initWithMySQLPtr:(MYSQL *)mySQLPtr encoding:(NSStringEncoding)theEncoding timeZone:(NSTimeZone *)theTimeZone connection:(MCPConnection *)theConnection
{
- if ((self = [super init])) {
- mEncoding = theEncoding;
- mTimeZone = [theTimeZone retain];
- parentConnection = theConnection;
-
- if (mResult) {
- mysql_free_result(mResult);
- mResult = NULL;
- }
-
- if (mNames) {
- [mNames release];
- mNames = NULL;
- }
-
- mResult = mysql_use_result(mySQLPtr);
-
- if (mResult) {
- mNumOfFields = mysql_num_fields(mResult);
- fieldDefinitions = mysql_fetch_fields(mResult);
- } else {
- mNumOfFields = 0;
- }
-
- if (mMySQLLocales == NULL) {
- mMySQLLocales = [[MCPConnection getMySQLLocales] retain];
- }
- }
-
- return self;
+ return [self initWithMySQLPtr:mySQLPtr encoding:theEncoding timeZone:theTimeZone connection:theConnection withFullStreaming:NO];
+
+}
+
+/**
+ * Master initialisation method, allowing selection of either full streaming or safe streaming
+ * (see "important note" above)
+ */
+- (id)initWithMySQLPtr:(MYSQL *)mySQLPtr encoding:(NSStringEncoding)theEncoding timeZone:(NSTimeZone *)theTimeZone connection:(MCPConnection *)theConnection withFullStreaming:(BOOL)useFullStreaming
+{
+ if ((self = [super init])) {
+ mEncoding = theEncoding;
+ mTimeZone = [theTimeZone retain];
+ parentConnection = theConnection;
+ fullyStreaming = useFullStreaming;
+
+ if (mResult) {
+ mysql_free_result(mResult);
+ mResult = NULL;
+ }
+
+ if (mNames) {
+ [mNames release];
+ mNames = NULL;
+ }
+
+ mResult = mysql_use_result(mySQLPtr);
+
+ if (mResult) {
+ mNumOfFields = mysql_num_fields(mResult);
+ fieldDefinitions = mysql_fetch_fields(mResult);
+ } else {
+ mNumOfFields = 0;
+ }
+
+ // If the result is opened in download-data-fast safe mode, set up the additional variables
+ // and threads required.
+ if (!fullyStreaming) {
+ dataDownloaded = NO;
+ dataFreed = NO;
+ localDataStore = NULL;
+ currentDataStoreEntry = NULL;
+ localDataStoreLastEntry = NULL;
+ localDataRows = 0;
+ localDataAllocated = 0;
+ downloadedRowCount = 0;
+ processedRowCount = 0;
+ freedRowCount = 0;
+
+ // Start the data download thread
+ [NSThread detachNewThreadSelector:@selector(_downloadAllData) toTarget:self withObject:nil];
+
+ // Start the data freeing thread
+ [NSThread detachNewThreadSelector:@selector(_freeAllDataWhenDone) toTarget:self withObject:nil];
+ }
+ }
+
+ return self;
}
/**
@@ -84,9 +125,9 @@
*/
- (void) dealloc
{
- [parentConnection unlockConnection];
-
- [super dealloc];
+ [parentConnection unlockConnection];
+
+ [super dealloc];
}
#pragma mark -
@@ -98,100 +139,149 @@
*/
- (NSArray *)fetchNextRowAsArray
{
- MYSQL_ROW theRow;
- unsigned long *fieldLengths;
- int i;
- NSMutableArray *returnArray;
+ MYSQL_ROW theRow;
+ char *theRowData;
+ unsigned long *fieldLengths;
+ int i, copiedDataLength;
+ NSMutableArray *returnArray;
- // Retrieve the next row
- theRow = mysql_fetch_row(mResult);
+ // Retrieve the next row according to the mode this result set is in.
+ // If fully streaming, retrieve the MYSQL_ROW
+ if (fullyStreaming) {
+ theRow = mysql_fetch_row(mResult);
- // If no data was returned, we're at the end of the result set - return nil.
- if (theRow == NULL) return nil;
+ // If no data was returned, we're at the end of the result set - return nil.
+ if (theRow == NULL) return nil;
- // Retrieve the lengths of the returned data
- fieldLengths = mysql_fetch_lengths(mResult);
+ // Retrieve the lengths of the returned data
+ fieldLengths = mysql_fetch_lengths(mResult);
- // Initialise the array to return
- returnArray = [NSMutableArray arrayWithCapacity:mNumOfFields];
- for (i = 0; i < mNumOfFields; i++) {
- id cellData;
+ // If in cached-streaming/fast download mode, get a reference to the data for the current row
+ } else {
+ copiedDataLength = 0;
+
+ // Check to see whether we need to wait for the data to be availabe
+ // - if so, wait 1ms before checking again
+ while (!dataDownloaded && processedRowCount == downloadedRowCount) {
+ usleep(1000);
+ }
- // Use NSNulls for the NULL data type
- if (theRow[i] == NULL) {
- cellData = [NSNull null];
+ // If all rows have been processed, we're at the end of the result set - return nil
+ // once all memory has been freed
+ if (processedRowCount == downloadedRowCount) {
+ while (!dataFreed) usleep(1000);
+ return nil;
+ }
- // Otherwise, switch by data type
- } else {
+ // Retrieve a reference to the data and the associated lengths
+ theRowData = currentDataStoreEntry->data;
+ fieldLengths = currentDataStoreEntry->dataLengths;
+ }
- // Create a null-terminated data string for processing
- char *theData = calloc(sizeof(char), fieldLengths[i]+1);
- memcpy(theData, theRow[i], fieldLengths[i]);
- theData[fieldLengths[i]] = '\0';
-
- switch (fieldDefinitions[i].type) {
- case FIELD_TYPE_TINY:
- case FIELD_TYPE_SHORT:
- case FIELD_TYPE_INT24:
- case FIELD_TYPE_LONG:
- case FIELD_TYPE_LONGLONG:
- case FIELD_TYPE_DECIMAL:
- case FIELD_TYPE_NEWDECIMAL:
- case FIELD_TYPE_FLOAT:
- case FIELD_TYPE_DOUBLE:
- case FIELD_TYPE_TIMESTAMP:
- case FIELD_TYPE_DATE:
- case FIELD_TYPE_TIME:
- case FIELD_TYPE_DATETIME:
- case FIELD_TYPE_YEAR:
- case FIELD_TYPE_VAR_STRING:
- case FIELD_TYPE_STRING:
- case FIELD_TYPE_SET:
- case FIELD_TYPE_ENUM:
- case FIELD_TYPE_NEWDATE: // Don't know what the format for this type is...
- cellData = [NSString stringWithCString:theData encoding:mEncoding];
- break;
-
- case FIELD_TYPE_BIT:
- cellData = [NSString stringWithFormat:@"%u", theData[0]];
- break;
-
- case FIELD_TYPE_TINY_BLOB:
- case FIELD_TYPE_BLOB:
- case FIELD_TYPE_MEDIUM_BLOB:
- case FIELD_TYPE_LONG_BLOB:
-
- // For binary data, return the data
- if (fieldDefinitions[i].flags & BINARY_FLAG) {
- cellData = [NSData dataWithBytes:theData length:fieldLengths[i]];
-
- // For string data, convert to text
- } else {
- cellData = [[NSString alloc] initWithBytes:theData length:fieldLengths[i] encoding:mEncoding];
- if (cellData) [cellData autorelease];
- }
- break;
-
- case FIELD_TYPE_NULL:
- cellData = [NSNull null];
- break;
-
- default:
- NSLog(@"in fetchNextRowAsArray : Unknown type : %d for column %d, sending back a NSData object", (int)fieldDefinitions[i].type, (int)i);
- cellData = [NSData dataWithBytes:theData length:fieldLengths[i]];
- break;
+ // Initialise the array to return
+ returnArray = [NSMutableArray arrayWithCapacity:mNumOfFields];
+ for (i = 0; i < mNumOfFields; i++) {
+ id cellData = nil;
+ char *theData;
+
+ // In fully streaming mode, copy across the data for the MYSQL_ROW
+ if (fullyStreaming) {
+ if (theRow[i] == NULL) {
+ cellData = [NSNull null];
+ } else {
+ theData = calloc(sizeof(char), fieldLengths[i]+1);
+ memcpy(theData, theRow[i], fieldLengths[i]);
+ theData[fieldLengths[i]] = '\0';
}
+
+ // In cached-streaming mode, use a reference to the downloaded data
+ } else {
+ if (fieldLengths[i] == NSNotFound) {
+ cellData = [NSNull null];
+ } else {
+ theData = theRowData+copiedDataLength;
+ copiedDataLength += fieldLengths[i] + 1;
+ }
+ }
- free(theData);
+ // If the data hasn't already been detected as NULL - in which case it will have been
+ // set to NSNull - process the data by type
+ if (cellData == nil) {
- // If a creator returned a nil object, replace with NSNull
- if (cellData == nil) cellData = [NSNull null];
- }
+ switch (fieldDefinitions[i].type) {
+ case FIELD_TYPE_TINY:
+ case FIELD_TYPE_SHORT:
+ case FIELD_TYPE_INT24:
+ case FIELD_TYPE_LONG:
+ case FIELD_TYPE_LONGLONG:
+ case FIELD_TYPE_DECIMAL:
+ case FIELD_TYPE_NEWDECIMAL:
+ case FIELD_TYPE_FLOAT:
+ case FIELD_TYPE_DOUBLE:
+ case FIELD_TYPE_TIMESTAMP:
+ case FIELD_TYPE_DATE:
+ case FIELD_TYPE_TIME:
+ case FIELD_TYPE_DATETIME:
+ case FIELD_TYPE_YEAR:
+ case FIELD_TYPE_VAR_STRING:
+ case FIELD_TYPE_STRING:
+ case FIELD_TYPE_SET:
+ case FIELD_TYPE_ENUM:
+ case FIELD_TYPE_NEWDATE: // Don't know what the format for this type is...
+ cellData = [NSString stringWithCString:theData encoding:mEncoding];
+ break;
+
+ case FIELD_TYPE_BIT:
+ cellData = [NSString stringWithFormat:@"%u", theData[0]];
+ break;
+
+ case FIELD_TYPE_TINY_BLOB:
+ case FIELD_TYPE_BLOB:
+ case FIELD_TYPE_MEDIUM_BLOB:
+ case FIELD_TYPE_LONG_BLOB:
+
+ // For binary data, return the data
+ if (fieldDefinitions[i].flags & BINARY_FLAG) {
+ cellData = [NSData dataWithBytes:theData length:fieldLengths[i]];
+
+ // For string data, convert to text
+ } else {
+ cellData = [[NSString alloc] initWithBytes:theData length:fieldLengths[i] encoding:mEncoding];
+ if (cellData) [cellData autorelease];
+ }
+ break;
+
+ case FIELD_TYPE_NULL:
+ cellData = [NSNull null];
+ break;
+
+ default:
+ NSLog(@"in fetchNextRowAsArray : Unknown type : %d for column %d, sending back a NSData object", (int)fieldDefinitions[i].type, (int)i);
+ cellData = [NSData dataWithBytes:theData length:fieldLengths[i]];
+ break;
+ }
+
+ // Free the data if it was originally allocated
+ if (fullyStreaming) free(theData);
+
+ // If a creator returned a nil object, replace with NSNull
+ if (cellData == nil) cellData = [NSNull null];
+ }
+
+ [returnArray insertObject:cellData atIndex:i];
+ }
- [returnArray insertObject:cellData atIndex:i];
+ // If in cached-streaming mode, update the current entry processed count
+ if (!fullyStreaming) {
+
+ // Update the active-data pointer to the next item in the list, or set to NULL if no more items
+ currentDataStoreEntry = currentDataStoreEntry->nextRow;
+
+ // Increment counter
+ processedRowCount++;
}
- return returnArray;
+ return returnArray;
}
#pragma mark -
@@ -199,13 +289,117 @@
- (my_ulonglong) numOfRows
{
- NSLog(@"numOfRows cannot be used with streaming results");
- return 0;
+ NSLog(@"numOfRows cannot be used with streaming results");
+ return 0;
}
- (void) dataSeek:(my_ulonglong) row
{
- NSLog(@"dataSeek cannot be used with streaming results");
+ NSLog(@"dataSeek cannot be used with streaming results");
+}
+
+@end
+
+@implementation MCPStreamingResult (PrivateAPI)
+
+/**
+ * Used internally to download results in a background thread
+ */
+- (void) _downloadAllData
+{
+ NSAutoreleasePool *downloadPool = [[NSAutoreleasePool alloc] init];
+ MYSQL_ROW theRow;
+ unsigned long *fieldLengths;
+ int i, dataCopiedLength, rowDataLength;
+ LOCAL_ROW_DATA *newRowStore;
+
+ size_t sizeOfLocalRowData = sizeof(LOCAL_ROW_DATA);
+ size_t sizeOfDataLengths = (size_t)(sizeof(unsigned long) * mNumOfFields);
+
+ // Loop through the rows until the end of the data is reached - indicated via a NULL
+ while (theRow = mysql_fetch_row(mResult)) {
+
+ // Retrieve the lengths of the returned data
+ fieldLengths = mysql_fetch_lengths(mResult);
+ rowDataLength = 0;
+ dataCopiedLength = 0;
+ for (i = 0; i < mNumOfFields; i++)
+ rowDataLength += fieldLengths[i];
+
+ // Initialise memory for the row and set a NULL pointer for the next item
+ newRowStore = malloc(sizeOfLocalRowData);
+ newRowStore->nextRow = NULL;
+
+ // Set up the row data store - a char* - and copy in the data if there is any,
+ // using a null terminator for each field boundary for easier data processing later
+ if (rowDataLength) {
+ newRowStore->data = malloc(sizeof(char) * (rowDataLength + mNumOfFields));
+ for (i = 0; i < mNumOfFields; i++) {
+ if (theRow[i] != NULL) {
+ memcpy(newRowStore->data+dataCopiedLength, theRow[i], fieldLengths[i]);
+ newRowStore->data[dataCopiedLength+fieldLengths[i]] = '\0';
+ dataCopiedLength += fieldLengths[i] + 1;
+ } else {
+ fieldLengths[i] = NSNotFound;
+ }
+ }
+ } else {
+ newRowStore->data = NULL;
+ }
+
+ // Set up and copy in the field lengths
+ newRowStore->dataLengths = memcpy(malloc(sizeOfDataLengths), fieldLengths, sizeOfDataLengths);
+
+ // Add the newly allocated row to end of the storage linked list
+ if (localDataStore) {
+ localDataStoreLastEntry->nextRow = newRowStore;
+ } else {
+ localDataStore = newRowStore;
+ }
+ localDataStoreLastEntry = newRowStore;
+ if (!currentDataStoreEntry) currentDataStoreEntry = newRowStore;
+
+ // Update the downloaded row count
+ downloadedRowCount++;
+ }
+
+ dataDownloaded = YES;
+ [downloadPool drain];
+}
+
+/**
+ * Used internally to free data which has been fully processed; done in a thread to allow
+ * fetchNextRowAsArray to be faster.
+ */
+- (void) _freeAllDataWhenDone
+{
+ NSAutoreleasePool *dataFreeingPool = [[NSAutoreleasePool alloc] init];
+
+ while (!dataDownloaded || freedRowCount != downloadedRowCount) {
+
+ // If the freed row count matches the processed row count, wait before retrying
+ if (freedRowCount == processedRowCount) {
+ usleep(1000);
+ continue;
+ }
+
+ // Free a single item off the bottom of the list
+ // Update the data pointer to the next item in the list, or set to NULL if no more items
+ LOCAL_ROW_DATA *rowToRemove = localDataStore;
+ localDataStore = localDataStore->nextRow;
+
+ // Free memory for the first row
+ rowToRemove->nextRow = NULL;
+ free(rowToRemove->dataLengths);
+ if (rowToRemove->data != NULL) free(rowToRemove->data);
+ free(rowToRemove);
+
+ // Increment the counter
+ freedRowCount++;
+ }
+
+ dataFreed = YES;
+ [dataFreeingPool drain];
}
@end \ No newline at end of file