From bf5c08443087a23ea3d1a7ab993568e608a8b5dd Mon Sep 17 00:00:00 2001 From: Benjamin Auder Date: Wed, 8 Mar 2017 15:04:44 +0100 Subject: [PATCH] 'update' --- TODO | 2 ++ epclust/R/clustering.R | 38 +++++++++++++++++++++++++------------- epclust/R/main.R | 38 ++++++++++++++------------------------ 3 files changed, 41 insertions(+), 37 deletions(-) diff --git a/TODO b/TODO index 1788ce6..199a59f 100644 --- a/TODO +++ b/TODO @@ -45,3 +45,5 @@ utiliser Rcpp ? #Alternative: use bigmemory to share series when CSV or matrix(...) #' @importFrom synchronicity boost.mutex lock unlock + +subtree: epclust, shared. This root folder should remain private diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R index 3993e76..f5e497f 100644 --- a/epclust/R/clustering.R +++ b/epclust/R/clustering.R @@ -6,11 +6,13 @@ #' #' @description \code{clusteringTask1()} runs one full stage-1 task, which consists in #' iterated stage 1 clustering (on nb_curves / ntasks energy contributions, computed -#' through discrete wavelets coefficients). \code{computeClusters1()} and -#' \code{computeClusters2()} correspond to the atomic clustering procedures respectively -#' for stage 1 and 2. The former applies the clustering algorithm (PAM) on a -#' contributions matrix, while the latter clusters a chunk of series inside one task -#' (~max nb_series_per_chunk) +#' through discrete wavelets coefficients). +#' \code{clusteringTask2()} runs a full stage-2 task, which consists in synchrones +#' and then WER distances computations, before applying the clustering algorithm. +#' \code{computeClusters1()} and \code{computeClusters2()} correspond to the atomic +#' clustering procedures respectively for stage 1 and 2. The former applies the +#' clustering algorithm (PAM) on a contributions matrix, while the latter clusters +#' a chunk of series inside one task (~max nb_series_per_chunk) #' #' @param indices Range of series indices to cluster in parallel (initial data) #' @param getContribs Function to retrieve contributions from initial series indices: @@ -62,21 +64,31 @@ clusteringTask1 = function( #' @rdname clustering #' @export -computeClusters1 = function(contribs, K1) - cluster::pam(contribs, K1, diss=FALSE)$id.med - -#' @rdname clustering -#' @export -computeClusters2 = function(medoids, K2, +clusteringTask2 = function(medoids, K2, getRefSeries, nb_ref_curves, nb_series_per_chunk, ncores_clust=1,verbose=FALSE,parll=TRUE) { + if (nrow(medoids) <= K2) + return (medoids) synchrones = computeSynchrones(medoids, getRefSeries, nb_ref_curves, nb_series_per_chunk, ncores_clust, verbose, parll) distances = computeWerDists(synchrones, ncores_clust, verbose, parll) - #TODO: if PAM cannot take big.matrix in input, cast it before... (more than OK in RAM) - medoids[ cluster::pam(distances, K2, diss=TRUE)$medoids , ] + # PAM in package 'cluster' cannot take big.matrix in input: need to cast it + mat_dists = matrix(nrow=K1, ncol=K1) + for (i in seq_len(K1)) + mat_dists[i,] = distances[i,] + medoids[ computeClusters2(mat_dists,K2), ] } +#' @rdname clustering +#' @export +computeClusters1 = function(contribs, K1) + cluster::pam(contribs, K1, diss=FALSE)$id.med + +#' @rdname clustering +#' @export +computeClusters2 = function(distances, K2) + cluster::pam(distances, K2, diss=TRUE)$id.med + #' computeSynchrones #' #' Compute the synchrones curves (sum of clusters elements) from a matrix of medoids, diff --git a/epclust/R/main.R b/epclust/R/main.R index 892c64c..2037dbe 100644 --- a/epclust/R/main.R +++ b/epclust/R/main.R @@ -7,8 +7,9 @@ #' @param getSeries Access to the (time-)series, which can be of one of the three #' following types: #' \itemize{ -#' \item matrix: each line contains all the values for one time-serie, ordered by time -#' \item connection: any R connection object (e.g. a file) providing lines as described above +#' \item [big.]matrix: each line contains all the values for one time-serie, ordered by time +#' \item connection: any R connection object providing lines as described above +#' \item character: name of a CSV file containing series in rows (no header) #' \item function: a custom way to retrieve the curves; it has only one argument: #' the indices of the series to be retrieved. See examples #' } @@ -32,7 +33,7 @@ #' @param verbose Level of verbosity (0/FALSE for nothing or 1/TRUE for all; devel stage) #' @param parll TRUE to fully parallelize; otherwise run sequentially (debug, comparison) #' -#' @return A matrix of the final medoids curves (K2) in rows +#' @return A big.matrix of the final medoids curves (K2) in rows #' #' @examples #' \dontrun{ @@ -163,22 +164,14 @@ claws = function(getSeries, K1, K2, runTwoStepClustering = function(inds) { - if (parll) + if (parll && ntasks>1) require("epclust", quietly=TRUE) indices_medoids = clusteringTask1( inds, getContribs, K1, nb_series_per_chunk, ncores_clust, verbose, parll) if (WER=="mix") { - - - - -#TODO: getSeries(indices_medoids) BAD ; il faudrait une big.matrix de medoids en entree - #OK en RAM il y en aura 1000 (donc 1000*K1*17519... OK) - #...mais du coup chaque process ne re-dupliquera pas medoids - - - medoids2 = computeClusters2(getSeries(indices_medoids), + medoids1 = bigmemory::as.big.matrix( getSeries(indices_medoids) ) + medoids2 = clusteringTask2(medoids1, K2, getSeries, nb_curves, nb_series_per_chunk, ncores_clust, verbose, parll) binarize(medoids2, synchrones_file, nb_series_per_chunk, sep, nbytes, endian) return (vector("integer",0)) @@ -196,22 +189,22 @@ claws = function(getSeries, K1, K2, cat(paste("...Run ",ntasks," x stage 1 in parallel\n",sep="")) if (WER=="mix") {synchrones_file = paste(bin_dir,"synchrones",sep="") ; unlink(synchrones_file)} - if (parll) + if (parll && ntasks>1) { cl = parallel::makeCluster(ncores_tasks) varlist = c("getSeries","getContribs","K1","K2","verbose","parll", - "nb_series_per_chunk","ncores_clust","sep","nbytes","endian") + "nb_series_per_chunk","ntasks","ncores_clust","sep","nbytes","endian") if (WER=="mix") varlist = c(varlist, "synchrones_file") parallel::clusterExport(cl, varlist=varlist, envir = environment()) } # 1000*K1 indices [if WER=="end"], or empty vector [if WER=="mix"] --> series on file - if (parll) + if (parll && ntasks>1) indices = unlist( parallel::parLapply(cl, indices_tasks, runTwoStepClustering) ) else indices = unlist( lapply(indices_tasks, runTwoStepClustering) ) - if (parll) + if (parll && ntasks>1) parallel::stopCluster(cl) getRefSeries = getSeries @@ -230,22 +223,19 @@ claws = function(getSeries, K1, K2, contribs_file, nb_series_per_chunk, nbytes, endian) } - - -#TODO: if ntasks==1, c'est deja terminé - # Run step2 on resulting indices or series (from file) if (verbose) cat("...Run final // stage 1 + stage 2\n") indices_medoids = clusteringTask1( indices, getContribs, K1, nb_series_per_chunk, ncores_tasks*ncores_clust, verbose, parll) - medoids = computeClusters2(getSeries(indices_medoids), K2, + medoids1 = bigmemory::as.big.matrix( getSeries(indices_medoids) ) + medoids2 = computeClusters2(medoids1, K2, getRefSeries, nb_curves, nb_series_per_chunk, ncores_tasks*ncores_clust, verbose, parll) # Cleanup unlink(bin_dir, recursive=TRUE) - medoids + medoids2 } #' curvesToContribs -- 2.44.0