//
//  $Id$
//
//  MCPStreamingResult.m
//  sequel-pro
//
//  Created by Rowan Beentje on Aug 16, 2009
//  Copyright 2009 Rowan Beentje. All rights reserved.
//
//  This program is free software; you can redistribute it and/or modify
//  it under the terms of the GNU General Public License as published by
//  the Free Software Foundation; either version 2 of the License, or
//  (at your option) any later version.
//
//  This program is distributed in the hope that it will be useful,
//  but WITHOUT ANY WARRANTY; without even the implied warranty of
//  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
//  GNU General Public License for more details.
//
//  You should have received a copy of the GNU General Public License
//  along with this program; if not, write to the Free Software
//  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
//
//  More info at <http://code.google.com/p/sequel-pro/>

#import "MCPStreamingResult.h"
#import "MCPConnection.h"
#import "MCPNull.h"
#import "MCPNumber.h"
#import "MCPGeometryData.h"

/**
 * IMPORTANT NOTE
 *
 * MCPStreamingResult can operate in two modes.  The default mode is a safe implementation,
 * which operates in a multithreaded fashion - a worker thread is set up to download the results as
 * fast as possible in the background, while the results are made available via a blocking (and so
 * single-thread-compatible) fetchNextRowAsArray call.  This provides the benefit of allowing a progress
 * bar to be shown during downloads, and threaded processing, but still has reasonable memory usage for
 * the downloaded result - and won't block the server.
 * Alternatively, withFullStreaming: can be set to YES, in which case each row will be accessed on-demand;
 * this can be dangerous as it means a SELECT will tie up the server for longer - for MyISAM tables,
 * updates (and subsequent reads) must block while a SELECT is still running.  However, this can be useful
 * for certain processes, such as working with very large tables, to keep memory usage low.
 */


/**
 * Declarations of the internal helpers used by MCPStreamingResult; the
 * implementations live in the PrivateAPI category implementation in this file.
 */
@interface MCPStreamingResult (PrivateAPI)

// C helper: writes a '0'/'1' text representation of the bytes in n into buf,
// using len characters followed by a terminating NUL.
void _bytes2bin(Byte *n, NSUInteger nbytes, NSUInteger len, char *buf);

// Thread entry point: downloads every row of the result into the local data store.
- (void) _downloadAllData;
// Thread entry point: frees locally stored rows once they have been processed.
- (void) _freeAllDataWhenDone;

@end

@implementation MCPStreamingResult : MCPResult

#pragma mark -
#pragma mark Setup and teardown

/**
 * Convenience initialiser taking the same arguments as the MCPResult
 * initialiser.  Forwards to the master initialiser with full streaming
 * disabled, so the result is opened in the default cached ("safe")
 * streaming mode.
 */
- (id)initWithMySQLPtr:(MYSQL *)mySQLPtr encoding:(NSStringEncoding)theEncoding timeZone:(NSTimeZone *)theTimeZone connection:(MCPConnection *)theConnection
{
	BOOL useFullStreaming = NO;

	return [self initWithMySQLPtr:mySQLPtr
	                     encoding:theEncoding
	                     timeZone:theTimeZone
	                   connection:theConnection
	            withFullStreaming:useFullStreaming];
}

/**
 * Master initialisation method.
 *
 * Stores the encoding, time zone and parent connection, opens the result set
 * with mysql_use_result(), and caches the field count and field definitions.
 * When useFullStreaming is NO (cached streaming mode) the bookkeeping state and
 * the two mutexes are initialised, and the background download and freeing
 * threads are started.  When useFullStreaming is YES, rows are instead read
 * from the server on demand by fetchNextRowAsArray.
 */
- (id)initWithMySQLPtr:(MYSQL *)mySQLPtr encoding:(NSStringEncoding)theEncoding timeZone:(NSTimeZone *)theTimeZone connection:(MCPConnection *)theConnection withFullStreaming:(BOOL)useFullStreaming
{
	self = [super init];
	if (self == nil) return self;

	// Record the conversion settings, the owning connection and the mode
	mEncoding = theEncoding;
	mTimeZone = [theTimeZone retain];
	parentConnection = theConnection;
	fullyStreaming = useFullStreaming;
	connectionUnlocked = NO;

	// Discard any result set or field name cache which is already held
	if (mResult != NULL) {
		mysql_free_result(mResult);
		mResult = NULL;
	}
	if (mNames != nil) {
		[mNames release];
		mNames = nil;
	}

	// Open the result set for row-by-row retrieval and cache the field details
	mResult = mysql_use_result(mySQLPtr);
	if (mResult == NULL) {
		mNumOfFields = 0;
	} else {
		mNumOfFields = mysql_num_fields(mResult);
		fieldDefinitions = mysql_fetch_fields(mResult);
	}

	// Cache the selector and implementation pointer of the connection's
	// isConnected method; the download thread calls it through this pointer
	isConnectedSEL = @selector(isConnected);
	isConnectedPtr = [parentConnection methodForSelector:isConnectedSEL];

	// Fully streaming results need no local storage or worker threads
	if (fullyStreaming) return self;

	// Cached streaming mode: reset the local store and its counters...
	dataDownloaded = NO;
	dataFreed = NO;
	localDataStore = NULL;
	currentDataStoreEntry = NULL;
	localDataStoreLastEntry = NULL;
	localDataRows = 0;
	localDataAllocated = 0;
	downloadedRowCount = 0;
	processedRowCount = 0;
	freedRowCount = 0;

	// ...create the mutexes before any thread can touch them...
	pthread_mutex_init(&dataCreationLock, NULL);
	pthread_mutex_init(&dataFreeLock, NULL);

	// ...and launch the download thread followed by the freeing thread
	[NSThread detachNewThreadSelector:@selector(_downloadAllData) toTarget:self withObject:nil];
	[NSThread detachNewThreadSelector:@selector(_freeAllDataWhenDone) toTarget:self withObject:nil];

	return self;
}

/**
 * Tear down the result: make sure no rows are left unread, make sure the
 * parent connection has been unlocked for further use, and destroy the
 * mutexes which were created for cached streaming mode.
 */
- (void) dealloc
{
	// Drain anything still pending; this is expected to leave the connection unlocked
	[self cancelResultLoad];

	// Safety net - not expected to be reached after the call above
	if (connectionUnlocked == NO) {
		NSLog(@"MCPStreamingResult: The connection has not been unlocked.");
		[parentConnection unlockConnection];
	}

	// The mutexes only exist when the result was opened in cached streaming mode
	if (fullyStreaming == NO) {
		pthread_mutex_destroy(&dataFreeLock);
		pthread_mutex_destroy(&dataCreationLock);
	}

	[super dealloc];
}

#pragma mark -
#pragma mark Results fetching

/**
 * Retrieve the next row of the result as an array containing one object per field.
 * Should be called in a loop until nil is returned to ensure all the results have
 * been retrieved.
 *
 * SQL NULL values - and values for which no object could be created - are returned
 * as NSNull.  Other values are converted according to the field type: NSString for
 * numeric, temporal and text fields; NSData for binary string fields and for binary
 * blobs (unless returning data as strings has been requested); a string of '0'/'1'
 * characters for BIT fields; and MCPGeometryData for geometry fields.
 *
 * In fully streaming mode the row is read from the server by this call, and the parent
 * connection is unlocked once the end of the result set is reached.  In cached
 * streaming mode this call blocks until the download thread has made a row available.
 *
 * @return An autoreleased array for the next row, or nil when there are no more rows.
 */
- (NSArray *)fetchNextRowAsArray
{
	MYSQL_ROW theRow = NULL;
	char *theRowData = NULL, *buf;
	unsigned long *fieldLengths;
	NSInteger i, copiedDataLength = 0;
	NSMutableArray *returnArray;

	// Retrieve the next row according to the mode this result set is in.
	// If fully streaming, retrieve the MYSQL_ROW
	if (fullyStreaming) {

		// mysql_use_result() may not have produced a result set; mysql_fetch_row() must
		// never be handed a NULL result, so treat that case as an empty result set.
		theRow = mResult ? mysql_fetch_row(mResult) : NULL;

		// If no data was returned, we're at the end of the result set - unlock the connection and return nil
		if (theRow == NULL) {
			if (!connectionUnlocked) {
				[parentConnection unlockConnection];
				connectionUnlocked = YES;
			}
			return nil;
		}

		// Retrieve the lengths of the returned data
		fieldLengths = mysql_fetch_lengths(mResult);

	// If in cached-streaming/fast download mode, get a reference to the data for the current row
	} else {

		// Lock the data mutex
		pthread_mutex_lock(&dataCreationLock);

		// Check to see whether we need to wait for the data to be available
		// - if so, release the lock and wait 1ms before checking again.
		while (!dataDownloaded && processedRowCount == downloadedRowCount) {
			pthread_mutex_unlock(&dataCreationLock);
			usleep(1000);
			pthread_mutex_lock(&dataCreationLock);
		}

		// If all rows have been processed, we're at the end of the result set - return nil
		// once all memory has been freed
		if (processedRowCount == downloadedRowCount) {
			pthread_mutex_unlock(&dataCreationLock);

			while (!dataFreed) usleep(1000);

			// Update the connection's error statuses in case of error during content download
			[parentConnection updateErrorStatuses];

			return nil;
		}

		// Retrieve a reference to the data and the associated lengths.  The row is not freed
		// by the freeing thread until processedRowCount has been incremented below.
		theRowData = currentDataStoreEntry->data;
		fieldLengths = currentDataStoreEntry->dataLengths;

		// Unlock the data mutex
		pthread_mutex_unlock(&dataCreationLock);
	}

	// Initialise the array to return
	returnArray = [NSMutableArray arrayWithCapacity:mNumOfFields];
	for (i = 0; i < mNumOfFields; i++) {
		id cellData = nil;
		char *theData = NULL;

		// In fully streaming mode, take a NUL-terminated copy of the data in the MYSQL_ROW
		if (fullyStreaming) {
			if (theRow[i] == NULL) {
				cellData = [NSNull null];
			} else {
				theData = calloc(sizeof(char), fieldLengths[i]+1);
				if (theData == NULL) {

					// Allocation failed: report the cell as NSNull rather than dereferencing NULL
					cellData = [NSNull null];
				} else {
					memcpy(theData, theRow[i], fieldLengths[i]);
					theData[fieldLengths[i]] = '\0';
				}
			}

		// In cached-streaming mode, use a reference to the downloaded data.  The download
		// thread marks NULL fields with a length of NSNotFound and stores no bytes for them;
		// every other field is followed by a single NUL terminator.
		} else {
			if (fieldLengths[i] == NSNotFound) {
				cellData = [NSNull null];
			} else {
				theData = theRowData+copiedDataLength;
				copiedDataLength += fieldLengths[i] + 1;
			}
		}

		// If the data hasn't already been detected as NULL - in which case it will have been
		// set to NSNull - process the data by type
		if (cellData == nil) {
			switch (fieldDefinitions[i].type) {
				case FIELD_TYPE_TINY:
				case FIELD_TYPE_SHORT:
				case FIELD_TYPE_INT24:
				case FIELD_TYPE_LONG:
				case FIELD_TYPE_LONGLONG:
				case FIELD_TYPE_DECIMAL:
				case FIELD_TYPE_NEWDECIMAL:
				case FIELD_TYPE_FLOAT:
				case FIELD_TYPE_DOUBLE:
				case FIELD_TYPE_TIMESTAMP:
				case FIELD_TYPE_DATE:
				case FIELD_TYPE_TIME:
				case FIELD_TYPE_DATETIME:
				case FIELD_TYPE_YEAR:
				case FIELD_TYPE_VAR_STRING:
				case FIELD_TYPE_STRING:
				case FIELD_TYPE_SET:
				case FIELD_TYPE_ENUM:
				case FIELD_TYPE_NEWDATE: // Don't know what the format for this type is...

					// For fields of type BINARY/VARBINARY, return the data. Also add an extra check to make
					// sure it's binary data (seems that it's returned as type STRING) to get around a MySQL
					// bug (#28214) returning DATE fields with the binary flag set.
					if ((fieldDefinitions[i].flags & BINARY_FLAG) &&
						(fieldDefinitions[i].type == FIELD_TYPE_STRING || fieldDefinitions[i].type == FIELD_TYPE_VAR_STRING))
					{
						cellData = [NSData dataWithBytes:theData length:fieldLengths[i]];
					}
					// For string data, convert to text
					else {
						cellData = [NSString stringWithCString:theData encoding:mEncoding];
					}

					break;

				case FIELD_TYPE_BIT:

					// Get a '0'/'1' text representation of the data, sized by the declared field length.
					// The NULL checks come before the conversion so that it never runs on a missing buffer.
					if (theData == NULL) {
						cellData = @"";
					} else {
						buf = malloc(fieldDefinitions[i].length + 1);
						if (buf != NULL) {
							_bytes2bin((Byte *)theData, fieldLengths[i], fieldDefinitions[i].length, buf);
							cellData = [NSString stringWithUTF8String:buf];
							free(buf);
						}
					}

					break;

				case FIELD_TYPE_TINY_BLOB:
				case FIELD_TYPE_BLOB:
				case FIELD_TYPE_MEDIUM_BLOB:
				case FIELD_TYPE_LONG_BLOB:

					// For binary data, return the data if force-return-as-string is not enabled
					if ((fieldDefinitions[i].flags & BINARY_FLAG) && !mReturnDataAsStrings) {
						cellData = [NSData dataWithBytes:theData length:fieldLengths[i]];
					}
					else {
						cellData = [[NSString alloc] initWithBytes:theData length:fieldLengths[i] encoding:mEncoding];

						if (cellData) [cellData autorelease];
					}

					break;

				case FIELD_TYPE_NULL:
					cellData = [NSNull null];
					break;

				case FIELD_TYPE_GEOMETRY:
					cellData = [MCPGeometryData dataWithBytes:theData length:fieldLengths[i]];
					break;

				default:
					NSLog(@"in fetchNextRowAsArray : Unknown type : %ld for column %ld, sending back a NSData object", (NSInteger)fieldDefinitions[i].type, (NSInteger)i);
					cellData = [NSData dataWithBytes:theData length:fieldLengths[i]];
					break;
			}

			// Free the data if it was originally allocated
			if (fullyStreaming) free(theData);

			// If a creator returned a nil object, replace with NSNull
			if (cellData == nil) cellData = [NSNull null];
		}

		// Cells are produced in field order, so appending keeps index i == field i
		[returnArray addObject:cellData];
	}

	// If in cached-streaming mode, update the current entry processed count
	if (!fullyStreaming) {

		// Lock both mutexes, in the same order as the download thread
		pthread_mutex_lock(&dataCreationLock);
		pthread_mutex_lock(&dataFreeLock);

		// Update the active-data pointer to the next item in the list, or set to NULL if no more items
		currentDataStoreEntry = currentDataStoreEntry->nextRow;

		// Increment counter; this allows the freeing thread to release the row just processed
		processedRowCount++;

		// Unlock both mutexes, in the reverse order of acquisition
		pthread_mutex_unlock(&dataFreeLock);
		pthread_mutex_unlock(&dataCreationLock);
	}

	return returnArray;
}

/**
 * Ensure the result set is fully read and freed without converting any of the
 * remaining rows into objects.  This method ensures that the connection is unlocked.
 *
 * In fully streaming mode the remaining rows are read from the server and discarded,
 * then the parent connection is unlocked if that has not already happened.
 * In cached streaming mode every downloaded row is marked as processed so that the
 * freeing thread can release it, and the call returns once the download has finished
 * and all stored rows have been freed; the download thread unlocks the connection.
 */
- (void) cancelResultLoad
{
	// If fully streaming, read and discard the remaining rows directly
	if (fullyStreaming) {

		// mysql_use_result() may not have produced a result set (this method is also
		// called from dealloc), and mysql_fetch_row() must never be handed NULL.
		while (mResult && mysql_fetch_row(mResult) != NULL);

		// End of the result set - release the connection if still held
		if (!connectionUnlocked) {
			[parentConnection unlockConnection];
			connectionUnlocked = YES;
		}
		return;
	}

	// If in cached-streaming/fast download mode, loop until all data is fetched
	BOOL downloadComplete = NO;
	while (!downloadComplete) {

		// processedRowCount is shared with the freeing thread and downloadedRowCount with
		// the download thread, so take both mutexes in the same order as the other users.
		pthread_mutex_lock(&dataCreationLock);
		pthread_mutex_lock(&dataFreeLock);

		// Sample the completion flag first: the download thread only sets it after its
		// final row has been counted, so a YES here means the count read below is final.
		downloadComplete = dataDownloaded;

		// Mark every row downloaded so far as processed, allowing it to be freed
		processedRowCount = downloadedRowCount;

		pthread_mutex_unlock(&dataFreeLock);
		pthread_mutex_unlock(&dataCreationLock);

		// More rows may still arrive - wait 1ms before checking again
		if (!downloadComplete) usleep(1000);
	}

	// Wait for the freeing thread to release the stored rows.  The connection does not
	// need to be unlocked here because the download thread has already done that.
	while (!dataFreed) usleep(1000);
}

#pragma mark -
#pragma mark Overrides for safety

/**
 * Returns the number of rows downloaded, which is only known once the
 * download has completed.  Until then -1 (converted to my_ulonglong) is
 * returned instead.
 */
- (my_ulonglong)numOfRows
{
	// Start from the "not yet known" marker and replace it when the count is final
	my_ulonglong rowCount = (my_ulonglong)-1;

	if (dataDownloaded) {
		rowCount = downloadedRowCount;
	}

	return rowCount;
}

/**
 * Seeking is not supported for streaming results: the requested row is
 * ignored and a message is logged instead.
 */
- (void)dataSeek:(my_ulonglong) row
{
	NSLog(@"dataSeek cannot be used with streaming results");
}

@end

@implementation MCPStreamingResult (PrivateAPI)

/**
 * Provides a binary ('0'/'1' character) representation of the supplied bytes (n) in the
 * supplied buffer (buf).  The last byte of n is treated as the least significant, and the
 * resulting representation is zero-padded according to the supplied field length (len).
 *
 * @param n      The bytes to convert; may be NULL, in which case only zeros are written.
 * @param nbytes The number of bytes available at n.
 * @param len    The number of binary digits to write.
 * @param buf    The output buffer, which must have room for len + 1 chars; it receives
 *               len digits followed by a terminating NUL.  Nothing is done if it is NULL.
 */
void _bytes2bin(Byte *n, NSUInteger nbytes, NSUInteger len, char *buf)
{
	NSUInteger bit;

	if (buf == NULL) return;

	// Bit 0 (the least significant bit of the last byte) is written to the end of the
	// buffer.  The index is advanced in the loop header only, so each iteration reads
	// and writes using the same well-defined bit number.
	for (bit = 0; bit < len; bit++) {
		NSUInteger byteOffset = bit >> 3;
		char digit = '0';

		// Digits beyond the supplied bytes are padding, and are left as zeros rather
		// than being read from before the start of n
		if (n != NULL && byteOffset < nbytes) {
			if ((n[nbytes - 1 - byteOffset] >> (bit & 0x7)) & 1) digit = '1';
		}

		buf[len - 1 - bit] = digit;
	}

	buf[len] = '\0';
}

/**
 * Used internally to download results in a background thread.
 *
 * Each row read from the server is copied into a newly allocated LOCAL_ROW_DATA
 * entry, which is appended to the linked list of stored rows for fetchNextRowAsArray
 * to consume.  Downloading stops at the end of the result set, when the parent
 * connection reports that it is no longer connected, or immediately if no result set
 * is available.  The parent connection is then unlocked and the download is marked
 * as complete.
 */
- (void)_downloadAllData
{
	NSAutoreleasePool *downloadPool = [[NSAutoreleasePool alloc] init];
	MYSQL_ROW theRow;
	unsigned long *fieldLengths;
	NSInteger i, dataCopiedLength, rowDataLength;
	LOCAL_ROW_DATA *newRowStore;

	size_t sizeOfLocalRowData = sizeof(LOCAL_ROW_DATA);
	size_t sizeOfDataLengths = (size_t)(sizeof(unsigned long) * mNumOfFields);

	// Loop through the rows until the end of the data is reached - indicated via a NULL.
	// mysql_use_result() may not have produced a result set, and mysql_fetch_row() must
	// never be handed NULL, so a missing result is treated as having no rows.
	while (mResult && (BOOL)(*isConnectedPtr)(parentConnection, isConnectedSEL) && (theRow = mysql_fetch_row(mResult))) {

		// Retrieve the lengths of the returned data, and total them
		fieldLengths = mysql_fetch_lengths(mResult);
		rowDataLength = 0;
		dataCopiedLength = 0;
		for (i = 0; i < mNumOfFields; i++)
			rowDataLength += fieldLengths[i];

		// Initialise memory for the row and set a NULL pointer for the next item
		newRowStore = malloc(sizeOfLocalRowData);
		newRowStore->nextRow = NULL;

		// Set up the row data store - a char* - and copy in the data if there is any,
		// using a null terminator for each field boundary for easier data processing later.
		// The allocation therefore has room for one extra byte per field.
		newRowStore->data = malloc(sizeof(char) * (rowDataLength + mNumOfFields));
		for (i = 0; i < mNumOfFields; i++) {
			if (theRow[i] != NULL) {
				memcpy(newRowStore->data+dataCopiedLength, theRow[i], fieldLengths[i]);
				newRowStore->data[dataCopiedLength+fieldLengths[i]] = '\0';
				dataCopiedLength += fieldLengths[i] + 1;
			} else {

				// NULL fields store no bytes; they are flagged via their length instead,
				// which is how fetchNextRowAsArray recognises them
				fieldLengths[i] = NSNotFound;
			}
		}

		// Set up and copy in the field lengths, including any NULL markers set above
		newRowStore->dataLengths = memcpy(malloc(sizeOfDataLengths), fieldLengths, sizeOfDataLengths);

		// Lock both mutexes
		pthread_mutex_lock(&dataCreationLock);
		pthread_mutex_lock(&dataFreeLock);

		// Add the newly allocated row to end of the storage linked list; the list head is
		// NULL when it is empty, in which case the new row becomes the head
		if (localDataStore) {
			localDataStoreLastEntry->nextRow = newRowStore;
		} else {
			localDataStore = newRowStore;
		}
		localDataStoreLastEntry = newRowStore;

		// If the reader has no row waiting, this is the next row for it to process
		if (!currentDataStoreEntry) currentDataStoreEntry = newRowStore;

		// Update the downloaded row count
		downloadedRowCount++;

		// Unlock both mutexes, in the reverse order of acquisition
		pthread_mutex_unlock(&dataFreeLock);
		pthread_mutex_unlock(&dataCreationLock);
	}

	// Unlock the parent connection now data has been retrieved
	[parentConnection unlockConnection];
	connectionUnlocked = YES;

	// Signal completion only after the final row has been counted
	dataDownloaded = YES;
	[downloadPool drain];
}

/**
 * Used internally, in its own thread, to release stored rows which have already
 * been processed, so that fetchNextRowAsArray does not have to spend time freeing
 * memory.  Runs until the download has completed and every downloaded row has been
 * freed, then marks the data as freed.
 */
- (void) _freeAllDataWhenDone
{
	NSAutoreleasePool *freeingPool = [[NSAutoreleasePool alloc] init];

	while (!dataDownloaded || freedRowCount != downloadedRowCount) {

		// The list head and the counters are only touched while holding the free mutex
		pthread_mutex_lock(&dataFreeLock);

		BOOL nothingToFree = (freedRowCount == processedRowCount);

		if (!nothingToFree) {

			// Detach the oldest row from the front of the list; the head becomes the
			// following row, or NULL if there are no more rows
			LOCAL_ROW_DATA *oldestRow = localDataStore;
			localDataStore = oldestRow->nextRow;
			oldestRow->nextRow = NULL;

			// Release the lengths, the data, and finally the row entry itself
			free(oldestRow->dataLengths);
			if (oldestRow->data != NULL) free(oldestRow->data);
			free(oldestRow);

			freedRowCount++;
		}

		pthread_mutex_unlock(&dataFreeLock);

		// Every processed row has been freed already - wait 1ms before looking again
		if (nothingToFree) usleep(1000);
	}

	dataFreed = YES;
	[freeingPool drain];
}

@end