+ if (parll && ntasks>1)
+ {
+ # Initialize parallel runs: outfile="" allow to output verbose traces in the console
+ # under Linux. All necessary variables are passed to the workers.
+ cl = parallel::makeCluster(ncores_tasks, outfile="")
+ parallel::clusterExport(cl, envir = environment(),
+ varlist = c("getSeries","getContribs","K1","K2","algoClust1","algoClust2",
+ "nb_series_per_chunk","ncores_clust","nvoice","nbytes","endian","verbose","parll"))
+ }
+
+ # This function achieves one complete clustering task, divided in stage 1 + stage 2.
+ # stage 1: n indices --> clusteringTask1(...) --> K1 medoids
+ # stage 2: K1 medoids --> clusteringTask2(...) --> K2 medoids,
+ # where n = N / ntasks, N being the total number of curves.
+ runTwoStepClustering = function(inds)
+ {
+ # When running in parallel, the environment is blank: we need to load the required
+ # packages, and pass useful variables.
+ if (parll && ntasks>1)
+ require("epclust", quietly=TRUE)
+ indices_medoids = clusteringTask1(inds, getContribs, K1, algoClust1,
+ nb_series_per_chunk, ncores_clust, verbose, parll)
+ if (WER=="mix")
+ {
+ indices_medoids = clusteringTask2(indices_medoids, getSeries, K2, algoClust2,
+ nb_series_per_chunk, nvoice, nbytes, endian, ncores_clust, verbose, parll)
+ }
+ indices_medoids
+ }
+
+ if (verbose)
+ {
+ message = paste("...Run ",ntasks," x stage 1", sep="")
+ if (WER=="mix")
+ message = paste(message," + stage 2", sep="")
+ cat(paste(message,"\n", sep=""))
+ }
+
+ # As explained above, indices will be assigned to ntasks*K1 medoids indices [if WER=="end"],
+ # or nothing (empty vector) if WER=="mix"; in this case, synchrones are stored in a file.
+ indices_medoids_all <-
+ if (parll && ntasks>1)
+ unlist( parallel::parLapply(cl, indices_tasks, runTwoStepClustering) )
+ else
+ unlist( lapply(indices_tasks, runTwoStepClustering) )
+
+ if (parll && ntasks>1)
+ parallel::stopCluster(cl)
+
+ # Right before the final stage, input data still is the initial set of curves, referenced
+ # by the ntasks*[K1 or K2] medoids indices.
+
+ # Run last clustering tasks to obtain only K2 medoids indices, from ntasks*[K1 or K2]
+ # indices, depending wether WER=="end" or WER=="mix"
+ if (verbose)
+ cat("...Run final // stage 1 + stage 2\n")
+ indices_medoids = clusteringTask1(indices_medoids_all, getContribs, K1, algoClust1,
+ nb_series_per_chunk, ncores_tasks*ncores_clust, verbose, parll)
+ indices_medoids = clusteringTask2(indices_medoids, getContribs, K2, algoClust2,
+ nb_series_per_chunk, nvoice, nbytes, endian, ncores_tasks*ncores_clust, verbose, parll)
+
+ # Compute synchrones
+ medoids = getSeries(indices_medoids)
+ synchrones = computeSynchrones(medoids, getSeries, nb_curves, nb_series_per_chunk,
+ ncores_tasks*ncores_clust, verbose, parll)
+
+ # NOTE: no need to use big.matrix here, since there are only K2 << K1 << N remaining curves
+ list("medoids"=medoids, "ranks"=indices_medoids, "synchrones"=synchrones)