major folder reorganisation, R pkg is now epclust/ at first level. Experimental usage...
[epclust.git] / old_C_code / stage1 / src / MPI_Main / master.c
diff --git a/old_C_code/stage1/src/MPI_Main/master.c b/old_C_code/stage1/src/MPI_Main/master.c
new file mode 100644 (file)
index 0000000..dfecfde
--- /dev/null
@@ -0,0 +1,383 @@
+#include "Util/types.h"
+#include "Util/utils.h"
+#include <mpi.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <math.h>
+#include <unistd.h>
+#include "MPI_Communication/pack.h"
+#include "MPI_Communication/unpack.h"
+#include "Util/rng.h"
+
+// save the final result in XML format
+static void result_to_XML(Result_t* result, const char* inputFileName, 
+       const char* lastBinaryFileName, uint32_t p_for_dissims)
+{
+       uint32_t nbClusters = result->nbClusters;
+       FILE* ofile = fopen("ppamResult.xml", "w");
+       
+       fprintf(ofile, "<medoids>\n\n");
+
+       fprintf(ofile, "  <file>");
+       fprintf(ofile, "%s", lastBinaryFileName);
+       fprintf(ofile, "</file>\n\n");
+
+       fprintf(ofile, "  <p_for_dissims>");
+       fprintf(ofile, "%u", p_for_dissims);
+       fprintf(ofile, "</p_for_dissims>\n\n");
+       
+       fprintf(ofile, "  <IDs>\n");
+       for (uint32_t i=0; i<nbClusters; i++)
+               fprintf(ofile, "    <ID>%u</ID>\n", result->medoids_ID[i]);
+       fprintf(ofile, "  </IDs>\n\n");
+
+       // medoids ranks in last binary file (human printing: 0 --> 1 ...etc)
+       fprintf(ofile, "  <ranks>\n");
+       for (uint32_t i=0; i<nbClusters; i++)
+               fprintf(ofile, "    <rank>%u</rank>\n", result->medoids_ranks[i] + 1);
+       fprintf(ofile, "  </ranks>\n\n");
+
+       fprintf(ofile, "</medoids>\n");
+       fclose(ofile);
+}
+
+static void binaryResult_to_file(Result_t* result, const char* inputFileName,
+       const char* outputFileName)
+{
+       // Determine tsLength from inputFile
+       uint32_t tsLength = get_tsLength(inputFileName);
+
+       FILE* ifile = fopen(inputFileName, "rb");
+       FILE* ofile = fopen(outputFileName, "ab"); //'append binary'
+
+       Byte tsBuffer[tsLength];
+       for (uint32_t i = 0; i < result->nbClusters; i++)
+       {
+               // Get time-series in (binary) inputFile
+               fseek(ifile, 8 + result->medoids_ranks[i] * tsLength, SEEK_SET);
+               // Copy current serie onto ofile
+               
+               size_t lengthRead = fread(tsBuffer, 1, tsLength, ifile);
+               if (lengthRead != tsLength)
+                       fprintf(stderr, "problem while copying binary series to new file.\n");
+               fwrite(tsBuffer, 1, tsLength, ofile);
+       }
+
+       fclose(ifile);
+       fclose(ofile);
+}
+
+// fill a new unit of work suitable to be given to a slave
+static Work_t* get_next_work(char* inputFileName, uint32_t nbSeries, uint32_t nbSeriesInChunk,
+       double idealNbSeriesInChunk, uint32_t jobsSentCount, uint32_t lastEndIndex,
+       uint32_t nbClusters, uint32_t clustOnMedoids, 
+       int randomize, uint32_t p_for_dissims)
+{
+       Work_t* work = (Work_t*) malloc(sizeof(Work_t));
+
+       work->inputFileName = (char*) malloc(strlen(inputFileName) + 1);
+       strcpy(work->inputFileName, inputFileName);
+
+       if (randomize)
+               work->nbSeries = nbSeriesInChunk;
+       else
+       {
+               double adjustedNbSeriesInNextChunk =
+                       idealNbSeriesInChunk * (jobsSentCount + 1) - lastEndIndex;
+               // round to closest integer
+               work->nbSeries = (uint32_t)adjustedNbSeriesInNextChunk;
+               // stay below the upper bound (TODO: is this check required ?)
+               if (work->nbSeries > nbSeriesInChunk)
+                       work->nbSeries = nbSeriesInChunk;
+               // TODO: what about this one ? (probably useless)
+               if (lastEndIndex + work->nbSeries > nbSeries)
+                       work->nbSeries = nbSeries - lastEndIndex;
+       }
+
+       //TODO: ranks on uint64_t if more than 4.3 billion series at the same place (unlikely...)
+       work->ranks = (uint32_t*) malloc(work->nbSeries * sizeof(uint32_t));
+       for (uint32_t i = 0; i < work->nbSeries; i++)
+               work->ranks[i] = (randomize ? get_rand_int() % nbSeries : lastEndIndex + i);
+
+       work->nbClusters = nbClusters;
+       work->clustOnMedoids = clustOnMedoids;
+       work->p_for_dissims = p_for_dissims;
+
+       return work;
+}
+
+// process all subtasks and save binary results into a new file
+// NOTE: this file will be smaller than initial file (or DB...)
+static void clusters_reduce(char* inputFileName, char* outputFileName, uint32_t ntasks,
+       uint32_t totalNbSeries, uint32_t nbSeriesInChunk, double idealNbSeriesInChunk,
+       uint32_t tsLength, uint32_t nbClusters, uint32_t clustOnMedoids, 
+       int randomize, uint32_t p_for_dissims)
+{
+       FILE* ofile = fopen(outputFileName, "wb"); //'write binary'
+       // Leave a blank for series' count and tsLength
+       for (uint32_t i = 0; i < 8; i++)
+               fputc(0, ofile);
+       fclose(ofile);
+
+       uint32_t jobsSentCount = 0; //used if randomize==FALSE
+       uint32_t lastEndIndex = 0; //used if randomize==FALSE
+
+       uint32_t sentSeriesCount = 0; //used if randomize==TRUE
+
+       // Count series sent to binary file on output
+       uint32_t newSeriesCount = 0;
+
+       // Expected size of a Work message in bytes:
+       uint32_t work_message_length = get_packedWork_length(nbSeriesInChunk);
+       Byte packedWork[work_message_length];
+
+       // Expected size of a Result message in bytes: (uint32_t is on 4 bytes)
+       uint32_t result_message_length = 4 + 4 * nbClusters + 4 * nbClusters;
+       Byte packedResult[result_message_length];
+
+       // Seed the slaves; send one unit of work to each slave.
+       Work_t* work;
+       int* busy_slave = (int*) malloc(ntasks * sizeof(int));
+       for (int rank = 1; rank < ntasks; rank++)
+               busy_slave[rank] = 0;
+       for (int rank = 1; rank < ntasks; rank++)
+       {
+               // Find the next item of work to do
+               work = get_next_work(inputFileName, totalNbSeries, nbSeriesInChunk, idealNbSeriesInChunk,
+                       jobsSentCount, lastEndIndex, nbClusters, clustOnMedoids, 
+                       randomize, p_for_dissims);
+
+               if (randomize)
+                       sentSeriesCount += nbSeriesInChunk;
+               else
+               {
+                       lastEndIndex = lastEndIndex + work->nbSeries;
+                       jobsSentCount++;
+               }
+
+               // Send it to current rank
+               pack_work(work, nbSeriesInChunk, packedWork);
+               free_work(work);
+               fprintf(stdout, "0 / Send work %s to rank=%i / %u\n",inputFileName, rank, 
+                       (randomize ? sentSeriesCount : lastEndIndex));
+               MPI_Send(packedWork, work_message_length, MPI_BYTE, rank, WORKTAG, MPI_COMM_WORLD);
+
+               busy_slave[rank] = 1;
+
+               if ((randomize && sentSeriesCount >= 1.5*totalNbSeries) //TODO: 1.5 = heuristic, magic number...
+                       || (!randomize && lastEndIndex >= totalNbSeries))
+               {
+                       // Nothing more to read
+                       break;
+               }
+       }
+
+       // Loop over getting new work requests until there is no more work to be done
+       Result_t* result;
+       MPI_Status status;
+       while (1)
+       {
+               // If no slave is active, job is over
+               int atLeastOneSlaveActive = 0;
+               for (int rank = 1; rank < ntasks; rank++)
+               {
+                       if (busy_slave[rank])
+                       {
+                               atLeastOneSlaveActive = 1;
+                               break;
+                       }
+               }
+               if (!atLeastOneSlaveActive)
+                       break;
+
+               // Receive results from a slave
+               MPI_Recv(packedResult, result_message_length, MPI_BYTE, MPI_ANY_SOURCE,
+                       WORKTAG, MPI_COMM_WORLD, &status);
+               result = unpack_result(packedResult);
+               fprintf(stdout, "0 / Receive result from rank=%i on %s\n",status.MPI_SOURCE,inputFileName);
+
+               // 'binarize' the result (only series' values) returned by the slave
+               binaryResult_to_file(result, inputFileName, outputFileName);
+               free_result(result);
+               newSeriesCount += nbClusters;
+
+               if ((randomize && sentSeriesCount < totalNbSeries)
+                       || (!randomize && lastEndIndex < totalNbSeries))
+               {
+                       // Get the next unit of work to be done
+                       work = get_next_work(inputFileName, totalNbSeries, nbSeriesInChunk,
+                               idealNbSeriesInChunk, jobsSentCount, lastEndIndex, nbClusters, 
+                               clustOnMedoids, randomize, p_for_dissims);
+
+                       if (randomize)
+                               sentSeriesCount += nbSeriesInChunk;
+                       else
+                       {
+                               lastEndIndex = lastEndIndex + work->nbSeries;
+                               jobsSentCount++;
+                       }
+
+                       // Send the slave a new work unit
+                       pack_work(work, nbSeriesInChunk, packedWork);
+                       free_work(work);
+                       fprintf(stdout, "0 / Send work %s to rank=%i / %u\n",inputFileName, status.MPI_SOURCE, 
+                               (randomize ? sentSeriesCount : lastEndIndex));
+                       MPI_Send(packedWork, work_message_length, MPI_BYTE,
+                               status.MPI_SOURCE, WORKTAG, MPI_COMM_WORLD);
+               }
+
+               else
+                       // No more work to do
+                       busy_slave[status.MPI_SOURCE] = 0;
+       }
+
+       // There's no more work to be done, so receive all results from the slaves.
+       for (int rank = 1; rank < ntasks; rank++)
+       {
+               if (busy_slave[rank])
+               {
+                       MPI_Recv(packedResult, result_message_length, MPI_BYTE,
+                               rank, WORKTAG, MPI_COMM_WORLD, &status);
+                       result = unpack_result(packedResult);
+
+                       // 'binarize' the result (only series' values) returned by the slave
+                       binaryResult_to_file(result, inputFileName, outputFileName);
+                       free_result(result);
+                       newSeriesCount += nbClusters;
+               }
+       }
+       free(busy_slave);
+
+       // Finalize output file: write total number of series inside it, and tsLength
+       ofile = fopen(outputFileName, "r+b"); //read and write, binary
+       fseek(ofile, 0, SEEK_SET);
+       Byte intBuffer[4];
+       write_int(newSeriesCount, intBuffer);
+       fwrite(intBuffer, 1, 4, ofile);
+       write_int(tsLength, intBuffer);
+       fwrite(intBuffer, 1, 4, ofile);
+       fclose(ofile);
+}
+
// Generate a random temporary file name of the form ".tmp/NNNNNNN"
// (7 random digits). Caller owns the returned string (free it).
static char* get_unique_name()
{
	const char prefix[] = ".tmp/";
	const size_t prefixLength = sizeof(prefix) - 1; //5
	const size_t nbDigits = 7; //rather arbitrary
	size_t stringLength = prefixLength + nbDigits;
	char* s = (char*) malloc(stringLength + 1);
	if (s == NULL)
		return NULL;
	memcpy(s, prefix, prefixLength);
	for (size_t i = 0; i < nbDigits; i++)
		s[prefixLength + i] = '0' + get_rand_int() % 10;
	s[stringLength] = '\0';
	return s;
}
+
// code executed by master process:
// repeatedly reduce the input file (each pass clusters chunks of series and
// keeps only the medoids) until at most nbSeriesInChunk series remain, then
// run one final clustering task on a randomly chosen slave and save the
// result as XML ("ppamResult.xml") + binary ("ppamFinalSeries.bin").
void master_run(char* mainInputFileName, uint32_t totalNbSeries, uint32_t nbSeriesInChunk,
	double idealNbSeriesInChunk, uint32_t tsLength, uint32_t nbClusters, 
	int randomize, uint32_t p_for_dissims)
{
	// Basic sanity check: nbClusters must be clearly less than series count per chunk
	if (10 * nbClusters >= nbSeriesInChunk)
	{
		fprintf(stdout, "WARNING: cluster size (%u) may be too high compared with chunk size (%u).\n",
			nbClusters, nbSeriesInChunk);
	}
	
	// Find out how many processes there are in the default communicator
	int ntasks;
	MPI_Comm_size(MPI_COMM_WORLD, &ntasks);
	if (ntasks <= 1)
	{
		fprintf(stderr,"No slaves available (only master running).\n");
		return;
	}
	
	// initializations
	// inputFileName aliases mainInputFileName on the first pass; after that it
	// points to a heap-allocated temporary name owned by this function
	char* inputFileName = mainInputFileName;
	char* outputFileName;

	uint32_t nbSeriesInFile = totalNbSeries;

	// Reduction passes: each pass writes a smaller temporary binary file,
	// which becomes the input of the next pass
	while (nbSeriesInFile > nbSeriesInChunk)
	{
		outputFileName = get_unique_name();
		// from the second pass on, the series being clustered are themselves
		// medoids (nbSeriesInFile < totalNbSeries only after the first pass)
		uint32_t clustOnMedoids = (nbSeriesInFile < totalNbSeries ? 1 : 0);
		clusters_reduce(inputFileName, outputFileName, ntasks, nbSeriesInFile, nbSeriesInChunk,
			idealNbSeriesInChunk, tsLength, nbClusters, clustOnMedoids, 
			randomize, p_for_dissims);

		// read nbSeries in outputFile
		nbSeriesInFile = get_nbSeries(outputFileName);
		
		// update file names
		if (strcmp(mainInputFileName, inputFileName) != 0)
		{
			// No need to keep every intermediate binary
			unlink(inputFileName);
			free(inputFileName);
		}
		inputFileName = outputFileName;
	}

	// read nbSeries in inputFileName (the last one)
	// we know that there is at most 'nbSeriesInChunk' series in it
	nbSeriesInFile = get_nbSeries(inputFileName);

	// Expected size of a Work message in bytes:
	uint32_t work_message_length = get_packedWork_length(nbSeriesInChunk);
	Byte packedWork[work_message_length];

	// Expected size of a Result message in bytes: (uint32_t is on 4 bytes)
	uint32_t result_message_length = get_packedResult_length(nbClusters);
	Byte packedResult[result_message_length];

	// Run a last task by some slave, and get final result
	// (one contiguous chunk covering the whole remaining file: randomize=0,
	// clustOnMedoids=1, jobsSentCount=0, lastEndIndex=0)
	Work_t* work = get_next_work(inputFileName, nbSeriesInFile, nbSeriesInChunk,
		(double)nbSeriesInFile, 0, 0, nbClusters, 1, 0, p_for_dissims);

	// Send the slave a new work unit
	pack_work(work, nbSeriesInChunk, packedWork);
	free_work(work);
	// pick any slave rank in [1, ntasks-1]
	int selectedSlaveRank = get_rand_int() % (ntasks - 1) + 1;
	fprintf(stdout, "0 / Send final work %s to rank=%i / %u\n", 
		inputFileName, selectedSlaveRank, nbSeriesInFile);
	MPI_Send(packedWork, work_message_length, MPI_BYTE, selectedSlaveRank,
		WORKTAG, MPI_COMM_WORLD);

	MPI_Status status;
	// Wait for him to finish
	MPI_Recv(packedResult, result_message_length, MPI_BYTE, selectedSlaveRank,
			WORKTAG, MPI_COMM_WORLD, &status);
	Result_t* finalResult = unpack_result(packedResult);
	fprintf(stdout, "0 / Receive final result from rank=%i on %s\n",status.MPI_SOURCE,inputFileName);

	//Tell all the slaves to exit by sending an empty message with the DIETAG.
	for (int rank = 1; rank < ntasks; ++rank)
		MPI_Send(0, 0, MPI_BYTE, rank, DIETAG, MPI_COMM_WORLD);

	// The XML refers to ranks inside the last binary, so that file is kept
	// under a stable name (renamed or symlinked below)
	const char finalBinaryName[] = "ppamFinalSeries.bin";
	result_to_XML(finalResult, inputFileName, finalBinaryName, p_for_dissims);
	free_result(finalResult);

	// free memory
	if (strcmp(mainInputFileName, inputFileName))
	{
		// Keep last input binary, but rename it
		rename(inputFileName, finalBinaryName);
		free(inputFileName);
	}
	else
	{
		// just symlink mainInputFileName (no reduction pass happened;
		// replace any stale link/file first)
		if (!access(finalBinaryName, F_OK))
			unlink(finalBinaryName);
		if (symlink(mainInputFileName, finalBinaryName))
			fprintf(stderr,"Cannot create symlink to initial binary file.\n");
	}
}