throw away old code, prepare tests
[epclust.git] / old_C_code / stage1 / src / MPI_Main / master.c
diff --git a/old_C_code/stage1/src/MPI_Main/master.c b/old_C_code/stage1/src/MPI_Main/master.c
deleted file mode 100644 (file)
index dfecfde..0000000
+++ /dev/null
@@ -1,383 +0,0 @@
-#include "Util/types.h"
-#include "Util/utils.h"
-#include <mpi.h>
-#include <stdlib.h>
-#include <stdio.h>
-#include <string.h>
-#include <math.h>
-#include <unistd.h>
-#include "MPI_Communication/pack.h"
-#include "MPI_Communication/unpack.h"
-#include "Util/rng.h"
-
-// save the final result in XML format
-static void result_to_XML(Result_t* result, const char* inputFileName, 
-       const char* lastBinaryFileName, uint32_t p_for_dissims)
-{
-       uint32_t nbClusters = result->nbClusters;
-       FILE* ofile = fopen("ppamResult.xml", "w");
-       
-       fprintf(ofile, "<medoids>\n\n");
-
-       fprintf(ofile, "  <file>");
-       fprintf(ofile, "%s", lastBinaryFileName);
-       fprintf(ofile, "</file>\n\n");
-
-       fprintf(ofile, "  <p_for_dissims>");
-       fprintf(ofile, "%u", p_for_dissims);
-       fprintf(ofile, "</p_for_dissims>\n\n");
-       
-       fprintf(ofile, "  <IDs>\n");
-       for (uint32_t i=0; i<nbClusters; i++)
-               fprintf(ofile, "    <ID>%u</ID>\n", result->medoids_ID[i]);
-       fprintf(ofile, "  </IDs>\n\n");
-
-       // medoids ranks in last binary file (human printing: 0 --> 1 ...etc)
-       fprintf(ofile, "  <ranks>\n");
-       for (uint32_t i=0; i<nbClusters; i++)
-               fprintf(ofile, "    <rank>%u</rank>\n", result->medoids_ranks[i] + 1);
-       fprintf(ofile, "  </ranks>\n\n");
-
-       fprintf(ofile, "</medoids>\n");
-       fclose(ofile);
-}
-
-static void binaryResult_to_file(Result_t* result, const char* inputFileName,
-       const char* outputFileName)
-{
-       // Determine tsLength from inputFile
-       uint32_t tsLength = get_tsLength(inputFileName);
-
-       FILE* ifile = fopen(inputFileName, "rb");
-       FILE* ofile = fopen(outputFileName, "ab"); //'append binary'
-
-       Byte tsBuffer[tsLength];
-       for (uint32_t i = 0; i < result->nbClusters; i++)
-       {
-               // Get time-series in (binary) inputFile
-               fseek(ifile, 8 + result->medoids_ranks[i] * tsLength, SEEK_SET);
-               // Copy current serie onto ofile
-               
-               size_t lengthRead = fread(tsBuffer, 1, tsLength, ifile);
-               if (lengthRead != tsLength)
-                       fprintf(stderr, "problem while copying binary series to new file.\n");
-               fwrite(tsBuffer, 1, tsLength, ofile);
-       }
-
-       fclose(ifile);
-       fclose(ofile);
-}
-
-// fill a new unit of work suitable to be given to a slave
-static Work_t* get_next_work(char* inputFileName, uint32_t nbSeries, uint32_t nbSeriesInChunk,
-       double idealNbSeriesInChunk, uint32_t jobsSentCount, uint32_t lastEndIndex,
-       uint32_t nbClusters, uint32_t clustOnMedoids, 
-       int randomize, uint32_t p_for_dissims)
-{
-       Work_t* work = (Work_t*) malloc(sizeof(Work_t));
-
-       work->inputFileName = (char*) malloc(strlen(inputFileName) + 1);
-       strcpy(work->inputFileName, inputFileName);
-
-       if (randomize)
-               work->nbSeries = nbSeriesInChunk;
-       else
-       {
-               double adjustedNbSeriesInNextChunk =
-                       idealNbSeriesInChunk * (jobsSentCount + 1) - lastEndIndex;
-               // round to closest integer
-               work->nbSeries = (uint32_t)adjustedNbSeriesInNextChunk;
-               // stay below the upper bound (TODO: is this check required ?)
-               if (work->nbSeries > nbSeriesInChunk)
-                       work->nbSeries = nbSeriesInChunk;
-               // TODO: what about this one ? (probably useless)
-               if (lastEndIndex + work->nbSeries > nbSeries)
-                       work->nbSeries = nbSeries - lastEndIndex;
-       }
-
-       //TODO: ranks on uint64_t if more than 4.3 billion series at the same place (unlikely...)
-       work->ranks = (uint32_t*) malloc(work->nbSeries * sizeof(uint32_t));
-       for (uint32_t i = 0; i < work->nbSeries; i++)
-               work->ranks[i] = (randomize ? get_rand_int() % nbSeries : lastEndIndex + i);
-
-       work->nbClusters = nbClusters;
-       work->clustOnMedoids = clustOnMedoids;
-       work->p_for_dissims = p_for_dissims;
-
-       return work;
-}
-
-// process all subtasks and save binary results into a new file
-// NOTE: this file will be smaller than initial file (or DB...)
-static void clusters_reduce(char* inputFileName, char* outputFileName, uint32_t ntasks,
-       uint32_t totalNbSeries, uint32_t nbSeriesInChunk, double idealNbSeriesInChunk,
-       uint32_t tsLength, uint32_t nbClusters, uint32_t clustOnMedoids, 
-       int randomize, uint32_t p_for_dissims)
-{
-       FILE* ofile = fopen(outputFileName, "wb"); //'write binary'
-       // Leave a blank for series' count and tsLength
-       for (uint32_t i = 0; i < 8; i++)
-               fputc(0, ofile);
-       fclose(ofile);
-
-       uint32_t jobsSentCount = 0; //used if randomize==FALSE
-       uint32_t lastEndIndex = 0; //used if randomize==FALSE
-
-       uint32_t sentSeriesCount = 0; //used if randomize==TRUE
-
-       // Count series sent to binary file on output
-       uint32_t newSeriesCount = 0;
-
-       // Expected size of a Work message in bytes:
-       uint32_t work_message_length = get_packedWork_length(nbSeriesInChunk);
-       Byte packedWork[work_message_length];
-
-       // Expected size of a Result message in bytes: (uint32_t is on 4 bytes)
-       uint32_t result_message_length = 4 + 4 * nbClusters + 4 * nbClusters;
-       Byte packedResult[result_message_length];
-
-       // Seed the slaves; send one unit of work to each slave.
-       Work_t* work;
-       int* busy_slave = (int*) malloc(ntasks * sizeof(int));
-       for (int rank = 1; rank < ntasks; rank++)
-               busy_slave[rank] = 0;
-       for (int rank = 1; rank < ntasks; rank++)
-       {
-               // Find the next item of work to do
-               work = get_next_work(inputFileName, totalNbSeries, nbSeriesInChunk, idealNbSeriesInChunk,
-                       jobsSentCount, lastEndIndex, nbClusters, clustOnMedoids, 
-                       randomize, p_for_dissims);
-
-               if (randomize)
-                       sentSeriesCount += nbSeriesInChunk;
-               else
-               {
-                       lastEndIndex = lastEndIndex + work->nbSeries;
-                       jobsSentCount++;
-               }
-
-               // Send it to current rank
-               pack_work(work, nbSeriesInChunk, packedWork);
-               free_work(work);
-               fprintf(stdout, "0 / Send work %s to rank=%i / %u\n",inputFileName, rank, 
-                       (randomize ? sentSeriesCount : lastEndIndex));
-               MPI_Send(packedWork, work_message_length, MPI_BYTE, rank, WORKTAG, MPI_COMM_WORLD);
-
-               busy_slave[rank] = 1;
-
-               if ((randomize && sentSeriesCount >= 1.5*totalNbSeries) //TODO: 1.5 = heuristic, magic number...
-                       || (!randomize && lastEndIndex >= totalNbSeries))
-               {
-                       // Nothing more to read
-                       break;
-               }
-       }
-
-       // Loop over getting new work requests until there is no more work to be done
-       Result_t* result;
-       MPI_Status status;
-       while (1)
-       {
-               // If no slave is active, job is over
-               int atLeastOneSlaveActive = 0;
-               for (int rank = 1; rank < ntasks; rank++)
-               {
-                       if (busy_slave[rank])
-                       {
-                               atLeastOneSlaveActive = 1;
-                               break;
-                       }
-               }
-               if (!atLeastOneSlaveActive)
-                       break;
-
-               // Receive results from a slave
-               MPI_Recv(packedResult, result_message_length, MPI_BYTE, MPI_ANY_SOURCE,
-                       WORKTAG, MPI_COMM_WORLD, &status);
-               result = unpack_result(packedResult);
-               fprintf(stdout, "0 / Receive result from rank=%i on %s\n",status.MPI_SOURCE,inputFileName);
-
-               // 'binarize' the result (only series' values) returned by the slave
-               binaryResult_to_file(result, inputFileName, outputFileName);
-               free_result(result);
-               newSeriesCount += nbClusters;
-
-               if ((randomize && sentSeriesCount < totalNbSeries)
-                       || (!randomize && lastEndIndex < totalNbSeries))
-               {
-                       // Get the next unit of work to be done
-                       work = get_next_work(inputFileName, totalNbSeries, nbSeriesInChunk,
-                               idealNbSeriesInChunk, jobsSentCount, lastEndIndex, nbClusters, 
-                               clustOnMedoids, randomize, p_for_dissims);
-
-                       if (randomize)
-                               sentSeriesCount += nbSeriesInChunk;
-                       else
-                       {
-                               lastEndIndex = lastEndIndex + work->nbSeries;
-                               jobsSentCount++;
-                       }
-
-                       // Send the slave a new work unit
-                       pack_work(work, nbSeriesInChunk, packedWork);
-                       free_work(work);
-                       fprintf(stdout, "0 / Send work %s to rank=%i / %u\n",inputFileName, status.MPI_SOURCE, 
-                               (randomize ? sentSeriesCount : lastEndIndex));
-                       MPI_Send(packedWork, work_message_length, MPI_BYTE,
-                               status.MPI_SOURCE, WORKTAG, MPI_COMM_WORLD);
-               }
-
-               else
-                       // No more work to do
-                       busy_slave[status.MPI_SOURCE] = 0;
-       }
-
-       // There's no more work to be done, so receive all results from the slaves.
-       for (int rank = 1; rank < ntasks; rank++)
-       {
-               if (busy_slave[rank])
-               {
-                       MPI_Recv(packedResult, result_message_length, MPI_BYTE,
-                               rank, WORKTAG, MPI_COMM_WORLD, &status);
-                       result = unpack_result(packedResult);
-
-                       // 'binarize' the result (only series' values) returned by the slave
-                       binaryResult_to_file(result, inputFileName, outputFileName);
-                       free_result(result);
-                       newSeriesCount += nbClusters;
-               }
-       }
-       free(busy_slave);
-
-       // Finalize output file: write total number of series inside it, and tsLength
-       ofile = fopen(outputFileName, "r+b"); //read and write, binary
-       fseek(ofile, 0, SEEK_SET);
-       Byte intBuffer[4];
-       write_int(newSeriesCount, intBuffer);
-       fwrite(intBuffer, 1, 4, ofile);
-       write_int(tsLength, intBuffer);
-       fwrite(intBuffer, 1, 4, ofile);
-       fclose(ofile);
-}
-
-// generate random temporary file names
-static char* get_unique_name()
-{
-       size_t nbDigits = 7; //rather arbitrary
-       size_t stringLength = 5 + nbDigits; //5 for '.tmp/'
-       char* s = (char*) malloc(stringLength + 1);
-       s[0] = '.';
-       s[1] = 't';
-       s[2] = 'm';
-       s[3] = 'p';
-       s[4] = '/';
-       for (int i=0; i<nbDigits; i++)
-               s[5+i] = '0' + get_rand_int() % 10;
-       s[stringLength] = 0;
-       return s;
-}
-
-// code executed by master process
-void master_run(char* mainInputFileName, uint32_t totalNbSeries, uint32_t nbSeriesInChunk,
-       double idealNbSeriesInChunk, uint32_t tsLength, uint32_t nbClusters, 
-       int randomize, uint32_t p_for_dissims)
-{
-       // Basic sanity check: nbClusters must be clearly less than series count per chunk
-       if (10 * nbClusters >= nbSeriesInChunk)
-       {
-               fprintf(stdout, "WARNING: cluster size (%u) may be too high compared with chunk size (%u).\n",
-                       nbClusters, nbSeriesInChunk);
-       }
-       
-       // Find out how many processes there are in the default communicator
-       int ntasks;
-       MPI_Comm_size(MPI_COMM_WORLD, &ntasks);
-       if (ntasks <= 1)
-       {
-               fprintf(stderr,"No slaves available (only master running).\n");
-               return;
-       }
-       
-       // initializations
-       char* inputFileName = mainInputFileName;
-       char* outputFileName;
-
-       uint32_t nbSeriesInFile = totalNbSeries;
-
-       while (nbSeriesInFile > nbSeriesInChunk)
-       {
-               outputFileName = get_unique_name();
-               uint32_t clustOnMedoids = (nbSeriesInFile < totalNbSeries ? 1 : 0);
-               clusters_reduce(inputFileName, outputFileName, ntasks, nbSeriesInFile, nbSeriesInChunk,
-                       idealNbSeriesInChunk, tsLength, nbClusters, clustOnMedoids, 
-                       randomize, p_for_dissims);
-
-               // read nbSeries in outputFile
-               nbSeriesInFile = get_nbSeries(outputFileName);
-               
-               // update file names
-               if (strcmp(mainInputFileName, inputFileName) != 0)
-               {
-                       // No need to keep every intermediate binary
-                       unlink(inputFileName);
-                       free(inputFileName);
-               }
-               inputFileName = outputFileName;
-       }
-
-       // read nbSeries in inputFileName (the last one)
-       // we know that there is at most 'nbSeriesInChunk' series in it
-       nbSeriesInFile = get_nbSeries(inputFileName);
-
-       // Expected size of a Work message in bytes:
-       uint32_t work_message_length = get_packedWork_length(nbSeriesInChunk);
-       Byte packedWork[work_message_length];
-
-       // Expected size of a Result message in bytes: (uint32_t is on 4 bytes)
-       uint32_t result_message_length = get_packedResult_length(nbClusters);
-       Byte packedResult[result_message_length];
-
-       // Run a last task by some slave, and get final result
-       Work_t* work = get_next_work(inputFileName, nbSeriesInFile, nbSeriesInChunk,
-               (double)nbSeriesInFile, 0, 0, nbClusters, 1, 0, p_for_dissims);
-
-       // Send the slave a new work unit
-       pack_work(work, nbSeriesInChunk, packedWork);
-       free_work(work);
-       int selectedSlaveRank = get_rand_int() % (ntasks - 1) + 1;
-       fprintf(stdout, "0 / Send final work %s to rank=%i / %u\n", 
-               inputFileName, selectedSlaveRank, nbSeriesInFile);
-       MPI_Send(packedWork, work_message_length, MPI_BYTE, selectedSlaveRank,
-               WORKTAG, MPI_COMM_WORLD);
-
-       MPI_Status status;
-       // Wait for him to finish
-       MPI_Recv(packedResult, result_message_length, MPI_BYTE, selectedSlaveRank,
-                       WORKTAG, MPI_COMM_WORLD, &status);
-       Result_t* finalResult = unpack_result(packedResult);
-       fprintf(stdout, "0 / Receive final result from rank=%i on %s\n",status.MPI_SOURCE,inputFileName);
-
-       //Tell all the slaves to exit by sending an empty message with the DIETAG.
-       for (int rank = 1; rank < ntasks; ++rank)
-               MPI_Send(0, 0, MPI_BYTE, rank, DIETAG, MPI_COMM_WORLD);
-
-       const char finalBinaryName[] = "ppamFinalSeries.bin";
-       result_to_XML(finalResult, inputFileName, finalBinaryName, p_for_dissims);
-       free_result(finalResult);
-
-       // free memory
-       if (strcmp(mainInputFileName, inputFileName))
-       {
-               // Keep last input binary, but rename it
-               rename(inputFileName, finalBinaryName);
-               free(inputFileName);
-       }
-       else
-       {
-               // just symlink mainInputFileName
-               if (!access(finalBinaryName, F_OK))
-                       unlink(finalBinaryName);
-               if (symlink(mainInputFileName, finalBinaryName))
-                       fprintf(stderr,"Cannot create symlink to initial binary file.\n");
-       }
-}