1 #include "Util/types.h"
2 #include "Util/utils.h"
3 #include "TimeSeries/deserialize.h"
4 #include "Algorithm/compute_coefficients.h"
5 #include "Algorithm/get_dissimilarities.h"
6 #include "Algorithm/pam.h"
12 #include "MPI_Communication/unpack.h"
13 #include "MPI_Communication/pack.h"
15 // main job done by a slave:
16 static Result_t
* do_work(Work_t
* work
)
19 // Obtain reduced coordinates matrix from time-series.
21 uint32_t nbSeries
= work
->nbSeries
;
22 uint32_t tsLength
= get_tsLength(work
->inputFileName
);
23 uint32_t nbValues
= (tsLength
- 4) / 3;
25 // nbReducedCoordinates = smallest power of 2 which is above nbValues
26 uint32_t nbReducedCoordinates
= (uint32_t)ceil(log2(nbValues
));
27 Real
* reducedCoordinates
= (Real
*) malloc(nbSeries
* nbReducedCoordinates
* sizeof(Real
));
29 // call preprocessing with the rows of raw power values matrix.
30 // Keep the IDs in memory for further processing.
31 uint32_t* IDs
= (uint32_t*)malloc(nbSeries
*sizeof(uint32_t));
32 for (uint32_t i
= 0; i
< nbSeries
; i
+=CURVES_PER_REQUEST
)
34 uint32_t nbCurvesRetrieved
= CURVES_PER_REQUEST
;
35 if (i
+ nbCurvesRetrieved
> nbSeries
)
36 nbCurvesRetrieved
-= (i
+ nbCurvesRetrieved
- nbSeries
);
37 PowerCurve
* powerCurves
=
38 deserialize(work
->inputFileName
, NULL
, work
->ranks
+ i
, nbCurvesRetrieved
);
39 compute_coefficients(powerCurves
, nbCurvesRetrieved
, nbValues
,
40 reducedCoordinates
, i
, nbReducedCoordinates
);
41 for (uint32_t ii
=i
; ii
<i
+nbCurvesRetrieved
; ii
++)
43 IDs
[ii
] = powerCurves
[ii
-i
].ID
;
44 free(powerCurves
[ii
-i
].values
);
50 // Run PAM algorithm on the dissimilarity matrix computed from 'reducedCoordinates'.
52 Real
* dissimilarities
= get_dissimilarities_intra(
53 reducedCoordinates
, nbSeries
, nbReducedCoordinates
, work
->p_for_dissims
);
54 free(reducedCoordinates
);
56 uint32_t nbClusters
= work
->nbClusters
;
57 Result_t
* result
= (Result_t
*) malloc(sizeof(Result_t
));
58 result
->medoids_ID
= (uint32_t*) malloc(nbClusters
* sizeof(uint32_t));
59 result
->medoids_ranks
= (uint32_t*) malloc(nbClusters
* sizeof(uint32_t));
60 result
->nbClusters
= nbClusters
;
62 // Run PAM algorithm to fill result->medoids_ranks
63 pam(dissimilarities
, nbSeries
, nbClusters
, work
->clustOnMedoids
, NSTART
, MAXITER
, result
);
64 free(dissimilarities
);
66 // Deduce medoids_IDs from indices
67 for (uint32_t i
= 0; i
< nbClusters
; i
++)
68 result
->medoids_ID
[i
] = IDs
[result
->medoids_ranks
[i
]];
71 // return medoids IDs, indices and items labels (to be post-processed)
75 // code executed by slave process
76 void slave_run(int myrank
, uint32_t nbSeriesInChunk
, uint32_t nbClusters
)
82 // Expected size of a Work message in bytes:
83 uint32_t work_message_length
= get_packedWork_length(nbSeriesInChunk
);
84 Byte packedWork
[work_message_length
];
86 // Expected size of a Result message in bytes: (uint32_t is on 4 bytes)
87 uint32_t result_message_length
= get_packedResult_length(nbClusters
);
88 Byte packedResult
[result_message_length
];
92 // Receive a message from the master
93 MPI_Recv(packedWork
, work_message_length
, MPI_BYTE
, 0,
94 MPI_ANY_TAG
, MPI_COMM_WORLD
, &status
);
96 // Check the tag of the received message.
97 if (status
.MPI_TAG
== DIETAG
)
101 work
= unpack_work(packedWork
, nbSeriesInChunk
);
102 fprintf(stdout
, "%u / Slave pid=%u work on %s\n",myrank
,getpid(),work
->inputFileName
);
103 result
= do_work(work
);
106 // Send the result back
107 pack_result(result
, packedResult
);
109 MPI_Send(packedResult
, result_message_length
, MPI_BYTE
, 0, WORKTAG
, MPI_COMM_WORLD
);