finalize R 'wrapper' for stage 1; WARNING: changed series encoding. TODO: test
authorBenjamin Auder <benjamin.auder@somewhere>
Wed, 11 Jan 2017 00:47:23 +0000 (01:47 +0100)
committerBenjamin Auder <benjamin.auder@somewhere>
Wed, 11 Jan 2017 00:47:23 +0000 (01:47 +0100)
code/stage1/src/TimeSeries/deserialize.c
code/stage1/src/TimeSeries/serialize.c
code/stage1/src/Util/types.h
code/stage1/src/Util/utils.c
code/stage1/wrapper.R

index 5d1d758..79f7843 100644 (file)
@@ -10,7 +10,7 @@ PowerCurve* deserialize(const char* ifileName, const char* ofileName,
        // Read tsLength at the beginning of the file
        uint32_t tsLength = get_tsLength(ifileName);
        
-       uint32_t valuesPerSerie = (tsLength - 4) / 3; //remove 4 bytes of ID
+       uint32_t valuesPerSerie = (tsLength - 4) / 4; //remove 4 bytes of ID
 
        FILE* ifile = fopen(ifileName, "rb");
        FILE* ofile = NULL;
@@ -22,7 +22,7 @@ PowerCurve* deserialize(const char* ifileName, const char* ofileName,
                nbRanks = get_nbSeries(ifileName);
                ranks = NULL;
        }
-       
+
        PowerCurve* powerCurves = NULL;
        if (!ofile)
                powerCurves = (PowerCurve*) malloc(nbRanks * sizeof(PowerCurve));
@@ -52,22 +52,21 @@ PowerCurve* deserialize(const char* ifileName, const char* ofileName,
                else
                        powerCurve->ID = ID;
 
-               // translate 3-bytes binary integers into Real
-               Byte* binarySerie = (Byte*) malloc(3 * valuesPerSerie);
-               lengthRead = fread(binarySerie, 1, 3*valuesPerSerie, ifile);
-               if (lengthRead != 3*valuesPerSerie)
-                       fprintf(stderr,"Warning: deserializing truncated binary file.\n");
+               // translate 4-bytes binary integers into Real
+               Byte* binarySerie = (Byte*) malloc(4 * valuesPerSerie);
+               lengthRead = fread(binarySerie, 1, 4*valuesPerSerie, ifile);
+               //TODO: assert that lengthRead == 4*valuesPerSerie (...)
                for (uint32_t i = 0; i < valuesPerSerie; i++)
                {
-                       uint32_t powerInt = bInt_to_uint(binarySerie + 3 * i, 3);
+                       float power = bReal_to_float(binarySerie + 4 * i);
                        if (ofile)
                        {
-                               fprintf(ofile, "%g", powerInt / 10.0 - 0.0);
+                               fprintf(ofile, "%g", power);
                                if (i < valuesPerSerie-1)
                                        fprintf(ofile, ",");
                        }
                        else
-                               powerCurve->values[i] = powerInt / 10.0 - 0.0;
+                               powerCurve->values[i] = power;
                }
                free(binarySerie);
                if (ofile)
index 88b15f1..3caa371 100644 (file)
@@ -8,7 +8,7 @@
 #include <string.h>
 
 // parse a line into two integers (ID, raw power)
-static void scan_line(FILE* ifile, int posID, uint32_t* ID, int posPower, uint32_t* rawPower)
+static void scan_line(FILE* ifile, int posID, uint32_t* ID, int posPower, float* rawPower)
 {
        char nextChar;
        int position = 1;
@@ -22,16 +22,14 @@ static void scan_line(FILE* ifile, int posID, uint32_t* ID, int posPower, uint32
                }
                else if (position == posPower)
                {
-                       Real untruncatedPower;
-                       nextChar = readReal(ifile, &untruncatedPower);
-                       if (untruncatedPower < 0.0)
-                               untruncatedPower = 0.0;
-                       *rawPower = (uint32_t) floor(untruncatedPower*10.0);
+                       Real power;
+                       nextChar = readReal(ifile, &power);
+                       *rawPower = (float) power;
                }
                else
                        //erase the comma (and skip field then)
                        nextChar = fgetc(ifile);
-               
+
                //continue until next comma (or line end or file end)
                while (!feof(ifile) && nextChar != '\n' && nextChar != '\r' && nextChar != ',')
                        nextChar = fgetc(ifile);
@@ -99,7 +97,8 @@ void serialize_byCols(const char* ifileName, const char* ofileName, uint32_t nbI
        free(headerString);
        
        //estimate tsLength with a scan of the 3 first series
-       uint32_t ID=0, rawPower=0, lastID=0, refTsLength=0;
+       uint32_t ID=0, lastID=0, refTsLength=0;
+       float rawPower = 0.0;
        scan_line(ifile, posID, &ID, posPower, &rawPower);
        //'sl' = sample lengths (short because a lot of comparisons then)
        uint32_t* sl = (uint32_t*) calloc(3, sizeof(uint32_t));
@@ -108,7 +107,7 @@ void serialize_byCols(const char* ifileName, const char* ofileName, uint32_t nbI
                lastID = ID;
                while (ID == lastID)
                {
-                       sl[i]++;        
+                       sl[i]++;
                        scan_line(ifile, posID, &ID, posPower, &rawPower);
                }
        }
@@ -124,9 +123,7 @@ void serialize_byCols(const char* ifileName, const char* ofileName, uint32_t nbI
 
        // output file to write time-series sequentially, binary format.
        // Format: for each series, ID is stored on 4 bytes (unsigned integer32). Then,
-       //         (<rawPower>)+ follow, with rawPower stored as a "3 bytes int"
-       //         rawPower values are multiplied by 10 and truncated one digit after 0
-       // NOTE: no raw power should be exactly zero
+       //         (<rawPower>)+ follow, with rawPower stored as a float
        FILE* ofile = fopen(ofileName, "wb");
 
        // leave space to write the number of series (32bits), and their length in bytes (32bits)
@@ -136,9 +133,8 @@ void serialize_byCols(const char* ifileName, const char* ofileName, uint32_t nbI
        // process one client (ID in first column) at a time
        uint64_t processedLines = 0; //execution trace
        uint32_t seriesCount=0, skippedSeriesCount=0, tsLength=0;
-       uint32_t mismatchLengthCount=0, overflowCount=0;
-       Byte tsBuffer[4+3*refTsLength];
-       int overflow = 0;
+       uint32_t mismatchLengthCount=0;
+       Byte tsBuffer[4+4*refTsLength];
        lastID = 0;
        while (!feof(ifile))
        {
@@ -157,10 +153,10 @@ void serialize_byCols(const char* ifileName, const char* ofileName, uint32_t nbI
                        //just starting a new time-series: must process the last one (if there is a last one !)
                        if (lastID > 0)
                        {
-                               if (tsLength == refTsLength && !overflow)
+                               if (tsLength == refTsLength)
                                {
                                        seriesCount++;
-                                       fwrite(tsBuffer, 4+3*tsLength, 1, ofile);
+                                       fwrite(tsBuffer, 4+4*tsLength, 1, ofile);
                                        if (nbItems > 0 && seriesCount >= nbItems)
                                                break;
                                }
@@ -170,62 +166,56 @@ void serialize_byCols(const char* ifileName, const char* ofileName, uint32_t nbI
                                        skippedSeriesCount++;
                                        if (tsLength != refTsLength)
                                                mismatchLengthCount++;
-                                       if (overflow)
-                                               overflowCount++;
                                }
                        }
-                       
+
                        // ID for the new series is printed only once:
-                       write_int(ID, 4, tsBuffer);
+                       write_int(ID, tsBuffer);
                        // reinitialize flags
-                       overflow = 0;
                        tsLength = 0;
                        lastID = ID;
                }
 
-               overflow = (overflow || (rawPower >= (1 << 24)));
                //We cannot write more than refTsLength bytes
                if (tsLength < refTsLength)
-                       write_int(rawPower, 3, tsBuffer + 4+3*tsLength);
+                       write_real(rawPower, tsBuffer + 4+4*tsLength);
                tsLength++;
-               
+
                if ((++processedLines) % 1000000 == 0)
                        fprintf(stdout,"Processed %"PRIu64" lines\n", processedLines);
        }
 
-       if (!overflow && tsLength == refTsLength && (nbItems <= 0 || seriesCount < nbItems))
+       if (tsLength == refTsLength && (nbItems <= 0 || seriesCount < nbItems))
        {
                // flush last time-series if all conditions are met
-               fwrite(tsBuffer, 4+3*tsLength, 1, ofile);
+               fwrite(tsBuffer, 4+4*tsLength, 1, ofile);
                seriesCount++;
        }
        else if (nbItems <= 0 || seriesCount < nbItems)
        {
                if (tsLength != refTsLength)
                        mismatchLengthCount++;
-               if (overflow)
-                       overflowCount++;
        }
 
        // write lines count and size of a time-series in bytes
        Byte intBuffer[4];
        fseek(ofile, 0, SEEK_SET);
-       write_int(seriesCount, 4, intBuffer);
+       write_int(seriesCount, intBuffer);
        fwrite(intBuffer, 1, 4, ofile);
        // re-express tsLength in bytes (not forgetting the ID))
-       write_int(4 + 3 * refTsLength, 4, intBuffer);
+       write_int(4 + 4 * refTsLength, intBuffer);
        fwrite(intBuffer, 1, 4, ofile);
 
        // finally print some statistics
        if (seriesCount < nbItems)
                fprintf(stdout,"Warning: only %u series retrieved.\n",seriesCount);
-       fprintf(stdout,"%u overflows / %u mismatch series lengths.\n",overflowCount,mismatchLengthCount);
-       
+       fprintf(stdout,"%u mismatch series lengths.\n",mismatchLengthCount);
+
        fclose(ifile);
        fclose(ofile);
 }
 
-//serialize from usual 'by-row' data (for StarLight example and toy dataset)
+//serialize from usual 'by-row' data
 void serialize_byRows(const char* ifileName, const char* ofileName, uint32_t nbItems)
 {
        FILE* ifile = fopen(ifileName, "r");
@@ -239,6 +229,7 @@ void serialize_byRows(const char* ifileName, const char* ofileName, uint32_t nbI
                {
                        nbValues++;
                        //skip potential consecutive commas (could be hard to spot)
+                       //TODO(...): should be 'NA' in R, thus an error (we don't handle NAs)...
                        while (curChar == ',')
                                curChar = fgetc(ifile);
                        ungetc(curChar, ifile);
@@ -256,7 +247,7 @@ void serialize_byRows(const char* ifileName, const char* ofileName, uint32_t nbI
        fseek(ifile, 0, SEEK_SET);
 
        //write meta info first
-       uint32_t tsLength = 3*nbValues+4;
+       uint32_t tsLength = 4*nbValues+4;
        FILE* ofile = fopen(ofileName, "wb");
        Byte intBuffer[4];
        write_int(nbSeries, 4, intBuffer);
@@ -265,10 +256,7 @@ void serialize_byRows(const char* ifileName, const char* ofileName, uint32_t nbI
        fwrite(intBuffer, 1, 4, ofile);
        Real rawPower;
        int64_t ID;
-       
-       //DEBUG / TEST (ugly, TOFIX...)
-       double minrp = INFINITY, maxrp = -INFINITY;
-       
+
        for (uint32_t i=0; i<nbSeries; i++)
        {
                //skip potential line feeds before next line
@@ -284,15 +272,8 @@ void serialize_byRows(const char* ifileName, const char* ofileName, uint32_t nbI
                for (uint32_t j=0; j<nbValues; j++)
                {
                        curChar = readReal(ifile, &rawPower);
-                       
-                       //DEBUG / TEST (ugly, TOFIX...)
-                       if (rawPower < minrp)
-                               minrp = rawPower;
-                       if (rawPower > maxrp)
-                               maxrp = rawPower;
-                       
-                       write_int((uint32_t)floor(10.0*(rawPower+0.0)), 3, intBuffer); //x10... +3...
-                       fwrite(intBuffer, 1, 3, ofile);
+                       write_real(rawPower, intBuffer);
+                       fwrite(intBuffer, 1, 4, ofile);
                        while (curChar == ',')
                                curChar = fgetc(ifile);
                        ungetc(curChar, ifile);
@@ -300,7 +281,4 @@ void serialize_byRows(const char* ifileName, const char* ofileName, uint32_t nbI
        }
        fclose(ifile);
        fclose(ofile);
-       
-       //DEBUG / TEST (ugly, TOFIX...)
-       printf("min / max values = %g %g\n",minrp,maxrp);
 }
index eaab1b0..824be88 100644 (file)
@@ -12,7 +12,7 @@
 
 typedef unsigned char Byte;
 
-typedef double Real;
+typedef float Real;
 
 // Type to describe a job to be done in a node
 //TODO: merge with packed version to avoid extra copy by MPI
index 9713e30..efd4dd2 100644 (file)
@@ -58,24 +58,45 @@ char readReal(FILE* stream, Real* real)
 }
 
 // convert n-bytes binary integers to uint32_t
-uint32_t bInt_to_uint(Byte* pInteger, size_t bytesCount)
+uint32_t bInt_to_uint(Byte* pInteger)
 {
-       uint32_t integer = 0;
-       for (size_t i = 0; i < bytesCount; i++)
-               integer += ((uint32_t) (pInteger[i])) << (i << 3);
-       return integer;
+       uint32_t res;
+       memcpy(&res, pInteger, 4);
+       return res;
 }
 
-// serialize integers with a portable bytes order
-void write_int(uint32_t integer, size_t bytesCount, Byte* buffer)
+// serialize integers
+void write_int(uint32_t x, Byte* buffer)
 {
-       Byte chunk;
-       // write from left to right, from least to most significative bit
-       for (size_t i = 0; i < bytesCount; i++)
-       {
-               chunk = (integer >> (i << 3)) & 0xFF;
-               buffer[i] = chunk;
-       }
+       union {
+               uint32_t i;
+               char bytes[4];
+       } u;
+       u.i = x;
+       for (size_t i = 0; i < 4; i++)
+               buffer[i] = u.bytes[i];
+}
+
+//WARNING: assuming float is 32bits...
+// convert 4-bytes binary float to float
+float bReal_to_double(Byte* pFloat)
+{
+       float res;
+       memcpy(&res, pFloat, 4);
+       return res;
+}
+
+//WARNING: assuming float is 32bits...
+// serialize double with a NON-portable bytes order
+void write_real(float x, Byte* buffer)
+{
+       union {
+               float d;
+               char bytes[4];
+       } u;
+       u.d = x;
+       for (size_t i = 0; i < 4; i++)
+               buffer[i] = u.bytes[i];
 }
 
 // Expected size of a Work message in bytes:
index 2549f1f..9bbb53b 100644 (file)
@@ -1,9 +1,15 @@
-ppam_exe = function(pathToExe, nbProcess, ...)
+ppam_exe = function(path=".", np=parallel::detectCores(), data=NULL, args="DontLetMeEmpty")
 {
-       command_line = paste("mpirun -np ",nbProcess," ",pathToExe,"/ppam.exe",sep="")
-       args = list(...)
-       for (i in 1:length(args))
-               command_line = paste(command_line, args[[i]])
-       print(paste("EXECUTE: '", command_line,"'",sep=""))
+       command_line = paste("mpirun -np ",np," ",path,"/ppam.exe",sep="")
+
+       #if data provided (as data.frame or matrix...): binarize it, and add it as first argument
+       if (!is.null(data))
+       {
+               write.csv(data, "/tmp/data_csv")
+               system(paste(path,"/ppam.exe serialize /tmp/data_csv /tmp/data_bin 0 0",sep=""))
+               command_line = paste(command_line," ","/tmp/data_bin",sep="")
+       }
+
+       command_line = paste(command_line," ",args,sep="")
        system(command_line)
 }