'update'
[epclust.git] / epclust / R / main.R
index 603f7bf..ce650ff 100644 (file)
@@ -45,8 +45,8 @@
 #'   }
 #' @param K1 Number of clusters to be found after stage 1 (K1 << N [number of series])
 #' @param K2 Number of clusters to be found after stage 2 (K2 << K1)
-#' @param nb_series_per_chunk (Maximum) number of series to retrieve in one batch;
-#'   this value is also used for the (maximum) number of series to cluster at a time
+#' @param nb_series_per_chunk (Maximum) number of series to retrieve in one batch
+#' @param nb_items_clust (~Maximum) number of items in clustering algorithm 1 input
 #' @param algoClust1 Clustering algorithm for stage 1. A function which takes (data, K)
 #'   as argument where data is a matrix in columns and K the desired number of clusters,
 #'   and outputs K medoids ranks. Default: PAM. In our method, this function is called
 #' digest::sha1(res_db)
 #' }
 #' @export
-claws <- function(getSeries, K1, K2, nb_series_per_chunk,
+claws <- function(series, K1, K2, nb_series_per_chunk, nb_items_clust=7*K1,
        algoClust1=function(data,K) cluster::pam(t(data),K,diss=FALSE,pamonce=1)$id.med,
        algoClust2=function(dists,K) cluster::pam(dists,K,diss=TRUE,pamonce=1)$id.med,
        wav_filt="d8", contrib_type="absolute", WER="end", nvoice=4, random=TRUE,
@@ -167,10 +167,7 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk,
        K1 <- .toInteger(K1, function(x) x>=2)
        K2 <- .toInteger(K2, function(x) x>=2)
        nb_series_per_chunk <- .toInteger(nb_series_per_chunk, function(x) x>=1)
-       # K1 (number of clusters at step 1) cannot exceed nb_series_per_chunk, because we will need
-       # to load K1 series in memory for clustering stage 2.
-       if (K1 > nb_series_per_chunk)
-               stop("'K1' cannot exceed 'nb_series_per_chunk'")
+       nb_items_clust <- .toInteger(nb_items_clust, function(x) x>K1)
        random <- .toLogical(random)
        tryCatch( {ignored <- wavelets::wt.filter(wav_filt)},
                error = function(e) stop("Invalid wavelet filter; see ?wavelets::wt.filter") )
@@ -247,14 +244,20 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk,
                # Initialize parallel runs: outfile="" allow to output verbose traces in the console
                # under Linux. All necessary variables are passed to the workers.
                cl = parallel::makeCluster(ncores_tasks, outfile="")
-               parallel::clusterExport(cl, envir = environment(),
-                       varlist = c("getSeries","getContribs","K1","K2","algoClust1","algoClust2",
-                       "nb_series_per_chunk","ncores_clust","nvoice","nbytes","endian","verbose","parll"))
+               varlist = c("ncores_clust","verbose","parll", #task 1 & 2
+                       "K1","getContribs","algoClust1","nb_items_clust") #task 1
+               if (WER=="mix")
+               {
+                       # Add variables for task 2
+                       varlist = c(varlist, "K2","getSeries","algoClust2","nb_series_per_chunk",
+                               "nvoice","nbytes","endian")
+               }
+               parallel::clusterExport(cl, varlist, envir = environment())
        }
 
        # This function achieves one complete clustering task, divided in stage 1 + stage 2.
-       # stage 1: n indices  --> clusteringTask1(...) --> K1 medoids
-       # stage 2: K1 medoids --> clusteringTask2(...) --> K2 medoids,
+       # stage 1: n indices  --> clusteringTask1(...) --> K1 medoids (indices)
+       # stage 2: K1 indices --> K1xK1 WER distances --> clusteringTask2(...) --> K2 medoids,
        # where n = N / ntasks, N being the total number of curves.
        runTwoStepClustering = function(inds)
        {
@@ -263,7 +266,7 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk,
                if (parll && ntasks>1)
                        require("epclust", quietly=TRUE)
                indices_medoids = clusteringTask1(inds, getContribs, K1, algoClust1,
-                       nb_series_per_chunk, ncores_clust, verbose, parll)
+                       nb_items_clust, ncores_clust, verbose, parll)
                if (WER=="mix")
                {
                        indices_medoids = clusteringTask2(indices_medoids, getSeries, K2, algoClust2,
@@ -280,8 +283,8 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk,
                cat(paste(message,"\n", sep=""))
        }
 
-       # As explained above, indices will be assigned to ntasks*K1 medoids indices [if WER=="end"],
-       # or nothing (empty vector) if WER=="mix"; in this case, synchrones are stored in a file.
+       # As explained above, we obtain after all runs ntasks*[K1 or K2] medoids indices,
+       # depending wether WER=="end" or "mix", respectively.
        indices_medoids_all <-
                if (parll && ntasks>1)
                        unlist( parallel::parLapply(cl, indices_tasks, runTwoStepClustering) )
@@ -291,22 +294,26 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk,
        if (parll && ntasks>1)
                parallel::stopCluster(cl)
 
-       # Right before the final stage, input data still is the initial set of curves, referenced
-       # by the ntasks*[K1 or K2] medoids indices.
+       # For the last stage, ncores_tasks*(ncores_clusts+1) cores should be available:
+       #  - ntasks for level 1 parallelism
+       #  - ntasks*ncores_clust for level 2 parallelism,
+       # but since an extension MPI <--> tasks / OpenMP <--> sub-tasks is on the way,
+       # it's better to just re-use ncores_clust
+       ncores_last_stage <- ncores_clust
 
-       # Run last clustering tasks to obtain only K2 medoids indices, from ntasks*[K1 or K2]
-       # indices, depending wether WER=="end" or WER=="mix"
+       # Run last clustering tasks to obtain only K2 medoids indices
        if (verbose)
                cat("...Run final // stage 1 + stage 2\n")
        indices_medoids = clusteringTask1(indices_medoids_all, getContribs, K1, algoClust1,
-               nb_series_per_chunk, ncores_tasks*ncores_clust, verbose, parll)
+               nb_items_clust, ncores_tasks*ncores_clust, verbose, parll)
        indices_medoids = clusteringTask2(indices_medoids, getContribs, K2, algoClust2,
-               nb_series_per_chunk, nvoice, nbytes, endian, ncores_tasks*ncores_clust, verbose, parll)
+               nb_series_per_chunk, nvoice, nbytes, endian, ncores_last_stage, verbose, parll)
 
-       # Compute synchrones
+       # Compute synchrones, that is to say the cumulated power consumptions for each of the K2
+       # final groups.
        medoids = getSeries(indices_medoids)
        synchrones = computeSynchrones(medoids, getSeries, nb_curves, nb_series_per_chunk,
-               ncores_tasks*ncores_clust, verbose, parll)
+               ncores_last_stage, verbose, parll)
 
        # NOTE: no need to use big.matrix here, since there are only K2 << K1 << N remaining curves
        list("medoids"=medoids, "ranks"=indices_medoids, "synchrones"=synchrones)