aboutsummaryrefslogtreecommitdiffstats
path: root/Frameworks/MCPKit/MCPFoundationKit
diff options
context:
space:
mode:
authorrowanbeentje <rowan@beent.je>2009-08-20 00:41:30 +0000
committerrowanbeentje <rowan@beent.je>2009-08-20 00:41:30 +0000
commita3f0cd7a5c21c87f154956cb645cb41b1bb35821 (patch)
tree444ec8c61df6c41bf4ab7c67b68d1d6666ff244d /Frameworks/MCPKit/MCPFoundationKit
parentb30cc9d877f2dab24901ac9cf5c6e1f1835fb454 (diff)
downloadsequelpro-a3f0cd7a5c21c87f154956cb645cb41b1bb35821.tar.gz
sequelpro-a3f0cd7a5c21c87f154956cb645cb41b1bb35821.tar.bz2
sequelpro-a3f0cd7a5c21c87f154956cb645cb41b1bb35821.zip
- Change MCPStreamingResult to use a safer streaming mode by default - download all results as fast as possible from the server, to avoid blocking, but do so in a background thread to allow results processing to start as soon as data is available. Many thanks to Hans-Jörg Bibiko for assistance with this.
- Add an option to the SQL export dialog to allow selection of the full-streaming method, with a warning that it may block table UPDATES/INSERTS.
Diffstat (limited to 'Frameworks/MCPKit/MCPFoundationKit')
-rw-r--r--Frameworks/MCPKit/MCPFoundationKit/MCPConnection.h10
-rw-r--r--Frameworks/MCPKit/MCPFoundationKit/MCPConnection.m34
-rw-r--r--Frameworks/MCPKit/MCPFoundationKit/MCPResult.h1
-rw-r--r--Frameworks/MCPKit/MCPFoundationKit/MCPResult.m18
-rw-r--r--Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.h28
-rw-r--r--Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.m434
6 files changed, 373 insertions, 152 deletions
diff --git a/Frameworks/MCPKit/MCPFoundationKit/MCPConnection.h b/Frameworks/MCPKit/MCPFoundationKit/MCPConnection.h
index 932237d8..5b8aefba 100644
--- a/Frameworks/MCPKit/MCPFoundationKit/MCPConnection.h
+++ b/Frameworks/MCPKit/MCPFoundationKit/MCPConnection.h
@@ -32,6 +32,13 @@
#import "mysql.h"
+enum mcp_query_streaming_types
+{
+ MCP_NO_STREAMING = 0,
+ MCP_FAST_STREAMING = 1,
+ MCP_LOWMEM_STREAMING = 2
+};
+
@class MCPResult, MCPStreamingResult;
@protocol MCPConnectionProxy;
@@ -186,7 +193,8 @@ static inline NSData* NSStringDataUsingLossyEncoding(NSString* self, int encodin
- (NSString *)quoteObject:(id)theObject;
- (MCPResult *)queryString:(NSString *)query;
- (MCPStreamingResult *)streamingQueryString:(NSString *)query;
-- (id)queryString:(NSString *) query usingEncoding:(NSStringEncoding) encoding streamingResult:(BOOL) streamResult;
+- (MCPStreamingResult *)streamingQueryString:(NSString *)query useLowMemoryBlockingStreaming:(BOOL)fullStream;
+- (id)queryString:(NSString *) query usingEncoding:(NSStringEncoding) encoding streamingResult:(int) streamResult;
- (double)lastQueryExecutionTime;
- (my_ulonglong)affectedRows;
- (my_ulonglong)insertId;
diff --git a/Frameworks/MCPKit/MCPFoundationKit/MCPConnection.m b/Frameworks/MCPKit/MCPFoundationKit/MCPConnection.m
index b311ac14..a61f3223 100644
--- a/Frameworks/MCPKit/MCPFoundationKit/MCPConnection.m
+++ b/Frameworks/MCPKit/MCPFoundationKit/MCPConnection.m
@@ -1231,17 +1231,31 @@ static void forcePingTimeout(int signalNumber)
*/
- (MCPResult *)queryString:(NSString *)query
{
- return [self queryString:query usingEncoding:mEncoding streamingResult:NO];
+ return [self queryString:query usingEncoding:mEncoding streamingResult:MCP_NO_STREAMING];
}
/**
* Takes a query string and returns an MCPStreamingResult representing the result of the query.
* The returned MCPStreamingResult is retained and the client is responsible for releasing it.
* If no fields are present in the result, nil will be returned.
+ * Uses safe/fast mode, which may use more memory as results are downloaded.
*/
- (MCPStreamingResult *)streamingQueryString:(NSString *)query
{
- return [self queryString:query usingEncoding:mEncoding streamingResult:YES];
+ return [self queryString:query usingEncoding:mEncoding streamingResult:MCP_FAST_STREAMING];
+}
+
+/**
+ * Takes a query string and returns an MCPStreamingResult representing the result of the query.
+ * The returned MCPStreamingResult is retained and the client is responsible for releasing it.
+ * If no fields are present in the result, nil will be returned.
+ * Can be used in either fast/safe mode, where data is downloaded as fast as possible to avoid
+ * blocking the server, or in full streaming mode for lowest memory usage but potentially blocking
+ * the table.
+ */
+- (MCPStreamingResult *)streamingQueryString:(NSString *)query useLowMemoryBlockingStreaming:(BOOL)fullStream
+{
+ return [self queryString:query usingEncoding:mEncoding streamingResult:(fullStream?MCP_LOWMEM_STREAMING:MCP_FAST_STREAMING)];
}
/**
@@ -1250,7 +1264,7 @@ static void forcePingTimeout(int signalNumber)
* and the result can be returned or the connection and document have been closed.
* If using streamingResult, the caller is responsible for releasing the result set.
*/
-- (id)queryString:(NSString *) query usingEncoding:(NSStringEncoding) encoding streamingResult:(BOOL) streamResult
+- (id)queryString:(NSString *) query usingEncoding:(NSStringEncoding) encoding streamingResult:(int) streamResultType
{
MCPResult *theResult = nil;
double queryStartTime, queryExecutionTime;
@@ -1354,13 +1368,15 @@ static void forcePingTimeout(int signalNumber)
if (mysql_field_count(mConnection) != 0) {
// For normal result sets, fetch the results and unlock the connection
- if (!streamResult) {
+ if (streamResultType == MCP_NO_STREAMING) {
theResult = [[MCPResult alloc] initWithMySQLPtr:mConnection encoding:mEncoding timeZone:mTimeZone];
[queryLock unlock];
// For streaming result sets, fetch the result pointer and leave the connection locked
- } else {
- theResult = [[MCPStreamingResult alloc] initWithMySQLPtr:mConnection encoding:mEncoding timeZone:mTimeZone connection:self];
+ } else if (streamResultType == MCP_FAST_STREAMING) {
+ theResult = [[MCPStreamingResult alloc] initWithMySQLPtr:mConnection encoding:mEncoding timeZone:mTimeZone connection:self withFullStreaming:NO];
+ } else if (streamResultType == MCP_LOWMEM_STREAMING) {
+ theResult = [[MCPStreamingResult alloc] initWithMySQLPtr:mConnection encoding:mEncoding timeZone:mTimeZone connection:self withFullStreaming:YES];
}
// Ensure no problem occurred during the result fetch
@@ -1375,7 +1391,7 @@ static void forcePingTimeout(int signalNumber)
queryErrorMessage = [[NSString alloc] initWithString:@""];
queryErrorId = 0;
- if (!streamResult) {
+ if (streamResultType == MCP_NO_STREAMING) {
queryAffectedRows = mysql_affected_rows(mConnection);
} else {
queryAffectedRows = 0;
@@ -1398,7 +1414,7 @@ static void forcePingTimeout(int signalNumber)
break;
}
- if (!streamResult) {
+ if (streamResultType == MCP_NO_STREAMING) {
// If the mysql thread id has changed as a result of a connection error,
// ensure connection details are still correct
@@ -1423,7 +1439,7 @@ static void forcePingTimeout(int signalNumber)
(void)(*startKeepAliveTimerResettingStatePtr)(self, startKeepAliveTimerResettingStateSEL, YES);
if (!theResult) return nil;
- if (streamResult) return theResult;
+ if (streamResultType != MCP_NO_STREAMING) return theResult;
return [theResult autorelease];
}
diff --git a/Frameworks/MCPKit/MCPFoundationKit/MCPResult.h b/Frameworks/MCPKit/MCPFoundationKit/MCPResult.h
index 1dd7404f..cb66a6cf 100644
--- a/Frameworks/MCPKit/MCPFoundationKit/MCPResult.h
+++ b/Frameworks/MCPKit/MCPFoundationKit/MCPResult.h
@@ -37,7 +37,6 @@
{
MYSQL_RES *mResult; /* The MYSQL_RES structure of the C API. */
NSArray *mNames; /* An NSArray holding the name of the columns. */
- NSDictionary *mMySQLLocales; /* A Locales dictionary to define the locales of MySQL. */
NSStringEncoding mEncoding; /* The encoding used by MySQL server, to ISO-1 default. */
unsigned int mNumOfFields; /* The number of fields in the result. */
NSTimeZone *mTimeZone; /* The time zone of the connection when the query was made. */
diff --git a/Frameworks/MCPKit/MCPFoundationKit/MCPResult.m b/Frameworks/MCPKit/MCPFoundationKit/MCPResult.m
index 158d01c8..f3edd9a5 100644
--- a/Frameworks/MCPKit/MCPFoundationKit/MCPResult.m
+++ b/Frameworks/MCPKit/MCPFoundationKit/MCPResult.m
@@ -253,11 +253,7 @@ const OUR_CHARSET our_charsets60[] =
[mNames release];
mNames = NULL;
}
-
- if (mMySQLLocales == NULL) {
- mMySQLLocales = [[MCPConnection getMySQLLocales] retain];
- }
-
+
mNumOfFields = 0;
}
@@ -292,10 +288,6 @@ const OUR_CHARSET our_charsets60[] =
else {
mNumOfFields = 0;
}
-
- if (mMySQLLocales == NULL) {
- mMySQLLocales = [[MCPConnection getMySQLLocales] retain];
- }
}
return self;
@@ -330,10 +322,6 @@ const OUR_CHARSET our_charsets60[] =
else {
mNumOfFields = 0;
}
-
- if (mMySQLLocales == NULL) {
- mMySQLLocales = [[MCPConnection getMySQLLocales] retain];
- }
}
return self;
@@ -1341,10 +1329,6 @@ const OUR_CHARSET our_charsets60[] =
[mNames autorelease];
}
- if (mMySQLLocales) {
- [mMySQLLocales autorelease];
- }
-
[super dealloc];
}
diff --git a/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.h b/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.h
index 168f85d3..ae296192 100644
--- a/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.h
+++ b/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.h
@@ -1,5 +1,5 @@
//
-// $Id$
+// $Id$
//
// MCPStreamingResult.h
// sequel-pro
@@ -28,14 +28,34 @@
@class MCPConnection;
+typedef struct SP_MYSQL_ROWS {
+ char *data;
+ unsigned long *dataLengths;
+ struct SP_MYSQL_ROWS *nextRow;
+} LOCAL_ROW_DATA;
+
@interface MCPStreamingResult : MCPResult
{
- MCPConnection *parentConnection;
+ MCPConnection *parentConnection;
- MYSQL_FIELD *fieldDefinitions;
+ MYSQL_FIELD *fieldDefinitions;
+ BOOL fullyStreaming;
+ BOOL dataDownloaded;
+ BOOL dataFreed;
+ LOCAL_ROW_DATA *localDataStore;
+ LOCAL_ROW_DATA *currentDataStoreEntry;
+ LOCAL_ROW_DATA *localDataStoreLastEntry;
+ unsigned long localDataRows;
+ unsigned long localDataAllocated;
+ unsigned long downloadedRowCount;
+ unsigned long processedRowCount;
+ unsigned long freedRowCount;
}
- (id)initWithMySQLPtr:(MYSQL *)mySQLPtr encoding:(NSStringEncoding)theEncoding timeZone:(NSTimeZone *)theTimeZone connection:(MCPConnection *)theConnection;
+- (id)initWithMySQLPtr:(MYSQL *)mySQLPtr encoding:(NSStringEncoding)theEncoding timeZone:(NSTimeZone *)theTimeZone connection:(MCPConnection *)theConnection withFullStreaming:(BOOL)useFullStreaming;
+
+// Results fetching
- (NSArray *)fetchNextRowAsArray;
-@end
+@end \ No newline at end of file
diff --git a/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.m b/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.m
index 5eb147d3..63353056 100644
--- a/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.m
+++ b/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.m
@@ -1,5 +1,5 @@
//
-// $Id$
+// $Id$
//
// MCPStreamingResult.m
// sequel-pro
@@ -31,11 +31,24 @@
/**
* IMPORTANT NOTE
*
- * MCPStreamingResult can produce fast and low-memory result reads, but should not
- * be widely used for reads as it can result in MySQL thread or table blocking.
+ * MCPStreamingResult can operate in two modes. The default mode is a safe implementation,
+ * which operates in a multithreaded fashion - a worker thread is set up to download the results as
+ * fast as possible in the background, while the results are made available via a blocking (and so
+ * single-thread-compatible) fetchNextRowAsArray call. This provides the benefit of allowing a progress
+ * bar to be shown during downloads, and threaded processing, but still has reasonable memory usage for
+ * the downloaded result - and won't block the server.
+ * Alternatively, withFullStreaming: can be set to YES, in which case each row will be accessed on-demand;
+ * this can be dangerous as it means a SELECT will tie up the server for longer, as for MyISAM tables
+ * updates (and subsequent reads) must block while a SELECT is still running. However this can be useful
+ * for certain processes such as working with very large tables to keep memory usage low.
*/
+@interface MCPStreamingResult (PrivateAPI)
+- (void) _downloadAllData;
+- (void) _freeAllDataWhenDone;
+@end
+
@implementation MCPStreamingResult : MCPResult
#pragma mark -
@@ -47,36 +60,64 @@
*/
- (id)initWithMySQLPtr:(MYSQL *)mySQLPtr encoding:(NSStringEncoding)theEncoding timeZone:(NSTimeZone *)theTimeZone connection:(MCPConnection *)theConnection
{
- if ((self = [super init])) {
- mEncoding = theEncoding;
- mTimeZone = [theTimeZone retain];
- parentConnection = theConnection;
-
- if (mResult) {
- mysql_free_result(mResult);
- mResult = NULL;
- }
-
- if (mNames) {
- [mNames release];
- mNames = NULL;
- }
-
- mResult = mysql_use_result(mySQLPtr);
-
- if (mResult) {
- mNumOfFields = mysql_num_fields(mResult);
- fieldDefinitions = mysql_fetch_fields(mResult);
- } else {
- mNumOfFields = 0;
- }
-
- if (mMySQLLocales == NULL) {
- mMySQLLocales = [[MCPConnection getMySQLLocales] retain];
- }
- }
-
- return self;
+ return [self initWithMySQLPtr:mySQLPtr encoding:theEncoding timeZone:theTimeZone connection:theConnection withFullStreaming:NO];
+
+}
+
+/**
+ * Master initialisation method, allowing selection of either full streaming or safe streaming
+ * (see "important note" above)
+ */
+- (id)initWithMySQLPtr:(MYSQL *)mySQLPtr encoding:(NSStringEncoding)theEncoding timeZone:(NSTimeZone *)theTimeZone connection:(MCPConnection *)theConnection withFullStreaming:(BOOL)useFullStreaming
+{
+ if ((self = [super init])) {
+ mEncoding = theEncoding;
+ mTimeZone = [theTimeZone retain];
+ parentConnection = theConnection;
+ fullyStreaming = useFullStreaming;
+
+ if (mResult) {
+ mysql_free_result(mResult);
+ mResult = NULL;
+ }
+
+ if (mNames) {
+ [mNames release];
+ mNames = NULL;
+ }
+
+ mResult = mysql_use_result(mySQLPtr);
+
+ if (mResult) {
+ mNumOfFields = mysql_num_fields(mResult);
+ fieldDefinitions = mysql_fetch_fields(mResult);
+ } else {
+ mNumOfFields = 0;
+ }
+
+ // If the result is opened in download-data-fast safe mode, set up the additional variables
+ // and threads required.
+ if (!fullyStreaming) {
+ dataDownloaded = NO;
+ dataFreed = NO;
+ localDataStore = NULL;
+ currentDataStoreEntry = NULL;
+ localDataStoreLastEntry = NULL;
+ localDataRows = 0;
+ localDataAllocated = 0;
+ downloadedRowCount = 0;
+ processedRowCount = 0;
+ freedRowCount = 0;
+
+ // Start the data download thread
+ [NSThread detachNewThreadSelector:@selector(_downloadAllData) toTarget:self withObject:nil];
+
+ // Start the data freeing thread
+ [NSThread detachNewThreadSelector:@selector(_freeAllDataWhenDone) toTarget:self withObject:nil];
+ }
+ }
+
+ return self;
}
/**
@@ -84,9 +125,9 @@
*/
- (void) dealloc
{
- [parentConnection unlockConnection];
-
- [super dealloc];
+ [parentConnection unlockConnection];
+
+ [super dealloc];
}
#pragma mark -
@@ -98,100 +139,149 @@
*/
- (NSArray *)fetchNextRowAsArray
{
- MYSQL_ROW theRow;
- unsigned long *fieldLengths;
- int i;
- NSMutableArray *returnArray;
+ MYSQL_ROW theRow;
+ char *theRowData;
+ unsigned long *fieldLengths;
+ int i, copiedDataLength;
+ NSMutableArray *returnArray;
- // Retrieve the next row
- theRow = mysql_fetch_row(mResult);
+ // Retrieve the next row according to the mode this result set is in.
+ // If fully streaming, retrieve the MYSQL_ROW
+ if (fullyStreaming) {
+ theRow = mysql_fetch_row(mResult);
- // If no data was returned, we're at the end of the result set - return nil.
- if (theRow == NULL) return nil;
+ // If no data was returned, we're at the end of the result set - return nil.
+ if (theRow == NULL) return nil;
- // Retrieve the lengths of the returned data
- fieldLengths = mysql_fetch_lengths(mResult);
+ // Retrieve the lengths of the returned data
+ fieldLengths = mysql_fetch_lengths(mResult);
- // Initialise the array to return
- returnArray = [NSMutableArray arrayWithCapacity:mNumOfFields];
- for (i = 0; i < mNumOfFields; i++) {
- id cellData;
+ // If in cached-streaming/fast download mode, get a reference to the data for the current row
+ } else {
+ copiedDataLength = 0;
+
+ // Check to see whether we need to wait for the data to be availabe
+ // - if so, wait 1ms before checking again
+ while (!dataDownloaded && processedRowCount == downloadedRowCount) {
+ usleep(1000);
+ }
- // Use NSNulls for the NULL data type
- if (theRow[i] == NULL) {
- cellData = [NSNull null];
+ // If all rows have been processed, we're at the end of the result set - return nil
+ // once all memory has been freed
+ if (processedRowCount == downloadedRowCount) {
+ while (!dataFreed) usleep(1000);
+ return nil;
+ }
- // Otherwise, switch by data type
- } else {
+ // Retrieve a reference to the data and the associated lengths
+ theRowData = currentDataStoreEntry->data;
+ fieldLengths = currentDataStoreEntry->dataLengths;
+ }
- // Create a null-terminated data string for processing
- char *theData = calloc(sizeof(char), fieldLengths[i]+1);
- memcpy(theData, theRow[i], fieldLengths[i]);
- theData[fieldLengths[i]] = '\0';
-
- switch (fieldDefinitions[i].type) {
- case FIELD_TYPE_TINY:
- case FIELD_TYPE_SHORT:
- case FIELD_TYPE_INT24:
- case FIELD_TYPE_LONG:
- case FIELD_TYPE_LONGLONG:
- case FIELD_TYPE_DECIMAL:
- case FIELD_TYPE_NEWDECIMAL:
- case FIELD_TYPE_FLOAT:
- case FIELD_TYPE_DOUBLE:
- case FIELD_TYPE_TIMESTAMP:
- case FIELD_TYPE_DATE:
- case FIELD_TYPE_TIME:
- case FIELD_TYPE_DATETIME:
- case FIELD_TYPE_YEAR:
- case FIELD_TYPE_VAR_STRING:
- case FIELD_TYPE_STRING:
- case FIELD_TYPE_SET:
- case FIELD_TYPE_ENUM:
- case FIELD_TYPE_NEWDATE: // Don't know what the format for this type is...
- cellData = [NSString stringWithCString:theData encoding:mEncoding];
- break;
-
- case FIELD_TYPE_BIT:
- cellData = [NSString stringWithFormat:@"%u", theData[0]];
- break;
-
- case FIELD_TYPE_TINY_BLOB:
- case FIELD_TYPE_BLOB:
- case FIELD_TYPE_MEDIUM_BLOB:
- case FIELD_TYPE_LONG_BLOB:
-
- // For binary data, return the data
- if (fieldDefinitions[i].flags & BINARY_FLAG) {
- cellData = [NSData dataWithBytes:theData length:fieldLengths[i]];
-
- // For string data, convert to text
- } else {
- cellData = [[NSString alloc] initWithBytes:theData length:fieldLengths[i] encoding:mEncoding];
- if (cellData) [cellData autorelease];
- }
- break;
-
- case FIELD_TYPE_NULL:
- cellData = [NSNull null];
- break;
-
- default:
- NSLog(@"in fetchNextRowAsArray : Unknown type : %d for column %d, sending back a NSData object", (int)fieldDefinitions[i].type, (int)i);
- cellData = [NSData dataWithBytes:theData length:fieldLengths[i]];
- break;
+ // Initialise the array to return
+ returnArray = [NSMutableArray arrayWithCapacity:mNumOfFields];
+ for (i = 0; i < mNumOfFields; i++) {
+ id cellData = nil;
+ char *theData;
+
+ // In fully streaming mode, copy across the data for the MYSQL_ROW
+ if (fullyStreaming) {
+ if (theRow[i] == NULL) {
+ cellData = [NSNull null];
+ } else {
+ theData = calloc(sizeof(char), fieldLengths[i]+1);
+ memcpy(theData, theRow[i], fieldLengths[i]);
+ theData[fieldLengths[i]] = '\0';
}
+
+ // In cached-streaming mode, use a reference to the downloaded data
+ } else {
+ if (fieldLengths[i] == NSNotFound) {
+ cellData = [NSNull null];
+ } else {
+ theData = theRowData+copiedDataLength;
+ copiedDataLength += fieldLengths[i] + 1;
+ }
+ }
- free(theData);
+ // If the data hasn't already been detected as NULL - in which case it will have been
+ // set to NSNull - process the data by type
+ if (cellData == nil) {
- // If a creator returned a nil object, replace with NSNull
- if (cellData == nil) cellData = [NSNull null];
- }
+ switch (fieldDefinitions[i].type) {
+ case FIELD_TYPE_TINY:
+ case FIELD_TYPE_SHORT:
+ case FIELD_TYPE_INT24:
+ case FIELD_TYPE_LONG:
+ case FIELD_TYPE_LONGLONG:
+ case FIELD_TYPE_DECIMAL:
+ case FIELD_TYPE_NEWDECIMAL:
+ case FIELD_TYPE_FLOAT:
+ case FIELD_TYPE_DOUBLE:
+ case FIELD_TYPE_TIMESTAMP:
+ case FIELD_TYPE_DATE:
+ case FIELD_TYPE_TIME:
+ case FIELD_TYPE_DATETIME:
+ case FIELD_TYPE_YEAR:
+ case FIELD_TYPE_VAR_STRING:
+ case FIELD_TYPE_STRING:
+ case FIELD_TYPE_SET:
+ case FIELD_TYPE_ENUM:
+ case FIELD_TYPE_NEWDATE: // Don't know what the format for this type is...
+ cellData = [NSString stringWithCString:theData encoding:mEncoding];
+ break;
+
+ case FIELD_TYPE_BIT:
+ cellData = [NSString stringWithFormat:@"%u", theData[0]];
+ break;
+
+ case FIELD_TYPE_TINY_BLOB:
+ case FIELD_TYPE_BLOB:
+ case FIELD_TYPE_MEDIUM_BLOB:
+ case FIELD_TYPE_LONG_BLOB:
+
+ // For binary data, return the data
+ if (fieldDefinitions[i].flags & BINARY_FLAG) {
+ cellData = [NSData dataWithBytes:theData length:fieldLengths[i]];
+
+ // For string data, convert to text
+ } else {
+ cellData = [[NSString alloc] initWithBytes:theData length:fieldLengths[i] encoding:mEncoding];
+ if (cellData) [cellData autorelease];
+ }
+ break;
+
+ case FIELD_TYPE_NULL:
+ cellData = [NSNull null];
+ break;
+
+ default:
+ NSLog(@"in fetchNextRowAsArray : Unknown type : %d for column %d, sending back a NSData object", (int)fieldDefinitions[i].type, (int)i);
+ cellData = [NSData dataWithBytes:theData length:fieldLengths[i]];
+ break;
+ }
+
+ // Free the data if it was originally allocated
+ if (fullyStreaming) free(theData);
+
+ // If a creator returned a nil object, replace with NSNull
+ if (cellData == nil) cellData = [NSNull null];
+ }
+
+ [returnArray insertObject:cellData atIndex:i];
+ }
- [returnArray insertObject:cellData atIndex:i];
+ // If in cached-streaming mode, update the current entry processed count
+ if (!fullyStreaming) {
+
+ // Update the active-data pointer to the next item in the list, or set to NULL if no more items
+ currentDataStoreEntry = currentDataStoreEntry->nextRow;
+
+ // Increment counter
+ processedRowCount++;
}
- return returnArray;
+ return returnArray;
}
#pragma mark -
@@ -199,13 +289,117 @@
- (my_ulonglong) numOfRows
{
- NSLog(@"numOfRows cannot be used with streaming results");
- return 0;
+ NSLog(@"numOfRows cannot be used with streaming results");
+ return 0;
}
- (void) dataSeek:(my_ulonglong) row
{
- NSLog(@"dataSeek cannot be used with streaming results");
+ NSLog(@"dataSeek cannot be used with streaming results");
+}
+
+@end
+
+@implementation MCPStreamingResult (PrivateAPI)
+
+/**
+ * Used internally to download results in a background thread
+ */
+- (void) _downloadAllData
+{
+ NSAutoreleasePool *downloadPool = [[NSAutoreleasePool alloc] init];
+ MYSQL_ROW theRow;
+ unsigned long *fieldLengths;
+ int i, dataCopiedLength, rowDataLength;
+ LOCAL_ROW_DATA *newRowStore;
+
+ size_t sizeOfLocalRowData = sizeof(LOCAL_ROW_DATA);
+ size_t sizeOfDataLengths = (size_t)(sizeof(unsigned long) * mNumOfFields);
+
+ // Loop through the rows until the end of the data is reached - indicated via a NULL
+ while (theRow = mysql_fetch_row(mResult)) {
+
+ // Retrieve the lengths of the returned data
+ fieldLengths = mysql_fetch_lengths(mResult);
+ rowDataLength = 0;
+ dataCopiedLength = 0;
+ for (i = 0; i < mNumOfFields; i++)
+ rowDataLength += fieldLengths[i];
+
+ // Initialise memory for the row and set a NULL pointer for the next item
+ newRowStore = malloc(sizeOfLocalRowData);
+ newRowStore->nextRow = NULL;
+
+ // Set up the row data store - a char* - and copy in the data if there is any,
+ // using a null terminator for each field boundary for easier data processing later
+ if (rowDataLength) {
+ newRowStore->data = malloc(sizeof(char) * (rowDataLength + mNumOfFields));
+ for (i = 0; i < mNumOfFields; i++) {
+ if (theRow[i] != NULL) {
+ memcpy(newRowStore->data+dataCopiedLength, theRow[i], fieldLengths[i]);
+ newRowStore->data[dataCopiedLength+fieldLengths[i]] = '\0';
+ dataCopiedLength += fieldLengths[i] + 1;
+ } else {
+ fieldLengths[i] = NSNotFound;
+ }
+ }
+ } else {
+ newRowStore->data = NULL;
+ }
+
+ // Set up and copy in the field lengths
+ newRowStore->dataLengths = memcpy(malloc(sizeOfDataLengths), fieldLengths, sizeOfDataLengths);
+
+ // Add the newly allocated row to end of the storage linked list
+ if (localDataStore) {
+ localDataStoreLastEntry->nextRow = newRowStore;
+ } else {
+ localDataStore = newRowStore;
+ }
+ localDataStoreLastEntry = newRowStore;
+ if (!currentDataStoreEntry) currentDataStoreEntry = newRowStore;
+
+ // Update the downloaded row count
+ downloadedRowCount++;
+ }
+
+ dataDownloaded = YES;
+ [downloadPool drain];
+}
+
+/**
+ * Used internally to free data which has been fully processed; done in a thread to allow
+ * fetchNextRowAsArray to be faster.
+ */
+- (void) _freeAllDataWhenDone
+{
+ NSAutoreleasePool *dataFreeingPool = [[NSAutoreleasePool alloc] init];
+
+ while (!dataDownloaded || freedRowCount != downloadedRowCount) {
+
+ // If the freed row count matches the processed row count, wait before retrying
+ if (freedRowCount == processedRowCount) {
+ usleep(1000);
+ continue;
+ }
+
+ // Free a single item off the bottom of the list
+ // Update the data pointer to the next item in the list, or set to NULL if no more items
+ LOCAL_ROW_DATA *rowToRemove = localDataStore;
+ localDataStore = localDataStore->nextRow;
+
+ // Free memory for the first row
+ rowToRemove->nextRow = NULL;
+ free(rowToRemove->dataLengths);
+ if (rowToRemove->data != NULL) free(rowToRemove->data);
+ free(rowToRemove);
+
+ // Increment the counter
+ freedRowCount++;
+ }
+
+ dataFreed = YES;
+ [dataFreeingPool drain];
}
@end \ No newline at end of file