bb4bd467a98ae0c97a49358c7963242fe421ece0
[ppam-mpi.git] / code / src / main.c
1 #include "MPI_Main/master.h"
2 #include "MPI_Main/slave.h"
3 #include "Util/utils.h"
4 #include "Util/rng.h"
5 #include <sys/stat.h>
6 #include <mpi.h>
7 #include <math.h>
8 #include <stdio.h>
9 #include "TimeSeries/serialize.h"
10 #include "TimeSeries/deserialize.h"
11 #include "Classification/getClass.h"
12 #include <string.h>
13 #include <cds/Vector.h>
14 #include <libxml/xmlreader.h>
15
// Serialize the text file argv[1] into the binary file argv[2].
//   argv[3]: non-zero selects serialize_byCols(), zero selects serialize_byRows()
//   argv[4]: number of items to serialize (0 for "all series")
// argv[0] is the sub-command name, hence at least 5 entries are required.
// Returns 0 once the serialization routine has been called, 1 if arguments are missing.
int serialize_main(int argc, char** argv)
{
	// All four operands are read unconditionally below: refuse to go past the end of argv.
	if (argc < 5)
	{
		fprintf(stderr, "Usage: serialize <ifileName> <ofileName> <byCols> <nbItems>\n");
		return 1;
	}

	const char* ifileName = argv[1];
	const char* ofileName = argv[2];
	int byCols = atoi(argv[3]);
	uint32_t nbItems = atoi(argv[4]); //==0 for "all series"

	if (byCols)
		serialize_byCols(ifileName, ofileName, nbItems);
	else
		serialize_byRows(ifileName, ofileName, nbItems);
	return 0;
}
30
31 // deserialize binary file argv[1] into text file argv[2]
32 int deserialize_main(int argc, char** argv)
33 {
34 const char* ifileName = argv[1];
35 const char* ofileName = argv[2];
36 Vector* vranks = vector_new(uint32_t);
37 //each token is at most two ints (a-b = from a to b included)
38 char* token = strtok(argv[3], ",");
39 int retrieveAll = 0;
40 uint32_t* ranks = NULL;
41 while (token)
42 {
43 //scan token to find middle position of '-' (if any)
44 int minusPos = -1;
45 int tokenLength = strlen(token);
46 //loop starts at index 1 because -N is allowed (and means 'everything')
47 for (int i=1; i<tokenLength; i++)
48 {
49 if (token[i] == '-')
50 {
51 minusPos = i;
52 break;
53 }
54 }
55 if (minusPos < 0)
56 {
57 int64_t rank = (int64_t)atoi(token);
58 if (rank <= 0)
59 {
60 retrieveAll = 1;
61 break;
62 }
63 vector_push(vranks, (uint32_t)rank);
64 }
65 else
66 {
67 token[minusPos] = 0;
68 int int1 = atoi(token);
69 int int2 = atoi(token+minusPos+1);
70 for (uint32_t i=int1; i<=int2; i++)
71 vector_push(vranks, i);
72 }
73 token = strtok(NULL, ",");
74 }
75 uint32_t nbRanks = retrieveAll
76 ? 0
77 : vector_size(vranks);
78 if (!retrieveAll)
79 {
80 ranks = (uint32_t*) malloc(nbRanks*sizeof(uint32_t));
81 for (uint32_t i=0; i<nbRanks; i++)
82 {
83 vector_get(vranks, i, ranks[i]);
84 ranks[i]--; //re-express on 0...{n-1}
85 }
86 }
87 vector_destroy(vranks);
88
89 deserialize(ifileName, ofileName, ranks, nbRanks);
90 return 0;
91 }
92
93 //main clustering task (master or slave)
94 int cluster_main(int argc, char **argv)
95 {
96 MPI_Init(&argc, &argv);
97
98 char* ifileName = argv[1]; //could be "../data/test.bin"
99 uint32_t nbSeriesInChunk = atoi(argv[2]); //could be 3000
100 uint32_t nbClusters = atoi(argv[3]); //could be 15
101 int randomize = atoi(argv[4]); //boolean
102 uint32_t p_for_dissims = atoi(argv[5]); //1 for L1, 2 for L2, ...etc
103
104 // Get totalNbSeries and tsLength
105 uint32_t totalNbSeries = get_nbSeries(ifileName);
106 uint32_t tsLength = get_tsLength(ifileName);
107
108 // Basic sanity checks
109 if (nbClusters <= 0 || nbSeriesInChunk <= 1)
110 {
111 MPI_Finalize();
112 return 0;
113 }
114 if (nbSeriesInChunk > totalNbSeries)
115 nbSeriesInChunk = totalNbSeries;
116 if (nbClusters > nbSeriesInChunk)
117 nbClusters = nbSeriesInChunk;
118
119 double idealNbSeriesInChunk = 0.0; //unused if randomize == TRUE
120 if (!randomize)
121 {
122 // Adjust nbSeriesInChunk to avoid small remainders.
123 // Each node should have at least nbSeriesInChunk (as given to the function).
124
125 // ==> We seek for the largest N such that (double)totalNbSeries / N >= nbSeriesInChunk
126 uint32_t N = totalNbSeries / nbSeriesInChunk + 1;
127 while ((double)totalNbSeries / N < nbSeriesInChunk) N--;
128 // At this point N>=1 is the solution
129 idealNbSeriesInChunk = (double)totalNbSeries / N;
130 nbSeriesInChunk = ceil(idealNbSeriesInChunk);
131 }
132
133 // Initialize random generator
134 init_rng(1);
135
136 // Find out my identity in the default communicator
137 int myrank;
138 MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
139
140 if (myrank == 0)
141 {
142 // create temporary folder for intermediate results
143 mkdir(".tmp", S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH);
144
145 master_run(ifileName, totalNbSeries, nbSeriesInChunk, idealNbSeriesInChunk,
146 tsLength, nbClusters, randomize, p_for_dissims);
147 }
148
149 else
150 slave_run(myrank, nbSeriesInChunk, nbClusters);
151
152 MPI_Finalize();
153 return 0;
154 }
155
156 //main classification task (using clustering result)
157 int classif_main(int argc, char** argv)
158 {
159 const char* ifileName = argv[1];
160 const char* xmlFileName = argv[2];
161
162 // 1] load and retrieve info from XML file
163 xmlDoc* doc = xmlReadFile(xmlFileName, NULL, 0);
164
165 // Get the root element node
166 xmlNode* root_element = xmlDocGetRootElement(doc);
167
168 uint32_t p_for_dissims = 0;
169 uint32_t* ranks = NULL;
170 uint32_t nbClusters = 0;
171 char* binFileName;
172 for (xmlNode* cur_node=root_element->children; cur_node; cur_node=cur_node->next)
173 {
174 if (cur_node->type != XML_ELEMENT_NODE)
175 continue;
176 if (!strcmp(cur_node->name,"p_for_dissims"))
177 p_for_dissims = atoi(cur_node->last->content);
178 else if (!strcmp(cur_node->name,"ranks"))
179 {
180 //first pass: find nbClusters
181 for (xmlNode* rankNode=cur_node->children; rankNode; rankNode=rankNode->next)
182 {
183 if (rankNode->type == XML_ELEMENT_NODE && !strcmp(rankNode->name,"rank"))
184 nbClusters++;
185 }
186 //second pass: fill ranks (not optimal, but not very important here)
187 ranks = (uint32_t*) malloc(nbClusters*sizeof(uint32_t));
188 uint32_t index = 0;
189 for (xmlNode* rankNode=cur_node->children; rankNode; rankNode=rankNode->next)
190 {
191 if (rankNode->type == XML_ELEMENT_NODE && !strcmp(rankNode->name,"rank"))
192 ranks[index++] = atoi(rankNode->last->content) - 1;
193 }
194 }
195 else if (!strcmp(cur_node->name,"file"))
196 {
197 binFileName = (char*) malloc(strlen(cur_node->last->content)+1);
198 strcpy(binFileName, cur_node->last->content);
199 }
200 }
201
202 xmlFreeDoc(doc);
203 xmlCleanupParser();
204
205 uint32_t tsLength1 = get_tsLength(ifileName);
206 uint32_t tsLength2 = get_tsLength(binFileName);
207 if (tsLength1 != tsLength2)
208 {
209 fprintf(stderr,"Warning: nbValues do not match. Data will be truncated.\n");
210 if (tsLength1 > tsLength2)
211 tsLength1 = tsLength2;
212 }
213 uint32_t nbValues = (tsLength1 - 4) / 3;
214
215 // 2] Classify all series by batches of CURVES_PER_REQUEST
216 uint32_t nbSeries = get_nbSeries(ifileName);
217 PowerCurve* medoids = deserialize(binFileName, NULL, ranks, nbClusters);
218 free(binFileName);
219 free(ranks);
220 ranks = (uint32_t*)malloc(CURVES_PER_REQUEST*sizeof(uint32_t));
221
222 uint32_t smallestNonProcessedIndex = 0;
223 double DISTOR = 0.0;
224 while (smallestNonProcessedIndex < nbSeries)
225 {
226 uint32_t lowerBound = smallestNonProcessedIndex;
227 uint32_t upperBound = smallestNonProcessedIndex + CURVES_PER_REQUEST;
228 if (upperBound > nbSeries)
229 upperBound = nbSeries;
230 for (uint32_t i=0; i<upperBound-lowerBound; i++)
231 ranks[i] = lowerBound + i;
232 PowerCurve* data = deserialize(ifileName, NULL, ranks, upperBound-lowerBound);
233 uint32_t* labels = get_class(data, upperBound-lowerBound, medoids, nbClusters,
234 nbValues, p_for_dissims, &DISTOR);
235 // send labels to standard output
236 for (uint32_t i=0; i<upperBound-lowerBound; i++)
237 {
238 free(data[i].values);
239 fprintf(stdout, "%u\n",labels[i]);
240 }
241 free(data);
242 free(labels);
243 smallestNonProcessedIndex += (upperBound-lowerBound);
244 }
245 for (uint32_t i=0; i<nbClusters; i++)
246 free(medoids[i].values);
247 free(medoids);
248 free(ranks);
249 fprintf(stderr, "DISTOR = %g\n",DISTOR);
250 return 0;
251 }
252
253 int main(int argc, char** argv)
254 {
255 if (argc <= 1)
256 {
257 fprintf(stderr, "No argument provided. Exit.\n");
258 return 1;
259 }
260
261 if (!strcmp(argv[1], "serialize"))
262 return serialize_main(argc-1, argv+1);
263 if (!strcmp(argv[1], "deserialize"))
264 return deserialize_main(argc-1, argv+1);
265 if (!strcmp(argv[1], "cluster"))
266 return cluster_main(argc-1, argv+1);
267 if (!strcmp(argv[1], "classif"))
268 return classif_main(argc-1, argv+1);
269
270 fprintf(stderr, "Unknown first argument.\n");
271 return 1;
272 }