aboutsummaryrefslogtreecommitdiffstats
path: root/Frameworks/SPMySQLFramework/Source/SPMySQLFastStreamingResult.m
diff options
context:
space:
mode:
authorrowanbeentje <rowan@beent.je>2012-02-20 00:51:47 +0000
committerrowanbeentje <rowan@beent.je>2012-02-20 00:51:47 +0000
commitcc3c436432e592d15c46a4a9e38c6cb689eca0ee (patch)
tree2b0c17791b8a62ff2ab707a8bb082e563bec8c80 /Frameworks/SPMySQLFramework/Source/SPMySQLFastStreamingResult.m
parentbe5b96f6d79fecec53da93be6bb4d10df90e9ef5 (diff)
downloadsequelpro-cc3c436432e592d15c46a4a9e38c6cb689eca0ee.tar.gz
sequelpro-cc3c436432e592d15c46a4a9e38c6cb689eca0ee.tar.bz2
sequelpro-cc3c436432e592d15c46a4a9e38c6cb689eca0ee.zip
Initial commit of the new SPMySQL Framework, which is added to the project and ready for use but not yet integrated. This new framework should provide much of the functionality required from MCPKit and is based around its interface for relatively easy integration. The largest missing component is Hans' structure code which I believe is better placed outside the framework.
From the Readme file: The SPMySQL Framework is intended to provide a stable MySQL connection framework, with the ability to run text-based queries and rapidly retrieve result sets with conversion from MySQL data types to Cocoa objects. SPMySQL.framework has an interface loosely based around that provided by MCPKit by Serge Cohen and Bertrand Mansion (http://mysql-cocoa.sourceforge.net/), and in particular the heavily modified Sequel Pro version (http://www.sequelpro.com/). It is a full rewrite of the original framework, although it includes code from patches implementing the following Sequel Pro functionality, largely contributed by Hans-Jörg Bibiko, Stuart Connolly, Jakob Egger, and Rowan Beentje: - Connection locking (Jakob et al) - Ping & keepalive (Rowan et al) - Query cancellation (Rowan et al) - Delegate setup (Stuart et al) - SSL support (Rowan et al) - Connection checking (Rowan et al) - Version state (Stuart et al) - Maximum packet size control (Hans et al) - Result multithreading and streaming (Rowan et al) - Improved encoding support & switching (Rowan et al) - Database structure; moved to inside the app (Hans et al) - Query reattempts and error-handling approach (Rowan et al) - Geometry result class (Hans et al) - Connection proxy (Stuart et al)
Diffstat (limited to 'Frameworks/SPMySQLFramework/Source/SPMySQLFastStreamingResult.m')
-rw-r--r--Frameworks/SPMySQLFramework/Source/SPMySQLFastStreamingResult.m417
1 files changed, 417 insertions, 0 deletions
diff --git a/Frameworks/SPMySQLFramework/Source/SPMySQLFastStreamingResult.m b/Frameworks/SPMySQLFramework/Source/SPMySQLFastStreamingResult.m
new file mode 100644
index 00000000..f084dee0
--- /dev/null
+++ b/Frameworks/SPMySQLFramework/Source/SPMySQLFastStreamingResult.m
@@ -0,0 +1,417 @@
+//
+// $Id$
+//
+// SPMySQLFastStreamingResult.m
+// SPMySQLFramework
+//
+// Created by Rowan Beentje (rowan.beent.je) on February 2, 2012
+// Copyright (c) 2012 Rowan Beentje. All rights reserved.
+//
+// Permission is hereby granted, free of charge, to any person
+// obtaining a copy of this software and associated documentation
+// files (the "Software"), to deal in the Software without
+// restriction, including without limitation the rights to use,
+// copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the
+// Software is furnished to do so, subject to the following
+// conditions:
+//
+// The above copyright notice and this permission notice shall be
+// included in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+// OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+// WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+// OTHER DEALINGS IN THE SOFTWARE.
+//
+// More info at <http://code.google.com/p/sequel-pro/>
+
+#import "SPMySQLFastStreamingResult.h"
+#import "SPMySQL Private APIs.h"
+#include <pthread.h>
+
+/**
+ * This type of streaming result operates in a multithreaded fashion - a worker
+ * thread is set up to download the results as fast as possible in the background,
+ * while the results are made available via blocking (and so single-thread-compatible)
+ * calls. This provides the benefit of allowing a progress bar to be shown during
+ * downloads, and threaded processing, but still has reasonable memory usage for the
+ * downloaded result - and won't block the server.
+ */
+
+typedef struct st_spmysqlstreamingrowdata {
+ char *data;
+ NSUInteger *dataLengths;
+ struct st_spmysqlstreamingrowdata *nextRow;
+} SPMySQLStreamingRowData;
+
+@interface SPMySQLFastStreamingResult (Private_API)
+
+- (void) _downloadAllData;
+
+@end
+
+#pragma mark -
+
+@implementation SPMySQLFastStreamingResult
+
+#pragma mark -
+
+/**
+ * Standard init method, constructing the SPMySQLStreamingResult around a MySQL
+ * result pointer and the encoding to use when working with the data.
+ * As opposed to SPMySQLResult, defaults to returning rows as arrays, as the result
+ * sets are likely to be larger and processed in loops.
+ */
+- (id)initWithMySQLResult:(void *)theResult stringEncoding:(NSStringEncoding)theStringEncoding connection:(SPMySQLConnection *)theConnection
+{
+
+ // If no result set was passed in, return nil.
+ if (!theResult) return nil;
+
+ if ((self = [super initWithMySQLResult:theResult stringEncoding:theStringEncoding connection:theConnection])) {
+
+ // Initialise the extra streaming result counts and tracking
+ processedRowCount = 0;
+
+ // Initialise the linked list pointers
+ currentDataStoreEntry = NULL;
+ lastDataStoreEntry = NULL;
+
+ // Set up the linked list lock
+ pthread_mutex_init(&dataLock, NULL);
+
+ // Start the data download thread
+ [NSThread detachNewThreadSelector:@selector(_downloadAllData) toTarget:self withObject:nil];
+ }
+
+ return self;
+}
+
+/**
+ * Deallocate the result and ensure the parent connection is unlocked for further use.
+ */
+- (void)dealloc
+{
+
+ // Ensure all data is processed and the parent connection is unlocked
+ [self cancelResultLoad];
+
+ // Destroy the linked list lock
+ pthread_mutex_destroy(&dataLock);
+
+ // Call dealloc on super to clean up everything else, and to throw an exception if
+ // the parent connection hasn't been cleaned up correctly.
+ [super dealloc];
+}
+
+#pragma mark -
+#pragma mark Data retrieval
+
+/**
+ * Override the convenience selectors so that forwarding works correctly.
+ */
+- (id)getRow
+{
+ return SPMySQLResultGetRow(self, SPMySQLResultRowAsDefault);
+}
+- (NSArray *)getRowAsArray
+{
+ return SPMySQLResultGetRow(self, SPMySQLResultRowAsArray);
+}
+- (NSDictionary *)getRowAsDictionary
+{
+ return SPMySQLResultGetRow(self, SPMySQLResultRowAsDictionary);
+}
+
+/**
+ * Retrieve the next row in the result set, using the internal pointer, in the specified
+ * return format.
+ * If there are no rows remaining in the current iteration, returns nil.
+ */
+- (id)getRowAsType:(SPMySQLResultRowType)theType
+{
+ NSUInteger copiedDataLength = 0;
+ char *theRowData;
+ NSUInteger *fieldLengths;
+ id theReturnData;
+
+ // Lock the data mutex for safe access of variables and counters
+ pthread_mutex_lock(&dataLock);
+
+ // Determine whether any data is available; if not, wait 1ms before trying again
+ while (!dataDownloaded && processedRowCount == downloadedRowCount) {
+ pthread_mutex_unlock(&dataLock);
+ usleep(1000);
+ pthread_mutex_lock(&dataLock);
+ }
+
+ // If all rows have been processed, the end of the result set has been reached; return nil.
+ if (processedRowCount == downloadedRowCount) {
+ pthread_mutex_unlock(&dataLock);
+ return nil;
+ }
+
+ // Unlock the data mutex now checks are complete
+ pthread_mutex_unlock(&dataLock);
+
+ // Get a reference to the data for the current row; this is safe to do outside the lock
+ // as the pointer won't change until markers are changed at the end of this process
+ theRowData = currentDataStoreEntry->data;
+ fieldLengths = currentDataStoreEntry->dataLengths;
+
+ // If the target type was unspecified, use the instance default
+ if (theType == SPMySQLResultRowAsDefault) theType = defaultRowReturnType;
+
+ // Set up the return data as appropriate
+ if (theType == SPMySQLResultRowAsArray) {
+ theReturnData = [NSMutableArray arrayWithCapacity:numberOfFields];
+ } else {
+ theReturnData = [NSMutableDictionary dictionaryWithCapacity:numberOfFields];
+ }
+
+ // Convert each of the cells in the row in turn
+ for (NSUInteger i = 0; i < numberOfFields; i++) {
+ char *rawCellData;
+ NSUInteger fieldLength = fieldLengths[i];
+
+ // If the length of this cell is NSNotFound, it's a null reference
+ if (fieldLength == NSNotFound) {
+ rawCellData = NULL;
+
+ // Otherwise grab a reference to that data using pointer arithmetic
+ } else {
+ rawCellData = theRowData + copiedDataLength;
+ copiedDataLength += fieldLength;
+ }
+
+ // Convert to the correct object type
+ id cellData = SPMySQLResultGetObject(self, rawCellData, fieldLength, fieldTypes[i], i);
+
+ // If object creation failed, display a null
+ if (!cellData) cellData = [NSNull null];
+
+ // Add to the result array/dictionary
+ if (theType == SPMySQLResultRowAsArray) {
+ [(NSMutableArray *)theReturnData addObject:cellData];
+ } else {
+ [(NSMutableDictionary *)theReturnData setObject:cellData forKey:fieldNames[i]];
+ }
+ }
+
+ // Get a reference to the current item
+ SPMySQLStreamingRowData *previousDataStoreEntry = currentDataStoreEntry;
+
+ // Lock the mutex before updating counters and linked lists
+ pthread_mutex_lock(&dataLock);
+
+ // Update the active-data pointer to the next item in the list (which may be NULL)
+ currentDataStoreEntry = currentDataStoreEntry->nextRow;
+
+ // Increment the processed counter and row index
+ processedRowCount++;
+ currentRowIndex++;
+ if (dataDownloaded && processedRowCount == downloadedRowCount) currentRowIndex = NSNotFound;
+
+ // Unlock the mutex
+ pthread_mutex_unlock(&dataLock);
+
+ // Free the memory for the processed row
+ previousDataStoreEntry->nextRow = NULL;
+ free(previousDataStoreEntry->dataLengths);
+ if (previousDataStoreEntry->data != NULL) free(previousDataStoreEntry->data);
+ free(previousDataStoreEntry);
+
+ return theReturnData;
+}
+
+/*
+ * Ensure the result set is fully processed and freed without any processing
+ * This method ensures that the connection is unlocked.
+ */
+- (void)cancelResultLoad
+{
+
+ // If data has already been downloaded successfully, no further action is required
+ if (dataDownloaded && processedRowCount == downloadedRowCount) return;
+
+ // Loop until all data is fetched and freed
+ while (1) {
+
+ // Check to see whether we need to wait for the data to be available
+ // - if so, wait 1ms before checking again
+ while (!dataDownloaded && processedRowCount == downloadedRowCount) usleep(1000);
+
+ // If all rows have been processed, we're at the end of the result set - return
+ if (processedRowCount == downloadedRowCount) {
+
+ // We don't need to unlock the connection because the data loading thread
+ // has already taken care of that
+ return;
+ }
+
+ // Mark the row entry as processed without performing any actions
+ pthread_mutex_lock(&dataLock);
+ SPMySQLStreamingRowData *previousDataStoreEntry = currentDataStoreEntry;
+
+ // Update the active-data pointer to the next item in the list (which may be NULL)
+ currentDataStoreEntry = currentDataStoreEntry->nextRow;
+
+ processedRowCount++;
+ currentRowIndex++;
+ if (dataDownloaded && processedRowCount == downloadedRowCount) currentRowIndex = NSNotFound;
+
+ // Unlock the mutex
+ pthread_mutex_unlock(&dataLock);
+
+ // Free the memory for the processed row
+ previousDataStoreEntry->nextRow = NULL;
+ free(previousDataStoreEntry->dataLengths);
+ if (previousDataStoreEntry->data != NULL) free(previousDataStoreEntry->data);
+ free(previousDataStoreEntry);
+ }
+}
+
+#pragma mark -
+#pragma mark Data retrieval for fast enumeration
+
+/**
+ * Implement the fast enumeration endpoint. Rows for fast enumeration are retrieved in
+ * the instance default, as specified in setDefaultRowReturnType: or defaulting to
+ * NSDictionary.
+ */
+- (NSUInteger)countByEnumeratingWithState:(NSFastEnumerationState *)state objects:(id *)stackbuf count:(NSUInteger)len
+{
+
+ // If all rows have already been processed, return 0 to stop iteration.
+ if (dataDownloaded && processedRowCount == downloadedRowCount) return 0;
+
+ // If the MySQL row pointer does not match the requested state, throw an exception
+ if (state->state != currentRowIndex) {
+ [NSException raise:NSRangeException format:@"SPMySQLFastStreamingResult results can only be accessed linearly"];
+ }
+
+ // Determine how many objects to return. Default to 128 items, or the number of items requested
+ NSUInteger itemsToReturn = 128;
+ if (len < 128) itemsToReturn = len;
+
+ // If there are fewer items available in the downloaded-but-processed queue, limit to that
+ if (downloadedRowCount - processedRowCount < itemsToReturn) {
+ itemsToReturn = downloadedRowCount - processedRowCount;
+ }
+
+ // If no rows are available to be processed, wait for a single item to be readied.
+ if (!itemsToReturn) itemsToReturn = 1;
+
+ // Retrieve rows and add them to the result stack
+ NSUInteger i, itemsRetrieved = 0;
+ id eachRow;
+ for (i = 0; i < itemsToReturn; i++) {
+ eachRow = SPMySQLResultGetRow(self, SPMySQLResultRowAsDefault);
+
+ // If nil was returned the end of the result resource has been reached
+ if (!eachRow) {
+ if (!itemsRetrieved) return 0;
+ break;
+ }
+
+ stackbuf[i] = eachRow;
+ itemsRetrieved++;
+ }
+
+ state->state += itemsRetrieved;
+ state->itemsPtr = stackbuf;
+ state->mutationsPtr = (unsigned long *)self;
+
+ return itemsRetrieved;
+}
+
+@end
+
+#pragma mark -
+#pragma mark Result set internals
+
+@implementation SPMySQLFastStreamingResult (Private_API)
+
+/**
+ * Used internally to download results in a background thread
+ */
+- (void)_downloadAllData
+{
+ NSAutoreleasePool *downloadPool = [[NSAutoreleasePool alloc] init];
+ MYSQL_ROW theRow;
+ unsigned long *fieldLengths;
+ NSUInteger i, dataCopiedLength, rowDataLength;
+ SPMySQLStreamingRowData *newRowStore;
+
+ size_t sizeOfStreamingRowData = sizeof(SPMySQLStreamingRowData);
+ size_t sizeOfDataLengths = (size_t)(sizeof(NSUInteger) * numberOfFields);
+ size_t sizeOfChar = sizeof(char);
+
+ // Loop through the rows until the end of the data is reached - indicated via a NULL
+ while (
+ (*isConnectedPtr)(parentConnection, isConnectedSelector)
+ && (theRow = mysql_fetch_row(resultSet))
+ )
+ {
+
+ // Retrieve the lengths of the returned data
+ fieldLengths = mysql_fetch_lengths(resultSet);
+ rowDataLength = 0;
+ dataCopiedLength = 0;
+ for (i = 0; i < numberOfFields; i++) {
+ rowDataLength += fieldLengths[i];
+ }
+
+ // Initialise memory for the row and set a NULL pointer for the next item
+ newRowStore = malloc(sizeOfStreamingRowData);
+ newRowStore->nextRow = NULL;
+
+ // Set up the row data store - a char* - and copy in the data if there is any.
+ newRowStore->data = malloc(sizeOfChar * rowDataLength);
+ for (i = 0; i < numberOfFields; i++) {
+ if (theRow[i] != NULL) {
+ memcpy(newRowStore->data+dataCopiedLength, theRow[i], fieldLengths[i]);
+ dataCopiedLength += fieldLengths[i];
+ } else {
+ fieldLengths[i] = NSNotFound;
+ }
+ }
+
+ // Set up the memory for, and copy in, the field lengths
+ newRowStore->dataLengths = memcpy(malloc(sizeOfDataLengths), fieldLengths, sizeOfDataLengths);
+
+ // Lock the data mutex
+ pthread_mutex_lock(&dataLock);
+
+ // Add the newly allocated row to end of the storage linked list
+ if (lastDataStoreEntry) {
+ lastDataStoreEntry->nextRow = newRowStore;
+ }
+ lastDataStoreEntry = newRowStore;
+ if (!currentDataStoreEntry) currentDataStoreEntry = newRowStore;
+
+ // Update the downloaded row count
+ downloadedRowCount++;
+
+ // Unlock the mutex
+ pthread_mutex_unlock(&dataLock);
+ }
+
+ // Update the connection's error statuses to reflect any errors during the content download
+ [parentConnection _updateLastErrorID:NSNotFound];
+ [parentConnection _updateLastErrorMessage:nil];
+
+ // Unlock the parent connection now all data has been retrieved
+ [parentConnection _unlockConnection];
+ connectionUnlocked = YES;
+
+ dataDownloaded = YES;
+ [downloadPool drain];
+}
+
+@end