X-Git-Url: https://git.auder.net/?p=epclust.git;a=blobdiff_plain;f=epclust%2FR%2Fmain.R;h=ce650ff6522ab9d8f774fd526d3506fbb6317b25;hp=603f7bfbbec05e2c71db48bcc75a7132507eb2f4;hb=3c5a4b0880db63367a474a568e1322b3999932fe;hpb=40f12a2f66d06fd77183ea02b996f5c66f90761c diff --git a/epclust/R/main.R b/epclust/R/main.R index 603f7bf..ce650ff 100644 --- a/epclust/R/main.R +++ b/epclust/R/main.R @@ -45,8 +45,8 @@ #' } #' @param K1 Number of clusters to be found after stage 1 (K1 << N [number of series]) #' @param K2 Number of clusters to be found after stage 2 (K2 << K1) -#' @param nb_series_per_chunk (Maximum) number of series to retrieve in one batch; -#' this value is also used for the (maximum) number of series to cluster at a time +#' @param nb_series_per_chunk (Maximum) number of series to retrieve in one batch +#' @param nb_items_clust (~Maximum) number of items in clustering algorithm 1 input #' @param algoClust1 Clustering algorithm for stage 1. A function which takes (data, K) #' as argument where data is a matrix in columns and K the desired number of clusters, #' and outputs K medoids ranks. Default: PAM. In our method, this function is called @@ -150,7 +150,7 @@ #' digest::sha1(res_db) #' } #' @export -claws <- function(getSeries, K1, K2, nb_series_per_chunk, +claws <- function(series, K1, K2, nb_series_per_chunk, nb_items_clust=7*K1, algoClust1=function(data,K) cluster::pam(t(data),K,diss=FALSE,pamonce=1)$id.med, algoClust2=function(dists,K) cluster::pam(dists,K,diss=TRUE,pamonce=1)$id.med, wav_filt="d8", contrib_type="absolute", WER="end", nvoice=4, random=TRUE, @@ -167,10 +167,7 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, K1 <- .toInteger(K1, function(x) x>=2) K2 <- .toInteger(K2, function(x) x>=2) nb_series_per_chunk <- .toInteger(nb_series_per_chunk, function(x) x>=1) - # K1 (number of clusters at step 1) cannot exceed nb_series_per_chunk, because we will need - # to load K1 series in memory for clustering stage 2. - if (K1 > nb_series_per_chunk) - stop("'K1' cannot exceed 'nb_series_per_chunk'") + nb_items_clust <- .toInteger(nb_items_clust, function(x) x>K1) random <- .toLogical(random) tryCatch( {ignored <- wavelets::wt.filter(wav_filt)}, error = function(e) stop("Invalid wavelet filter; see ?wavelets::wt.filter") ) @@ -247,14 +244,20 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, # Initialize parallel runs: outfile="" allow to output verbose traces in the console # under Linux. All necessary variables are passed to the workers. cl = parallel::makeCluster(ncores_tasks, outfile="") - parallel::clusterExport(cl, envir = environment(), - varlist = c("getSeries","getContribs","K1","K2","algoClust1","algoClust2", - "nb_series_per_chunk","ncores_clust","nvoice","nbytes","endian","verbose","parll")) + varlist = c("ncores_clust","verbose","parll", #task 1 & 2 + "K1","getContribs","algoClust1","nb_items_clust") #task 1 + if (WER=="mix") + { + # Add variables for task 2 + varlist = c(varlist, "K2","getSeries","algoClust2","nb_series_per_chunk", + "nvoice","nbytes","endian") + } + parallel::clusterExport(cl, varlist, envir = environment()) } # This function achieves one complete clustering task, divided in stage 1 + stage 2. - # stage 1: n indices --> clusteringTask1(...) --> K1 medoids - # stage 2: K1 medoids --> clusteringTask2(...) --> K2 medoids, + # stage 1: n indices --> clusteringTask1(...) --> K1 medoids (indices) + # stage 2: K1 indices --> K1xK1 WER distances --> clusteringTask2(...) --> K2 medoids, # where n = N / ntasks, N being the total number of curves. runTwoStepClustering = function(inds) { @@ -263,7 +266,7 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, if (parll && ntasks>1) require("epclust", quietly=TRUE) indices_medoids = clusteringTask1(inds, getContribs, K1, algoClust1, - nb_series_per_chunk, ncores_clust, verbose, parll) + nb_items_clust, ncores_clust, verbose, parll) if (WER=="mix") { indices_medoids = clusteringTask2(indices_medoids, getSeries, K2, algoClust2, @@ -280,8 +283,8 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, cat(paste(message,"\n", sep="")) } - # As explained above, indices will be assigned to ntasks*K1 medoids indices [if WER=="end"], - # or nothing (empty vector) if WER=="mix"; in this case, synchrones are stored in a file. + # As explained above, we obtain after all runs ntasks*[K1 or K2] medoids indices, + # depending wether WER=="end" or "mix", respectively. indices_medoids_all <- if (parll && ntasks>1) unlist( parallel::parLapply(cl, indices_tasks, runTwoStepClustering) ) @@ -291,22 +294,26 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, if (parll && ntasks>1) parallel::stopCluster(cl) - # Right before the final stage, input data still is the initial set of curves, referenced - # by the ntasks*[K1 or K2] medoids indices. + # For the last stage, ncores_tasks*(ncores_clusts+1) cores should be available: + # - ntasks for level 1 parallelism + # - ntasks*ncores_clust for level 2 parallelism, + # but since an extension MPI <--> tasks / OpenMP <--> sub-tasks is on the way, + # it's better to just re-use ncores_clust + ncores_last_stage <- ncores_clust - # Run last clustering tasks to obtain only K2 medoids indices, from ntasks*[K1 or K2] - # indices, depending wether WER=="end" or WER=="mix" + # Run last clustering tasks to obtain only K2 medoids indices if (verbose) cat("...Run final // stage 1 + stage 2\n") indices_medoids = clusteringTask1(indices_medoids_all, getContribs, K1, algoClust1, - nb_series_per_chunk, ncores_tasks*ncores_clust, verbose, parll) + nb_items_clust, ncores_tasks*ncores_clust, verbose, parll) indices_medoids = clusteringTask2(indices_medoids, getContribs, K2, algoClust2, - nb_series_per_chunk, nvoice, nbytes, endian, ncores_tasks*ncores_clust, verbose, parll) + nb_series_per_chunk, nvoice, nbytes, endian, ncores_last_stage, verbose, parll) - # Compute synchrones + # Compute synchrones, that is to say the cumulated power consumptions for each of the K2 + # final groups. medoids = getSeries(indices_medoids) synchrones = computeSynchrones(medoids, getSeries, nb_curves, nb_series_per_chunk, - ncores_tasks*ncores_clust, verbose, parll) + ncores_last_stage, verbose, parll) # NOTE: no need to use big.matrix here, since there are only K2 << K1 << N remaining curves list("medoids"=medoids, "ranks"=indices_medoids, "synchrones"=synchrones)