commit last state
[ppam-mpi.git] / code / src / MPI_Main / slave.c
CommitLineData
81923e5c
BA
1#include "Util/types.h"
2#include "Util/utils.h"
3#include "TimeSeries/deserialize.h"
4#include "Algorithm/compute_coefficients.h"
5#include "Algorithm/get_dissimilarities.h"
6#include "Algorithm/pam.h"
7#include <mpi.h>
8#include <stdio.h>
9#include <math.h>
10#include <stdlib.h>
11#include <unistd.h>
12#include "MPI_Communication/unpack.h"
13#include "MPI_Communication/pack.h"
14
15// main job done by a slave:
16static Result_t* do_work(Work_t* work)
17{
18 // *** Step 1 ***
19 // Obtain reduced coordinates matrix from time-series.
20
21 uint32_t nbSeries = work->nbSeries;
22 uint32_t tsLength = get_tsLength(work->inputFileName);
23 uint32_t nbValues = (tsLength - 4) / 3;
24
25 // nbReducedCoordinates = smallest power of 2 which is above nbValues
26 uint32_t nbReducedCoordinates = (uint32_t)ceil(log2(nbValues));
27 Real* reducedCoordinates = (Real*) malloc(nbSeries * nbReducedCoordinates * sizeof(Real));
28
29 // call preprocessing with the rows of raw power values matrix.
30 // Keep the IDs in memory for further processing.
31 uint32_t* IDs = (uint32_t*)malloc(nbSeries*sizeof(uint32_t));
32 for (uint32_t i = 0; i < nbSeries; i+=CURVES_PER_REQUEST)
33 {
34 uint32_t nbCurvesRetrieved = CURVES_PER_REQUEST;
35 if (i + nbCurvesRetrieved > nbSeries)
36 nbCurvesRetrieved -= (i + nbCurvesRetrieved - nbSeries);
37 PowerCurve* powerCurves =
38 deserialize(work->inputFileName, NULL, work->ranks + i, nbCurvesRetrieved);
39 compute_coefficients(powerCurves, nbCurvesRetrieved, nbValues,
40 reducedCoordinates, i, nbReducedCoordinates);
41 for (uint32_t ii=i; ii<i+nbCurvesRetrieved; ii++)
42 {
43 IDs[ii] = powerCurves[ii-i].ID;
44 free(powerCurves[ii-i].values);
45 }
46 free(powerCurves);
47 }
48
49 // *** Step 2 ***
50 // Run PAM algorithm on the dissimilarity matrix computed from 'reducedCoordinates'.
51
52 Real* dissimilarities = get_dissimilarities_intra(
53 reducedCoordinates, nbSeries, nbReducedCoordinates, work->p_for_dissims);
54 free(reducedCoordinates);
55
56 uint32_t nbClusters = work->nbClusters;
57 Result_t* result = (Result_t*) malloc(sizeof(Result_t));
58 result->medoids_ID = (uint32_t*) malloc(nbClusters * sizeof(uint32_t));
59 result->medoids_ranks = (uint32_t*) malloc(nbClusters * sizeof(uint32_t));
60 result->nbClusters = nbClusters;
61
62 // Run PAM algorithm to fill result->medoids_ranks
63 pam(dissimilarities, nbSeries, nbClusters, work->clustOnMedoids, NSTART, MAXITER, result);
64 free(dissimilarities);
65
66 // Deduce medoids_IDs from indices
67 for (uint32_t i = 0; i < nbClusters; i++)
68 result->medoids_ID[i] = IDs[result->medoids_ranks[i]];
69 free(IDs);
70
71 // return medoids IDs, indices and items labels (to be post-processed)
72 return result;
73}
74
75// code executed by slave process
76void slave_run(int myrank, uint32_t nbSeriesInChunk, uint32_t nbClusters)
77{
78 Work_t* work;
79 Result_t* result;
80 MPI_Status status;
81
82 // Expected size of a Work message in bytes:
83 uint32_t work_message_length = get_packedWork_length(nbSeriesInChunk);
84 Byte packedWork[work_message_length];
85
86 // Expected size of a Result message in bytes: (uint32_t is on 4 bytes)
87 uint32_t result_message_length = get_packedResult_length(nbClusters);
88 Byte packedResult[result_message_length];
89
90 while (1)
91 {
92 // Receive a message from the master
93 MPI_Recv(packedWork, work_message_length, MPI_BYTE, 0,
94 MPI_ANY_TAG, MPI_COMM_WORLD, &status);
95
96 // Check the tag of the received message.
97 if (status.MPI_TAG == DIETAG)
98 return;
99
100 // Do the work
101 work = unpack_work(packedWork, nbSeriesInChunk);
102 fprintf(stdout, "%u / Slave pid=%u work on %s\n",myrank,getpid(),work->inputFileName);
103 result = do_work(work);
104 free_work(work);
105
106 // Send the result back
107 pack_result(result, packedResult);
108 free_result(result);
109 MPI_Send(packedResult, result_message_length, MPI_BYTE, 0, WORKTAG, MPI_COMM_WORLD);
110 }
111}