From 40f12a2f66d06fd77183ea02b996f5c66f90761c Mon Sep 17 00:00:00 2001 From: Benjamin Auder Date: Mon, 13 Mar 2017 04:51:37 +0100 Subject: [PATCH] 'update' --- epclust/DESCRIPTION | 3 +- epclust/R/A_NAMESPACE.R | 1 - epclust/R/clustering.R | 278 +----------------- epclust/R/computeSynchrones.R | 89 ++++++ epclust/R/computeWerDists.R | 141 +++++++++ epclust/R/main.R | 222 +++++--------- epclust/R/utils.R | 117 ++++++++ epclust/inst/CITATION | 18 -- epclust/src/computeMedoidsIndices.cpp | 57 ---- epclust/src/filter.c | 70 +++++ epclust/src/filter.cpp | 50 ---- epclust/tests/testthat/helper-common.R | 3 + epclust/tests/testthat/helper.clustering.R | 18 -- .../testthat/helper.computeMedoidsIndices.R | 12 - epclust/tests/testthat/test-assignMedoids.R | 37 +++ .../{test.clustering.R => test-clustering.R} | 80 +---- .../tests/testthat/test-computeSynchrones.R | 38 +++ ...test.wavelets.R => test-computeWerDists.R} | 8 +- ...est.de_serialize.R => test-de_serialize.R} | 0 .../{test.filter.R => test-filterMA.R} | 0 epclust/tests/testthat/test-utils.R | 32 ++ .../testthat/test.computeMedoidsIndices.R | 25 -- 22 files changed, 626 insertions(+), 673 deletions(-) create mode 100644 epclust/R/computeSynchrones.R create mode 100644 epclust/R/computeWerDists.R create mode 100644 epclust/R/utils.R delete mode 100644 epclust/inst/CITATION delete mode 100644 epclust/src/computeMedoidsIndices.cpp create mode 100644 epclust/src/filter.c delete mode 100644 epclust/src/filter.cpp create mode 100644 epclust/tests/testthat/helper-common.R delete mode 100644 epclust/tests/testthat/helper.clustering.R delete mode 100644 epclust/tests/testthat/helper.computeMedoidsIndices.R create mode 100644 epclust/tests/testthat/test-assignMedoids.R rename epclust/tests/testthat/{test.clustering.R => test-clustering.R} (55%) create mode 100644 epclust/tests/testthat/test-computeSynchrones.R rename epclust/tests/testthat/{test.wavelets.R => test-computeWerDists.R} (74%) rename epclust/tests/testthat/{test.de_serialize.R => test-de_serialize.R} (100%) rename epclust/tests/testthat/{test.filter.R => test-filterMA.R} (100%) create mode 100644 epclust/tests/testthat/test-utils.R delete mode 100644 epclust/tests/testthat/test.computeMedoidsIndices.R diff --git a/epclust/DESCRIPTION b/epclust/DESCRIPTION index 1c6c51d..7fd3410 100644 --- a/epclust/DESCRIPTION +++ b/epclust/DESCRIPTION @@ -32,7 +32,8 @@ Suggests: MASS, clue, wmtsa, - DBI + DBI, + digest License: MIT + file LICENSE RoxygenNote: 6.0.1 Collate: diff --git a/epclust/R/A_NAMESPACE.R b/epclust/R/A_NAMESPACE.R index 8407d23..e9aa830 100644 --- a/epclust/R/A_NAMESPACE.R +++ b/epclust/R/A_NAMESPACE.R @@ -4,7 +4,6 @@ #' #' @useDynLib epclust #' -#' @importFrom Rcpp evalCpp sourceCpp #' @importFrom Rwave cwt #' @importFrom cluster pam #' @importFrom parallel makeCluster clusterExport parLapply stopCluster diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R index 8be8715..bea073a 100644 --- a/epclust/R/clustering.R +++ b/epclust/R/clustering.R @@ -5,20 +5,17 @@ #' @title Two-stage clustering, withing one task (see \code{claws()}) #' #' @description \code{clusteringTask1()} runs one full stage-1 task, which consists in -#' iterated stage 1 clustering (on nb_curves / ntasks energy contributions, computed -#' through discrete wavelets coefficients). -#' \code{clusteringTask2()} runs a full stage-2 task, which consists in synchrones -#' and then WER distances computations, before applying the clustering algorithm. -#' \code{computeClusters1()} and \code{computeClusters2()} correspond to the atomic -#' clustering procedures respectively for stage 1 and 2. The former applies the -#' first clustering algorithm on a contributions matrix, while the latter clusters -#' a set of series inside one task (~nb_items_clust1) +#' iterated stage 1 clustering on nb_curves / ntasks energy contributions, computed +#' through discrete wavelets coefficients. +#' \code{clusteringTask2()} runs a full stage-2 task, which consists in +#' WER distances computations between medoids indices output from stage 1, +#' before applying the second clustering algorithm, on the distances matrix. #' -#' @param indices Range of series indices to cluster in parallel (initial data) +#' @param indices Range of series indices to cluster #' @param getContribs Function to retrieve contributions from initial series indices: -#' \code{getContribs(indices)} outpus a contributions matrix -#' @inheritParams computeSynchrones +#' \code{getContribs(indices)} outputs a contributions matrix #' @inheritParams claws +#' @inheritParams computeSynchrones #' #' @return For \code{clusteringTask1()}, the indices of the computed (K1) medoids. #' Indices are irrelevant for stage 2 clustering, thus \code{clusteringTask2()} @@ -27,7 +24,7 @@ NULL #' @rdname clustering #' @export -clusteringTask1 = function(indices, getContribs, K1, algoClust1, nb_items_clust1, +clusteringTask1 = function(indices, getContribs, K1, algoClust1, nb_series_per_chunk, ncores_clust=1, verbose=FALSE, parll=TRUE) { if (parll) @@ -39,7 +36,7 @@ clusteringTask1 = function(indices, getContribs, K1, algoClust1, nb_items_clust1 while (length(indices) > K1) { # Balance tasks by splitting the indices set - as evenly as possible - indices_workers = .spreadIndices(indices, nb_items_clust1) + indices_workers = .splitIndices(indices, nb_items_clust1) if (verbose) cat(paste("*** [iterated] Clustering task 1 on ",length(indices)," series\n", sep="")) indices <- @@ -65,8 +62,8 @@ clusteringTask1 = function(indices, getContribs, K1, algoClust1, nb_items_clust1 #' @rdname clustering #' @export -clusteringTask2 = function(medoids, K2, algoClust2, getRefSeries, nb_ref_curves, - nb_series_per_chunk, nvoice, nbytes,endian,ncores_clust=1,verbose=FALSE,parll=TRUE) +clusteringTask2 = function(indices, getSeries, K2, algoClust2, nb_series_per_chunk, + nvoice, nbytes, endian, ncores_clust=1, verbose=FALSE, parll=TRUE) { if (verbose) cat(paste("*** Clustering task 2 on ",ncol(medoids)," synchrones\n", sep="")) @@ -88,254 +85,3 @@ clusteringTask2 = function(medoids, K2, algoClust2, getRefSeries, nb_ref_curves, cat(paste("*** algoClust2() on ",nrow(distances)," items\n", sep="")) medoids[ ,algoClust2(distances,K2) ] } - -#' computeSynchrones -#' -#' Compute the synchrones curves (sum of clusters elements) from a matrix of medoids, -#' using euclidian distance. -#' -#' @param medoids big.matrix of medoids (curves of same length as initial series) -#' @param getRefSeries Function to retrieve initial series (e.g. in stage 2 after series -#' have been replaced by stage-1 medoids) -#' @param nb_ref_curves How many reference series? (This number is known at this stage) -#' @inheritParams claws -#' -#' @return A big.matrix of size L x K1 where L = length of a serie -#' -#' @export -computeSynchrones = function(medoids, getRefSeries, nb_ref_curves, - nb_series_per_chunk, ncores_clust=1,verbose=FALSE,parll=TRUE) -{ - # Synchrones computation is embarassingly parallel: compute it by chunks of series - computeSynchronesChunk = function(indices) - { - if (parll) - { - require("bigmemory", quietly=TRUE) - requireNamespace("synchronicity", quietly=TRUE) - require("epclust", quietly=TRUE) - # The big.matrix objects need to be attached to be usable on the workers - synchrones <- bigmemory::attach.big.matrix(synchrones_desc) - medoids <- bigmemory::attach.big.matrix(medoids_desc) - m <- synchronicity::attach.mutex(m_desc) - } - - # Obtain a chunk of reference series - ref_series = getRefSeries(indices) - nb_series = ncol(ref_series) - - # Get medoids indices for this chunk of series - mi = computeMedoidsIndices(medoids@address, ref_series) - - # Update synchrones using mi above - for (i in seq_len(nb_series)) - { - if (parll) - synchronicity::lock(m) #locking required because several writes at the same time - synchrones[, mi[i] ] = synchrones[, mi[i] ] + ref_series[,i] - if (parll) - synchronicity::unlock(m) - } - NULL - } - - K = ncol(medoids) ; L = nrow(medoids) - # Use bigmemory (shared==TRUE by default) + synchronicity to fill synchrones in // - synchrones = bigmemory::big.matrix(nrow=L, ncol=K, type="double", init=0.) - # NOTE: synchronicity is only for Linux & MacOS; on Windows: run sequentially - parll = (parll && requireNamespace("synchronicity",quietly=TRUE) - && Sys.info()['sysname'] != "Windows") - if (parll) - { - m <- synchronicity::boost.mutex() #for lock/unlock, see computeSynchronesChunk - # mutex and big.matrix objects cannot be passed directly: - # they will be accessed from their description - m_desc <- synchronicity::describe(m) - synchrones_desc = bigmemory::describe(synchrones) - medoids_desc = bigmemory::describe(medoids) - cl = parallel::makeCluster(ncores_clust) - parallel::clusterExport(cl, envir=environment(), - varlist=c("synchrones_desc","m_desc","medoids_desc","getRefSeries")) - } - - if (verbose) - cat(paste("--- Compute ",K," synchrones with ",nb_ref_curves," series\n", sep="")) - - # Balance tasks by splitting the indices set - maybe not so evenly, but - # max==TRUE in next call ensures that no set has more than nb_series_per_chunk items. - indices_workers = .spreadIndices(seq_len(nb_ref_curves), nb_series_per_chunk, max=TRUE) - ignored <- - if (parll) - parallel::parLapply(cl, indices_workers, computeSynchronesChunk) - else - lapply(indices_workers, computeSynchronesChunk) - - if (parll) - parallel::stopCluster(cl) - - return (synchrones) -} - -#' computeWerDists -#' -#' Compute the WER distances between the synchrones curves (in columns), which are -#' returned (e.g.) by \code{computeSynchrones()} -#' -#' @param synchrones A big.matrix of synchrones, in columns. The series have same -#' length as the series in the initial dataset -#' @inheritParams claws -#' -#' @return A distances matrix of size K1 x K1 -#' -#' @export -computeWerDists = function(synchrones, nvoice, nbytes,endian,ncores_clust=1, - verbose=FALSE,parll=TRUE) -{ - n <- ncol(synchrones) - L <- nrow(synchrones) - noctave = ceiling(log2(L)) #min power of 2 to cover serie range - - # Initialize result as a square big.matrix of size 'number of synchrones' - Xwer_dist <- bigmemory::big.matrix(nrow=n, ncol=n, type="double") - - # Generate n(n-1)/2 pairs for WER distances computations - pairs = list() - V = seq_len(n) - for (i in 1:n) - { - V = V[-1] - pairs = c(pairs, lapply(V, function(v) c(i,v))) - } - - cwt_file = ".cwt.bin" - # Compute the synchrones[,index] CWT, and store it in the binary file above - computeSaveCWT = function(index) - { - if (parll && !exists(synchrones)) #avoid going here after first call on a worker - { - require("bigmemory", quietly=TRUE) - require("Rwave", quietly=TRUE) - require("epclust", quietly=TRUE) - synchrones <- bigmemory::attach.big.matrix(synchrones_desc) - } - ts <- scale(ts(synchrones[,index]), center=TRUE, scale=FALSE) - ts_cwt = Rwave::cwt(ts, noctave, nvoice, w0=2*pi, twoD=TRUE, plot=FALSE) - - # Serialization - binarize(as.matrix(c(as.double(Re(ts_cwt)),as.double(Im(ts_cwt)))), cwt_file, 1, - ",", nbytes, endian) - } - - if (parll) - { - cl = parallel::makeCluster(ncores_clust) - synchrones_desc <- bigmemory::describe(synchrones) - Xwer_dist_desc <- bigmemory::describe(Xwer_dist) - parallel::clusterExport(cl, varlist=c("parll","synchrones_desc","Xwer_dist_desc", - "noctave","nvoice","verbose","getCWT"), envir=environment()) - } - - if (verbose) - cat(paste("--- Precompute and serialize synchrones CWT\n", sep="")) - - ignored <- - if (parll) - parallel::parLapply(cl, 1:n, computeSaveCWT) - else - lapply(1:n, computeSaveCWT) - - # Function to retrieve a synchrone CWT from (binary) file - getSynchroneCWT = function(index, L) - { - flat_cwt <- getDataInFile(index, cwt_file, nbytes, endian) - cwt_length = length(flat_cwt) / 2 - re_part = as.matrix(flat_cwt[1:cwt_length], nrow=L) - im_part = as.matrix(flat_cwt[(cwt_length+1):(2*cwt_length)], nrow=L) - re_part + 1i * im_part - } - - # Compute distance between columns i and j in synchrones - computeDistanceIJ = function(pair) - { - if (parll) - { - # parallel workers start with an empty environment - require("bigmemory", quietly=TRUE) - require("epclust", quietly=TRUE) - synchrones <- bigmemory::attach.big.matrix(synchrones_desc) - Xwer_dist <- bigmemory::attach.big.matrix(Xwer_dist_desc) - } - - i = pair[1] ; j = pair[2] - if (verbose && j==i+1 && !parll) - cat(paste(" Distances (",i,",",j,"), (",i,",",j+1,") ...\n", sep="")) - - # Compute CWT of columns i and j in synchrones - L = nrow(synchrones) - cwt_i <- getSynchroneCWT(i, L) - cwt_j <- getSynchroneCWT(j, L) - - # Compute the ratio of integrals formula 5.6 for WER^2 - # in https://arxiv.org/abs/1101.4744v2 §5.3 - num <- filterMA(Mod(cwt_i * Conj(cwt_j))) - WX <- filterMA(Mod(cwt_i * Conj(cwt_i))) - WY <- filterMA(Mod(cwt_j * Conj(cwt_j))) - wer2 <- sum(colSums(num)^2) / sum(colSums(WX) * colSums(WY)) - - Xwer_dist[i,j] <- sqrt(L * ncol(cwt_i) * (1 - wer2)) - Xwer_dist[j,i] <- Xwer_dist[i,j] - Xwer_dist[i,i] <- 0. - } - - if (verbose) - cat(paste("--- Compute WER distances\n", sep="")) - - ignored <- - if (parll) - parallel::parLapply(cl, pairs, computeDistanceIJ) - else - lapply(pairs, computeDistanceIJ) - - if (parll) - parallel::stopCluster(cl) - - unlink(cwt_file) - - Xwer_dist[n,n] = 0. - Xwer_dist[,] #~small matrix K1 x K1 -} - -# Helper function to divide indices into balanced sets -# If max == TRUE, sets sizes cannot exceed nb_per_set -.spreadIndices = function(indices, nb_per_set, max=FALSE) -{ - L = length(indices) - nb_workers = floor( L / nb_per_set ) - rem = L %% nb_per_set - if (nb_workers == 0 || (nb_workers==1 && rem==0)) - { - # L <= nb_per_set, simple case - indices_workers = list(indices) - } - else - { - indices_workers = lapply( seq_len(nb_workers), function(i) - indices[(nb_per_set*(i-1)+1):(nb_per_set*i)] ) - - if (max) - { - # Sets are not so well balanced, but size is supposed to be critical - return ( c( indices_workers, if (rem>0) list((L-rem+1):L) else NULL ) ) - } - - # Spread the remaining load among the workers - rem = L %% nb_per_set - while (rem > 0) - { - index = rem%%nb_workers + 1 - indices_workers[[index]] = c(indices_workers[[index]], indices[L-rem+1]) - rem = rem - 1 - } - } - indices_workers -} diff --git a/epclust/R/computeSynchrones.R b/epclust/R/computeSynchrones.R new file mode 100644 index 0000000..f73e64e --- /dev/null +++ b/epclust/R/computeSynchrones.R @@ -0,0 +1,89 @@ +#' computeSynchrones +#' +#' Compute the synchrones curves (sum of clusters elements) from a matrix of medoids, +#' using euclidian distance. +#' +#' @param medoids matrix of medoids in columns (curves of same length as the series) +#' @param getSeries Function to retrieve series (argument: 'indices', integer vector) +#' @param nb_curves How many series? (this is known, at this stage) +#' @inheritParams claws +#' +#' @return A matrix of K synchrones in columns (same length as the series) +#' +#' @export +computeSynchrones = function(medoids, getSeries, nb_curves, + nb_series_per_chunk, ncores_clust=1,verbose=FALSE,parll=TRUE) +{ + # Synchrones computation is embarassingly parallel: compute it by chunks of series + computeSynchronesChunk = function(indices) + { + if (parll) + { + require("bigmemory", quietly=TRUE) + requireNamespace("synchronicity", quietly=TRUE) + require("epclust", quietly=TRUE) + # The big.matrix objects need to be attached to be usable on the workers + synchrones <- bigmemory::attach.big.matrix(synchrones_desc) + medoids <- bigmemory::attach.big.matrix(medoids_desc) + m <- synchronicity::attach.mutex(m_desc) + } + + # Obtain a chunk of reference series + series_chunk = getSeries(indices) + nb_series_chunk = ncol(series_chunk) + + # Get medoids indices for this chunk of series + for (i in seq_len(nb_series_chunk)) + mi[i] <- which.min( colSums( sweep(medoids, 1, series_chunk[,i], '-')^2 ) ) + + # Update synchrones using mi above, grouping it by values of mi (in 1...K) + # to avoid too many lock/unlock + for (i in seq_len(K)) + { + # lock / unlock required because several writes at the same time + if (parll) + synchronicity::lock(m) + synchrones[,i] = synchrones[,i] + rowSums(series_chunk[,mi==i]) + if (parll) + synchronicity::unlock(m) + } + NULL + } + + K = ncol(medoids) + L = nrow(medoids) + # Use bigmemory (shared==TRUE by default) + synchronicity to fill synchrones in // + synchrones = bigmemory::big.matrix(nrow=L, ncol=K, type="double", init=0.) + # NOTE: synchronicity is only for Linux & MacOS; on Windows: run sequentially + parll = (parll && requireNamespace("synchronicity",quietly=TRUE) + && Sys.info()['sysname'] != "Windows") + if (parll) + { + m <- synchronicity::boost.mutex() #for lock/unlock, see computeSynchronesChunk + # mutex and big.matrix objects cannot be passed directly: + # they will be accessed from their description + m_desc <- synchronicity::describe(m) + synchrones_desc = bigmemory::describe(synchrones) + medoids <- bigmemory::as.big.matrix(medoids) + medoids_desc <- bigmemory::describe(medoids) + cl = parallel::makeCluster(ncores_clust) + parallel::clusterExport(cl, envir=environment(), + varlist=c("synchrones_desc","m_desc","medoids_desc","getRefSeries")) + } + + if (verbose) + cat(paste("--- Compute ",K," synchrones with ",nb_curves," series\n", sep="")) + + # Balance tasks by splitting 1:nb_ref_curves into groups of size <= nb_series_per_chunk + indices_workers = .splitIndices(seq_len(nb_ref_curves), nb_series_per_chunk) + ignored <- + if (parll) + parallel::parLapply(cl, indices_workers, computeSynchronesChunk) + else + lapply(indices_workers, computeSynchronesChunk) + + if (parll) + parallel::stopCluster(cl) + + return (synchrones[,]) +} diff --git a/epclust/R/computeWerDists.R b/epclust/R/computeWerDists.R new file mode 100644 index 0000000..aae1cc1 --- /dev/null +++ b/epclust/R/computeWerDists.R @@ -0,0 +1,141 @@ +#' computeWerDists +#' +#' Compute the WER distances between the synchrones curves (in columns), which are +#' returned (e.g.) by \code{computeSynchrones()} +#' +#' @param synchrones A big.matrix of synchrones, in columns. The series have same +#' length as the series in the initial dataset +#' @inheritParams claws +#' +#' @return A distances matrix of size K1 x K1 +#' +#' @export +computeWerDists = function(synchrones, nvoice, nbytes,endian,ncores_clust=1, + verbose=FALSE,parll=TRUE) +{ + n <- ncol(synchrones) + L <- nrow(synchrones) + noctave = ceiling(log2(L)) #min power of 2 to cover serie range + + # Initialize result as a square big.matrix of size 'number of synchrones' + Xwer_dist <- bigmemory::big.matrix(nrow=n, ncol=n, type="double") + + # Generate n(n-1)/2 pairs for WER distances computations + pairs = list() + V = seq_len(n) + for (i in 1:n) + { + V = V[-1] + pairs = c(pairs, lapply(V, function(v) c(i,v))) + } + + cwt_file = ".cwt.bin" + # Compute the synchrones[,indices] CWT, and store the results in the binary file + computeSaveCWT = function(indices) + { + if (parll) + { + require("bigmemory", quietly=TRUE) + require("Rwave", quietly=TRUE) + require("epclust", quietly=TRUE) + synchrones <- bigmemory::attach.big.matrix(synchrones_desc) + } + + # Obtain CWT as big vectors of real part + imaginary part (concatenate) + ts_cwt <- sapply(indices, function(i) { + ts <- scale(ts(synchrones[,i]), center=TRUE, scale=FALSE) + ts_cwt <- Rwave::cwt(ts, noctave, nvoice, w0=2*pi, twoD=TRUE, plot=FALSE) + c( as.double(Re(ts_cwt)),as.double(Im(ts_cwt)) ) + }) + + # Serialization + binarize(ts_cwt, cwt_file, 1, ",", nbytes, endian) + } + + if (parll) + { + cl = parallel::makeCluster(ncores_clust) + synchrones_desc <- bigmemory::describe(synchrones) + Xwer_dist_desc <- bigmemory::describe(Xwer_dist) + parallel::clusterExport(cl, varlist=c("parll","synchrones_desc","Xwer_dist_desc", + "noctave","nvoice","verbose","getCWT"), envir=environment()) + } + + if (verbose) + cat(paste("--- Precompute and serialize synchrones CWT\n", sep="")) + + ignored <- + if (parll) + parallel::parLapply(cl, 1:n, computeSaveCWT) + else + lapply(1:n, computeSaveCWT) + + # Function to retrieve a synchrone CWT from (binary) file + getSynchroneCWT = function(index, L) + { + flat_cwt <- getDataInFile(index, cwt_file, nbytes, endian) + cwt_length = length(flat_cwt) / 2 + re_part = as.matrix(flat_cwt[1:cwt_length], nrow=L) + im_part = as.matrix(flat_cwt[(cwt_length+1):(2*cwt_length)], nrow=L) + re_part + 1i * im_part + } + + + + +#TODO: better repartition here, + #better code in .splitIndices :: never exceed nb_per_chunk; arg: min_per_chunk (default: 1) +###TODO: reintroduire nb_items_clust ======> l'autre est typiquement + grand !!! (pas de relation !) + + + + # Compute distance between columns i and j in synchrones + computeDistanceIJ = function(pair) + { + if (parll) + { + # parallel workers start with an empty environment + require("bigmemory", quietly=TRUE) + require("epclust", quietly=TRUE) + synchrones <- bigmemory::attach.big.matrix(synchrones_desc) + Xwer_dist <- bigmemory::attach.big.matrix(Xwer_dist_desc) + } + + i = pair[1] ; j = pair[2] + if (verbose && j==i+1 && !parll) + cat(paste(" Distances (",i,",",j,"), (",i,",",j+1,") ...\n", sep="")) + + # Compute CWT of columns i and j in synchrones + L = nrow(synchrones) + cwt_i <- getSynchroneCWT(i, L) + cwt_j <- getSynchroneCWT(j, L) + + # Compute the ratio of integrals formula 5.6 for WER^2 + # in https://arxiv.org/abs/1101.4744v2 §5.3 + num <- filterMA(Mod(cwt_i * Conj(cwt_j))) + WX <- filterMA(Mod(cwt_i * Conj(cwt_i))) + WY <- filterMA(Mod(cwt_j * Conj(cwt_j))) + wer2 <- sum(colSums(num)^2) / sum(colSums(WX) * colSums(WY)) + + Xwer_dist[i,j] <- sqrt(L * ncol(cwt_i) * (1 - wer2)) + Xwer_dist[j,i] <- Xwer_dist[i,j] + Xwer_dist[i,i] <- 0. + } + + if (verbose) + cat(paste("--- Compute WER distances\n", sep="")) + + ignored <- + if (parll) + parallel::parLapply(cl, pairs, computeDistanceIJ) + else + lapply(pairs, computeDistanceIJ) + + if (parll) + parallel::stopCluster(cl) + + unlink(cwt_file) + + Xwer_dist[n,n] = 0. + Xwer_dist[,] #~small matrix K1 x K1 +} diff --git a/epclust/R/main.R b/epclust/R/main.R index f666267..603f7bf 100644 --- a/epclust/R/main.R +++ b/epclust/R/main.R @@ -11,31 +11,30 @@ #' \item Divide series into \code{ntasks} groups to process in parallel. In each task: #' \enumerate{ #' \item iterate the first clustering algorithm on its aggregated outputs, -#' on inputs of size \code{nb_items_clust1} +#' on inputs of size \code{nb_series_per_chunk} #' \item optionally, if WER=="mix": #' a) compute the K1 synchrones curves, -#' b) compute WER distances (K1xK1 matrix) between synchrones and -#' c) apply the second clustering algorithm +#' a) compute WER distances (K1xK1 matrix) between medoids and +#' b) apply the second clustering algorithm (output: K2 indices) #' } #' \item Launch a final task on the aggregated outputs of all previous tasks: -#' in the case WER=="end" this task takes indices in input, otherwise -#' (medoid) curves +#' ntasks*K1 if WER=="end", ntasks*K2 otherwise +#' \item Compute synchrones (sum of series within each final group) #' } #' \cr -#' The main argument -- \code{getSeries} -- has a quite misleading name, since it can be -#' either a [big.]matrix, a CSV file, a connection or a user function to retrieve -#' series; the name was chosen because all types of arguments are converted to a function. -#' When \code{getSeries} is given as a function, it must take a single argument, +#' The main argument -- \code{series} -- has a quite misleading name, since it can be +#' either a [big.]matrix, a CSV file, a connection or a user function to retrieve series. +#' When \code{series} is given as a function, it must take a single argument, #' 'indices', integer vector equal to the indices of the curves to retrieve; -#' see SQLite example. The nature and role of other arguments should be clear. +#' see SQLite example. #' WARNING: the return value must be a matrix (in columns), or NULL if no matches. #' \cr #' Note: Since we don't make assumptions on initial data, there is a possibility that -#' even when serialized, contributions or synchrones do not fit in RAM. For example, +#' even when serialized, contributions do not fit in RAM. For example, #' 30e6 series of length 100,000 would lead to a +4Go contribution matrix. Therefore, #' it's safer to place these in (binary) files; that's what we do. #' -#' @param getSeries Access to the (time-)series, which can be of one of the three +#' @param series Access to the (time-)series, which can be of one of the three #' following types: #' \itemize{ #' \item [big.]matrix: each column contains the (time-ordered) values of one time-serie @@ -46,7 +45,8 @@ #' } #' @param K1 Number of clusters to be found after stage 1 (K1 << N [number of series]) #' @param K2 Number of clusters to be found after stage 2 (K2 << K1) -#' @param nb_series_per_chunk (Maximum) number of series to retrieve in one batch +#' @param nb_series_per_chunk (Maximum) number of series to retrieve in one batch; +#' this value is also used for the (maximum) number of series to cluster at a time #' @param algoClust1 Clustering algorithm for stage 1. A function which takes (data, K) #' as argument where data is a matrix in columns and K the desired number of clusters, #' and outputs K medoids ranks. Default: PAM. In our method, this function is called @@ -54,10 +54,7 @@ #' @param algoClust2 Clustering algorithm for stage 2. A function which takes (dists, K) #' as argument where dists is a matrix of distances and K the desired number of clusters, #' and outputs K medoids ranks. Default: PAM. In our method, this function is called -#' on a matrix of K1 x K1 (WER) distances computed between synchrones -#' @param nb_items_clust1 (~Maximum) number of items in input of the clustering algorithm -#' for stage 1. At worst, a clustering algorithm might be called with ~2*nb_items_clust1 -#' items; but this could only happen at the last few iterations. +#' on a matrix of K1 x K1 (WER) distances computed between medoids after algorithm 1 #' @param wav_filt Wavelet transform filter; see ?wavelets::wt.filter #' @param contrib_type Type of contribution: "relative", "logit" or "absolute" (any prefix) #' @param WER "end" to apply stage 2 after stage 1 has fully iterated, or "mix" to apply @@ -68,14 +65,19 @@ #' or K2 [if WER=="mix"] medoids); default: 1. #' Note: ntasks << N (number of series), so that N is "roughly divisible" by ntasks #' @param ncores_tasks Number of parallel tasks (1 to disable: sequential tasks) -#' @param ncores_clust Number of parallel clusterings in one task (4 should be a minimum) +#' @param ncores_clust Number of parallel clusterings in one task (3 should be a minimum) #' @param sep Separator in CSV input file (if any provided) #' @param nbytes Number of bytes to serialize a floating-point number; 4 or 8 #' @param endian Endianness for (de)serialization ("little" or "big") #' @param verbose Level of verbosity (0/FALSE for nothing or 1/TRUE for all; devel stage) #' @param parll TRUE to fully parallelize; otherwise run sequentially (debug, comparison) #' -#' @return A matrix of the final K2 medoids curves, in columns +#' @return A list with +#' \itemize{ +#' medoids: a matrix of the final K2 medoids curves, in columns +#' ranks: corresponding indices in the dataset +#' synchrones: a matrix of the K2 sum of series within each final group +#' } #' #' @references Clustering functional data using Wavelets [2013]; #' A. Antoniadis, X. Brossat, J. Cugliari & J.-M. Poggi. @@ -94,12 +96,12 @@ #' series = do.call( cbind, lapply( 1:6, function(i) #' do.call(cbind, wmtsa::wavBootstrap(ref_series[,i], n.realization=400)) ) ) #' #dim(series) #c(2400,10001) -#' medoids_ascii = claws(series, K1=60, K2=6, 200, verbose=TRUE) +#' res_ascii = claws(series, K1=60, K2=6, 200, verbose=TRUE) #' #' # Same example, from CSV file #' csv_file = "/tmp/epclust_series.csv" #' write.table(series, csv_file, sep=",", row.names=FALSE, col.names=FALSE) -#' medoids_csv = claws(csv_file, K1=60, K2=6, 200) +#' res_csv = claws(csv_file, K1=60, K2=6, 200) #' #' # Same example, from binary file #' bin_file <- "/tmp/epclust_series.bin" @@ -107,7 +109,7 @@ #' endian <- "little" #' binarize(csv_file, bin_file, 500, nbytes, endian) #' getSeries <- function(indices) getDataInFile(indices, bin_file, nbytes, endian) -#' medoids_bin <- claws(getSeries, K1=60, K2=6, 200) +#' res_bin <- claws(getSeries, K1=60, K2=6, 200) #' unlink(csv_file) #' unlink(bin_file) #' @@ -137,29 +139,30 @@ #' else #' NULL #' } -#' medoids_db = claws(getSeries, K1=60, K2=6, 200)) +#' res_db = claws(getSeries, K1=60, K2=6, 200)) #' dbDisconnect(series_db) #' -#' # All computed medoids should be the same: -#' digest::sha1(medoids_ascii) -#' digest::sha1(medoids_csv) -#' digest::sha1(medoids_bin) -#' digest::sha1(medoids_db) +#' # All results should be the same: +#' library(digest) +#' digest::sha1(res_ascii) +#' digest::sha1(res_csv) +#' digest::sha1(res_bin) +#' digest::sha1(res_db) #' } #' @export -claws <- function(getSeries, K1, K2, nb_series_per_chunk, nb_items_clust1=7*K1, - algoClust1=function(data,K) cluster::pam(t(data),K,diss=FALSE)$id.med, - algoClust2=function(dists,K) cluster::pam(dists,K,diss=TRUE)$id.med, +claws <- function(getSeries, K1, K2, nb_series_per_chunk, + algoClust1=function(data,K) cluster::pam(t(data),K,diss=FALSE,pamonce=1)$id.med, + algoClust2=function(dists,K) cluster::pam(dists,K,diss=TRUE,pamonce=1)$id.med, wav_filt="d8", contrib_type="absolute", WER="end", nvoice=4, random=TRUE, ntasks=1, ncores_tasks=1, ncores_clust=4, sep=",", nbytes=4, endian=.Platform$endian, verbose=FALSE, parll=TRUE) { # Check/transform arguments - if (!is.matrix(getSeries) && !bigmemory::is.big.matrix(getSeries) - && !is.function(getSeries) - && !methods::is(getSeries,"connection") && !is.character(getSeries)) + if (!is.matrix(series) && !bigmemory::is.big.matrix(series) + && !is.function(series) + && !methods::is(series,"connection") && !is.character(series)) { - stop("'getSeries': [big]matrix, function, file or valid connection (no NA)") + stop("'series': [big]matrix, function, file or valid connection (no NA)") } K1 <- .toInteger(K1, function(x) x>=2) K2 <- .toInteger(K2, function(x) x>=2) @@ -168,7 +171,6 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, nb_items_clust1=7*K1, # to load K1 series in memory for clustering stage 2. if (K1 > nb_series_per_chunk) stop("'K1' cannot exceed 'nb_series_per_chunk'") - nb_items_clust1 <- .toInteger(nb_items_clust1, function(x) x>K1) random <- .toLogical(random) tryCatch( {ignored <- wavelets::wt.filter(wav_filt)}, error = function(e) stop("Invalid wavelet filter; see ?wavelets::wt.filter") ) @@ -188,21 +190,23 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, nb_items_clust1=7*K1, verbose <- .toLogical(verbose) parll <- .toLogical(parll) - # Binarize series if getSeries is not a function; the aim is to always use a function, + # Binarize series if it is not a function; the aim is to always use a function, # to uniformize treatments. An equally good alternative would be to use a file-backed # bigmemory::big.matrix, but it would break the "all-is-function" pattern. - if (!is.function(getSeries)) + if (!is.function(series)) { if (verbose) cat("...Serialize time-series (or retrieve past binary file)\n") - series_file = ".series.bin" + series_file = ".series.epclust.bin" if (!file.exists(series_file)) - binarize(getSeries, series_file, nb_series_per_chunk, sep, nbytes, endian) + binarize(series, series_file, nb_series_per_chunk, sep, nbytes, endian) getSeries = function(inds) getDataInFile(inds, series_file, nbytes, endian) } + else + getSeries = series # Serialize all computed wavelets contributions into a file - contribs_file = ".contribs.bin" + contribs_file = ".contribs.epclust.bin" index = 1 nb_curves = 0 if (verbose) @@ -210,7 +214,7 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, nb_items_clust1=7*K1, if (!file.exists(contribs_file)) { nb_curves = binarizeTransform(getSeries, - function(series) curvesToContribs(series, wav_filt, contrib_type), + function(curves) curvesToContribs(curves, wav_filt, contrib_type), contribs_file, nb_series_per_chunk, nbytes, endian) } else @@ -243,12 +247,9 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, nb_items_clust1=7*K1, # Initialize parallel runs: outfile="" allow to output verbose traces in the console # under Linux. All necessary variables are passed to the workers. cl = parallel::makeCluster(ncores_tasks, outfile="") - varlist = c("getSeries","getContribs","K1","K2","algoClust1","algoClust2", - "nb_series_per_chunk","nb_items_clust1","ncores_clust", - "nvoice","sep","nbytes","endian","verbose","parll") - if (WER=="mix" && ntasks>1) - varlist = c(varlist, "medoids_file") - parallel::clusterExport(cl, varlist, envir = environment()) + parallel::clusterExport(cl, envir = environment(), + varlist = c("getSeries","getContribs","K1","K2","algoClust1","algoClust2", + "nb_series_per_chunk","ncores_clust","nvoice","nbytes","endian","verbose","parll")) } # This function achieves one complete clustering task, divided in stage 1 + stage 2. @@ -261,27 +262,16 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, nb_items_clust1=7*K1, # packages, and pass useful variables. if (parll && ntasks>1) require("epclust", quietly=TRUE) - indices_medoids = clusteringTask1( - inds, getContribs, K1, algoClust1, nb_series_per_chunk, ncores_clust, verbose, parll) - if (WER=="mix" && ntasks>1) + indices_medoids = clusteringTask1(inds, getContribs, K1, algoClust1, + nb_series_per_chunk, ncores_clust, verbose, parll) + if (WER=="mix") { - if (parll) - require("bigmemory", quietly=TRUE) - medoids1 = bigmemory::as.big.matrix( getSeries(indices_medoids) ) - medoids2 = clusteringTask2(medoids1, K2, algoClust2, getSeries, nb_curves, + indices_medoids = clusteringTask2(indices_medoids, getSeries, K2, algoClust2, nb_series_per_chunk, nvoice, nbytes, endian, ncores_clust, verbose, parll) - binarize(medoids2, medoids_file, nb_series_per_chunk, sep, nbytes, endian) - return (vector("integer",0)) } indices_medoids } - # Synchrones (medoids) need to be stored only if WER=="mix"; indeed in this case, every - # task output is a set of new (medoids) curves. If WER=="end" however, output is just a - # set of indices, representing some initial series. - if (WER=="mix" && ntasks>1) - {medoids_file = ".medoids.bin" ; unlink(medoids_file)} - if (verbose) { message = paste("...Run ",ntasks," x stage 1", sep="") @@ -292,110 +282,32 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, nb_items_clust1=7*K1, # As explained above, indices will be assigned to ntasks*K1 medoids indices [if WER=="end"], # or nothing (empty vector) if WER=="mix"; in this case, synchrones are stored in a file. - indices <- + indices_medoids_all <- if (parll && ntasks>1) unlist( parallel::parLapply(cl, indices_tasks, runTwoStepClustering) ) else unlist( lapply(indices_tasks, runTwoStepClustering) ) + if (parll && ntasks>1) parallel::stopCluster(cl) - # Right before the final stage, two situations are possible: - # a. data to be processed now sit in a binary format in medoids_file (if WER=="mix") - # b. data still is the initial set of curves, referenced by the ntasks*K1 indices - # So, the function getSeries() will potentially change. However, computeSynchrones() - # requires a function retrieving the initial series. Thus, the next line saves future - # conditional instructions. - getRefSeries = getSeries - - if (WER=="mix" && ntasks>1) - { - indices = seq_len(ntasks*K2) - # Now series (synchrones) must be retrieved from medoids_file - getSeries = function(inds) getDataInFile(inds, medoids_file, nbytes, endian) - # Contributions must be re-computed - unlink(contribs_file) - index = 1 - if (verbose) - cat("...Serialize contributions computed on synchrones\n") - ignored = binarizeTransform(getSeries, - function(series) curvesToContribs(series, wav_filt, contrib_type), - contribs_file, nb_series_per_chunk, nbytes, endian) - } + # Right before the final stage, input data still is the initial set of curves, referenced + # by the ntasks*[K1 or K2] medoids indices. - # Run step2 on resulting indices or series (from file) + # Run last clustering tasks to obtain only K2 medoids indices, from ntasks*[K1 or K2] + # indices, depending wether WER=="end" or WER=="mix" if (verbose) cat("...Run final // stage 1 + stage 2\n") - indices_medoids = clusteringTask1(indices, getContribs, K1, algoClust1, + indices_medoids = clusteringTask1(indices_medoids_all, getContribs, K1, algoClust1, nb_series_per_chunk, ncores_tasks*ncores_clust, verbose, parll) - medoids1 = bigmemory::as.big.matrix( getSeries(indices_medoids) ) - medoids2 = clusteringTask2(medoids1, K2, algoClust2, getRefSeries, nb_curves, + indices_medoids = clusteringTask2(indices_medoids, getContribs, K2, algoClust2, nb_series_per_chunk, nvoice, nbytes, endian, ncores_tasks*ncores_clust, verbose, parll) - # Cleanup: remove temporary binary files - tryCatch( - {unlink(series_file); unlink(contribs_file); unlink(medoids_file)}, - error = function(e) {}) + # Compute synchrones + medoids = getSeries(indices_medoids) + synchrones = computeSynchrones(medoids, getSeries, nb_curves, nb_series_per_chunk, + ncores_tasks*ncores_clust, verbose, parll) - # Return medoids as a standard matrix, since K2 series have to fit in RAM - # (clustering algorithm 1 takes K1 > K2 of them as input) - medoids2[,] -} - -#' curvesToContribs -#' -#' Compute the discrete wavelet coefficients for each series, and aggregate them in -#' energy contribution across scales as described in https://arxiv.org/abs/1101.4744v2 -#' -#' @param series [big.]matrix of series (in columns), of size L x n -#' @inheritParams claws -#' -#' @return A [big.]matrix of size log(L) x n containing contributions in columns -#' -#' @export -curvesToContribs = function(series, wav_filt, contrib_type, coin=FALSE) -{ - series = as.matrix(series) #1D serie could occur - L = nrow(series) - D = ceiling( log2(L) ) - # Series are interpolated to all have length 2^D - nb_sample_points = 2^D - apply(series, 2, function(x) { - interpolated_curve = spline(1:L, x, n=nb_sample_points)$y - W = wavelets::dwt(interpolated_curve, filter=wav_filt, D)@W - # Compute the sum of squared discrete wavelet coefficients, for each scale - nrj = rev( sapply( W, function(v) ( sqrt( sum(v^2) ) ) ) ) - if (contrib_type!="absolute") - nrj = nrj / sum(nrj) - if (contrib_type=="logit") - nrj = - log(1 - nrj) - nrj - }) -} - -# Check integer arguments with functional conditions -.toInteger <- function(x, condition) -{ - errWarn <- function(ignored) - paste("Cannot convert argument' ",substitute(x),"' to integer", sep="") - if (!is.integer(x)) - tryCatch({x = as.integer(x)[1]; if (is.na(x)) stop()}, - warning = errWarn, error = errWarn) - if (!condition(x)) - { - stop(paste("Argument '",substitute(x), - "' does not verify condition ",body(condition), sep="")) - } - x -} - -# Check logical arguments -.toLogical <- function(x) -{ - errWarn <- function(ignored) - paste("Cannot convert argument' ",substitute(x),"' to logical", sep="") - if (!is.logical(x)) - tryCatch({x = as.logical(x)[1]; if (is.na(x)) stop()}, - warning = errWarn, error = errWarn) - x + # NOTE: no need to use big.matrix here, since there are only K2 << K1 << N remaining curves + list("medoids"=medoids, "ranks"=indices_medoids, "synchrones"=synchrones) } diff --git a/epclust/R/utils.R b/epclust/R/utils.R new file mode 100644 index 0000000..ba643d0 --- /dev/null +++ b/epclust/R/utils.R @@ -0,0 +1,117 @@ +# Check integer arguments with functional conditions +.toInteger <- function(x, condition) +{ + errWarn <- function(ignored) + paste("Cannot convert argument' ",substitute(x),"' to integer", sep="") + if (!is.integer(x)) + tryCatch({x = as.integer(x)[1]; if (is.na(x)) stop()}, + warning = errWarn, error = errWarn) + if (!condition(x)) + { + stop(paste("Argument '",substitute(x), + "' does not verify condition ",body(condition), sep="")) + } + x +} + +# Check logical arguments +.toLogical <- function(x) +{ + errWarn <- function(ignored) + paste("Cannot convert argument' ",substitute(x),"' to logical", sep="") + if (!is.logical(x)) + tryCatch({x = as.logical(x)[1]; if (is.na(x)) stop()}, + warning = errWarn, error = errWarn) + x +} + +#' curvesToContribs +#' +#' Compute the discrete wavelet coefficients for each series, and aggregate them in +#' energy contribution across scales as described in https://arxiv.org/abs/1101.4744v2 +#' +#' @param series [big.]matrix of series (in columns), of size L x n +#' @inheritParams claws +#' +#' @return A matrix of size log(L) x n containing contributions in columns +#' +#' @export +curvesToContribs = function(series, wav_filt, contrib_type, coin=FALSE) +{ + L = nrow(series) + D = ceiling( log2(L) ) + # Series are interpolated to all have length 2^D + nb_sample_points = 2^D + apply(series, 2, function(x) { + interpolated_curve = spline(1:L, x, n=nb_sample_points)$y + W = wavelets::dwt(interpolated_curve, filter=wav_filt, D)@W + # Compute the sum of squared discrete wavelet coefficients, for each scale + nrj = rev( sapply( W, function(v) ( sqrt( sum(v^2) ) ) ) ) + if (contrib_type!="absolute") + nrj = nrj / sum(nrj) + if (contrib_type=="logit") + nrj = - log(1 - nrj) + nrj + }) +} + +# Helper function to divide indices into balanced sets +# If max == TRUE, sets sizes cannot exceed nb_per_set +.splitIndices = function(indices, nb_per_set, max=FALSE) +{ + L = length(indices) + nb_workers = floor( L / nb_per_set ) + rem = L %% nb_per_set + if (nb_workers == 0 || (nb_workers==1 && rem==0)) + { + # L <= nb_per_set, simple case + indices_workers = list(indices) + } + else + { + indices_workers = lapply( seq_len(nb_workers), function(i) + indices[(nb_per_set*(i-1)+1):(nb_per_set*i)] ) + + if (max) + { + # Sets are not so well balanced, but size is supposed to be critical + return ( c( indices_workers, if (rem>0) list((L-rem+1):L) else NULL ) ) + } + + # Spread the remaining load among the workers + rem = L %% nb_per_set + while (rem > 0) + { + index = rem%%nb_workers + 1 + indices_workers[[index]] = c(indices_workers[[index]], indices[L-rem+1]) + rem = rem - 1 + } + } + indices_workers +} + +#' filterMA +#' +#' Filter [time-]series by replacing all values by the moving average of values +#' centered around current one. Border values are averaged with available data. +#' +#' @param M_ A real matrix of size LxD +#' @param w_ The (odd) number of values to average +#' +#' @return The filtered matrix, of same size as the input +#' @export +filterMA = function(M_, w_) + .Call("filterMA", M_, w_, PACKAGE="epclust") + +#' cleanBin +#' +#' Remove binary files to re-generate them at next run of \code{claws()}. +#' Note: run it in the folder where the computations occurred (or no effect). +#' +#' @export +cleanBin <- function() +{ + bin_files = list.files(pattern = "*.epclust.bin", all.files=TRUE) + for (file in bin_files) + unlink(file) +} diff --git a/epclust/inst/CITATION b/epclust/inst/CITATION deleted file mode 100644 index b6f2f6d..0000000 --- a/epclust/inst/CITATION +++ /dev/null @@ -1,18 +0,0 @@ -citHeader("To cite epclust in publications use:") - -citEntry(entry = "Manual", - title = ".", - author = personList(as.person("Benjamin Auder"), - as.person("Jairo Cugliari"), - as.person("Yannig Goude"), - as.person("Jean-Michel Poggi")), - organization = "Paris-Sud, Saclay & Lyon 2", - address = "Orsay, Saclay & Lyon, France", - year = "2017", - url = "https://git.auder.net/?p=edfclust.git", - - textVersion = - paste("Benjamin Auder, Jairo Cugliari, Yannig Goude, Jean-Michel Poggi (2017).", - "EPCLUST: Electric Power curves CLUSTering.", - "URL https://git.auder.net/?p=edfclust.git") -) diff --git a/epclust/src/computeMedoidsIndices.cpp b/epclust/src/computeMedoidsIndices.cpp deleted file mode 100644 index 895031a..0000000 --- a/epclust/src/computeMedoidsIndices.cpp +++ /dev/null @@ -1,57 +0,0 @@ -#include - -// [[Rcpp::depends(BH, bigmemory)]] -#include - -#include -#include - -using namespace Rcpp; - -//' computeMedoidsIndices -//' -//' For each column of the 'series' matrix input, search for the closest medoid -//' (euclidian distance) and store its index -//' -//' @param pMedoids External pointer (a big.matrix 'address' slot in R) -//' @param series (reference) series, a matrix of size Lxn -//' -//' @return An integer vector of the closest medoids indices, for each (column) serie -// [[Rcpp::export]] -IntegerVector computeMedoidsIndices(SEXP pMedoids, NumericMatrix series) -{ - // Turn SEXP external pointer into BigMatrix (description) object - XPtr pMed(pMedoids); - // medoids: access to the content of the BigMatrix object - MatrixAccessor medoids = MatrixAccessor(*pMed); - - int nb_series = series.ncol(), - K = pMed->ncol(), - L = pMed->nrow(); - IntegerVector mi(nb_series); - - for (int i=0; i +#include + +// filterMA +// +// Filter [time-]series by replacing all values by the moving average of values +// centered around current one. Border values are averaged with available data. +// +// @param M_ A real matrix of size LxD +// @param w_ The (odd) number of values to average +// +// @return The filtered matrix, of same size as the input +SEXP filterMA(SEXP M_, SEXP w_) +{ + int L = INTEGER(getAttrib(cwt_, R_DimSymbol))[0], + D = INTEGER(getAttrib(cwt_, R_DimSymbol))[1], + w = INTEGER_VALUE(w_), + half_w = (w-1)/2, + i, + nt; //number of terms in the current sum (max: w) + double *cwt = REAL(cwt_), + cs, //current sum (max: w terms) + left; //leftward term in the current moving sum + + SEXP fcwt_; //the filtered result + PROTECT(fcwt_ = allocMatrix(REALSXP, L, D)); + double* fcwt = REAL(fcwt_); //pointer to the encapsulated vector + + // NOTE: unused loop parameter: shifting at the end of the loop is more efficient + for (int col=D-1; col>=0; col--) + { + // Initialization + nt = half_w + 1; + left = cwt[0]; + cs = 0.; + for (i=half_w; i>=0; i--) + cs += cwt[i]; + + // Left border + for (i=0; i - -using namespace Rcpp; - -//' filter -//' -//' Filter [time-]series by replacing all values by the moving average of previous, current and -//' next value. Possible extensions: larger window, gaussian kernel... (but would be costly!). -//' Note: border values are unchanged. -//' -//' @param cwt Continuous wavelets transform (in columns): a matrix of size LxD -//' -//' @return The filtered CWT, in a matrix of same size (LxD) -// [[Rcpp::export]] -RcppExport SEXP filterMA(SEXP cwt_) -{ - // NOTE: C-style for better efficiency (this must be as fast as possible) - int L = INTEGER(Rf_getAttrib(cwt_, R_DimSymbol))[0], - D = INTEGER(Rf_getAttrib(cwt_, R_DimSymbol))[1]; - double *cwt = REAL(cwt_); - - SEXP fcwt_; //the filtered result - PROTECT(fcwt_ = Rf_allocMatrix(REALSXP, L, D)); - double* fcwt = REAL(fcwt_); //pointer to the encapsulated vector - - // NOTE: unused loop parameter: shifting at the end of the loop is more efficient - for (int col=D-1; col>=0; col--) - { - double v1 = cwt[0]; //first value - double ms = v1 + cwt[1] + cwt[2]; //moving sum at second value - for (int i=1; i1, 2->2, 3->3, 4->1, ..., 149->2, 150->3, ... (if base==3) +I = function(i, base) + (i-1) %% base + 1 diff --git a/epclust/tests/testthat/helper.clustering.R b/epclust/tests/testthat/helper.clustering.R deleted file mode 100644 index f39257e..0000000 --- a/epclust/tests/testthat/helper.clustering.R +++ /dev/null @@ -1,18 +0,0 @@ -# Shorthand: map 1->1, 2->2, 3->3, 4->1, ..., 149->2, 150->3, ... (is base==3) -I = function(i, base) - (i-1) %% base + 1 - -# Compute the sum of (normalized) sum of squares of closest distances to a medoid. -# Note: medoids can be a big.matrix -computeDistortion = function(series, medoids) -{ - if (bigmemory::is.big.matrix(medoids)) - medoids = medoids[,] #extract standard matrix - - n = ncol(series) ; L = nrow(series) - distortion = 0. - for (i in seq_len(n)) - distortion = distortion + min( colSums( sweep(medoids,1,series[,i],'-')^2 ) / L ) - - sqrt( distortion / n ) -} diff --git a/epclust/tests/testthat/helper.computeMedoidsIndices.R b/epclust/tests/testthat/helper.computeMedoidsIndices.R deleted file mode 100644 index cd6a30e..0000000 --- a/epclust/tests/testthat/helper.computeMedoidsIndices.R +++ /dev/null @@ -1,12 +0,0 @@ -# R-equivalent of computeMedoidsIndices, requiring a matrix -# (thus potentially breaking "fit-in-memory" hope) -R_computeMedoidsIndices <- function(medoids, series) -{ - nb_series = ncol(series) #series in columns - - mi = rep(NA,nb_series) - for (i in 1:nb_series) - mi[i] <- which.min( colSums( sweep(medoids, 1, series[,i], '-')^2 ) ) - - mi -} diff --git a/epclust/tests/testthat/test-assignMedoids.R b/epclust/tests/testthat/test-assignMedoids.R new file mode 100644 index 0000000..0192563 --- /dev/null +++ b/epclust/tests/testthat/test-assignMedoids.R @@ -0,0 +1,37 @@ +context("assignMedoids") + +test_that("assignMedoids behave as expected", +{ + # Generate a gaussian mixture + n = 999 + L = 7 + medoids = cbind( rep(0,L), rep(-5,L), rep(5,L) ) + # short series... + series = t( rbind( MASS::mvrnorm(n/3, medoids[,1], diag(L)), + MASS::mvrnorm(n/3, medoids[,2], diag(L)), + MASS::mvrnorm(n/3, medoids[,3], diag(L)) ) ) + + # With high probability, medoids indices should resemble 1,1,1,...,2,2,2,...,3,3,3,... + mi = epclust:::assignMedoids(medoids, series) + mi_ref = rep(1:3, each=n/3) + expect_lt( mean(mi != mi_ref), 0.01 ) + + # Now with a random matrix, compare with (~trusted) R version + series = matrix(runif(n*L, min=-7, max=7), nrow=L) + mi = epclust:::assignMedoids(medoids, series) + mi_ref = R_assignMedoids(medoids, series) + expect_equal(mi, mi_ref) +}) + +# R-equivalent of , requiring a matrix +# (thus potentially breaking "fit-in-memory" hope) +R_assignMedoids <- function(medoids, series) +{ + nb_series = ncol(series) #series in columns + + mi = rep(NA,nb_series) + for (i in 1:nb_series) + mi[i] <- which.min( colSums( sweep(medoids, 1, series[,i], '-')^2 ) ) + + mi +} diff --git a/epclust/tests/testthat/test.clustering.R b/epclust/tests/testthat/test-clustering.R similarity index 55% rename from epclust/tests/testthat/test.clustering.R rename to epclust/tests/testthat/test-clustering.R index 2f24d08..fa22dff 100644 --- a/epclust/tests/testthat/test.clustering.R +++ b/epclust/tests/testthat/test-clustering.R @@ -1,68 +1,5 @@ context("clustering") -test_that("computeSynchrones behave as expected", -{ - # Generate 300 sinusoïdal series of 3 kinds: all series of indices == 0 mod 3 are the same - # (plus noise), all series of indices == 1 mod 3 are the same (plus noise) ... - n = 300 - x = seq(0,9.5,0.1) - L = length(x) #96 1/4h - K = 3 - s1 = cos(x) - s2 = sin(x) - s3 = c( s1[1:(L%/%2)] , s2[(L%/%2+1):L] ) - #sum((s1-s2)^2) == 96 - #sum((s1-s3)^2) == 58 - #sum((s2-s3)^2) == 38 - s = list(s1, s2, s3) - series = matrix(nrow=L, ncol=n) - for (i in seq_len(n)) - series[,i] = s[[I(i,K)]] + rnorm(L,sd=0.01) - - getRefSeries = function(indices) { - indices = indices[indices <= n] - if (length(indices)>0) as.matrix(series[,indices]) else NULL - } - - synchrones = computeSynchrones(bigmemory::as.big.matrix(cbind(s1,s2,s3)), getRefSeries, - n, 100, verbose=TRUE, parll=FALSE) - - expect_equal(dim(synchrones), c(L,K)) - for (i in 1:K) - { - # Synchrones are (for each medoid) sums of closest curves. - # Here, we expect exactly 100 curves of each kind to be assigned respectively to - # synchrone 1, 2 and 3 => division by 100 should be very close to the ref curve - expect_equal(synchrones[,i]/100, s[[i]], tolerance=0.01) - } -}) - -test_that("Helper function to spread indices work properly", -{ - indices <- 1:400 - - # bigger nb_per_set than length(indices) - expect_equal(epclust:::.spreadIndices(indices,500), list(indices)) - - # nb_per_set == length(indices) - expect_equal(epclust:::.spreadIndices(indices,400), list(indices)) - - # length(indices) %% nb_per_set == 0 - expect_equal(epclust:::.spreadIndices(indices,200), - c( list(indices[1:200]), list(indices[201:400]) )) - expect_equal(epclust:::.spreadIndices(indices,100), - c( list(indices[1:100]), list(indices[101:200]), - list(indices[201:300]), list(indices[301:400]) )) - - # length(indices) / nb_per_set == 1, length(indices) %% nb_per_set == 100 - expect_equal(epclust:::.spreadIndices(indices,300), list(indices)) - # length(indices) / nb_per_set == 2, length(indices) %% nb_per_set == 42 - repartition <- epclust:::.spreadIndices(indices,179) - expect_equal(length(repartition), 2) - expect_equal(length(repartition[[1]]), 179 + 21) - expect_equal(length(repartition[[1]]), 179 + 21) -}) - test_that("clusteringTask1 behave as expected", { # Generate 60 reference sinusoïdal series (medoids to be found), @@ -83,7 +20,7 @@ test_that("clusteringTask1 behave as expected", wf = "haar" ctype = "absolute" - getContribs = function(indices) curvesToContribs(series[,indices],wf,ctype) + getContribs = function(indices) curvesToContribs(as.matrix(series[,indices]),wf,ctype) require("cluster", quietly=TRUE) algoClust1 = function(contribs,K) cluster::pam(t(contribs),K,diss=FALSE)$id.med @@ -135,3 +72,18 @@ test_that("clusteringTask2 behave as expected", for (i in 1:3) expect_lte( distor_good, computeDistortion(synchrones, synchrones[,sample(1:K1,3)]) ) }) + +# Compute the sum of (normalized) sum of squares of closest distances to a medoid. +# Note: medoids can be a big.matrix +computeDistortion = function(series, medoids) +{ + if (bigmemory::is.big.matrix(medoids)) + medoids = medoids[,] #extract standard matrix + + n = ncol(series) ; L = nrow(series) + distortion = 0. + for (i in seq_len(n)) + distortion = distortion + min( colSums( sweep(medoids,1,series[,i],'-')^2 ) / L ) + + sqrt( distortion / n ) +} diff --git a/epclust/tests/testthat/test-computeSynchrones.R b/epclust/tests/testthat/test-computeSynchrones.R new file mode 100644 index 0000000..db139ed --- /dev/null +++ b/epclust/tests/testthat/test-computeSynchrones.R @@ -0,0 +1,38 @@ +context("computeSynchrones") + +test_that("computeSynchrones behave as expected", +{ + # Generate 300 sinusoïdal series of 3 kinds: all series of indices == 0 mod 3 are the same + # (plus noise), all series of indices == 1 mod 3 are the same (plus noise) ... + n = 300 + x = seq(0,9.5,0.1) + L = length(x) #96 1/4h + K = 3 + s1 = cos(x) + s2 = sin(x) + s3 = c( s1[1:(L%/%2)] , s2[(L%/%2+1):L] ) + #sum((s1-s2)^2) == 96 + #sum((s1-s3)^2) == 58 + #sum((s2-s3)^2) == 38 + s = list(s1, s2, s3) + series = matrix(nrow=L, ncol=n) + for (i in seq_len(n)) + series[,i] = s[[I(i,K)]] + rnorm(L,sd=0.01) + + getRefSeries = function(indices) { + indices = indices[indices <= n] + if (length(indices)>0) as.matrix(series[,indices]) else NULL + } + + synchrones = computeSynchrones(bigmemory::as.big.matrix(cbind(s1,s2,s3)), getRefSeries, + n, 100, verbose=TRUE, parll=FALSE) + + expect_equal(dim(synchrones), c(L,K)) + for (i in 1:K) + { + # Synchrones are (for each medoid) sums of closest curves. + # Here, we expect exactly 100 curves of each kind to be assigned respectively to + # synchrone 1, 2 and 3 => division by 100 should be very close to the ref curve + expect_equal(synchrones[,i]/100, s[[i]], tolerance=0.01) + } +}) diff --git a/epclust/tests/testthat/test.wavelets.R b/epclust/tests/testthat/test-computeWerDists.R similarity index 74% rename from epclust/tests/testthat/test.wavelets.R rename to epclust/tests/testthat/test-computeWerDists.R index f29312d..d83c590 100644 --- a/epclust/tests/testthat/test.wavelets.R +++ b/epclust/tests/testthat/test-computeWerDists.R @@ -1,9 +1,4 @@ -context("wavelets discrete & continuous") - -test_that("curvesToContribs behave as expected", -{ -# curvesToContribs(...) -}) +context("computeWerDists") test_that("computeWerDists output correct results", { @@ -19,3 +14,4 @@ test_that("computeWerDists output correct results", # On two constant series # TODO:... }) + diff --git a/epclust/tests/testthat/test.de_serialize.R b/epclust/tests/testthat/test-de_serialize.R similarity index 100% rename from epclust/tests/testthat/test.de_serialize.R rename to epclust/tests/testthat/test-de_serialize.R diff --git a/epclust/tests/testthat/test.filter.R b/epclust/tests/testthat/test-filterMA.R similarity index 100% rename from epclust/tests/testthat/test.filter.R rename to epclust/tests/testthat/test-filterMA.R diff --git a/epclust/tests/testthat/test-utils.R b/epclust/tests/testthat/test-utils.R new file mode 100644 index 0000000..4448df0 --- /dev/null +++ b/epclust/tests/testthat/test-utils.R @@ -0,0 +1,32 @@ +context("utils functions") + +test_that("Helper function to split indices work properly", +{ + indices <- 1:400 + + # bigger nb_per_set than length(indices) + expect_equal(epclust:::.splitIndices(indices,500), list(indices)) + + # nb_per_set == length(indices) + expect_equal(epclust:::.splitIndices(indices,400), list(indices)) + + # length(indices) %% nb_per_set == 0 + expect_equal(epclust:::.splitIndices(indices,200), + c( list(indices[1:200]), list(indices[201:400]) )) + expect_equal(epclust:::.splitIndices(indices,100), + c( list(indices[1:100]), list(indices[101:200]), + list(indices[201:300]), list(indices[301:400]) )) + + # length(indices) / nb_per_set == 1, length(indices) %% nb_per_set == 100 + expect_equal(epclust:::.splitIndices(indices,300), list(indices)) + # length(indices) / nb_per_set == 2, length(indices) %% nb_per_set == 42 + repartition <- epclust:::.splitIndices(indices,179) + expect_equal(length(repartition), 2) + expect_equal(length(repartition[[1]]), 179 + 21) + expect_equal(length(repartition[[1]]), 179 + 21) +}) + +test_that("curvesToContribs output correct results", +{ +# curvesToContribs(...) +}) diff --git a/epclust/tests/testthat/test.computeMedoidsIndices.R b/epclust/tests/testthat/test.computeMedoidsIndices.R deleted file mode 100644 index 8347fb6..0000000 --- a/epclust/tests/testthat/test.computeMedoidsIndices.R +++ /dev/null @@ -1,25 +0,0 @@ -context("computeMedoidsIndices") - -test_that("computeMedoidsIndices behave as expected", -{ - # Generate a gaussian mixture - n = 999 - L = 7 - medoids = cbind( rep(0,L), rep(-5,L), rep(5,L) ) - # short series... - series = t( rbind( MASS::mvrnorm(n/3, medoids[,1], diag(L)), - MASS::mvrnorm(n/3, medoids[,2], diag(L)), - MASS::mvrnorm(n/3, medoids[,3], diag(L)) ) ) - - # With high probability, medoids indices should resemble 1,1,1,...,2,2,2,...,3,3,3,... - require("bigmemory", quietly=TRUE) - mi = epclust:::computeMedoidsIndices(bigmemory::as.big.matrix(medoids)@address, series) - mi_ref = rep(1:3, each=n/3) - expect_lt( mean(mi != mi_ref), 0.01 ) - - # Now with a random matrix, compare with (trusted) R version - series = matrix(runif(n*L, min=-7, max=7), nrow=L) - mi = epclust:::computeMedoidsIndices(bigmemory::as.big.matrix(medoids)@address, series) - mi_ref = R_computeMedoidsIndices(medoids, series) - expect_equal(mi, mi_ref) -}) -- 2.44.0