From 3c5a4b0880db63367a474a568e1322b3999932fe Mon Sep 17 00:00:00 2001 From: Benjamin Auder Date: Mon, 13 Mar 2017 12:42:59 +0100 Subject: [PATCH] 'update' --- epclust/R/clustering.R | 53 +++++++++++++++-------------------- epclust/R/computeSynchrones.R | 6 ++-- epclust/R/computeWerDists.R | 41 +++++++++++++-------------- epclust/R/de_serialize.R | 32 ++++++++++----------- epclust/R/main.R | 53 ++++++++++++++++++++--------------- epclust/R/utils.R | 49 +++++++++++++++++--------------- 6 files changed, 117 insertions(+), 117 deletions(-) diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R index bea073a..b91d512 100644 --- a/epclust/R/clustering.R +++ b/epclust/R/clustering.R @@ -1,30 +1,28 @@ -#' @name clustering -#' @rdname clustering -#' @aliases clusteringTask1 clusteringTask2 computeClusters1 computeClusters2 +#' Two-stage clustering, within one task (see \code{claws()}) #' -#' @title Two-stage clustering, withing one task (see \code{claws()}) +#' \code{clusteringTask1()} runs one full stage-1 task, which consists in iterated +#' stage 1 clustering on nb_curves / ntasks energy contributions, computed through +#' discrete wavelets coefficients. +#' \code{clusteringTask2()} runs a full stage-2 task, which consists in WER distances +#' computations between medoids (indices) output from stage 1, before applying +#' the second clustering algorithm on the distances matrix. #' -#' @description \code{clusteringTask1()} runs one full stage-1 task, which consists in -#' iterated stage 1 clustering on nb_curves / ntasks energy contributions, computed -#' through discrete wavelets coefficients. -#' \code{clusteringTask2()} runs a full stage-2 task, which consists in -#' WER distances computations between medoids indices output from stage 1, -#' before applying the second clustering algorithm, on the distances matrix. -#' -#' @param indices Range of series indices to cluster #' @param getContribs Function to retrieve contributions from initial series indices: #' \code{getContribs(indices)} outputs a contributions matrix #' @inheritParams claws #' @inheritParams computeSynchrones +#' @inheritParams computeWerDists +#' +#' @return The indices of the computed (resp. K1 and K2) medoids. #' -#' @return For \code{clusteringTask1()}, the indices of the computed (K1) medoids. -#' Indices are irrelevant for stage 2 clustering, thus \code{clusteringTask2()} -#' outputs a big.matrix of medoids (of size LxK2, K2 = final number of clusters) +#' @name clustering +#' @rdname clustering +#' @aliases clusteringTask1 clusteringTask2 NULL #' @rdname clustering #' @export -clusteringTask1 = function(indices, getContribs, K1, algoClust1, nb_series_per_chunk, +clusteringTask1 = function(indices, getContribs, K1, algoClust1, nb_items_clust, ncores_clust=1, verbose=FALSE, parll=TRUE) { if (parll) @@ -36,7 +34,7 @@ clusteringTask1 = function(indices, getContribs, K1, algoClust1, nb_series_per_c while (length(indices) > K1) { # Balance tasks by splitting the indices set - as evenly as possible - indices_workers = .splitIndices(indices, nb_items_clust1) + indices_workers = .splitIndices(indices, nb_items_clust, min_size=K1+1) if (verbose) cat(paste("*** [iterated] Clustering task 1 on ",length(indices)," series\n", sep="")) indices <- @@ -66,22 +64,17 @@ clusteringTask2 = function(indices, getSeries, K2, algoClust2, nb_series_per_chu nvoice, nbytes, endian, ncores_clust=1, verbose=FALSE, parll=TRUE) { if (verbose) - cat(paste("*** Clustering task 2 on ",ncol(medoids)," synchrones\n", sep="")) - - if (ncol(medoids) <= K2) - return (medoids) + cat(paste("*** Clustering task 2 on ",length(indices)," medoids\n", sep="")) - # A) Obtain synchrones, that is to say the cumulated power consumptions - # for each of the K1 initial groups - synchrones = computeSynchrones(medoids, getRefSeries, nb_ref_curves, - nb_series_per_chunk, ncores_clust, verbose, parll) + if (length(indices) <= K2) + return (indices) - # B) Compute the WER distances (Wavelets Extended coefficient of deteRmination) - distances = computeWerDists( - synchrones, nvoice, nbytes, endian, ncores_clust, verbose, parll) + # A) Compute the WER distances (Wavelets Extended coefficient of deteRmination) + distances = computeWerDists(indices, getSeries, nb_series_per_chunk, + nvoice, nbytes, endian, ncores_clust, verbose, parll) - # C) Apply clustering algorithm 2 on the WER distances matrix + # B) Apply clustering algorithm 2 on the WER distances matrix if (verbose) cat(paste("*** algoClust2() on ",nrow(distances)," items\n", sep="")) - medoids[ ,algoClust2(distances,K2) ] + indices[ algoClust2(distances,K2) ] } diff --git a/epclust/R/computeSynchrones.R b/epclust/R/computeSynchrones.R index f73e64e..09ff3a0 100644 --- a/epclust/R/computeSynchrones.R +++ b/epclust/R/computeSynchrones.R @@ -68,14 +68,14 @@ computeSynchrones = function(medoids, getSeries, nb_curves, medoids_desc <- bigmemory::describe(medoids) cl = parallel::makeCluster(ncores_clust) parallel::clusterExport(cl, envir=environment(), - varlist=c("synchrones_desc","m_desc","medoids_desc","getRefSeries")) + varlist=c("synchrones_desc","m_desc","medoids_desc","getSeries")) } if (verbose) cat(paste("--- Compute ",K," synchrones with ",nb_curves," series\n", sep="")) - # Balance tasks by splitting 1:nb_ref_curves into groups of size <= nb_series_per_chunk - indices_workers = .splitIndices(seq_len(nb_ref_curves), nb_series_per_chunk) + # Balance tasks by splitting 1:nb_curves into groups of size <= nb_series_per_chunk + indices_workers = .splitIndices(seq_len(nb_curves), nb_series_per_chunk) ignored <- if (parll) parallel::parLapply(cl, indices_workers, computeSynchronesChunk) diff --git a/epclust/R/computeWerDists.R b/epclust/R/computeWerDists.R index aae1cc1..a813b8f 100644 --- a/epclust/R/computeWerDists.R +++ b/epclust/R/computeWerDists.R @@ -3,21 +3,24 @@ #' Compute the WER distances between the synchrones curves (in columns), which are #' returned (e.g.) by \code{computeSynchrones()} #' -#' @param synchrones A big.matrix of synchrones, in columns. The series have same -#' length as the series in the initial dataset +#' @param indices Range of series indices to cluster #' @inheritParams claws +#' @inheritParams computeSynchrones #' -#' @return A distances matrix of size K1 x K1 +#' @return A distances matrix of size K x K where K == length(indices) #' #' @export -computeWerDists = function(synchrones, nvoice, nbytes,endian,ncores_clust=1, - verbose=FALSE,parll=TRUE) +computeWerDists = function(indices, getSeries, nb_series_per_chunk, nvoice, nbytes, endian, + ncores_clust=1, verbose=FALSE, parll=TRUE) { - n <- ncol(synchrones) - L <- nrow(synchrones) + n <- length(indices) + L <- length(getSeries(1)) #TODO: not very nice way to get L noctave = ceiling(log2(L)) #min power of 2 to cover serie range + # Since a CWT contains noctave*nvoice complex series, we deduce the number of CWT to + # retrieve/put in one chunk. + nb_cwt_per_chunk = max(1, floor(nb_series_per_chunk / (nvoice*noctave*2))) - # Initialize result as a square big.matrix of size 'number of synchrones' + # Initialize result as a square big.matrix of size 'number of medoids' Xwer_dist <- bigmemory::big.matrix(nrow=n, ncol=n, type="double") # Generate n(n-1)/2 pairs for WER distances computations @@ -30,7 +33,7 @@ computeWerDists = function(synchrones, nvoice, nbytes,endian,ncores_clust=1, } cwt_file = ".cwt.bin" - # Compute the synchrones[,indices] CWT, and store the results in the binary file + # Compute the getSeries(indices) CWT, and store the results in the binary file computeSaveCWT = function(indices) { if (parll) @@ -38,27 +41,25 @@ computeWerDists = function(synchrones, nvoice, nbytes,endian,ncores_clust=1, require("bigmemory", quietly=TRUE) require("Rwave", quietly=TRUE) require("epclust", quietly=TRUE) - synchrones <- bigmemory::attach.big.matrix(synchrones_desc) } # Obtain CWT as big vectors of real part + imaginary part (concatenate) ts_cwt <- sapply(indices, function(i) { - ts <- scale(ts(synchrones[,i]), center=TRUE, scale=FALSE) + ts <- scale(ts(getSeries(i)), center=TRUE, scale=FALSE) ts_cwt <- Rwave::cwt(ts, noctave, nvoice, w0=2*pi, twoD=TRUE, plot=FALSE) c( as.double(Re(ts_cwt)),as.double(Im(ts_cwt)) ) }) # Serialization - binarize(ts_cwt, cwt_file, 1, ",", nbytes, endian) + binarize(ts_cwt, cwt_file, nb_cwt_per_chunk, ",", nbytes, endian) } if (parll) { cl = parallel::makeCluster(ncores_clust) - synchrones_desc <- bigmemory::describe(synchrones) Xwer_dist_desc <- bigmemory::describe(Xwer_dist) - parallel::clusterExport(cl, varlist=c("parll","synchrones_desc","Xwer_dist_desc", - "noctave","nvoice","verbose","getCWT"), envir=environment()) + parallel::clusterExport(cl, varlist=c("parll","nb_cwt_per_chunk","L", + "Xwer_dist_desc","noctave","nvoice","getCWT"), envir=environment()) } if (verbose) @@ -71,7 +72,7 @@ computeWerDists = function(synchrones, nvoice, nbytes,endian,ncores_clust=1, lapply(1:n, computeSaveCWT) # Function to retrieve a synchrone CWT from (binary) file - getSynchroneCWT = function(index, L) + getCWT = function(index, L) { flat_cwt <- getDataInFile(index, cwt_file, nbytes, endian) cwt_length = length(flat_cwt) / 2 @@ -84,8 +85,6 @@ computeWerDists = function(synchrones, nvoice, nbytes,endian,ncores_clust=1, #TODO: better repartition here, - #better code in .splitIndices :: never exceed nb_per_chunk; arg: min_per_chunk (default: 1) -###TODO: reintroduire nb_items_clust ======> l'autre est typiquement + grand !!! (pas de relation !) @@ -97,7 +96,6 @@ computeWerDists = function(synchrones, nvoice, nbytes,endian,ncores_clust=1, # parallel workers start with an empty environment require("bigmemory", quietly=TRUE) require("epclust", quietly=TRUE) - synchrones <- bigmemory::attach.big.matrix(synchrones_desc) Xwer_dist <- bigmemory::attach.big.matrix(Xwer_dist_desc) } @@ -106,9 +104,8 @@ computeWerDists = function(synchrones, nvoice, nbytes,endian,ncores_clust=1, cat(paste(" Distances (",i,",",j,"), (",i,",",j+1,") ...\n", sep="")) # Compute CWT of columns i and j in synchrones - L = nrow(synchrones) - cwt_i <- getSynchroneCWT(i, L) - cwt_j <- getSynchroneCWT(j, L) + cwt_i <- getCWT(i, L) + cwt_j <- getCWT(j, L) # Compute the ratio of integrals formula 5.6 for WER^2 # in https://arxiv.org/abs/1101.4744v2 §5.3 diff --git a/epclust/R/de_serialize.R b/epclust/R/de_serialize.R index a49990b..f28038b 100644 --- a/epclust/R/de_serialize.R +++ b/epclust/R/de_serialize.R @@ -1,28 +1,28 @@ -#' @name de_serialize -#' @rdname de_serialize -#' @aliases binarize binarizeTransform getDataInFile +#' (De)Serialization of a [big]matrix or data stream #' -#' @title (De)Serialization of a [big]matrix or data stream +#' \code{binarize()} serializes a matrix or CSV file with minimal overhead, into a +#' binary file. \code{getDataInFile()} achieves the inverse task: she retrieves (ASCII) +#' data rows from indices in the binary file. Finally, \code{binarizeTransform()} +#' serialize transformations of all data chunks. To use it a data-retrieval function +#' must be provided -- thus \code{binarize} will most likely be used first +#' (and then a function defined to seek in generated binary file) #' -#' @description \code{binarize()} serializes a matrix or CSV file with minimal overhead, -#' into a binary file. \code{getDataInFile()} achieves the inverse task: she retrieves -#' (ASCII) data rows from indices in the binary file. Finally, -#' \code{binarizeTransform()} serialize transformations of all data chunks; to use it, -#' a data-retrieval function must be provided, thus \code{binarize} will most likely be -#' used first (and then a function defined to seek in generated binary file) -#' -#' @param data_ascii Either a matrix or CSV file, with items in rows -#' @param indices Indices of the lines to retrieve -#' @param data_bin_file Name of binary file on output (\code{binarize}) -#' or input (\code{getDataInFile}) +#' @param data_ascii Either a matrix (by columns) or CSV file or connection (by rows) +#' @param data_bin_file Name of binary file on output of (\code{binarize}) +#' or input of (\code{getDataInFile}) #' @param nb_per_chunk Number of lines to process in one batch (big.matrix or connection) -#' @inheritParams claws #' @param getData Function to retrieve data chunks #' @param transform Transformation function to apply on data chunks +#' @param indices Indices of the lines to retrieve +#' @inheritParams claws #' #' @return For \code{getDataInFile()}, the matrix with rows corresponding to the #' requested indices. \code{binarizeTransform} returns the number of processed lines. #' \code{binarize} is designed to serialize in several calls, thus returns nothing. +#' +#' @name de_serialize +#' @rdname de_serialize +#' @aliases binarize binarizeTransform getDataInFile NULL #' @rdname de_serialize diff --git a/epclust/R/main.R b/epclust/R/main.R index 603f7bf..ce650ff 100644 --- a/epclust/R/main.R +++ b/epclust/R/main.R @@ -45,8 +45,8 @@ #' } #' @param K1 Number of clusters to be found after stage 1 (K1 << N [number of series]) #' @param K2 Number of clusters to be found after stage 2 (K2 << K1) -#' @param nb_series_per_chunk (Maximum) number of series to retrieve in one batch; -#' this value is also used for the (maximum) number of series to cluster at a time +#' @param nb_series_per_chunk (Maximum) number of series to retrieve in one batch +#' @param nb_items_clust (~Maximum) number of items in clustering algorithm 1 input #' @param algoClust1 Clustering algorithm for stage 1. A function which takes (data, K) #' as argument where data is a matrix in columns and K the desired number of clusters, #' and outputs K medoids ranks. Default: PAM. In our method, this function is called @@ -150,7 +150,7 @@ #' digest::sha1(res_db) #' } #' @export -claws <- function(getSeries, K1, K2, nb_series_per_chunk, +claws <- function(series, K1, K2, nb_series_per_chunk, nb_items_clust=7*K1, algoClust1=function(data,K) cluster::pam(t(data),K,diss=FALSE,pamonce=1)$id.med, algoClust2=function(dists,K) cluster::pam(dists,K,diss=TRUE,pamonce=1)$id.med, wav_filt="d8", contrib_type="absolute", WER="end", nvoice=4, random=TRUE, @@ -167,10 +167,7 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, K1 <- .toInteger(K1, function(x) x>=2) K2 <- .toInteger(K2, function(x) x>=2) nb_series_per_chunk <- .toInteger(nb_series_per_chunk, function(x) x>=1) - # K1 (number of clusters at step 1) cannot exceed nb_series_per_chunk, because we will need - # to load K1 series in memory for clustering stage 2. - if (K1 > nb_series_per_chunk) - stop("'K1' cannot exceed 'nb_series_per_chunk'") + nb_items_clust <- .toInteger(nb_items_clust, function(x) x>K1) random <- .toLogical(random) tryCatch( {ignored <- wavelets::wt.filter(wav_filt)}, error = function(e) stop("Invalid wavelet filter; see ?wavelets::wt.filter") ) @@ -247,14 +244,20 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, # Initialize parallel runs: outfile="" allow to output verbose traces in the console # under Linux. All necessary variables are passed to the workers. cl = parallel::makeCluster(ncores_tasks, outfile="") - parallel::clusterExport(cl, envir = environment(), - varlist = c("getSeries","getContribs","K1","K2","algoClust1","algoClust2", - "nb_series_per_chunk","ncores_clust","nvoice","nbytes","endian","verbose","parll")) + varlist = c("ncores_clust","verbose","parll", #task 1 & 2 + "K1","getContribs","algoClust1","nb_items_clust") #task 1 + if (WER=="mix") + { + # Add variables for task 2 + varlist = c(varlist, "K2","getSeries","algoClust2","nb_series_per_chunk", + "nvoice","nbytes","endian") + } + parallel::clusterExport(cl, varlist, envir = environment()) } # This function achieves one complete clustering task, divided in stage 1 + stage 2. - # stage 1: n indices --> clusteringTask1(...) --> K1 medoids - # stage 2: K1 medoids --> clusteringTask2(...) --> K2 medoids, + # stage 1: n indices --> clusteringTask1(...) --> K1 medoids (indices) + # stage 2: K1 indices --> K1xK1 WER distances --> clusteringTask2(...) --> K2 medoids, # where n = N / ntasks, N being the total number of curves. runTwoStepClustering = function(inds) { @@ -263,7 +266,7 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, if (parll && ntasks>1) require("epclust", quietly=TRUE) indices_medoids = clusteringTask1(inds, getContribs, K1, algoClust1, - nb_series_per_chunk, ncores_clust, verbose, parll) + nb_items_clust, ncores_clust, verbose, parll) if (WER=="mix") { indices_medoids = clusteringTask2(indices_medoids, getSeries, K2, algoClust2, @@ -280,8 +283,8 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, cat(paste(message,"\n", sep="")) } - # As explained above, indices will be assigned to ntasks*K1 medoids indices [if WER=="end"], - # or nothing (empty vector) if WER=="mix"; in this case, synchrones are stored in a file. + # As explained above, we obtain after all runs ntasks*[K1 or K2] medoids indices, + # depending wether WER=="end" or "mix", respectively. indices_medoids_all <- if (parll && ntasks>1) unlist( parallel::parLapply(cl, indices_tasks, runTwoStepClustering) ) @@ -291,22 +294,26 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, if (parll && ntasks>1) parallel::stopCluster(cl) - # Right before the final stage, input data still is the initial set of curves, referenced - # by the ntasks*[K1 or K2] medoids indices. + # For the last stage, ncores_tasks*(ncores_clusts+1) cores should be available: + # - ntasks for level 1 parallelism + # - ntasks*ncores_clust for level 2 parallelism, + # but since an extension MPI <--> tasks / OpenMP <--> sub-tasks is on the way, + # it's better to just re-use ncores_clust + ncores_last_stage <- ncores_clust - # Run last clustering tasks to obtain only K2 medoids indices, from ntasks*[K1 or K2] - # indices, depending wether WER=="end" or WER=="mix" + # Run last clustering tasks to obtain only K2 medoids indices if (verbose) cat("...Run final // stage 1 + stage 2\n") indices_medoids = clusteringTask1(indices_medoids_all, getContribs, K1, algoClust1, - nb_series_per_chunk, ncores_tasks*ncores_clust, verbose, parll) + nb_items_clust, ncores_tasks*ncores_clust, verbose, parll) indices_medoids = clusteringTask2(indices_medoids, getContribs, K2, algoClust2, - nb_series_per_chunk, nvoice, nbytes, endian, ncores_tasks*ncores_clust, verbose, parll) + nb_series_per_chunk, nvoice, nbytes, endian, ncores_last_stage, verbose, parll) - # Compute synchrones + # Compute synchrones, that is to say the cumulated power consumptions for each of the K2 + # final groups. medoids = getSeries(indices_medoids) synchrones = computeSynchrones(medoids, getSeries, nb_curves, nb_series_per_chunk, - ncores_tasks*ncores_clust, verbose, parll) + ncores_last_stage, verbose, parll) # NOTE: no need to use big.matrix here, since there are only K2 << K1 << N remaining curves list("medoids"=medoids, "ranks"=indices_medoids, "synchrones"=synchrones) diff --git a/epclust/R/utils.R b/epclust/R/utils.R index ba643d0..e79c009 100644 --- a/epclust/R/utils.R +++ b/epclust/R/utils.R @@ -30,13 +30,13 @@ #' Compute the discrete wavelet coefficients for each series, and aggregate them in #' energy contribution across scales as described in https://arxiv.org/abs/1101.4744v2 #' -#' @param series [big.]matrix of series (in columns), of size L x n +#' @param curves [big.]matrix of series (in columns), of size L x n #' @inheritParams claws #' #' @return A matrix of size log(L) x n containing contributions in columns #' #' @export -curvesToContribs = function(series, wav_filt, contrib_type, coin=FALSE) +curvesToContribs = function(series, wav_filt, contrib_type) { L = nrow(series) D = ceiling( log2(L) ) @@ -55,9 +55,9 @@ curvesToContribs = function(series, wav_filt, contrib_type, coin=FALSE) }) } -# Helper function to divide indices into balanced sets -# If max == TRUE, sets sizes cannot exceed nb_per_set -.splitIndices = function(indices, nb_per_set, max=FALSE) +# Helper function to divide indices into balanced sets. +# Ensure that all indices sets have at least min_size elements. +.splitIndices = function(indices, nb_per_set, min_size=1) { L = length(indices) nb_workers = floor( L / nb_per_set ) @@ -65,29 +65,32 @@ curvesToContribs = function(series, wav_filt, contrib_type, coin=FALSE) if (nb_workers == 0 || (nb_workers==1 && rem==0)) { # L <= nb_per_set, simple case - indices_workers = list(indices) + return (list(indices)) } - else - { - indices_workers = lapply( seq_len(nb_workers), function(i) - indices[(nb_per_set*(i-1)+1):(nb_per_set*i)] ) - if (max) - { - # Sets are not so well balanced, but size is supposed to be critical - return ( c( indices_workers, if (rem>0) list((L-rem+1):L) else NULL ) ) - } + indices_workers = lapply( seq_len(nb_workers), function(i) + indices[(nb_per_set*(i-1)+1):(nb_per_set*i)] ) - # Spread the remaining load among the workers - rem = L %% nb_per_set - while (rem > 0) + rem = L %% nb_per_set #number of remaining unassigned items + if (rem == 0) + return (indices_workers) + + rem <- (L-rem+1):L + # If remainder is smaller than min_size, feed it with indices from other sets + # until either its size exceed min_size (success) or other sets' size + # get lower min_size (failure). + while (length(rem) < min_size) + { + index = length(rem) %% nb_workers + 1 + if (length(indices_workers[[index]]) <= min_size) { - index = rem%%nb_workers + 1 - indices_workers[[index]] = c(indices_workers[[index]], indices[L-rem+1]) - rem = rem - 1 + stop("Impossible to split indices properly for clustering. + Try increasing nb_items_clust or decreasing K1") } + rem = c(rem, tail(indices_workers[[index]],1)) + indices_workers[[index]] = head( indices_workers[[index]], -1) } - indices_workers + return ( c(indices_workers, list(rem) ) ) } #' filterMA @@ -98,7 +101,7 @@ curvesToContribs = function(series, wav_filt, contrib_type, coin=FALSE) #' @param M_ A real matrix of size LxD #' @param w_ The (odd) number of values to average #' -#' @return The filtered matrix, of same size as the input +#' @return The filtered matrix (in columns), of same size as the input #' @export filterMA = function(M_, w_) .Call("filterMA", M_, w_, PACKAGE="epclust") -- 2.44.0