| 1 | #include "MPI_Main/master.h" |
| 2 | #include "MPI_Main/slave.h" |
| 3 | #include "Util/utils.h" |
| 4 | #include "Util/rng.h" |
| 5 | #include <sys/stat.h> |
| 6 | #include <mpi.h> |
| 7 | #include <math.h> |
| 8 | #include <stdio.h> |
| 9 | #include "TimeSeries/serialize.h" |
| 10 | #include "TimeSeries/deserialize.h" |
| 11 | #include "Classification/getClass.h" |
| 12 | #include <string.h> |
| 13 | #include <cds/Vector.h> |
| 14 | #include <libxml/xmlreader.h> |
| 15 | |
// Serialize the text file argv[1] into the binary file argv[2].
//   argv[3]: non-zero to call serialize_byCols, zero to call serialize_byRows
//   argv[4]: number of items to serialize (==0 for "all series")
// argv[0] is the task keyword itself, so five entries are required.
// Returns 0 once the serializer has been called, 1 if arguments are missing.
int serialize_main(int argc, char** argv)
{
	// Guard against reading past the end of argv
	if (argc < 5)
	{
		fprintf(stderr,
			"Usage: serialize <input text file> <output binary file> <byCols> <nbItems>\n");
		return 1;
	}

	const char* ifileName = argv[1];
	const char* ofileName = argv[2];
	int byCols = atoi(argv[3]);
	uint32_t nbItems = atoi(argv[4]); //==0 for "all series"

	if (byCols)
		serialize_byCols(ifileName, ofileName, nbItems);
	else
		serialize_byRows(ifileName, ofileName, nbItems);
	return 0;
}
| 30 | |
| 31 | // deserialize binary file argv[1] into text file argv[2] |
| 32 | int deserialize_main(int argc, char** argv) |
| 33 | { |
| 34 | const char* ifileName = argv[1]; |
| 35 | const char* ofileName = argv[2]; |
| 36 | Vector* vranks = vector_new(uint32_t); |
| 37 | //each token is at most two ints (a-b = from a to b included) |
| 38 | char* token = strtok(argv[3], ","); |
| 39 | int retrieveAll = 0; |
| 40 | uint32_t* ranks = NULL; |
| 41 | while (token) |
| 42 | { |
| 43 | //scan token to find middle position of '-' (if any) |
| 44 | int minusPos = -1; |
| 45 | int tokenLength = strlen(token); |
| 46 | //loop starts at index 1 because -N is allowed (and means 'everything') |
| 47 | for (int i=1; i<tokenLength; i++) |
| 48 | { |
| 49 | if (token[i] == '-') |
| 50 | { |
| 51 | minusPos = i; |
| 52 | break; |
| 53 | } |
| 54 | } |
| 55 | if (minusPos < 0) |
| 56 | { |
| 57 | int64_t rank = (int64_t)atoi(token); |
| 58 | if (rank <= 0) |
| 59 | { |
| 60 | retrieveAll = 1; |
| 61 | break; |
| 62 | } |
| 63 | vector_push(vranks, (uint32_t)rank); |
| 64 | } |
| 65 | else |
| 66 | { |
| 67 | token[minusPos] = 0; |
| 68 | int int1 = atoi(token); |
| 69 | int int2 = atoi(token+minusPos+1); |
| 70 | for (uint32_t i=int1; i<=int2; i++) |
| 71 | vector_push(vranks, i); |
| 72 | } |
| 73 | token = strtok(NULL, ","); |
| 74 | } |
| 75 | uint32_t nbRanks = retrieveAll |
| 76 | ? 0 |
| 77 | : vector_size(vranks); |
| 78 | if (!retrieveAll) |
| 79 | { |
| 80 | ranks = (uint32_t*) malloc(nbRanks*sizeof(uint32_t)); |
| 81 | for (uint32_t i=0; i<nbRanks; i++) |
| 82 | { |
| 83 | vector_get(vranks, i, ranks[i]); |
| 84 | ranks[i]--; //re-express on 0...{n-1} |
| 85 | } |
| 86 | } |
| 87 | vector_destroy(vranks); |
| 88 | |
| 89 | deserialize(ifileName, ofileName, ranks, nbRanks); |
| 90 | return 0; |
| 91 | } |
| 92 | |
| 93 | //main clustering task (master or slave) |
| 94 | int cluster_main(int argc, char **argv) |
| 95 | { |
| 96 | MPI_Init(&argc, &argv); |
| 97 | |
| 98 | char* ifileName = argv[1]; //could be "../data/test.bin" |
| 99 | uint32_t nbSeriesInChunk = atoi(argv[2]); //could be 3000 |
| 100 | uint32_t nbClusters = atoi(argv[3]); //could be 15 |
| 101 | int randomize = atoi(argv[4]); //boolean |
| 102 | uint32_t p_for_dissims = atoi(argv[5]); //1 for L1, 2 for L2, ...etc |
| 103 | |
| 104 | // Get totalNbSeries and tsLength |
| 105 | uint32_t totalNbSeries = get_nbSeries(ifileName); |
| 106 | uint32_t tsLength = get_tsLength(ifileName); |
| 107 | |
| 108 | // Basic sanity checks |
| 109 | if (nbClusters <= 0 || nbSeriesInChunk <= 1) |
| 110 | { |
| 111 | MPI_Finalize(); |
| 112 | return 0; |
| 113 | } |
| 114 | if (nbSeriesInChunk > totalNbSeries) |
| 115 | nbSeriesInChunk = totalNbSeries; |
| 116 | if (nbClusters > nbSeriesInChunk) |
| 117 | nbClusters = nbSeriesInChunk; |
| 118 | |
| 119 | double idealNbSeriesInChunk = 0.0; //unused if randomize == TRUE |
| 120 | if (!randomize) |
| 121 | { |
| 122 | // Adjust nbSeriesInChunk to avoid small remainders. |
| 123 | // Each node should have at least nbSeriesInChunk (as given to the function). |
| 124 | |
| 125 | // ==> We seek for the largest N such that (double)totalNbSeries / N >= nbSeriesInChunk |
| 126 | uint32_t N = totalNbSeries / nbSeriesInChunk + 1; |
| 127 | while ((double)totalNbSeries / N < nbSeriesInChunk) N--; |
| 128 | // At this point N>=1 is the solution |
| 129 | idealNbSeriesInChunk = (double)totalNbSeries / N; |
| 130 | nbSeriesInChunk = ceil(idealNbSeriesInChunk); |
| 131 | } |
| 132 | |
| 133 | // Initialize random generator |
| 134 | init_rng(1); |
| 135 | |
| 136 | // Find out my identity in the default communicator |
| 137 | int myrank; |
| 138 | MPI_Comm_rank(MPI_COMM_WORLD, &myrank); |
| 139 | |
| 140 | if (myrank == 0) |
| 141 | { |
| 142 | // create temporary folder for intermediate results |
| 143 | mkdir(".tmp", S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH); |
| 144 | |
| 145 | master_run(ifileName, totalNbSeries, nbSeriesInChunk, idealNbSeriesInChunk, |
| 146 | tsLength, nbClusters, randomize, p_for_dissims); |
| 147 | } |
| 148 | |
| 149 | else |
| 150 | slave_run(myrank, nbSeriesInChunk, nbClusters); |
| 151 | |
| 152 | MPI_Finalize(); |
| 153 | return 0; |
| 154 | } |
| 155 | |
| 156 | //main classification task (using clustering result) |
| 157 | int classif_main(int argc, char** argv) |
| 158 | { |
| 159 | const char* ifileName = argv[1]; |
| 160 | const char* xmlFileName = argv[2]; |
| 161 | |
| 162 | // 1] load and retrieve info from XML file |
| 163 | xmlDoc* doc = xmlReadFile(xmlFileName, NULL, 0); |
| 164 | |
| 165 | // Get the root element node |
| 166 | xmlNode* root_element = xmlDocGetRootElement(doc); |
| 167 | |
| 168 | uint32_t p_for_dissims = 0; |
| 169 | uint32_t* ranks = NULL; |
| 170 | uint32_t nbClusters = 0; |
| 171 | char* binFileName; |
| 172 | for (xmlNode* cur_node=root_element->children; cur_node; cur_node=cur_node->next) |
| 173 | { |
| 174 | if (cur_node->type != XML_ELEMENT_NODE) |
| 175 | continue; |
| 176 | if (!strcmp(cur_node->name,"p_for_dissims")) |
| 177 | p_for_dissims = atoi(cur_node->last->content); |
| 178 | else if (!strcmp(cur_node->name,"ranks")) |
| 179 | { |
| 180 | //first pass: find nbClusters |
| 181 | for (xmlNode* rankNode=cur_node->children; rankNode; rankNode=rankNode->next) |
| 182 | { |
| 183 | if (rankNode->type == XML_ELEMENT_NODE && !strcmp(rankNode->name,"rank")) |
| 184 | nbClusters++; |
| 185 | } |
| 186 | //second pass: fill ranks (not optimal, but not very important here) |
| 187 | ranks = (uint32_t*) malloc(nbClusters*sizeof(uint32_t)); |
| 188 | uint32_t index = 0; |
| 189 | for (xmlNode* rankNode=cur_node->children; rankNode; rankNode=rankNode->next) |
| 190 | { |
| 191 | if (rankNode->type == XML_ELEMENT_NODE && !strcmp(rankNode->name,"rank")) |
| 192 | ranks[index++] = atoi(rankNode->last->content) - 1; |
| 193 | } |
| 194 | } |
| 195 | else if (!strcmp(cur_node->name,"file")) |
| 196 | { |
| 197 | binFileName = (char*) malloc(strlen(cur_node->last->content)+1); |
| 198 | strcpy(binFileName, cur_node->last->content); |
| 199 | } |
| 200 | } |
| 201 | |
| 202 | xmlFreeDoc(doc); |
| 203 | xmlCleanupParser(); |
| 204 | |
| 205 | uint32_t tsLength1 = get_tsLength(ifileName); |
| 206 | uint32_t tsLength2 = get_tsLength(binFileName); |
| 207 | if (tsLength1 != tsLength2) |
| 208 | { |
| 209 | fprintf(stderr,"Warning: nbValues do not match. Data will be truncated.\n"); |
| 210 | if (tsLength1 > tsLength2) |
| 211 | tsLength1 = tsLength2; |
| 212 | } |
| 213 | uint32_t nbValues = (tsLength1 - 4) / 3; |
| 214 | |
| 215 | // 2] Classify all series by batches of CURVES_PER_REQUEST |
| 216 | uint32_t nbSeries = get_nbSeries(ifileName); |
| 217 | PowerCurve* medoids = deserialize(binFileName, NULL, ranks, nbClusters); |
| 218 | free(binFileName); |
| 219 | free(ranks); |
| 220 | ranks = (uint32_t*)malloc(CURVES_PER_REQUEST*sizeof(uint32_t)); |
| 221 | |
| 222 | uint32_t smallestNonProcessedIndex = 0; |
| 223 | double DISTOR = 0.0; |
| 224 | while (smallestNonProcessedIndex < nbSeries) |
| 225 | { |
| 226 | uint32_t lowerBound = smallestNonProcessedIndex; |
| 227 | uint32_t upperBound = smallestNonProcessedIndex + CURVES_PER_REQUEST; |
| 228 | if (upperBound > nbSeries) |
| 229 | upperBound = nbSeries; |
| 230 | for (uint32_t i=0; i<upperBound-lowerBound; i++) |
| 231 | ranks[i] = lowerBound + i; |
| 232 | PowerCurve* data = deserialize(ifileName, NULL, ranks, upperBound-lowerBound); |
| 233 | uint32_t* labels = get_class(data, upperBound-lowerBound, medoids, nbClusters, |
| 234 | nbValues, p_for_dissims, &DISTOR); |
| 235 | // send labels to standard output |
| 236 | for (uint32_t i=0; i<upperBound-lowerBound; i++) |
| 237 | { |
| 238 | free(data[i].values); |
| 239 | fprintf(stdout, "%u\n",labels[i]); |
| 240 | } |
| 241 | free(data); |
| 242 | free(labels); |
| 243 | smallestNonProcessedIndex += (upperBound-lowerBound); |
| 244 | } |
| 245 | for (uint32_t i=0; i<nbClusters; i++) |
| 246 | free(medoids[i].values); |
| 247 | free(medoids); |
| 248 | free(ranks); |
| 249 | fprintf(stderr, "DISTOR = %g\n",DISTOR); |
| 250 | return 0; |
| 251 | } |
| 252 | |
// Program entry point: argv[1] names the task to run ("serialize",
// "deserialize", "cluster" or "classif"). The selected task receives the
// argument list shifted by one, so that its own argv[0] is the task keyword.
// Returns the status of the task, or 1 when the task is missing or unknown.
int main(int argc, char** argv)
{
	if (argc < 2)
	{
		fprintf(stderr, "No argument provided. Exit.\n");
		return 1;
	}

	const char* task = argv[1];
	int taskArgc = argc - 1;
	char** taskArgv = argv + 1;
	int status;

	if (strcmp(task, "serialize") == 0)
	{
		status = serialize_main(taskArgc, taskArgv);
	}
	else if (strcmp(task, "deserialize") == 0)
	{
		status = deserialize_main(taskArgc, taskArgv);
	}
	else if (strcmp(task, "cluster") == 0)
	{
		status = cluster_main(taskArgc, taskArgv);
	}
	else if (strcmp(task, "classif") == 0)
	{
		status = classif_main(taskArgc, taskArgv);
	}
	else
	{
		fprintf(stderr, "Unknown first argument.\n");
		status = 1;
	}
	return status;
}