// ppam-mpi.git / code / src / MPI_Main / master.c
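//
// Master side of the master/slave MPI clustering (PPAM): hands chunks of
// time-series to the slaves, collects the medoids they return, reduces the
// data set round by round, and finally writes ppamResult.xml plus a binary
// file holding the final medoid series.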
#include "Util/types.h"
#include "Util/utils.h"
#include <mpi.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <math.h>
#include <unistd.h>
#include "MPI_Communication/pack.h"
#include "MPI_Communication/unpack.h"
#include "Util/rng.h"

// save the final result in XML format
static void result_to_XML(Result_t* result, const char* inputFileName,
	const char* lastBinaryFileName, uint32_t p_for_dissims)
{
	(void)inputFileName; //unused here; kept so the writers share the same signature shape
	uint32_t nbClusters = result->nbClusters;
	FILE* ofile = fopen("ppamResult.xml", "w");
	if (ofile == NULL)
	{
		fprintf(stderr, "Cannot open ppamResult.xml for writing.\n");
		return;
	}

	fprintf(ofile, "<medoids>\n\n");

	fprintf(ofile, " <file>%s</file>\n\n", lastBinaryFileName);

	fprintf(ofile, " <p_for_dissims>%u</p_for_dissims>\n\n", p_for_dissims);

	fprintf(ofile, " <IDs>\n");
	for (uint32_t i=0; i<nbClusters; i++)
		fprintf(ofile, " <ID>%u</ID>\n", result->medoids_ID[i]);
	fprintf(ofile, " </IDs>\n\n");

	// medoids ranks in the last binary file (shifted to 1-based for human readers)
	fprintf(ofile, " <ranks>\n");
	for (uint32_t i=0; i<nbClusters; i++)
		fprintf(ofile, " <rank>%u</rank>\n", result->medoids_ranks[i] + 1);
	fprintf(ofile, " </ranks>\n\n");

	fprintf(ofile, "</medoids>\n");
	fclose(ofile);
}
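/* For reference, the generated ppamResult.xml looks like this
   (the IDs, ranks and p_for_dissims below are purely illustrative):

   <medoids>

    <file>ppamFinalSeries.bin</file>

    <p_for_dissims>2</p_for_dissims>

    <IDs>
     <ID>17</ID>
     <ID>42</ID>
    </IDs>

    <ranks>
     <rank>3</rank>
     <rank>8</rank>
    </ranks>

   </medoids>
*/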
static void binaryResult_to_file(Result_t* result, const char* inputFileName,
	const char* outputFileName)
{
	// Determine tsLength from inputFile
	uint32_t tsLength = get_tsLength(inputFileName);

	FILE* ifile = fopen(inputFileName, "rb");
	FILE* ofile = fopen(outputFileName, "ab"); //'append binary'
	if (ifile == NULL || ofile == NULL)
	{
		fprintf(stderr, "Cannot open %s and/or %s.\n", inputFileName, outputFileName);
		if (ifile) fclose(ifile);
		if (ofile) fclose(ofile);
		return;
	}

	Byte tsBuffer[tsLength];
	for (uint32_t i = 0; i < result->nbClusters; i++)
	{
		// Locate the medoid in the (binary) inputFile: skip the 8-byte header,
		// then medoids_ranks[i] records of tsLength bytes each
		// ((long) cast to avoid a uint32_t overflow on large offsets)
		fseek(ifile, 8 + (long)result->medoids_ranks[i] * tsLength, SEEK_SET);

		// Copy the current series into ofile
		size_t lengthRead = fread(tsBuffer, 1, tsLength, ifile);
		if (lengthRead != tsLength)
		{
			fprintf(stderr, "Problem while copying binary series to the new file.\n");
			continue; //do not append a truncated record
		}
		fwrite(tsBuffer, 1, tsLength, ofile);
	}

	fclose(ifile);
	fclose(ofile);
}
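/* Layout of the binary series files handled here, as implied by the header
   written in clusters_reduce() below: 8 header bytes (two uint32 values,
   the number of series then tsLength), followed by one fixed-size record
   of tsLength bytes per series:

     [nbSeries: 4][tsLength: 4][series 0: tsLength][series 1: tsLength]...
*/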
// fill a new unit of work suitable to be given to a slave
static Work_t* get_next_work(char* inputFileName, uint32_t nbSeries, uint32_t nbSeriesInChunk,
	double idealNbSeriesInChunk, uint32_t jobsSentCount, uint32_t lastEndIndex,
	uint32_t nbClusters, uint32_t clustOnMedoids,
	int randomize, uint32_t p_for_dissims)
{
	Work_t* work = (Work_t*) malloc(sizeof(Work_t));

	work->inputFileName = (char*) malloc(strlen(inputFileName) + 1);
	strcpy(work->inputFileName, inputFileName);

	if (randomize)
		work->nbSeries = nbSeriesInChunk;
	else
	{
		double adjustedNbSeriesInNextChunk =
			idealNbSeriesInChunk * (jobsSentCount + 1) - lastEndIndex;
		// round to the closest integer (a plain cast would truncate)
		work->nbSeries = (uint32_t)(adjustedNbSeriesInNextChunk + 0.5);
		// stay below the upper bound (TODO: is this check required ?)
		if (work->nbSeries > nbSeriesInChunk)
			work->nbSeries = nbSeriesInChunk;
		// TODO: what about this one ? (probably useless)
		if (lastEndIndex + work->nbSeries > nbSeries)
			work->nbSeries = nbSeries - lastEndIndex;
	}

	//TODO: ranks on uint64_t if more than 4.3 billion series at the same place (unlikely...)
	work->ranks = (uint32_t*) malloc(work->nbSeries * sizeof(uint32_t));
	for (uint32_t i = 0; i < work->nbSeries; i++)
		work->ranks[i] = (randomize ? get_rand_int() % nbSeries : lastEndIndex + i);

	work->nbClusters = nbClusters;
	work->clustOnMedoids = clustOnMedoids;
	work->p_for_dissims = p_for_dissims;

	return work;
}
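/* Example of the adjusted sizing above, assuming idealNbSeriesInChunk = 2.5:
   successive chunks get 3, 2, 3, 2, ... series (end indices 3, 5, 8, 10, ...),
   so the cumulative count never drifts more than 0.5 away from the ideal
   2.5 * (jobsSentCount + 1), and the consecutive ranks cover every series
   exactly once in the non-randomized mode. */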

// process all subtasks and save the binary results into a new file
// NOTE: this file will be smaller than the initial file (or DB...)
static void clusters_reduce(char* inputFileName, char* outputFileName, int ntasks,
	uint32_t totalNbSeries, uint32_t nbSeriesInChunk, double idealNbSeriesInChunk,
	uint32_t tsLength, uint32_t nbClusters, uint32_t clustOnMedoids,
	int randomize, uint32_t p_for_dissims)
{
	FILE* ofile = fopen(outputFileName, "wb"); //'write binary'
	if (ofile == NULL)
	{
		fprintf(stderr, "Cannot open %s for writing.\n", outputFileName);
		return;
	}
	// Leave a blank for the series count and tsLength
	for (uint32_t i = 0; i < 8; i++)
		fputc(0, ofile);
	fclose(ofile);

	uint32_t jobsSentCount = 0; //used if randomize==FALSE
	uint32_t lastEndIndex = 0; //used if randomize==FALSE

	uint32_t sentSeriesCount = 0; //used if randomize==TRUE

	// Count the series sent to the binary file on output
	uint32_t newSeriesCount = 0;

	// Expected size of a Work message in bytes:
	uint32_t work_message_length = get_packedWork_length(nbSeriesInChunk);
	Byte packedWork[work_message_length];

	// Expected size of a Result message in bytes
	// (4 + 4*nbClusters + 4*nbClusters: every field is a 4-byte uint32_t):
	uint32_t result_message_length = get_packedResult_length(nbClusters);
	Byte packedResult[result_message_length];

	// Seed the slaves; send one unit of work to each slave.
	Work_t* work;
	int* busy_slave = (int*) malloc(ntasks * sizeof(int));
	for (int rank = 1; rank < ntasks; rank++)
		busy_slave[rank] = 0;
	for (int rank = 1; rank < ntasks; rank++)
	{
		// Find the next item of work to do
		work = get_next_work(inputFileName, totalNbSeries, nbSeriesInChunk, idealNbSeriesInChunk,
			jobsSentCount, lastEndIndex, nbClusters, clustOnMedoids,
			randomize, p_for_dissims);

		if (randomize)
			sentSeriesCount += nbSeriesInChunk;
		else
		{
			lastEndIndex = lastEndIndex + work->nbSeries;
			jobsSentCount++;
		}

		// Send it to the current rank
		pack_work(work, nbSeriesInChunk, packedWork);
		free_work(work);
		fprintf(stdout, "0 / Send work %s to rank=%i / %u\n", inputFileName, rank,
			(randomize ? sentSeriesCount : lastEndIndex));
		MPI_Send(packedWork, work_message_length, MPI_BYTE, rank, WORKTAG, MPI_COMM_WORLD);

		busy_slave[rank] = 1;

		if ((randomize && sentSeriesCount >= 1.5*totalNbSeries) //TODO: 1.5 = heuristic, magic number...
			|| (!randomize && lastEndIndex >= totalNbSeries))
		{
			// Nothing more to read
			break;
		}
	}

	// Loop, collecting results and handing out new work, until there is no more work to be done
	Result_t* result;
	MPI_Status status;
	while (1)
	{
		// If no slave is active, the job is over
		int atLeastOneSlaveActive = 0;
		for (int rank = 1; rank < ntasks; rank++)
		{
			if (busy_slave[rank])
			{
				atLeastOneSlaveActive = 1;
				break;
			}
		}
		if (!atLeastOneSlaveActive)
			break;

		// Receive a result from any slave
		MPI_Recv(packedResult, result_message_length, MPI_BYTE, MPI_ANY_SOURCE,
			WORKTAG, MPI_COMM_WORLD, &status);
		result = unpack_result(packedResult);
		fprintf(stdout, "0 / Receive result from rank=%i on %s\n", status.MPI_SOURCE, inputFileName);

		// 'binarize' the result (only the series' values) returned by the slave
		binaryResult_to_file(result, inputFileName, outputFileName);
		free_result(result);
		newSeriesCount += nbClusters;

		if ((randomize && sentSeriesCount < totalNbSeries)
			|| (!randomize && lastEndIndex < totalNbSeries))
		{
			// Get the next unit of work to be done
			work = get_next_work(inputFileName, totalNbSeries, nbSeriesInChunk,
				idealNbSeriesInChunk, jobsSentCount, lastEndIndex, nbClusters,
				clustOnMedoids, randomize, p_for_dissims);

			if (randomize)
				sentSeriesCount += nbSeriesInChunk;
			else
			{
				lastEndIndex = lastEndIndex + work->nbSeries;
				jobsSentCount++;
			}

			// Send the slave a new work unit
			pack_work(work, nbSeriesInChunk, packedWork);
			free_work(work);
			fprintf(stdout, "0 / Send work %s to rank=%i / %u\n", inputFileName, status.MPI_SOURCE,
				(randomize ? sentSeriesCount : lastEndIndex));
			MPI_Send(packedWork, work_message_length, MPI_BYTE,
				status.MPI_SOURCE, WORKTAG, MPI_COMM_WORLD);
		}
		else
			// No more work to do
			busy_slave[status.MPI_SOURCE] = 0;
	}

	// There's no more work to be done, so receive all pending results from the slaves.
	for (int rank = 1; rank < ntasks; rank++)
	{
		if (busy_slave[rank])
		{
			MPI_Recv(packedResult, result_message_length, MPI_BYTE,
				rank, WORKTAG, MPI_COMM_WORLD, &status);
			result = unpack_result(packedResult);

			// 'binarize' the result (only the series' values) returned by the slave
			binaryResult_to_file(result, inputFileName, outputFileName);
			free_result(result);
			newSeriesCount += nbClusters;
		}
	}
	free(busy_slave);

	// Finalize the output file: write the total number of series inside it, and tsLength
	ofile = fopen(outputFileName, "r+b"); //read and write, binary
	if (ofile == NULL)
	{
		fprintf(stderr, "Cannot reopen %s to finalize its header.\n", outputFileName);
		return;
	}
	fseek(ofile, 0, SEEK_SET);
	Byte intBuffer[4];
	write_int(newSeriesCount, 4, intBuffer);
	fwrite(intBuffer, 1, 4, ofile);
	write_int(tsLength, 4, intBuffer);
	fwrite(intBuffer, 1, 4, ofile);
	fclose(ofile);
}
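/* Size of one reduction round, in the non-randomized case: the input is split
   into about totalNbSeries / idealNbSeriesInChunk chunks and every chunk
   returns nbClusters medoids, so the output file holds roughly
   nbClusters * totalNbSeries / idealNbSeriesInChunk series. master_run()
   below repeats such rounds until at most nbSeriesInChunk series remain. */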

// generate a random temporary file name (in the .tmp/ directory, which must exist);
// 7 random digits make a collision unlikely, but uniqueness is not guaranteed
static char* get_unique_name()
{
	size_t nbDigits = 7; //rather arbitrary
	size_t stringLength = 5 + nbDigits; //5 for '.tmp/'
	char* s = (char*) malloc(stringLength + 1);
	memcpy(s, ".tmp/", 5);
	for (size_t i=0; i<nbDigits; i++)
		s[5+i] = '0' + get_rand_int() % 10;
	s[stringLength] = 0;
	return s;
}

// code executed by the master process
void master_run(char* mainInputFileName, uint32_t totalNbSeries, uint32_t nbSeriesInChunk,
	double idealNbSeriesInChunk, uint32_t tsLength, uint32_t nbClusters,
	int randomize, uint32_t p_for_dissims)
{
	// Basic sanity check: nbClusters must be clearly less than the series count per chunk
	if (10 * nbClusters >= nbSeriesInChunk)
	{
		fprintf(stdout, "WARNING: the number of clusters (%u) may be too high compared with the chunk size (%u).\n",
			nbClusters, nbSeriesInChunk);
	}

	// Find out how many processes there are in the default communicator
	int ntasks;
	MPI_Comm_size(MPI_COMM_WORLD, &ntasks);
	if (ntasks <= 1)
	{
		fprintf(stderr, "No slaves available (only the master is running).\n");
		return;
	}

	// initializations
	char* inputFileName = mainInputFileName;
	char* outputFileName;

	uint32_t nbSeriesInFile = totalNbSeries;

	// Reduce the data set by rounds of clustering, until it fits in one chunk
	while (nbSeriesInFile > nbSeriesInChunk)
	{
		outputFileName = get_unique_name();
		// From the second round on, the series being clustered are themselves medoids
		uint32_t clustOnMedoids = (nbSeriesInFile < totalNbSeries ? 1 : 0);
		clusters_reduce(inputFileName, outputFileName, ntasks, nbSeriesInFile, nbSeriesInChunk,
			idealNbSeriesInChunk, tsLength, nbClusters, clustOnMedoids,
			randomize, p_for_dissims);

		// read nbSeries in outputFile
		nbSeriesInFile = get_nbSeries(outputFileName);

		// update file names
		if (strcmp(mainInputFileName, inputFileName) != 0)
		{
			// No need to keep every intermediate binary
			unlink(inputFileName);
			free(inputFileName);
		}
		inputFileName = outputFileName;
	}

	// read nbSeries in inputFileName (the last one);
	// we know there are at most 'nbSeriesInChunk' series in it
	nbSeriesInFile = get_nbSeries(inputFileName);

	// Expected size of a Work message in bytes:
	uint32_t work_message_length = get_packedWork_length(nbSeriesInChunk);
	Byte packedWork[work_message_length];

	// Expected size of a Result message in bytes:
	uint32_t result_message_length = get_packedResult_length(nbClusters);
	Byte packedResult[result_message_length];

	// Run a last task on some slave, and get the final result
	// (clustOnMedoids == 1 and randomize == 0 for this deterministic last pass)
	Work_t* work = get_next_work(inputFileName, nbSeriesInFile, nbSeriesInChunk,
		(double)nbSeriesInFile, 0, 0, nbClusters, 1, 0, p_for_dissims);

	// Send a randomly selected slave the final work unit
	pack_work(work, nbSeriesInChunk, packedWork);
	free_work(work);
	int selectedSlaveRank = get_rand_int() % (ntasks - 1) + 1;
	fprintf(stdout, "0 / Send final work %s to rank=%i / %u\n",
		inputFileName, selectedSlaveRank, nbSeriesInFile);
	MPI_Send(packedWork, work_message_length, MPI_BYTE, selectedSlaveRank,
		WORKTAG, MPI_COMM_WORLD);

	MPI_Status status;
	// Wait for it to finish
	MPI_Recv(packedResult, result_message_length, MPI_BYTE, selectedSlaveRank,
		WORKTAG, MPI_COMM_WORLD, &status);
	Result_t* finalResult = unpack_result(packedResult);
	fprintf(stdout, "0 / Receive final result from rank=%i on %s\n", status.MPI_SOURCE, inputFileName);

	// Tell all the slaves to exit by sending an empty message with the DIETAG.
	for (int rank = 1; rank < ntasks; ++rank)
		MPI_Send(0, 0, MPI_BYTE, rank, DIETAG, MPI_COMM_WORLD);

	const char finalBinaryName[] = "ppamFinalSeries.bin";
	result_to_XML(finalResult, inputFileName, finalBinaryName, p_for_dissims);
	free_result(finalResult);

	// Make the last binary available under the name referenced in the XML
	if (strcmp(mainInputFileName, inputFileName) != 0)
	{
		// Keep the last input binary, but rename it
		rename(inputFileName, finalBinaryName);
		free(inputFileName);
	}
	else
	{
		// just symlink mainInputFileName
		if (!access(finalBinaryName, F_OK))
			unlink(finalBinaryName);
		if (symlink(mainInputFileName, finalBinaryName))
			fprintf(stderr, "Cannot create symlink to the initial binary file.\n");
	}
}
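
/* For reference, the slave-side loop this master pairs with (implemented
   elsewhere in the repository, presumably in a slave.c) would look roughly
   like the sketch below; unpack_work() is assumed to be the counterpart of
   pack_work() from MPI_Communication/unpack.h:

     while (1)
     {
         MPI_Status status;
         MPI_Recv(packedWork, work_message_length, MPI_BYTE, 0, MPI_ANY_TAG,
             MPI_COMM_WORLD, &status);
         if (status.MPI_TAG == DIETAG)
             break;
         Work_t* work = unpack_work(packedWork); //hypothetical name
         // ... cluster the chunk, then pack and return the medoids:
         MPI_Send(packedResult, result_message_length, MPI_BYTE, 0,
             WORKTAG, MPI_COMM_WORLD);
     }

   This is a sketch under the stated assumptions, not a copy of the actual
   slave code. */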