1 #include "TimeSeries/serialize.h"
2 #define __STDC_FORMAT_MACROS
5 #include "Util/types.h"
6 #include "Util/utils.h"
7 #include <cds/Vector.h>
10 // parse a line into two integers (ID, raw power)
11 static void scan_line(FILE* ifile
, int posID
, uint32_t* ID
, int posPower
, uint32_t* rawPower
)
17 if (position
== posID
)
20 nextChar
= readInt(ifile
, &ID_on64bits
);
21 *ID
= (uint32_t)ID_on64bits
;
23 else if (position
== posPower
)
25 Real untruncatedPower
;
26 nextChar
= readReal(ifile
, &untruncatedPower
);
27 if (untruncatedPower
< 0.0)
28 untruncatedPower
= 0.0;
29 *rawPower
= (uint32_t) floor(untruncatedPower
*10.0);
32 //erase the comma (and skip field then)
33 nextChar
= fgetc(ifile
);
35 //continue until next comma (or line end or file end)
36 while (!feof(ifile
) && nextChar
!= '\n' && nextChar
!= '\r' && nextChar
!= ',')
37 nextChar
= fgetc(ifile
);
40 if (feof(ifile
) || nextChar
== '\n' || nextChar
== '\r')
42 // skip all potential line feeds
43 while (!feof(ifile
) && nextChar
== '\n' || nextChar
== '\r')
44 nextChar
= fgetc(ifile
);
46 ungetc(nextChar
, ifile
);
52 //main job: parse a text file into a binary compressed version
53 //TODO [long term]: adapt to distributed files/DB, maybe to distributed binary chunks
54 void serialize_byCols(const char* ifileName
, const char* ofileName
, uint32_t nbItems
)
56 // use the header to know positions of ID and rawPower
57 FILE* ifile
= fopen(ifileName
, "r");
58 uint32_t headerShift
= 0;
60 Vector
* header
= vector_new(char);
63 curChar
= fgetc(ifile
);
65 if (curChar
== '\n' || curChar
== '\r')
67 //flush all potential other line feeds
68 while (curChar
== '\n' || curChar
== '\r')
69 curChar
= fgetc(ifile
);
70 ungetc(curChar
, ifile
);
73 vector_push(header
, curChar
);
76 char* headerString
= (char*)malloc((vector_size(header
) + 1)*sizeof(char));
77 VectorIterator
* it
= vector_get_iterator(header
);
79 while (vectorI_has_data(it
))
81 vectorI_get(it
, headerString
[index
]);
82 vectorI_move_next(it
);
86 headerString
[index
] = 0;
87 vector_destroy(header
);
88 int position
= 1, posID
= 0, posPower
= 0;
89 char* columnDescriptor
= strtok(headerString
, ",");
90 while (columnDescriptor
!= NULL
)
92 if (!strcmp(columnDescriptor
,"FK_CCU_ID") || !strcmp(columnDescriptor
,"fk_ccu_id"))
94 else if (!strcmp(columnDescriptor
,"CPP_PUISSANCE_BRUTE"))
97 columnDescriptor
= strtok(NULL
, ",");
101 //estimate tsLength with a scan of the 3 first series
102 uint32_t ID
=0, rawPower
=0, lastID
=0, refTsLength
=0;
103 scan_line(ifile
, posID
, &ID
, posPower
, &rawPower
);
104 //'sl' = sample lengths (short because a lot of comparisons then)
105 uint32_t* sl
= (uint32_t*) calloc(3, sizeof(uint32_t));
106 for (int i
=0; i
<3; i
++)
112 scan_line(ifile
, posID
, &ID
, posPower
, &rawPower
);
115 if (sl
[1] <= sl
[0] <= sl
[2] || sl
[2] <= sl
[0] <= sl
[1])
117 else if (sl
[0] <= sl
[1] <= sl
[2] || sl
[2] <= sl
[1] <= sl
[0])
122 //go back at the beginning of the first series (ready to read '\n'...)
123 fseek(ifile
, headerShift
-1, SEEK_SET
);
125 // output file to write time-series sequentially, binary format.
126 // Format: for each series, ID is stored on 4 bytes (unsigned integer32). Then,
127 // (<rawPower>)+ follow, with rawPower stored as a "3 bytes int"
128 // rawPower values are multiplied by 10 and truncated one digit after 0
129 // NOTE: no raw power should be exactly zero
130 FILE* ofile
= fopen(ofileName
, "wb");
132 // leave space to write the number of series (32bits), and their length in bytes (32bits)
133 for (int i
= 0; i
< 8; i
++)
136 // process one client (ID in first column) at a time
137 uint64_t processedLines
= 0; //execution trace
138 uint32_t seriesCount
=0, skippedSeriesCount
=0, tsLength
=0;
139 uint32_t mismatchLengthCount
=0, overflowCount
=0;
140 Byte tsBuffer
[4+3*refTsLength
];
145 // next element to read always start with a digit
147 curChar
= fgetc(ifile
);
148 while (!feof(ifile
) && (curChar
< '0' || curChar
> '9'));
151 ungetc(curChar
, ifile
);
154 scan_line(ifile
, posID
, &ID
, posPower
, &rawPower
);
157 //just starting a new time-series: must process the last one (if there is a last one !)
160 if (tsLength
== refTsLength
&& !overflow
)
163 fwrite(tsBuffer
, 4+3*tsLength
, 1, ofile
);
164 if (nbItems
> 0 && seriesCount
>= nbItems
)
167 //if something wrong happened, skip series
170 skippedSeriesCount
++;
171 if (tsLength
!= refTsLength
)
172 mismatchLengthCount
++;
178 // ID for the new series is printed only once:
179 write_int(ID
, 4, tsBuffer
);
180 // reinitialize flags
186 overflow
= (overflow
|| (rawPower
>= (1 << 24)));
187 //We cannot write more than refTsLength bytes
188 if (tsLength
< refTsLength
)
189 write_int(rawPower
, 3, tsBuffer
+ 4+3*tsLength
);
192 if ((++processedLines
) % 1000000 == 0)
193 fprintf(stdout
,"Processed %"PRIu64
" lines\n", processedLines
);
196 if (!overflow
&& tsLength
== refTsLength
&& (nbItems
<= 0 || seriesCount
< nbItems
))
198 // flush last time-series if all conditions are met
199 fwrite(tsBuffer
, 4+3*tsLength
, 1, ofile
);
202 else if (nbItems
<= 0 || seriesCount
< nbItems
)
204 if (tsLength
!= refTsLength
)
205 mismatchLengthCount
++;
210 // write lines count and size of a time-series in bytes
212 fseek(ofile
, 0, SEEK_SET
);
213 write_int(seriesCount
, 4, intBuffer
);
214 fwrite(intBuffer
, 1, 4, ofile
);
215 // re-express tsLength in bytes (not forgetting the ID))
216 write_int(4 + 3 * refTsLength
, 4, intBuffer
);
217 fwrite(intBuffer
, 1, 4, ofile
);
219 // finally print some statistics
220 if (seriesCount
< nbItems
)
221 fprintf(stdout
,"Warning: only %u series retrieved.\n",seriesCount
);
222 fprintf(stdout
,"%u overflows / %u mismatch series lengths.\n",overflowCount
,mismatchLengthCount
);
228 //serialize from usual 'by-row' data (for StarLight example and toy dataset)
229 void serialize_byRows(const char* ifileName
, const char* ofileName
, uint32_t nbItems
)
231 FILE* ifile
= fopen(ifileName
, "r");
232 // first scan to know dimensions
233 uint32_t nbValues
= 0; //ID (or, more useful, real class number) comes first
235 while (curChar
!= '\n' && curChar
!= '\r')
237 curChar
= fgetc(ifile
);
241 //skip potential consecutive commas (could be hard to spot)
242 while (curChar
== ',')
243 curChar
= fgetc(ifile
);
244 ungetc(curChar
, ifile
);
247 while (curChar
== '\n' || curChar
== '\r')
248 curChar
= fgetc(ifile
);
249 ungetc(curChar
, ifile
);
250 uint32_t nbSeries
= 1; //we already read 1st line
253 if ((curChar
= fgetc(ifile
)) == '\n')
256 fseek(ifile
, 0, SEEK_SET
);
258 //write meta info first
259 uint32_t tsLength
= 3*nbValues
+4;
260 FILE* ofile
= fopen(ofileName
, "wb");
262 write_int(nbSeries
, 4, intBuffer
);
263 fwrite(intBuffer
, 1, 4, ofile
);
264 write_int(tsLength
, 4, intBuffer
);
265 fwrite(intBuffer
, 1, 4, ofile
);
269 //DEBUG / TEST (ugly, TOFIX...)
270 double minrp
= INFINITY
, maxrp
= -INFINITY
;
272 for (uint32_t i
=0; i
<nbSeries
; i
++)
274 //skip potential line feeds before next line
275 while (curChar
== '\n' || curChar
== '\r')
276 curChar
= fgetc(ifile
);
277 ungetc(curChar
, ifile
);
278 curChar
= readInt(ifile
, &ID
);
279 write_int((uint32_t)ID
, 4, intBuffer
);
280 fwrite(intBuffer
, 1, 4, ofile
);
281 while (curChar
== ',')
282 curChar
= fgetc(ifile
);
283 ungetc(curChar
, ifile
);
284 for (uint32_t j
=0; j
<nbValues
; j
++)
286 curChar
= readReal(ifile
, &rawPower
);
288 //DEBUG / TEST (ugly, TOFIX...)
289 if (rawPower
< minrp
)
291 if (rawPower
> maxrp
)
294 write_int((uint32_t)floor(10.0*(rawPower
+0.0)), 3, intBuffer
); //x10... +3...
295 fwrite(intBuffer
, 1, 3, ofile
);
296 while (curChar
== ',')
297 curChar
= fgetc(ifile
);
298 ungetc(curChar
, ifile
);
304 //DEBUG / TEST (ugly, TOFIX...)
305 printf("min / max values = %g %g\n",minrp
,maxrp
);