Commit | Line | Data |
---|---|---|
ab4a34ef BA |
1 | #include "TimeSeries/serialize.h" |
2 | #define __STDC_FORMAT_MACROS | |
3 | #include <inttypes.h> | |
4 | #include <math.h> | |
5 | #include "Util/types.h" | |
6 | #include "Util/utils.h" | |
311c5c07 | 7 | #include <cgds/Vector.h> |
ab4a34ef BA |
8 | #include <string.h> |
9 | ||
// Parse one line of the CSV stream: the field at (1-based) column posID is read
// as an integer and stored in *ID (truncated to 32 bits), the field at column
// posPower is read as a real number and stored in *rawPower. Every other field
// is skipped. After the line, all consecutive line-feed characters are consumed
// so that the stream is left on the first character of the next non-empty line
// (or at end of file).
// NOTE(review): readInt()/readReal() are assumed to consume the field and return
// the character that follows it -- confirm against Util/utils.h.
static void scan_line(FILE* ifile, int posID, uint32_t* ID, int posPower, float* rawPower)
{
	// int rather than char: must be able to hold every value returned by fgetc()
	int nextChar;
	int position = 1;
	while (1)
	{
		if (position == posID)
		{
			int64_t ID_on64bits;
			nextChar = readInt(ifile, &ID_on64bits);
			*ID = (uint32_t)ID_on64bits;
		}
		else if (position == posPower)
			nextChar = readReal(ifile, rawPower);
		else
		{
			// uninteresting field: read its first character, the rest is skipped below
			nextChar = fgetc(ifile);
		}

		// continue until next comma (or line end or file end)
		while (!feof(ifile) && nextChar != '\n' && nextChar != '\r' && nextChar != ',')
			nextChar = fgetc(ifile);
		position++;

		if (feof(ifile) || nextChar == '\n' || nextChar == '\r')
		{
			// skip all potential line feeds
			// (parentheses matter: '&&' binds tighter than '||')
			while (!feof(ifile) && (nextChar == '\n' || nextChar == '\r'))
				nextChar = fgetc(ifile);
			// give back the first character of the next line
			if (!feof(ifile))
				ungetc(nextChar, ifile);
			break;
		}
	}
}
49 | ||
50 | //main job: parse a text file into a binary compressed version | |
51 | //TODO [long term]: adapt to distributed files/DB, maybe to distributed binary chunks | |
52 | void serialize_byCols(const char* ifileName, const char* ofileName, uint32_t nbItems) | |
53 | { | |
54 | // use the header to know positions of ID and rawPower | |
55 | FILE* ifile = fopen(ifileName, "r"); | |
56 | uint32_t headerShift = 0; | |
57 | char curChar; | |
58 | Vector* header = vector_new(char); | |
59 | do | |
60 | { | |
61 | curChar = fgetc(ifile); | |
62 | headerShift++; | |
63 | if (curChar == '\n' || curChar == '\r') | |
64 | { | |
65 | //flush all potential other line feeds | |
66 | while (curChar == '\n' || curChar == '\r') | |
67 | curChar = fgetc(ifile); | |
68 | ungetc(curChar, ifile); | |
69 | break; | |
70 | } | |
71 | vector_push(header, curChar); | |
72 | } | |
73 | while (1); | |
74 | char* headerString = (char*)malloc((vector_size(header) + 1)*sizeof(char)); | |
75 | VectorIterator* it = vector_get_iterator(header); | |
76 | int index = 0; | |
77 | while (vectorI_has_data(it)) | |
78 | { | |
79 | vectorI_get(it, headerString[index]); | |
80 | vectorI_move_next(it); | |
81 | index++; | |
82 | } | |
83 | vectorI_destroy(it); | |
84 | headerString[index] = 0; | |
85 | vector_destroy(header); | |
86 | int position = 1, posID = 0, posPower = 0; | |
87 | char* columnDescriptor = strtok(headerString, ","); | |
88 | while (columnDescriptor != NULL) | |
89 | { | |
90 | if (!strcmp(columnDescriptor,"FK_CCU_ID") || !strcmp(columnDescriptor,"fk_ccu_id")) | |
91 | posID = position; | |
92 | else if (!strcmp(columnDescriptor,"CPP_PUISSANCE_BRUTE")) | |
93 | posPower = position; | |
94 | position++; | |
95 | columnDescriptor = strtok(NULL, ","); | |
96 | } | |
97 | free(headerString); | |
98 | ||
99 | //estimate tsLength with a scan of the 3 first series | |
ebf1280e BA |
100 | uint32_t ID=0, lastID=0, refTsLength=0; |
101 | float rawPower = 0.0; | |
ab4a34ef BA |
102 | scan_line(ifile, posID, &ID, posPower, &rawPower); |
103 | //'sl' = sample lengths (short because a lot of comparisons then) | |
104 | uint32_t* sl = (uint32_t*) calloc(3, sizeof(uint32_t)); | |
105 | for (int i=0; i<3; i++) | |
106 | { | |
107 | lastID = ID; | |
108 | while (ID == lastID) | |
109 | { | |
ebf1280e | 110 | sl[i]++; |
ab4a34ef BA |
111 | scan_line(ifile, posID, &ID, posPower, &rawPower); |
112 | } | |
113 | } | |
114 | if (sl[1] <= sl[0] <= sl[2] || sl[2] <= sl[0] <= sl[1]) | |
115 | refTsLength = sl[0]; | |
116 | else if (sl[0] <= sl[1] <= sl[2] || sl[2] <= sl[1] <= sl[0]) | |
117 | refTsLength = sl[1]; | |
118 | else | |
119 | refTsLength = sl[2]; | |
120 | free(sl); | |
121 | //go back at the beginning of the first series (ready to read '\n'...) | |
122 | fseek(ifile, headerShift-1, SEEK_SET); | |
123 | ||
124 | // output file to write time-series sequentially, binary format. | |
125 | // Format: for each series, ID is stored on 4 bytes (unsigned integer32). Then, | |
ebf1280e | 126 | // (<rawPower>)+ follow, with rawPower stored as a float |
ab4a34ef BA |
127 | FILE* ofile = fopen(ofileName, "wb"); |
128 | ||
129 | // leave space to write the number of series (32bits), and their length in bytes (32bits) | |
130 | for (int i = 0; i < 8; i++) | |
131 | fputc(0, ofile); | |
132 | ||
133 | // process one client (ID in first column) at a time | |
134 | uint64_t processedLines = 0; //execution trace | |
135 | uint32_t seriesCount=0, skippedSeriesCount=0, tsLength=0; | |
ebf1280e BA |
136 | uint32_t mismatchLengthCount=0; |
137 | Byte tsBuffer[4+4*refTsLength]; | |
ab4a34ef BA |
138 | lastID = 0; |
139 | while (!feof(ifile)) | |
140 | { | |
141 | // next element to read always start with a digit | |
142 | do | |
143 | curChar = fgetc(ifile); | |
144 | while (!feof(ifile) && (curChar < '0' || curChar > '9')); | |
145 | if (feof(ifile)) | |
146 | break; | |
147 | ungetc(curChar, ifile); | |
148 | ||
149 | // read line | |
150 | scan_line(ifile, posID, &ID, posPower, &rawPower); | |
151 | if (ID != lastID) | |
152 | { | |
153 | //just starting a new time-series: must process the last one (if there is a last one !) | |
154 | if (lastID > 0) | |
311c5c07 | 155 | { |
ebf1280e | 156 | if (tsLength == refTsLength) |
ab4a34ef BA |
157 | { |
158 | seriesCount++; | |
ebf1280e | 159 | fwrite(tsBuffer, 4+4*tsLength, 1, ofile); |
ab4a34ef BA |
160 | if (nbItems > 0 && seriesCount >= nbItems) |
161 | break; | |
162 | } | |
163 | //if something wrong happened, skip series | |
164 | else | |
165 | { | |
166 | skippedSeriesCount++; | |
167 | if (tsLength != refTsLength) | |
168 | mismatchLengthCount++; | |
ab4a34ef BA |
169 | } |
170 | } | |
ebf1280e | 171 | |
ab4a34ef | 172 | // ID for the new series is printed only once: |
ebf1280e | 173 | write_int(ID, tsBuffer); |
ab4a34ef | 174 | // reinitialize flags |
ab4a34ef BA |
175 | tsLength = 0; |
176 | lastID = ID; | |
177 | } | |
178 | ||
ab4a34ef BA |
179 | //We cannot write more than refTsLength bytes |
180 | if (tsLength < refTsLength) | |
ebf1280e | 181 | write_real(rawPower, tsBuffer + 4+4*tsLength); |
ab4a34ef | 182 | tsLength++; |
ebf1280e | 183 | |
ab4a34ef BA |
184 | if ((++processedLines) % 1000000 == 0) |
185 | fprintf(stdout,"Processed %"PRIu64" lines\n", processedLines); | |
186 | } | |
187 | ||
ebf1280e | 188 | if (tsLength == refTsLength && (nbItems <= 0 || seriesCount < nbItems)) |
ab4a34ef BA |
189 | { |
190 | // flush last time-series if all conditions are met | |
ebf1280e | 191 | fwrite(tsBuffer, 4+4*tsLength, 1, ofile); |
ab4a34ef BA |
192 | seriesCount++; |
193 | } | |
194 | else if (nbItems <= 0 || seriesCount < nbItems) | |
195 | { | |
196 | if (tsLength != refTsLength) | |
197 | mismatchLengthCount++; | |
ab4a34ef BA |
198 | } |
199 | ||
200 | // write lines count and size of a time-series in bytes | |
201 | Byte intBuffer[4]; | |
202 | fseek(ofile, 0, SEEK_SET); | |
ebf1280e | 203 | write_int(seriesCount, intBuffer); |
ab4a34ef BA |
204 | fwrite(intBuffer, 1, 4, ofile); |
205 | // re-express tsLength in bytes (not forgetting the ID)) | |
ebf1280e | 206 | write_int(4 + 4 * refTsLength, intBuffer); |
ab4a34ef BA |
207 | fwrite(intBuffer, 1, 4, ofile); |
208 | ||
209 | // finally print some statistics | |
210 | if (seriesCount < nbItems) | |
211 | fprintf(stdout,"Warning: only %u series retrieved.\n",seriesCount); | |
ebf1280e BA |
212 | fprintf(stdout,"%u mismatch series lengths.\n",mismatchLengthCount); |
213 | ||
ab4a34ef BA |
214 | fclose(ifile); |
215 | fclose(ofile); | |
216 | } | |
217 | ||
ebf1280e | 218 | //serialize from usual 'by-row' data |
ab4a34ef BA |
219 | void serialize_byRows(const char* ifileName, const char* ofileName, uint32_t nbItems) |
220 | { | |
221 | FILE* ifile = fopen(ifileName, "r"); | |
222 | // first scan to know dimensions | |
223 | uint32_t nbValues = 0; //ID (or, more useful, real class number) comes first | |
224 | char curChar = ' '; | |
225 | while (curChar != '\n' && curChar != '\r') | |
226 | { | |
227 | curChar = fgetc(ifile); | |
228 | if (curChar == ',') | |
229 | { | |
230 | nbValues++; | |
231 | //skip potential consecutive commas (could be hard to spot) | |
ebf1280e | 232 | //TODO(...): should be 'NA' in R, thus an error (we don't handle NAs)... |
ab4a34ef BA |
233 | while (curChar == ',') |
234 | curChar = fgetc(ifile); | |
235 | ungetc(curChar, ifile); | |
236 | } | |
237 | } | |
238 | while (curChar == '\n' || curChar == '\r') | |
239 | curChar = fgetc(ifile); | |
240 | ungetc(curChar, ifile); | |
241 | uint32_t nbSeries = 1; //we already read 1st line | |
242 | while (!feof(ifile)) | |
243 | { | |
244 | if ((curChar = fgetc(ifile)) == '\n') | |
245 | nbSeries++; | |
246 | } | |
247 | fseek(ifile, 0, SEEK_SET); | |
248 | ||
249 | //write meta info first | |
ebf1280e | 250 | uint32_t tsLength = 4*nbValues+4; |
ab4a34ef BA |
251 | FILE* ofile = fopen(ofileName, "wb"); |
252 | Byte intBuffer[4]; | |
73d68777 | 253 | write_int(nbSeries, intBuffer); |
ab4a34ef | 254 | fwrite(intBuffer, 1, 4, ofile); |
73d68777 | 255 | write_int(tsLength, intBuffer); |
ab4a34ef | 256 | fwrite(intBuffer, 1, 4, ofile); |
73d68777 | 257 | float rawPower; |
ab4a34ef | 258 | int64_t ID; |
ebf1280e | 259 | |
ab4a34ef BA |
260 | for (uint32_t i=0; i<nbSeries; i++) |
261 | { | |
262 | //skip potential line feeds before next line | |
263 | while (curChar == '\n' || curChar == '\r') | |
264 | curChar = fgetc(ifile); | |
265 | ungetc(curChar, ifile); | |
266 | curChar = readInt(ifile, &ID); | |
73d68777 | 267 | write_int((uint32_t)ID, intBuffer); |
ab4a34ef BA |
268 | fwrite(intBuffer, 1, 4, ofile); |
269 | while (curChar == ',') | |
270 | curChar = fgetc(ifile); | |
271 | ungetc(curChar, ifile); | |
272 | for (uint32_t j=0; j<nbValues; j++) | |
273 | { | |
274 | curChar = readReal(ifile, &rawPower); | |
ebf1280e BA |
275 | write_real(rawPower, intBuffer); |
276 | fwrite(intBuffer, 1, 4, ofile); | |
ab4a34ef BA |
277 | while (curChar == ',') |
278 | curChar = fgetc(ifile); | |
279 | ungetc(curChar, ifile); | |
280 | } | |
281 | } | |
282 | fclose(ifile); | |
283 | fclose(ofile); | |
ab4a34ef | 284 | } |