aboutsummaryrefslogtreecommitdiffstats
path: root/Frameworks/MCPKit
diff options
context:
space:
mode:
Diffstat (limited to 'Frameworks/MCPKit')
-rw-r--r--Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.h2
-rw-r--r--Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.m47
2 files changed, 45 insertions, 4 deletions
diff --git a/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.h b/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.h
index 38e59d1d..2f5ec638 100644
--- a/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.h
+++ b/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.h
@@ -51,6 +51,8 @@ typedef struct SP_MYSQL_ROWS {
unsigned long downloadedRowCount;
unsigned long processedRowCount;
unsigned long freedRowCount;
+ pthread_mutex_t dataCreationLock;
+ pthread_mutex_t dataFreeLock;
}
- (id)initWithMySQLPtr:(MYSQL *)mySQLPtr encoding:(NSStringEncoding)theEncoding timeZone:(NSTimeZone *)theTimeZone connection:(MCPConnection *)theConnection;
diff --git a/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.m b/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.m
index 84120505..4093b2a4 100644
--- a/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.m
+++ b/Frameworks/MCPKit/MCPFoundationKit/MCPStreamingResult.m
@@ -108,6 +108,8 @@
downloadedRowCount = 0;
processedRowCount = 0;
freedRowCount = 0;
+ pthread_mutex_init(&dataCreationLock, NULL);
+ pthread_mutex_init(&dataFreeLock, NULL);
// Start the data download thread
[NSThread detachNewThreadSelector:@selector(_downloadAllData) toTarget:self withObject:nil];
@@ -127,6 +129,11 @@
{
if (!connectionUnlocked) [parentConnection unlockConnection];
+ if (!fullyStreaming) {
+ pthread_mutex_destroy(&dataFreeLock);
+ pthread_mutex_destroy(&dataCreationLock);
+ }
+
[super dealloc];
}
@@ -160,18 +167,22 @@
} else {
copiedDataLength = 0;
+ // Lock the data mutex
+ pthread_mutex_lock(&dataCreationLock);
+
// Check to see whether we need to wait for the data to be availabe
// - if so, wait 1ms before checking again.
- // Keep the data processing thread at a number of rows behind the download
- // thread - this aids memory issues across the threads and prevents occasional
- // race condition crashes.
- while (!dataDownloaded && (processedRowCount + 10 > downloadedRowCount)) {
+ while (!dataDownloaded && processedRowCount == downloadedRowCount) {
+ pthread_mutex_unlock(&dataCreationLock);
usleep(1000);
+ pthread_mutex_lock(&dataCreationLock);
}
// If all rows have been processed, we're at the end of the result set - return nil
// once all memory has been freed
if (processedRowCount == downloadedRowCount) {
+ pthread_mutex_unlock(&dataCreationLock);
+
while (!dataFreed) usleep(1000);
// Update the connection's error statuses in case of error during content download
@@ -186,6 +197,9 @@
// Retrieve a reference to the data and the associated lengths
theRowData = currentDataStoreEntry->data;
fieldLengths = currentDataStoreEntry->dataLengths;
+
+ // Unlock the data mutex
+ pthread_mutex_unlock(&dataCreationLock);
}
// Initialise the array to return
@@ -297,11 +311,19 @@
// If in cached-streaming mode, update the current entry processed count
if (!fullyStreaming) {
+ // Lock both mutexes
+ pthread_mutex_lock(&dataCreationLock);
+ pthread_mutex_lock(&dataFreeLock);
+
// Update the active-data pointer to the next item in the list, or set to NULL if no more items
currentDataStoreEntry = currentDataStoreEntry->nextRow;
// Increment counter
processedRowCount++;
+
+ // Unlock both mutexes
+ pthread_mutex_unlock(&dataCreationLock);
+ pthread_mutex_unlock(&dataFreeLock);
}
return returnArray;
@@ -393,6 +415,9 @@
for (i = 0; i < mNumOfFields; i++)
rowDataLength += fieldLengths[i];
+ // Lock the data mutex
+ pthread_mutex_lock(&dataCreationLock);
+
// Initialise memory for the row and set a NULL pointer for the next item
newRowStore = malloc(sizeOfLocalRowData);
newRowStore->nextRow = NULL;
@@ -412,6 +437,9 @@
// Set up and copy in the field lengths
newRowStore->dataLengths = memcpy(malloc(sizeOfDataLengths), fieldLengths, sizeOfDataLengths);
+
+ // Lock the data free mutex
+ pthread_mutex_lock(&dataFreeLock);
// Add the newly allocated row to end of the storage linked list
if (localDataStore) {
@@ -424,6 +452,10 @@
// Update the downloaded row count
downloadedRowCount++;
+
+ // Unlock both mutexes
+ pthread_mutex_unlock(&dataCreationLock);
+ pthread_mutex_unlock(&dataFreeLock);
}
dataDownloaded = YES;
@@ -440,8 +472,12 @@
while (!dataDownloaded || freedRowCount != downloadedRowCount) {
+ // Lock the data free mutex
+ pthread_mutex_lock(&dataFreeLock);
+
// If the freed row count matches the processed row count, wait before retrying
if (freedRowCount == processedRowCount) {
+ pthread_mutex_unlock(&dataFreeLock);
usleep(1000);
continue;
}
@@ -459,6 +495,9 @@
// Increment the counter
freedRowCount++;
+
+ // Unlock the data free mutex
+ pthread_mutex_unlock(&dataFreeLock);
}
dataFreed = YES;