From 24ed5d835e2eebaaa4d5f8296f8d2e2132cc6398 Mon Sep 17 00:00:00 2001 From: Benjamin Auder Date: Wed, 8 Mar 2017 11:52:39 +0100 Subject: [PATCH] add some TODOs --- epclust/R/clustering.R | 82 +++++++++++++++++++---------- epclust/R/main.R | 9 ++++ epclust/src/WER.c | 117 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 180 insertions(+), 28 deletions(-) create mode 100644 epclust/src/WER.c diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R index 9a55495..3993e76 100644 --- a/epclust/R/clustering.R +++ b/epclust/R/clustering.R @@ -21,7 +21,7 @@ #' #' @return For \code{clusteringTask1()} and \code{computeClusters1()}, the indices of the #' computed (K1) medoids. Indices are irrelevant for stage 2 clustering, thus -#' \code{computeClusters2()} outputs a matrix of medoids +#' \code{computeClusters2()} outputs a big.matrix of medoids #' (of size limited by nb_series_per_chunk) NULL @@ -73,6 +73,7 @@ computeClusters2 = function(medoids, K2, synchrones = computeSynchrones(medoids, getRefSeries, nb_ref_curves, nb_series_per_chunk, ncores_clust, verbose, parll) distances = computeWerDists(synchrones, ncores_clust, verbose, parll) + #TODO: if PAM cannot take big.matrix in input, cast it before... (more than OK in RAM) medoids[ cluster::pam(distances, K2, diss=TRUE)$medoids , ] } @@ -81,16 +82,27 @@ computeClusters2 = function(medoids, K2, #' Compute the synchrones curves (sum of clusters elements) from a matrix of medoids, #' using L2 distances. #' -#' @param medoids Matrix of medoids (curves of same legnth as initial series) +#' @param medoids big.matrix of medoids (curves of same length as initial series) #' @param getRefSeries Function to retrieve initial series (e.g. in stage 2 after series #' have been replaced by stage-1 medoids) #' @param nb_ref_curves How many reference series? (This number is known at this stage) #' @inheritParams claws #' +#' @return A big.matrix of size K1 x L where L = data_length +#' #' @export computeSynchrones = function(medoids, getRefSeries, nb_ref_curves, nb_series_per_chunk, ncores_clust=1,verbose=FALSE,parll=TRUE) { + + + +#TODO: si parll, getMedoids + serialization, pass only getMedoids to nodes +# --> BOF... chaque node chargera tous les medoids (efficacité) :/ ==> faut que ça tienne en RAM +#au pire :: C-ifier et charger medoids 1 by 1... + + #MIEUX :: medoids DOIT etre une big.matrix partagée ! + computeSynchronesChunk = function(indices) { if (verbose) @@ -111,36 +123,43 @@ computeSynchrones = function(medoids, getRefSeries, K = nrow(medoids) # Use bigmemory (shared==TRUE by default) + synchronicity to fill synchrones in // + # TODO: if size > RAM (not our case), use file-backed big.matrix synchrones = bigmemory::big.matrix(nrow=K,ncol=ncol(medoids),type="double",init=0.) counts = bigmemory::big.matrix(nrow=K,ncol=1,type="double",init=0) - # Fork (// run) only on Linux & MacOS; on Windows: run sequentially + # synchronicity is only for Linux & MacOS; on Windows: run sequentially parll = (requireNamespace("synchronicity",quietly=TRUE) && parll && Sys.info()['sysname'] != "Windows") if (parll) m <- synchronicity::boost.mutex() + if (parll) + { + cl = parallel::makeCluster(ncores_clust) + parallel::clusterExport(cl, + varlist=c("synchrones","counts","verbose","medoids","getRefSeries"), + envir=environment()) + } + indices_workers = .spreadIndices(seq_len(nb_ref_curves), nb_series_per_chunk) ignored <- if (parll) - { - parallel::mclapply(indices_workers, computeSynchronesChunk, - mc.cores=ncores_clust, mc.allow.recursive=FALSE) - } + parallel::parLapply(indices_workers, computeSynchronesChunk) else lapply(indices_workers, computeSynchronesChunk) - mat_syncs = matrix(nrow=K, ncol=ncol(medoids)) - vec_count = rep(NA, K) - #TODO: can we avoid this loop? + if (parll) + parallel::stopCluster(cl) + + #TODO: can we avoid this loop? ( synchrones = sweep(synchrones, 1, counts, '/') ) for (i in seq_len(K)) - { - mat_syncs[i,] = synchrones[i,] - vec_count[i] = counts[i,1] - } + synchrones[i,] = synchrones[i,] / counts[i,1] #NOTE: odds for some clusters to be empty? (when series already come from stage 2) # ...maybe; but let's hope resulting K1' be still quite bigger than K2 - mat_syncs = sweep(mat_syncs, 1, vec_count, '/') - mat_syncs[ sapply(seq_len(K), function(i) all(!is.nan(mat_syncs[i,]))) , ] + noNA_rows = sapply(seq_len(K), function(i) all(!is.nan(synchrones[i,]))) + if (all(noNA_rows)) + return (synchrones) + # Else: some clusters are empty, need to slice synchrones + synchrones[noNA_rows,] } #' computeWerDists @@ -148,13 +167,21 @@ computeSynchrones = function(medoids, getRefSeries, #' Compute the WER distances between the synchrones curves (in rows), which are #' returned (e.g.) by \code{computeSynchrones()} #' -#' @param synchrones A matrix of synchrones, in rows. The series have same length as the -#' series in the initial dataset +#' @param synchrones A big.matrix of synchrones, in rows. The series have same length +#' as the series in the initial dataset #' @inheritParams claws #' +#' @return A big.matrix of size K1 x K1 +#' #' @export computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) { + + + +#TODO: re-organize to call computeWerDist(x,y) [C] (in //?) from two indices + big.matrix + + n <- nrow(synchrones) delta <- ncol(synchrones) #TODO: automatic tune of all these parameters ? (for other users) @@ -163,7 +190,7 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) noctave = 13 # 4 here represent 2^5 = 32 half-hours ~ 1 day #NOTE: default scalevector == 2^(0:(noctave * nvoice) / nvoice) * s0 (?) - scalevector <- 2^(4:(noctave * nvoice) / nvoice) * 2 + scalevector <- 2^(4:(noctave * nvoice) / nvoice + 1) #condition: ( log2(s0*w0/(2*pi)) - 1 ) * nvoice + 1.5 >= 1 s0=2 w0=2*pi @@ -176,7 +203,7 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) if (verbose) cat(paste("+++ Compute Rwave::cwt() on serie ",i,"\n", sep="")) ts <- scale(ts(synchrones[i,]), center=TRUE, scale=scaled) - totts.cwt = Rwave::cwt(ts,totnoct,nvoice,w0,plot=0) + totts.cwt = Rwave::cwt(ts, totnoct, nvoice, w0, plot=FALSE) ts.cwt = totts.cwt[,s0log:(s0log+noctave*nvoice)] #Normalization sqs <- sqrt(2^(0:(noctave*nvoice)/nvoice)*s0) @@ -192,7 +219,8 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) envir=environment()) } - # (normalized) observations node with CWT + # list of CWT from synchrones + # TODO: fit in RAM, OK? If not, 2 options: serialize, compute individual distances Xcwt4 <- if (parll) parallel::parLapply(cl, seq_len(n), computeCWT) @@ -207,6 +235,9 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) if (verbose) cat("*** Compute WER distances from CWT\n") + #TODO: computeDistances(i,j), et répartir les n(n-1)/2 couples d'indices + #là c'est trop déséquilibré + computeDistancesLineI = function(i) { if (verbose) @@ -217,7 +248,7 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) num <- filter(Mod(Xcwt4[[i]] * Conj(Xcwt4[[j]])), fcoefs, circular=TRUE) WX <- filter(Mod(Xcwt4[[i]] * Conj(Xcwt4[[i]])), fcoefs, circular=TRUE) WY <- filter(Mod(Xcwt4[[j]] * Conj(Xcwt4[[j]])), fcoefs, circular=TRUE) - wer2 <- sum(colSums(num)^2) / sum( sum(colSums(WX) * colSums(WY)) ) + wer2 <- sum(colSums(num)^2) / sum( sum(colSums(WX) * colSums(WY)) ) if (parll) synchronicity::lock(m) Xwer_dist[i,j] <- sqrt(delta * ncol(Xcwt4[[1]]) * (1 - wer2)) @@ -242,12 +273,7 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) else lapply(seq_len(n-1), computeDistancesLineI) Xwer_dist[n,n] = 0. - - mat_dists = matrix(nrow=n, ncol=n) - #TODO: avoid this loop? - for (i in 1:n) - mat_dists[i,] = Xwer_dist[i,] - mat_dists + Xwer_dist } # Helper function to divide indices into balanced sets diff --git a/epclust/R/main.R b/epclust/R/main.R index bba0618..f1e435c 100644 --- a/epclust/R/main.R +++ b/epclust/R/main.R @@ -169,6 +169,15 @@ claws = function(getSeries, K1, K2, inds, getContribs, K1, nb_series_per_chunk, ncores_clust, verbose, parll) if (WER=="mix") { + + + + +#TODO: getSeries(indices_medoids) BAD ; il faudrait une big.matrix de medoids en entree + #OK en RAM il y en aura 1000 (donc 1000*K1*17519... OK) + #...mais du coup chaque process ne re-dupliquera pas medoids + + medoids2 = computeClusters2(getSeries(indices_medoids), K2, getSeries, nb_curves, nb_series_per_chunk, ncores_clust, verbose, parll) binarize(medoids2, synchrones_file, nb_series_per_chunk, sep, nbytes, endian) diff --git a/epclust/src/WER.c b/epclust/src/WER.c new file mode 100644 index 0000000..36bfba7 --- /dev/null +++ b/epclust/src/WER.c @@ -0,0 +1,117 @@ +#include +#include +#include + +#ifndef M_PI +#define M_PI 3.14159265358979323846 +#endif + +// n: number of synchrones, m: length of a synchrone +float computeWerDist(float* s1, float* s2, int n, int m) +{ + //TODO: automatic tune of all these parameters ? (for other users) + int nvoice = 4; + //noctave 2^13 = 8192 half hours ~ 180 days ; ~log2(ncol(synchrones)) + int noctave = 13 + // 4 here represent 2^5 = 32 half-hours ~ 1 day + //NOTE: default scalevector == 2^(0:(noctave * nvoice) / nvoice) * s0 (?) + //R: scalevector <- 2^(4:(noctave * nvoice) / nvoice + 1) + int* scalevector = (int*)malloc( (noctave*nvoice-4 + 1) * sizeof(int)) + for (int i=4; i<=noctave*nvoice; i++) + scalevector[i-4] = pow(2., (float)i/nvoice + 1.); + //condition: ( log2(s0*w0/(2*pi)) - 1 ) * nvoice + 1.5 >= 1 + int s0 = 2; + double w0 = 2*M_PI; + bool scaled = false; + int s0log = as.integer( (log2( s0*w0/(2*pi) ) - 1) * nvoice + 1.5 ) + int totnoct = noctave + as.integer(s0log/nvoice) + 1 + + + + + +///TODO: continue + + + + computeCWT = function(i) + { + if (verbose) + cat(paste("+++ Compute Rwave::cwt() on serie ",i,"\n", sep="")) + ts <- scale(ts(synchrones[i,]), center=TRUE, scale=scaled) + totts.cwt = Rwave::cwt(ts,totnoct,nvoice,w0,plot=0) + ts.cwt = totts.cwt[,s0log:(s0log+noctave*nvoice)] + #Normalization + sqs <- sqrt(2^(0:(noctave*nvoice)/nvoice)*s0) + sqres <- sweep(ts.cwt,2,sqs,'*') + sqres / max(Mod(sqres)) + } + + if (parll) + { + cl = parallel::makeCluster(ncores_clust) + parallel::clusterExport(cl, + varlist=c("synchrones","totnoct","nvoice","w0","s0log","noctave","s0","verbose"), + envir=environment()) + } + + # (normalized) observations node with CWT + Xcwt4 <- + if (parll) + parallel::parLapply(cl, seq_len(n), computeCWT) + else + lapply(seq_len(n), computeCWT) + + if (parll) + parallel::stopCluster(cl) + + Xwer_dist <- bigmemory::big.matrix(nrow=n, ncol=n, type="double") + fcoefs = rep(1/3, 3) #moving average on 3 values (TODO: very slow! correct?!) + if (verbose) + cat("*** Compute WER distances from CWT\n") + + #TODO: computeDistances(i,j), et répartir les n(n-1)/2 couples d'indices + #là c'est trop déséquilibré + + computeDistancesLineI = function(i) + { + if (verbose) + cat(paste(" Line ",i,"\n", sep="")) + for (j in (i+1):n) + { + #TODO: 'circular=TRUE' is wrong, should just take values on the sides; to rewrite in C + num <- filter(Mod(Xcwt4[[i]] * Conj(Xcwt4[[j]])), fcoefs, circular=TRUE) + WX <- filter(Mod(Xcwt4[[i]] * Conj(Xcwt4[[i]])), fcoefs, circular=TRUE) + WY <- filter(Mod(Xcwt4[[j]] * Conj(Xcwt4[[j]])), fcoefs, circular=TRUE) + wer2 <- sum(colSums(num)^2) / sum( sum(colSums(WX) * colSums(WY)) ) + if (parll) + synchronicity::lock(m) + Xwer_dist[i,j] <- sqrt(delta * ncol(Xcwt4[[1]]) * (1 - wer2)) + Xwer_dist[j,i] <- Xwer_dist[i,j] + if (parll) + synchronicity::unlock(m) + } + Xwer_dist[i,i] = 0. + } + + parll = (requireNamespace("synchronicity",quietly=TRUE) + && parll && Sys.info()['sysname'] != "Windows") + if (parll) + m <- synchronicity::boost.mutex() + + ignored <- + if (parll) + { + parallel::mclapply(seq_len(n-1), computeDistancesLineI, + mc.cores=ncores_clust, mc.allow.recursive=FALSE) + } + else + lapply(seq_len(n-1), computeDistancesLineI) + Xwer_dist[n,n] = 0. + + mat_dists = matrix(nrow=n, ncol=n) + #TODO: avoid this loop? + for (i in 1:n) + mat_dists[i,] = Xwer_dist[i,] + mat_dists + -- 2.44.0