#include "Util/types.h"
#include "Util/utils.h"
#include <mpi.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <math.h>
#include <unistd.h>
#include "MPI_Communication/pack.h"
#include "MPI_Communication/unpack.h"
#include "Util/rng.h"

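// Overview: master-side driver of the ppam MPI application. The master
// distributes chunks of time-series to slave processes, appends the medoids
// they return to a new (smaller) binary file, and iterates until the whole
// dataset fits in a single chunk, which one slave then clusters a final time.
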
// save the final result in XML format
static void result_to_XML(Result_t* result, const char* inputFileName,
	const char* lastBinaryFileName, uint32_t p_for_dissims)
{
	(void)inputFileName; //currently unused
	uint32_t nbClusters = result->nbClusters;
	FILE* ofile = fopen("ppamResult.xml", "w");
	if (ofile == NULL)
	{
		fprintf(stderr, "Cannot open ppamResult.xml for writing.\n");
		return;
	}

	fprintf(ofile, "<medoids>\n\n");

	fprintf(ofile, " <file>");
	fprintf(ofile, "%s", lastBinaryFileName);
	fprintf(ofile, "</file>\n\n");

	fprintf(ofile, " <p_for_dissims>");
	fprintf(ofile, "%u", p_for_dissims);
	fprintf(ofile, "</p_for_dissims>\n\n");

	fprintf(ofile, " <IDs>\n");
	for (uint32_t i=0; i<nbClusters; i++)
		fprintf(ofile, " <ID>%u</ID>\n", result->medoids_ID[i]);
	fprintf(ofile, " </IDs>\n\n");

	// medoids' ranks in the last binary file (printed 1-based for human readability)
	fprintf(ofile, " <ranks>\n");
	for (uint32_t i=0; i<nbClusters; i++)
		fprintf(ofile, " <rank>%u</rank>\n", result->medoids_ranks[i] + 1);
	fprintf(ofile, " </ranks>\n\n");

	fprintf(ofile, "</medoids>\n");
	fclose(ofile);
}
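
// For illustration, the generated ppamResult.xml has this shape
// (hypothetical values, two clusters):
//   <medoids>
//    <file>ppamFinalSeries.bin</file>
//    <p_for_dissims>2</p_for_dissims>
//    <IDs> <ID>42</ID> <ID>7</ID> </IDs>
//    <ranks> <rank>1</rank> <rank>5</rank> </ranks>
//   </medoids>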

static void binaryResult_to_file(Result_t* result, const char* inputFileName,
	const char* outputFileName)
{
	// Determine tsLength from inputFile
	uint32_t tsLength = get_tsLength(inputFileName);

	FILE* ifile = fopen(inputFileName, "rb");
	FILE* ofile = fopen(outputFileName, "ab"); //'append binary'
	if (ifile == NULL || ofile == NULL)
	{
		fprintf(stderr, "Cannot open %s or %s.\n", inputFileName, outputFileName);
		if (ifile) fclose(ifile);
		if (ofile) fclose(ofile);
		return;
	}

	Byte tsBuffer[tsLength];
	for (uint32_t i = 0; i < result->nbClusters; i++)
	{
		// Locate the i-th medoid in the (binary) input file, skipping the 8-byte header;
		// the cast to long avoids 32-bit overflow on large files
		fseek(ifile, 8 + (long)result->medoids_ranks[i] * tsLength, SEEK_SET);
		// Copy the current series onto ofile
		size_t lengthRead = fread(tsBuffer, 1, tsLength, ifile);
		if (lengthRead != tsLength)
			fprintf(stderr, "Problem while copying binary series to new file.\n");
		fwrite(tsBuffer, 1, tsLength, ofile);
	}

	fclose(ifile);
	fclose(ofile);
}
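
// Note on the binary layout used above and below: the file starts with an
// 8-byte header (two uint32 values, in the byte order defined by write_int:
// the number of series, then tsLength), followed by fixed-size records of
// exactly tsLength bytes each.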

// fill a new unit of work suitable to be given to a slave
static Work_t* get_next_work(char* inputFileName, uint32_t nbSeries, uint32_t nbSeriesInChunk,
	double idealNbSeriesInChunk, uint32_t jobsSentCount, uint32_t lastEndIndex,
	uint32_t nbClusters, uint32_t clustOnMedoids,
	int randomize, uint32_t p_for_dissims)
{
	Work_t* work = (Work_t*) malloc(sizeof(Work_t));

	work->inputFileName = (char*) malloc(strlen(inputFileName) + 1);
	strcpy(work->inputFileName, inputFileName);

	if (randomize)
		work->nbSeries = nbSeriesInChunk;
	else
	{
		double adjustedNbSeriesInNextChunk =
			idealNbSeriesInChunk * (jobsSentCount + 1) - lastEndIndex;
		// round to closest integer (a plain cast would truncate)
		work->nbSeries = (uint32_t)floor(adjustedNbSeriesInNextChunk + 0.5);
		// stay below the upper bound (TODO: is this check required ?)
		if (work->nbSeries > nbSeriesInChunk)
			work->nbSeries = nbSeriesInChunk;
		// TODO: what about this one ? (probably useless)
		if (lastEndIndex + work->nbSeries > nbSeries)
			work->nbSeries = nbSeries - lastEndIndex;
	}

	//TODO: ranks on uint64_t if more than 4.3 billion series at the same place (unlikely...)
	work->ranks = (uint32_t*) malloc(work->nbSeries * sizeof(uint32_t));
	for (uint32_t i = 0; i < work->nbSeries; i++)
		work->ranks[i] = (randomize ? get_rand_int() % nbSeries : lastEndIndex + i);

	work->nbClusters = nbClusters;
	work->clustOnMedoids = clustOnMedoids;
	work->p_for_dissims = p_for_dissims;

	return work;
}
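
// Worked example for get_next_work's deterministic mode (illustrative numbers):
// with idealNbSeriesInChunk = 1000.4, jobsSentCount = 3 and lastEndIndex = 3001,
// the next chunk gets round(1000.4 * 4 - 3001) = round(1000.6) = 1001 series,
// so the cumulative count stays close to the ideal average.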

// process all subtasks and save binary results into a new file
// NOTE: this file will be smaller than the initial file (or DB...)
static void clusters_reduce(char* inputFileName, char* outputFileName, uint32_t ntasks,
	uint32_t totalNbSeries, uint32_t nbSeriesInChunk, double idealNbSeriesInChunk,
	uint32_t tsLength, uint32_t nbClusters, uint32_t clustOnMedoids,
	int randomize, uint32_t p_for_dissims)
{
	FILE* ofile = fopen(outputFileName, "wb"); //'write binary'
	// Reserve an 8-byte header for the series count and tsLength, filled in at the end
	for (uint32_t i = 0; i < 8; i++)
		fputc(0, ofile);
	fclose(ofile);

	uint32_t jobsSentCount = 0; //used if randomize==FALSE
	uint32_t lastEndIndex = 0; //used if randomize==FALSE

	uint32_t sentSeriesCount = 0; //used if randomize==TRUE

	// Count series sent to the binary output file
	uint32_t newSeriesCount = 0;

	// Expected size of a Work message in bytes:
	uint32_t work_message_length = get_packedWork_length(nbSeriesInChunk);
	Byte packedWork[work_message_length];

	// Expected size of a Result message in bytes:
	// nbClusters (4 bytes), then IDs and ranks (4 bytes each per cluster)
	uint32_t result_message_length = get_packedResult_length(nbClusters);
	Byte packedResult[result_message_length];

	// Seed the slaves; send one unit of work to each slave.
	Work_t* work;
	int* busy_slave = (int*) malloc(ntasks * sizeof(int));
	for (int rank = 1; rank < ntasks; rank++)
		busy_slave[rank] = 0;
	for (int rank = 1; rank < ntasks; rank++)
	{
		// Find the next item of work to do
		work = get_next_work(inputFileName, totalNbSeries, nbSeriesInChunk, idealNbSeriesInChunk,
			jobsSentCount, lastEndIndex, nbClusters, clustOnMedoids,
			randomize, p_for_dissims);

		if (randomize)
			sentSeriesCount += nbSeriesInChunk;
		else
		{
			lastEndIndex = lastEndIndex + work->nbSeries;
			jobsSentCount++;
		}

		// Send it to the current rank
		pack_work(work, nbSeriesInChunk, packedWork);
		free_work(work);
		fprintf(stdout, "0 / Send work %s to rank=%i / %u\n", inputFileName, rank,
			(randomize ? sentSeriesCount : lastEndIndex));
		MPI_Send(packedWork, work_message_length, MPI_BYTE, rank, WORKTAG, MPI_COMM_WORLD);

		busy_slave[rank] = 1;

		if ((randomize && sentSeriesCount >= 1.5*totalNbSeries) //TODO: 1.5 = heuristic, magic number...
			|| (!randomize && lastEndIndex >= totalNbSeries))
		{
			// Nothing more to read
			break;
		}
	}
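
	// Work-pool protocol, briefly: every busy slave owes the master exactly one
	// Result message; on each receipt the master either hands that slave a new
	// chunk or marks it idle, so the loop below ends once all slaves are idle.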

	// Loop over getting new work requests until there is no more work to be done
	Result_t* result;
	MPI_Status status;
	while (1)
	{
		// If no slave is active, the job is over
		int atLeastOneSlaveActive = 0;
		for (int rank = 1; rank < ntasks; rank++)
		{
			if (busy_slave[rank])
			{
				atLeastOneSlaveActive = 1;
				break;
			}
		}
		if (!atLeastOneSlaveActive)
			break;

		// Receive results from a slave
		MPI_Recv(packedResult, result_message_length, MPI_BYTE, MPI_ANY_SOURCE,
			WORKTAG, MPI_COMM_WORLD, &status);
		result = unpack_result(packedResult);
		fprintf(stdout, "0 / Receive result from rank=%i on %s\n", status.MPI_SOURCE, inputFileName);

		// 'binarize' the result (only series' values) returned by the slave
		binaryResult_to_file(result, inputFileName, outputFileName);
		free_result(result);
		newSeriesCount += nbClusters;

		if ((randomize && sentSeriesCount < totalNbSeries)
			|| (!randomize && lastEndIndex < totalNbSeries))
		{
			// Get the next unit of work to be done
			work = get_next_work(inputFileName, totalNbSeries, nbSeriesInChunk,
				idealNbSeriesInChunk, jobsSentCount, lastEndIndex, nbClusters,
				clustOnMedoids, randomize, p_for_dissims);

			if (randomize)
				sentSeriesCount += nbSeriesInChunk;
			else
			{
				lastEndIndex = lastEndIndex + work->nbSeries;
				jobsSentCount++;
			}

			// Send the slave a new work unit
			pack_work(work, nbSeriesInChunk, packedWork);
			free_work(work);
			fprintf(stdout, "0 / Send work %s to rank=%i / %u\n", inputFileName, status.MPI_SOURCE,
				(randomize ? sentSeriesCount : lastEndIndex));
			MPI_Send(packedWork, work_message_length, MPI_BYTE,
				status.MPI_SOURCE, WORKTAG, MPI_COMM_WORLD);
		}
		else
		{
			// No more work to hand out: mark this slave idle
			busy_slave[status.MPI_SOURCE] = 0;
		}
	}

	// There's no more work to be done, so receive all results from the slaves.
	for (int rank = 1; rank < ntasks; rank++)
	{
		if (busy_slave[rank])
		{
			MPI_Recv(packedResult, result_message_length, MPI_BYTE,
				rank, WORKTAG, MPI_COMM_WORLD, &status);
			result = unpack_result(packedResult);

			// 'binarize' the result (only series' values) returned by the slave
			binaryResult_to_file(result, inputFileName, outputFileName);
			free_result(result);
			newSeriesCount += nbClusters;
		}
	}
	free(busy_slave);

	// Finalize output file: write total number of series inside it, and tsLength
	ofile = fopen(outputFileName, "r+b"); //read and write, binary
	fseek(ofile, 0, SEEK_SET);
	Byte intBuffer[4];
	write_int(newSeriesCount, 4, intBuffer);
	fwrite(intBuffer, 1, 4, ofile);
	write_int(tsLength, 4, intBuffer);
	fwrite(intBuffer, 1, 4, ofile);
	fclose(ofile);
}
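
// Note: each completed work unit contributes exactly nbClusters medoid series
// to the output of clusters_reduce, so newSeriesCount ends up as nbClusters
// times the number of chunks processed, and the next pass works on this much
// smaller file.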

// generate a random temporary file name (collisions are unlikely but not impossible)
static char* get_unique_name(void)
{
	size_t nbDigits = 7; //rather arbitrary
	size_t stringLength = 5 + nbDigits; //5 for '.tmp/'
	char* s = (char*) malloc(stringLength + 1);
	memcpy(s, ".tmp/", 5);
	for (size_t i=0; i<nbDigits; i++)
		s[5+i] = '0' + get_rand_int() % 10;
	s[stringLength] = 0;
	return s;
}
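
// The driver below repeatedly reduces the input: each pass replaces every chunk
// of up to nbSeriesInChunk series by its nbClusters medoids, shrinking the file
// by a factor of roughly nbSeriesInChunk / nbClusters, until everything fits in
// a single chunk, which one slave clusters a final time.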

// code executed by master process
void master_run(char* mainInputFileName, uint32_t totalNbSeries, uint32_t nbSeriesInChunk,
	double idealNbSeriesInChunk, uint32_t tsLength, uint32_t nbClusters,
	int randomize, uint32_t p_for_dissims)
{
	// Basic sanity check: nbClusters should be clearly less than the series count per chunk
	if (10 * nbClusters >= nbSeriesInChunk)
	{
		fprintf(stdout, "WARNING: the number of clusters (%u) may be too high compared with chunk size (%u).\n",
			nbClusters, nbSeriesInChunk);
	}

	// Find out how many processes there are in the default communicator
	int ntasks;
	MPI_Comm_size(MPI_COMM_WORLD, &ntasks);
	if (ntasks <= 1)
	{
		fprintf(stderr, "No slaves available (only the master is running).\n");
		return;
	}

	// initializations
	char* inputFileName = mainInputFileName;
	char* outputFileName;

	uint32_t nbSeriesInFile = totalNbSeries;

	while (nbSeriesInFile > nbSeriesInChunk)
	{
		outputFileName = get_unique_name();
		// after the first pass we are clustering medoids, not raw series
		uint32_t clustOnMedoids = (nbSeriesInFile < totalNbSeries ? 1 : 0);
		clusters_reduce(inputFileName, outputFileName, ntasks, nbSeriesInFile, nbSeriesInChunk,
			idealNbSeriesInChunk, tsLength, nbClusters, clustOnMedoids,
			randomize, p_for_dissims);

		// read nbSeries in outputFile
		nbSeriesInFile = get_nbSeries(outputFileName);

		// update file names
		if (strcmp(mainInputFileName, inputFileName) != 0)
		{
			// No need to keep every intermediate binary
			unlink(inputFileName);
			free(inputFileName);
		}
		inputFileName = outputFileName;
	}

	// read nbSeries in inputFileName (the last one);
	// we know that it contains at most nbSeriesInChunk series
	nbSeriesInFile = get_nbSeries(inputFileName);

	// Expected size of a Work message in bytes:
	uint32_t work_message_length = get_packedWork_length(nbSeriesInChunk);
	Byte packedWork[work_message_length];

	// Expected size of a Result message in bytes:
	uint32_t result_message_length = get_packedResult_length(nbClusters);
	Byte packedResult[result_message_length];

	// Run one last task on some slave, and get the final result
	// (jobsSentCount=0, lastEndIndex=0, clustOnMedoids=1, randomize=0)
	Work_t* work = get_next_work(inputFileName, nbSeriesInFile, nbSeriesInChunk,
		(double)nbSeriesInFile, 0, 0, nbClusters, 1, 0, p_for_dissims);

	// Send the selected slave this last work unit
	pack_work(work, nbSeriesInChunk, packedWork);
	free_work(work);
	int selectedSlaveRank = get_rand_int() % (ntasks - 1) + 1;
	fprintf(stdout, "0 / Send final work %s to rank=%i / %u\n",
		inputFileName, selectedSlaveRank, nbSeriesInFile);
	MPI_Send(packedWork, work_message_length, MPI_BYTE, selectedSlaveRank,
		WORKTAG, MPI_COMM_WORLD);

	MPI_Status status;
	// Wait for it to finish
	MPI_Recv(packedResult, result_message_length, MPI_BYTE, selectedSlaveRank,
		WORKTAG, MPI_COMM_WORLD, &status);
	Result_t* finalResult = unpack_result(packedResult);
	fprintf(stdout, "0 / Receive final result from rank=%i on %s\n", status.MPI_SOURCE, inputFileName);

	// Tell all the slaves to exit by sending an empty message with the DIETAG
	for (int rank = 1; rank < ntasks; ++rank)
		MPI_Send(NULL, 0, MPI_BYTE, rank, DIETAG, MPI_COMM_WORLD);

	const char finalBinaryName[] = "ppamFinalSeries.bin";
	result_to_XML(finalResult, inputFileName, finalBinaryName, p_for_dissims);
	free_result(finalResult);

	// clean up the last intermediate binary
	if (strcmp(mainInputFileName, inputFileName) != 0)
	{
		// Keep the last input binary, but rename it
		rename(inputFileName, finalBinaryName);
		free(inputFileName);
	}
	else
	{
		// just symlink mainInputFileName (removing any stale link first)
		if (!access(finalBinaryName, F_OK))
			unlink(finalBinaryName);
		if (symlink(mainInputFileName, finalBinaryName))
			fprintf(stderr, "Cannot create a symlink to the initial binary file.\n");
	}
}