commit last state
[ppam-mpi.git] / code / src / TimeSeries / serialize.c
#include "TimeSeries/serialize.h"
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "Util/types.h"
#include "Util/utils.h"
#include <cds/Vector.h>
9
10 // parse a line into two integers (ID, raw power)
11 static void scan_line(FILE* ifile, int posID, uint32_t* ID, int posPower, uint32_t* rawPower)
12 {
13 char nextChar;
14 int position = 1;
15 while (1)
16 {
17 if (position == posID)
18 {
19 int64_t ID_on64bits;
20 nextChar = readInt(ifile, &ID_on64bits);
21 *ID = (uint32_t)ID_on64bits;
22 }
23 else if (position == posPower)
24 {
25 Real untruncatedPower;
26 nextChar = readReal(ifile, &untruncatedPower);
27 if (untruncatedPower < 0.0)
28 untruncatedPower = 0.0;
29 *rawPower = (uint32_t) floor(untruncatedPower*10.0);
30 }
31 else
32 //erase the comma (and skip field then)
33 nextChar = fgetc(ifile);
34
35 //continue until next comma (or line end or file end)
36 while (!feof(ifile) && nextChar != '\n' && nextChar != '\r' && nextChar != ',')
37 nextChar = fgetc(ifile);
38 position++;
39
40 if (feof(ifile) || nextChar == '\n' || nextChar == '\r')
41 {
42 // skip all potential line feeds
43 while (!feof(ifile) && nextChar == '\n' || nextChar == '\r')
44 nextChar = fgetc(ifile);
45 if (!feof(ifile))
46 ungetc(nextChar, ifile);
47 break;
48 }
49 }
50 }
51
52 //main job: parse a text file into a binary compressed version
53 //TODO [long term]: adapt to distributed files/DB, maybe to distributed binary chunks
54 void serialize_byCols(const char* ifileName, const char* ofileName, uint32_t nbItems)
55 {
56 // use the header to know positions of ID and rawPower
57 FILE* ifile = fopen(ifileName, "r");
58 uint32_t headerShift = 0;
59 char curChar;
60 Vector* header = vector_new(char);
61 do
62 {
63 curChar = fgetc(ifile);
64 headerShift++;
65 if (curChar == '\n' || curChar == '\r')
66 {
67 //flush all potential other line feeds
68 while (curChar == '\n' || curChar == '\r')
69 curChar = fgetc(ifile);
70 ungetc(curChar, ifile);
71 break;
72 }
73 vector_push(header, curChar);
74 }
75 while (1);
76 char* headerString = (char*)malloc((vector_size(header) + 1)*sizeof(char));
77 VectorIterator* it = vector_get_iterator(header);
78 int index = 0;
79 while (vectorI_has_data(it))
80 {
81 vectorI_get(it, headerString[index]);
82 vectorI_move_next(it);
83 index++;
84 }
85 vectorI_destroy(it);
86 headerString[index] = 0;
87 vector_destroy(header);
88 int position = 1, posID = 0, posPower = 0;
89 char* columnDescriptor = strtok(headerString, ",");
90 while (columnDescriptor != NULL)
91 {
92 if (!strcmp(columnDescriptor,"FK_CCU_ID") || !strcmp(columnDescriptor,"fk_ccu_id"))
93 posID = position;
94 else if (!strcmp(columnDescriptor,"CPP_PUISSANCE_BRUTE"))
95 posPower = position;
96 position++;
97 columnDescriptor = strtok(NULL, ",");
98 }
99 free(headerString);
100
101 //estimate tsLength with a scan of the 3 first series
102 uint32_t ID=0, rawPower=0, lastID=0, refTsLength=0;
103 scan_line(ifile, posID, &ID, posPower, &rawPower);
104 //'sl' = sample lengths (short because a lot of comparisons then)
105 uint32_t* sl = (uint32_t*) calloc(3, sizeof(uint32_t));
106 for (int i=0; i<3; i++)
107 {
108 lastID = ID;
109 while (ID == lastID)
110 {
111 sl[i]++;
112 scan_line(ifile, posID, &ID, posPower, &rawPower);
113 }
114 }
115 if (sl[1] <= sl[0] <= sl[2] || sl[2] <= sl[0] <= sl[1])
116 refTsLength = sl[0];
117 else if (sl[0] <= sl[1] <= sl[2] || sl[2] <= sl[1] <= sl[0])
118 refTsLength = sl[1];
119 else
120 refTsLength = sl[2];
121 free(sl);
122 //go back at the beginning of the first series (ready to read '\n'...)
123 fseek(ifile, headerShift-1, SEEK_SET);
124
125 // output file to write time-series sequentially, binary format.
126 // Format: for each series, ID is stored on 4 bytes (unsigned integer32). Then,
127 // (<rawPower>)+ follow, with rawPower stored as a "3 bytes int"
128 // rawPower values are multiplied by 10 and truncated one digit after 0
129 // NOTE: no raw power should be exactly zero
130 FILE* ofile = fopen(ofileName, "wb");
131
132 // leave space to write the number of series (32bits), and their length in bytes (32bits)
133 for (int i = 0; i < 8; i++)
134 fputc(0, ofile);
135
136 // process one client (ID in first column) at a time
137 uint64_t processedLines = 0; //execution trace
138 uint32_t seriesCount=0, skippedSeriesCount=0, tsLength=0;
139 uint32_t mismatchLengthCount=0, overflowCount=0;
140 Byte tsBuffer[4+3*refTsLength];
141 int overflow = 0;
142 lastID = 0;
143 while (!feof(ifile))
144 {
145 // next element to read always start with a digit
146 do
147 curChar = fgetc(ifile);
148 while (!feof(ifile) && (curChar < '0' || curChar > '9'));
149 if (feof(ifile))
150 break;
151 ungetc(curChar, ifile);
152
153 // read line
154 scan_line(ifile, posID, &ID, posPower, &rawPower);
155 if (ID != lastID)
156 {
157 //just starting a new time-series: must process the last one (if there is a last one !)
158 if (lastID > 0)
159 {
160 if (tsLength == refTsLength && !overflow)
161 {
162 seriesCount++;
163 fwrite(tsBuffer, 4+3*tsLength, 1, ofile);
164 if (nbItems > 0 && seriesCount >= nbItems)
165 break;
166 }
167 //if something wrong happened, skip series
168 else
169 {
170 skippedSeriesCount++;
171 if (tsLength != refTsLength)
172 mismatchLengthCount++;
173 if (overflow)
174 overflowCount++;
175 }
176 }
177
178 // ID for the new series is printed only once:
179 write_int(ID, 4, tsBuffer);
180 // reinitialize flags
181 overflow = 0;
182 tsLength = 0;
183 lastID = ID;
184 }
185
186 overflow = (overflow || (rawPower >= (1 << 24)));
187 //We cannot write more than refTsLength bytes
188 if (tsLength < refTsLength)
189 write_int(rawPower, 3, tsBuffer + 4+3*tsLength);
190 tsLength++;
191
192 if ((++processedLines) % 1000000 == 0)
193 fprintf(stdout,"Processed %"PRIu64" lines\n", processedLines);
194 }
195
196 if (!overflow && tsLength == refTsLength && (nbItems <= 0 || seriesCount < nbItems))
197 {
198 // flush last time-series if all conditions are met
199 fwrite(tsBuffer, 4+3*tsLength, 1, ofile);
200 seriesCount++;
201 }
202 else if (nbItems <= 0 || seriesCount < nbItems)
203 {
204 if (tsLength != refTsLength)
205 mismatchLengthCount++;
206 if (overflow)
207 overflowCount++;
208 }
209
210 // write lines count and size of a time-series in bytes
211 Byte intBuffer[4];
212 fseek(ofile, 0, SEEK_SET);
213 write_int(seriesCount, 4, intBuffer);
214 fwrite(intBuffer, 1, 4, ofile);
215 // re-express tsLength in bytes (not forgetting the ID))
216 write_int(4 + 3 * refTsLength, 4, intBuffer);
217 fwrite(intBuffer, 1, 4, ofile);
218
219 // finally print some statistics
220 if (seriesCount < nbItems)
221 fprintf(stdout,"Warning: only %u series retrieved.\n",seriesCount);
222 fprintf(stdout,"%u overflows / %u mismatch series lengths.\n",overflowCount,mismatchLengthCount);
223
224 fclose(ifile);
225 fclose(ofile);
226 }
227
228 //serialize from usual 'by-row' data (for StarLight example and toy dataset)
229 void serialize_byRows(const char* ifileName, const char* ofileName, uint32_t nbItems)
230 {
231 FILE* ifile = fopen(ifileName, "r");
232 // first scan to know dimensions
233 uint32_t nbValues = 0; //ID (or, more useful, real class number) comes first
234 char curChar = ' ';
235 while (curChar != '\n' && curChar != '\r')
236 {
237 curChar = fgetc(ifile);
238 if (curChar == ',')
239 {
240 nbValues++;
241 //skip potential consecutive commas (could be hard to spot)
242 while (curChar == ',')
243 curChar = fgetc(ifile);
244 ungetc(curChar, ifile);
245 }
246 }
247 while (curChar == '\n' || curChar == '\r')
248 curChar = fgetc(ifile);
249 ungetc(curChar, ifile);
250 uint32_t nbSeries = 1; //we already read 1st line
251 while (!feof(ifile))
252 {
253 if ((curChar = fgetc(ifile)) == '\n')
254 nbSeries++;
255 }
256 fseek(ifile, 0, SEEK_SET);
257
258 //write meta info first
259 uint32_t tsLength = 3*nbValues+4;
260 FILE* ofile = fopen(ofileName, "wb");
261 Byte intBuffer[4];
262 write_int(nbSeries, 4, intBuffer);
263 fwrite(intBuffer, 1, 4, ofile);
264 write_int(tsLength, 4, intBuffer);
265 fwrite(intBuffer, 1, 4, ofile);
266 Real rawPower;
267 int64_t ID;
268
269 //DEBUG / TEST (ugly, TOFIX...)
270 double minrp = INFINITY, maxrp = -INFINITY;
271
272 for (uint32_t i=0; i<nbSeries; i++)
273 {
274 //skip potential line feeds before next line
275 while (curChar == '\n' || curChar == '\r')
276 curChar = fgetc(ifile);
277 ungetc(curChar, ifile);
278 curChar = readInt(ifile, &ID);
279 write_int((uint32_t)ID, 4, intBuffer);
280 fwrite(intBuffer, 1, 4, ofile);
281 while (curChar == ',')
282 curChar = fgetc(ifile);
283 ungetc(curChar, ifile);
284 for (uint32_t j=0; j<nbValues; j++)
285 {
286 curChar = readReal(ifile, &rawPower);
287
288 //DEBUG / TEST (ugly, TOFIX...)
289 if (rawPower < minrp)
290 minrp = rawPower;
291 if (rawPower > maxrp)
292 maxrp = rawPower;
293
294 write_int((uint32_t)floor(10.0*(rawPower+0.0)), 3, intBuffer); //x10... +3...
295 fwrite(intBuffer, 1, 3, ofile);
296 while (curChar == ',')
297 curChar = fgetc(ifile);
298 ungetc(curChar, ifile);
299 }
300 }
301 fclose(ifile);
302 fclose(ofile);
303
304 //DEBUG / TEST (ugly, TOFIX...)
305 printf("min / max values = %g %g\n",minrp,maxrp);
306 }