+ if (parll && ntasks>1)
+ {
+   # Initialize parallel runs: outfile="" allows verbose traces to reach the
+   # console under Linux. All variables the workers read must be exported.
+   cl <- parallel::makeCluster(ncores_tasks, outfile="")
+   varlist <- c("ncores_clust","verbose","parll", #task 1 & 2
+     "K1","getContribs","algoClust1","nb_items_clust") #task 1
+   if (WER=="mix")
+   {
+     # Add variables for task 2 (WER-distance stage on raw series)
+     varlist <- c(varlist, "K2","getSeries","algoClust2","nb_series_per_chunk",
+       "smooth_lvl","nvoice","nbytes","endian")
+   }
+   # BUGFIX: 'envir <- environment()' assigned a stray local and relied on
+   # positional matching; pass the environment as the named argument instead.
+   parallel::clusterExport(cl, varlist, envir=environment())
+ }
+
+ # Run one complete clustering task on a subset of curve indices:
+ #   stage 1: n indices --> clusteringTask1(...) --> K1 medoid indices
+ #   stage 2 (only when WER=="mix"): K1xK1 WER distances --> clusteringTask2(...)
+ #     --> K2 medoid indices,
+ # where n == N / ntasks, N being the total number of curves.
+ runTwoStepClustering <- function(inds)
+ {
+   # Parallel workers start with a blank environment: the package must be
+   # loaded there so clusteringTask1/2 (and their dependencies) are available.
+   if (parll && ntasks>1)
+     require("epclust", quietly=TRUE)
+   indices_medoids <- clusteringTask1(inds, getContribs, K1, algoClust1,
+     nb_items_clust, ncores_clust, verbose, parll)
+   if (WER!="mix")
+     return (indices_medoids)
+   # WER=="mix": refine the K1 medoids into K2 via the WER-distance stage
+   clusteringTask2(indices_medoids, getSeries, K2, algoClust2,
+     nb_series_per_chunk,smooth_lvl,nvoice,nbytes,endian,ncores_clust,verbose,parll)
+ }
+
+ if (verbose)
+ {
+   # Announce the runs about to be launched. Note: the local is named 'msg'
+   # rather than 'message' to avoid shadowing base::message().
+   msg <- paste0("...Run ",ntasks," x stage 1")
+   if (WER=="mix")
+     msg <- paste0(msg," + stage 2")
+   cat(msg,"\n", sep="")
+ }
+
+ # After all runs complete we hold ntasks*K1 medoid indices (WER=="end")
+ # or ntasks*K2 (WER=="mix"), depending whether stage 2 was executed.
+ if (parll && ntasks>1)
+ {
+   indices_medoids_all <-
+     unlist( parallel::parLapply(cl, indices_tasks, runTwoStepClustering) )
+   # The task-level cluster is no longer needed past this point
+   parallel::stopCluster(cl)
+ } else
+   indices_medoids_all <- unlist( lapply(indices_tasks, runTwoStepClustering) )
+
+ # For the last stage, ncores_tasks*(ncores_clust+1) cores should be available:
+ # - ncores_tasks for level 1 parallelism
+ # - ncores_tasks*ncores_clust for level 2 parallelism,
+ # but since an extension MPI <--> tasks / OpenMP <--> sub-tasks is on the way,
+ # it's better to just re-use ncores_clust
+ ncores_last_stage <- ncores_clust
+ 
+ # Run the final clustering on the pooled medoids to obtain K2 medoid indices
+ if (verbose)
+   cat("...Run final // stage 1 + stage 2\n")
+ indices_medoids <- clusteringTask1(indices_medoids_all, getContribs, K1, algoClust1,
+   nb_items_clust, ncores_tasks*ncores_clust, verbose, parll)
+ # BUGFIX: stage 2 computes WER distances on the raw series, so it takes the
+ # series accessor (getSeries) — as exported in the task-2 varlist and passed
+ # inside runTwoStepClustering — not the contributions accessor (getContribs).
+ indices_medoids <- clusteringTask2(indices_medoids, getSeries, K2, algoClust2,
+   nb_series_per_chunk,smooth_lvl,nvoice,nbytes,endian,ncores_last_stage,verbose,parll)
+
+ # Build the synchrones: the cumulated power consumptions of each of the K2
+ # final groups.
+ final_medoids <- getSeries(indices_medoids)
+ synchrones <- computeSynchrones(final_medoids, getSeries, nb_curves, nb_series_per_chunk,
+   ncores_last_stage, verbose, parll)
+ 
+ # Only K2 << K1 << N curves remain at this point, so plain in-memory
+ # structures suffice (no big.matrix needed)
+ list(medoids=final_medoids, ranks=indices_medoids, synchrones=synchrones)