-#include "TimeSeries/serialize.h"
-#define __STDC_FORMAT_MACROS
-#include <inttypes.h>
-#include <math.h>
-#include "Util/types.h"
-#include "Util/utils.h"
-#include <cgds/Vector.h>
-#include <string.h>
-
-// parse a line into two integers (ID, raw power)
-static void scan_line(FILE* ifile, int posID, uint32_t* ID, int posPower, float* rawPower)
-{
- char nextChar;
- int position = 1;
- while (1)
- {
- if (position == posID)
- {
- int64_t ID_on64bits;
- nextChar = readInt(ifile, &ID_on64bits);
- *ID = (uint32_t)ID_on64bits;
- }
- else if (position == posPower)
- {
- float power;
- nextChar = readReal(ifile, &power);
- *rawPower = (float) power;
- }
- else
- //erase the comma (and skip field then)
- nextChar = fgetc(ifile);
-
- //continue until next comma (or line end or file end)
- while (!feof(ifile) && nextChar != '\n' && nextChar != '\r' && nextChar != ',')
- nextChar = fgetc(ifile);
- position++;
-
- if (feof(ifile) || nextChar == '\n' || nextChar == '\r')
- {
- // skip all potential line feeds
- while (!feof(ifile) && nextChar == '\n' || nextChar == '\r')
- nextChar = fgetc(ifile);
- if (!feof(ifile))
- ungetc(nextChar, ifile);
- break;
- }
- }
-}
-
-//main job: parse a text file into a binary compressed version
-//TODO [long term]: adapt to distributed files/DB, maybe to distributed binary chunks
-void serialize_byCols(const char* ifileName, const char* ofileName, uint32_t nbItems)
-{
- // use the header to know positions of ID and rawPower
- FILE* ifile = fopen(ifileName, "r");
- uint32_t headerShift = 0;
- char curChar;
- Vector* header = vector_new(char);
- do
- {
- curChar = fgetc(ifile);
- headerShift++;
- if (curChar == '\n' || curChar == '\r')
- {
- //flush all potential other line feeds
- while (curChar == '\n' || curChar == '\r')
- curChar = fgetc(ifile);
- ungetc(curChar, ifile);
- break;
- }
- vector_push(header, curChar);
- }
- while (1);
- char* headerString = (char*)malloc((vector_size(header) + 1)*sizeof(char));
- VectorIterator* it = vector_get_iterator(header);
- int index = 0;
- while (vectorI_has_data(it))
- {
- vectorI_get(it, headerString[index]);
- vectorI_move_next(it);
- index++;
- }
- vectorI_destroy(it);
- headerString[index] = 0;
- vector_destroy(header);
- int position = 1, posID = 0, posPower = 0;
- char* columnDescriptor = strtok(headerString, ",");
- while (columnDescriptor != NULL)
- {
- if (!strcmp(columnDescriptor,"FK_CCU_ID") || !strcmp(columnDescriptor,"fk_ccu_id"))
- posID = position;
- else if (!strcmp(columnDescriptor,"CPP_PUISSANCE_BRUTE"))
- posPower = position;
- position++;
- columnDescriptor = strtok(NULL, ",");
- }
- free(headerString);
-
- //estimate tsLength with a scan of the 3 first series
- uint32_t ID=0, lastID=0, refTsLength=0;
- float rawPower = 0.0;
- scan_line(ifile, posID, &ID, posPower, &rawPower);
- //'sl' = sample lengths (short because a lot of comparisons then)
- uint32_t* sl = (uint32_t*) calloc(3, sizeof(uint32_t));
- for (int i=0; i<3; i++)
- {
- lastID = ID;
- while (ID == lastID)
- {
- sl[i]++;
- scan_line(ifile, posID, &ID, posPower, &rawPower);
- }
- }
- if (sl[1] <= sl[0] <= sl[2] || sl[2] <= sl[0] <= sl[1])
- refTsLength = sl[0];
- else if (sl[0] <= sl[1] <= sl[2] || sl[2] <= sl[1] <= sl[0])
- refTsLength = sl[1];
- else
- refTsLength = sl[2];
- free(sl);
- //go back at the beginning of the first series (ready to read '\n'...)
- fseek(ifile, headerShift-1, SEEK_SET);
-
- // output file to write time-series sequentially, binary format.
- // Format: for each series, ID is stored on 4 bytes (unsigned integer32). Then,
- // (<rawPower>)+ follow, with rawPower stored as a float
- FILE* ofile = fopen(ofileName, "wb");
-
- // leave space to write the number of series (32bits), and their length in bytes (32bits)
- for (int i = 0; i < 8; i++)
- fputc(0, ofile);
-
- // process one client (ID in first column) at a time
- uint64_t processedLines = 0; //execution trace
- uint32_t seriesCount=0, skippedSeriesCount=0, tsLength=0;
- uint32_t mismatchLengthCount=0;
- Byte tsBuffer[4+4*refTsLength];
- lastID = 0;
- while (!feof(ifile))
- {
- // next element to read always start with a digit
- do
- curChar = fgetc(ifile);
- while (!feof(ifile) && (curChar < '0' || curChar > '9'));
- if (feof(ifile))
- break;
- ungetc(curChar, ifile);
-
- // read line
- scan_line(ifile, posID, &ID, posPower, &rawPower);
- if (ID != lastID)
- {
- //just starting a new time-series: must process the last one (if there is a last one !)
- if (lastID > 0)
- {
- if (tsLength == refTsLength)
- {
- seriesCount++;
- fwrite(tsBuffer, 4+4*tsLength, 1, ofile);
- if (nbItems > 0 && seriesCount >= nbItems)
- break;
- }
- //if something wrong happened, skip series
- else
- {
- skippedSeriesCount++;
- if (tsLength != refTsLength)
- mismatchLengthCount++;
- }
- }
-
- // ID for the new series is printed only once:
- write_int(ID, tsBuffer);
- // reinitialize flags
- tsLength = 0;
- lastID = ID;
- }
-
- //We cannot write more than refTsLength bytes
- if (tsLength < refTsLength)
- write_real(rawPower, tsBuffer + 4+4*tsLength);
- tsLength++;
-
- if ((++processedLines) % 1000000 == 0)
- fprintf(stdout,"Processed %"PRIu64" lines\n", processedLines);
- }
-
- if (tsLength == refTsLength && (nbItems <= 0 || seriesCount < nbItems))
- {
- // flush last time-series if all conditions are met
- fwrite(tsBuffer, 4+4*tsLength, 1, ofile);
- seriesCount++;
- }
- else if (nbItems <= 0 || seriesCount < nbItems)
- {
- if (tsLength != refTsLength)
- mismatchLengthCount++;
- }
-
- // write lines count and size of a time-series in bytes
- Byte intBuffer[4];
- fseek(ofile, 0, SEEK_SET);
- write_int(seriesCount, intBuffer);
- fwrite(intBuffer, 1, 4, ofile);
- // re-express tsLength in bytes (not forgetting the ID))
- write_int(4 + 4 * refTsLength, intBuffer);
- fwrite(intBuffer, 1, 4, ofile);
-
- // finally print some statistics
- if (seriesCount < nbItems)
- fprintf(stdout,"Warning: only %u series retrieved.\n",seriesCount);
- fprintf(stdout,"%u mismatch series lengths.\n",mismatchLengthCount);
-
- fclose(ifile);
- fclose(ofile);
-}
-
-//serialize from usual 'by-row' data
-void serialize_byRows(const char* ifileName, const char* ofileName, uint32_t nbItems)
-{
- FILE* ifile = fopen(ifileName, "r");
- // first scan to know dimensions
- uint32_t nbValues = 0; //ID (or, more useful, real class number) comes first
- char curChar = ' ';
- while (curChar != '\n' && curChar != '\r')
- {
- curChar = fgetc(ifile);
- if (curChar == ',')
- {
- nbValues++;
- //skip potential consecutive commas (could be hard to spot)
- //TODO(...): should be 'NA' in R, thus an error (we don't handle NAs)...
- while (curChar == ',')
- curChar = fgetc(ifile);
- ungetc(curChar, ifile);
- }
- }
- while (curChar == '\n' || curChar == '\r')
- curChar = fgetc(ifile);
- ungetc(curChar, ifile);
- uint32_t nbSeries = 1; //we already read 1st line
- while (!feof(ifile))
- {
- if ((curChar = fgetc(ifile)) == '\n')
- nbSeries++;
- }
- fseek(ifile, 0, SEEK_SET);
-
- //write meta info first
- uint32_t tsLength = 4*nbValues+4;
- FILE* ofile = fopen(ofileName, "wb");
- Byte intBuffer[4];
- write_int(nbSeries, intBuffer);
- fwrite(intBuffer, 1, 4, ofile);
- write_int(tsLength, intBuffer);
- fwrite(intBuffer, 1, 4, ofile);
- float rawPower;
- int64_t ID;
-
- for (uint32_t i=0; i<nbSeries; i++)
- {
- //skip potential line feeds before next line
- while (curChar == '\n' || curChar == '\r')
- curChar = fgetc(ifile);
- ungetc(curChar, ifile);
- curChar = readInt(ifile, &ID);
- write_int((uint32_t)ID, intBuffer);
- fwrite(intBuffer, 1, 4, ofile);
- while (curChar == ',')
- curChar = fgetc(ifile);
- ungetc(curChar, ifile);
- for (uint32_t j=0; j<nbValues; j++)
- {
- curChar = readReal(ifile, &rawPower);
- write_real(rawPower, intBuffer);
- fwrite(intBuffer, 1, 4, ofile);
- while (curChar == ',')
- curChar = fgetc(ifile);
- ungetc(curChar, ifile);
- }
- }
- fclose(ifile);
- fclose(ofile);
-}