#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <mpi.h>

#include "Util/types.h"
#include "Util/utils.h"
#include "MPI_Communication/pack.h"
#include "MPI_Communication/unpack.h"
13 // save the final result in XML format
14 static void result_to_XML(Result_t
* result
, const char* inputFileName
,
15 const char* lastBinaryFileName
, uint32_t p_for_dissims
)
17 uint32_t nbClusters
= result
->nbClusters
;
18 FILE* ofile
= fopen("ppamResult.xml", "w");
20 fprintf(ofile
, "<medoids>\n\n");
22 fprintf(ofile
, " <file>");
23 fprintf(ofile
, "%s", lastBinaryFileName
);
24 fprintf(ofile
, "</file>\n\n");
26 fprintf(ofile
, " <p_for_dissims>");
27 fprintf(ofile
, "%u", p_for_dissims
);
28 fprintf(ofile
, "</p_for_dissims>\n\n");
30 fprintf(ofile
, " <IDs>\n");
31 for (uint32_t i
=0; i
<nbClusters
; i
++)
32 fprintf(ofile
, " <ID>%u</ID>\n", result
->medoids_ID
[i
]);
33 fprintf(ofile
, " </IDs>\n\n");
35 // medoids ranks in last binary file (human printing: 0 --> 1 ...etc)
36 fprintf(ofile
, " <ranks>\n");
37 for (uint32_t i
=0; i
<nbClusters
; i
++)
38 fprintf(ofile
, " <rank>%u</rank>\n", result
->medoids_ranks
[i
] + 1);
39 fprintf(ofile
, " </ranks>\n\n");
41 fprintf(ofile
, "</medoids>\n");
45 static void binaryResult_to_file(Result_t
* result
, const char* inputFileName
,
46 const char* outputFileName
)
48 // Determine tsLength from inputFile
49 uint32_t tsLength
= get_tsLength(inputFileName
);
51 FILE* ifile
= fopen(inputFileName
, "rb");
52 FILE* ofile
= fopen(outputFileName
, "ab"); //'append binary'
54 Byte tsBuffer
[tsLength
];
55 for (uint32_t i
= 0; i
< result
->nbClusters
; i
++)
57 // Get time-series in (binary) inputFile
58 fseek(ifile
, 8 + result
->medoids_ranks
[i
] * tsLength
, SEEK_SET
);
59 // Copy current serie onto ofile
61 size_t lengthRead
= fread(tsBuffer
, 1, tsLength
, ifile
);
62 if (lengthRead
!= tsLength
)
63 fprintf(stderr
, "problem while copying binary series to new file.\n");
64 fwrite(tsBuffer
, 1, tsLength
, ofile
);
71 // fill a new unit of work suitable to be given to a slave
72 static Work_t
* get_next_work(char* inputFileName
, uint32_t nbSeries
, uint32_t nbSeriesInChunk
,
73 double idealNbSeriesInChunk
, uint32_t jobsSentCount
, uint32_t lastEndIndex
,
74 uint32_t nbClusters
, uint32_t clustOnMedoids
,
75 int randomize
, uint32_t p_for_dissims
)
77 Work_t
* work
= (Work_t
*) malloc(sizeof(Work_t
));
79 work
->inputFileName
= (char*) malloc(strlen(inputFileName
) + 1);
80 strcpy(work
->inputFileName
, inputFileName
);
83 work
->nbSeries
= nbSeriesInChunk
;
86 double adjustedNbSeriesInNextChunk
=
87 idealNbSeriesInChunk
* (jobsSentCount
+ 1) - lastEndIndex
;
88 // round to closest integer
89 work
->nbSeries
= (uint32_t)adjustedNbSeriesInNextChunk
;
90 // stay below the upper bound (TODO: is this check required ?)
91 if (work
->nbSeries
> nbSeriesInChunk
)
92 work
->nbSeries
= nbSeriesInChunk
;
93 // TODO: what about this one ? (probably useless)
94 if (lastEndIndex
+ work
->nbSeries
> nbSeries
)
95 work
->nbSeries
= nbSeries
- lastEndIndex
;
98 //TODO: ranks on uint64_t if more than 4.3 billion series at the same place (unlikely...)
99 work
->ranks
= (uint32_t*) malloc(work
->nbSeries
* sizeof(uint32_t));
100 for (uint32_t i
= 0; i
< work
->nbSeries
; i
++)
101 work
->ranks
[i
] = (randomize
? get_rand_int() % nbSeries
: lastEndIndex
+ i
);
103 work
->nbClusters
= nbClusters
;
104 work
->clustOnMedoids
= clustOnMedoids
;
105 work
->p_for_dissims
= p_for_dissims
;
110 // process all subtasks and save binary results into a new file
111 // NOTE: this file will be smaller than initial file (or DB...)
112 static void clusters_reduce(char* inputFileName
, char* outputFileName
, uint32_t ntasks
,
113 uint32_t totalNbSeries
, uint32_t nbSeriesInChunk
, double idealNbSeriesInChunk
,
114 uint32_t tsLength
, uint32_t nbClusters
, uint32_t clustOnMedoids
,
115 int randomize
, uint32_t p_for_dissims
)
117 FILE* ofile
= fopen(outputFileName
, "wb"); //'write binary'
118 // Leave a blank for series' count and tsLength
119 for (uint32_t i
= 0; i
< 8; i
++)
123 uint32_t jobsSentCount
= 0; //used if randomize==FALSE
124 uint32_t lastEndIndex
= 0; //used if randomize==FALSE
126 uint32_t sentSeriesCount
= 0; //used if randomize==TRUE
128 // Count series sent to binary file on output
129 uint32_t newSeriesCount
= 0;
131 // Expected size of a Work message in bytes:
132 uint32_t work_message_length
= get_packedWork_length(nbSeriesInChunk
);
133 Byte packedWork
[work_message_length
];
135 // Expected size of a Result message in bytes: (uint32_t is on 4 bytes)
136 uint32_t result_message_length
= 4 + 4 * nbClusters
+ 4 * nbClusters
;
137 Byte packedResult
[result_message_length
];
139 // Seed the slaves; send one unit of work to each slave.
141 int* busy_slave
= (int*) malloc(ntasks
* sizeof(int));
142 for (int rank
= 1; rank
< ntasks
; rank
++)
143 busy_slave
[rank
] = 0;
144 for (int rank
= 1; rank
< ntasks
; rank
++)
146 // Find the next item of work to do
147 work
= get_next_work(inputFileName
, totalNbSeries
, nbSeriesInChunk
, idealNbSeriesInChunk
,
148 jobsSentCount
, lastEndIndex
, nbClusters
, clustOnMedoids
,
149 randomize
, p_for_dissims
);
152 sentSeriesCount
+= nbSeriesInChunk
;
155 lastEndIndex
= lastEndIndex
+ work
->nbSeries
;
159 // Send it to current rank
160 pack_work(work
, nbSeriesInChunk
, packedWork
);
162 fprintf(stdout
, "0 / Send work %s to rank=%i / %u\n",inputFileName
, rank
,
163 (randomize
? sentSeriesCount
: lastEndIndex
));
164 MPI_Send(packedWork
, work_message_length
, MPI_BYTE
, rank
, WORKTAG
, MPI_COMM_WORLD
);
166 busy_slave
[rank
] = 1;
168 if ((randomize
&& sentSeriesCount
>= 1.5*totalNbSeries
) //TODO: 1.5 = heuristic, magic number...
169 || (!randomize
&& lastEndIndex
>= totalNbSeries
))
171 // Nothing more to read
176 // Loop over getting new work requests until there is no more work to be done
181 // If no slave is active, job is over
182 int atLeastOneSlaveActive
= 0;
183 for (int rank
= 1; rank
< ntasks
; rank
++)
185 if (busy_slave
[rank
])
187 atLeastOneSlaveActive
= 1;
191 if (!atLeastOneSlaveActive
)
194 // Receive results from a slave
195 MPI_Recv(packedResult
, result_message_length
, MPI_BYTE
, MPI_ANY_SOURCE
,
196 WORKTAG
, MPI_COMM_WORLD
, &status
);
197 result
= unpack_result(packedResult
);
198 fprintf(stdout
, "0 / Receive result from rank=%i on %s\n",status
.MPI_SOURCE
,inputFileName
);
200 // 'binarize' the result (only series' values) returned by the slave
201 binaryResult_to_file(result
, inputFileName
, outputFileName
);
203 newSeriesCount
+= nbClusters
;
205 if ((randomize
&& sentSeriesCount
< totalNbSeries
)
206 || (!randomize
&& lastEndIndex
< totalNbSeries
))
208 // Get the next unit of work to be done
209 work
= get_next_work(inputFileName
, totalNbSeries
, nbSeriesInChunk
,
210 idealNbSeriesInChunk
, jobsSentCount
, lastEndIndex
, nbClusters
,
211 clustOnMedoids
, randomize
, p_for_dissims
);
214 sentSeriesCount
+= nbSeriesInChunk
;
217 lastEndIndex
= lastEndIndex
+ work
->nbSeries
;
221 // Send the slave a new work unit
222 pack_work(work
, nbSeriesInChunk
, packedWork
);
224 fprintf(stdout
, "0 / Send work %s to rank=%i / %u\n",inputFileName
, status
.MPI_SOURCE
,
225 (randomize
? sentSeriesCount
: lastEndIndex
));
226 MPI_Send(packedWork
, work_message_length
, MPI_BYTE
,
227 status
.MPI_SOURCE
, WORKTAG
, MPI_COMM_WORLD
);
231 // No more work to do
232 busy_slave
[status
.MPI_SOURCE
] = 0;
235 // There's no more work to be done, so receive all results from the slaves.
236 for (int rank
= 1; rank
< ntasks
; rank
++)
238 if (busy_slave
[rank
])
240 MPI_Recv(packedResult
, result_message_length
, MPI_BYTE
,
241 rank
, WORKTAG
, MPI_COMM_WORLD
, &status
);
242 result
= unpack_result(packedResult
);
244 // 'binarize' the result (only series' values) returned by the slave
245 binaryResult_to_file(result
, inputFileName
, outputFileName
);
247 newSeriesCount
+= nbClusters
;
252 // Finalize output file: write total number of series inside it, and tsLength
253 ofile
= fopen(outputFileName
, "r+b"); //read and write, binary
254 fseek(ofile
, 0, SEEK_SET
);
256 write_int(newSeriesCount
, 4, intBuffer
);
257 fwrite(intBuffer
, 1, 4, ofile
);
258 write_int(tsLength
, 4, intBuffer
);
259 fwrite(intBuffer
, 1, 4, ofile
);
// generate random temporary file names of the form ".tmp/NNNNNNN"
// (7 random digits). Caller owns (and frees) the returned string.
// NOTE(review): assumes the ".tmp" directory already exists — TODO confirm.
static char* get_unique_name()
{
	size_t nbDigits = 7; //rather arbitrary
	size_t stringLength = 5 + nbDigits; //5 for '.tmp/'
	char* s = (char*) malloc(stringLength + 1);
	if (s == NULL)
		return NULL;
	memcpy(s, ".tmp/", 5);
	// size_t index: avoids signed/unsigned comparison against nbDigits
	for (size_t i=0; i<nbDigits; i++)
		s[5+i] = '0' + get_rand_int() % 10;
	s[stringLength] = '\0';
	return s;
}
280 // code executed by master process
281 void master_run(char* mainInputFileName
, uint32_t totalNbSeries
, uint32_t nbSeriesInChunk
,
282 double idealNbSeriesInChunk
, uint32_t tsLength
, uint32_t nbClusters
,
283 int randomize
, uint32_t p_for_dissims
)
285 // Basic sanity check: nbClusters must be clearly less than series count per chunk
286 if (10 * nbClusters
>= nbSeriesInChunk
)
288 fprintf(stdout
, "WARNING: cluster size (%u) may be too high compared with chunk size (%u).\n",
289 nbClusters
, nbSeriesInChunk
);
292 // Find out how many processes there are in the default communicator
294 MPI_Comm_size(MPI_COMM_WORLD
, &ntasks
);
297 fprintf(stderr
,"No slaves available (only master running).\n");
302 char* inputFileName
= mainInputFileName
;
303 char* outputFileName
;
305 uint32_t nbSeriesInFile
= totalNbSeries
;
307 while (nbSeriesInFile
> nbSeriesInChunk
)
309 outputFileName
= get_unique_name();
310 uint32_t clustOnMedoids
= (nbSeriesInFile
< totalNbSeries
? 1 : 0);
311 clusters_reduce(inputFileName
, outputFileName
, ntasks
, nbSeriesInFile
, nbSeriesInChunk
,
312 idealNbSeriesInChunk
, tsLength
, nbClusters
, clustOnMedoids
,
313 randomize
, p_for_dissims
);
315 // read nbSeries in outputFile
316 nbSeriesInFile
= get_nbSeries(outputFileName
);
319 if (strcmp(mainInputFileName
, inputFileName
) != 0)
321 // No need to keep every intermediate binary
322 unlink(inputFileName
);
325 inputFileName
= outputFileName
;
328 // read nbSeries in inputFileName (the last one)
329 // we know that there is at most 'nbSeriesInChunk' series in it
330 nbSeriesInFile
= get_nbSeries(inputFileName
);
332 // Expected size of a Work message in bytes:
333 uint32_t work_message_length
= get_packedWork_length(nbSeriesInChunk
);
334 Byte packedWork
[work_message_length
];
336 // Expected size of a Result message in bytes: (uint32_t is on 4 bytes)
337 uint32_t result_message_length
= get_packedResult_length(nbClusters
);
338 Byte packedResult
[result_message_length
];
340 // Run a last task by some slave, and get final result
341 Work_t
* work
= get_next_work(inputFileName
, nbSeriesInFile
, nbSeriesInChunk
,
342 (double)nbSeriesInFile
, 0, 0, nbClusters
, 1, 0, p_for_dissims
);
344 // Send the slave a new work unit
345 pack_work(work
, nbSeriesInChunk
, packedWork
);
347 int selectedSlaveRank
= get_rand_int() % (ntasks
- 1) + 1;
348 fprintf(stdout
, "0 / Send final work %s to rank=%i / %u\n",
349 inputFileName
, selectedSlaveRank
, nbSeriesInFile
);
350 MPI_Send(packedWork
, work_message_length
, MPI_BYTE
, selectedSlaveRank
,
351 WORKTAG
, MPI_COMM_WORLD
);
354 // Wait for him to finish
355 MPI_Recv(packedResult
, result_message_length
, MPI_BYTE
, selectedSlaveRank
,
356 WORKTAG
, MPI_COMM_WORLD
, &status
);
357 Result_t
* finalResult
= unpack_result(packedResult
);
358 fprintf(stdout
, "0 / Receive final result from rank=%i on %s\n",status
.MPI_SOURCE
,inputFileName
);
360 //Tell all the slaves to exit by sending an empty message with the DIETAG.
361 for (int rank
= 1; rank
< ntasks
; ++rank
)
362 MPI_Send(0, 0, MPI_BYTE
, rank
, DIETAG
, MPI_COMM_WORLD
);
364 const char finalBinaryName
[] = "ppamFinalSeries.bin";
365 result_to_XML(finalResult
, inputFileName
, finalBinaryName
, p_for_dissims
);
366 free_result(finalResult
);
369 if (strcmp(mainInputFileName
, inputFileName
))
371 // Keep last input binary, but rename it
372 rename(inputFileName
, finalBinaryName
);
377 // just symlink mainInputFileName
378 if (!access(finalBinaryName
, F_OK
))
379 unlink(finalBinaryName
);
380 if (symlink(mainInputFileName
, finalBinaryName
))
381 fprintf(stderr
,"Cannot create symlink to initial binary file.\n");