// Build 10^steps as a double by repeated multiplication (no libm call needed).
static double tenToThe(int64_t steps)
{
	// Past ~10^308 a double is already infinite: no need to loop further
	if (steps > 400)
		steps = 400;
	double result = 1.;
	for (int64_t i = 0; i < steps; i++)
		result *= 10.;
	return result;
}

// Read the next decimal integer from 'stream' into '*integer'.
// The sign is negative only when the very first character read is '-'.
// Every non-digit character before the first digit is skipped.
// Returns the character following the last digit (consumed from the stream),
// or (char)EOF when the end of the stream is reached; '*integer' is 0 if no digit was found.
char readInt(FILE* stream, int64_t* integer)
{
	*integer = 0;
	// fgetc() returns an int: keep it as such, otherwise EOF cannot be told apart from data
	int curChar = fgetc(stream);
	int sign = (curChar == '-' ? -1 : 1);
	// Skip up to the first digit; stop at end of stream instead of looping forever
	while (curChar != EOF && (curChar < '0' || curChar > '9'))
		curChar = fgetc(stream);
	while (curChar >= '0' && curChar <= '9')
	{
		*integer = 10 * (*integer) + (int64_t) (curChar - '0');
		curChar = fgetc(stream);
	}
	(*integer) *= sign;
	return (char) curChar;
}

// Read the next real number (digits[.digits][e|E exponent]) from 'stream' into '*real'.
// Returns the character following the number (consumed from the stream), as readInt() does.
char readReal(FILE* stream, float* real)
{
	// Peek the sign here: readInt() cannot report it when the integer part is 0 ("-0.5")
	int firstChar = fgetc(stream);
	int negative = (firstChar == '-');
	if (firstChar != EOF)
		ungetc(firstChar, stream);

	int64_t integerPart;
	char nextChar = readInt(stream, &integerPart);
	int64_t fractionalPart = 0;
	int countZeros = 0;
	if (nextChar == '.')
	{
		// Leading zeros of the fractional part would be lost by readInt(): count them
		int digit;
		while ((digit = fgetc(stream)) == '0')
			countZeros++;
		nextChar = (char) digit;
		if (digit >= '1' && digit <= '9')
		{
			ungetc(digit, stream);
			nextChar = readInt(stream, &fractionalPart);
		}
	}
	int64_t exponent = 0;
	if (nextChar == 'e' || nextChar == 'E')
		nextChar = readInt(stream, &exponent);

	// scale = 10^(leading zeros + number of digits of fractionalPart)
	double scale = tenToThe(countZeros);
	for (int64_t rest = fractionalPart; rest > 0; rest /= 10)
		scale *= 10.;
	// The fractional part carries the sign of the whole number, not of integerPart
	// (which is 0, hence sign-less, for any value in ]-1,1[)
	double value = (double)integerPart
		+ (negative ? -1. : 1.) * (double)fractionalPart / scale;
	if (exponent < 0)
		value /= tenToThe(exponent < -400 ? 400 : -exponent);
	else
		value *= tenToThe(exponent);
	*real = (float) value;
	return nextChar;
}

// Parse a line into integer+float (ID, raw power).
// 'posID' and 'posPower' are the 1-based positions of the two comma-separated fields.
// An empty power field yields FLT_MAX ("NA") in '*rawPower'.
// On return the stream is positioned on the first character of the next non-empty line.
static void scan_line(FILE* ifile, int posID, uint32_t* ID, int posPower, float* rawPower)
{
	int nextChar;
	int position = 1;
	while (1)
	{
		if (position == posID)
		{
			int64_t ID_on64bits;
			nextChar = readInt(ifile, &ID_on64bits);
			*ID = (uint32_t)ID_on64bits;
		}
		else if (position == posPower)
		{
			float power = FLT_MAX; //"NA"
			// Look at the first character: on an empty field readReal() would
			// skip the separator and parse the next field instead
			nextChar = fgetc(ifile);
			if (nextChar != EOF && nextChar != ',' && nextChar != '\n' && nextChar != '\r')
			{
				ungetc(nextChar, ifile);
				nextChar = readReal(ifile, &power);
			}
			*rawPower = power;
		}
		else
			//read the first character of a field which is skipped
			nextChar = fgetc(ifile);

		//continue until next comma (or line end or file end)
		while (!feof(ifile) && nextChar != '\n' && nextChar != '\r' && nextChar != ',')
			nextChar = fgetc(ifile);
		position++;

		if (feof(ifile) || nextChar == '\n' || nextChar == '\r')
		{
			// skip all potential line feeds
			while (!feof(ifile) && (nextChar == '\n' || nextChar == '\r'))
				nextChar = fgetc(ifile);
			if (!feof(ifile))
				ungetc(nextChar, ifile);
			break;
		}
	}
}
series + uint32_t ID=0, lastID=0, refTsLength=0; + float rawPower = 0.; + scan_line(ifile, posID, &ID, posPower, &rawPower); + //'sl' = sample lengths (short because a lot of comparisons then) + uint32_t* sl = (uint32_t*) calloc(3, sizeof(uint32_t)); + for (int i=0; i<3; i++) + { + lastID = ID; + while (ID == lastID) + { + sl[i]++; + scan_line(ifile, posID, &ID, posPower, &rawPower); + } + } + if (sl[0] <= sl[1] <= sl[2] || sl[1] <= sl[0] <= sl[2]) + refTsLength = sl[2]; + else if (sl[1] <= sl[2] <= sl[0] || sl[2] <= sl[1] <= sl[0]) + refTsLength = sl[0]; + else + refTsLength = sl[1]; + free(sl); + //go back at the beginning of the first series (ready to read '\n'...) + fseek(ifile, headerShift-1, SEEK_SET); + + // output file to write time-series sequentially, CSV format. + FILE* ofile = fopen(ofileName, "w"); + + // process one client (ID in first column) at a time + uint64_t processedLines = 0; //execution trace + uint32_t seriesCount=0, skippedSeriesCount=0, tsLength=0; + uint32_t mismatchLengthCount=0; + float tsBuffer[refTsLength]; + lastID = 0; + while (!feof(ifile)) + { + // next element to read always start with a digit + do + curChar = fgetc(ifile); + while (!feof(ifile) && (curChar < '0' || curChar > '9')); + if (feof(ifile)) + break; + ungetc(curChar, ifile); + + // read line + scan_line(ifile, posID, &ID, posPower, &rawPower); + if (ID != lastID) + { + //just starting a new time-series: must process the last one (if there is a last one !) 
+ if (lastID > 0) + { + if (tsLength == refTsLength) + { + for (int i=0; i 0 && ++seriesCount >= nbItems) + break; + } + //if something wrong happened, skip series + else + { + skippedSeriesCount++; + if (tsLength != refTsLength) + mismatchLengthCount++; + } + } + + // reinitialize flags + tsLength = 0; + lastID = ID; + } + + //We cannot write more than refTsLength values + if (tsLength < refTsLength) + tsBuffer[tsLength++] = rawPower; + + if ((++processedLines) % 1000000 == 0) + fprintf(stdout,"Processed %"PRIu64" lines\n", processedLines); + } + + if (tsLength == refTsLength && (nbItems <= 0 || seriesCount < nbItems)) + { + // flush last time-series if all conditions are met + for (int i=0; i