#' }
#' @param K1 Number of clusters to be found after stage 1 (K1 << N [number of series])
#' @param K2 Number of clusters to be found after stage 2 (K2 << K1)
-#' @param nb_series_per_chunk (Maximum) number of series to retrieve in one batch;
-#' this value is also used for the (maximum) number of series to cluster at a time
+#' @param nb_series_per_chunk (Maximum) number of series to retrieve in one batch
+#' @param nb_items_clust (Approximate maximum) number of items in the input of clustering algorithm 1
#' @param algoClust1 Clustering algorithm for stage 1. A function which takes (data, K)
#' as arguments, where data is a matrix with series in columns and K is the desired number
#' of clusters, and outputs K medoid ranks (see the sketch at the end of the examples).
#' Default: PAM. In our method, this function is called
#' digest::sha1(res_db)
#' }
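#' # Hypothetical custom 'algoClust1' (a sketch, not shipped with the package), shown
#' # only to illustrate the expected contract: a function of (data, K) returning K
#' # medoid ranks, here built on stats::kmeans instead of the default PAM.
#' algoClust1_km <- function(data, K) {
#'   km <- stats::kmeans(t(data), K)        # series are the columns of 'data'
#'   sapply(seq_len(K), function(k) {
#'     members <- which(km$cluster == k)    # series falling in cluster k
#'     d2 <- colSums((data[, members, drop=FALSE] - km$centers[k,])^2)
#'     members[which.min(d2)]               # rank of the series closest to center k
#'   })
#' }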
#' @export
-claws <- function(getSeries, K1, K2, nb_series_per_chunk,
+claws <- function(series, K1, K2, nb_series_per_chunk, nb_items_clust=7*K1,
algoClust1=function(data,K) cluster::pam(t(data),K,diss=FALSE,pamonce=1)$id.med,
algoClust2=function(dists,K) cluster::pam(dists,K,diss=TRUE,pamonce=1)$id.med,
wav_filt="d8", contrib_type="absolute", WER="end", nvoice=4, random=TRUE,
K1 <- .toInteger(K1, function(x) x>=2)
K2 <- .toInteger(K2, function(x) x>=2)
nb_series_per_chunk <- .toInteger(nb_series_per_chunk, function(x) x>=1)
- # K1 (number of clusters at step 1) cannot exceed nb_series_per_chunk, because we will need
- # to load K1 series in memory for clustering stage 2.
- if (K1 > nb_series_per_chunk)
- stop("'K1' cannot exceed 'nb_series_per_chunk'")
+ nb_items_clust <- .toInteger(nb_items_clust, function(x) x>K1)
random <- .toLogical(random)
tryCatch( {ignored <- wavelets::wt.filter(wav_filt)},
error = function(e) stop("Invalid wavelet filter; see ?wavelets::wt.filter") )
# Initialize parallel runs: outfile="" allows verbose traces to be printed to the console
# (under Linux). All necessary variables are passed to the workers.
cl = parallel::makeCluster(ncores_tasks, outfile="")
- parallel::clusterExport(cl, envir = environment(),
- varlist = c("getSeries","getContribs","K1","K2","algoClust1","algoClust2",
- "nb_series_per_chunk","ncores_clust","nvoice","nbytes","endian","verbose","parll"))
+ varlist = c("ncores_clust","verbose","parll", #task 1 & 2
+ "K1","getContribs","algoClust1","nb_items_clust") #task 1
+ if (WER=="mix")
+ {
+ # Add variables for task 2
+ varlist = c(varlist, "K2","getSeries","algoClust2","nb_series_per_chunk",
+ "nvoice","nbytes","endian")
+ }
+ parallel::clusterExport(cl, varlist, envir = environment())
}
# This function achieves one complete clustering task, divided in stage 1 + stage 2.
- # stage 1: n indices --> clusteringTask1(...) --> K1 medoids
- # stage 2: K1 medoids --> clusteringTask2(...) --> K2 medoids,
+ # stage 1: n indices --> clusteringTask1(...) --> K1 medoids (indices)
+ # stage 2: K1 indices --> K1xK1 WER distances --> clusteringTask2(...) --> K2 medoids,
# where n = N / ntasks, N being the total number of curves.
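# (Illustrative figures only: with N = 300000 curves and ntasks = 30, each task
# would handle n = 10000 curves.)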
runTwoStepClustering = function(inds)
{
if (parll && ntasks>1)
require("epclust", quietly=TRUE)
indices_medoids = clusteringTask1(inds, getContribs, K1, algoClust1,
- nb_series_per_chunk, ncores_clust, verbose, parll)
+ nb_items_clust, ncores_clust, verbose, parll)
if (WER=="mix")
{
indices_medoids = clusteringTask2(indices_medoids, getSeries, K2, algoClust2,
cat(paste(message,"\n", sep=""))
}
- # As explained above, indices will be assigned to ntasks*K1 medoids indices [if WER=="end"],
- # or nothing (empty vector) if WER=="mix"; in this case, synchrones are stored in a file.
+ # As explained above, after all runs we obtain ntasks*[K1 or K2] medoids indices,
+ # depending on whether WER=="end" or WER=="mix", respectively.
indices_medoids_all <-
if (parll && ntasks>1)
unlist( parallel::parLapply(cl, indices_tasks, runTwoStepClustering) )
if (parll && ntasks>1)
parallel::stopCluster(cl)
- # Right before the final stage, input data still is the initial set of curves, referenced
- # by the ntasks*[K1 or K2] medoids indices.
+ # For the last stage, ncores_tasks*(ncores_clust+1) cores should be available:
+ #  - ncores_tasks for level 1 parallelism,
+ #  - ncores_tasks*ncores_clust for level 2 parallelism;
+ # but since an extension MPI <--> tasks / OpenMP <--> sub-tasks is on the way,
+ # it's better to just re-use ncores_clust here
+ ncores_last_stage <- ncores_clust
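+ # Illustration with hypothetical values: ncores_tasks=3 and ncores_clust=4 would leave
+ # 3*(4+1) = 15 cores available here, yet the final stage only uses ncores_last_stage = 4.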
- # Run last clustering tasks to obtain only K2 medoids indices, from ntasks*[K1 or K2]
- # indices, depending wether WER=="end" or WER=="mix"
+ # Run last clustering tasks to obtain only K2 medoids indices
if (verbose)
cat("...Run final // stage 1 + stage 2\n")
indices_medoids = clusteringTask1(indices_medoids_all, getContribs, K1, algoClust1,
- nb_series_per_chunk, ncores_tasks*ncores_clust, verbose, parll)
+ nb_items_clust, ncores_tasks*ncores_clust, verbose, parll)
indices_medoids = clusteringTask2(indices_medoids, getSeries, K2, algoClust2,
- nb_series_per_chunk, nvoice, nbytes, endian, ncores_tasks*ncores_clust, verbose, parll)
+ nb_series_per_chunk, nvoice, nbytes, endian, ncores_last_stage, verbose, parll)
- # Compute synchrones
+ # Compute the synchrones, i.e. the summed power consumption curves of each of the
+ # K2 final groups.
medoids = getSeries(indices_medoids)
synchrones = computeSynchrones(medoids, getSeries, nb_curves, nb_series_per_chunk,
- ncores_tasks*ncores_clust, verbose, parll)
+ ncores_last_stage, verbose, parll)
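# A rough in-memory equivalent of the call above (sketch only; assumes the curves fit in
# RAM as columns of a matrix 'series', and uses Euclidean assignment to the medoids, which
# may differ from the rule applied inside computeSynchrones()):
#   memb <- apply(series, 2, function(s) which.min(colSums((medoids - s)^2)))
#   synchrones_ref <- sapply(1:K2, function(k) rowSums(series[, memb==k, drop=FALSE]))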
# NOTE: no need to use big.matrix here, since there are only K2 << K1 << N remaining curves
list("medoids"=medoids, "ranks"=indices_medoids, "synchrones"=synchrones)