From ebf1280e432d51f47238ce8df86750ba3a7d6d1f Mon Sep 17 00:00:00 2001
From: Benjamin Auder <benjamin.auder@somewhere>
Date: Wed, 11 Jan 2017 01:47:23 +0100
Subject: [PATCH] finalize R 'wrapper' for stage 1; WARNING: changed series
 encoding. TODO: test

---
 code/stage1/src/TimeSeries/deserialize.c | 19 +++---
 code/stage1/src/TimeSeries/serialize.c   | 80 +++++++++---------------
 code/stage1/src/Util/types.h             |  2 +-
 code/stage1/src/Util/utils.c             | 49 ++++++++++-----
 code/stage1/wrapper.R                    | 18 ++++--
 5 files changed, 86 insertions(+), 82 deletions(-)

diff --git a/code/stage1/src/TimeSeries/deserialize.c b/code/stage1/src/TimeSeries/deserialize.c
index 5d1d758..79f7843 100644
--- a/code/stage1/src/TimeSeries/deserialize.c
+++ b/code/stage1/src/TimeSeries/deserialize.c
@@ -10,7 +10,7 @@ PowerCurve* deserialize(const char* ifileName, const char* ofileName,
 	// Read tsLength at the beginning of the file
 	uint32_t tsLength = get_tsLength(ifileName);
 	
-	uint32_t valuesPerSerie = (tsLength - 4) / 3; //remove 4 bytes of ID
+	uint32_t valuesPerSerie = (tsLength - 4) / 4; //remove 4 bytes of ID
 
 	FILE* ifile = fopen(ifileName, "rb");
 	FILE* ofile = NULL;
@@ -22,7 +22,7 @@ PowerCurve* deserialize(const char* ifileName, const char* ofileName,
 		nbRanks = get_nbSeries(ifileName);
 		ranks = NULL;
 	}
-	
+
 	PowerCurve* powerCurves = NULL;
 	if (!ofile)
 		powerCurves = (PowerCurve*) malloc(nbRanks * sizeof(PowerCurve));
@@ -52,22 +52,21 @@ PowerCurve* deserialize(const char* ifileName, const char* ofileName,
 		else
 			powerCurve->ID = ID;
 
-		// translate 3-bytes binary integers into Real
-		Byte* binarySerie = (Byte*) malloc(3 * valuesPerSerie);
-		lengthRead = fread(binarySerie, 1, 3*valuesPerSerie, ifile);
-		if (lengthRead != 3*valuesPerSerie)
-			fprintf(stderr,"Warning: deserializing truncated binary file.\n");
+		// translate 4-bytes binary integers into Real
+		Byte* binarySerie = (Byte*) malloc(4 * valuesPerSerie);
+		lengthRead = fread(binarySerie, 1, 4*valuesPerSerie, ifile);
+		//TODO: assert that lengthRead == 4*valuesPerSerie (...)
 		for (uint32_t i = 0; i < valuesPerSerie; i++)
 		{
-			uint32_t powerInt = bInt_to_uint(binarySerie + 3 * i, 3);
+			float power = bReal_to_float(binarySerie + 4 * i);
 			if (ofile)
 			{
-				fprintf(ofile, "%g", powerInt / 10.0 - 0.0);
+				fprintf(ofile, "%g", power);
 				if (i < valuesPerSerie-1)
 					fprintf(ofile, ",");
 			}
 			else
-				powerCurve->values[i] = powerInt / 10.0 - 0.0;
+				powerCurve->values[i] = power;
 		}
 		free(binarySerie);
 		if (ofile)
diff --git a/code/stage1/src/TimeSeries/serialize.c b/code/stage1/src/TimeSeries/serialize.c
index 88b15f1..3caa371 100644
--- a/code/stage1/src/TimeSeries/serialize.c
+++ b/code/stage1/src/TimeSeries/serialize.c
@@ -8,7 +8,7 @@
 #include <string.h>
 
 // parse a line into two integers (ID, raw power)
-static void scan_line(FILE* ifile, int posID, uint32_t* ID, int posPower, uint32_t* rawPower)
+static void scan_line(FILE* ifile, int posID, uint32_t* ID, int posPower, float* rawPower)
 {
 	char nextChar;
 	int position = 1;
@@ -22,16 +22,14 @@ static void scan_line(FILE* ifile, int posID, uint32_t* ID, int posPower, uint32
 		}
 		else if (position == posPower)
 		{
-			Real untruncatedPower;
-			nextChar = readReal(ifile, &untruncatedPower);
-			if (untruncatedPower < 0.0)
-				untruncatedPower = 0.0;
-			*rawPower = (uint32_t) floor(untruncatedPower*10.0);
+			Real power;
+			nextChar = readReal(ifile, &power);
+			*rawPower = (float) power;
 		}
 		else
 			//erase the comma (and skip field then)
 			nextChar = fgetc(ifile);
-		
+
 		//continue until next comma (or line end or file end)
 		while (!feof(ifile) && nextChar != '\n' && nextChar != '\r' && nextChar != ',')
 			nextChar = fgetc(ifile);
@@ -99,7 +97,8 @@ void serialize_byCols(const char* ifileName, const char* ofileName, uint32_t nbI
 	free(headerString);
 	
 	//estimate tsLength with a scan of the 3 first series
-	uint32_t ID=0, rawPower=0, lastID=0, refTsLength=0;
+	uint32_t ID=0, lastID=0, refTsLength=0;
+	float rawPower = 0.0;
 	scan_line(ifile, posID, &ID, posPower, &rawPower);
 	//'sl' = sample lengths (short because a lot of comparisons then)
 	uint32_t* sl = (uint32_t*) calloc(3, sizeof(uint32_t));
@@ -108,7 +107,7 @@ void serialize_byCols(const char* ifileName, const char* ofileName, uint32_t nbI
 		lastID = ID;
 		while (ID == lastID)
 		{
-			sl[i]++;	
+			sl[i]++;
 			scan_line(ifile, posID, &ID, posPower, &rawPower);
 		}
 	}
@@ -124,9 +123,7 @@ void serialize_byCols(const char* ifileName, const char* ofileName, uint32_t nbI
 
 	// output file to write time-series sequentially, binary format.
 	// Format: for each series, ID is stored on 4 bytes (unsigned integer32). Then,
-	//         (<rawPower>)+ follow, with rawPower stored as a "3 bytes int"
-	//         rawPower values are multiplied by 10 and truncated one digit after 0
-	// NOTE: no raw power should be exactly zero
+	//         (<rawPower>)+ follow, with rawPower stored as a float
 	FILE* ofile = fopen(ofileName, "wb");
 
 	// leave space to write the number of series (32bits), and their length in bytes (32bits)
@@ -136,9 +133,8 @@ void serialize_byCols(const char* ifileName, const char* ofileName, uint32_t nbI
 	// process one client (ID in first column) at a time
 	uint64_t processedLines = 0; //execution trace
 	uint32_t seriesCount=0, skippedSeriesCount=0, tsLength=0;
-	uint32_t mismatchLengthCount=0, overflowCount=0;
-	Byte tsBuffer[4+3*refTsLength];
-	int overflow = 0;
+	uint32_t mismatchLengthCount=0;
+	Byte tsBuffer[4+4*refTsLength];
 	lastID = 0;
 	while (!feof(ifile))
 	{
@@ -157,10 +153,10 @@ void serialize_byCols(const char* ifileName, const char* ofileName, uint32_t nbI
 			//just starting a new time-series: must process the last one (if there is a last one !)
 			if (lastID > 0)
 			{
-				if (tsLength == refTsLength && !overflow)
+				if (tsLength == refTsLength)
 				{
 					seriesCount++;
-					fwrite(tsBuffer, 4+3*tsLength, 1, ofile);
+					fwrite(tsBuffer, 4+4*tsLength, 1, ofile);
 					if (nbItems > 0 && seriesCount >= nbItems)
 						break;
 				}
@@ -170,62 +166,56 @@ void serialize_byCols(const char* ifileName, const char* ofileName, uint32_t nbI
 					skippedSeriesCount++;
 					if (tsLength != refTsLength)
 						mismatchLengthCount++;
-					if (overflow)
-						overflowCount++;
 				}
 			}
-			
+
 			// ID for the new series is printed only once:
-			write_int(ID, 4, tsBuffer);
+			write_int(ID, tsBuffer);
 			// reinitialize flags
-			overflow = 0;
 			tsLength = 0;
 			lastID = ID;
 		}
 
-		overflow = (overflow || (rawPower >= (1 << 24)));
 		//We cannot write more than refTsLength bytes
 		if (tsLength < refTsLength)
-			write_int(rawPower, 3, tsBuffer + 4+3*tsLength);
+			write_real(rawPower, tsBuffer + 4+4*tsLength);
 		tsLength++;
-		
+
 		if ((++processedLines) % 1000000 == 0)
 			fprintf(stdout,"Processed %"PRIu64" lines\n", processedLines);
 	}
 
-	if (!overflow && tsLength == refTsLength && (nbItems <= 0 || seriesCount < nbItems))
+	if (tsLength == refTsLength && (nbItems <= 0 || seriesCount < nbItems))
 	{
 		// flush last time-series if all conditions are met
-		fwrite(tsBuffer, 4+3*tsLength, 1, ofile);
+		fwrite(tsBuffer, 4+4*tsLength, 1, ofile);
 		seriesCount++;
 	}
 	else if (nbItems <= 0 || seriesCount < nbItems)
 	{
 		if (tsLength != refTsLength)
 			mismatchLengthCount++;
-		if (overflow)
-			overflowCount++;
 	}
 
 	// write lines count and size of a time-series in bytes
 	Byte intBuffer[4];
 	fseek(ofile, 0, SEEK_SET);
-	write_int(seriesCount, 4, intBuffer);
+	write_int(seriesCount, intBuffer);
 	fwrite(intBuffer, 1, 4, ofile);
 	// re-express tsLength in bytes (not forgetting the ID))
-	write_int(4 + 3 * refTsLength, 4, intBuffer);
+	write_int(4 + 4 * refTsLength, intBuffer);
 	fwrite(intBuffer, 1, 4, ofile);
 
 	// finally print some statistics
 	if (seriesCount < nbItems)
 		fprintf(stdout,"Warning: only %u series retrieved.\n",seriesCount);
-	fprintf(stdout,"%u overflows / %u mismatch series lengths.\n",overflowCount,mismatchLengthCount);
-	
+	fprintf(stdout,"%u mismatch series lengths.\n",mismatchLengthCount);
+
 	fclose(ifile);
 	fclose(ofile);
 }
 
-//serialize from usual 'by-row' data (for StarLight example and toy dataset)
+//serialize from usual 'by-row' data
 void serialize_byRows(const char* ifileName, const char* ofileName, uint32_t nbItems)
 {
 	FILE* ifile = fopen(ifileName, "r");
@@ -239,6 +229,7 @@ void serialize_byRows(const char* ifileName, const char* ofileName, uint32_t nbI
 		{
 			nbValues++;
 			//skip potential consecutive commas (could be hard to spot)
+			//TODO(...): should be 'NA' in R, thus an error (we don't handle NAs)...
 			while (curChar == ',')
 				curChar = fgetc(ifile);
 			ungetc(curChar, ifile);
@@ -256,7 +247,7 @@ void serialize_byRows(const char* ifileName, const char* ofileName, uint32_t nbI
 	fseek(ifile, 0, SEEK_SET);
 
 	//write meta info first
-	uint32_t tsLength = 3*nbValues+4;
+	uint32_t tsLength = 4*nbValues+4;
 	FILE* ofile = fopen(ofileName, "wb");
 	Byte intBuffer[4];
 	write_int(nbSeries, 4, intBuffer);
@@ -265,10 +256,7 @@ void serialize_byRows(const char* ifileName, const char* ofileName, uint32_t nbI
 	fwrite(intBuffer, 1, 4, ofile);
 	Real rawPower;
 	int64_t ID;
-	
-	//DEBUG / TEST (ugly, TOFIX...)
-	double minrp = INFINITY, maxrp = -INFINITY;
-	
+
 	for (uint32_t i=0; i<nbSeries; i++)
 	{
 		//skip potential line feeds before next line
@@ -284,15 +272,8 @@ void serialize_byRows(const char* ifileName, const char* ofileName, uint32_t nbI
 		for (uint32_t j=0; j<nbValues; j++)
 		{
 			curChar = readReal(ifile, &rawPower);
-			
-			//DEBUG / TEST (ugly, TOFIX...)
-			if (rawPower < minrp)
-				minrp = rawPower;
-			if (rawPower > maxrp)
-				maxrp = rawPower;
-			
-			write_int((uint32_t)floor(10.0*(rawPower+0.0)), 3, intBuffer); //x10... +3...
-			fwrite(intBuffer, 1, 3, ofile);
+			write_real(rawPower, intBuffer);
+			fwrite(intBuffer, 1, 4, ofile);
 			while (curChar == ',')
 				curChar = fgetc(ifile);
 			ungetc(curChar, ifile);
@@ -300,7 +281,4 @@ void serialize_byRows(const char* ifileName, const char* ofileName, uint32_t nbI
 	}
 	fclose(ifile);
 	fclose(ofile);
-	
-	//DEBUG / TEST (ugly, TOFIX...)
-	printf("min / max values = %g %g\n",minrp,maxrp);
 }
diff --git a/code/stage1/src/Util/types.h b/code/stage1/src/Util/types.h
index eaab1b0..824be88 100644
--- a/code/stage1/src/Util/types.h
+++ b/code/stage1/src/Util/types.h
@@ -12,7 +12,7 @@
 
 typedef unsigned char Byte;
 
-typedef double Real;
+typedef float Real;
 
 // Type to describe a job to be done in a node
 //TODO: merge with packed version to avoid extra copy by MPI
diff --git a/code/stage1/src/Util/utils.c b/code/stage1/src/Util/utils.c
index 9713e30..efd4dd2 100644
--- a/code/stage1/src/Util/utils.c
+++ b/code/stage1/src/Util/utils.c
@@ -58,24 +58,45 @@ char readReal(FILE* stream, Real* real)
 }
 
 // convert n-bytes binary integers to uint32_t
-uint32_t bInt_to_uint(Byte* pInteger, size_t bytesCount)
+uint32_t bInt_to_uint(Byte* pInteger)
 {
-	uint32_t integer = 0;
-	for (size_t i = 0; i < bytesCount; i++)
-		integer += ((uint32_t) (pInteger[i])) << (i << 3);
-	return integer;
+	uint32_t res;
+	memcpy(&res, pInteger, 4);
+	return res;
 }
 
-// serialize integers with a portable bytes order
-void write_int(uint32_t integer, size_t bytesCount, Byte* buffer)
+// serialize integers
+void write_int(uint32_t x, Byte* buffer)
 {
-	Byte chunk;
-	// write from left to right, from least to most significative bit
-	for (size_t i = 0; i < bytesCount; i++)
-	{
-		chunk = (integer >> (i << 3)) & 0xFF;
-		buffer[i] = chunk;
-	}
+	union {
+		uint32_t i;
+		char bytes[4];
+	} u;
+	u.i = x;
+	for (size_t i = 0; i < 4; i++)
+		buffer[i] = u.bytes[i];
+}
+
+//WARNING: assuming float is 32bits...
+// convert 4-bytes binary float to float
+float bReal_to_double(Byte* pFloat)
+{
+	float res;
+	memcpy(&res, pFloat, 4);
+	return res;
+}
+
+//WARNING: assuming float is 32bits...
+// serialize double with a NON-portable bytes order
+void write_real(float x, Byte* buffer)
+{
+	union {
+		float d;
+		char bytes[4];
+	} u;
+	u.d = x;
+	for (size_t i = 0; i < 4; i++)
+		buffer[i] = u.bytes[i];
 }
 
 // Expected size of a Work message in bytes:
diff --git a/code/stage1/wrapper.R b/code/stage1/wrapper.R
index 2549f1f..9bbb53b 100644
--- a/code/stage1/wrapper.R
+++ b/code/stage1/wrapper.R
@@ -1,9 +1,15 @@
-ppam_exe = function(pathToExe, nbProcess, ...)
+ppam_exe = function(path=".", np=parallel::detectCores(), data=NULL, args="DontLetMeEmpty")
 {
-	command_line = paste("mpirun -np ",nbProcess," ",pathToExe,"/ppam.exe",sep="")
-	args = list(...)
-	for (i in 1:length(args))
-		command_line = paste(command_line, args[[i]])
-	print(paste("EXECUTE: '", command_line,"'",sep=""))
+	command_line = paste("mpirun -np ",np," ",path,"/ppam.exe",sep="")
+
+	#if data provided (as data.frame or matrix...): binarize it, and add it as first argument
+	if (!is.null(data))
+	{
+		write.csv(data, "/tmp/data_csv")
+		system(paste(path,"/ppam.exe serialize /tmp/data_csv /tmp/data_bin 0 0",sep=""))
+		command_line = paste(command_line," ","/tmp/data_bin",sep="")
+	}
+
+	command_line = paste(command_line," ",args,sep="")
 	system(command_line)
 }
-- 
2.44.0