'update'
author Benjamin Auder <benjamin.auder@somewhere>
Mon, 6 Mar 2017 11:03:53 +0000 (12:03 +0100)
committer Benjamin Auder <benjamin.auder@somewhere>
Mon, 6 Mar 2017 11:03:53 +0000 (12:03 +0100)
data/preprocessing/convert_32kEDF.R [moved from data/preprocessing/convert_ts.R with 58% similarity]
data/preprocessing/serialize.c [new file with mode: 0644]

similarity index 58%
rename from data/preprocessing/convert_ts.R
rename to data/preprocessing/convert_32kEDF.R
index 6c48b07..2e6798a 100644 (file)
@@ -1,9 +1,12 @@
-convert = function(orig_csv_file, nb_series_per_chunk)
+convert_32kEDF = function(orig_csv, nb_series_per_chunk)
 {
-       orig_file = file(orig_csv_file, open="r")
-       ignored = readLines(orig_file, 1) #skip header
+       datetimes = #...TODO: all 3 years? year-by-year is better
+       orig_con = file(orig_csv, open="r") #2009, 2010 or 2011
+       ignored = readLines(orig_con, 1) #skip header
+       serie_length = length(datetimes) #around 365*24*2 = 17520
+       sep = if (year==2009) "," else if (year==2010) ";" else ";"
 
-       serie_length = 17520 #365*24*2
+scan(orig_con, character(), sep=",", nlines=1, quiet=TRUE)
        library(sqldf, quietly=TRUE)
        ids = read.csv.sql(file_csv, header = TRUE, sep = ","
                sql = "select * from file_csv group by FK_CCU_ID")
diff --git a/data/preprocessing/serialize.c b/data/preprocessing/serialize.c
new file mode 100644 (file)
index 0000000..f35da64
--- /dev/null
@@ -0,0 +1,244 @@
+#define __STDC_FORMAT_MACROS //to print 64bits unsigned integers
+#include <float.h>
+#include <inttypes.h>
+#include <math.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <cgds/Vector.h>
+
+// Read the next decimal integer found in 'stream' and store it in '*integer'.
+// Every character preceding the first digit is skipped (separators and line ends
+// included); the number is negated only when the very first character read is '-'.
+// Returns the character which ended the number (it is consumed from the stream),
+// or (char)EOF when the end of the stream is reached. '*integer' is 0 if no digit was found.
+char readInt(FILE* stream, int64_t* integer)
+{
+       *integer = 0;
+       //fgetc() returns an int: keeping it in a char would hide the EOF value
+       int curChar = fgetc(stream);
+       int sign = (curChar == '-' ? -1 : 1);
+       //skip everything up to the first digit, but give up at end of file (no endless loop)
+       while (curChar != EOF && (curChar < '0' || curChar > '9'))
+               curChar = fgetc(stream);
+       if (curChar == EOF)
+               return (char)EOF;
+       *integer = (int64_t) (curChar - '0');
+       while ((curChar = fgetc(stream)) >= '0' && curChar <= '9')
+               *integer = 10 * (*integer) + (int64_t) (curChar - '0');
+       (*integer) *= sign;
+       return (char)curChar;
+}
+
+// Read the next real number found in 'stream' (format: [-]digits[.digits][e|E[+|-]digits])
+// and store it in '*real'.
+// Returns the character which ended the number (it is consumed from the stream).
+// If the stream is already at its end, (char)EOF is returned and '*real' is left untouched.
+// As in readInt(), non-digit characters preceding the number are skipped, and the
+// number is negative only when its very first character is '-'.
+char readReal(FILE* stream, float* real)
+{
+       //peek at the first character: readInt() cannot report the sign of numbers like
+       //"-0.5", whose integer part is zero (and "0.5" must not be read as negative)
+       int firstChar = fgetc(stream);
+       if (firstChar == EOF)
+               return (char)EOF;
+       ungetc(firstChar, stream);
+       double sign = (firstChar == '-' ? -1. : 1.);
+
+       int64_t integerPart;
+       char nextChar = readInt(stream, &integerPart);
+       if (integerPart < 0)
+               integerPart = -integerPart; //the sign is applied once, at the end
+       int64_t fractionalPart = 0;
+       int fractionalDigits = 0;
+       if (nextChar == '.')
+       {
+               //leading zeros are lost when reading the fraction as an integer: count them
+               while ((nextChar = fgetc(stream)) == '0')
+                       fractionalDigits++;
+               if (nextChar >= '1' && nextChar <= '9')
+               {
+                       ungetc(nextChar, stream);
+                       nextChar = readInt(stream, &fractionalPart);
+                       //count the remaining digits with integer arithmetic (exact, unlike log10)
+                       for (int64_t rest = fractionalPart; rest > 0; rest /= 10)
+                               fractionalDigits++;
+               }
+       }
+       int64_t exponent = 0;
+       if (nextChar == 'e' || nextChar == 'E')
+               nextChar = readInt(stream, &exponent);
+       *real = (float) ( sign
+               * ((double)integerPart + (double)fractionalPart / pow(10, fractionalDigits))
+               * pow(10, (double)exponent) );
+       return nextChar;
+}
+
+// Tell whether the field starting at the current position is empty, i.e. whether the
+// next character is a comma, a line end or the end of file. Nothing is consumed.
+static int field_is_empty(FILE* ifile)
+{
+       int next = fgetc(ifile);
+       if (next == EOF)
+               return 1;
+       ungetc(next, ifile);
+       return (next == ',' || next == '\n' || next == '\r');
+}
+
+// Parse a line into integer+float (ID, raw power)
+//   ifile: stream positioned at the beginning of a line
+//   posID, posPower: 1-based positions of the ID and power columns
+//   ID, rawPower: outputs; an empty power field yields FLT_MAX ("NA")
+// On return the stream is positioned at the beginning of the next non-empty line.
+// NOTE(review): an empty ID field is not detected, readInt() would then read digits
+// from a following field; presumably IDs are always present - to be confirmed.
+static void scan_line(FILE* ifile, int posID, uint32_t* ID, int posPower, float* rawPower)
+{
+       char nextChar;
+       int position = 1;
+       while (1)
+       {
+               if (position == posID)
+               {
+                       int64_t ID_on64bits;
+                       nextChar = readInt(ifile, &ID_on64bits);
+                       *ID = (uint32_t)ID_on64bits;
+               }
+               else if (position == posPower)
+               {
+                       if (field_is_empty(ifile))
+                       {
+                               //readReal() would skip the separator and parse a number belonging
+                               //to another field (or line): report a missing value instead
+                               *rawPower = FLT_MAX; //"NA"
+                               nextChar = fgetc(ifile);
+                       }
+                       else
+                               nextChar = readReal(ifile, rawPower);
+               }
+               else
+                       //erase the comma (and skip field then)
+                       nextChar = fgetc(ifile);
+
+               //continue until next comma (or line end or file end)
+               while (!feof(ifile) && nextChar != '\n' && nextChar != '\r' && nextChar != ',')
+                       nextChar = fgetc(ifile);
+               position++;
+
+               if (feof(ifile) || nextChar == '\n' || nextChar == '\r')
+               {
+                       // skip all potential line feeds
+                       // (parentheses required: '&&' binds tighter than '||')
+                       while (!feof(ifile) && (nextChar == '\n' || nextChar == '\r'))
+                               nextChar = fgetc(ifile);
+                       if (!feof(ifile))
+                               ungetc(nextChar, ifile);
+                       break;
+               }
+       }
+}
+
+// Write one complete series as a CSV row: values separated by commas, then a line feed
+static void write_series(FILE* ofile, const float* values, uint32_t length)
+{
+       for (uint32_t i=0; i<length; i++)
+       {
+               fprintf(ofile, "%g", values[i]);
+               if (i+1 < length)
+                       fputc(',', ofile);
+       }
+       fprintf(ofile, "\n");
+}
+
+// Main job: parse an "EDF-text file" into a conventional CSV file in rows, without header
+//   ifileName: input file; its first line is a comma-separated header which must contain
+//              the columns FK_CCU_ID (or fk_ccu_id) and CPP_PUISSANCE_BRUTE
+//   ofileName: output file, one row per series (raw power values of one ID)
+//   nbItems: maximum number of series to write; 0 means no limit
+// Lines sharing the same ID are expected to be contiguous. The reference series length
+// is the largest length among the three first series; series shorter than this are
+// skipped and counted as mismatches, while longer ones are truncated to it.
+// Errors (unreadable files, missing columns, no data) are reported on stderr.
+void transform(const char* ifileName, const char* ofileName, uint32_t nbItems)
+{
+       FILE* ifile = fopen(ifileName, "r");
+       if (ifile == NULL)
+       {
+               fprintf(stderr, "Error: cannot open input file %s\n", ifileName);
+               return;
+       }
+
+       // Use the header to know positions of ID and rawPower
+       uint32_t headerShift = 0;
+       Vector* header = vector_new(char);
+       while (1)
+       {
+               int readChar = fgetc(ifile);
+               headerShift++;
+               //a file without any line feed must not be read forever
+               if (readChar == EOF)
+                       break;
+               if (readChar == '\n' || readChar == '\r')
+               {
+                       // Flush all potential other line feeds
+                       while (readChar == '\n' || readChar == '\r')
+                               readChar = fgetc(ifile);
+                       if (readChar != EOF)
+                               ungetc(readChar, ifile);
+                       break;
+               }
+               char curChar = (char)readChar;
+               vector_push(header, curChar);
+       }
+       char* headerString = (char*)malloc((vector_size(header) + 1)*sizeof(char));
+       if (headerString == NULL)
+       {
+               fprintf(stderr, "Error: out of memory\n");
+               vector_destroy(header);
+               fclose(ifile);
+               return;
+       }
+       VectorIterator* it = vector_get_iterator(header);
+       int index = 0;
+       while (vectorI_has_data(it))
+       {
+               vectorI_get(it, headerString[index]);
+               vectorI_move_next(it);
+               index++;
+       }
+       vectorI_destroy(it);
+       headerString[index] = 0;
+       vector_destroy(header);
+       int position = 1, posID = 0, posPower = 0;
+       char* columnDescriptor = strtok(headerString, ",");
+       while (columnDescriptor != NULL)
+       {
+               if (!strcmp(columnDescriptor,"FK_CCU_ID") || !strcmp(columnDescriptor,"fk_ccu_id"))
+                       posID = position;
+               else if (!strcmp(columnDescriptor,"CPP_PUISSANCE_BRUTE"))
+                       posPower = position;
+               position++;
+               columnDescriptor = strtok(NULL, ",");
+       }
+       free(headerString);
+       if (posID == 0 || posPower == 0)
+       {
+               //without both columns no line can be parsed
+               fprintf(stderr, "Error: ID or power column not found in header of %s\n", ifileName);
+               fclose(ifile);
+               return;
+       }
+
+       // Estimate tsLength with a scan of the 3 first series
+       uint32_t ID=0, lastID=0, refTsLength=0;
+       float rawPower = 0.;
+       //'sl' = sample lengths
+       uint32_t sl[3] = {0, 0, 0};
+       //'lineRead' tells if (ID, rawPower) hold a line not yet counted; it becomes false
+       //at end of file, so that inputs with less than 3 series do not loop forever
+       int lineRead = !feof(ifile);
+       if (lineRead)
+               scan_line(ifile, posID, &ID, posPower, &rawPower);
+       for (int i=0; i<3 && lineRead; i++)
+       {
+               lastID = ID;
+               while (lineRead && ID == lastID)
+               {
+                       sl[i]++;
+                       lineRead = !feof(ifile);
+                       if (lineRead)
+                               scan_line(ifile, posID, &ID, posPower, &rawPower);
+               }
+       }
+       //reference length = largest sample ('a <= b <= c' does not chain comparisons in C)
+       refTsLength = sl[0];
+       if (sl[1] > refTsLength)
+               refTsLength = sl[1];
+       if (sl[2] > refTsLength)
+               refTsLength = sl[2];
+       if (refTsLength == 0)
+       {
+               fprintf(stderr, "Error: no data line found in %s\n", ifileName);
+               fclose(ifile);
+               return;
+       }
+       //go back at the beginning of the first series (ready to read '\n'...)
+       //NB: fseek() also clears the end-of-file indicator
+       fseek(ifile, (long)headerShift - 1, SEEK_SET);
+
+       //heap buffer: the length is only known at runtime
+       float* tsBuffer = (float*)malloc(refTsLength * sizeof(float));
+       if (tsBuffer == NULL)
+       {
+               fprintf(stderr, "Error: out of memory\n");
+               fclose(ifile);
+               return;
+       }
+
+       // output file to write time-series sequentially, CSV format.
+       FILE* ofile = fopen(ofileName, "w");
+       if (ofile == NULL)
+       {
+               fprintf(stderr, "Error: cannot open output file %s\n", ofileName);
+               free(tsBuffer);
+               fclose(ifile);
+               return;
+       }
+
+       // process one client (ID in first column) at a time
+       uint64_t processedLines = 0; //execution trace
+       uint32_t seriesCount=0, tsLength=0;
+       uint32_t mismatchLengthCount=0;
+       lastID = 0;
+       while (!feof(ifile))
+       {
+               // next element to read always start with a digit
+               int readChar;
+               do
+                       readChar = fgetc(ifile);
+               while (readChar != EOF && (readChar < '0' || readChar > '9'));
+               if (readChar == EOF)
+                       break;
+               ungetc(readChar, ifile);
+
+               // read line
+               scan_line(ifile, posID, &ID, posPower, &rawPower);
+               if (ID != lastID)
+               {
+                       //just starting a new time-series: must process the last one (if there is a last one !)
+                       if (lastID > 0)
+                       {
+                               if (tsLength == refTsLength)
+                               {
+                                       write_series(ofile, tsBuffer, tsLength);
+                                       seriesCount++;
+                                       if (nbItems > 0 && seriesCount >= nbItems)
+                                               break;
+                               }
+                               //if something wrong happened, skip series
+                               else
+                                       mismatchLengthCount++;
+                       }
+
+                       // reinitialize flags
+                       tsLength = 0;
+                       lastID = ID;
+               }
+
+               //We cannot write more than refTsLength values
+               if (tsLength < refTsLength)
+                       tsBuffer[tsLength++] = rawPower;
+
+               if ((++processedLines) % 1000000 == 0)
+                       fprintf(stdout,"Processed %"PRIu64" lines\n", processedLines);
+       }
+
+       //the last series is not followed by an ID change: handle it here,
+       //unless the loop stopped because enough series were written
+       if (nbItems == 0 || seriesCount < nbItems)
+       {
+               if (tsLength == refTsLength)
+               {
+                       write_series(ofile, tsBuffer, tsLength);
+                       seriesCount++;
+               }
+               else
+                       mismatchLengthCount++;
+       }
+
+       // finally print some statistics
+       if (seriesCount < nbItems)
+               fprintf(stdout,"Warning: only %u series retrieved.\n",seriesCount);
+       fprintf(stdout,"%u mismatch series lengths.\n",mismatchLengthCount);
+
+       free(tsBuffer);
+       fclose(ifile);
+       fclose(ofile);
+}