From c45fd66342e40c8b5387fc6f0059c4d3a9718340 Mon Sep 17 00:00:00 2001 From: Benjamin Auder Date: Wed, 8 Mar 2017 03:12:36 +0100 Subject: [PATCH 01/16] parallel version running; TODO: check==sequential, plotting routines, parser; check memory --- epclust/R/clustering.R | 31 ++++++++++++++----------------- epclust/R/main.R | 37 ++++++++++++++++++++----------------- epclust/R/plot.R | 3 +++ 3 files changed, 37 insertions(+), 34 deletions(-) create mode 100644 epclust/R/plot.R diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R index 14e1f83..9a55495 100644 --- a/epclust/R/clustering.R +++ b/epclust/R/clustering.R @@ -93,6 +93,8 @@ computeSynchrones = function(medoids, getRefSeries, { computeSynchronesChunk = function(indices) { + if (verbose) + cat(paste("--- Compute synchrones for ",length(indices)," lines\n", sep="")) ref_series = getRefSeries(indices) #get medoids indices for this chunk of series for (i in seq_len(nrow(ref_series))) @@ -118,17 +120,14 @@ computeSynchrones = function(medoids, getRefSeries, m <- synchronicity::boost.mutex() indices_workers = .spreadIndices(seq_len(nb_ref_curves), nb_series_per_chunk) - for (inds in indices_workers) - { - if (verbose) - cat(paste("--- Compute synchrones for ",length(inds)," lines\n", sep="")) + ignored <- if (parll) - ignored <- parallel::mcparallel(computeSynchronesChunk(inds)) + { + parallel::mclapply(indices_workers, computeSynchronesChunk, + mc.cores=ncores_clust, mc.allow.recursive=FALSE) + } else - computeSynchronesChunk(inds) - } - if (parll) - parallel::mccollect() + lapply(indices_workers, computeSynchronesChunk) mat_syncs = matrix(nrow=K, ncol=ncol(medoids)) vec_count = rep(NA, K) @@ -234,18 +233,16 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) if (parll) m <- synchronicity::boost.mutex() - for (i in 1:(n-1)) - { + ignored <- if (parll) - ignored <- parallel::mcparallel(computeDistancesLineI(i)) + { + parallel::mclapply(seq_len(n-1), computeDistancesLineI, + mc.cores=ncores_clust, mc.allow.recursive=FALSE) + } else - computeDistancesLineI(i) - } + lapply(seq_len(n-1), computeDistancesLineI) Xwer_dist[n,n] = 0. - if (parll) - parallel::mccollect() - mat_dists = matrix(nrow=n, ncol=n) #TODO: avoid this loop? for (i in 1:n) diff --git a/epclust/R/main.R b/epclust/R/main.R index bf31c87..bba0618 100644 --- a/epclust/R/main.R +++ b/epclust/R/main.R @@ -161,22 +161,6 @@ claws = function(getSeries, K1, K2, if (nb_series_per_task < min_series_per_chunk) stop("Too many tasks: less series in one task than min_series_per_chunk!") - # Cluster contributions in parallel (by nb_series_per_chunk) - indices_all = if (random) sample(nb_curves) else seq_len(nb_curves) - indices_tasks = lapply(seq_len(ntasks), function(i) { - upper_bound = ifelse( i series on file if (parll) indices = unlist( parallel::parLapply(cl, indices_tasks, runTwoStepClustering) ) @@ -202,7 +206,6 @@ claws = function(getSeries, K1, K2, parallel::stopCluster(cl) getRefSeries = getSeries - synchrones_file = paste(bin_dir,"synchrones",sep="") ; unlink(synchrones_file) if (WER=="mix") { indices = seq_len(ntasks*K2) diff --git a/epclust/R/plot.R b/epclust/R/plot.R new file mode 100644 index 0000000..096159d --- /dev/null +++ b/epclust/R/plot.R @@ -0,0 +1,3 @@ +#TODO: some visualization +#for (i in 1:6) {plot(medoids_ascii[i,1:200],type="l",ylim=c(-5,5),col=i);par(new=TRUE)} +#... -- 2.44.0 From 24ed5d835e2eebaaa4d5f8296f8d2e2132cc6398 Mon Sep 17 00:00:00 2001 From: Benjamin Auder Date: Wed, 8 Mar 2017 11:52:39 +0100 Subject: [PATCH 02/16] add some TODOs --- epclust/R/clustering.R | 82 +++++++++++++++++++---------- epclust/R/main.R | 9 ++++ epclust/src/WER.c | 117 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 180 insertions(+), 28 deletions(-) create mode 100644 epclust/src/WER.c diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R index 9a55495..3993e76 100644 --- a/epclust/R/clustering.R +++ b/epclust/R/clustering.R @@ -21,7 +21,7 @@ #' #' @return For \code{clusteringTask1()} and \code{computeClusters1()}, the indices of the #' computed (K1) medoids. Indices are irrelevant for stage 2 clustering, thus -#' \code{computeClusters2()} outputs a matrix of medoids +#' \code{computeClusters2()} outputs a big.matrix of medoids #' (of size limited by nb_series_per_chunk) NULL @@ -73,6 +73,7 @@ computeClusters2 = function(medoids, K2, synchrones = computeSynchrones(medoids, getRefSeries, nb_ref_curves, nb_series_per_chunk, ncores_clust, verbose, parll) distances = computeWerDists(synchrones, ncores_clust, verbose, parll) + #TODO: if PAM cannot take big.matrix in input, cast it before... (more than OK in RAM) medoids[ cluster::pam(distances, K2, diss=TRUE)$medoids , ] } @@ -81,16 +82,27 @@ computeClusters2 = function(medoids, K2, #' Compute the synchrones curves (sum of clusters elements) from a matrix of medoids, #' using L2 distances. #' -#' @param medoids Matrix of medoids (curves of same legnth as initial series) +#' @param medoids big.matrix of medoids (curves of same length as initial series) #' @param getRefSeries Function to retrieve initial series (e.g. in stage 2 after series #' have been replaced by stage-1 medoids) #' @param nb_ref_curves How many reference series? (This number is known at this stage) #' @inheritParams claws #' +#' @return A big.matrix of size K1 x L where L = data_length +#' #' @export computeSynchrones = function(medoids, getRefSeries, nb_ref_curves, nb_series_per_chunk, ncores_clust=1,verbose=FALSE,parll=TRUE) { + + + +#TODO: si parll, getMedoids + serialization, pass only getMedoids to nodes +# --> BOF... chaque node chargera tous les medoids (efficacité) :/ ==> faut que ça tienne en RAM +#au pire :: C-ifier et charger medoids 1 by 1... + + #MIEUX :: medoids DOIT etre une big.matrix partagée ! + computeSynchronesChunk = function(indices) { if (verbose) @@ -111,36 +123,43 @@ computeSynchrones = function(medoids, getRefSeries, K = nrow(medoids) # Use bigmemory (shared==TRUE by default) + synchronicity to fill synchrones in // + # TODO: if size > RAM (not our case), use file-backed big.matrix synchrones = bigmemory::big.matrix(nrow=K,ncol=ncol(medoids),type="double",init=0.) counts = bigmemory::big.matrix(nrow=K,ncol=1,type="double",init=0) - # Fork (// run) only on Linux & MacOS; on Windows: run sequentially + # synchronicity is only for Linux & MacOS; on Windows: run sequentially parll = (requireNamespace("synchronicity",quietly=TRUE) && parll && Sys.info()['sysname'] != "Windows") if (parll) m <- synchronicity::boost.mutex() + if (parll) + { + cl = parallel::makeCluster(ncores_clust) + parallel::clusterExport(cl, + varlist=c("synchrones","counts","verbose","medoids","getRefSeries"), + envir=environment()) + } + indices_workers = .spreadIndices(seq_len(nb_ref_curves), nb_series_per_chunk) ignored <- if (parll) - { - parallel::mclapply(indices_workers, computeSynchronesChunk, - mc.cores=ncores_clust, mc.allow.recursive=FALSE) - } + parallel::parLapply(indices_workers, computeSynchronesChunk) else lapply(indices_workers, computeSynchronesChunk) - mat_syncs = matrix(nrow=K, ncol=ncol(medoids)) - vec_count = rep(NA, K) - #TODO: can we avoid this loop? + if (parll) + parallel::stopCluster(cl) + + #TODO: can we avoid this loop? ( synchrones = sweep(synchrones, 1, counts, '/') ) for (i in seq_len(K)) - { - mat_syncs[i,] = synchrones[i,] - vec_count[i] = counts[i,1] - } + synchrones[i,] = synchrones[i,] / counts[i,1] #NOTE: odds for some clusters to be empty? (when series already come from stage 2) # ...maybe; but let's hope resulting K1' be still quite bigger than K2 - mat_syncs = sweep(mat_syncs, 1, vec_count, '/') - mat_syncs[ sapply(seq_len(K), function(i) all(!is.nan(mat_syncs[i,]))) , ] + noNA_rows = sapply(seq_len(K), function(i) all(!is.nan(synchrones[i,]))) + if (all(noNA_rows)) + return (synchrones) + # Else: some clusters are empty, need to slice synchrones + synchrones[noNA_rows,] } #' computeWerDists @@ -148,13 +167,21 @@ computeSynchrones = function(medoids, getRefSeries, #' Compute the WER distances between the synchrones curves (in rows), which are #' returned (e.g.) by \code{computeSynchrones()} #' -#' @param synchrones A matrix of synchrones, in rows. The series have same length as the -#' series in the initial dataset +#' @param synchrones A big.matrix of synchrones, in rows. The series have same length +#' as the series in the initial dataset #' @inheritParams claws #' +#' @return A big.matrix of size K1 x K1 +#' #' @export computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) { + + + +#TODO: re-organize to call computeWerDist(x,y) [C] (in //?) from two indices + big.matrix + + n <- nrow(synchrones) delta <- ncol(synchrones) #TODO: automatic tune of all these parameters ? (for other users) @@ -163,7 +190,7 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) noctave = 13 # 4 here represent 2^5 = 32 half-hours ~ 1 day #NOTE: default scalevector == 2^(0:(noctave * nvoice) / nvoice) * s0 (?) - scalevector <- 2^(4:(noctave * nvoice) / nvoice) * 2 + scalevector <- 2^(4:(noctave * nvoice) / nvoice + 1) #condition: ( log2(s0*w0/(2*pi)) - 1 ) * nvoice + 1.5 >= 1 s0=2 w0=2*pi @@ -176,7 +203,7 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) if (verbose) cat(paste("+++ Compute Rwave::cwt() on serie ",i,"\n", sep="")) ts <- scale(ts(synchrones[i,]), center=TRUE, scale=scaled) - totts.cwt = Rwave::cwt(ts,totnoct,nvoice,w0,plot=0) + totts.cwt = Rwave::cwt(ts, totnoct, nvoice, w0, plot=FALSE) ts.cwt = totts.cwt[,s0log:(s0log+noctave*nvoice)] #Normalization sqs <- sqrt(2^(0:(noctave*nvoice)/nvoice)*s0) @@ -192,7 +219,8 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) envir=environment()) } - # (normalized) observations node with CWT + # list of CWT from synchrones + # TODO: fit in RAM, OK? If not, 2 options: serialize, compute individual distances Xcwt4 <- if (parll) parallel::parLapply(cl, seq_len(n), computeCWT) @@ -207,6 +235,9 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) if (verbose) cat("*** Compute WER distances from CWT\n") + #TODO: computeDistances(i,j), et répartir les n(n-1)/2 couples d'indices + #là c'est trop déséquilibré + computeDistancesLineI = function(i) { if (verbose) @@ -217,7 +248,7 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) num <- filter(Mod(Xcwt4[[i]] * Conj(Xcwt4[[j]])), fcoefs, circular=TRUE) WX <- filter(Mod(Xcwt4[[i]] * Conj(Xcwt4[[i]])), fcoefs, circular=TRUE) WY <- filter(Mod(Xcwt4[[j]] * Conj(Xcwt4[[j]])), fcoefs, circular=TRUE) - wer2 <- sum(colSums(num)^2) / sum( sum(colSums(WX) * colSums(WY)) ) + wer2 <- sum(colSums(num)^2) / sum( sum(colSums(WX) * colSums(WY)) ) if (parll) synchronicity::lock(m) Xwer_dist[i,j] <- sqrt(delta * ncol(Xcwt4[[1]]) * (1 - wer2)) @@ -242,12 +273,7 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) else lapply(seq_len(n-1), computeDistancesLineI) Xwer_dist[n,n] = 0. - - mat_dists = matrix(nrow=n, ncol=n) - #TODO: avoid this loop? - for (i in 1:n) - mat_dists[i,] = Xwer_dist[i,] - mat_dists + Xwer_dist } # Helper function to divide indices into balanced sets diff --git a/epclust/R/main.R b/epclust/R/main.R index bba0618..f1e435c 100644 --- a/epclust/R/main.R +++ b/epclust/R/main.R @@ -169,6 +169,15 @@ claws = function(getSeries, K1, K2, inds, getContribs, K1, nb_series_per_chunk, ncores_clust, verbose, parll) if (WER=="mix") { + + + + +#TODO: getSeries(indices_medoids) BAD ; il faudrait une big.matrix de medoids en entree + #OK en RAM il y en aura 1000 (donc 1000*K1*17519... OK) + #...mais du coup chaque process ne re-dupliquera pas medoids + + medoids2 = computeClusters2(getSeries(indices_medoids), K2, getSeries, nb_curves, nb_series_per_chunk, ncores_clust, verbose, parll) binarize(medoids2, synchrones_file, nb_series_per_chunk, sep, nbytes, endian) diff --git a/epclust/src/WER.c b/epclust/src/WER.c new file mode 100644 index 0000000..36bfba7 --- /dev/null +++ b/epclust/src/WER.c @@ -0,0 +1,117 @@ +#include +#include +#include + +#ifndef M_PI +#define M_PI 3.14159265358979323846 +#endif + +// n: number of synchrones, m: length of a synchrone +float computeWerDist(float* s1, float* s2, int n, int m) +{ + //TODO: automatic tune of all these parameters ? (for other users) + int nvoice = 4; + //noctave 2^13 = 8192 half hours ~ 180 days ; ~log2(ncol(synchrones)) + int noctave = 13 + // 4 here represent 2^5 = 32 half-hours ~ 1 day + //NOTE: default scalevector == 2^(0:(noctave * nvoice) / nvoice) * s0 (?) + //R: scalevector <- 2^(4:(noctave * nvoice) / nvoice + 1) + int* scalevector = (int*)malloc( (noctave*nvoice-4 + 1) * sizeof(int)) + for (int i=4; i<=noctave*nvoice; i++) + scalevector[i-4] = pow(2., (float)i/nvoice + 1.); + //condition: ( log2(s0*w0/(2*pi)) - 1 ) * nvoice + 1.5 >= 1 + int s0 = 2; + double w0 = 2*M_PI; + bool scaled = false; + int s0log = as.integer( (log2( s0*w0/(2*pi) ) - 1) * nvoice + 1.5 ) + int totnoct = noctave + as.integer(s0log/nvoice) + 1 + + + + + +///TODO: continue + + + + computeCWT = function(i) + { + if (verbose) + cat(paste("+++ Compute Rwave::cwt() on serie ",i,"\n", sep="")) + ts <- scale(ts(synchrones[i,]), center=TRUE, scale=scaled) + totts.cwt = Rwave::cwt(ts,totnoct,nvoice,w0,plot=0) + ts.cwt = totts.cwt[,s0log:(s0log+noctave*nvoice)] + #Normalization + sqs <- sqrt(2^(0:(noctave*nvoice)/nvoice)*s0) + sqres <- sweep(ts.cwt,2,sqs,'*') + sqres / max(Mod(sqres)) + } + + if (parll) + { + cl = parallel::makeCluster(ncores_clust) + parallel::clusterExport(cl, + varlist=c("synchrones","totnoct","nvoice","w0","s0log","noctave","s0","verbose"), + envir=environment()) + } + + # (normalized) observations node with CWT + Xcwt4 <- + if (parll) + parallel::parLapply(cl, seq_len(n), computeCWT) + else + lapply(seq_len(n), computeCWT) + + if (parll) + parallel::stopCluster(cl) + + Xwer_dist <- bigmemory::big.matrix(nrow=n, ncol=n, type="double") + fcoefs = rep(1/3, 3) #moving average on 3 values (TODO: very slow! correct?!) + if (verbose) + cat("*** Compute WER distances from CWT\n") + + #TODO: computeDistances(i,j), et répartir les n(n-1)/2 couples d'indices + #là c'est trop déséquilibré + + computeDistancesLineI = function(i) + { + if (verbose) + cat(paste(" Line ",i,"\n", sep="")) + for (j in (i+1):n) + { + #TODO: 'circular=TRUE' is wrong, should just take values on the sides; to rewrite in C + num <- filter(Mod(Xcwt4[[i]] * Conj(Xcwt4[[j]])), fcoefs, circular=TRUE) + WX <- filter(Mod(Xcwt4[[i]] * Conj(Xcwt4[[i]])), fcoefs, circular=TRUE) + WY <- filter(Mod(Xcwt4[[j]] * Conj(Xcwt4[[j]])), fcoefs, circular=TRUE) + wer2 <- sum(colSums(num)^2) / sum( sum(colSums(WX) * colSums(WY)) ) + if (parll) + synchronicity::lock(m) + Xwer_dist[i,j] <- sqrt(delta * ncol(Xcwt4[[1]]) * (1 - wer2)) + Xwer_dist[j,i] <- Xwer_dist[i,j] + if (parll) + synchronicity::unlock(m) + } + Xwer_dist[i,i] = 0. + } + + parll = (requireNamespace("synchronicity",quietly=TRUE) + && parll && Sys.info()['sysname'] != "Windows") + if (parll) + m <- synchronicity::boost.mutex() + + ignored <- + if (parll) + { + parallel::mclapply(seq_len(n-1), computeDistancesLineI, + mc.cores=ncores_clust, mc.allow.recursive=FALSE) + } + else + lapply(seq_len(n-1), computeDistancesLineI) + Xwer_dist[n,n] = 0. + + mat_dists = matrix(nrow=n, ncol=n) + #TODO: avoid this loop? + for (i in 1:n) + mat_dists[i,] = Xwer_dist[i,] + mat_dists + -- 2.44.0 From 95b5c2e621af8949c5a5eed287d451817c16c24e Mon Sep 17 00:00:00 2001 From: Benjamin Auder Date: Wed, 8 Mar 2017 12:43:45 +0100 Subject: [PATCH 03/16] 'update' --- epclust/R/main.R | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/epclust/R/main.R b/epclust/R/main.R index f1e435c..892c64c 100644 --- a/epclust/R/main.R +++ b/epclust/R/main.R @@ -144,7 +144,7 @@ claws = function(getSeries, K1, K2, getSeries = function(inds) getDataInFile(inds, series_file, nbytes, endian) } - # Serialize all computed wavelets contributions onto a file + # Serialize all computed wavelets contributions into a file contribs_file = paste(bin_dir,"contribs",sep="") ; unlink(contribs_file) index = 1 nb_curves = 0 @@ -230,6 +230,10 @@ claws = function(getSeries, K1, K2, contribs_file, nb_series_per_chunk, nbytes, endian) } + + +#TODO: if ntasks==1, c'est deja terminé + # Run step2 on resulting indices or series (from file) if (verbose) cat("...Run final // stage 1 + stage 2\n") -- 2.44.0 From bf5c08443087a23ea3d1a7ab993568e608a8b5dd Mon Sep 17 00:00:00 2001 From: Benjamin Auder Date: Wed, 8 Mar 2017 15:04:44 +0100 Subject: [PATCH 04/16] 'update' --- TODO | 2 ++ epclust/R/clustering.R | 38 +++++++++++++++++++++++++------------- epclust/R/main.R | 38 ++++++++++++++------------------------ 3 files changed, 41 insertions(+), 37 deletions(-) diff --git a/TODO b/TODO index 1788ce6..199a59f 100644 --- a/TODO +++ b/TODO @@ -45,3 +45,5 @@ utiliser Rcpp ? #Alternative: use bigmemory to share series when CSV or matrix(...) #' @importFrom synchronicity boost.mutex lock unlock + +subtree: epclust, shared. This root folder should remain private diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R index 3993e76..f5e497f 100644 --- a/epclust/R/clustering.R +++ b/epclust/R/clustering.R @@ -6,11 +6,13 @@ #' #' @description \code{clusteringTask1()} runs one full stage-1 task, which consists in #' iterated stage 1 clustering (on nb_curves / ntasks energy contributions, computed -#' through discrete wavelets coefficients). \code{computeClusters1()} and -#' \code{computeClusters2()} correspond to the atomic clustering procedures respectively -#' for stage 1 and 2. The former applies the clustering algorithm (PAM) on a -#' contributions matrix, while the latter clusters a chunk of series inside one task -#' (~max nb_series_per_chunk) +#' through discrete wavelets coefficients). +#' \code{clusteringTask2()} runs a full stage-2 task, which consists in synchrones +#' and then WER distances computations, before applying the clustering algorithm. +#' \code{computeClusters1()} and \code{computeClusters2()} correspond to the atomic +#' clustering procedures respectively for stage 1 and 2. The former applies the +#' clustering algorithm (PAM) on a contributions matrix, while the latter clusters +#' a chunk of series inside one task (~max nb_series_per_chunk) #' #' @param indices Range of series indices to cluster in parallel (initial data) #' @param getContribs Function to retrieve contributions from initial series indices: @@ -62,21 +64,31 @@ clusteringTask1 = function( #' @rdname clustering #' @export -computeClusters1 = function(contribs, K1) - cluster::pam(contribs, K1, diss=FALSE)$id.med - -#' @rdname clustering -#' @export -computeClusters2 = function(medoids, K2, +clusteringTask2 = function(medoids, K2, getRefSeries, nb_ref_curves, nb_series_per_chunk, ncores_clust=1,verbose=FALSE,parll=TRUE) { + if (nrow(medoids) <= K2) + return (medoids) synchrones = computeSynchrones(medoids, getRefSeries, nb_ref_curves, nb_series_per_chunk, ncores_clust, verbose, parll) distances = computeWerDists(synchrones, ncores_clust, verbose, parll) - #TODO: if PAM cannot take big.matrix in input, cast it before... (more than OK in RAM) - medoids[ cluster::pam(distances, K2, diss=TRUE)$medoids , ] + # PAM in package 'cluster' cannot take big.matrix in input: need to cast it + mat_dists = matrix(nrow=K1, ncol=K1) + for (i in seq_len(K1)) + mat_dists[i,] = distances[i,] + medoids[ computeClusters2(mat_dists,K2), ] } +#' @rdname clustering +#' @export +computeClusters1 = function(contribs, K1) + cluster::pam(contribs, K1, diss=FALSE)$id.med + +#' @rdname clustering +#' @export +computeClusters2 = function(distances, K2) + cluster::pam(distances, K2, diss=TRUE)$id.med + #' computeSynchrones #' #' Compute the synchrones curves (sum of clusters elements) from a matrix of medoids, diff --git a/epclust/R/main.R b/epclust/R/main.R index 892c64c..2037dbe 100644 --- a/epclust/R/main.R +++ b/epclust/R/main.R @@ -7,8 +7,9 @@ #' @param getSeries Access to the (time-)series, which can be of one of the three #' following types: #' \itemize{ -#' \item matrix: each line contains all the values for one time-serie, ordered by time -#' \item connection: any R connection object (e.g. a file) providing lines as described above +#' \item [big.]matrix: each line contains all the values for one time-serie, ordered by time +#' \item connection: any R connection object providing lines as described above +#' \item character: name of a CSV file containing series in rows (no header) #' \item function: a custom way to retrieve the curves; it has only one argument: #' the indices of the series to be retrieved. See examples #' } @@ -32,7 +33,7 @@ #' @param verbose Level of verbosity (0/FALSE for nothing or 1/TRUE for all; devel stage) #' @param parll TRUE to fully parallelize; otherwise run sequentially (debug, comparison) #' -#' @return A matrix of the final medoids curves (K2) in rows +#' @return A big.matrix of the final medoids curves (K2) in rows #' #' @examples #' \dontrun{ @@ -163,22 +164,14 @@ claws = function(getSeries, K1, K2, runTwoStepClustering = function(inds) { - if (parll) + if (parll && ntasks>1) require("epclust", quietly=TRUE) indices_medoids = clusteringTask1( inds, getContribs, K1, nb_series_per_chunk, ncores_clust, verbose, parll) if (WER=="mix") { - - - - -#TODO: getSeries(indices_medoids) BAD ; il faudrait une big.matrix de medoids en entree - #OK en RAM il y en aura 1000 (donc 1000*K1*17519... OK) - #...mais du coup chaque process ne re-dupliquera pas medoids - - - medoids2 = computeClusters2(getSeries(indices_medoids), + medoids1 = bigmemory::as.big.matrix( getSeries(indices_medoids) ) + medoids2 = clusteringTask2(medoids1, K2, getSeries, nb_curves, nb_series_per_chunk, ncores_clust, verbose, parll) binarize(medoids2, synchrones_file, nb_series_per_chunk, sep, nbytes, endian) return (vector("integer",0)) @@ -196,22 +189,22 @@ claws = function(getSeries, K1, K2, cat(paste("...Run ",ntasks," x stage 1 in parallel\n",sep="")) if (WER=="mix") {synchrones_file = paste(bin_dir,"synchrones",sep="") ; unlink(synchrones_file)} - if (parll) + if (parll && ntasks>1) { cl = parallel::makeCluster(ncores_tasks) varlist = c("getSeries","getContribs","K1","K2","verbose","parll", - "nb_series_per_chunk","ncores_clust","sep","nbytes","endian") + "nb_series_per_chunk","ntasks","ncores_clust","sep","nbytes","endian") if (WER=="mix") varlist = c(varlist, "synchrones_file") parallel::clusterExport(cl, varlist=varlist, envir = environment()) } # 1000*K1 indices [if WER=="end"], or empty vector [if WER=="mix"] --> series on file - if (parll) + if (parll && ntasks>1) indices = unlist( parallel::parLapply(cl, indices_tasks, runTwoStepClustering) ) else indices = unlist( lapply(indices_tasks, runTwoStepClustering) ) - if (parll) + if (parll && ntasks>1) parallel::stopCluster(cl) getRefSeries = getSeries @@ -230,22 +223,19 @@ claws = function(getSeries, K1, K2, contribs_file, nb_series_per_chunk, nbytes, endian) } - - -#TODO: if ntasks==1, c'est deja terminé - # Run step2 on resulting indices or series (from file) if (verbose) cat("...Run final // stage 1 + stage 2\n") indices_medoids = clusteringTask1( indices, getContribs, K1, nb_series_per_chunk, ncores_tasks*ncores_clust, verbose, parll) - medoids = computeClusters2(getSeries(indices_medoids), K2, + medoids1 = bigmemory::as.big.matrix( getSeries(indices_medoids) ) + medoids2 = computeClusters2(medoids1, K2, getRefSeries, nb_curves, nb_series_per_chunk, ncores_tasks*ncores_clust, verbose, parll) # Cleanup unlink(bin_dir, recursive=TRUE) - medoids + medoids2 } #' curvesToContribs -- 2.44.0 From 1a1196f21036a321710f848d4cb28e6677f24904 Mon Sep 17 00:00:00 2001 From: Benjamin Auder Date: Wed, 8 Mar 2017 15:45:31 +0100 Subject: [PATCH 05/16] 'update' --- epclust/R/clustering.R | 9 --------- 1 file changed, 9 deletions(-) diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R index f5e497f..cda7fbe 100644 --- a/epclust/R/clustering.R +++ b/epclust/R/clustering.R @@ -106,15 +106,6 @@ computeClusters2 = function(distances, K2) computeSynchrones = function(medoids, getRefSeries, nb_ref_curves, nb_series_per_chunk, ncores_clust=1,verbose=FALSE,parll=TRUE) { - - - -#TODO: si parll, getMedoids + serialization, pass only getMedoids to nodes -# --> BOF... chaque node chargera tous les medoids (efficacité) :/ ==> faut que ça tienne en RAM -#au pire :: C-ifier et charger medoids 1 by 1... - - #MIEUX :: medoids DOIT etre une big.matrix partagée ! - computeSynchronesChunk = function(indices) { if (verbose) -- 2.44.0 From e161499b97c782aadfc287c22b55f85724f86fae Mon Sep 17 00:00:00 2001 From: Benjamin Auder Date: Wed, 8 Mar 2017 21:50:53 +0100 Subject: [PATCH 06/16] improvements --- .gitignore | 4 + epclust/DESCRIPTION | 3 +- epclust/R/a_NAMESPACE.R | 7 +- epclust/R/clustering.R | 168 ++++++++++++----------- epclust/R/main.R | 9 +- epclust/src/WER.c | 117 ---------------- epclust/src/computeMedoidsIndices.c | 51 +++++++ epclust/src/filter.c | 37 +++++ epclust/tests/testthat/test.clustering.R | 109 ++++++++++----- 9 files changed, 266 insertions(+), 239 deletions(-) delete mode 100644 epclust/src/WER.c create mode 100644 epclust/src/computeMedoidsIndices.c create mode 100644 epclust/src/filter.c diff --git a/.gitignore b/.gitignore index af4c22c..3a326ea 100644 --- a/.gitignore +++ b/.gitignore @@ -32,3 +32,7 @@ #ignore jupyter generated file (HTML vignette, and reports) *.ipynb.html + +#ignore object files +*.o +*.so diff --git a/epclust/DESCRIPTION b/epclust/DESCRIPTION index 304fdff..1f2b5ea 100644 --- a/epclust/DESCRIPTION +++ b/epclust/DESCRIPTION @@ -30,8 +30,9 @@ Suggests: DBI License: MIT + file LICENSE RoxygenNote: 6.0.1 -Collate: +Collate: 'main.R' 'clustering.R' 'de_serialize.R' 'a_NAMESPACE.R' + 'plot.R' diff --git a/epclust/R/a_NAMESPACE.R b/epclust/R/a_NAMESPACE.R index 89aa453..e9aa830 100644 --- a/epclust/R/a_NAMESPACE.R +++ b/epclust/R/a_NAMESPACE.R @@ -2,12 +2,13 @@ #' @include clustering.R #' @include main.R #' +#' @useDynLib epclust +#' #' @importFrom Rwave cwt #' @importFrom cluster pam #' @importFrom parallel makeCluster clusterExport parLapply stopCluster #' @importFrom wavelets dwt wt.filter -#' @importFrom stats filter spline -#' @importFrom utils tail +#' @importFrom stats spline #' @importFrom methods is -#' @importFrom bigmemory as.big.matrix is.big.matrix +#' @importFrom bigmemory big.matrix as.big.matrix is.big.matrix NULL diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R index cda7fbe..92adda2 100644 --- a/epclust/R/clustering.R +++ b/epclust/R/clustering.R @@ -33,15 +33,7 @@ clusteringTask1 = function( indices, getContribs, K1, nb_series_per_chunk, ncores_clust=1, verbose=FALSE, parll=TRUE) { if (verbose) - cat(paste("*** Clustering task on ",length(indices)," lines\n", sep="")) - - wrapComputeClusters1 = function(inds) { - if (parll) - require("epclust", quietly=TRUE) - if (verbose) - cat(paste(" computeClusters1() on ",length(inds)," lines\n", sep="")) - inds[ computeClusters1(getContribs(inds), K1) ] - } + cat(paste("*** Clustering task 1 on ",length(indices)," lines\n", sep="")) if (parll) { @@ -51,10 +43,20 @@ clusteringTask1 = function( while (length(indices) > K1) { indices_workers = .spreadIndices(indices, nb_series_per_chunk) - if (parll) - indices = unlist( parallel::parLapply(cl, indices_workers, wrapComputeClusters1) ) - else - indices = unlist( lapply(indices_workers, wrapComputeClusters1) ) + indices <- + if (parll) + { + unlist( parallel::parLapply(cl, indices_workers, function(inds) { + require("epclust", quietly=TRUE) + inds[ computeClusters1(getContribs(inds), K1, verbose) ] + }) ) + } + else + { + unlist( lapply(indices_workers, function(inds) + inds[ computeClusters1(getContribs(inds), K1, verbose) ] + ) ) + } } if (parll) parallel::stopCluster(cl) @@ -67,27 +69,35 @@ clusteringTask1 = function( clusteringTask2 = function(medoids, K2, getRefSeries, nb_ref_curves, nb_series_per_chunk, ncores_clust=1,verbose=FALSE,parll=TRUE) { + if (verbose) + cat(paste("*** Clustering task 2 on ",nrow(medoids)," lines\n", sep="")) + if (nrow(medoids) <= K2) return (medoids) synchrones = computeSynchrones(medoids, getRefSeries, nb_ref_curves, nb_series_per_chunk, ncores_clust, verbose, parll) distances = computeWerDists(synchrones, ncores_clust, verbose, parll) # PAM in package 'cluster' cannot take big.matrix in input: need to cast it - mat_dists = matrix(nrow=K1, ncol=K1) - for (i in seq_len(K1)) - mat_dists[i,] = distances[i,] - medoids[ computeClusters2(mat_dists,K2), ] + medoids[ computeClusters2(distances[,],K2,verbose), ] } #' @rdname clustering #' @export -computeClusters1 = function(contribs, K1) +computeClusters1 = function(contribs, K1, verbose=FALSE) +{ + if (verbose) + cat(paste(" computeClusters1() on ",nrow(contribs)," lines\n", sep="")) cluster::pam(contribs, K1, diss=FALSE)$id.med +} #' @rdname clustering #' @export -computeClusters2 = function(distances, K2) +computeClusters2 = function(distances, K2, verbose=FALSE) +{ + if (verbose) + cat(paste(" computeClusters2() on ",nrow(distances)," lines\n", sep="")) cluster::pam(distances, K2, diss=TRUE)$id.med +} #' computeSynchrones #' @@ -106,29 +116,41 @@ computeClusters2 = function(distances, K2) computeSynchrones = function(medoids, getRefSeries, nb_ref_curves, nb_series_per_chunk, ncores_clust=1,verbose=FALSE,parll=TRUE) { + if (verbose) + cat(paste("--- Compute synchrones\n", sep="")) + computeSynchronesChunk = function(indices) { - if (verbose) - cat(paste("--- Compute synchrones for ",length(indices)," lines\n", sep="")) ref_series = getRefSeries(indices) + nb_series = nrow(ref_series) #get medoids indices for this chunk of series - for (i in seq_len(nrow(ref_series))) + + #TODO: debug this (address is OK but values are garbage: why?) +# mi = .Call("computeMedoidsIndices", medoids@address, ref_series, PACKAGE="epclust") + + #R-equivalent, requiring a matrix (thus potentially breaking "fit-in-memory" hope) + mat_meds = medoids[,] + mi = rep(NA,nb_series) + for (i in 1:nb_series) + mi[i] <- which.min( rowSums( sweep(mat_meds, 2, ref_series[i,], '-')^2 ) ) + rm(mat_meds); gc() + + for (i in seq_len(nb_series)) { - j = which.min( rowSums( sweep(medoids, 2, ref_series[i,], '-')^2 ) ) if (parll) synchronicity::lock(m) - synchrones[j,] = synchrones[j,] + ref_series[i,] - counts[j,1] = counts[j,1] + 1 + synchrones[mi[i],] = synchrones[mi[i],] + ref_series[i,] + counts[mi[i],1] = counts[mi[i],1] + 1 if (parll) synchronicity::unlock(m) } } - K = nrow(medoids) + K = nrow(medoids) ; L = ncol(medoids) # Use bigmemory (shared==TRUE by default) + synchronicity to fill synchrones in // # TODO: if size > RAM (not our case), use file-backed big.matrix - synchrones = bigmemory::big.matrix(nrow=K,ncol=ncol(medoids),type="double",init=0.) - counts = bigmemory::big.matrix(nrow=K,ncol=1,type="double",init=0) + synchrones = bigmemory::big.matrix(nrow=K, ncol=L, type="double", init=0.) + counts = bigmemory::big.matrix(nrow=K, ncol=1, type="double", init=0) # synchronicity is only for Linux & MacOS; on Windows: run sequentially parll = (requireNamespace("synchronicity",quietly=TRUE) && parll && Sys.info()['sysname'] != "Windows") @@ -144,9 +166,10 @@ computeSynchrones = function(medoids, getRefSeries, } indices_workers = .spreadIndices(seq_len(nb_ref_curves), nb_series_per_chunk) + browser() ignored <- if (parll) - parallel::parLapply(indices_workers, computeSynchronesChunk) + parallel::parLapply(cl, indices_workers, computeSynchronesChunk) else lapply(indices_workers, computeSynchronesChunk) @@ -179,11 +202,8 @@ computeSynchrones = function(medoids, getRefSeries, #' @export computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) { - - - -#TODO: re-organize to call computeWerDist(x,y) [C] (in //?) from two indices + big.matrix - + if (verbose) + cat(paste("--- Compute WER dists\n", sep="")) n <- nrow(synchrones) delta <- ncol(synchrones) @@ -201,10 +221,20 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) s0log = as.integer( (log2( s0*w0/(2*pi) ) - 1) * nvoice + 1.5 ) totnoct = noctave + as.integer(s0log/nvoice) + 1 + Xwer_dist <- bigmemory::big.matrix(nrow=n, ncol=n, type="double") + fcoefs = rep(1/3, 3) #moving average on 3 values + + # Generate n(n-1)/2 pairs for WER distances computations + pairs = list() + V = seq_len(n) + for (i in 1:n) + { + V = V[-1] + pairs = c(pairs, lapply(V, function(v) c(i,v))) + } + computeCWT = function(i) { - if (verbose) - cat(paste("+++ Compute Rwave::cwt() on serie ",i,"\n", sep="")) ts <- scale(ts(synchrones[i,]), center=TRUE, scale=scaled) totts.cwt = Rwave::cwt(ts, totnoct, nvoice, w0, plot=FALSE) ts.cwt = totts.cwt[,s0log:(s0log+noctave*nvoice)] @@ -214,6 +244,22 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) sqres / max(Mod(sqres)) } + computeDistancesIJ = function(pair) + { + i = pair[1] ; j = pair[2] + if (verbose && j==i+1) + cat(paste(" Distances (",i,",",j,"), (",i,",",j+1,") ...\n", sep="")) + cwt_i = computeCWT(i) + cwt_j = computeCWT(j) + num <- .Call("filter", Mod(cwt_i * Conj(cwt_j)), PACKAGE="epclust") + WX <- .Call("filter", Mod(cwt_i * Conj(cwt_i)), PACKAGE="epclust") + WY <- .Call("filter", Mod(cwt_j * Conj(cwt_j)), PACKAGE="epclust") + wer2 <- sum(colSums(num)^2) / sum(colSums(WX) * colSums(WY)) + Xwer_dist[i,j] <- sqrt(delta * ncol(cwt_i) * (1 - wer2)) + Xwer_dist[j,i] <- Xwer_dist[i,j] + Xwer_dist[i,i] = 0. + } + if (parll) { cl = parallel::makeCluster(ncores_clust) @@ -222,59 +268,15 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) envir=environment()) } - # list of CWT from synchrones - # TODO: fit in RAM, OK? If not, 2 options: serialize, compute individual distances - Xcwt4 <- + ignored <- if (parll) - parallel::parLapply(cl, seq_len(n), computeCWT) + parallel::parLapply(cl, pairs, computeDistancesIJ) else - lapply(seq_len(n), computeCWT) + lapply(pairs, computeDistancesIJ) if (parll) parallel::stopCluster(cl) - Xwer_dist <- bigmemory::big.matrix(nrow=n, ncol=n, type="double") - fcoefs = rep(1/3, 3) #moving average on 3 values (TODO: very slow! correct?!) - if (verbose) - cat("*** Compute WER distances from CWT\n") - - #TODO: computeDistances(i,j), et répartir les n(n-1)/2 couples d'indices - #là c'est trop déséquilibré - - computeDistancesLineI = function(i) - { - if (verbose) - cat(paste(" Line ",i,"\n", sep="")) - for (j in (i+1):n) - { - #TODO: 'circular=TRUE' is wrong, should just take values on the sides; to rewrite in C - num <- filter(Mod(Xcwt4[[i]] * Conj(Xcwt4[[j]])), fcoefs, circular=TRUE) - WX <- filter(Mod(Xcwt4[[i]] * Conj(Xcwt4[[i]])), fcoefs, circular=TRUE) - WY <- filter(Mod(Xcwt4[[j]] * Conj(Xcwt4[[j]])), fcoefs, circular=TRUE) - wer2 <- sum(colSums(num)^2) / sum( sum(colSums(WX) * colSums(WY)) ) - if (parll) - synchronicity::lock(m) - Xwer_dist[i,j] <- sqrt(delta * ncol(Xcwt4[[1]]) * (1 - wer2)) - Xwer_dist[j,i] <- Xwer_dist[i,j] - if (parll) - synchronicity::unlock(m) - } - Xwer_dist[i,i] = 0. - } - - parll = (requireNamespace("synchronicity",quietly=TRUE) - && parll && Sys.info()['sysname'] != "Windows") - if (parll) - m <- synchronicity::boost.mutex() - - ignored <- - if (parll) - { - parallel::mclapply(seq_len(n-1), computeDistancesLineI, - mc.cores=ncores_clust, mc.allow.recursive=FALSE) - } - else - lapply(seq_len(n-1), computeDistancesLineI) Xwer_dist[n,n] = 0. Xwer_dist } diff --git a/epclust/R/main.R b/epclust/R/main.R index 2037dbe..977e61b 100644 --- a/epclust/R/main.R +++ b/epclust/R/main.R @@ -186,7 +186,12 @@ claws = function(getSeries, K1, K2, indices_all[((i-1)*nb_series_per_task+1):upper_bound] }) if (verbose) - cat(paste("...Run ",ntasks," x stage 1 in parallel\n",sep="")) + { + message = paste("...Run ",ntasks," x stage 1", sep="") + if (WER=="mix") + message = paste(message," + stage 2", sep="") + cat(paste(message,"\n", sep="")) + } if (WER=="mix") {synchrones_file = paste(bin_dir,"synchrones",sep="") ; unlink(synchrones_file)} if (parll && ntasks>1) @@ -229,7 +234,7 @@ claws = function(getSeries, K1, K2, indices_medoids = clusteringTask1( indices, getContribs, K1, nb_series_per_chunk, ncores_tasks*ncores_clust, verbose, parll) medoids1 = bigmemory::as.big.matrix( getSeries(indices_medoids) ) - medoids2 = computeClusters2(medoids1, K2, + medoids2 = clusteringTask2(medoids1, K2, getRefSeries, nb_curves, nb_series_per_chunk, ncores_tasks*ncores_clust, verbose, parll) # Cleanup diff --git a/epclust/src/WER.c b/epclust/src/WER.c deleted file mode 100644 index 36bfba7..0000000 --- a/epclust/src/WER.c +++ /dev/null @@ -1,117 +0,0 @@ -#include -#include -#include - -#ifndef M_PI -#define M_PI 3.14159265358979323846 -#endif - -// n: number of synchrones, m: length of a synchrone -float computeWerDist(float* s1, float* s2, int n, int m) -{ - //TODO: automatic tune of all these parameters ? (for other users) - int nvoice = 4; - //noctave 2^13 = 8192 half hours ~ 180 days ; ~log2(ncol(synchrones)) - int noctave = 13 - // 4 here represent 2^5 = 32 half-hours ~ 1 day - //NOTE: default scalevector == 2^(0:(noctave * nvoice) / nvoice) * s0 (?) - //R: scalevector <- 2^(4:(noctave * nvoice) / nvoice + 1) - int* scalevector = (int*)malloc( (noctave*nvoice-4 + 1) * sizeof(int)) - for (int i=4; i<=noctave*nvoice; i++) - scalevector[i-4] = pow(2., (float)i/nvoice + 1.); - //condition: ( log2(s0*w0/(2*pi)) - 1 ) * nvoice + 1.5 >= 1 - int s0 = 2; - double w0 = 2*M_PI; - bool scaled = false; - int s0log = as.integer( (log2( s0*w0/(2*pi) ) - 1) * nvoice + 1.5 ) - int totnoct = noctave + as.integer(s0log/nvoice) + 1 - - - - - -///TODO: continue - - - - computeCWT = function(i) - { - if (verbose) - cat(paste("+++ Compute Rwave::cwt() on serie ",i,"\n", sep="")) - ts <- scale(ts(synchrones[i,]), center=TRUE, scale=scaled) - totts.cwt = Rwave::cwt(ts,totnoct,nvoice,w0,plot=0) - ts.cwt = totts.cwt[,s0log:(s0log+noctave*nvoice)] - #Normalization - sqs <- sqrt(2^(0:(noctave*nvoice)/nvoice)*s0) - sqres <- sweep(ts.cwt,2,sqs,'*') - sqres / max(Mod(sqres)) - } - - if (parll) - { - cl = parallel::makeCluster(ncores_clust) - parallel::clusterExport(cl, - varlist=c("synchrones","totnoct","nvoice","w0","s0log","noctave","s0","verbose"), - envir=environment()) - } - - # (normalized) observations node with CWT - Xcwt4 <- - if (parll) - parallel::parLapply(cl, seq_len(n), computeCWT) - else - lapply(seq_len(n), computeCWT) - - if (parll) - parallel::stopCluster(cl) - - Xwer_dist <- bigmemory::big.matrix(nrow=n, ncol=n, type="double") - fcoefs = rep(1/3, 3) #moving average on 3 values (TODO: very slow! correct?!) - if (verbose) - cat("*** Compute WER distances from CWT\n") - - #TODO: computeDistances(i,j), et répartir les n(n-1)/2 couples d'indices - #là c'est trop déséquilibré - - computeDistancesLineI = function(i) - { - if (verbose) - cat(paste(" Line ",i,"\n", sep="")) - for (j in (i+1):n) - { - #TODO: 'circular=TRUE' is wrong, should just take values on the sides; to rewrite in C - num <- filter(Mod(Xcwt4[[i]] * Conj(Xcwt4[[j]])), fcoefs, circular=TRUE) - WX <- filter(Mod(Xcwt4[[i]] * Conj(Xcwt4[[i]])), fcoefs, circular=TRUE) - WY <- filter(Mod(Xcwt4[[j]] * Conj(Xcwt4[[j]])), fcoefs, circular=TRUE) - wer2 <- sum(colSums(num)^2) / sum( sum(colSums(WX) * colSums(WY)) ) - if (parll) - synchronicity::lock(m) - Xwer_dist[i,j] <- sqrt(delta * ncol(Xcwt4[[1]]) * (1 - wer2)) - Xwer_dist[j,i] <- Xwer_dist[i,j] - if (parll) - synchronicity::unlock(m) - } - Xwer_dist[i,i] = 0. - } - - parll = (requireNamespace("synchronicity",quietly=TRUE) - && parll && Sys.info()['sysname'] != "Windows") - if (parll) - m <- synchronicity::boost.mutex() - - ignored <- - if (parll) - { - parallel::mclapply(seq_len(n-1), computeDistancesLineI, - mc.cores=ncores_clust, mc.allow.recursive=FALSE) - } - else - lapply(seq_len(n-1), computeDistancesLineI) - Xwer_dist[n,n] = 0. - - mat_dists = matrix(nrow=n, ncol=n) - #TODO: avoid this loop? - for (i in 1:n) - mat_dists[i,] = Xwer_dist[i,] - mat_dists - diff --git a/epclust/src/computeMedoidsIndices.c b/epclust/src/computeMedoidsIndices.c new file mode 100644 index 0000000..98a0111 --- /dev/null +++ b/epclust/src/computeMedoidsIndices.c @@ -0,0 +1,51 @@ +#include +#include +#include +#include +#include + +#include + +// (K,L): dim(medoids) +// mi: medoids indices +SEXP computeMedoidsIndices(SEXP medoids_, SEXP ref_series_) +{ + double *medoids = (double*) R_ExternalPtrAddr(medoids_), + *ref_series = REAL(ref_series_); + int nb_series = INTEGER(getAttrib(ref_series_, R_DimSymbol))[0], + K = INTEGER(getAttrib(medoids_, R_DimSymbol))[0], + L = INTEGER(getAttrib(ref_series_, R_DimSymbol))[1], + *mi = (int*)malloc(nb_series*sizeof(int)); + + //TODO: debug this: medoids have same addresses on both sides, but this side fails + printf("MED: %x\n",medoids); + + for (int i=0; i +#include +#include +#include + +#include + +SEXP filter(SEXP cwt_) +{ + int L = INTEGER(getAttrib(cwt_, R_DimSymbol))[0], + D = INTEGER(getAttrib(cwt_, R_DimSymbol))[1]; + double *cwt = REAL(cwt_); + SEXP R_fcwt; + PROTECT(R_fcwt = allocMatrix(REALSXP, L, D)); + double* fcwt = REAL(R_fcwt); + + //TODO: coding style is terrible... no time for now. + for (int col=0; col0) series[indices,] else NULL } - synchrones = computeSynchrones(rbind(s1,s2,s3), getRefSeries, n, 100, - verbose=TRUE, parll=FALSE) + synchrones = computeSynchrones(bigmemory::as.big.matrix(rbind(s1,s2,s3)), getRefSeries, + n, 100, verbose=TRUE, parll=FALSE) expect_equal(dim(synchrones), c(K,L)) for (i in 1:K) expect_equal(synchrones[i,], s[[i]], tolerance=0.01) }) +# NOTE: medoids can be a big.matrix computeDistortion = function(series, medoids) { n = nrow(series) ; L = ncol(series) distortion = 0. + if (bigmemory::is.big.matrix(medoids)) + medoids = medoids[,] for (i in seq_len(n)) distortion = distortion + min( rowSums( sweep(medoids,2,series[i,],'-')^2 ) / L ) distortion / n } -test_that("computeClusters2 behave as expected", +test_that("clusteringTask1 behave as expected", { n = 900 x = seq(0,9.5,0.1) L = length(x) #96 1/4h K1 = 60 - K2 = 3 - #for (i in 1:60) {plot(x^(1+i/30)*cos(x+i),type="l",col=i,ylim=c(-50,50)); par(new=TRUE)} s = lapply( seq_len(K1), function(i) x^(1+i/30)*cos(x+i) ) series = matrix(nrow=n, ncol=L) for (i in seq_len(n)) series[i,] = s[[I(i,K1)]] + rnorm(L,sd=0.01) - getRefSeries = function(indices) { + getSeries = function(indices) { indices = indices[indices <= n] if (length(indices)>0) series[indices,] else NULL } - # Artificially simulate 60 medoids - perfect situation, all equal to one of the refs - medoids_K1 = do.call(rbind, lapply( 1:K1, function(i) s[[I(i,K1)]] ) ) - medoids_K2 = computeClusters2(medoids_K1, K2, getRefSeries, n, 75, - verbose=TRUE, parll=FALSE) + wf = "haar" + ctype = "absolute" + getContribs = function(indices) curvesToContribs(series[indices,],wf,ctype) + indices1 = clusteringTask1(1:n, getContribs, K1, 75, verbose=TRUE, parll=FALSE) + medoids_K1 = getSeries(indices1) - expect_equal(dim(medoids_K2), c(K2,L)) + expect_equal(dim(medoids_K1), c(K1,L)) # Not easy to evaluate result: at least we expect it to be better than random selection of - # medoids within 1...K1 (among references) - distorGood = computeDistortion(series, medoids_K2) + # medoids within initial series + distorGood = computeDistortion(series, medoids_K1) for (i in 1:3) - expect_lte( distorGood, computeDistortion(series,medoids_K1[sample(1:K1, K2),]) ) + expect_lte( distorGood, computeDistortion(series,series[sample(1:n, K1),]) ) }) -test_that("clusteringTask1 + computeClusters2 behave as expected", +test_that("clusteringTask2 behave as expected", { n = 900 x = seq(0,9.5,0.1) L = length(x) #96 1/4h K1 = 60 K2 = 3 + #for (i in 1:60) {plot(x^(1+i/30)*cos(x+i),type="l",col=i,ylim=c(-50,50)); par(new=TRUE)} s = lapply( seq_len(K1), function(i) x^(1+i/30)*cos(x+i) ) series = matrix(nrow=n, ncol=L) for (i in seq_len(n)) series[i,] = s[[I(i,K1)]] + rnorm(L,sd=0.01) - getSeries = function(indices) { + getRefSeries = function(indices) { indices = indices[indices <= n] if (length(indices)>0) series[indices,] else NULL } - wf = "haar" - ctype = "absolute" - getContribs = function(indices) curvesToContribs(series[indices,],wf,ctype) - medoids_K1 = getSeries( clusteringTask1(1:n, getContribs, K1, 75, - verbose=TRUE, parll=FALSE) ) - medoids_K2 = computeClusters2(medoids_K1, K2, getSeries, n, 120, - verbose=TRUE, parll=FALSE) + # Artificially simulate 60 medoids - perfect situation, all equal to one of the refs + medoids_K1 = bigmemory::as.big.matrix( + do.call(rbind, lapply( 1:K1, function(i) s[[I(i,K1)]] ) ) ) + medoids_K2 = clusteringTask2(medoids_K1, K2, getRefSeries, n, 75, verbose=TRUE, parll=FALSE) - expect_equal(dim(medoids_K1), c(K1,L)) expect_equal(dim(medoids_K2), c(K2,L)) # Not easy to evaluate result: at least we expect it to be better than random selection of # medoids within 1...K1 (among references) @@ -143,3 +153,36 @@ test_that("clusteringTask1 + computeClusters2 behave as expected", for (i in 1:3) expect_lte( distorGood, computeDistortion(series,medoids_K1[sample(1:K1, K2),]) ) }) + +#NOTE: rather redundant test +#test_that("clusteringTask1 + clusteringTask2 behave as expected", +#{ +# n = 900 +# x = seq(0,9.5,0.1) +# L = length(x) #96 1/4h +# K1 = 60 +# K2 = 3 +# s = lapply( seq_len(K1), function(i) x^(1+i/30)*cos(x+i) ) +# series = matrix(nrow=n, ncol=L) +# for (i in seq_len(n)) +# series[i,] = s[[I(i,K1)]] + rnorm(L,sd=0.01) +# getSeries = function(indices) { +# indices = indices[indices <= n] +# if (length(indices)>0) series[indices,] else NULL +# } +# wf = "haar" +# ctype = "absolute" +# getContribs = function(indices) curvesToContribs(series[indices,],wf,ctype) +# require("bigmemory", quietly=TRUE) +# indices1 = clusteringTask1(1:n, getContribs, K1, 75, verbose=TRUE, parll=FALSE) +# medoids_K1 = bigmemory::as.big.matrix( getSeries(indices1) ) +# medoids_K2 = clusteringTask2(medoids_K1, K2, getSeries, n, 120, verbose=TRUE, parll=FALSE) +# +# expect_equal(dim(medoids_K1), c(K1,L)) +# expect_equal(dim(medoids_K2), c(K2,L)) +# # Not easy to evaluate result: at least we expect it to be better than random selection of +# # medoids within 1...K1 (among references) +# distorGood = computeDistortion(series, medoids_K2) +# for (i in 1:3) +# expect_lte( distorGood, computeDistortion(series,medoids_K1[sample(1:K1, K2),]) ) +#}) -- 2.44.0 From 777c4b0274c059eeb5d4bd784ef773e819a7f7a2 Mon Sep 17 00:00:00 2001 From: Benjamin Auder Date: Wed, 8 Mar 2017 22:30:16 +0100 Subject: [PATCH 07/16] WER distances is a regular matrix, fix doc (weird latex error?) --- epclust/R/clustering.R | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R index 92adda2..0d37c24 100644 --- a/epclust/R/clustering.R +++ b/epclust/R/clustering.R @@ -18,6 +18,7 @@ #' @param getContribs Function to retrieve contributions from initial series indices: #' \code{getContribs(indices)} outpus a contributions matrix #' @param contribs matrix of contributions (e.g. output of \code{curvesToContribs()}) +#' @param distances matrix of K1 x K1 (WER) distances between synchrones #' @inheritParams computeSynchrones #' @inheritParams claws #' @@ -77,8 +78,7 @@ clusteringTask2 = function(medoids, K2, synchrones = computeSynchrones(medoids, getRefSeries, nb_ref_curves, nb_series_per_chunk, ncores_clust, verbose, parll) distances = computeWerDists(synchrones, ncores_clust, verbose, parll) - # PAM in package 'cluster' cannot take big.matrix in input: need to cast it - medoids[ computeClusters2(distances[,],K2,verbose), ] + medoids[ computeClusters2(distances,K2,verbose), ] } #' @rdname clustering @@ -166,7 +166,7 @@ computeSynchrones = function(medoids, getRefSeries, } indices_workers = .spreadIndices(seq_len(nb_ref_curves), nb_series_per_chunk) - browser() +#browser() ignored <- if (parll) parallel::parLapply(cl, indices_workers, computeSynchronesChunk) @@ -197,7 +197,7 @@ computeSynchrones = function(medoids, getRefSeries, #' as the series in the initial dataset #' @inheritParams claws #' -#' @return A big.matrix of size K1 x K1 +#' @return A matrix of size K1 x K1 #' #' @export computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) @@ -244,6 +244,7 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) sqres / max(Mod(sqres)) } + # Distance between rows i and j computeDistancesIJ = function(pair) { i = pair[1] ; j = pair[2] @@ -278,7 +279,9 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) parallel::stopCluster(cl) Xwer_dist[n,n] = 0. - Xwer_dist + distances <- Xwer_dist[,] + rm(Xwer_dist) ; gc() + distances #~small matrix K1 x K1 } # Helper function to divide indices into balanced sets -- 2.44.0 From 363ae13430cdee6ba76b42b7316aa4b292b04d93 Mon Sep 17 00:00:00 2001 From: Benjamin Auder Date: Thu, 9 Mar 2017 04:25:51 +0100 Subject: [PATCH 08/16] use Rcpp; ongoing debug for parallel synchrones computation --- .gitignore | 4 ++ epclust/DESCRIPTION | 14 ++-- epclust/R/a_NAMESPACE.R | 1 + epclust/R/clustering.R | 97 ++++++++++++++++++--------- epclust/R/main.R | 1 + epclust/TOTO | 16 +++++ epclust/src/computeMedoidsIndices.c | 51 -------------- epclust/src/computeMedoidsIndices.cpp | 51 ++++++++++++++ epclust/src/filter.c | 37 ---------- epclust/src/filter.cpp | 40 +++++++++++ 10 files changed, 190 insertions(+), 122 deletions(-) create mode 100644 epclust/TOTO delete mode 100644 epclust/src/computeMedoidsIndices.c create mode 100644 epclust/src/computeMedoidsIndices.cpp delete mode 100644 epclust/src/filter.c create mode 100644 epclust/src/filter.cpp diff --git a/.gitignore b/.gitignore index 3a326ea..c946480 100644 --- a/.gitignore +++ b/.gitignore @@ -36,3 +36,7 @@ #ignore object files *.o *.so + +#ignore RcppExports, generated by Rcpp::compileAttributes +/epclust/R/RcppExports.R +/epclust/src/RcppExports.cpp diff --git a/epclust/DESCRIPTION b/epclust/DESCRIPTION index 1f2b5ea..300f86e 100644 --- a/epclust/DESCRIPTION +++ b/epclust/DESCRIPTION @@ -18,19 +18,25 @@ Imports: parallel, cluster, wavelets, - bigmemory, - Rwave + bigmemory, + Rwave, + Rcpp +LinkingTo: + Rcpp, + BH, + bigmemory Suggests: synchronicity, devtools, testthat, - MASS, - clue, + MASS, + clue, wmtsa, DBI License: MIT + file LICENSE RoxygenNote: 6.0.1 Collate: + 'RcppExports.R' 'main.R' 'clustering.R' 'de_serialize.R' diff --git a/epclust/R/a_NAMESPACE.R b/epclust/R/a_NAMESPACE.R index e9aa830..8407d23 100644 --- a/epclust/R/a_NAMESPACE.R +++ b/epclust/R/a_NAMESPACE.R @@ -4,6 +4,7 @@ #' #' @useDynLib epclust #' +#' @importFrom Rcpp evalCpp sourceCpp #' @importFrom Rwave cwt #' @importFrom cluster pam #' @importFrom parallel makeCluster clusterExport parLapply stopCluster diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R index 0d37c24..4d43b2b 100644 --- a/epclust/R/clustering.R +++ b/epclust/R/clustering.R @@ -123,17 +123,46 @@ computeSynchrones = function(medoids, getRefSeries, { ref_series = getRefSeries(indices) nb_series = nrow(ref_series) - #get medoids indices for this chunk of series - #TODO: debug this (address is OK but values are garbage: why?) -# mi = .Call("computeMedoidsIndices", medoids@address, ref_series, PACKAGE="epclust") + if (parll) + { + require("bigmemory", quietly=TRUE) + require("synchronicity", quietly=TRUE) + require("epclust", quietly=TRUE) + synchrones <- bigmemory::attach.big.matrix(synchrones_desc) + medoids <- bigmemory::attach.big.matrix(medoids_desc) + m <- synchronicity::attach.mutex(m_desc) + } + + - #R-equivalent, requiring a matrix (thus potentially breaking "fit-in-memory" hope) - mat_meds = medoids[,] - mi = rep(NA,nb_series) - for (i in 1:nb_series) - mi[i] <- which.min( rowSums( sweep(mat_meds, 2, ref_series[i,], '-')^2 ) ) - rm(mat_meds); gc() +#TODO: use dbs(), + #https://www.r-bloggers.com/debugging-parallel-code-with-dbs/ + #http://gforge.se/2015/02/how-to-go-parallel-in-r-basics-tips/ + +#OK :: +#write(length(indices), file="TOTO") +#write( computeMedoidsIndices(medoids@address, getRefSeries(indices[1:600])), file="TOTO") +#stop() + +# write(indices, file="TOTO", ncolumns=10, append=TRUE) +#write("medoids", file = "TOTO", ncolumns=1, append=TRUE) +#write(medoids[1,1:3], file = "TOTO", ncolumns=1, append=TRUE) +#write("synchrones", file = "TOTO", ncolumns=1, append=TRUE) +#write(synchrones[1,1:3], file = "TOTO", ncolumns=1, append=TRUE) + +#NOT OK :: (should just be "ref_series") ...or yes ? race problems mutex then ? ?! + #get medoids indices for this chunk of series + mi = computeMedoidsIndices(medoids@address, getRefSeries(indices[1:600])) #ref_series) +write("MI ::::", file = "TOTO", ncolumns=1, append=TRUE) +write(mi[1:3], file = "TOTO", ncolumns=1, append=TRUE) + +# #R-equivalent, requiring a matrix (thus potentially breaking "fit-in-memory" hope) +# mat_meds = medoids[,] +# mi = rep(NA,nb_series) +# for (i in 1:nb_series) +# mi[i] <- which.min( rowSums( sweep(mat_meds, 2, ref_series[i,], '-')^2 ) ) +# rm(mat_meds); gc() for (i in seq_len(nb_series)) { @@ -155,18 +184,19 @@ computeSynchrones = function(medoids, getRefSeries, parll = (requireNamespace("synchronicity",quietly=TRUE) && parll && Sys.info()['sysname'] != "Windows") if (parll) + { m <- synchronicity::boost.mutex() + m_desc <- synchronicity::describe(m) + synchrones_desc = bigmemory::describe(synchrones) + medoids_desc = bigmemory::describe(medoids) - if (parll) - { cl = parallel::makeCluster(ncores_clust) parallel::clusterExport(cl, - varlist=c("synchrones","counts","verbose","medoids","getRefSeries"), + varlist=c("synchrones_desc","counts","verbose","m_desc","medoids_desc","getRefSeries"), envir=environment()) } indices_workers = .spreadIndices(seq_len(nb_ref_curves), nb_series_per_chunk) -#browser() ignored <- if (parll) parallel::parLapply(cl, indices_workers, computeSynchronesChunk) @@ -233,28 +263,33 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) pairs = c(pairs, lapply(V, function(v) c(i,v))) } - computeCWT = function(i) - { - ts <- scale(ts(synchrones[i,]), center=TRUE, scale=scaled) - totts.cwt = Rwave::cwt(ts, totnoct, nvoice, w0, plot=FALSE) - ts.cwt = totts.cwt[,s0log:(s0log+noctave*nvoice)] - #Normalization - sqs <- sqrt(2^(0:(noctave*nvoice)/nvoice)*s0) - sqres <- sweep(ts.cwt,2,sqs,'*') - sqres / max(Mod(sqres)) - } - # Distance between rows i and j computeDistancesIJ = function(pair) { + require("bigmemory", quietly=TRUE) + require("epclust", quietly=TRUE) + synchrones <- bigmemory::attach.big.matrix(synchrones_desc) + Xwer_dist <- bigmemory::attach.big.matrix(Xwer_dist_desc) + + computeCWT = function(i) + { + ts <- scale(ts(synchrones[i,]), center=TRUE, scale=scaled) + totts.cwt = Rwave::cwt(ts, totnoct, nvoice, w0, plot=FALSE) + ts.cwt = totts.cwt[,s0log:(s0log+noctave*nvoice)] + #Normalization + sqs <- sqrt(2^(0:(noctave*nvoice)/nvoice)*s0) + sqres <- sweep(ts.cwt,2,sqs,'*') + sqres / max(Mod(sqres)) + } + i = pair[1] ; j = pair[2] if (verbose && j==i+1) cat(paste(" Distances (",i,",",j,"), (",i,",",j+1,") ...\n", sep="")) cwt_i = computeCWT(i) cwt_j = computeCWT(j) - num <- .Call("filter", Mod(cwt_i * Conj(cwt_j)), PACKAGE="epclust") - WX <- .Call("filter", Mod(cwt_i * Conj(cwt_i)), PACKAGE="epclust") - WY <- .Call("filter", Mod(cwt_j * Conj(cwt_j)), PACKAGE="epclust") + num <- epclustFilter(Mod(cwt_i * Conj(cwt_j))) + WX <- epclustFilter(Mod(cwt_i * Conj(cwt_i))) + WY <- epclustFilter(Mod(cwt_j * Conj(cwt_j))) wer2 <- sum(colSums(num)^2) / sum(colSums(WX) * colSums(WY)) Xwer_dist[i,j] <- sqrt(delta * ncol(cwt_i) * (1 - wer2)) Xwer_dist[j,i] <- Xwer_dist[i,j] @@ -264,9 +299,11 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) if (parll) { cl = parallel::makeCluster(ncores_clust) - parallel::clusterExport(cl, - varlist=c("synchrones","totnoct","nvoice","w0","s0log","noctave","s0","verbose"), - envir=environment()) + synchrones_desc <- bigmemory::describe(synchrones) + Xwer_dist_desc_desc <- bigmemory::describe(Xwer_dist) + + parallel::clusterExport(cl, varlist=c("synchrones_desc","Xwer_dist_desc","totnoct", + "nvoice","w0","s0log","noctave","s0","verbose"), envir=environment()) } ignored <- diff --git a/epclust/R/main.R b/epclust/R/main.R index 977e61b..9ba23ae 100644 --- a/epclust/R/main.R +++ b/epclust/R/main.R @@ -170,6 +170,7 @@ claws = function(getSeries, K1, K2, inds, getContribs, K1, nb_series_per_chunk, ncores_clust, verbose, parll) if (WER=="mix") { + require("bigmemory", quietly=TRUE) medoids1 = bigmemory::as.big.matrix( getSeries(indices_medoids) ) medoids2 = clusteringTask2(medoids1, K2, getSeries, nb_curves, nb_series_per_chunk, ncores_clust, verbose, parll) diff --git a/epclust/TOTO b/epclust/TOTO new file mode 100644 index 0000000..c6836a6 --- /dev/null +++ b/epclust/TOTO @@ -0,0 +1,16 @@ +MI :::: +14 +52 +55 +MI :::: +56 +21 +47 +MI :::: +45 +41 +5 +MI :::: +48 +58 +28 diff --git a/epclust/src/computeMedoidsIndices.c b/epclust/src/computeMedoidsIndices.c deleted file mode 100644 index 98a0111..0000000 --- a/epclust/src/computeMedoidsIndices.c +++ /dev/null @@ -1,51 +0,0 @@ -#include -#include -#include -#include -#include - -#include - -// (K,L): dim(medoids) -// mi: medoids indices -SEXP computeMedoidsIndices(SEXP medoids_, SEXP ref_series_) -{ - double *medoids = (double*) R_ExternalPtrAddr(medoids_), - *ref_series = REAL(ref_series_); - int nb_series = INTEGER(getAttrib(ref_series_, R_DimSymbol))[0], - K = INTEGER(getAttrib(medoids_, R_DimSymbol))[0], - L = INTEGER(getAttrib(ref_series_, R_DimSymbol))[1], - *mi = (int*)malloc(nb_series*sizeof(int)); - - //TODO: debug this: medoids have same addresses on both sides, but this side fails - printf("MED: %x\n",medoids); - - for (int i=0; i + +// [[Rcpp::depends(BH, bigmemory)]] +#include + +#include +#include + +using namespace Rcpp; + +//' computeMedoidsIndices +//' +//' Compute medoids indices +//' +//' @param pMedoids External pointer +//' @param ref_series reference series +//' +//' @return A map serie number -> medoid index +// [[Rcpp::export]] +IntegerVector computeMedoidsIndices(SEXP pMedoids, NumericMatrix ref_series) +{ + XPtr pMed(pMedoids); + MatrixAccessor medoids = MatrixAccessor(*pMed); + int nb_series = ref_series.nrow(), + K = pMed->nrow(), + L = pMed->ncol(); + IntegerVector mi(nb_series); + + for (int i=0; i -#include -#include -#include - -#include - -SEXP filter(SEXP cwt_) -{ - int L = INTEGER(getAttrib(cwt_, R_DimSymbol))[0], - D = INTEGER(getAttrib(cwt_, R_DimSymbol))[1]; - double *cwt = REAL(cwt_); - SEXP R_fcwt; - PROTECT(R_fcwt = allocMatrix(REALSXP, L, D)); - double* fcwt = REAL(R_fcwt); - - //TODO: coding style is terrible... no time for now. - for (int col=0; col + +using namespace Rcpp; + +//' filter +//' +//' Filter time-series +//' +//' @param cwt Continuous wavelets transform +//' +//' @return The filtered CWT +// [[Rcpp::export]] +NumericMatrix epclustFilter(NumericMatrix cwt) +{ + int L = cwt.nrow(), + D = cwt.ncol(); + NumericMatrix fcwt(L, D); //fill with 0... TODO: back to SEXP C-style? + double *cwt_c = cwt.begin(), + *fcwt_c = fcwt.begin(); + + //TODO: coding style is terrible... no time for now. + for (int col=0; col Date: Thu, 9 Mar 2017 04:26:00 +0100 Subject: [PATCH 09/16] 'update' --- epclust/TOTO | 16 ---------------- 1 file changed, 16 deletions(-) delete mode 100644 epclust/TOTO diff --git a/epclust/TOTO b/epclust/TOTO deleted file mode 100644 index c6836a6..0000000 --- a/epclust/TOTO +++ /dev/null @@ -1,16 +0,0 @@ -MI :::: -14 -52 -55 -MI :::: -56 -21 -47 -MI :::: -45 -41 -5 -MI :::: -48 -58 -28 -- 2.44.0 From 2c14dbea13c897c6964f49f9cd17622f4c9733c0 Mon Sep 17 00:00:00 2001 From: Benjamin Auder Date: Thu, 9 Mar 2017 11:58:25 +0100 Subject: [PATCH 10/16] fix typo, add some TODO --- .gitignore | 4 ++ TODO | 12 ++++ ...nal_Data_Using_Wavelets-Antoniadis2013.pdf | 1 + epclust/R/clustering.R | 58 +++++++------------ 4 files changed, 39 insertions(+), 36 deletions(-) create mode 100644 biblio/Clustering_Functional_Data_Using_Wavelets-Antoniadis2013.pdf diff --git a/.gitignore b/.gitignore index c946480..c8ea425 100644 --- a/.gitignore +++ b/.gitignore @@ -40,3 +40,7 @@ #ignore RcppExports, generated by Rcpp::compileAttributes /epclust/R/RcppExports.R /epclust/src/RcppExports.cpp + +#misc +Rprof.out +*.zip diff --git a/TODO b/TODO index 199a59f..4f865b0 100644 --- a/TODO +++ b/TODO @@ -47,3 +47,15 @@ utiliser Rcpp ? #' @importFrom synchronicity boost.mutex lock unlock subtree: epclust, shared. This root folder should remain private + +#TODO: use dbs(), + #https://www.r-bloggers.com/debugging-parallel-code-with-dbs/ + #http://gforge.se/2015/02/how-to-go-parallel-in-r-basics-tips/ + +synchrones --> somme, pas moyenne + +PLOT: +plot manifold 2D distances WER / +fenetre tempo forme des courbes / +medoids / +gain en prevision: clust puis full --> enercast diff --git a/biblio/Clustering_Functional_Data_Using_Wavelets-Antoniadis2013.pdf b/biblio/Clustering_Functional_Data_Using_Wavelets-Antoniadis2013.pdf new file mode 100644 index 0000000..5801a64 --- /dev/null +++ b/biblio/Clustering_Functional_Data_Using_Wavelets-Antoniadis2013.pdf @@ -0,0 +1 @@ +#$# git-fat 7571c6c3b737bf2f57eab62fea99eb24a756eded 895937 diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R index 4d43b2b..7e06c43 100644 --- a/epclust/R/clustering.R +++ b/epclust/R/clustering.R @@ -134,29 +134,8 @@ computeSynchrones = function(medoids, getRefSeries, m <- synchronicity::attach.mutex(m_desc) } - - -#TODO: use dbs(), - #https://www.r-bloggers.com/debugging-parallel-code-with-dbs/ - #http://gforge.se/2015/02/how-to-go-parallel-in-r-basics-tips/ - -#OK :: -#write(length(indices), file="TOTO") -#write( computeMedoidsIndices(medoids@address, getRefSeries(indices[1:600])), file="TOTO") -#stop() - -# write(indices, file="TOTO", ncolumns=10, append=TRUE) -#write("medoids", file = "TOTO", ncolumns=1, append=TRUE) -#write(medoids[1,1:3], file = "TOTO", ncolumns=1, append=TRUE) -#write("synchrones", file = "TOTO", ncolumns=1, append=TRUE) -#write(synchrones[1,1:3], file = "TOTO", ncolumns=1, append=TRUE) - -#NOT OK :: (should just be "ref_series") ...or yes ? race problems mutex then ? ?! #get medoids indices for this chunk of series - mi = computeMedoidsIndices(medoids@address, getRefSeries(indices[1:600])) #ref_series) -write("MI ::::", file = "TOTO", ncolumns=1, append=TRUE) -write(mi[1:3], file = "TOTO", ncolumns=1, append=TRUE) - + mi = computeMedoidsIndices(medoids@address, ref_series) # #R-equivalent, requiring a matrix (thus potentially breaking "fit-in-memory" hope) # mat_meds = medoids[,] # mi = rep(NA,nb_series) @@ -169,6 +148,7 @@ write(mi[1:3], file = "TOTO", ncolumns=1, append=TRUE) if (parll) synchronicity::lock(m) synchrones[mi[i],] = synchrones[mi[i],] + ref_series[i,] +#TODO: remove counts counts[mi[i],1] = counts[mi[i],1] + 1 if (parll) synchronicity::unlock(m) @@ -266,14 +246,17 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) # Distance between rows i and j computeDistancesIJ = function(pair) { - require("bigmemory", quietly=TRUE) - require("epclust", quietly=TRUE) - synchrones <- bigmemory::attach.big.matrix(synchrones_desc) - Xwer_dist <- bigmemory::attach.big.matrix(Xwer_dist_desc) - - computeCWT = function(i) + if (parll) { - ts <- scale(ts(synchrones[i,]), center=TRUE, scale=scaled) + require("bigmemory", quietly=TRUE) + require("epclust", quietly=TRUE) + synchrones <- bigmemory::attach.big.matrix(synchrones_desc) + Xwer_dist <- bigmemory::attach.big.matrix(Xwer_dist_desc) + } + + computeCWT = function(index) + { + ts <- scale(ts(synchrones[index,]), center=TRUE, scale=scaled) totts.cwt = Rwave::cwt(ts, totnoct, nvoice, w0, plot=FALSE) ts.cwt = totts.cwt[,s0log:(s0log+noctave*nvoice)] #Normalization @@ -281,18 +264,21 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) sqres <- sweep(ts.cwt,2,sqs,'*') sqres / max(Mod(sqres)) } - +#browser() i = pair[1] ; j = pair[2] if (verbose && j==i+1) cat(paste(" Distances (",i,",",j,"), (",i,",",j+1,") ...\n", sep="")) - cwt_i = computeCWT(i) - cwt_j = computeCWT(j) +print(system.time( { cwt_i <- computeCWT(i) + cwt_j <- computeCWT(j) } )) + +print(system.time( { num <- epclustFilter(Mod(cwt_i * Conj(cwt_j))) WX <- epclustFilter(Mod(cwt_i * Conj(cwt_i))) WY <- epclustFilter(Mod(cwt_j * Conj(cwt_j))) wer2 <- sum(colSums(num)^2) / sum(colSums(WX) * colSums(WY)) - Xwer_dist[i,j] <- sqrt(delta * ncol(cwt_i) * (1 - wer2)) + Xwer_dist[i,j] <- sqrt(delta * ncol(cwt_i) * max(1 - wer2, 0.)) #FIXME: wer2 should be < 1 Xwer_dist[j,i] <- Xwer_dist[i,j] +} ) ) Xwer_dist[i,i] = 0. } @@ -300,12 +286,12 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) { cl = parallel::makeCluster(ncores_clust) synchrones_desc <- bigmemory::describe(synchrones) - Xwer_dist_desc_desc <- bigmemory::describe(Xwer_dist) + Xwer_dist_desc <- bigmemory::describe(Xwer_dist) parallel::clusterExport(cl, varlist=c("synchrones_desc","Xwer_dist_desc","totnoct", "nvoice","w0","s0log","noctave","s0","verbose"), envir=environment()) } - +browser() ignored <- if (parll) parallel::parLapply(cl, pairs, computeDistancesIJ) @@ -314,7 +300,7 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) if (parll) parallel::stopCluster(cl) - +#browser() Xwer_dist[n,n] = 0. distances <- Xwer_dist[,] rm(Xwer_dist) ; gc() -- 2.44.0 From 6ad3f3fd0ec4f3cd1fd5de4a287c1893293e5bcc Mon Sep 17 00:00:00 2001 From: Benjamin Auder Date: Thu, 9 Mar 2017 15:56:37 +0100 Subject: [PATCH 11/16] save intermediate --- TODO | 6 ++ epclust/DESCRIPTION | 4 +- epclust/R/{a_NAMESPACE.R => A_NAMESPACE.R} | 0 epclust/R/clustering.R | 69 +++++++++++-------- epclust/tests/testthat/test.Filter.R | 8 +++ .../testthat/test.computeMedoidsIndices.R | 42 +++++++++++ 6 files changed, 98 insertions(+), 31 deletions(-) rename epclust/R/{a_NAMESPACE.R => A_NAMESPACE.R} (100%) create mode 100644 epclust/tests/testthat/test.Filter.R create mode 100644 epclust/tests/testthat/test.computeMedoidsIndices.R diff --git a/TODO b/TODO index 4f865b0..52b139d 100644 --- a/TODO +++ b/TODO @@ -59,3 +59,9 @@ plot manifold 2D distances WER / fenetre tempo forme des courbes / medoids / gain en prevision: clust puis full --> enercast + +réduire taille 17519 trop long ? + +synchrone : sum +cwt : trim R part +// : clever by rows retenir cwt... diff --git a/epclust/DESCRIPTION b/epclust/DESCRIPTION index 300f86e..1c6c51d 100644 --- a/epclust/DESCRIPTION +++ b/epclust/DESCRIPTION @@ -36,9 +36,9 @@ Suggests: License: MIT + file LICENSE RoxygenNote: 6.0.1 Collate: - 'RcppExports.R' 'main.R' 'clustering.R' 'de_serialize.R' - 'a_NAMESPACE.R' + 'A_NAMESPACE.R' + 'RcppExports.R' 'plot.R' diff --git a/epclust/R/a_NAMESPACE.R b/epclust/R/A_NAMESPACE.R similarity index 100% rename from epclust/R/a_NAMESPACE.R rename to epclust/R/A_NAMESPACE.R diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R index 7e06c43..c226786 100644 --- a/epclust/R/clustering.R +++ b/epclust/R/clustering.R @@ -121,35 +121,29 @@ computeSynchrones = function(medoids, getRefSeries, computeSynchronesChunk = function(indices) { - ref_series = getRefSeries(indices) - nb_series = nrow(ref_series) - if (parll) { require("bigmemory", quietly=TRUE) - require("synchronicity", quietly=TRUE) + requireNamespace("synchronicity", quietly=TRUE) require("epclust", quietly=TRUE) synchrones <- bigmemory::attach.big.matrix(synchrones_desc) + counts <- bigmemory::attach.big.matrix(counts_desc) medoids <- bigmemory::attach.big.matrix(medoids_desc) m <- synchronicity::attach.mutex(m_desc) } + ref_series = getRefSeries(indices) + nb_series = nrow(ref_series) + #get medoids indices for this chunk of series mi = computeMedoidsIndices(medoids@address, ref_series) -# #R-equivalent, requiring a matrix (thus potentially breaking "fit-in-memory" hope) -# mat_meds = medoids[,] -# mi = rep(NA,nb_series) -# for (i in 1:nb_series) -# mi[i] <- which.min( rowSums( sweep(mat_meds, 2, ref_series[i,], '-')^2 ) ) -# rm(mat_meds); gc() for (i in seq_len(nb_series)) { if (parll) synchronicity::lock(m) - synchrones[mi[i],] = synchrones[mi[i],] + ref_series[i,] -#TODO: remove counts - counts[mi[i],1] = counts[mi[i],1] + 1 + synchrones[ mi[i], ] = synchrones[ mi[i], ] + ref_series[i,] + counts[ mi[i] ] = counts[ mi[i] ] + 1 #TODO: remove counts? if (parll) synchronicity::unlock(m) } @@ -168,12 +162,11 @@ computeSynchrones = function(medoids, getRefSeries, m <- synchronicity::boost.mutex() m_desc <- synchronicity::describe(m) synchrones_desc = bigmemory::describe(synchrones) + counts_desc = bigmemory::describe(counts) medoids_desc = bigmemory::describe(medoids) - cl = parallel::makeCluster(ncores_clust) - parallel::clusterExport(cl, - varlist=c("synchrones_desc","counts","verbose","m_desc","medoids_desc","getRefSeries"), - envir=environment()) + parallel::clusterExport(cl, varlist=c("synchrones_desc","counts_desc","counts", + "verbose","m_desc","medoids_desc","getRefSeries"), envir=environment()) } indices_workers = .spreadIndices(seq_len(nb_ref_curves), nb_series_per_chunk) @@ -215,6 +208,15 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) if (verbose) cat(paste("--- Compute WER dists\n", sep="")) + + + +#TODO: serializer les CWT, les récupérer via getDataInFile +#--> OK, faut juste stocker comme séries simples de taille delta*ncol (53*17519) + + + + n <- nrow(synchrones) delta <- ncol(synchrones) #TODO: automatic tune of all these parameters ? (for other users) @@ -232,16 +234,25 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) totnoct = noctave + as.integer(s0log/nvoice) + 1 Xwer_dist <- bigmemory::big.matrix(nrow=n, ncol=n, type="double") - fcoefs = rep(1/3, 3) #moving average on 3 values # Generate n(n-1)/2 pairs for WER distances computations +# pairs = list() +# V = seq_len(n) +# for (i in 1:n) +# { +# V = V[-1] +# pairs = c(pairs, lapply(V, function(v) c(i,v))) +# } + # Generate "smart" pairs for WER distances computations pairs = list() - V = seq_len(n) - for (i in 1:n) + F = floor(2*n/3) + for (i in 1:F) + pairs = c(pairs, lapply((i+1):n, function(v) c(i,v))) + V = (F+1):n + for (i in (F+1):(n-1)) { V = V[-1] - pairs = c(pairs, lapply(V, function(v) c(i,v))) - } + pairs = c(pairs, # Distance between rows i and j computeDistancesIJ = function(pair) @@ -264,21 +275,21 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) sqres <- sweep(ts.cwt,2,sqs,'*') sqres / max(Mod(sqres)) } -#browser() + i = pair[1] ; j = pair[2] if (verbose && j==i+1) cat(paste(" Distances (",i,",",j,"), (",i,",",j+1,") ...\n", sep="")) -print(system.time( { cwt_i <- computeCWT(i) - cwt_j <- computeCWT(j) } )) + cwt_i <- computeCWT(i) + cwt_j <- computeCWT(j) -print(system.time( { +#print(system.time( { num <- epclustFilter(Mod(cwt_i * Conj(cwt_j))) WX <- epclustFilter(Mod(cwt_i * Conj(cwt_i))) WY <- epclustFilter(Mod(cwt_j * Conj(cwt_j))) wer2 <- sum(colSums(num)^2) / sum(colSums(WX) * colSums(WY)) Xwer_dist[i,j] <- sqrt(delta * ncol(cwt_i) * max(1 - wer2, 0.)) #FIXME: wer2 should be < 1 Xwer_dist[j,i] <- Xwer_dist[i,j] -} ) ) +#} ) ) Xwer_dist[i,i] = 0. } @@ -291,7 +302,7 @@ print(system.time( { parallel::clusterExport(cl, varlist=c("synchrones_desc","Xwer_dist_desc","totnoct", "nvoice","w0","s0log","noctave","s0","verbose"), envir=environment()) } -browser() + ignored <- if (parll) parallel::parLapply(cl, pairs, computeDistancesIJ) @@ -300,7 +311,7 @@ browser() if (parll) parallel::stopCluster(cl) -#browser() + Xwer_dist[n,n] = 0. distances <- Xwer_dist[,] rm(Xwer_dist) ; gc() diff --git a/epclust/tests/testthat/test.Filter.R b/epclust/tests/testthat/test.Filter.R new file mode 100644 index 0000000..d94a5ac --- /dev/null +++ b/epclust/tests/testthat/test.Filter.R @@ -0,0 +1,8 @@ +TODO: test computeMedoids + filter +# #R-equivalent, requiring a matrix (thus potentially breaking "fit-in-memory" hope) +# mat_meds = medoids[,] +# mi = rep(NA,nb_series) +# for (i in 1:nb_series) +# mi[i] <- which.min( rowSums( sweep(mat_meds, 2, ref_series[i,], '-')^2 ) ) +# rm(mat_meds); gc() + diff --git a/epclust/tests/testthat/test.computeMedoidsIndices.R b/epclust/tests/testthat/test.computeMedoidsIndices.R new file mode 100644 index 0000000..be5b2b4 --- /dev/null +++ b/epclust/tests/testthat/test.computeMedoidsIndices.R @@ -0,0 +1,42 @@ +context("computeMedoidsIndices") + +test_that("serialization + getDataInFile retrieve original data / from matrix", +{ + data_bin_file = "/tmp/epclust_test_m.bin" + unlink(data_bin_file) + + #dataset 200 lignes / 30 columns + data_ascii = matrix(runif(200*30,-10,10),ncol=30) + nbytes = 4 #lead to a precision of 1e-7 / 1e-8 + endian = "little" + + #Simulate serialization in one single call + binarize(data_ascii, data_bin_file, 500, ",", nbytes, endian) + expect_equal(file.info(data_bin_file)$size, length(data_ascii)*nbytes+8) + for (indices in list(c(1,3,5), 3:13, c(5,20,50), c(75,130:135), 196:200)) + { + data_lines = getDataInFile(indices, data_bin_file, nbytes, endian) + expect_equal(data_lines, data_ascii[indices,], tolerance=1e-6) + } + unlink(data_bin_file) + + #...in several calls (last call complete, next call NULL) + for (i in 1:20) + binarize(data_ascii[((i-1)*10+1):(i*10),], data_bin_file, 20, ",", nbytes, endian) + expect_equal(file.info(data_bin_file)$size, length(data_ascii)*nbytes+8) + for (indices in list(c(1,3,5), 3:13, c(5,20,50), c(75,130:135), 196:200)) + { + data_lines = getDataInFile(indices, data_bin_file, nbytes, endian) + expect_equal(data_lines, data_ascii[indices,], tolerance=1e-6) + } + unlink(data_bin_file) +}) + +TODO: test computeMedoids + filter +# #R-equivalent, requiring a matrix (thus potentially breaking "fit-in-memory" hope) +# mat_meds = medoids[,] +# mi = rep(NA,nb_series) +# for (i in 1:nb_series) +# mi[i] <- which.min( rowSums( sweep(mat_meds, 2, ref_series[i,], '-')^2 ) ) +# rm(mat_meds); gc() + -- 2.44.0 From 4204e8774fdafe2db7ed44cd8cae018bc0c4e9d7 Mon Sep 17 00:00:00 2001 From: Benjamin Auder Date: Thu, 9 Mar 2017 17:04:05 +0100 Subject: [PATCH 12/16] add submodule enercast --- .enercast | 1 + .gitmodules | 3 ++ epclust/R/clustering.R | 94 ++++++++++++++++++++---------------------- 3 files changed, 49 insertions(+), 49 deletions(-) create mode 160000 .enercast diff --git a/.enercast b/.enercast new file mode 160000 index 0000000..35da9ea --- /dev/null +++ b/.enercast @@ -0,0 +1 @@ +Subproject commit 35da9ea4a4caaac6124c0807fb8fcbd8d5e1c7ca diff --git a/.gitmodules b/.gitmodules index 16826ed..2b8ef9a 100644 --- a/.gitmodules +++ b/.gitmodules @@ -4,3 +4,6 @@ [submodule ".nbstripout"] path = .nbstripout url = https://github.com/kynan/nbstripout.git +[submodule ".enercast"] + path = .enercast + url = https://github.com/cugliari/enercast.git diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R index c226786..4519f44 100644 --- a/epclust/R/clustering.R +++ b/epclust/R/clustering.R @@ -208,15 +208,6 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) if (verbose) cat(paste("--- Compute WER dists\n", sep="")) - - - -#TODO: serializer les CWT, les récupérer via getDataInFile -#--> OK, faut juste stocker comme séries simples de taille delta*ncol (53*17519) - - - - n <- nrow(synchrones) delta <- ncol(synchrones) #TODO: automatic tune of all these parameters ? (for other users) @@ -235,24 +226,52 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) Xwer_dist <- bigmemory::big.matrix(nrow=n, ncol=n, type="double") + cwt_file = ".epclust_bin/cwt" + #TODO: args, nb_per_chunk, nbytes, endian + # Generate n(n-1)/2 pairs for WER distances computations -# pairs = list() -# V = seq_len(n) -# for (i in 1:n) -# { -# V = V[-1] -# pairs = c(pairs, lapply(V, function(v) c(i,v))) -# } - # Generate "smart" pairs for WER distances computations pairs = list() - F = floor(2*n/3) - for (i in 1:F) - pairs = c(pairs, lapply((i+1):n, function(v) c(i,v))) - V = (F+1):n - for (i in (F+1):(n-1)) + V = seq_len(n) + for (i in 1:n) { V = V[-1] - pairs = c(pairs, + pairs = c(pairs, lapply(V, function(v) c(i,v))) + } + + computeSaveCWT = function(index) + { + ts <- scale(ts(synchrones[index,]), center=TRUE, scale=scaled) + totts.cwt = Rwave::cwt(ts, totnoct, nvoice, w0, plot=FALSE) + ts.cwt = totts.cwt[,s0log:(s0log+noctave*nvoice)] + #Normalization + sqs <- sqrt(2^(0:(noctave*nvoice)/nvoice)*s0) + sqres <- sweep(ts.cwt,2,sqs,'*') + res <- sqres / max(Mod(sqres)) + #TODO: serializer les CWT, les récupérer via getDataInFile ; + #--> OK, faut juste stocker comme séries simples de taille delta*ncol (53*17519) + binarize(res, cwt_file, 100, ",", nbytes, endian) + } + + if (parll) + { + cl = parallel::makeCluster(ncores_clust) + synchrones_desc <- bigmemory::describe(synchrones) + Xwer_dist_desc <- bigmemory::describe(Xwer_dist) + parallel::clusterExport(cl, varlist=c("synchrones_desc","Xwer_dist_desc","totnoct", + "nvoice","w0","s0log","noctave","s0","verbose","getCWT"), envir=environment()) + } + + #precompute and serialize all CWT + ignored <- + if (parll) + parallel::parLapply(cl, 1:n, computeSaveCWT) + else + lapply(1:n, computeSaveCWT) + + getCWT = function(index) + { + #from cwt_file ... + } # Distance between rows i and j computeDistancesIJ = function(pair) @@ -265,44 +284,21 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) Xwer_dist <- bigmemory::attach.big.matrix(Xwer_dist_desc) } - computeCWT = function(index) - { - ts <- scale(ts(synchrones[index,]), center=TRUE, scale=scaled) - totts.cwt = Rwave::cwt(ts, totnoct, nvoice, w0, plot=FALSE) - ts.cwt = totts.cwt[,s0log:(s0log+noctave*nvoice)] - #Normalization - sqs <- sqrt(2^(0:(noctave*nvoice)/nvoice)*s0) - sqres <- sweep(ts.cwt,2,sqs,'*') - sqres / max(Mod(sqres)) - } - i = pair[1] ; j = pair[2] if (verbose && j==i+1) cat(paste(" Distances (",i,",",j,"), (",i,",",j+1,") ...\n", sep="")) - cwt_i <- computeCWT(i) - cwt_j <- computeCWT(j) + cwt_i <- getCWT(i) + cwt_j <- getCWT(j) -#print(system.time( { num <- epclustFilter(Mod(cwt_i * Conj(cwt_j))) WX <- epclustFilter(Mod(cwt_i * Conj(cwt_i))) - WY <- epclustFilter(Mod(cwt_j * Conj(cwt_j))) + WY <- epclustFilter(Mod(cwt_j * Conj(cwt_j))) wer2 <- sum(colSums(num)^2) / sum(colSums(WX) * colSums(WY)) Xwer_dist[i,j] <- sqrt(delta * ncol(cwt_i) * max(1 - wer2, 0.)) #FIXME: wer2 should be < 1 Xwer_dist[j,i] <- Xwer_dist[i,j] -#} ) ) Xwer_dist[i,i] = 0. } - if (parll) - { - cl = parallel::makeCluster(ncores_clust) - synchrones_desc <- bigmemory::describe(synchrones) - Xwer_dist_desc <- bigmemory::describe(Xwer_dist) - - parallel::clusterExport(cl, varlist=c("synchrones_desc","Xwer_dist_desc","totnoct", - "nvoice","w0","s0log","noctave","s0","verbose"), envir=environment()) - } - ignored <- if (parll) parallel::parLapply(cl, pairs, computeDistancesIJ) -- 2.44.0 From a174b8ea1f322992068ab42810df017a2b9620ee Mon Sep 17 00:00:00 2001 From: Benjamin Auder Date: Thu, 9 Mar 2017 17:22:09 +0100 Subject: [PATCH 13/16] draft final form of current package --- TODO | 2 ++ epclust/R/clustering.R | 18 ++++++++++-------- epclust/R/main.R | 8 ++++---- 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/TODO b/TODO index 52b139d..53b4c97 100644 --- a/TODO +++ b/TODO @@ -65,3 +65,5 @@ réduire taille 17519 trop long ? synchrone : sum cwt : trim R part // : clever by rows retenir cwt... + +Stockage matrices : en colonnes systématiquement ? diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R index 4519f44..a4c273a 100644 --- a/epclust/R/clustering.R +++ b/epclust/R/clustering.R @@ -67,8 +67,8 @@ clusteringTask1 = function( #' @rdname clustering #' @export -clusteringTask2 = function(medoids, K2, - getRefSeries, nb_ref_curves, nb_series_per_chunk, ncores_clust=1,verbose=FALSE,parll=TRUE) +clusteringTask2 = function(medoids, K2, getRefSeries, nb_ref_curves, + nb_series_per_chunk, nbytes,endian,ncores_clust=1,verbose=FALSE,parll=TRUE) { if (verbose) cat(paste("*** Clustering task 2 on ",nrow(medoids)," lines\n", sep="")) @@ -77,7 +77,7 @@ clusteringTask2 = function(medoids, K2, return (medoids) synchrones = computeSynchrones(medoids, getRefSeries, nb_ref_curves, nb_series_per_chunk, ncores_clust, verbose, parll) - distances = computeWerDists(synchrones, ncores_clust, verbose, parll) + distances = computeWerDists(synchrones, nbytes, endian, ncores_clust, verbose, parll) medoids[ computeClusters2(distances,K2,verbose), ] } @@ -203,7 +203,7 @@ computeSynchrones = function(medoids, getRefSeries, #' @return A matrix of size K1 x K1 #' #' @export -computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) +computeWerDists = function(synchrones, nbytes,endian,ncores_clust=1,verbose=FALSE,parll=TRUE) { if (verbose) cat(paste("--- Compute WER dists\n", sep="")) @@ -218,8 +218,8 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) #NOTE: default scalevector == 2^(0:(noctave * nvoice) / nvoice) * s0 (?) scalevector <- 2^(4:(noctave * nvoice) / nvoice + 1) #condition: ( log2(s0*w0/(2*pi)) - 1 ) * nvoice + 1.5 >= 1 - s0=2 - w0=2*pi + s0 = 2 + w0 = 2*pi scaled=FALSE s0log = as.integer( (log2( s0*w0/(2*pi) ) - 1) * nvoice + 1.5 ) totnoct = noctave + as.integer(s0log/nvoice) + 1 @@ -237,7 +237,7 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) V = V[-1] pairs = c(pairs, lapply(V, function(v) c(i,v))) } - + computeSaveCWT = function(index) { ts <- scale(ts(synchrones[index,]), center=TRUE, scale=scaled) @@ -249,7 +249,7 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) res <- sqres / max(Mod(sqres)) #TODO: serializer les CWT, les récupérer via getDataInFile ; #--> OK, faut juste stocker comme séries simples de taille delta*ncol (53*17519) - binarize(res, cwt_file, 100, ",", nbytes, endian) + binarize(c(as.double(Re(res)),as.double(Im(res))), cwt_file, ncol(res), ",", nbytes, endian) } if (parll) @@ -271,6 +271,8 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) getCWT = function(index) { #from cwt_file ... + res <- getDataInFile(c(2*index-1,2*index), cwt_file, nbytes, endian) + ###############TODO: } # Distance between rows i and j diff --git a/epclust/R/main.R b/epclust/R/main.R index 9ba23ae..28217c3 100644 --- a/epclust/R/main.R +++ b/epclust/R/main.R @@ -172,8 +172,8 @@ claws = function(getSeries, K1, K2, { require("bigmemory", quietly=TRUE) medoids1 = bigmemory::as.big.matrix( getSeries(indices_medoids) ) - medoids2 = clusteringTask2(medoids1, - K2, getSeries, nb_curves, nb_series_per_chunk, ncores_clust, verbose, parll) + medoids2 = clusteringTask2(medoids1, K2, getSeries, nb_curves, nb_series_per_chunk, + nbytes, endian, ncores_clust, verbose, parll) binarize(medoids2, synchrones_file, nb_series_per_chunk, sep, nbytes, endian) return (vector("integer",0)) } @@ -235,8 +235,8 @@ claws = function(getSeries, K1, K2, indices_medoids = clusteringTask1( indices, getContribs, K1, nb_series_per_chunk, ncores_tasks*ncores_clust, verbose, parll) medoids1 = bigmemory::as.big.matrix( getSeries(indices_medoids) ) - medoids2 = clusteringTask2(medoids1, K2, - getRefSeries, nb_curves, nb_series_per_chunk, ncores_tasks*ncores_clust, verbose, parll) + medoids2 = clusteringTask2(medoids1, K2, getRefSeries, nb_curves, nb_series_per_chunk, + nbytes, endian, ncores_tasks*ncores_clust, verbose, parll) # Cleanup unlink(bin_dir, recursive=TRUE) -- 2.44.0 From eef6f6c97277ea3ce760981e5244cbde7fc904a0 Mon Sep 17 00:00:00 2001 From: Benjamin Auder Date: Fri, 10 Mar 2017 08:10:49 +0100 Subject: [PATCH 14/16] TODO: args, et finir tests; relancer --- TODO | 3 + epclust/R/clustering.R | 24 ++-- epclust/R/de_serialize.R | 11 +- epclust/R/main.R | 235 ++++++++++++++++++++++++--------------- 4 files changed, 168 insertions(+), 105 deletions(-) diff --git a/TODO b/TODO index 53b4c97..53a82b3 100644 --- a/TODO +++ b/TODO @@ -67,3 +67,6 @@ cwt : trim R part // : clever by rows retenir cwt... Stockage matrices : en colonnes systématiquement ? + +TODO: revoir les arguments, simplifier (dans les clustering...), + permettre algos de clustering quelconques, args: medoids (curves puis dists), K diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R index a4c273a..14915ab 100644 --- a/epclust/R/clustering.R +++ b/epclust/R/clustering.R @@ -1,6 +1,6 @@ #' @name clustering #' @rdname clustering -#' @aliases clusteringTask1 computeClusters1 computeClusters2 +#' @aliases clusteringTask1 clusteringTask2 computeClusters1 computeClusters2 #' #' @title Two-stage clustering, withing one task (see \code{claws()}) #' @@ -31,7 +31,7 @@ NULL #' @rdname clustering #' @export clusteringTask1 = function( - indices, getContribs, K1, nb_series_per_chunk, ncores_clust=1, verbose=FALSE, parll=TRUE) + indices, getContribs, K1, nb_items_per_chunk, ncores_clust=1, verbose=FALSE, parll=TRUE) { if (verbose) cat(paste("*** Clustering task 1 on ",length(indices)," lines\n", sep="")) @@ -87,7 +87,7 @@ computeClusters1 = function(contribs, K1, verbose=FALSE) { if (verbose) cat(paste(" computeClusters1() on ",nrow(contribs)," lines\n", sep="")) - cluster::pam(contribs, K1, diss=FALSE)$id.med + cluster::pam( t(contribs) , K1, diss=FALSE)$id.med } #' @rdname clustering @@ -96,7 +96,7 @@ computeClusters2 = function(distances, K2, verbose=FALSE) { if (verbose) cat(paste(" computeClusters2() on ",nrow(distances)," lines\n", sep="")) - cluster::pam(distances, K2, diss=TRUE)$id.med + cluster::pam( distances , K2, diss=TRUE)$id.med } #' computeSynchrones @@ -110,7 +110,7 @@ computeClusters2 = function(distances, K2, verbose=FALSE) #' @param nb_ref_curves How many reference series? (This number is known at this stage) #' @inheritParams claws #' -#' @return A big.matrix of size K1 x L where L = data_length +#' @return A big.matrix of size L x K1 where L = length of a serie #' #' @export computeSynchrones = function(medoids, getRefSeries, @@ -142,8 +142,8 @@ computeSynchrones = function(medoids, getRefSeries, { if (parll) synchronicity::lock(m) - synchrones[ mi[i], ] = synchrones[ mi[i], ] + ref_series[i,] - counts[ mi[i] ] = counts[ mi[i] ] + 1 #TODO: remove counts? + synchrones[, mi[i] ] = synchrones[, mi[i] ] + ref_series[,i] + counts[ mi[i] ] = counts[ mi[i] ] + 1 #TODO: remove counts? ...or as arg?! if (parll) synchronicity::unlock(m) } @@ -152,7 +152,7 @@ computeSynchrones = function(medoids, getRefSeries, K = nrow(medoids) ; L = ncol(medoids) # Use bigmemory (shared==TRUE by default) + synchronicity to fill synchrones in // # TODO: if size > RAM (not our case), use file-backed big.matrix - synchrones = bigmemory::big.matrix(nrow=K, ncol=L, type="double", init=0.) + synchrones = bigmemory::big.matrix(nrow=L, ncol=K, type="double", init=0.) counts = bigmemory::big.matrix(nrow=K, ncol=1, type="double", init=0) # synchronicity is only for Linux & MacOS; on Windows: run sequentially parll = (requireNamespace("synchronicity",quietly=TRUE) @@ -181,14 +181,14 @@ computeSynchrones = function(medoids, getRefSeries, #TODO: can we avoid this loop? ( synchrones = sweep(synchrones, 1, counts, '/') ) for (i in seq_len(K)) - synchrones[i,] = synchrones[i,] / counts[i,1] + synchrones[,i] = synchrones[,i] / counts[i] #NOTE: odds for some clusters to be empty? (when series already come from stage 2) # ...maybe; but let's hope resulting K1' be still quite bigger than K2 - noNA_rows = sapply(seq_len(K), function(i) all(!is.nan(synchrones[i,]))) + noNA_rows = sapply(seq_len(K), function(i) all(!is.nan(synchrones[,i]))) if (all(noNA_rows)) return (synchrones) # Else: some clusters are empty, need to slice synchrones - synchrones[noNA_rows,] + bigmemory::as.big.matrix(synchrones[,noNA_rows]) } #' computeWerDists @@ -272,7 +272,7 @@ computeWerDists = function(synchrones, nbytes,endian,ncores_clust=1,verbose=FALS { #from cwt_file ... res <- getDataInFile(c(2*index-1,2*index), cwt_file, nbytes, endian) - ###############TODO: + ###############TODO: } # Distance between rows i and j diff --git a/epclust/R/de_serialize.R b/epclust/R/de_serialize.R index b6684d2..f04c13a 100644 --- a/epclust/R/de_serialize.R +++ b/epclust/R/de_serialize.R @@ -45,7 +45,7 @@ binarize = function(data_ascii, data_bin_file, nb_per_chunk, #number of items always on 8 bytes writeBin(0L, data_bin, size=8, endian=endian) if ( is_matrix ) - data_length = ncol(data_ascii) + data_length = nrow(data_ascii) else #connection { data_line = scan(data_ascii, double(), sep=sep, nlines=1, quiet=TRUE) @@ -61,8 +61,8 @@ binarize = function(data_ascii, data_bin_file, nb_per_chunk, if ( is_matrix ) { data_chunk = - if (index <= nrow(data_ascii)) - as.double(t(data_ascii[index:min(nrow(data_ascii),index+nb_per_chunk-1),])) + if (index <= ncol(data_ascii)) + as.double(data_ascii[,index:min(nrow(data_ascii),index+nb_per_chunk-1)]) else double(0) index = index + nb_per_chunk @@ -113,14 +113,13 @@ getDataInFile = function(indices, data_bin_file, nbytes=4, endian=.Platform$endi data_bin = file(data_bin_file, "rb") data_size = file.info(data_bin_file)$size data_length = readBin(data_bin, "integer", n=1, size=8, endian=endian) - #Ou t(sapply(...)) (+ rapide ?) - data_ascii = do.call( rbind, lapply( indices, function(i) { + data_ascii = sapply( indices, function(i) { offset = 8+(i-1)*data_length*nbytes if (offset > data_size) return (vector("double",0)) ignored = seek(data_bin, offset) readBin(data_bin, "double", n=data_length, size=nbytes, endian=endian) - } ) ) + } ) close(data_bin) if (ncol(data_ascii)>0) data_ascii else NULL } diff --git a/epclust/R/main.R b/epclust/R/main.R index 28217c3..86dac64 100644 --- a/epclust/R/main.R +++ b/epclust/R/main.R @@ -1,67 +1,97 @@ #' CLAWS: CLustering with wAvelets and Wer distanceS #' -#' Groups electricity power curves (or any series of similar nature) by applying PAM -#' algorithm in parallel to chunks of size \code{nb_series_per_chunk}. Input series -#' must be sampled on the same time grid, no missing values. +#' Cluster electricity power curves (or any series of similar nature) by applying a +#' two stage procedure in parallel (see details). +#' Input series must be sampled on the same time grid, no missing values. +#' +#' @details Summary of the function execution flow: +#' \enumerate{ +#' \item Compute and serialize all contributions, obtained through discrete wavelet +#' decomposition (see Antoniadis & al. [2013]) +#' \item Divide series into \code{ntasks} groups to process in parallel. In each task: +#' \enumerate{ +#' \item iterate the first clustering algorithm on its aggregated outputs, +#' on inputs of size \code{nb_items_clust} +#' \item optionally, if WER=="mix": +#' a) compute the K1 synchrones curves, +#' b) compute WER distances (K1xK1 matrix) between synchrones and +#' c) apply the second clustering algorithm +#' } +#' \item Launch a final task on the aggregated outputs of all previous tasks: +#' in the case WER=="end" this task takes indices in input, otherwise +#' (medoid) curves +#' } +#' The main argument -- \code{getSeries} -- has a quite misleading name, since it can be +#' either a [big.]matrix, a CSV file, a connection or a user function to retrieve +#' series; the name was chosen because all types of arguments are converted to a function. +#' When \code{getSeries} is given as a function, it must take a single argument, +#' 'indices', integer vector equal to the indices of the curves to retrieve; +#' see SQLite example. The nature and role of other arguments should be clear #' #' @param getSeries Access to the (time-)series, which can be of one of the three #' following types: #' \itemize{ -#' \item [big.]matrix: each line contains all the values for one time-serie, ordered by time +#' \item [big.]matrix: each column contains the (time-ordered) values of one time-serie #' \item connection: any R connection object providing lines as described above #' \item character: name of a CSV file containing series in rows (no header) #' \item function: a custom way to retrieve the curves; it has only one argument: -#' the indices of the series to be retrieved. See examples +#' the indices of the series to be retrieved. See SQLite example #' } -#' @inheritParams clustering -#' @param K1 Number of super-consumers to be found after stage 1 (K1 << N) +#' @param K1 Number of clusters to be found after stage 1 (K1 << N [number of series]) #' @param K2 Number of clusters to be found after stage 2 (K2 << K1) -#' @param wf Wavelet transform filter; see ?wavelets::wt.filter -#' @param ctype Type of contribution: "relative" or "absolute" (or any prefix) -#' @param WER "end" to apply stage 2 after stage 1 has fully iterated, or "mix" to apply stage 2 -#' at the end of each task +#' @param nb_per_chunk (Maximum) number of items to retrieve in one batch, for both types of +#' retrieval: resp. series and contribution; in a vector of size 2 +#' @param nb_items_clust1 (Maximum) number of items in input of the clustering algorithm +#' for stage 1 +#' @param wav_filt Wavelet transform filter; see ?wavelets::wt.filter +#' @param contrib_type Type of contribution: "relative", "logit" or "absolute" (any prefix) +#' @param WER "end" to apply stage 2 after stage 1 has fully iterated, or "mix" to apply +#' stage 2 at the end of each task #' @param random TRUE (default) for random chunks repartition -#' @param ntasks Number of tasks (parallel iterations to obtain K1 medoids); default: 1. -#' Note: ntasks << N, so that N is "roughly divisible" by N (number of series) -#' @param ncores_tasks "MPI" number of parallel tasks (1 to disable: sequential tasks) -#' @param ncores_clust "OpenMP" number of parallel clusterings in one task -#' @param nb_series_per_chunk (~Maximum) number of series in each group, inside a task -#' @param min_series_per_chunk Minimum number of series in each group +#' @param ntasks Number of tasks (parallel iterations to obtain K1 [if WER=="end"] +#' or K2 [if WER=="mix"] medoids); default: 1. +#' Note: ntasks << N (number of series), so that N is "roughly divisible" by ntasks +#' @param ncores_tasks Number of parallel tasks (1 to disable: sequential tasks) +#' @param ncores_clust Number of parallel clusterings in one task (4 should be a minimum) #' @param sep Separator in CSV input file (if any provided) #' @param nbytes Number of bytes to serialize a floating-point number; 4 or 8 -#' @param endian Endianness to use for (de)serialization. Use "little" or "big" for portability +#' @param endian Endianness for (de)serialization ("little" or "big") #' @param verbose Level of verbosity (0/FALSE for nothing or 1/TRUE for all; devel stage) #' @param parll TRUE to fully parallelize; otherwise run sequentially (debug, comparison) #' -#' @return A big.matrix of the final medoids curves (K2) in rows +#' @return A matrix of the final K2 medoids curves, in columns +#' +#' @references Clustering functional data using Wavelets [2013]; +#' A. Antoniadis, X. Brossat, J. Cugliari & J.-M. Poggi. +#' Inter. J. of Wavelets, Multiresolution and Information Procesing, +#' vol. 11, No 1, pp.1-30. doi:10.1142/S0219691313500033 #' #' @examples #' \dontrun{ -#' # WER distances computations are a bit too long for CRAN (for now) +#' # WER distances computations are too long for CRAN (for now) #' #' # Random series around cos(x,2x,3x)/sin(x,2x,3x) #' x = seq(0,500,0.05) #' L = length(x) #10001 -#' ref_series = matrix( c(cos(x), cos(2*x), cos(3*x), sin(x), sin(2*x), sin(3*x)), -#' byrow=TRUE, ncol=L ) +#' ref_series = matrix( c(cos(x),cos(2*x),cos(3*x),sin(x),sin(2*x),sin(3*x)), ncol=6 ) #' library(wmtsa) -#' series = do.call( rbind, lapply( 1:6, function(i) -#' do.call(rbind, wmtsa::wavBootstrap(ref_series[i,], n.realization=400)) ) ) +#' series = do.call( cbind, lapply( 1:6, function(i) +#' do.call(cbind, wmtsa::wavBootstrap(ref_series[i,], n.realization=400)) ) ) #' #dim(series) #c(2400,10001) -#' medoids_ascii = claws(series, K1=60, K2=6, "d8", "rel", nb_series_per_chunk=500) +#' medoids_ascii = claws(series, K1=60, K2=6, nb_per_chunk=c(200,500), verbose=TRUE) #' #' # Same example, from CSV file #' csv_file = "/tmp/epclust_series.csv" #' write.table(series, csv_file, sep=",", row.names=FALSE, col.names=FALSE) -#' medoids_csv = claws(csv_file, K1=60, K2=6, "d8", "rel", nb_series_per_chunk=500) +#' medoids_csv = claws(csv_file, K1=60, K2=6, nb_per_chunk=c(200,500)) #' #' # Same example, from binary file -#' bin_file = "/tmp/epclust_series.bin" -#' nbytes = 8 -#' endian = "little" -#' epclust::binarize(csv_file, bin_file, 500, nbytes, endian) -#' getSeries = function(indices) getDataInFile(indices, bin_file, nbytes, endian) -#' medoids_bin = claws(getSeries, K1=60, K2=6, "d8", "rel", nb_series_per_chunk=500) +#' bin_file <- "/tmp/epclust_series.bin" +#' nbytes <- 8 +#' endian <- "little" +#' binarize(csv_file, bin_file, 500, nbytes, endian) +#' getSeries <- function(indices) getDataInFile(indices, bin_file, nbytes, endian) +#' medoids_bin <- claws(getSeries, K1=60, K2=6, nb_per_chunk=c(200,500)) #' unlink(csv_file) #' unlink(bin_file) #' @@ -69,8 +99,8 @@ #' library(DBI) #' series_db <- dbConnect(RSQLite::SQLite(), "file::memory:") #' # Prepare data.frame in DB-format -#' n = nrow(series) -#' time_values = data.frame( +#' n <- nrow(series) +#' time_values <- data.frame( #' id = rep(1:n,each=L), #' time = rep( as.POSIXct(1800*(0:n),"GMT",origin="2001-01-01"), L ), #' value = as.double(t(series)) ) @@ -78,17 +108,17 @@ #' # Fill associative array, map index to identifier #' indexToID_inDB <- as.character( #' dbGetQuery(series_db, 'SELECT DISTINCT id FROM time_values')[,"id"] ) -#' getSeries = function(indices) { -#' request = "SELECT id,value FROM times_values WHERE id in (" +#' serie_length <- as.integer( dbGetQuery(series_db, +#' paste("SELECT COUNT * FROM time_values WHERE id == ",indexToID_inDB[1],sep="")) ) +#' getSeries <- function(indices) { +#' request <- "SELECT id,value FROM times_values WHERE id in (" #' for (i in indices) -#' request = paste(request, i, ",", sep="") -#' request = paste(request, ")", sep="") -#' df_series = dbGetQuery(series_db, request) -#' # Assume that all series share same length at this stage -#' ts_length = sum(df_series[,"id"] == df_series[1,"id"]) -#' t( as.matrix(df_series[,"value"], nrow=ts_length) ) +#' request <- paste(request, indexToID_inDB[i], ",", sep="") +#' request <- paste(request, ")", sep="") +#' df_series <- dbGetQuery(series_db, request) +#' as.matrix(df_series[,"value"], nrow=serie_length) #' } -#' medoids_db = claws(getSeries, K1=60, K2=6, "d8", "rel", nb_series_per_chunk=500) +#' medoids_db = claws(getSeries, K1=60, K2=6, nb_per_chunk=c(200,500)) #' dbDisconnect(series_db) #' #' # All computed medoids should be the same: @@ -98,12 +128,12 @@ #' digest::sha1(medoids_db) #' } #' @export -claws = function(getSeries, K1, K2, - wf,ctype, #stage 1 +claws <- function(getSeries, K1, K2, + nb_per_chunk,nb_items_clust1=7*K1 #volumes of data + wav_filt="d8",contrib_type="absolute", #stage 1 WER="end", #stage 2 random=TRUE, #randomize series order? - ntasks=1, ncores_tasks=1, ncores_clust=4, #control parallelism - nb_series_per_chunk=50*K1, min_series_per_chunk=5*K1, #chunk size + ntasks=1, ncores_tasks=1, ncores_clust=4, #parallelism sep=",", #ASCII input separator nbytes=4, endian=.Platform$endian, #serialization (write,read) verbose=FALSE, parll=TRUE) @@ -115,26 +145,39 @@ claws = function(getSeries, K1, K2, { stop("'getSeries': [big]matrix, function, file or valid connection (no NA)") } - K1 = .toInteger(K1, function(x) x>=2) - K2 = .toInteger(K2, function(x) x>=2) - if (!is.logical(random)) - stop("'random': logical") - tryCatch( - {ignored <- wavelets::wt.filter(wf)}, - error = function(e) stop("Invalid wavelet filter; see ?wavelets::wt.filter")) + K1 <- .toInteger(K1, function(x) x>=2) + K2 <- .toInteger(K2, function(x) x>=2) + if (!is.numeric(nb_per_chunk) || length(nb_per_chunk)!=2) + stop("'nb_per_chunk': numeric, size 2") + nb_per_chunk[1] <- .toInteger(nb_per_chunk[1], function(x) x>=1) + # A batch of contributions should have at least as many elements as a batch of series, + # because it always contains much less values + nb_per_chunk[2] <- max(.toInteger(nb_per_chunk[2],function(x) x>=1), nb_per_chunk[1]) + nb_items_clust1 <- .toInteger(nb_items_clust1, function(x) x>K1) + random <- .toLogical(random) + tryCatch + ( + {ignored <- wavelets::wt.filter(wav_filt)}, + error = function(e) stop("Invalid wavelet filter; see ?wavelets::wt.filter") + ) + ctypes = c("relative","absolute","logit") + contrib_type = ctypes[ pmatch(contrib_type,ctypes) ] + if (is.na(contrib_type)) + stop("'contrib_type' in {'relative','absolute','logit'}") if (WER!="end" && WER!="mix") - stop("WER takes values in {'end','mix'}") - ntasks = .toInteger(ntasks, function(x) x>=1) - ncores_tasks = .toInteger(ncores_tasks, function(x) x>=1) - ncores_clust = .toInteger(ncores_clust, function(x) x>=1) - nb_series_per_chunk = .toInteger(nb_series_per_chunk, function(x) x>=K1) - min_series_per_chunk = .toInteger(K1, function(x) x>=K1 && x<=nb_series_per_chunk) + stop("'WER': in {'end','mix'}") + random <- .toLogical(random) + ntasks <- .toInteger(ntasks, function(x) x>=1) + ncores_tasks <- .toInteger(ncores_tasks, function(x) x>=1) + ncores_clust <- .toInteger(ncores_clust, function(x) x>=1) if (!is.character(sep)) stop("'sep': character") - nbytes = .toInteger(nbytes, function(x) x==4 || x==8) + nbytes <- .toInteger(nbytes, function(x) x==4 || x==8) + verbose <- .toLogical(verbose) + parll <- .toLogical(parll) # Serialize series if required, to always use a function - bin_dir = ".epclust_bin/" + bin_dir <- ".epclust_bin/" dir.create(bin_dir, showWarnings=FALSE, mode="0755") if (!is.function(getSeries)) { @@ -156,11 +199,11 @@ claws = function(getSeries, K1, K2, contribs_file, nb_series_per_chunk, nbytes, endian) getContribs = function(indices) getDataInFile(indices, contribs_file, nbytes, endian) - if (nb_curves < min_series_per_chunk) - stop("Not enough data: less rows than min_series_per_chunk!") + if (nb_curves < K2) + stop("Not enough data: less series than final number of clusters") nb_series_per_task = round(nb_curves / ntasks) - if (nb_series_per_task < min_series_per_chunk) - stop("Too many tasks: less series in one task than min_series_per_chunk!") + if (nb_series_per_task < K2) + stop("Too many tasks: less series in one task than final number of clusters") runTwoStepClustering = function(inds) { @@ -170,7 +213,8 @@ claws = function(getSeries, K1, K2, inds, getContribs, K1, nb_series_per_chunk, ncores_clust, verbose, parll) if (WER=="mix") { - require("bigmemory", quietly=TRUE) + if (parll && ntasks>1) + require("bigmemory", quietly=TRUE) medoids1 = bigmemory::as.big.matrix( getSeries(indices_medoids) ) medoids2 = clusteringTask2(medoids1, K2, getSeries, nb_curves, nb_series_per_chunk, nbytes, endian, ncores_clust, verbose, parll) @@ -197,7 +241,7 @@ claws = function(getSeries, K1, K2, {synchrones_file = paste(bin_dir,"synchrones",sep="") ; unlink(synchrones_file)} if (parll && ntasks>1) { - cl = parallel::makeCluster(ncores_tasks) + cl = parallel::makeCluster(ncores_tasks, outfile="") varlist = c("getSeries","getContribs","K1","K2","verbose","parll", "nb_series_per_chunk","ntasks","ncores_clust","sep","nbytes","endian") if (WER=="mix") @@ -206,10 +250,11 @@ claws = function(getSeries, K1, K2, } # 1000*K1 indices [if WER=="end"], or empty vector [if WER=="mix"] --> series on file - if (parll && ntasks>1) - indices = unlist( parallel::parLapply(cl, indices_tasks, runTwoStepClustering) ) - else - indices = unlist( lapply(indices_tasks, runTwoStepClustering) ) + indices <- + if (parll && ntasks>1) + unlist( parallel::parLapply(cl, indices_tasks, runTwoStepClustering) ) + else + unlist( lapply(indices_tasks, runTwoStepClustering) ) if (parll && ntasks>1) parallel::stopCluster(cl) @@ -241,7 +286,7 @@ claws = function(getSeries, K1, K2, # Cleanup unlink(bin_dir, recursive=TRUE) - medoids2 + medoids2[,] } #' curvesToContribs @@ -249,36 +294,52 @@ claws = function(getSeries, K1, K2, #' Compute the discrete wavelet coefficients for each series, and aggregate them in #' energy contribution across scales as described in https://arxiv.org/abs/1101.4744v2 #' -#' @param series Matrix of series (in rows), of size n x L +#' @param series [big.]matrix of series (in columns), of size L x n #' @inheritParams claws #' -#' @return A matrix of size n x log(L) containing contributions in rows +#' @return A [big.]matrix of size log(L) x n containing contributions in columns #' #' @export -curvesToContribs = function(series, wf, ctype) +curvesToContribs = function(series, wav_filt, contrib_type) { - L = length(series[1,]) + L = nrow(series) D = ceiling( log2(L) ) nb_sample_points = 2^D - cont_types = c("relative","absolute") - ctype = cont_types[ pmatch(ctype,cont_types) ] - t( apply(series, 1, function(x) { + apply(series, 2, function(x) { interpolated_curve = spline(1:L, x, n=nb_sample_points)$y W = wavelets::dwt(interpolated_curve, filter=wf, D)@W nrj = rev( sapply( W, function(v) ( sqrt( sum(v^2) ) ) ) ) - if (ctype=="relative") nrj / sum(nrj) else nrj - }) ) + if (contrib_type!="absolute") + nrj = nrj / sum(nrj) + if (contrib_type=="logit") + nrj = - log(1 - nrj) + nrj + }) } # Check integer arguments with functional conditions .toInteger <- function(x, condition) { + errWarn <- function(ignored) + paste("Cannot convert argument' ",substitute(x),"' to integer", sep="") if (!is.integer(x)) - tryCatch( - {x = as.integer(x)[1]}, - error = function(e) paste("Cannot convert argument",substitute(x),"to integer") - ) + tryCatch({x = as.integer(x)[1]; if (is.na(x)) stop()}, + warning = errWarn, error = errWarn) if (!condition(x)) - stop(paste("Argument",substitute(x),"does not verify condition",body(condition))) + { + stop(paste("Argument '",substitute(x), + "' does not verify condition ",body(condition), sep="")) + } + x +} + +# Check logical arguments +.toLogical <- function(x) +{ + errWarn <- function(ignored) + paste("Cannot convert argument' ",substitute(x),"' to logical", sep="") + if (!is.logical(x)) + tryCatch({x = as.logical(x)[1]; if (is.na(x)) stop()}, + warning = errWarn, error = errWarn) x } -- 2.44.0 From 2b9f5356793c245a5e10229a74ac0dabd8f62508 Mon Sep 17 00:00:00 2001 From: Benjamin Auder Date: Fri, 10 Mar 2017 12:09:09 +0100 Subject: [PATCH 15/16] Add comments to easily read code --- epclust/R/clustering.R | 18 +++++-- epclust/R/main.R | 106 +++++++++++++++++++++++++++++------------ 2 files changed, 90 insertions(+), 34 deletions(-) diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R index 14915ab..70d263e 100644 --- a/epclust/R/clustering.R +++ b/epclust/R/clustering.R @@ -11,8 +11,8 @@ #' and then WER distances computations, before applying the clustering algorithm. #' \code{computeClusters1()} and \code{computeClusters2()} correspond to the atomic #' clustering procedures respectively for stage 1 and 2. The former applies the -#' clustering algorithm (PAM) on a contributions matrix, while the latter clusters -#' a chunk of series inside one task (~max nb_series_per_chunk) +#' first clustering algorithm on a contributions matrix, while the latter clusters +#' a set of series inside one task (~nb_items_clust) #' #' @param indices Range of series indices to cluster in parallel (initial data) #' @param getContribs Function to retrieve contributions from initial series indices: @@ -31,11 +31,23 @@ NULL #' @rdname clustering #' @export clusteringTask1 = function( - indices, getContribs, K1, nb_items_per_chunk, ncores_clust=1, verbose=FALSE, parll=TRUE) + indices, getContribs, K1, nb_per_chunk, nb_items_clust, ncores_clust=1, + verbose=FALSE, parll=TRUE) { if (verbose) cat(paste("*** Clustering task 1 on ",length(indices)," lines\n", sep="")) + + + + + +##TODO: reviser le spreadIndices, tenant compte de nb_items_clust + + ##TODO: reviser / harmoniser avec getContribs qui en récupère pt'et + pt'et - !! + + + if (parll) { cl = parallel::makeCluster(ncores_clust) diff --git a/epclust/R/main.R b/epclust/R/main.R index 86dac64..cbc92b1 100644 --- a/epclust/R/main.R +++ b/epclust/R/main.R @@ -41,6 +41,12 @@ #' @param K2 Number of clusters to be found after stage 2 (K2 << K1) #' @param nb_per_chunk (Maximum) number of items to retrieve in one batch, for both types of #' retrieval: resp. series and contribution; in a vector of size 2 +#' @param algo_clust1 Clustering algorithm for stage 1. A function which takes (data, K) +#' as argument where data is a matrix in columns and K the desired number of clusters, +#' and outputs K medoids ranks. Default: PAM +#' @param algo_clust2 Clustering algorithm for stage 2. A function which takes (dists, K) +#' as argument where dists is a matrix of distances and K the desired number of clusters, +#' and outputs K clusters representatives (curves). Default: k-means #' @param nb_items_clust1 (Maximum) number of items in input of the clustering algorithm #' for stage 1 #' @param wav_filt Wavelet transform filter; see ?wavelets::wt.filter @@ -128,14 +134,16 @@ #' digest::sha1(medoids_db) #' } #' @export -claws <- function(getSeries, K1, K2, - nb_per_chunk,nb_items_clust1=7*K1 #volumes of data - wav_filt="d8",contrib_type="absolute", #stage 1 - WER="end", #stage 2 - random=TRUE, #randomize series order? - ntasks=1, ncores_tasks=1, ncores_clust=4, #parallelism - sep=",", #ASCII input separator - nbytes=4, endian=.Platform$endian, #serialization (write,read) +claws <- function(getSeries, K1, K2, nb_per_chunk, + nb_items_clust1=7*K1, + algo_clust1=function(data,K) cluster::pam(data,K,diss=FALSE), + algo_clust2=function(dists,K) stats::kmeans(dists,K,iter.max=50,nstart=3), + wav_filt="d8",contrib_type="absolute", + WER="end", + random=TRUE, + ntasks=1, ncores_tasks=1, ncores_clust=4, + sep=",", + nbytes=4, endian=.Platform$endian, verbose=FALSE, parll=TRUE) { # Check/transform arguments @@ -176,9 +184,16 @@ claws <- function(getSeries, K1, K2, verbose <- .toLogical(verbose) parll <- .toLogical(parll) - # Serialize series if required, to always use a function + # Since we don't make assumptions on initial data, there is a possibility that even + # when serialized, contributions or synchrones do not fit in RAM. For example, + # 30e6 series of length 100,000 would lead to a +4Go contribution matrix. Therefore, + # it's safer to place these in (binary) files, located in the following folder. bin_dir <- ".epclust_bin/" dir.create(bin_dir, showWarnings=FALSE, mode="0755") + + # Binarize series if getSeries is not a function; the aim is to always use a function, + # to uniformize treatments. An equally good alternative would be to use a file-backed + # bigmemory::big.matrix, but it would break the uniformity. if (!is.function(getSeries)) { if (verbose) @@ -199,14 +214,43 @@ claws <- function(getSeries, K1, K2, contribs_file, nb_series_per_chunk, nbytes, endian) getContribs = function(indices) getDataInFile(indices, contribs_file, nbytes, endian) + # A few sanity checks: do not continue if too few data available. if (nb_curves < K2) stop("Not enough data: less series than final number of clusters") nb_series_per_task = round(nb_curves / ntasks) if (nb_series_per_task < K2) stop("Too many tasks: less series in one task than final number of clusters") + # Generate a random permutation of 1:N (if random==TRUE); otherwise just use arrival + # (storage) order. + indices_all = if (random) sample(nb_curves) else seq_len(nb_curves) + # Split (all) indices into ntasks groups of ~same size + indices_tasks = lapply(seq_len(ntasks), function(i) { + upper_bound = ifelse( i1) + { + # Initialize parallel runs: outfile="" allow to output verbose traces in the console + # under Linux. All necessary variables are passed to the workers. + cl = parallel::makeCluster(ncores_tasks, outfile="") + varlist = c("getSeries","getContribs","K1","K2","algo_clust1","algo_clust2", + "nb_per_chunk","nb_items_clust","ncores_clust","sep","nbytes","endian", + "verbose","parll") + if (WER=="mix") + varlist = c(varlist, "medoids_file") + parallel::clusterExport(cl, varlist, envir = environment()) + } + + # This function achieves one complete clustering task, divided in stage 1 + stage 2. + # stage 1: n indices --> clusteringTask1(...) --> K1 medoids + # stage 2: K1 medoids --> clusteringTask2(...) --> K2 medoids, + # where n = N / ntasks, N being the total number of curves. runTwoStepClustering = function(inds) { + # When running in parallel, the environment is blank: we need to load required + # packages, and pass useful variables. if (parll && ntasks>1) require("epclust", quietly=TRUE) indices_medoids = clusteringTask1( @@ -218,18 +262,18 @@ claws <- function(getSeries, K1, K2, medoids1 = bigmemory::as.big.matrix( getSeries(indices_medoids) ) medoids2 = clusteringTask2(medoids1, K2, getSeries, nb_curves, nb_series_per_chunk, nbytes, endian, ncores_clust, verbose, parll) - binarize(medoids2, synchrones_file, nb_series_per_chunk, sep, nbytes, endian) + binarize(medoids2, medoids_file, nb_series_per_chunk, sep, nbytes, endian) return (vector("integer",0)) } indices_medoids } - # Cluster contributions in parallel (by nb_series_per_chunk) - indices_all = if (random) sample(nb_curves) else seq_len(nb_curves) - indices_tasks = lapply(seq_len(ntasks), function(i) { - upper_bound = ifelse( i1) - { - cl = parallel::makeCluster(ncores_tasks, outfile="") - varlist = c("getSeries","getContribs","K1","K2","verbose","parll", - "nb_series_per_chunk","ntasks","ncores_clust","sep","nbytes","endian") - if (WER=="mix") - varlist = c(varlist, "synchrones_file") - parallel::clusterExport(cl, varlist=varlist, envir = environment()) - } - # 1000*K1 indices [if WER=="end"], or empty vector [if WER=="mix"] --> series on file + # As explained above, indices will be assigned to ntasks*K1 medoids indices [if WER=="end"], + # or nothing (empty vector) if WER=="mix"; in this case, medoids (synchrones) are stored + # in a file. indices <- if (parll && ntasks>1) unlist( parallel::parLapply(cl, indices_tasks, runTwoStepClustering) ) @@ -258,13 +293,20 @@ claws <- function(getSeries, K1, K2, if (parll && ntasks>1) parallel::stopCluster(cl) + # Right before the final stage, two situations are possible: + # a. data to be processed now sit in binary format in medoids_file (if WER=="mix") + # b. data still is the initial set of curves, referenced by the ntasks*K1 indices + # So, the function getSeries() will potentially change. However, computeSynchrones() + # requires a function retrieving the initial series. Thus, the next line saves future + # conditional instructions. getRefSeries = getSeries + if (WER=="mix") { indices = seq_len(ntasks*K2) - #Now series must be retrieved from synchrones_file + # Now series must be retrieved from synchrones_file getSeries = function(inds) getDataInFile(inds, synchrones_file, nbytes, endian) - #Contributions must be re-computed + # Contributions must be re-computed unlink(contribs_file) index = 1 if (verbose) @@ -283,9 +325,11 @@ claws <- function(getSeries, K1, K2, medoids2 = clusteringTask2(medoids1, K2, getRefSeries, nb_curves, nb_series_per_chunk, nbytes, endian, ncores_tasks*ncores_clust, verbose, parll) - # Cleanup + # Cleanup: remove temporary binary files and their folder unlink(bin_dir, recursive=TRUE) + # Return medoids as a standard matrix, since K2 series have to fit in RAM + # (clustering algorithm 1 takes K1 > K2 of them as input) medoids2[,] } -- 2.44.0 From bccecb19b4faa20808dab761d30f2d2dd4dcf587 Mon Sep 17 00:00:00 2001 From: Benjamin Auder Date: Fri, 10 Mar 2017 12:38:09 +0100 Subject: [PATCH 16/16] 'update' --- epclust/R/main.R | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/epclust/R/main.R b/epclust/R/main.R index cbc92b1..31ce390 100644 --- a/epclust/R/main.R +++ b/epclust/R/main.R @@ -304,8 +304,8 @@ claws <- function(getSeries, K1, K2, nb_per_chunk, if (WER=="mix") { indices = seq_len(ntasks*K2) - # Now series must be retrieved from synchrones_file - getSeries = function(inds) getDataInFile(inds, synchrones_file, nbytes, endian) + # Now series (synchrones) must be retrieved from medoids_file + getSeries = function(inds) getDataInFile(inds, medoids_file, nbytes, endian) # Contributions must be re-computed unlink(contribs_file) index = 1 @@ -316,6 +316,9 @@ claws <- function(getSeries, K1, K2, nb_per_chunk, contribs_file, nb_series_per_chunk, nbytes, endian) } +#TODO: check THAT + + # Run step2 on resulting indices or series (from file) if (verbose) cat("...Run final // stage 1 + stage 2\n") -- 2.44.0