complete first draft of package
[epclust.git] / old_C_code / stage1 / src / TimeSeries / serialize.c
CommitLineData
ab4a34ef
BA
1#include "TimeSeries/serialize.h"
2#define __STDC_FORMAT_MACROS
3#include <inttypes.h>
4#include <math.h>
5#include "Util/types.h"
6#include "Util/utils.h"
311c5c07 7#include <cgds/Vector.h>
ab4a34ef
BA
8#include <string.h>
9
// Parse one line of the input file and extract its ID and raw power fields.
//
//   ifile     stream positioned at the first character of the line
//   posID     1-based index of the column holding the ID
//   ID        [out] receives the ID, truncated to 32 bits
//   posPower  1-based index of the column holding the raw power
//   rawPower  [out] receives the raw power (a real number, not an integer)
//
// Columns other than posID / posPower are skipped. On return, the run of
// '\n' / '\r' characters that ends the line has been consumed, so the stream
// is positioned on the first character of the next line (or at end of file).
//
// readInt() and readReal() are defined elsewhere; their return value is used
// here as the character that terminated the field they parsed.
static void scan_line(FILE* ifile, int posID, uint32_t* ID, int posPower, float* rawPower)
{
	// int rather than char: fgetc() returns an int so that EOF stays distinct
	// from every valid byte value
	int nextChar;
	int position = 1;
	while (1)
	{
		if (position == posID)
		{
			// start from the current output value, so that the output is left
			// unchanged should the helper not store anything (e.g. at end of file)
			int64_t ID_on64bits = *ID;
			nextChar = readInt(ifile, &ID_on64bits);
			*ID = (uint32_t)ID_on64bits;
		}
		else if (position == posPower)
		{
			float power = *rawPower;
			nextChar = readReal(ifile, &power);
			*rawPower = power;
		}
		else
		{
			// uninteresting column: read its first character, the rest of the
			// field is skipped just below
			nextChar = fgetc(ifile);
		}

		// continue until next comma (or line end or file end)
		while (!feof(ifile) && nextChar != '\n' && nextChar != '\r' && nextChar != ',')
			nextChar = fgetc(ifile);
		position++;

		if (feof(ifile) || nextChar == '\n' || nextChar == '\r')
		{
			// skip all potential line feeds; the parentheses matter, since
			// && binds tighter than || and the end-of-file test must guard
			// both alternatives
			while (!feof(ifile) && (nextChar == '\n' || nextChar == '\r'))
				nextChar = fgetc(ifile);
			// give back the first character of the next line
			if (!feof(ifile))
				ungetc(nextChar, ifile);
			break;
		}
	}
}
49
50//main job: parse a text file into a binary compressed version
51//TODO [long term]: adapt to distributed files/DB, maybe to distributed binary chunks
52void serialize_byCols(const char* ifileName, const char* ofileName, uint32_t nbItems)
53{
54 // use the header to know positions of ID and rawPower
55 FILE* ifile = fopen(ifileName, "r");
56 uint32_t headerShift = 0;
57 char curChar;
58 Vector* header = vector_new(char);
59 do
60 {
61 curChar = fgetc(ifile);
62 headerShift++;
63 if (curChar == '\n' || curChar == '\r')
64 {
65 //flush all potential other line feeds
66 while (curChar == '\n' || curChar == '\r')
67 curChar = fgetc(ifile);
68 ungetc(curChar, ifile);
69 break;
70 }
71 vector_push(header, curChar);
72 }
73 while (1);
74 char* headerString = (char*)malloc((vector_size(header) + 1)*sizeof(char));
75 VectorIterator* it = vector_get_iterator(header);
76 int index = 0;
77 while (vectorI_has_data(it))
78 {
79 vectorI_get(it, headerString[index]);
80 vectorI_move_next(it);
81 index++;
82 }
83 vectorI_destroy(it);
84 headerString[index] = 0;
85 vector_destroy(header);
86 int position = 1, posID = 0, posPower = 0;
87 char* columnDescriptor = strtok(headerString, ",");
88 while (columnDescriptor != NULL)
89 {
90 if (!strcmp(columnDescriptor,"FK_CCU_ID") || !strcmp(columnDescriptor,"fk_ccu_id"))
91 posID = position;
92 else if (!strcmp(columnDescriptor,"CPP_PUISSANCE_BRUTE"))
93 posPower = position;
94 position++;
95 columnDescriptor = strtok(NULL, ",");
96 }
97 free(headerString);
98
99 //estimate tsLength with a scan of the 3 first series
ebf1280e
BA
100 uint32_t ID=0, lastID=0, refTsLength=0;
101 float rawPower = 0.0;
ab4a34ef
BA
102 scan_line(ifile, posID, &ID, posPower, &rawPower);
103 //'sl' = sample lengths (short because a lot of comparisons then)
104 uint32_t* sl = (uint32_t*) calloc(3, sizeof(uint32_t));
105 for (int i=0; i<3; i++)
106 {
107 lastID = ID;
108 while (ID == lastID)
109 {
ebf1280e 110 sl[i]++;
ab4a34ef
BA
111 scan_line(ifile, posID, &ID, posPower, &rawPower);
112 }
113 }
114 if (sl[1] <= sl[0] <= sl[2] || sl[2] <= sl[0] <= sl[1])
115 refTsLength = sl[0];
116 else if (sl[0] <= sl[1] <= sl[2] || sl[2] <= sl[1] <= sl[0])
117 refTsLength = sl[1];
118 else
119 refTsLength = sl[2];
120 free(sl);
121 //go back at the beginning of the first series (ready to read '\n'...)
122 fseek(ifile, headerShift-1, SEEK_SET);
123
124 // output file to write time-series sequentially, binary format.
125 // Format: for each series, ID is stored on 4 bytes (unsigned integer32). Then,
ebf1280e 126 // (<rawPower>)+ follow, with rawPower stored as a float
ab4a34ef
BA
127 FILE* ofile = fopen(ofileName, "wb");
128
129 // leave space to write the number of series (32bits), and their length in bytes (32bits)
130 for (int i = 0; i < 8; i++)
131 fputc(0, ofile);
132
133 // process one client (ID in first column) at a time
134 uint64_t processedLines = 0; //execution trace
135 uint32_t seriesCount=0, skippedSeriesCount=0, tsLength=0;
ebf1280e
BA
136 uint32_t mismatchLengthCount=0;
137 Byte tsBuffer[4+4*refTsLength];
ab4a34ef
BA
138 lastID = 0;
139 while (!feof(ifile))
140 {
141 // next element to read always start with a digit
142 do
143 curChar = fgetc(ifile);
144 while (!feof(ifile) && (curChar < '0' || curChar > '9'));
145 if (feof(ifile))
146 break;
147 ungetc(curChar, ifile);
148
149 // read line
150 scan_line(ifile, posID, &ID, posPower, &rawPower);
151 if (ID != lastID)
152 {
153 //just starting a new time-series: must process the last one (if there is a last one !)
154 if (lastID > 0)
311c5c07 155 {
ebf1280e 156 if (tsLength == refTsLength)
ab4a34ef
BA
157 {
158 seriesCount++;
ebf1280e 159 fwrite(tsBuffer, 4+4*tsLength, 1, ofile);
ab4a34ef
BA
160 if (nbItems > 0 && seriesCount >= nbItems)
161 break;
162 }
163 //if something wrong happened, skip series
164 else
165 {
166 skippedSeriesCount++;
167 if (tsLength != refTsLength)
168 mismatchLengthCount++;
ab4a34ef
BA
169 }
170 }
ebf1280e 171
ab4a34ef 172 // ID for the new series is printed only once:
ebf1280e 173 write_int(ID, tsBuffer);
ab4a34ef 174 // reinitialize flags
ab4a34ef
BA
175 tsLength = 0;
176 lastID = ID;
177 }
178
ab4a34ef
BA
179 //We cannot write more than refTsLength bytes
180 if (tsLength < refTsLength)
ebf1280e 181 write_real(rawPower, tsBuffer + 4+4*tsLength);
ab4a34ef 182 tsLength++;
ebf1280e 183
ab4a34ef
BA
184 if ((++processedLines) % 1000000 == 0)
185 fprintf(stdout,"Processed %"PRIu64" lines\n", processedLines);
186 }
187
ebf1280e 188 if (tsLength == refTsLength && (nbItems <= 0 || seriesCount < nbItems))
ab4a34ef
BA
189 {
190 // flush last time-series if all conditions are met
ebf1280e 191 fwrite(tsBuffer, 4+4*tsLength, 1, ofile);
ab4a34ef
BA
192 seriesCount++;
193 }
194 else if (nbItems <= 0 || seriesCount < nbItems)
195 {
196 if (tsLength != refTsLength)
197 mismatchLengthCount++;
ab4a34ef
BA
198 }
199
200 // write lines count and size of a time-series in bytes
201 Byte intBuffer[4];
202 fseek(ofile, 0, SEEK_SET);
ebf1280e 203 write_int(seriesCount, intBuffer);
ab4a34ef
BA
204 fwrite(intBuffer, 1, 4, ofile);
205 // re-express tsLength in bytes (not forgetting the ID))
ebf1280e 206 write_int(4 + 4 * refTsLength, intBuffer);
ab4a34ef
BA
207 fwrite(intBuffer, 1, 4, ofile);
208
209 // finally print some statistics
210 if (seriesCount < nbItems)
211 fprintf(stdout,"Warning: only %u series retrieved.\n",seriesCount);
ebf1280e
BA
212 fprintf(stdout,"%u mismatch series lengths.\n",mismatchLengthCount);
213
ab4a34ef
BA
214 fclose(ifile);
215 fclose(ofile);
216}
217
ebf1280e 218//serialize from usual 'by-row' data
ab4a34ef
BA
219void serialize_byRows(const char* ifileName, const char* ofileName, uint32_t nbItems)
220{
221 FILE* ifile = fopen(ifileName, "r");
222 // first scan to know dimensions
223 uint32_t nbValues = 0; //ID (or, more useful, real class number) comes first
224 char curChar = ' ';
225 while (curChar != '\n' && curChar != '\r')
226 {
227 curChar = fgetc(ifile);
228 if (curChar == ',')
229 {
230 nbValues++;
231 //skip potential consecutive commas (could be hard to spot)
ebf1280e 232 //TODO(...): should be 'NA' in R, thus an error (we don't handle NAs)...
ab4a34ef
BA
233 while (curChar == ',')
234 curChar = fgetc(ifile);
235 ungetc(curChar, ifile);
236 }
237 }
238 while (curChar == '\n' || curChar == '\r')
239 curChar = fgetc(ifile);
240 ungetc(curChar, ifile);
241 uint32_t nbSeries = 1; //we already read 1st line
242 while (!feof(ifile))
243 {
244 if ((curChar = fgetc(ifile)) == '\n')
245 nbSeries++;
246 }
247 fseek(ifile, 0, SEEK_SET);
248
249 //write meta info first
ebf1280e 250 uint32_t tsLength = 4*nbValues+4;
ab4a34ef
BA
251 FILE* ofile = fopen(ofileName, "wb");
252 Byte intBuffer[4];
73d68777 253 write_int(nbSeries, intBuffer);
ab4a34ef 254 fwrite(intBuffer, 1, 4, ofile);
73d68777 255 write_int(tsLength, intBuffer);
ab4a34ef 256 fwrite(intBuffer, 1, 4, ofile);
73d68777 257 float rawPower;
ab4a34ef 258 int64_t ID;
ebf1280e 259
ab4a34ef
BA
260 for (uint32_t i=0; i<nbSeries; i++)
261 {
262 //skip potential line feeds before next line
263 while (curChar == '\n' || curChar == '\r')
264 curChar = fgetc(ifile);
265 ungetc(curChar, ifile);
266 curChar = readInt(ifile, &ID);
73d68777 267 write_int((uint32_t)ID, intBuffer);
ab4a34ef
BA
268 fwrite(intBuffer, 1, 4, ofile);
269 while (curChar == ',')
270 curChar = fgetc(ifile);
271 ungetc(curChar, ifile);
272 for (uint32_t j=0; j<nbValues; j++)
273 {
274 curChar = readReal(ifile, &rawPower);
ebf1280e
BA
275 write_real(rawPower, intBuffer);
276 fwrite(intBuffer, 1, 4, ofile);
ab4a34ef
BA
277 while (curChar == ',')
278 curChar = fgetc(ifile);
279 ungetc(curChar, ifile);
280 }
281 }
282 fclose(ifile);
283 fclose(ofile);
ab4a34ef 284}