//
// $Id$
//
// SPMySQLFastStreamingResult.m
// SPMySQLFramework
//
// Created by Rowan Beentje (rowan.beent.je) on February 2, 2012
// Copyright (c) 2012 Rowan Beentje. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person
// obtaining a copy of this software and associated documentation
// files (the "Software"), to deal in the Software without
// restriction, including without limitation the rights to use,
// copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice shall be
// included in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
// OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
// WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
// OTHER DEALINGS IN THE SOFTWARE.
//
// More info at
#import "SPMySQLFastStreamingResult.h"
#import "SPMySQL Private APIs.h"
#import "SPMySQLArrayAdditions.h"
#include
static id NSNullPointer;
/**
* This type of streaming result operates in a multithreaded fashion - a worker
* thread is set up to download the results as fast as possible in the background,
* while the results are made available via blocking (and so single-thread-compatible)
* calls. This provides the benefit of allowing a progress bar to be shown during
* downloads, and threaded processing, but still has reasonable memory usage for the
* downloaded result - and won't block the server.
*/
typedef struct st_spmysqlstreamingrowdata {
char *data;
unsigned long *dataLengths;
struct st_spmysqlstreamingrowdata *nextRow;
} SPMySQLStreamingRowData;
@interface SPMySQLFastStreamingResult (Private_API)
- (void) _downloadAllData;
@end
#pragma mark -
@implementation SPMySQLFastStreamingResult
#pragma mark -
/**
* In the one-off class initialisation, cache static variables
*/
+ (void)initialize
{
// Cached NSNull singleton reference
if (!NSNullPointer) NSNullPointer = [NSNull null];
}
/**
* Standard init method, constructing the SPMySQLStreamingResult around a MySQL
* result pointer and the encoding to use when working with the data.
* As opposed to SPMySQLResult, defaults to returning rows as arrays, as the result
* sets are likely to be larger and processed in loops.
*/
- (id)initWithMySQLResult:(void *)theResult stringEncoding:(NSStringEncoding)theStringEncoding connection:(SPMySQLConnection *)theConnection
{
// If no result set was passed in, return nil.
if (!theResult) return nil;
if ((self = [super initWithMySQLResult:theResult stringEncoding:theStringEncoding connection:theConnection])) {
// Initialise the extra streaming result counts and tracking
processedRowCount = 0;
// Initialise the linked list pointers
currentDataStoreEntry = NULL;
lastDataStoreEntry = NULL;
// Set up the linked list lock
pthread_mutex_init(&dataLock, NULL);
// Start the data download thread
[NSThread detachNewThreadSelector:@selector(_downloadAllData) toTarget:self withObject:nil];
}
return self;
}
/**
* Deallocate the result and ensure the parent connection is unlocked for further use.
*/
- (void)dealloc
{
// Ensure all data is processed and the parent connection is unlocked
[self cancelResultLoad];
// Destroy the linked list lock
pthread_mutex_destroy(&dataLock);
// Call dealloc on super to clean up everything else, and to throw an exception if
// the parent connection hasn't been cleaned up correctly.
[super dealloc];
}
#pragma mark -
#pragma mark Data retrieval
/**
* Override the convenience selectors so that forwarding works correctly.
*/
- (id)getRow
{
return SPMySQLResultGetRow(self, SPMySQLResultRowAsDefault);
}
- (NSArray *)getRowAsArray
{
return SPMySQLResultGetRow(self, SPMySQLResultRowAsArray);
}
- (NSDictionary *)getRowAsDictionary
{
return SPMySQLResultGetRow(self, SPMySQLResultRowAsDictionary);
}
/**
* Retrieve the next row in the result set, using the internal pointer, in the specified
* return format.
* If there are no rows remaining in the current iteration, returns nil.
*/
- (id)getRowAsType:(SPMySQLResultRowType)theType
{
NSUInteger copiedDataLength = 0;
char *theRowData;
unsigned long *fieldLengths;
id theReturnData;
// If the target type was unspecified, use the instance default
if (theType == SPMySQLResultRowAsDefault) theType = defaultRowReturnType;
// Set up the return data as appropriate
if (theType == SPMySQLResultRowAsArray) {
theReturnData = [NSMutableArray arrayWithCapacity:numberOfFields];
} else {
theReturnData = [NSMutableDictionary dictionaryWithCapacity:numberOfFields];
}
// Lock the data mutex for safe access of variables and counters
pthread_mutex_lock(&dataLock);
// Determine whether any data is available; if not, wait 1ms before trying again
while (!dataDownloaded && processedRowCount == downloadedRowCount) {
pthread_mutex_unlock(&dataLock);
usleep(1000);
pthread_mutex_lock(&dataLock);
}
// If all rows have been processed, the end of the result set has been reached; return nil.
if (processedRowCount == downloadedRowCount) {
pthread_mutex_unlock(&dataLock);
return nil;
}
// Unlock the data mutex now checks are complete
pthread_mutex_unlock(&dataLock);
// Get a reference to the data for the current row; this is safe to do outside the lock
// as the pointer won't change until markers are changed at the end of this process
theRowData = currentDataStoreEntry->data;
fieldLengths = currentDataStoreEntry->dataLengths;
// Convert each of the cells in the row in turn
unsigned long fieldLength;
id cellData;
char *rawCellData;
for (NSUInteger i = 0; i < numberOfFields; i++) {
fieldLength = fieldLengths[i];
// If the length of this cell is NSNotFound, it's a null reference
if (fieldLength == NSNotFound) {
cellData = nil;
// Otherwise grab a reference to that data using pointer arithmetic
} else {
rawCellData = theRowData + copiedDataLength;
copiedDataLength += fieldLength;
// Convert to the correct object type
cellData = SPMySQLResultGetObject(self, rawCellData, fieldLength, i, NSNotFound);
}
// If object creation failed, display a null
if (!cellData) cellData = NSNullPointer;
// Add to the result array/dictionary
if (theType == SPMySQLResultRowAsArray) {
SPMySQLMutableArrayInsertObject(theReturnData, cellData, i);
} else {
[(NSMutableDictionary *)theReturnData setObject:cellData forKey:fieldNames[i]];
}
}
// Get a reference to the current item
SPMySQLStreamingRowData *previousDataStoreEntry = currentDataStoreEntry;
// Lock the mutex before updating counters and linked lists
pthread_mutex_lock(&dataLock);
// Update the active-data pointer to the next item in the list (which may be NULL)
currentDataStoreEntry = currentDataStoreEntry->nextRow;
if (!currentDataStoreEntry) lastDataStoreEntry = NULL;
// Increment the processed counter and row index
processedRowCount++;
currentRowIndex++;
if (dataDownloaded && processedRowCount == downloadedRowCount) currentRowIndex = NSNotFound;
// Unlock the mutex
pthread_mutex_unlock(&dataLock);
// Free the memory for the processed row
free(previousDataStoreEntry->dataLengths);
if (previousDataStoreEntry->data != NULL) free(previousDataStoreEntry->data);
free(previousDataStoreEntry);
return theReturnData;
}
/*
* Ensure the result set is fully processed and freed without any processing
* This method ensures that the connection is unlocked.
*/
- (void)cancelResultLoad
{
// If data has already been downloaded successfully, no further action is required
if (dataDownloaded && processedRowCount == downloadedRowCount) return;
// Loop until all data is fetched and freed
while (1) {
// Check to see whether we need to wait for the data to be available
// - if so, wait 1ms before checking again
while (!dataDownloaded && processedRowCount == downloadedRowCount) usleep(1000);
// If all rows have been processed, we're at the end of the result set - return
if (processedRowCount == downloadedRowCount) {
// We don't need to unlock the connection because the data loading thread
// has already taken care of that
return;
}
// Mark the row entry as processed without performing any actions
pthread_mutex_lock(&dataLock);
SPMySQLStreamingRowData *previousDataStoreEntry = currentDataStoreEntry;
// Update the active-data pointer to the next item in the list (which may be NULL)
currentDataStoreEntry = currentDataStoreEntry->nextRow;
if (!currentDataStoreEntry) lastDataStoreEntry = NULL;
processedRowCount++;
currentRowIndex++;
if (dataDownloaded && processedRowCount == downloadedRowCount) currentRowIndex = NSNotFound;
// Unlock the mutex
pthread_mutex_unlock(&dataLock);
// Free the memory for the processed row
free(previousDataStoreEntry->dataLengths);
if (previousDataStoreEntry->data != NULL) free(previousDataStoreEntry->data);
free(previousDataStoreEntry);
}
}
#pragma mark -
#pragma mark Data retrieval for fast enumeration
/**
* Implement the fast enumeration endpoint. Rows for fast enumeration are retrieved in
* the instance default, as specified in setDefaultRowReturnType: or defaulting to
* NSDictionary.
*/
- (NSUInteger)countByEnumeratingWithState:(NSFastEnumerationState *)state objects:(id *)stackbuf count:(NSUInteger)len
{
// To avoid lock issues, return one row at a time.
id nextRow = SPMySQLResultGetRow(self, SPMySQLResultRowAsDefault);
// If no row was available, return 0 to stop iteration.
if (!nextRow) return 0;
// Otherwise, add the item to the buffer and return the appropriate state.
stackbuf[0] = nextRow;
state->state += 1;
state->itemsPtr = stackbuf;
state->mutationsPtr = (unsigned long *)self;
return 1;
}
@end
#pragma mark -
#pragma mark Result set internals
@implementation SPMySQLFastStreamingResult (Private_API)
/**
* Used internally to download results in a background thread
*/
- (void)_downloadAllData
{
NSAutoreleasePool *downloadPool = [[NSAutoreleasePool alloc] init];
MYSQL_ROW theRow;
unsigned long *fieldLengths;
NSUInteger i, dataCopiedLength, rowDataLength;
SPMySQLStreamingRowData *newRowStore;
[[NSThread currentThread] setName:@"SPMySQLFastStreamingResult data download thread"];
size_t sizeOfStreamingRowData = sizeof(SPMySQLStreamingRowData);
size_t sizeOfDataLengths = (size_t)(sizeof(unsigned long) * numberOfFields);
size_t sizeOfChar = sizeof(char);
// Loop through the rows until the end of the data is reached - indicated via a NULL
while (
(*isConnectedPtr)(parentConnection, isConnectedSelector)
&& (theRow = mysql_fetch_row(resultSet))
)
{
// Retrieve the lengths of the returned data
fieldLengths = mysql_fetch_lengths(resultSet);
rowDataLength = 0;
dataCopiedLength = 0;
for (i = 0; i < numberOfFields; i++) {
rowDataLength += fieldLengths[i];
}
// Initialise memory for the row and set a NULL pointer for the next item
newRowStore = malloc(sizeOfStreamingRowData);
newRowStore->nextRow = NULL;
// Set up the row data store - a char* - and copy in the data if there is any.
newRowStore->data = malloc(sizeOfChar * rowDataLength);
for (i = 0; i < numberOfFields; i++) {
if (theRow[i] != NULL) {
memcpy(newRowStore->data+dataCopiedLength, theRow[i], fieldLengths[i]);
dataCopiedLength += fieldLengths[i];
} else {
fieldLengths[i] = NSNotFound;
}
}
// Set up the memory for, and copy in, the field lengths
newRowStore->dataLengths = memcpy(malloc(sizeOfDataLengths), fieldLengths, sizeOfDataLengths);
// Lock the data mutex
pthread_mutex_lock(&dataLock);
// Add the newly allocated row to end of the storage linked list
if (lastDataStoreEntry) {
lastDataStoreEntry->nextRow = newRowStore;
}
lastDataStoreEntry = newRowStore;
if (!currentDataStoreEntry) currentDataStoreEntry = newRowStore;
// Update the downloaded row count
downloadedRowCount++;
// Unlock the mutex
pthread_mutex_unlock(&dataLock);
}
// Update the connection's error statuses to reflect any errors during the content download
[parentConnection _updateLastErrorID:NSNotFound];
[parentConnection _updateLastErrorMessage:nil];
// Unlock the parent connection now all data has been retrieved
[parentConnection _unlockConnection];
connectionUnlocked = YES;
// If the connection query may have been cancelled with a query kill, double-check connection
if ([parentConnection lastQueryWasCancelled] && [parentConnection serverMajorVersion] < 5) {
[parentConnection checkConnection];
}
dataDownloaded = YES;
[downloadPool drain];
}
@end