From: Benjamin Auder Date: Sat, 11 Mar 2017 15:57:48 +0000 (+0100) Subject: add comments, fix some things. TODO: comment tests, finish computeWerDists, test it X-Git-Url: https://git.auder.net/variants/%24%7Bvname%7D/%7B%7B%20asset%28%27mixstore/current/%7B%7B?a=commitdiff_plain;h=d9bb53c5e1392018bf67f92140edb10137f3423c;p=epclust.git add comments, fix some things. TODO: comment tests, finish computeWerDists, test it --- diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R index a431ba8..2ce4267 100644 --- a/epclust/R/clustering.R +++ b/epclust/R/clustering.R @@ -33,10 +33,12 @@ clusteringTask1 = function(indices, getContribs, K1, algoClust1, nb_items_clust1 if (parll) { cl = parallel::makeCluster(ncores_clust, outfile = "") - parallel::clusterExport(cl, varlist=c("getContribs","K1","verbose"), envir=environment()) + parallel::clusterExport(cl, c("getContribs","K1","verbose"), envir=environment()) } + # Iterate clustering algorithm 1 until K1 medoids are found while (length(indices) > K1) { + # Balance tasks by splitting the indices set - as evenly as possible indices_workers = .spreadIndices(indices, nb_items_clust1) if (verbose) cat(paste("*** [iterated] Clustering task 1 on ",length(indices)," series\n", sep="")) @@ -64,16 +66,23 @@ clusteringTask1 = function(indices, getContribs, K1, algoClust1, nb_items_clust1 #' @rdname clustering #' @export clusteringTask2 = function(medoids, K2, algoClust2, getRefSeries, nb_ref_curves, - nb_series_per_chunk, sync_mean, nbytes,endian,ncores_clust=1,verbose=FALSE,parll=TRUE) + nb_series_per_chunk, nbytes,endian,ncores_clust=1,verbose=FALSE,parll=TRUE) { if (verbose) cat(paste("*** Clustering task 2 on ",ncol(medoids)," synchrones\n", sep="")) if (ncol(medoids) <= K2) return (medoids) + + # A) Obtain synchrones, that is to say the cumulated power consumptions + # for each of the K1 initial groups synchrones = computeSynchrones(medoids, getRefSeries, nb_ref_curves, - nb_series_per_chunk, sync_mean, ncores_clust, verbose, parll) + nb_series_per_chunk, ncores_clust, verbose, parll) + + # B) Compute the WER distances (Wavelets Extended coefficient of deteRmination) distances = computeWerDists(synchrones, nbytes, endian, ncores_clust, verbose, parll) + + # C) Apply clustering algorithm 2 on the WER distances matrix if (verbose) cat(paste(" algoClust2() on ",nrow(distances)," items\n", sep="")) medoids[ ,algoClust2(distances,K2) ] @@ -82,7 +91,7 @@ clusteringTask2 = function(medoids, K2, algoClust2, getRefSeries, nb_ref_curves, #' computeSynchrones #' #' Compute the synchrones curves (sum of clusters elements) from a matrix of medoids, -#' using L2 distances. +#' using euclidian distance. #' #' @param medoids big.matrix of medoids (curves of same length as initial series) #' @param getRefSeries Function to retrieve initial series (e.g. in stage 2 after series @@ -94,8 +103,9 @@ clusteringTask2 = function(medoids, K2, algoClust2, getRefSeries, nb_ref_curves, #' #' @export computeSynchrones = function(medoids, getRefSeries, nb_ref_curves, - nb_series_per_chunk, sync_mean, ncores_clust=1,verbose=FALSE,parll=TRUE) + nb_series_per_chunk, ncores_clust=1,verbose=FALSE,parll=TRUE) { + # Synchrones computation is embarassingly parallel: compute it by chunks of series computeSynchronesChunk = function(indices) { if (parll) @@ -103,26 +113,25 @@ computeSynchrones = function(medoids, getRefSeries, nb_ref_curves, require("bigmemory", quietly=TRUE) requireNamespace("synchronicity", quietly=TRUE) require("epclust", quietly=TRUE) + # The big.matrix objects need to be attached to be usable on the workers synchrones <- bigmemory::attach.big.matrix(synchrones_desc) - if (sync_mean) - counts <- bigmemory::attach.big.matrix(counts_desc) medoids <- bigmemory::attach.big.matrix(medoids_desc) m <- synchronicity::attach.mutex(m_desc) } + # Obtain a chunk of reference series ref_series = getRefSeries(indices) nb_series = ncol(ref_series) # Get medoids indices for this chunk of series mi = computeMedoidsIndices(medoids@address, ref_series) + # Update synchrones using mi above for (i in seq_len(nb_series)) { if (parll) - synchronicity::lock(m) + synchronicity::lock(m) #locking required because several writes at the same time synchrones[, mi[i] ] = synchrones[, mi[i] ] + ref_series[,i] - if (sync_mean) - counts[ mi[i] ] = counts[ mi[i] ] + 1 if (parll) synchronicity::unlock(m) } @@ -130,34 +139,29 @@ computeSynchrones = function(medoids, getRefSeries, nb_ref_curves, K = ncol(medoids) ; L = nrow(medoids) # Use bigmemory (shared==TRUE by default) + synchronicity to fill synchrones in // - # TODO: if size > RAM (not our case), use file-backed big.matrix synchrones = bigmemory::big.matrix(nrow=L, ncol=K, type="double", init=0.) - if (sync_mean) - counts = bigmemory::big.matrix(nrow=K, ncol=1, type="double", init=0) - # synchronicity is only for Linux & MacOS; on Windows: run sequentially + # NOTE: synchronicity is only for Linux & MacOS; on Windows: run sequentially parll = (requireNamespace("synchronicity",quietly=TRUE) && parll && Sys.info()['sysname'] != "Windows") if (parll) { - m <- synchronicity::boost.mutex() + m <- synchronicity::boost.mutex() #for lock/unlock, see computeSynchronesChunk + # mutex and big.matrix objects cannot be passed directly: + # they will be accessed from their description m_desc <- synchronicity::describe(m) synchrones_desc = bigmemory::describe(synchrones) - if (sync_mean) - counts_desc = bigmemory::describe(counts) medoids_desc = bigmemory::describe(medoids) cl = parallel::makeCluster(ncores_clust) - varlist=c("synchrones_desc","sync_mean","m_desc","medoids_desc","getRefSeries") - if (sync_mean) - varlist = c(varlist, "counts_desc") - parallel::clusterExport(cl, varlist, envir=environment()) + parallel::clusterExport(cl, envir=environment(), + varlist=c("synchrones_desc","m_desc","medoids_desc","getRefSeries")) } if (verbose) - { - if (verbose) - cat(paste("--- Compute ",K," synchrones with ",nb_ref_curves," series\n", sep="")) - } - indices_workers = .spreadIndices(seq_len(nb_ref_curves), nb_series_per_chunk) + cat(paste("--- Compute ",K," synchrones with ",nb_ref_curves," series\n", sep="")) + + # Balance tasks by splitting the indices set - maybe not so evenly, but + # max==TRUE in next call ensures that no set has more than nb_series_per_chunk items. + indices_workers = .spreadIndices(seq_len(nb_ref_curves), nb_series_per_chunk, max=TRUE) ignored <- if (parll) parallel::parLapply(cl, indices_workers, computeSynchronesChunk) @@ -167,19 +171,7 @@ computeSynchrones = function(medoids, getRefSeries, nb_ref_curves, if (parll) parallel::stopCluster(cl) - if (!sync_mean) - return (synchrones) - - #TODO: can we avoid this loop? ( synchrones = sweep(synchrones, 2, counts, '/') ) - for (i in seq_len(K)) - synchrones[,i] = synchrones[,i] / counts[i] - #NOTE: odds for some clusters to be empty? (when series already come from stage 2) - # ...maybe; but let's hope resulting K1' be still quite bigger than K2 - noNA_rows = sapply(seq_len(K), function(i) all(!is.nan(synchrones[,i]))) - if (all(noNA_rows)) - return (synchrones) - # Else: some clusters are empty, need to slice synchrones - bigmemory::as.big.matrix(synchrones[,noNA_rows]) + return (synchrones) } #' computeWerDists @@ -196,21 +188,13 @@ computeSynchrones = function(medoids, getRefSeries, nb_ref_curves, #' @export computeWerDists = function(synchrones, nbytes,endian,ncores_clust=1,verbose=FALSE,parll=TRUE) { - n <- nrow(synchrones) - delta <- ncol(synchrones) + n <- ncol(synchrones) + L <- nrow(synchrones) #TODO: automatic tune of all these parameters ? (for other users) + # 4 here represent 2^5 = 32 half-hours ~ 1 day nvoice <- 4 # noctave = 2^13 = 8192 half hours ~ 180 days ; ~log2(ncol(synchrones)) noctave = 13 - # 4 here represent 2^5 = 32 half-hours ~ 1 day - #NOTE: default scalevector == 2^(0:(noctave * nvoice) / nvoice) * s0 (?) - scalevector <- 2^(4:(noctave * nvoice) / nvoice + 1) - #condition: ( log2(s0*w0/(2*pi)) - 1 ) * nvoice + 1.5 >= 1 - s0 = 2 - w0 = 2*pi - scaled=FALSE - s0log = as.integer( (log2( s0*w0/(2*pi) ) - 1) * nvoice + 1.5 ) - totnoct = noctave + as.integer(s0log/nvoice) + 1 Xwer_dist <- bigmemory::big.matrix(nrow=n, ncol=n, type="double") @@ -228,15 +212,15 @@ computeWerDists = function(synchrones, nbytes,endian,ncores_clust=1,verbose=FALS computeSaveCWT = function(index) { - ts <- scale(ts(synchrones[index,]), center=TRUE, scale=scaled) - totts.cwt = Rwave::cwt(ts, totnoct, nvoice, w0, plot=FALSE) + ts <- scale(ts(synchrones[,index]), center=TRUE, scale=FALSE) + totts.cwt = Rwave::cwt(ts, totnoct, nvoice, w0=2*pi, twoD=TRUE, plot=FALSE) ts.cwt = totts.cwt[,s0log:(s0log+noctave*nvoice)] #Normalization sqs <- sqrt(2^(0:(noctave*nvoice)/nvoice)*s0) sqres <- sweep(ts.cwt,2,sqs,'*') res <- sqres / max(Mod(sqres)) #TODO: serializer les CWT, les récupérer via getDataInFile ; - #--> OK, faut juste stocker comme séries simples de taille delta*ncol (53*17519) + #--> OK, faut juste stocker comme séries simples de taille L*n' (53*17519) binarize(c(as.double(Re(res)),as.double(Im(res))), cwt_file, ncol(res), ",", nbytes, endian) } @@ -245,8 +229,9 @@ computeWerDists = function(synchrones, nbytes,endian,ncores_clust=1,verbose=FALS cl = parallel::makeCluster(ncores_clust) synchrones_desc <- bigmemory::describe(synchrones) Xwer_dist_desc <- bigmemory::describe(Xwer_dist) - parallel::clusterExport(cl, varlist=c("synchrones_desc","Xwer_dist_desc","totnoct", - "nvoice","w0","s0log","noctave","s0","verbose","getCWT"), envir=environment()) + parallel::clusterExport(cl, envir=environment(), + varlist=c("synchrones_desc","Xwer_dist_desc","totnoct","nvoice","w0","s0log", + "noctave","s0","verbose","getCWT")) } if (verbose) @@ -289,7 +274,7 @@ computeWerDists = function(synchrones, nbytes,endian,ncores_clust=1,verbose=FALS WX <- epclustFilter(Mod(cwt_i * Conj(cwt_i))) WY <- epclustFilter(Mod(cwt_j * Conj(cwt_j))) wer2 <- sum(colSums(num)^2) / sum(colSums(WX) * colSums(WY)) - Xwer_dist[i,j] <- sqrt(delta * ncol(cwt_i) * max(1 - wer2, 0.)) #FIXME: wer2 should be < 1 + Xwer_dist[i,j] <- sqrt(L * ncol(cwt_i) * max(1 - wer2, 0.)) Xwer_dist[j,i] <- Xwer_dist[i,j] Xwer_dist[i,i] = 0. } @@ -314,7 +299,8 @@ computeWerDists = function(synchrones, nbytes,endian,ncores_clust=1,verbose=FALS } # Helper function to divide indices into balanced sets -.spreadIndices = function(indices, nb_per_set) +# If max == TRUE, sets sizes cannot exceed nb_per_set +.spreadIndices = function(indices, nb_per_set, max=FALSE) { L = length(indices) nb_workers = floor( L / nb_per_set ) @@ -328,6 +314,13 @@ computeWerDists = function(synchrones, nbytes,endian,ncores_clust=1,verbose=FALS { indices_workers = lapply( seq_len(nb_workers), function(i) indices[(nb_per_set*(i-1)+1):(nb_per_set*i)] ) + + if (max) + { + # Sets are not so well balanced, but size is supposed to be critical + return ( c( indices_workers, (L-rem+1):L ) ) + } + # Spread the remaining load among the workers rem = L %% nb_per_set while (rem > 0) diff --git a/epclust/R/de_serialize.R b/epclust/R/de_serialize.R index fd3a5db..5a9dd1f 100644 --- a/epclust/R/de_serialize.R +++ b/epclust/R/de_serialize.R @@ -14,8 +14,8 @@ #' @param data_ascii Either a matrix or CSV file, with items in rows #' @param indices Indices of the lines to retrieve #' @param data_bin_file Name of binary file on output (\code{binarize}) -#' or intput (\code{getDataInFile}) -#' @param nb_per_chunk Number of lines to process in one batch +#' or input (\code{getDataInFile}) +#' @param nb_per_chunk Number of lines to process in one batch (big.matrix or connection) #' @inheritParams claws #' @param getData Function to retrieve data chunks #' @param transform Transformation function to apply on data chunks @@ -30,24 +30,29 @@ NULL binarize = function(data_ascii, data_bin_file, nb_per_chunk, sep=",", nbytes=4, endian=.Platform$endian) { + # data_ascii can be of two types: [big.]matrix, or connection if (is.character(data_ascii)) data_ascii = file(data_ascii, open="r") else if (methods::is(data_ascii,"connection") && !isOpen(data_ascii)) open(data_ascii) is_matrix = !methods::is(data_ascii,"connection") + # At first call, the length of a stored row is written. So it's important to determine + # if the serialization process already started. first_write = (!file.exists(data_bin_file) || file.info(data_bin_file)$size == 0) + + # Open the binary file for writing (or 'append' if already exists) data_bin = file(data_bin_file, open=ifelse(first_write,"wb","ab")) - #write data length on first call if (first_write) { - #number of items always on 8 bytes + # Write data length on first call: number of items always on 8 bytes writeBin(0L, data_bin, size=8, endian=endian) if (is_matrix) data_length = nrow(data_ascii) else #connection { + # Read the first line to know data length, and write it then data_line = scan(data_ascii, double(), sep=sep, nlines=1, quiet=TRUE) writeBin(data_line, data_bin, size=nbytes, endian=endian) data_length = length(data_line) @@ -55,7 +60,11 @@ binarize = function(data_ascii, data_bin_file, nb_per_chunk, } if (is_matrix) + { + # Data is processed by chunks; although this may not be so useful for (normal) matrix + # input, it could for a file-backed big.matrix. It's easier to follow a unified pattern. index = 1 + } repeat { if (is_matrix) @@ -67,10 +76,14 @@ binarize = function(data_ascii, data_bin_file, nb_per_chunk, double(0) index = index + nb_per_chunk } - else + else #connection data_chunk = scan(data_ascii, double(), sep=sep, nlines=nb_per_chunk, quiet=TRUE) + + # Data size is unknown in the case of a connection if (length(data_chunk)==0) break + + # Write this chunk of data to the binary file writeBin(data_chunk, data_bin, size=nbytes, endian=endian) } @@ -91,35 +104,47 @@ binarize = function(data_ascii, data_bin_file, nb_per_chunk, binarizeTransform = function(getData, transform, data_bin_file, nb_per_chunk, nbytes=4, endian=.Platform$endian) { - nb_items = 0 + nb_items = 0 #side-effect: store the number of transformed items index = 1 repeat { + # Retrieve a chunk of data in a binary file (generally obtained by binarize()) data_chunk = getData((index-1)+seq_len(nb_per_chunk)) if (is.null(data_chunk)) break + + # Apply transformation on the current chunk (by columns) transformed_chunk = transform(data_chunk) + + # Save the result in binary format binarize(transformed_chunk, data_bin_file, nb_per_chunk, ",", nbytes, endian) + index = index + nb_per_chunk nb_items = nb_items + nrow(data_chunk) } - nb_items + nb_items #number of transformed items } #' @rdname de_serialize #' @export getDataInFile = function(indices, data_bin_file, nbytes=4, endian=.Platform$endian) { - data_bin = file(data_bin_file, "rb") - data_size = file.info(data_bin_file)$size + data_bin = file(data_bin_file, "rb") #source binary file + + data_size = file.info(data_bin_file)$size #number of bytes in the file + # data_length: length of a vector in the binary file (first element, 8 bytes) data_length = readBin(data_bin, "integer", n=1, size=8, endian=endian) + + # Seek all 'indices' columns in the binary file, using data_length and nbytes + # to compute the offset ( index i at 8 + i*data_length*nbytes ) data_ascii = do.call( cbind, lapply( indices, function(i) { offset = 8+(i-1)*data_length*nbytes if (offset > data_size) return (NULL) - ignored = seek(data_bin, offset) + ignored = seek(data_bin, offset) #position cursor at computed offset readBin(data_bin, "double", n=data_length, size=nbytes, endian=endian) } ) ) close(data_bin) - data_ascii + + data_ascii #retrieved data, in columns } diff --git a/epclust/R/main.R b/epclust/R/main.R index 2af6f90..bcc650a 100644 --- a/epclust/R/main.R +++ b/epclust/R/main.R @@ -4,29 +4,35 @@ #' two stage procedure in parallel (see details). #' Input series must be sampled on the same time grid, no missing values. #' -#' @details Summary of the function execution flow: +#' Summary of the function execution flow: +#' \enumerate{ +#' \item Compute and serialize all contributions, obtained through discrete wavelet +#' decomposition (see Antoniadis & al. [2013]) +#' \item Divide series into \code{ntasks} groups to process in parallel. In each task: #' \enumerate{ -#' \item Compute and serialize all contributions, obtained through discrete wavelet -#' decomposition (see Antoniadis & al. [2013]) -#' \item Divide series into \code{ntasks} groups to process in parallel. In each task: -#' \enumerate{ -#' \item iterate the first clustering algorithm on its aggregated outputs, -#' on inputs of size \code{nb_items_clust1} -#' \item optionally, if WER=="mix": -#' a) compute the K1 synchrones curves, -#' b) compute WER distances (K1xK1 matrix) between synchrones and -#' c) apply the second clustering algorithm -#' } -#' \item Launch a final task on the aggregated outputs of all previous tasks: -#' in the case WER=="end" this task takes indices in input, otherwise -#' (medoid) curves +#' \item iterate the first clustering algorithm on its aggregated outputs, +#' on inputs of size \code{nb_items_clust1} +#' \item optionally, if WER=="mix": +#' a) compute the K1 synchrones curves, +#' b) compute WER distances (K1xK1 matrix) between synchrones and +#' c) apply the second clustering algorithm #' } -#' The main argument -- \code{getSeries} -- has a quite misleading name, since it can be -#' either a [big.]matrix, a CSV file, a connection or a user function to retrieve -#' series; the name was chosen because all types of arguments are converted to a function. -#' When \code{getSeries} is given as a function, it must take a single argument, -#' 'indices', integer vector equal to the indices of the curves to retrieve; -#' see SQLite example. The nature and role of other arguments should be clear +#' \item Launch a final task on the aggregated outputs of all previous tasks: +#' in the case WER=="end" this task takes indices in input, otherwise +#' (medoid) curves +#' } +#' \cr +#' The main argument -- \code{getSeries} -- has a quite misleading name, since it can be +#' either a [big.]matrix, a CSV file, a connection or a user function to retrieve +#' series; the name was chosen because all types of arguments are converted to a function. +#' When \code{getSeries} is given as a function, it must take a single argument, +#' 'indices', integer vector equal to the indices of the curves to retrieve; +#' see SQLite example. The nature and role of other arguments should be clear +#' \cr +#' Note: Since we don't make assumptions on initial data, there is a possibility that +#' even when serialized, contributions or synchrones do not fit in RAM. For example, +#' 30e6 series of length 100,000 would lead to a +4Go contribution matrix. Therefore, +#' it's safer to place these in (binary) files; that's what we do. #' #' @param getSeries Access to the (time-)series, which can be of one of the three #' following types: @@ -55,7 +61,6 @@ #' @param contrib_type Type of contribution: "relative", "logit" or "absolute" (any prefix) #' @param WER "end" to apply stage 2 after stage 1 has fully iterated, or "mix" to apply #' stage 2 at the end of each task -#' @param sync_mean TRUE to compute a synchrone as a mean curve, FALSE for a sum #' @param random TRUE (default) for random chunks repartition #' @param ntasks Number of tasks (parallel iterations to obtain K1 [if WER=="end"] #' or K2 [if WER=="mix"] medoids); default: 1. @@ -137,17 +142,12 @@ #' digest::sha1(medoids_db) #' } #' @export -claws <- function(getSeries, K1, K2, nb_series_per_chunk, - nb_items_clust1=7*K1, +claws <- function(getSeries, K1, K2, nb_series_per_chunk, nb_items_clust1=7*K1, algoClust1=function(data,K) cluster::pam(t(data),K,diss=FALSE)$id.med, algoClust2=function(dists,K) cluster::pam(dists,K,diss=TRUE)$id.med, - wav_filt="d8", contrib_type="absolute", - WER="end",sync_mean=TRUE, - random=TRUE, - ntasks=1, ncores_tasks=1, ncores_clust=4, - sep=",", - nbytes=4, endian=.Platform$endian, - verbose=FALSE, parll=TRUE) + wav_filt="d8", contrib_type="absolute", WER="end", random=TRUE, + ntasks=1, ncores_tasks=1, ncores_clust=4, sep=",", nbytes=4, + endian=.Platform$endian, verbose=FALSE, parll=TRUE) { # Check/transform arguments if (!is.matrix(getSeries) && !bigmemory::is.big.matrix(getSeries) @@ -173,7 +173,6 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, stop("'contrib_type' in {'relative','absolute','logit'}") if (WER!="end" && WER!="mix") stop("'WER': in {'end','mix'}") - sync_mean <- .toLogical(sync_mean) random <- .toLogical(random) ntasks <- .toInteger(ntasks, function(x) x>=1) ncores_tasks <- .toInteger(ncores_tasks, function(x) x>=1) @@ -184,27 +183,20 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, verbose <- .toLogical(verbose) parll <- .toLogical(parll) - # Since we don't make assumptions on initial data, there is a possibility that even - # when serialized, contributions or synchrones do not fit in RAM. For example, - # 30e6 series of length 100,000 would lead to a +4Go contribution matrix. Therefore, - # it's safer to place these in (binary) files, located in the following folder. - bin_dir <- ".epclust_bin/" - dir.create(bin_dir, showWarnings=FALSE, mode="0755") - # Binarize series if getSeries is not a function; the aim is to always use a function, # to uniformize treatments. An equally good alternative would be to use a file-backed - # bigmemory::big.matrix, but it would break the uniformity. + # bigmemory::big.matrix, but it would break the "all-is-function" pattern. if (!is.function(getSeries)) { if (verbose) cat("...Serialize time-series\n") - series_file = paste(bin_dir,"data",sep="") ; unlink(series_file) + series_file = ".series.bin" ; unlink(series_file) binarize(getSeries, series_file, nb_series_per_chunk, sep, nbytes, endian) getSeries = function(inds) getDataInFile(inds, series_file, nbytes, endian) } # Serialize all computed wavelets contributions into a file - contribs_file = paste(bin_dir,"contribs",sep="") ; unlink(contribs_file) + contribs_file = ".contribs.bin" ; unlink(contribs_file) index = 1 nb_curves = 0 if (verbose) @@ -221,8 +213,8 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, if (nb_series_per_task < K2) stop("Too many tasks: less series in one task than final number of clusters") - # Generate a random permutation of 1:N (if random==TRUE); otherwise just use arrival - # (storage) order. + # Generate a random permutation of 1:N (if random==TRUE); + # otherwise just use arrival (storage) order. indices_all = if (random) sample(nb_curves) else seq_len(nb_curves) # Split (all) indices into ntasks groups of ~same size indices_tasks = lapply(seq_len(ntasks), function(i) { @@ -236,9 +228,9 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, # under Linux. All necessary variables are passed to the workers. cl = parallel::makeCluster(ncores_tasks, outfile="") varlist = c("getSeries","getContribs","K1","K2","algoClust1","algoClust2", - "nb_series_per_chunk","nb_items_clust1","ncores_clust","sep", - "nbytes","endian","verbose","parll") - if (WER=="mix") + "nb_series_per_chunk","nb_items_clust1","ncores_clust", + "sep","nbytes","endian","verbose","parll") + if (WER=="mix" && ntasks>1) varlist = c(varlist, "medoids_file") parallel::clusterExport(cl, varlist, envir = environment()) } @@ -249,19 +241,19 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, # where n = N / ntasks, N being the total number of curves. runTwoStepClustering = function(inds) { - # When running in parallel, the environment is blank: we need to load required + # When running in parallel, the environment is blank: we need to load the required # packages, and pass useful variables. if (parll && ntasks>1) require("epclust", quietly=TRUE) indices_medoids = clusteringTask1( inds, getContribs, K1, algoClust1, nb_series_per_chunk, ncores_clust, verbose, parll) - if (WER=="mix") + if (WER=="mix" && ntasks>1) { - if (parll && ntasks>1) + if (parll) require("bigmemory", quietly=TRUE) medoids1 = bigmemory::as.big.matrix( getSeries(indices_medoids) ) medoids2 = clusteringTask2(medoids1, K2, algoClust2, getSeries, nb_curves, - nb_series_per_chunk, sync_mean, nbytes, endian, ncores_clust, verbose, parll) + nb_series_per_chunk, nbytes, endian, ncores_clust, verbose, parll) binarize(medoids2, medoids_file, nb_series_per_chunk, sep, nbytes, endian) return (vector("integer",0)) } @@ -271,8 +263,8 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, # Synchrones (medoids) need to be stored only if WER=="mix"; indeed in this case, every # task output is a set of new (medoids) curves. If WER=="end" however, output is just a # set of indices, representing some initial series. - if (WER=="mix") - {medoids_file = paste(bin_dir,"medoids",sep="") ; unlink(medoids_file)} + if (WER=="mix" && ntasks>1) + {medoids_file = ".medoids.bin" ; unlink(medoids_file)} if (verbose) { @@ -283,8 +275,7 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, } # As explained above, indices will be assigned to ntasks*K1 medoids indices [if WER=="end"], - # or nothing (empty vector) if WER=="mix"; in this case, medoids (synchrones) are stored - # in a file. + # or nothing (empty vector) if WER=="mix"; in this case, synchrones are stored in a file. indices <- if (parll && ntasks>1) unlist( parallel::parLapply(cl, indices_tasks, runTwoStepClustering) ) @@ -294,14 +285,14 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, parallel::stopCluster(cl) # Right before the final stage, two situations are possible: - # a. data to be processed now sit in binary format in medoids_file (if WER=="mix") + # a. data to be processed now sit in a binary format in medoids_file (if WER=="mix") # b. data still is the initial set of curves, referenced by the ntasks*K1 indices # So, the function getSeries() will potentially change. However, computeSynchrones() # requires a function retrieving the initial series. Thus, the next line saves future # conditional instructions. getRefSeries = getSeries - if (WER=="mix") + if (WER=="mix" && ntasks>1) { indices = seq_len(ntasks*K2) # Now series (synchrones) must be retrieved from medoids_file @@ -316,9 +307,6 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, contribs_file, nb_series_per_chunk, nbytes, endian) } -#TODO: check THAT - - # Run step2 on resulting indices or series (from file) if (verbose) cat("...Run final // stage 1 + stage 2\n") @@ -326,10 +314,12 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, nb_series_per_chunk, ncores_tasks*ncores_clust, verbose, parll) medoids1 = bigmemory::as.big.matrix( getSeries(indices_medoids) ) medoids2 = clusteringTask2(medoids1, K2, algoClust2, getRefSeries, nb_curves, - nb_series_per_chunk, sync_mean, nbytes, endian, ncores_tasks*ncores_clust, verbose, parll) + nb_series_per_chunk, nbytes, endian, ncores_tasks*ncores_clust, verbose, parll) - # Cleanup: remove temporary binary files and their folder - unlink(bin_dir, recursive=TRUE) + # Cleanup: remove temporary binary files + tryCatch( + {unlink(series_file); unlink(contribs_file); unlink(medoids_file)}, + error = function(e) {}) # Return medoids as a standard matrix, since K2 series have to fit in RAM # (clustering algorithm 1 takes K1 > K2 of them as input) @@ -352,10 +342,12 @@ curvesToContribs = function(series, wav_filt, contrib_type, coin=FALSE) series = as.matrix(series) #1D serie could occur L = nrow(series) D = ceiling( log2(L) ) + # Series are interpolated to all have length 2^D nb_sample_points = 2^D apply(series, 2, function(x) { interpolated_curve = spline(1:L, x, n=nb_sample_points)$y W = wavelets::dwt(interpolated_curve, filter=wav_filt, D)@W + # Compute the sum of squared discrete wavelet coefficients, for each scale nrj = rev( sapply( W, function(v) ( sqrt( sum(v^2) ) ) ) ) if (contrib_type!="absolute") nrj = nrj / sum(nrj) diff --git a/epclust/src/computeMedoidsIndices.cpp b/epclust/src/computeMedoidsIndices.cpp index f247584..895031a 100644 --- a/epclust/src/computeMedoidsIndices.cpp +++ b/epclust/src/computeMedoidsIndices.cpp @@ -10,33 +10,39 @@ using namespace Rcpp; //' computeMedoidsIndices //' -//' Compute medoids indices +//' For each column of the 'series' matrix input, search for the closest medoid +//' (euclidian distance) and store its index //' -//' @param pMedoids External pointer -//' @param ref_series reference series +//' @param pMedoids External pointer (a big.matrix 'address' slot in R) +//' @param series (reference) series, a matrix of size Lxn //' -//' @return A map serie number -> medoid index +//' @return An integer vector of the closest medoids indices, for each (column) serie // [[Rcpp::export]] -IntegerVector computeMedoidsIndices(SEXP pMedoids, NumericMatrix ref_series) +IntegerVector computeMedoidsIndices(SEXP pMedoids, NumericMatrix series) { + // Turn SEXP external pointer into BigMatrix (description) object XPtr pMed(pMedoids); + // medoids: access to the content of the BigMatrix object MatrixAccessor medoids = MatrixAccessor(*pMed); - int nb_series = ref_series.ncol(), + + int nb_series = series.ncol(), K = pMed->ncol(), L = pMed->nrow(); IntegerVector mi(nb_series); - for (int i=0; i -#include // Rprintf() -//#include // user interrupts -#include -#include - -#include - using namespace Rcpp; //' filter //' -//' Filter time-series +//' Filter [time-]series by replacing all values by the moving average of previous, current and +//' next value. Possible extensions: larger window, gaussian kernel... (but would be costly!). +//' Note: border values are unchanged. //' -//' @param cwt Continuous wavelets transform +//' @param cwt Continuous wavelets transform (in columns): a matrix of size LxD //' -//' @return The filtered CWT +//' @return The filtered CWT, in a matrix of same size (LxD) // [[Rcpp::export]] RcppExport SEXP epclustFilter(SEXP cwt_) { + // NOTE: C-style for better efficiency (this must be as fast as possible) int L = INTEGER(Rf_getAttrib(cwt_, R_DimSymbol))[0], D = INTEGER(Rf_getAttrib(cwt_, R_DimSymbol))[1]; double *cwt = REAL(cwt_); - SEXP fcwt_; + + SEXP fcwt_; //the filtered result PROTECT(fcwt_ = Rf_allocMatrix(REALSXP, L, D)); - double* fcwt = REAL(fcwt_); //(double*)malloc(L*D*sizeof(double)); + double* fcwt = REAL(fcwt_); //pointer to the encapsulated vector - //TODO: coding style is terrible... no time for now. - for (int col=0; col=0; col++) { - double v1 = cwt[0]; - double ma = v1 + cwt[1] + cwt[2]; + double v1 = cwt[0]; //first value + double ms = v1 + cwt[1] + cwt[2]; //moving sum at second value for (int i=1; i1, 2->2, 3->3, 4->1, ..., 149->2, 150->3, ... (is base==3) +# Shorthand: map 1->1, 2->2, 3->3, 4->1, ..., 149->2, 150->3, ... (is base==3) I = function(i, base) (i-1) %% base + 1 -# NOTE: medoids can be a big.matrix +# Compute the sum of (normalized) sum of squares of closest distances to a medoid. +# Note: medoids can be a big.matrix computeDistortion = function(series, medoids) { + if (bigmemory::is.big.matrix(medoids)) + medoids = medoids[,] #extract standard matrix + n = nrow(series) ; L = ncol(series) distortion = 0. - if (bigmemory::is.big.matrix(medoids)) - medoids = medoids[,] for (i in seq_len(n)) distortion = distortion + min( colSums( sweep(medoids,1,series[,i],'-')^2 ) / L ) + distortion / n } diff --git a/epclust/tests/testthat/helper.computeMedoidsIndices.R b/epclust/tests/testthat/helper.computeMedoidsIndices.R index 9342feb..cd6a30e 100644 --- a/epclust/tests/testthat/helper.computeMedoidsIndices.R +++ b/epclust/tests/testthat/helper.computeMedoidsIndices.R @@ -1,10 +1,12 @@ -#R-equivalent of computeMedoidsIndices, requiring a matrix -#(thus potentially breaking "fit-in-memory" hope) +# R-equivalent of computeMedoidsIndices, requiring a matrix +# (thus potentially breaking "fit-in-memory" hope) R_computeMedoidsIndices <- function(medoids, series) { - nb_series = ncol(series) + nb_series = ncol(series) #series in columns + mi = rep(NA,nb_series) for (i in 1:nb_series) mi[i] <- which.min( colSums( sweep(medoids, 1, series[,i], '-')^2 ) ) + mi } diff --git a/epclust/tests/testthat/test.clustering.R b/epclust/tests/testthat/test.clustering.R index e22835a..c10f820 100644 --- a/epclust/tests/testthat/test.clustering.R +++ b/epclust/tests/testthat/test.clustering.R @@ -21,11 +21,11 @@ test_that("computeSynchrones behave as expected", if (length(indices)>0) series[,indices] else NULL } synchrones = computeSynchrones(bigmemory::as.big.matrix(cbind(s1,s2,s3)), getRefSeries, - n, 100, sync_mean=TRUE, verbose=TRUE, parll=FALSE) + n, 100, verbose=TRUE, parll=FALSE) expect_equal(dim(synchrones), c(L,K)) for (i in 1:K) - expect_equal(synchrones[,i], s[[i]], tolerance=0.01) + expect_equal(synchrones[,i]/100, s[[i]], tolerance=0.01) }) # Helper function to divide indices into balanced sets @@ -105,7 +105,7 @@ test_that("clusteringTask2 behave as expected", medoids_K1 = bigmemory::as.big.matrix( sapply( 1:K1, function(i) s[[I(i,K1)]] ) ) algoClust2 = function(dists,K) cluster::pam(dists,K,diss=TRUE)$id.med medoids_K2 = clusteringTask2(medoids_K1, K2, algoClust2, getRefSeries, - n, 75, sync_mean=TRUE, verbose=TRUE, parll=FALSE) + n, 75, verbose=TRUE, parll=FALSE) expect_equal(dim(medoids_K2), c(L,K2)) # Not easy to evaluate result: at least we expect it to be better than random selection of diff --git a/epclust/tests/testthat/test.wavelets.R b/epclust/tests/testthat/test.wavelets.R index 5eb19c5..96f9db3 100644 --- a/epclust/tests/testthat/test.wavelets.R +++ b/epclust/tests/testthat/test.wavelets.R @@ -1,3 +1,21 @@ -#TODO: test computeWerDists (reference result? Jairo?) +context("wavelets discrete & continuous") -#TODO: test sur curvesToCoefs! ref results ? +test_that("curvesToContribs behave as expected", +{ + curvesToContribs(...) +}) + +test_that("computeWerDists output correct results", +{ + nbytes = 8 + endian = "little" + + # On two identical series + serie = rnorm(212, sd=5) + synchrones = cbind(serie, serie) + dists = computeWerDists(synchrones, nbytes,endian,verbose=TRUE,parll=FALSE) + expect_equal(dists, matrix(0.,nrow=2,ncol=2)) + + # On two constant series + # TODO:... +})