+++ /dev/null
-#include "Util/types.h"
-#include "Util/utils.h"
-#include <mpi.h>
-#include <stdlib.h>
-#include <stdio.h>
-#include <string.h>
-#include <math.h>
-#include <unistd.h>
-#include "MPI_Communication/pack.h"
-#include "MPI_Communication/unpack.h"
-#include "Util/rng.h"
-
-// save the final result in XML format
-static void result_to_XML(Result_t* result, const char* inputFileName,
- const char* lastBinaryFileName, uint32_t p_for_dissims)
-{
- uint32_t nbClusters = result->nbClusters;
- FILE* ofile = fopen("ppamResult.xml", "w");
-
- fprintf(ofile, "<medoids>\n\n");
-
- fprintf(ofile, " <file>");
- fprintf(ofile, "%s", lastBinaryFileName);
- fprintf(ofile, "</file>\n\n");
-
- fprintf(ofile, " <p_for_dissims>");
- fprintf(ofile, "%u", p_for_dissims);
- fprintf(ofile, "</p_for_dissims>\n\n");
-
- fprintf(ofile, " <IDs>\n");
- for (uint32_t i=0; i<nbClusters; i++)
- fprintf(ofile, " <ID>%u</ID>\n", result->medoids_ID[i]);
- fprintf(ofile, " </IDs>\n\n");
-
- // medoids ranks in last binary file (human printing: 0 --> 1 ...etc)
- fprintf(ofile, " <ranks>\n");
- for (uint32_t i=0; i<nbClusters; i++)
- fprintf(ofile, " <rank>%u</rank>\n", result->medoids_ranks[i] + 1);
- fprintf(ofile, " </ranks>\n\n");
-
- fprintf(ofile, "</medoids>\n");
- fclose(ofile);
-}
-
-static void binaryResult_to_file(Result_t* result, const char* inputFileName,
- const char* outputFileName)
-{
- // Determine tsLength from inputFile
- uint32_t tsLength = get_tsLength(inputFileName);
-
- FILE* ifile = fopen(inputFileName, "rb");
- FILE* ofile = fopen(outputFileName, "ab"); //'append binary'
-
- Byte tsBuffer[tsLength];
- for (uint32_t i = 0; i < result->nbClusters; i++)
- {
- // Get time-series in (binary) inputFile
- fseek(ifile, 8 + result->medoids_ranks[i] * tsLength, SEEK_SET);
- // Copy current serie onto ofile
-
- size_t lengthRead = fread(tsBuffer, 1, tsLength, ifile);
- if (lengthRead != tsLength)
- fprintf(stderr, "problem while copying binary series to new file.\n");
- fwrite(tsBuffer, 1, tsLength, ofile);
- }
-
- fclose(ifile);
- fclose(ofile);
-}
-
-// fill a new unit of work suitable to be given to a slave
-static Work_t* get_next_work(char* inputFileName, uint32_t nbSeries, uint32_t nbSeriesInChunk,
- double idealNbSeriesInChunk, uint32_t jobsSentCount, uint32_t lastEndIndex,
- uint32_t nbClusters, uint32_t clustOnMedoids,
- int randomize, uint32_t p_for_dissims)
-{
- Work_t* work = (Work_t*) malloc(sizeof(Work_t));
-
- work->inputFileName = (char*) malloc(strlen(inputFileName) + 1);
- strcpy(work->inputFileName, inputFileName);
-
- if (randomize)
- work->nbSeries = nbSeriesInChunk;
- else
- {
- double adjustedNbSeriesInNextChunk =
- idealNbSeriesInChunk * (jobsSentCount + 1) - lastEndIndex;
- // round to closest integer
- work->nbSeries = (uint32_t)adjustedNbSeriesInNextChunk;
- // stay below the upper bound (TODO: is this check required ?)
- if (work->nbSeries > nbSeriesInChunk)
- work->nbSeries = nbSeriesInChunk;
- // TODO: what about this one ? (probably useless)
- if (lastEndIndex + work->nbSeries > nbSeries)
- work->nbSeries = nbSeries - lastEndIndex;
- }
-
- //TODO: ranks on uint64_t if more than 4.3 billion series at the same place (unlikely...)
- work->ranks = (uint32_t*) malloc(work->nbSeries * sizeof(uint32_t));
- for (uint32_t i = 0; i < work->nbSeries; i++)
- work->ranks[i] = (randomize ? get_rand_int() % nbSeries : lastEndIndex + i);
-
- work->nbClusters = nbClusters;
- work->clustOnMedoids = clustOnMedoids;
- work->p_for_dissims = p_for_dissims;
-
- return work;
-}
-
-// process all subtasks and save binary results into a new file
-// NOTE: this file will be smaller than initial file (or DB...)
-static void clusters_reduce(char* inputFileName, char* outputFileName, uint32_t ntasks,
- uint32_t totalNbSeries, uint32_t nbSeriesInChunk, double idealNbSeriesInChunk,
- uint32_t tsLength, uint32_t nbClusters, uint32_t clustOnMedoids,
- int randomize, uint32_t p_for_dissims)
-{
- FILE* ofile = fopen(outputFileName, "wb"); //'write binary'
- // Leave a blank for series' count and tsLength
- for (uint32_t i = 0; i < 8; i++)
- fputc(0, ofile);
- fclose(ofile);
-
- uint32_t jobsSentCount = 0; //used if randomize==FALSE
- uint32_t lastEndIndex = 0; //used if randomize==FALSE
-
- uint32_t sentSeriesCount = 0; //used if randomize==TRUE
-
- // Count series sent to binary file on output
- uint32_t newSeriesCount = 0;
-
- // Expected size of a Work message in bytes:
- uint32_t work_message_length = get_packedWork_length(nbSeriesInChunk);
- Byte packedWork[work_message_length];
-
- // Expected size of a Result message in bytes: (uint32_t is on 4 bytes)
- uint32_t result_message_length = 4 + 4 * nbClusters + 4 * nbClusters;
- Byte packedResult[result_message_length];
-
- // Seed the slaves; send one unit of work to each slave.
- Work_t* work;
- int* busy_slave = (int*) malloc(ntasks * sizeof(int));
- for (int rank = 1; rank < ntasks; rank++)
- busy_slave[rank] = 0;
- for (int rank = 1; rank < ntasks; rank++)
- {
- // Find the next item of work to do
- work = get_next_work(inputFileName, totalNbSeries, nbSeriesInChunk, idealNbSeriesInChunk,
- jobsSentCount, lastEndIndex, nbClusters, clustOnMedoids,
- randomize, p_for_dissims);
-
- if (randomize)
- sentSeriesCount += nbSeriesInChunk;
- else
- {
- lastEndIndex = lastEndIndex + work->nbSeries;
- jobsSentCount++;
- }
-
- // Send it to current rank
- pack_work(work, nbSeriesInChunk, packedWork);
- free_work(work);
- fprintf(stdout, "0 / Send work %s to rank=%i / %u\n",inputFileName, rank,
- (randomize ? sentSeriesCount : lastEndIndex));
- MPI_Send(packedWork, work_message_length, MPI_BYTE, rank, WORKTAG, MPI_COMM_WORLD);
-
- busy_slave[rank] = 1;
-
- if ((randomize && sentSeriesCount >= 1.5*totalNbSeries) //TODO: 1.5 = heuristic, magic number...
- || (!randomize && lastEndIndex >= totalNbSeries))
- {
- // Nothing more to read
- break;
- }
- }
-
- // Loop over getting new work requests until there is no more work to be done
- Result_t* result;
- MPI_Status status;
- while (1)
- {
- // If no slave is active, job is over
- int atLeastOneSlaveActive = 0;
- for (int rank = 1; rank < ntasks; rank++)
- {
- if (busy_slave[rank])
- {
- atLeastOneSlaveActive = 1;
- break;
- }
- }
- if (!atLeastOneSlaveActive)
- break;
-
- // Receive results from a slave
- MPI_Recv(packedResult, result_message_length, MPI_BYTE, MPI_ANY_SOURCE,
- WORKTAG, MPI_COMM_WORLD, &status);
- result = unpack_result(packedResult);
- fprintf(stdout, "0 / Receive result from rank=%i on %s\n",status.MPI_SOURCE,inputFileName);
-
- // 'binarize' the result (only series' values) returned by the slave
- binaryResult_to_file(result, inputFileName, outputFileName);
- free_result(result);
- newSeriesCount += nbClusters;
-
- if ((randomize && sentSeriesCount < totalNbSeries)
- || (!randomize && lastEndIndex < totalNbSeries))
- {
- // Get the next unit of work to be done
- work = get_next_work(inputFileName, totalNbSeries, nbSeriesInChunk,
- idealNbSeriesInChunk, jobsSentCount, lastEndIndex, nbClusters,
- clustOnMedoids, randomize, p_for_dissims);
-
- if (randomize)
- sentSeriesCount += nbSeriesInChunk;
- else
- {
- lastEndIndex = lastEndIndex + work->nbSeries;
- jobsSentCount++;
- }
-
- // Send the slave a new work unit
- pack_work(work, nbSeriesInChunk, packedWork);
- free_work(work);
- fprintf(stdout, "0 / Send work %s to rank=%i / %u\n",inputFileName, status.MPI_SOURCE,
- (randomize ? sentSeriesCount : lastEndIndex));
- MPI_Send(packedWork, work_message_length, MPI_BYTE,
- status.MPI_SOURCE, WORKTAG, MPI_COMM_WORLD);
- }
-
- else
- // No more work to do
- busy_slave[status.MPI_SOURCE] = 0;
- }
-
- // There's no more work to be done, so receive all results from the slaves.
- for (int rank = 1; rank < ntasks; rank++)
- {
- if (busy_slave[rank])
- {
- MPI_Recv(packedResult, result_message_length, MPI_BYTE,
- rank, WORKTAG, MPI_COMM_WORLD, &status);
- result = unpack_result(packedResult);
-
- // 'binarize' the result (only series' values) returned by the slave
- binaryResult_to_file(result, inputFileName, outputFileName);
- free_result(result);
- newSeriesCount += nbClusters;
- }
- }
- free(busy_slave);
-
- // Finalize output file: write total number of series inside it, and tsLength
- ofile = fopen(outputFileName, "r+b"); //read and write, binary
- fseek(ofile, 0, SEEK_SET);
- Byte intBuffer[4];
- write_int(newSeriesCount, intBuffer);
- fwrite(intBuffer, 1, 4, ofile);
- write_int(tsLength, intBuffer);
- fwrite(intBuffer, 1, 4, ofile);
- fclose(ofile);
-}
-
-// generate random temporary file names
-static char* get_unique_name()
-{
- size_t nbDigits = 7; //rather arbitrary
- size_t stringLength = 5 + nbDigits; //5 for '.tmp/'
- char* s = (char*) malloc(stringLength + 1);
- s[0] = '.';
- s[1] = 't';
- s[2] = 'm';
- s[3] = 'p';
- s[4] = '/';
- for (int i=0; i<nbDigits; i++)
- s[5+i] = '0' + get_rand_int() % 10;
- s[stringLength] = 0;
- return s;
-}
-
-// code executed by master process
-void master_run(char* mainInputFileName, uint32_t totalNbSeries, uint32_t nbSeriesInChunk,
- double idealNbSeriesInChunk, uint32_t tsLength, uint32_t nbClusters,
- int randomize, uint32_t p_for_dissims)
-{
- // Basic sanity check: nbClusters must be clearly less than series count per chunk
- if (10 * nbClusters >= nbSeriesInChunk)
- {
- fprintf(stdout, "WARNING: cluster size (%u) may be too high compared with chunk size (%u).\n",
- nbClusters, nbSeriesInChunk);
- }
-
- // Find out how many processes there are in the default communicator
- int ntasks;
- MPI_Comm_size(MPI_COMM_WORLD, &ntasks);
- if (ntasks <= 1)
- {
- fprintf(stderr,"No slaves available (only master running).\n");
- return;
- }
-
- // initializations
- char* inputFileName = mainInputFileName;
- char* outputFileName;
-
- uint32_t nbSeriesInFile = totalNbSeries;
-
- while (nbSeriesInFile > nbSeriesInChunk)
- {
- outputFileName = get_unique_name();
- uint32_t clustOnMedoids = (nbSeriesInFile < totalNbSeries ? 1 : 0);
- clusters_reduce(inputFileName, outputFileName, ntasks, nbSeriesInFile, nbSeriesInChunk,
- idealNbSeriesInChunk, tsLength, nbClusters, clustOnMedoids,
- randomize, p_for_dissims);
-
- // read nbSeries in outputFile
- nbSeriesInFile = get_nbSeries(outputFileName);
-
- // update file names
- if (strcmp(mainInputFileName, inputFileName) != 0)
- {
- // No need to keep every intermediate binary
- unlink(inputFileName);
- free(inputFileName);
- }
- inputFileName = outputFileName;
- }
-
- // read nbSeries in inputFileName (the last one)
- // we know that there is at most 'nbSeriesInChunk' series in it
- nbSeriesInFile = get_nbSeries(inputFileName);
-
- // Expected size of a Work message in bytes:
- uint32_t work_message_length = get_packedWork_length(nbSeriesInChunk);
- Byte packedWork[work_message_length];
-
- // Expected size of a Result message in bytes: (uint32_t is on 4 bytes)
- uint32_t result_message_length = get_packedResult_length(nbClusters);
- Byte packedResult[result_message_length];
-
- // Run a last task by some slave, and get final result
- Work_t* work = get_next_work(inputFileName, nbSeriesInFile, nbSeriesInChunk,
- (double)nbSeriesInFile, 0, 0, nbClusters, 1, 0, p_for_dissims);
-
- // Send the slave a new work unit
- pack_work(work, nbSeriesInChunk, packedWork);
- free_work(work);
- int selectedSlaveRank = get_rand_int() % (ntasks - 1) + 1;
- fprintf(stdout, "0 / Send final work %s to rank=%i / %u\n",
- inputFileName, selectedSlaveRank, nbSeriesInFile);
- MPI_Send(packedWork, work_message_length, MPI_BYTE, selectedSlaveRank,
- WORKTAG, MPI_COMM_WORLD);
-
- MPI_Status status;
- // Wait for him to finish
- MPI_Recv(packedResult, result_message_length, MPI_BYTE, selectedSlaveRank,
- WORKTAG, MPI_COMM_WORLD, &status);
- Result_t* finalResult = unpack_result(packedResult);
- fprintf(stdout, "0 / Receive final result from rank=%i on %s\n",status.MPI_SOURCE,inputFileName);
-
- //Tell all the slaves to exit by sending an empty message with the DIETAG.
- for (int rank = 1; rank < ntasks; ++rank)
- MPI_Send(0, 0, MPI_BYTE, rank, DIETAG, MPI_COMM_WORLD);
-
- const char finalBinaryName[] = "ppamFinalSeries.bin";
- result_to_XML(finalResult, inputFileName, finalBinaryName, p_for_dissims);
- free_result(finalResult);
-
- // free memory
- if (strcmp(mainInputFileName, inputFileName))
- {
- // Keep last input binary, but rename it
- rename(inputFileName, finalBinaryName);
- free(inputFileName);
- }
- else
- {
- // just symlink mainInputFileName
- if (!access(finalBinaryName, F_OK))
- unlink(finalBinaryName);
- if (symlink(mainInputFileName, finalBinaryName))
- fprintf(stderr,"Cannot create symlink to initial binary file.\n");
- }
-}