From ebf1280e432d51f47238ce8df86750ba3a7d6d1f Mon Sep 17 00:00:00 2001 From: Benjamin Auder Date: Wed, 11 Jan 2017 01:47:23 +0100 Subject: [PATCH] finalize R 'wrapper' for stage 1; WARNING: changed series encoding. TODO: test --- code/stage1/src/TimeSeries/deserialize.c | 19 +++--- code/stage1/src/TimeSeries/serialize.c | 80 +++++++++--------------- code/stage1/src/Util/types.h | 2 +- code/stage1/src/Util/utils.c | 49 ++++++++++----- code/stage1/wrapper.R | 18 ++++-- 5 files changed, 86 insertions(+), 82 deletions(-) diff --git a/code/stage1/src/TimeSeries/deserialize.c b/code/stage1/src/TimeSeries/deserialize.c index 5d1d758..79f7843 100644 --- a/code/stage1/src/TimeSeries/deserialize.c +++ b/code/stage1/src/TimeSeries/deserialize.c @@ -10,7 +10,7 @@ PowerCurve* deserialize(const char* ifileName, const char* ofileName, // Read tsLength at the beginning of the file uint32_t tsLength = get_tsLength(ifileName); - uint32_t valuesPerSerie = (tsLength - 4) / 3; //remove 4 bytes of ID + uint32_t valuesPerSerie = (tsLength - 4) / 4; //remove 4 bytes of ID FILE* ifile = fopen(ifileName, "rb"); FILE* ofile = NULL; @@ -22,7 +22,7 @@ PowerCurve* deserialize(const char* ifileName, const char* ofileName, nbRanks = get_nbSeries(ifileName); ranks = NULL; } - + PowerCurve* powerCurves = NULL; if (!ofile) powerCurves = (PowerCurve*) malloc(nbRanks * sizeof(PowerCurve)); @@ -52,22 +52,21 @@ PowerCurve* deserialize(const char* ifileName, const char* ofileName, else powerCurve->ID = ID; - // translate 3-bytes binary integers into Real - Byte* binarySerie = (Byte*) malloc(3 * valuesPerSerie); - lengthRead = fread(binarySerie, 1, 3*valuesPerSerie, ifile); - if (lengthRead != 3*valuesPerSerie) - fprintf(stderr,"Warning: deserializing truncated binary file.\n"); + // translate 4-bytes binary integers into Real + Byte* binarySerie = (Byte*) malloc(4 * valuesPerSerie); + lengthRead = fread(binarySerie, 1, 4*valuesPerSerie, ifile); + //TODO: assert that lengthRead == 4*valuesPerSerie (...) for (uint32_t i = 0; i < valuesPerSerie; i++) { - uint32_t powerInt = bInt_to_uint(binarySerie + 3 * i, 3); + float power = bReal_to_float(binarySerie + 4 * i); if (ofile) { - fprintf(ofile, "%g", powerInt / 10.0 - 0.0); + fprintf(ofile, "%g", power); if (i < valuesPerSerie-1) fprintf(ofile, ","); } else - powerCurve->values[i] = powerInt / 10.0 - 0.0; + powerCurve->values[i] = power; } free(binarySerie); if (ofile) diff --git a/code/stage1/src/TimeSeries/serialize.c b/code/stage1/src/TimeSeries/serialize.c index 88b15f1..3caa371 100644 --- a/code/stage1/src/TimeSeries/serialize.c +++ b/code/stage1/src/TimeSeries/serialize.c @@ -8,7 +8,7 @@ #include // parse a line into two integers (ID, raw power) -static void scan_line(FILE* ifile, int posID, uint32_t* ID, int posPower, uint32_t* rawPower) +static void scan_line(FILE* ifile, int posID, uint32_t* ID, int posPower, float* rawPower) { char nextChar; int position = 1; @@ -22,16 +22,14 @@ static void scan_line(FILE* ifile, int posID, uint32_t* ID, int posPower, uint32 } else if (position == posPower) { - Real untruncatedPower; - nextChar = readReal(ifile, &untruncatedPower); - if (untruncatedPower < 0.0) - untruncatedPower = 0.0; - *rawPower = (uint32_t) floor(untruncatedPower*10.0); + Real power; + nextChar = readReal(ifile, &power); + *rawPower = (float) power; } else //erase the comma (and skip field then) nextChar = fgetc(ifile); - + //continue until next comma (or line end or file end) while (!feof(ifile) && nextChar != '\n' && nextChar != '\r' && nextChar != ',') nextChar = fgetc(ifile); @@ -99,7 +97,8 @@ void serialize_byCols(const char* ifileName, const char* ofileName, uint32_t nbI free(headerString); //estimate tsLength with a scan of the 3 first series - uint32_t ID=0, rawPower=0, lastID=0, refTsLength=0; + uint32_t ID=0, lastID=0, refTsLength=0; + float rawPower = 0.0; scan_line(ifile, posID, &ID, posPower, &rawPower); //'sl' = sample lengths (short because a lot of comparisons then) uint32_t* sl = (uint32_t*) calloc(3, sizeof(uint32_t)); @@ -108,7 +107,7 @@ void serialize_byCols(const char* ifileName, const char* ofileName, uint32_t nbI lastID = ID; while (ID == lastID) { - sl[i]++; + sl[i]++; scan_line(ifile, posID, &ID, posPower, &rawPower); } } @@ -124,9 +123,7 @@ void serialize_byCols(const char* ifileName, const char* ofileName, uint32_t nbI // output file to write time-series sequentially, binary format. // Format: for each series, ID is stored on 4 bytes (unsigned integer32). Then, - // ()+ follow, with rawPower stored as a "3 bytes int" - // rawPower values are multiplied by 10 and truncated one digit after 0 - // NOTE: no raw power should be exactly zero + // ()+ follow, with rawPower stored as a float FILE* ofile = fopen(ofileName, "wb"); // leave space to write the number of series (32bits), and their length in bytes (32bits) @@ -136,9 +133,8 @@ void serialize_byCols(const char* ifileName, const char* ofileName, uint32_t nbI // process one client (ID in first column) at a time uint64_t processedLines = 0; //execution trace uint32_t seriesCount=0, skippedSeriesCount=0, tsLength=0; - uint32_t mismatchLengthCount=0, overflowCount=0; - Byte tsBuffer[4+3*refTsLength]; - int overflow = 0; + uint32_t mismatchLengthCount=0; + Byte tsBuffer[4+4*refTsLength]; lastID = 0; while (!feof(ifile)) { @@ -157,10 +153,10 @@ void serialize_byCols(const char* ifileName, const char* ofileName, uint32_t nbI //just starting a new time-series: must process the last one (if there is a last one !) if (lastID > 0) { - if (tsLength == refTsLength && !overflow) + if (tsLength == refTsLength) { seriesCount++; - fwrite(tsBuffer, 4+3*tsLength, 1, ofile); + fwrite(tsBuffer, 4+4*tsLength, 1, ofile); if (nbItems > 0 && seriesCount >= nbItems) break; } @@ -170,62 +166,56 @@ void serialize_byCols(const char* ifileName, const char* ofileName, uint32_t nbI skippedSeriesCount++; if (tsLength != refTsLength) mismatchLengthCount++; - if (overflow) - overflowCount++; } } - + // ID for the new series is printed only once: - write_int(ID, 4, tsBuffer); + write_int(ID, tsBuffer); // reinitialize flags - overflow = 0; tsLength = 0; lastID = ID; } - overflow = (overflow || (rawPower >= (1 << 24))); //We cannot write more than refTsLength bytes if (tsLength < refTsLength) - write_int(rawPower, 3, tsBuffer + 4+3*tsLength); + write_real(rawPower, tsBuffer + 4+4*tsLength); tsLength++; - + if ((++processedLines) % 1000000 == 0) fprintf(stdout,"Processed %"PRIu64" lines\n", processedLines); } - if (!overflow && tsLength == refTsLength && (nbItems <= 0 || seriesCount < nbItems)) + if (tsLength == refTsLength && (nbItems <= 0 || seriesCount < nbItems)) { // flush last time-series if all conditions are met - fwrite(tsBuffer, 4+3*tsLength, 1, ofile); + fwrite(tsBuffer, 4+4*tsLength, 1, ofile); seriesCount++; } else if (nbItems <= 0 || seriesCount < nbItems) { if (tsLength != refTsLength) mismatchLengthCount++; - if (overflow) - overflowCount++; } // write lines count and size of a time-series in bytes Byte intBuffer[4]; fseek(ofile, 0, SEEK_SET); - write_int(seriesCount, 4, intBuffer); + write_int(seriesCount, intBuffer); fwrite(intBuffer, 1, 4, ofile); // re-express tsLength in bytes (not forgetting the ID)) - write_int(4 + 3 * refTsLength, 4, intBuffer); + write_int(4 + 4 * refTsLength, intBuffer); fwrite(intBuffer, 1, 4, ofile); // finally print some statistics if (seriesCount < nbItems) fprintf(stdout,"Warning: only %u series retrieved.\n",seriesCount); - fprintf(stdout,"%u overflows / %u mismatch series lengths.\n",overflowCount,mismatchLengthCount); - + fprintf(stdout,"%u mismatch series lengths.\n",mismatchLengthCount); + fclose(ifile); fclose(ofile); } -//serialize from usual 'by-row' data (for StarLight example and toy dataset) +//serialize from usual 'by-row' data void serialize_byRows(const char* ifileName, const char* ofileName, uint32_t nbItems) { FILE* ifile = fopen(ifileName, "r"); @@ -239,6 +229,7 @@ void serialize_byRows(const char* ifileName, const char* ofileName, uint32_t nbI { nbValues++; //skip potential consecutive commas (could be hard to spot) + //TODO(...): should be 'NA' in R, thus an error (we don't handle NAs)... while (curChar == ',') curChar = fgetc(ifile); ungetc(curChar, ifile); @@ -256,7 +247,7 @@ void serialize_byRows(const char* ifileName, const char* ofileName, uint32_t nbI fseek(ifile, 0, SEEK_SET); //write meta info first - uint32_t tsLength = 3*nbValues+4; + uint32_t tsLength = 4*nbValues+4; FILE* ofile = fopen(ofileName, "wb"); Byte intBuffer[4]; write_int(nbSeries, 4, intBuffer); @@ -265,10 +256,7 @@ void serialize_byRows(const char* ifileName, const char* ofileName, uint32_t nbI fwrite(intBuffer, 1, 4, ofile); Real rawPower; int64_t ID; - - //DEBUG / TEST (ugly, TOFIX...) - double minrp = INFINITY, maxrp = -INFINITY; - + for (uint32_t i=0; i maxrp) - maxrp = rawPower; - - write_int((uint32_t)floor(10.0*(rawPower+0.0)), 3, intBuffer); //x10... +3... - fwrite(intBuffer, 1, 3, ofile); + write_real(rawPower, intBuffer); + fwrite(intBuffer, 1, 4, ofile); while (curChar == ',') curChar = fgetc(ifile); ungetc(curChar, ifile); @@ -300,7 +281,4 @@ void serialize_byRows(const char* ifileName, const char* ofileName, uint32_t nbI } fclose(ifile); fclose(ofile); - - //DEBUG / TEST (ugly, TOFIX...) - printf("min / max values = %g %g\n",minrp,maxrp); } diff --git a/code/stage1/src/Util/types.h b/code/stage1/src/Util/types.h index eaab1b0..824be88 100644 --- a/code/stage1/src/Util/types.h +++ b/code/stage1/src/Util/types.h @@ -12,7 +12,7 @@ typedef unsigned char Byte; -typedef double Real; +typedef float Real; // Type to describe a job to be done in a node //TODO: merge with packed version to avoid extra copy by MPI diff --git a/code/stage1/src/Util/utils.c b/code/stage1/src/Util/utils.c index 9713e30..efd4dd2 100644 --- a/code/stage1/src/Util/utils.c +++ b/code/stage1/src/Util/utils.c @@ -58,24 +58,45 @@ char readReal(FILE* stream, Real* real) } // convert n-bytes binary integers to uint32_t -uint32_t bInt_to_uint(Byte* pInteger, size_t bytesCount) +uint32_t bInt_to_uint(Byte* pInteger) { - uint32_t integer = 0; - for (size_t i = 0; i < bytesCount; i++) - integer += ((uint32_t) (pInteger[i])) << (i << 3); - return integer; + uint32_t res; + memcpy(&res, pInteger, 4); + return res; } -// serialize integers with a portable bytes order -void write_int(uint32_t integer, size_t bytesCount, Byte* buffer) +// serialize integers +void write_int(uint32_t x, Byte* buffer) { - Byte chunk; - // write from left to right, from least to most significative bit - for (size_t i = 0; i < bytesCount; i++) - { - chunk = (integer >> (i << 3)) & 0xFF; - buffer[i] = chunk; - } + union { + uint32_t i; + char bytes[4]; + } u; + u.i = x; + for (size_t i = 0; i < 4; i++) + buffer[i] = u.bytes[i]; +} + +//WARNING: assuming float is 32bits... +// convert 4-bytes binary float to float +float bReal_to_double(Byte* pFloat) +{ + float res; + memcpy(&res, pFloat, 4); + return res; +} + +//WARNING: assuming float is 32bits... +// serialize double with a NON-portable bytes order +void write_real(float x, Byte* buffer) +{ + union { + float d; + char bytes[4]; + } u; + u.d = x; + for (size_t i = 0; i < 4; i++) + buffer[i] = u.bytes[i]; } // Expected size of a Work message in bytes: diff --git a/code/stage1/wrapper.R b/code/stage1/wrapper.R index 2549f1f..9bbb53b 100644 --- a/code/stage1/wrapper.R +++ b/code/stage1/wrapper.R @@ -1,9 +1,15 @@ -ppam_exe = function(pathToExe, nbProcess, ...) +ppam_exe = function(path=".", np=parallel::detectCores(), data=NULL, args="DontLetMeEmpty") { - command_line = paste("mpirun -np ",nbProcess," ",pathToExe,"/ppam.exe",sep="") - args = list(...) - for (i in 1:length(args)) - command_line = paste(command_line, args[[i]]) - print(paste("EXECUTE: '", command_line,"'",sep="")) + command_line = paste("mpirun -np ",np," ",path,"/ppam.exe",sep="") + + #if data provided (as data.frame or matrix...): binarize it, and add it as first argument + if (!is.null(data)) + { + write.csv(data, "/tmp/data_csv") + system(paste(path,"/ppam.exe serialize /tmp/data_csv /tmp/data_bin 0 0",sep="")) + command_line = paste(command_line," ","/tmp/data_bin",sep="") + } + + command_line = paste(command_line," ",args,sep="") system(command_line) } -- 2.44.0