1 #include "TimeSeries/serialize.h"
2 #define __STDC_FORMAT_MACROS
5 #include "Util/types.h"
6 #include "Util/utils.h"
7 #include <cgds/Vector.h>
10 // parse a line into two integers (ID, raw power)
// More precisely: the ID is delivered as a uint32_t and the raw power as a float.
//   ifile    - input stream to read the record from
//   posID    - column position of the ID field
//   ID       - out: receives the ID
//   posPower - column position of the raw power field
//   rawPower - out: receives the raw power
// Column positions are presumably 1-based, matching `position = 1` in
// serialize_byCols -- TODO confirm (the initialisation of `position` in this
// function is not visible here).
// NOTE(review): this listing has no braces and lacks the declarations of
// nextChar, position, ID_on64bits and power, as well as the enclosing loop;
// the comments below describe only the statements that are visible.
11 static void scan_line(FILE* ifile
, int posID
, uint32_t* ID
, int posPower
, float* rawPower
)
// ID column: read through a wider temporary, then narrow to 32 bits.
// nextChar takes readInt's return value (presumably the character that ended
// the number -- confirm against readInt).
17 if (position
== posID
)
20 nextChar
= readInt(ifile
, &ID_on64bits
);
21 *ID
= (uint32_t)ID_on64bits
;
// Power column: read as a real, then store it as a float.
23 else if (position
== posPower
)
26 nextChar
= readReal(ifile
, &power
);
27 *rawPower
= (float) power
;
30 //erase the comma (and skip field then)
31 nextChar
= fgetc(ifile
);
33 //continue until next comma (or line end or file end)
34 while (!feof(ifile
) && nextChar
!= '\n' && nextChar
!= '\r' && nextChar
!= ',')
35 nextChar
= fgetc(ifile
);
// End of record: end of file or a line terminator was reached.
38 if (feof(ifile
) || nextChar
== '\n' || nextChar
== '\r')
40 // skip all potential line feeds
// NOTE(review): `&&` binds tighter than `||`, so this condition parses as
// (!feof(ifile) && nextChar == '\n') || nextChar == '\r'. If the end-of-file
// guard is meant to cover both characters, the two comparisons need their own
// parentheses.
41 while (!feof(ifile
) && nextChar
== '\n' || nextChar
== '\r')
42 nextChar
= fgetc(ifile
);
// Push back the first character that is not a line feed so that the next read
// on ifile sees it again.
44 ungetc(nextChar
, ifile
);
50 //main job: parse a text file into a binary compressed version
51 //TODO [long term]: adapt to distributed files/DB, maybe to distributed binary chunks
// Parameters, as used in the lines visible here:
//   ifileName - path of the text input, opened with mode "r"
//   ofileName - path of the binary output, opened with mode "wb"
//   nbItems   - when > 0, compared against seriesCount to cap the number of
//               series; it is unsigned, so the `nbItems <= 0` tests below are
//               equivalent to `nbItems == 0`
// NOTE(review): this listing has no braces and is missing several statements
// (loop headers, most branch bodies, the declarations of curChar, index and
// intBuffer). Comments describe only what the visible statements establish.
52 void serialize_byCols(const char* ifileName
, const char* ofileName
, uint32_t nbItems
)
54 // use the header to know positions of ID and rawPower
// NOTE(review): the fopen result is not checked in the lines visible here.
55 FILE* ifile
= fopen(ifileName
, "r");
56 uint32_t headerShift
= 0;
// The header line is accumulated one character at a time in a Vector of char.
58 Vector
* header
= vector_new(char);
61 curChar
= fgetc(ifile
);
// On a line terminator: consume any further ones, then push back the first
// character that is not a line feed.
63 if (curChar
== '\n' || curChar
== '\r')
65 //flush all potential other line feeds
66 while (curChar
== '\n' || curChar
== '\r')
67 curChar
= fgetc(ifile
);
68 ungetc(curChar
, ifile
);
71 vector_push(header
, curChar
);
// Copy the collected characters into a C string; the +1 leaves room for the
// terminating 0 written after the loop.
// NOTE(review): the malloc result is not checked in the visible lines.
74 char* headerString
= (char*)malloc((vector_size(header
) + 1)*sizeof(char));
75 VectorIterator
* it
= vector_get_iterator(header
);
77 while (vectorI_has_data(it
))
// vectorI_get presumably stores the current element into headerString[index]
// (macro semantics -- confirm against the cgds headers).
79 vectorI_get(it
, headerString
[index
]);
80 vectorI_move_next(it
);
84 headerString
[index
] = 0;
85 vector_destroy(header
);
// position counts columns starting at 1; posID and posPower remain 0 until a
// matching column name is found.
86 int position
= 1, posID
= 0, posPower
= 0;
// strtok splits headerString in place on commas.
87 char* columnDescriptor
= strtok(headerString
, ",");
88 while (columnDescriptor
!= NULL
)
// Recognised column names. The branch bodies are not visible here; presumably
// they record `position` into posID / posPower -- confirm.
90 if (!strcmp(columnDescriptor
,"FK_CCU_ID") || !strcmp(columnDescriptor
,"fk_ccu_id"))
92 else if (!strcmp(columnDescriptor
,"CPP_PUISSANCE_BRUTE"))
95 columnDescriptor
= strtok(NULL
, ",");
99 //estimate tsLength with a scan of the 3 first series
100 uint32_t ID
=0, lastID
=0, refTsLength
=0;
101 float rawPower
= 0.0;
102 scan_line(ifile
, posID
, &ID
, posPower
, &rawPower
);
103 //'sl' = sample lengths (short because a lot of comparisons then)
// calloc zero-initialises the three counters.
104 uint32_t* sl
= (uint32_t*) calloc(3, sizeof(uint32_t));
105 for (int i
=0; i
<3; i
++)
111 scan_line(ifile
, posID
, &ID
, posPower
, &rawPower
);
// NOTE(review): C has no chained comparisons. `a <= b <= c` evaluates as
// `(a <= b) <= c`, i.e. it compares the 0/1 result of the first test with c.
// If the intent is to pick the middle of the three sample lengths (the branch
// bodies are not visible here), each test must be written as
// `a <= b && b <= c`.
114 if (sl
[1] <= sl
[0] <= sl
[2] || sl
[2] <= sl
[0] <= sl
[1])
116 else if (sl
[0] <= sl
[1] <= sl
[2] || sl
[2] <= sl
[1] <= sl
[0])
121 //go back at the beginning of the first series (ready to read '\n'...)
// NOTE(review): headerShift is a uint32_t initialised to 0 and its updates are
// not visible in this listing; if it were still 0 here, headerShift-1 would
// wrap around instead of yielding -1.
122 fseek(ifile
, headerShift
-1, SEEK_SET
);
124 // output file to write time-series sequentially, binary format.
125 // Format: for each series, ID is stored on 4 bytes (unsigned integer32). Then,
126 // (<rawPower>)+ follow, with rawPower stored as a float
// NOTE(review): the fopen result is not checked in the lines visible here.
127 FILE* ofile
= fopen(ofileName
, "wb");
129 // leave space to write the number of series (32bits), and their length in bytes (32bits)
// Eight iterations, matching the 8 header bytes; the loop body is not visible
// here (presumably one placeholder byte is written per iteration).
130 for (int i
= 0; i
< 8; i
++)
133 // process one client (ID in first column) at a time
134 uint64_t processedLines
= 0; //execution trace
135 uint32_t seriesCount
=0, skippedSeriesCount
=0, tsLength
=0;
136 uint32_t mismatchLengthCount
=0;
// Buffer for one series: 4 bytes for the ID followed by 4 bytes per value.
// This is a variable-length array sized from refTsLength.
137 Byte tsBuffer
[4+4*refTsLength
];
141 // next element to read always start with a digit
// Skip characters until a digit or end of file is met. The trailing `;` after
// the while condition suggests a do/while whose `do` line is not visible here.
143 curChar
= fgetc(ifile
);
144 while (!feof(ifile
) && (curChar
< '0' || curChar
> '9'));
147 ungetc(curChar
, ifile
);
150 scan_line(ifile
, posID
, &ID
, posPower
, &rawPower
);
153 //just starting a new time-series: must process the last one (if there is a last one !)
// Only a series whose length equals refTsLength is written out, as a single
// item of 4+4*tsLength bytes.
156 if (tsLength
== refTsLength
)
159 fwrite(tsBuffer
, 4+4*tsLength
, 1, ofile
);
// Cap reached (only when a cap was requested); the branch body is not visible.
160 if (nbItems
> 0 && seriesCount
>= nbItems
)
163 //if something wrong happened, skip series
166 skippedSeriesCount
++;
167 if (tsLength
!= refTsLength
)
168 mismatchLengthCount
++;
172 // ID for the new series is printed only once:
// NOTE(review): write_int takes two arguments here but three
// (value, 4, buffer) in serialize_byRows; one of the two forms presumably does
// not match the declaration -- confirm against Util/utils.h.
173 write_int(ID
, tsBuffer
);
174 // reinitialize flags
179 //We cannot write more than refTsLength bytes
// (refTsLength counts values; each one occupies 4 bytes after the 4-byte ID,
// hence the offset 4+4*tsLength.)
180 if (tsLength
< refTsLength
)
181 write_real(rawPower
, tsBuffer
+ 4+4*tsLength
);
// Progress trace every million lines.
// NOTE(review): PRIu64 comes from <inttypes.h>, which is not among the
// includes visible in this listing -- confirm it is included.
184 if ((++processedLines
) % 1000000 == 0)
185 fprintf(stdout
,"Processed %"PRIu64
" lines\n", processedLines
);
188 if (tsLength
== refTsLength
&& (nbItems
<= 0 || seriesCount
< nbItems
))
190 // flush last time-series if all conditions are met
191 fwrite(tsBuffer
, 4+4*tsLength
, 1, ofile
);
194 else if (nbItems
<= 0 || seriesCount
< nbItems
)
196 if (tsLength
!= refTsLength
)
197 mismatchLengthCount
++;
200 // write lines count and size of a time-series in bytes
// Rewind to fill the header bytes reserved earlier: the series count first,
// then the size of one series in bytes.
202 fseek(ofile
, 0, SEEK_SET
);
203 write_int(seriesCount
, intBuffer
);
204 fwrite(intBuffer
, 1, 4, ofile
);
205 // re-express tsLength in bytes (not forgetting the ID))
206 write_int(4 + 4 * refTsLength
, intBuffer
);
207 fwrite(intBuffer
, 1, 4, ofile
);
209 // finally print some statistics
// Both operands are unsigned, so with nbItems == 0 this warning never fires.
210 if (seriesCount
< nbItems
)
211 fprintf(stdout
,"Warning: only %u series retrieved.\n",seriesCount
);
212 fprintf(stdout
,"%u mismatch series lengths.\n",mismatchLengthCount
);
218 //serialize from usual 'by-row' data
// Parameters, as used in the lines visible here:
//   ifileName - path of the text input, opened with mode "r"
//   ofileName - path of the binary output, opened with mode "wb"
//   nbItems   - not referenced in any line visible in this listing
// NOTE(review): this listing has no braces and is missing several statements
// (the declarations of curChar, intBuffer, ID and rawPower, some loop headers
// and bodies). Comments describe only what the visible statements establish.
219 void serialize_byRows(const char* ifileName
, const char* ofileName
, uint32_t nbItems
)
// NOTE(review): the fopen result is not checked in the lines visible here.
221 FILE* ifile
= fopen(ifileName
, "r");
222 // first scan to know dimensions
223 uint32_t nbValues
= 0; //ID (or, more useful, real class number) comes first
// Walk the first line up to its terminator. The statements that update
// nbValues are not visible here (presumably one increment per field).
225 while (curChar
!= '\n' && curChar
!= '\r')
227 curChar
= fgetc(ifile
);
231 //skip potential consecutive commas (could be hard to spot)
232 //TODO(...): should be 'NA' in R, thus an error (we don't handle NAs)...
233 while (curChar
== ',')
234 curChar
= fgetc(ifile
);
235 ungetc(curChar
, ifile
);
// Consume the line terminator(s) after the first line, then push back the
// first character of the next one.
238 while (curChar
== '\n' || curChar
== '\r')
239 curChar
= fgetc(ifile
);
240 ungetc(curChar
, ifile
);
241 uint32_t nbSeries
= 1; //we already read 1st line
// Line counting tests '\n' only (not '\r'); the enclosing loop and the
// increment of nbSeries are not visible in this listing.
244 if ((curChar
= fgetc(ifile
)) == '\n')
// Rewind for the second pass, which converts the data.
247 fseek(ifile
, 0, SEEK_SET
);
249 //write meta info first
// Bytes per series: 4 for the ID plus 4 per value.
250 uint32_t tsLength
= 4*nbValues
+4;
// NOTE(review): the fopen result is not checked in the lines visible here.
251 FILE* ofile
= fopen(ofileName
, "wb");
// Header: number of series, then size of one series in bytes (4 bytes each).
// NOTE(review): write_int takes three arguments here but two in
// serialize_byCols; one of the two forms presumably does not match the
// declaration -- confirm against Util/utils.h.
253 write_int(nbSeries
, 4, intBuffer
);
254 fwrite(intBuffer
, 1, 4, ofile
);
255 write_int(tsLength
, 4, intBuffer
);
256 fwrite(intBuffer
, 1, 4, ofile
);
// Second pass: for each series write its ID, then nbValues reals.
260 for (uint32_t i
=0; i
<nbSeries
; i
++)
262 //skip potential line feeds before next line
263 while (curChar
== '\n' || curChar
== '\r')
264 curChar
= fgetc(ifile
);
265 ungetc(curChar
, ifile
);
// The ID is read first and written narrowed to 32 bits.
266 curChar
= readInt(ifile
, &ID
);
267 write_int((uint32_t)ID
, 4, intBuffer
);
268 fwrite(intBuffer
, 1, 4, ofile
);
// Skip the separator(s) after the ID and push back the first other character.
269 while (curChar
== ',')
270 curChar
= fgetc(ifile
);
271 ungetc(curChar
, ifile
);
// NOTE(review): no check is visible that each line really holds nbValues
// fields; the count comes from the first line only.
272 for (uint32_t j
=0; j
<nbValues
; j
++)
// Each value is encoded into the same 4-byte intBuffer before being written.
274 curChar
= readReal(ifile
, &rawPower
);
275 write_real(rawPower
, intBuffer
);
276 fwrite(intBuffer
, 1, 4, ofile
);
277 while (curChar
== ',')
278 curChar
= fgetc(ifile
);
279 ungetc(curChar
, ifile
);