From: Benjamin Auder Date: Wed, 8 Mar 2017 00:52:24 +0000 (+0100) Subject: option to run sequentially. various fixes. R CMD check OK X-Git-Url: https://git.auder.net/js/%7B%7B%20path%28%27mixstore_store_usecase_delete%27%2C%20%7B%20id:%20ucs.id%20%7D%29%20%7D%7D?a=commitdiff_plain;h=492cd9e74a79cbcc0ecde55fa3071a44b7e463dc;p=epclust.git option to run sequentially. various fixes. R CMD check OK --- diff --git a/TODO b/TODO index 275c10d..1788ce6 100644 --- a/TODO +++ b/TODO @@ -38,3 +38,10 @@ utiliser Rcpp ? # x <- x / sqrt(rowSums(x^2)) # x %*% t(x) #} + +#TODO: soften condition clustering.R line 37 ? +#regarder mapply et mcmapply pour le // (pas OK pour Windows ou GUI... mais ?) +#TODO: map-reduce more appropriate R/clustering.R ligne 88 +#Alternative: use bigmemory to share series when CSV or matrix(...) + +#' @importFrom synchronicity boost.mutex lock unlock diff --git a/epclust/DESCRIPTION b/epclust/DESCRIPTION index 66d1712..304fdff 100644 --- a/epclust/DESCRIPTION +++ b/epclust/DESCRIPTION @@ -18,8 +18,10 @@ Imports: parallel, cluster, wavelets, + bigmemory, Rwave Suggests: + synchronicity, devtools, testthat, MASS, diff --git a/epclust/R/a_NAMESPACE.R b/epclust/R/a_NAMESPACE.R index 3453108..89aa453 100644 --- a/epclust/R/a_NAMESPACE.R +++ b/epclust/R/a_NAMESPACE.R @@ -9,5 +9,5 @@ #' @importFrom stats filter spline #' @importFrom utils tail #' @importFrom methods is +#' @importFrom bigmemory as.big.matrix is.big.matrix NULL - diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R index 6408370..74d009e 100644 --- a/epclust/R/clustering.R +++ b/epclust/R/clustering.R @@ -1,15 +1,16 @@ #' @name clustering #' @rdname clustering -#' @aliases clusteringTask computeClusters1 computeClusters2 +#' @aliases clusteringTask1 computeClusters1 computeClusters2 #' -#' @title Two-stages clustering, withing one task (see \code{claws()}) +#' @title Two-stage clustering, withing one task (see \code{claws()}) #' -#' @description \code{clusteringTask()} runs one full task, which consists in iterated stage 1 -#' clustering (on nb_curves / ntasks energy contributions, computed through discrete -#' wavelets coefficients). \code{computeClusters1()} and \code{computeClusters2()} -#' correspond to the atomic clustering procedures respectively for stage 1 and 2. -#' The former applies the clustering algorithm (PAM) on a contributions matrix, while -#' the latter clusters a chunk of series inside one task (~max nb_series_per_chunk) +#' @description \code{clusteringTask1()} runs one full stage-1 task, which consists in +#' iterated stage 1 clustering (on nb_curves / ntasks energy contributions, computed +#' through discrete wavelets coefficients). \code{computeClusters1()} and +#' \code{computeClusters2()} correspond to the atomic clustering procedures respectively +#' for stage 1 and 2. The former applies the clustering algorithm (PAM) on a +#' contributions matrix, while the latter clusters a chunk of series inside one task +#' (~max nb_series_per_chunk) #' #' @param indices Range of series indices to cluster in parallel (initial data) #' @param getContribs Function to retrieve contributions from initial series indices: @@ -18,7 +19,7 @@ #' @inheritParams computeSynchrones #' @inheritParams claws #' -#' @return For \code{clusteringTask()} and \code{computeClusters1()}, the indices of the +#' @return For \code{clusteringTask1()} and \code{computeClusters1()}, the indices of the #' computed (K1) medoids. Indices are irrelevant for stage 2 clustering, thus #' \code{computeClusters2()} outputs a matrix of medoids #' (of size limited by nb_series_per_chunk) @@ -26,42 +27,36 @@ NULL #' @rdname clustering #' @export -clusteringTask = function(indices, getContribs, K1, nb_series_per_chunk, ncores_clust) +clusteringTask1 = function( + indices, getContribs, K1, nb_series_per_chunk, ncores_clust=1, verbose=FALSE, parll=TRUE) { + if (verbose) + cat(paste("*** Clustering task on ",length(indices)," lines\n", sep="")) -#NOTE: comment out parallel sections for debugging -#propagate verbose arg ?! + wrapComputeClusters1 = function(inds) { + if (parll) + require("epclust", quietly=TRUE) + if (verbose) + cat(paste(" computeClusters1() on ",length(inds)," lines\n", sep="")) + inds[ computeClusters1(getContribs(inds), K1) ] + } -# cl = parallel::makeCluster(ncores_clust) -# parallel::clusterExport(cl, varlist=c("getContribs","K1"), envir=environment()) - repeat + if (parll) { - -print(length(indices)) - - nb_workers = max( 1, floor( length(indices) / nb_series_per_chunk ) ) - indices_workers = lapply( seq_len(nb_workers), function(i) - indices[(nb_series_per_chunk*(i-1)+1):(nb_series_per_chunk*i)] ) - # Spread the remaining load among the workers - rem = length(indices) %% nb_series_per_chunk - while (rem > 0) - { - index = rem%%nb_workers + 1 - indices_workers[[index]] = c(indices_workers[[index]], tail(indices,rem)) - rem = rem - 1 - } -# indices = unlist( parallel::parLapply( cl, indices_workers, function(inds) { - indices = unlist( lapply( indices_workers, function(inds) { -# require("epclust", quietly=TRUE) - -print(paste(" ",length(inds))) ## PROBLEME ICI : 21104 ??! - - inds[ computeClusters1(getContribs(inds), K1) ] - } ) ) - if (length(indices) == K1) - break + cl = parallel::makeCluster(ncores_clust) + parallel::clusterExport(cl, varlist=c("getContribs","K1","verbose"), envir=environment()) } -# parallel::stopCluster(cl) + while (length(indices) > K1) + { + indices_workers = .spreadIndices(indices, nb_series_per_chunk) + if (parll) + indices = unlist( parallel::parLapply(cl, indices_workers, wrapComputeClusters1) ) + else + indices = unlist( lapply(indices_workers, wrapComputeClusters1) ) + } + if (parll) + parallel::stopCluster(cl) + indices #medoids } @@ -72,10 +67,13 @@ computeClusters1 = function(contribs, K1) #' @rdname clustering #' @export -computeClusters2 = function(medoids, K2, getRefSeries, nb_series_per_chunk) +computeClusters2 = function(medoids, K2, + getRefSeries, nb_ref_curves, nb_series_per_chunk, ncores_clust=1,verbose=FALSE,parll=TRUE) { - synchrones = computeSynchrones(medoids, getRefSeries, nb_series_per_chunk) - medoids[ cluster::pam(computeWerDists(synchrones), K2, diss=TRUE)$medoids , ] + synchrones = computeSynchrones(medoids, + getRefSeries, nb_ref_curves, nb_series_per_chunk, ncores_clust, verbose, parll) + distances = computeWerDists(synchrones, ncores_clust, verbose, parll) + medoids[ cluster::pam(distances, K2, diss=TRUE)$medoids , ] } #' computeSynchrones @@ -86,34 +84,67 @@ computeClusters2 = function(medoids, K2, getRefSeries, nb_series_per_chunk) #' @param medoids Matrix of medoids (curves of same legnth as initial series) #' @param getRefSeries Function to retrieve initial series (e.g. in stage 2 after series #' have been replaced by stage-1 medoids) +#' @param nb_ref_curves How many reference series? (This number is known at this stage) #' @inheritParams claws #' #' @export -computeSynchrones = function(medoids, getRefSeries, nb_series_per_chunk) +computeSynchrones = function(medoids, getRefSeries, + nb_ref_curves, nb_series_per_chunk, ncores_clust=1,verbose=FALSE,parll=TRUE) { - K = nrow(medoids) - synchrones = matrix(0, nrow=K, ncol=ncol(medoids)) - counts = rep(0,K) - index = 1 - repeat + computeSynchronesChunk = function(indices) { - range = (index-1) + seq_len(nb_series_per_chunk) - ref_series = getRefSeries(range) - if (is.null(ref_series)) - break + ref_series = getRefSeries(indices) #get medoids indices for this chunk of series for (i in seq_len(nrow(ref_series))) { j = which.min( rowSums( sweep(medoids, 2, ref_series[i,], '-')^2 ) ) + if (parll) + synchronicity::lock(m) synchrones[j,] = synchrones[j,] + ref_series[i,] - counts[j] = counts[j] + 1 + counts[j,1] = counts[j,1] + 1 + if (parll) + synchronicity::unlock(m) + } + } + + K = nrow(medoids) + # Use bigmemory (shared==TRUE by default) + synchronicity to fill synchrones in // + synchrones = bigmemory::big.matrix(nrow=K,ncol=ncol(medoids),type="double",init=0.) + counts = bigmemory::big.matrix(nrow=K,ncol=1,type="double",init=0) + # Fork (// run) only on Linux & MacOS; on Windows: run sequentially + parll = (requireNamespace("synchronicity",quietly=TRUE) + && parll && Sys.info()['sysname'] != "Windows") + if (parll) + m <- synchronicity::boost.mutex() + + indices_workers = .spreadIndices(seq_len(nb_ref_curves), nb_series_per_chunk) + for (inds in indices_workers) + { + if (verbose) + { + cat(paste("--- Compute synchrones for indices range ", + min(inds)," -> ",max(inds),"\n", sep="")) } - index = index + nb_series_per_chunk + if (parll) + ignored <- parallel::mcparallel(computeSynchronesChunk(inds)) + else + computeSynchronesChunk(inds) + } + if (parll) + parallel::mccollect() + + mat_syncs = matrix(nrow=K, ncol=ncol(medoids)) + vec_count = rep(NA, K) + #TODO: can we avoid this loop? + for (i in seq_len(K)) + { + mat_syncs[i,] = synchrones[i,] + vec_count[i] = counts[i,1] } #NOTE: odds for some clusters to be empty? (when series already come from stage 2) # ...maybe; but let's hope resulting K1' be still quite bigger than K2 - synchrones = sweep(synchrones, 1, counts, '/') - synchrones[ sapply(seq_len(K), function(i) all(!is.nan(synchrones[i,]))) , ] + mat_syncs = sweep(mat_syncs, 1, vec_count, '/') + mat_syncs[ sapply(seq_len(K), function(i) all(!is.nan(mat_syncs[i,]))) , ] } #' computeWerDists @@ -122,10 +153,11 @@ computeSynchrones = function(medoids, getRefSeries, nb_series_per_chunk) #' returned (e.g.) by \code{computeSynchrones()} #' #' @param synchrones A matrix of synchrones, in rows. The series have same length as the -#' series in the initial dataset +#' series in the initial dataset +#' @inheritParams claws #' #' @export -computeWerDists = function(synchrones) +computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) { n <- nrow(synchrones) delta <- ncol(synchrones) @@ -143,8 +175,10 @@ computeWerDists = function(synchrones) s0log = as.integer( (log2( s0*w0/(2*pi) ) - 1) * nvoice + 1.5 ) totnoct = noctave + as.integer(s0log/nvoice) + 1 - # (normalized) observations node with CWT - Xcwt4 <- lapply(seq_len(n), function(i) { + computeCWT = function(i) + { + if (verbose) + cat(paste("+++ Compute Rwave::cwt() on serie ",i,"\n", sep="")) ts <- scale(ts(synchrones[i,]), center=TRUE, scale=scaled) totts.cwt = Rwave::cwt(ts,totnoct,nvoice,w0,plot=0) ts.cwt = totts.cwt[,s0log:(s0log+noctave*nvoice)] @@ -152,24 +186,96 @@ computeWerDists = function(synchrones) sqs <- sqrt(2^(0:(noctave*nvoice)/nvoice)*s0) sqres <- sweep(ts.cwt,MARGIN=2,sqs,'*') sqres / max(Mod(sqres)) - }) + } - Xwer_dist <- matrix(0., n, n) + if (parll) + { + cl = parallel::makeCluster(ncores_clust) + parallel::clusterExport(cl, varlist=c("getContribs","K1","verbose"), envir=environment()) + } + + # (normalized) observations node with CWT + Xcwt4 <- + if (parll) + parallel::parLapply(cl, seq_len(n), computeCWT) + else + lapply(seq_len(n), computeCWT) + + if (parll) + parallel::stopCluster(cl) + + Xwer_dist <- bigmemory::big.matrix(nrow=n, ncol=n, type="double") fcoefs = rep(1/3, 3) #moving average on 3 values (TODO: very slow! correct?!) - for (i in 1:(n-1)) + if (verbose) + cat("*** Compute WER distances from CWT\n") + + computeDistancesLineI = function(i) { + if (verbose) + cat(paste(" Line ",i,"\n", sep="")) for (j in (i+1):n) { - #TODO: later, compute CWT here (because not enough storage space for 200k series) - # 'circular=TRUE' is wrong, should just take values on the sides; to rewrite in C + #TODO: 'circular=TRUE' is wrong, should just take values on the sides; to rewrite in C num <- filter(Mod(Xcwt4[[i]] * Conj(Xcwt4[[j]])), fcoefs, circular=TRUE) WX <- filter(Mod(Xcwt4[[i]] * Conj(Xcwt4[[i]])), fcoefs, circular=TRUE) WY <- filter(Mod(Xcwt4[[j]] * Conj(Xcwt4[[j]])), fcoefs, circular=TRUE) wer2 <- sum(colSums(num)^2) / sum( sum(colSums(WX) * colSums(WY)) ) + if (parll) + synchronicity::lock(m) Xwer_dist[i,j] <- sqrt(delta * ncol(Xcwt4[[1]]) * (1 - wer2)) Xwer_dist[j,i] <- Xwer_dist[i,j] + if (parll) + synchronicity::unlock(m) + } + Xwer_dist[i,i] = 0. + } + + parll = (requireNamespace("synchronicity",quietly=TRUE) + && parll && Sys.info()['sysname'] != "Windows") + if (parll) + m <- synchronicity::boost.mutex() + + for (i in 1:(n-1)) + { + if (parll) + ignored <- parallel::mcparallel(computeDistancesLineI(i)) + else + computeDistancesLineI(i) + } + Xwer_dist[n,n] = 0. + + if (parll) + parallel::mccollect() + + mat_dists = matrix(nrow=n, ncol=n) + #TODO: avoid this loop? + for (i in 1:n) + mat_dists[i,] = Xwer_dist[i,] + mat_dists +} + +# Helper function to divide indices into balanced sets +.spreadIndices = function(indices, nb_per_chunk) +{ + L = length(indices) + nb_workers = floor( L / nb_per_chunk ) + if (nb_workers == 0) + { + # L < nb_series_per_chunk, simple case + indices_workers = list(indices) + } + else + { + indices_workers = lapply( seq_len(nb_workers), function(i) + indices[(nb_per_chunk*(i-1)+1):(nb_per_chunk*i)] ) + # Spread the remaining load among the workers + rem = L %% nb_per_chunk + while (rem > 0) + { + index = rem%%nb_workers + 1 + indices_workers[[index]] = c(indices_workers[[index]], indices[L-rem+1]) + rem = rem - 1 } } - diag(Xwer_dist) <- numeric(n) - Xwer_dist + indices_workers } diff --git a/epclust/R/de_serialize.R b/epclust/R/de_serialize.R index 8dde258..b6684d2 100644 --- a/epclust/R/de_serialize.R +++ b/epclust/R/de_serialize.R @@ -1,12 +1,15 @@ #' @name de_serialize #' @rdname de_serialize -#' @aliases binarize getDataInFile +#' @aliases binarize binarizeTransform getDataInFile #' -#' @title (De)Serialization of a matrix +#' @title (De)Serialization of a [big]matrix or data stream #' #' @description \code{binarize()} serializes a matrix or CSV file with minimal overhead, #' into a binary file. \code{getDataInFile()} achieves the inverse task: she retrieves -#' (ASCII) data rows from indices in the binary file +#' (ASCII) data rows from indices in the binary file. Finally, +#' \code{binarizeTransform()} serialize transformations of all data chunks; to use it, +#' a data-retrieval function must be provided, thus \code{binarize} will most likely be +#' used first (and then a function defined to seek in generated binary file) #' #' @param data_ascii Either a matrix or CSV file, with items in rows #' @param indices Indices of the lines to retrieve @@ -14,9 +17,12 @@ #' or intput (\code{getDataInFile}) #' @param nb_per_chunk Number of lines to process in one batch #' @inheritParams claws +#' @param getData Function to retrieve data chunks +#' @param transform Transformation function to apply on data chunks #' #' @return For \code{getDataInFile()}, the matrix with rows corresponding to the -#' requested indices +#' requested indices. \code{binarizeTransform} returns the number of processed lines. +#' \code{binarize} is designed to serialize in several calls, thus returns nothing. NULL #' @rdname de_serialize @@ -28,6 +34,7 @@ binarize = function(data_ascii, data_bin_file, nb_per_chunk, data_ascii = file(data_ascii, open="r") else if (methods::is(data_ascii,"connection") && !isOpen(data_ascii)) open(data_ascii) + is_matrix = !methods::is(data_ascii,"connection") first_write = (!file.exists(data_bin_file) || file.info(data_bin_file)$size == 0) data_bin = file(data_bin_file, open=ifelse(first_write,"wb","ab")) @@ -37,9 +44,9 @@ binarize = function(data_ascii, data_bin_file, nb_per_chunk, { #number of items always on 8 bytes writeBin(0L, data_bin, size=8, endian=endian) - if (is.matrix(data_ascii)) + if ( is_matrix ) data_length = ncol(data_ascii) - else #if (is(data, "connection")) + else #connection { data_line = scan(data_ascii, double(), sep=sep, nlines=1, quiet=TRUE) writeBin(data_line, data_bin, size=nbytes, endian=endian) @@ -47,18 +54,17 @@ binarize = function(data_ascii, data_bin_file, nb_per_chunk, } } - if (is.matrix(data_ascii)) + if (is_matrix) index = 1 repeat { - if (is.matrix(data_ascii)) + if ( is_matrix ) { - range = index:min(nrow(data_ascii),index+nb_per_chunk) data_chunk = - if (range[1] <= nrow(data_ascii)) - as.double(t(data_ascii[range,])) + if (index <= nrow(data_ascii)) + as.double(t(data_ascii[index:min(nrow(data_ascii),index+nb_per_chunk-1),])) else - integer(0) + double(0) index = index + nb_per_chunk } else @@ -70,16 +76,36 @@ binarize = function(data_ascii, data_bin_file, nb_per_chunk, if (first_write) { - #ecrire file_size-1 / (nbytes*nbWritten) en 0 dans bin_data ! ignored == file_size + # Write data_length, = (file_size-1) / (nbytes*nbWritten) at offset 0 in data_bin ignored = seek(data_bin, 0) writeBin(data_length, data_bin, size=8, endian=endian) } close(data_bin) - if (methods::is(data_ascii,"connection")) + if ( ! is_matrix ) close(data_ascii) } +#' @rdname de_serialize +#' @export +binarizeTransform = function(getData, transform, data_bin_file, nb_per_chunk, + nbytes=4, endian=.Platform$endian) +{ + nb_items = 0 + index = 1 + repeat + { + data_chunk = getData((index-1)+seq_len(nb_per_chunk)) + if (is.null(data_chunk)) + break + transformed_chunk = transform(data_chunk) + binarize(transformed_chunk, data_bin_file, nb_per_chunk, ",", nbytes, endian) + index = index + nb_per_chunk + nb_items = nb_items + nrow(data_chunk) + } + nb_items +} + #' @rdname de_serialize #' @export getDataInFile = function(indices, data_bin_file, nbytes=4, endian=.Platform$endian) diff --git a/epclust/R/main.R b/epclust/R/main.R index a982f4c..9064dfa 100644 --- a/epclust/R/main.R +++ b/epclust/R/main.R @@ -30,6 +30,7 @@ #' @param nbytes Number of bytes to serialize a floating-point number; 4 or 8 #' @param endian Endianness to use for (de)serialization. Use "little" or "big" for portability #' @param verbose Level of verbosity (0/FALSE for nothing or 1/TRUE for all; devel stage) +#' @param parll TRUE to fully parallelize; otherwise run sequentially (debug, comparison) #' #' @return A matrix of the final medoids curves (K2) in rows #' @@ -104,13 +105,14 @@ claws = function(getSeries, K1, K2, nb_series_per_chunk=50*K1, min_series_per_chunk=5*K1, #chunk size sep=",", #ASCII input separator nbytes=4, endian=.Platform$endian, #serialization (write,read) - verbose=FALSE) + verbose=FALSE, parll=TRUE) { # Check/transform arguments - if (!is.matrix(getSeries) && !is.function(getSeries) && - !methods::is(getSeries, "connection" && !is.character(getSeries))) + if (!is.matrix(getSeries) && !bigmemory::is.big.matrix(getSeries) + && !is.function(getSeries) + && !methods::is(getSeries,"connection") && !is.character(getSeries)) { - stop("'getSeries': matrix, function, file or valid connection (no NA)") + stop("'getSeries': [big]matrix, function, file or valid connection (no NA)") } K1 = .toInteger(K1, function(x) x>=2) K2 = .toInteger(K2, function(x) x>=2) @@ -148,16 +150,9 @@ claws = function(getSeries, K1, K2, nb_curves = 0 if (verbose) cat("...Compute contributions and serialize them\n") - repeat - { - series = getSeries((index-1)+seq_len(nb_series_per_chunk)) - if (is.null(series)) - break - contribs_chunk = curvesToContribs(series, wf, ctype) - binarize(contribs_chunk, contribs_file, nb_series_per_chunk, sep, nbytes, endian) - index = index + nb_series_per_chunk - nb_curves = nb_curves + nrow(contribs_chunk) - } + nb_curves = binarizeTransform(getSeries, + function(series) curvesToContribs(series, wf, ctype), + contribs_file, nb_series_per_chunk, nbytes, endian) getContribs = function(indices) getDataInFile(indices, contribs_file, nbytes, endian) if (nb_curves < min_series_per_chunk) @@ -174,28 +169,37 @@ claws = function(getSeries, K1, K2, }) if (verbose) cat(paste("...Run ",ntasks," x stage 1 in parallel\n",sep="")) -# cl = parallel::makeCluster(ncores_tasks) -# parallel::clusterExport(cl, varlist=c("getSeries","getContribs","K1","K2", -# "nb_series_per_chunk","ncores_clust","synchrones_file","sep","nbytes","endian"), -# envir = environment()) - # 1000*K1 indices [if WER=="end"], or empty vector [if WER=="mix"] --> series on file -# indices = unlist( parallel::parLapply(cl, indices_tasks, function(inds) { - indices = unlist( lapply(indices_tasks, function(inds) { -# require("epclust", quietly=TRUE) - - browser() #TODO: CONTINUE DEBUG HERE + if (parll) + { + cl = parallel::makeCluster(ncores_tasks) + parallel::clusterExport(cl, varlist=c("getSeries","getContribs","K1","K2","verbose","parll", + "nb_series_per_chunk","ncores_clust","synchrones_file","sep","nbytes","endian"), + envir = environment()) + } - indices_medoids = clusteringTask(inds,getContribs,K1,nb_series_per_chunk,ncores_clust) + runTwoStepClustering = function(inds) + { + if (parll) + require("epclust", quietly=TRUE) + indices_medoids = clusteringTask1( + inds, getContribs, K1, nb_series_per_chunk, ncores_clust, verbose, parll) if (WER=="mix") { - medoids2 = computeClusters2( - getSeries(indices_medoids), K2, getSeries, nb_series_per_chunk) + medoids2 = computeClusters2(getSeries(indices_medoids), + K2, getSeries, nb_curves, nb_series_per_chunk, ncores_clust, verbose, parll) binarize(medoids2, synchrones_file, nb_series_per_chunk, sep, nbytes, endian) return (vector("integer",0)) } indices_medoids - }) ) -# parallel::stopCluster(cl) + } + + # 1000*K1 indices [if WER=="end"], or empty vector [if WER=="mix"] --> series on file + if (parll) + indices = unlist( parallel::parLapply(cl, indices_tasks, runTwoStepClustering) ) + else + indices = unlist( lapply(indices_tasks, runTwoStepClustering) ) + if (parll) + parallel::stopCluster(cl) getRefSeries = getSeries synchrones_file = paste(bin_dir,"synchrones",sep="") ; unlink(synchrones_file) @@ -209,23 +213,18 @@ claws = function(getSeries, K1, K2, index = 1 if (verbose) cat("...Serialize contributions computed on synchrones\n") - repeat - { - series = getSeries((index-1)+seq_len(nb_series_per_chunk)) - if (is.null(series)) - break - contribs_chunk = curvesToContribs(series, wf, ctype) - binarize(contribs_chunk, contribs_file, nb_series_per_chunk, sep, nbytes, endian) - index = index + nb_series_per_chunk - } + ignored = binarizeTransform(getSeries, + function(series) curvesToContribs(series, wf, ctype), + contribs_file, nb_series_per_chunk, nbytes, endian) } # Run step2 on resulting indices or series (from file) if (verbose) cat("...Run final // stage 1 + stage 2\n") - indices_medoids = clusteringTask( - indices, getContribs, K1, nb_series_per_chunk, ncores_tasks*ncores_clust) - medoids = computeClusters2(getSeries(indices_medoids),K2,getRefSeries,nb_series_per_chunk) + indices_medoids = clusteringTask1( + indices, getContribs, K1, nb_series_per_chunk, ncores_tasks*ncores_clust, verbose) + medoids = computeClusters2(getSeries(indices_medoids), + K2, getRefSeries, nb_curves, nb_series_per_chunk, ncores_tasks*ncores_clust, verbose) # Cleanup unlink(bin_dir, recursive=TRUE) @@ -259,7 +258,7 @@ curvesToContribs = function(series, wf, ctype) }) ) } -# Helper for main function: check integer arguments with functiional conditions +# Check integer arguments with functional conditions .toInteger <- function(x, condition) { if (!is.integer(x)) diff --git a/epclust/tests/testthat/test.clustering.R b/epclust/tests/testthat/test.clustering.R index 9333876..49afe60 100644 --- a/epclust/tests/testthat/test.clustering.R +++ b/epclust/tests/testthat/test.clustering.R @@ -63,10 +63,11 @@ test_that("computeSynchrones behave as expected", for (i in seq_len(n)) series[i,] = s[[I(i,K)]] + rnorm(L,sd=0.01) getRefSeries = function(indices) { - indices = indices[indices < n] + indices = indices[indices <= n] if (length(indices)>0) series[indices,] else NULL } - synchrones = computeSynchrones(rbind(s1,s2,s3), getRefSeries, 100) + synchrones = computeSynchrones(rbind(s1,s2,s3), getRefSeries, n, 100, + verbose=TRUE, parll=FALSE) expect_equal(dim(synchrones), c(K,L)) for (i in 1:K) @@ -95,23 +96,23 @@ test_that("computeClusters2 behave as expected", for (i in seq_len(n)) series[i,] = s[[I(i,K1)]] + rnorm(L,sd=0.01) getRefSeries = function(indices) { - indices = indices[indices < n] + indices = indices[indices <= n] if (length(indices)>0) series[indices,] else NULL } # Artificially simulate 60 medoids - perfect situation, all equal to one of the refs medoids_K1 = do.call(rbind, lapply( 1:K1, function(i) s[[I(i,K1)]] ) ) - medoids_K2 = computeClusters2(medoids_K1, K2, getRefSeries, 75) + medoids_K2 = computeClusters2(medoids_K1, K2, getRefSeries, n, 75, + verbose=TRUE, parll=FALSE) expect_equal(dim(medoids_K2), c(K2,L)) # Not easy to evaluate result: at least we expect it to be better than random selection of # medoids within 1...K1 (among references) - distorGood = computeDistortion(series, medoids_K2) for (i in 1:3) expect_lte( distorGood, computeDistortion(series,medoids_K1[sample(1:K1, K2),]) ) }) -test_that("clusteringTask + computeClusters2 behave as expected", +test_that("clusteringTask1 + computeClusters2 behave as expected", { n = 900 x = seq(0,9.5,0.1) @@ -129,8 +130,10 @@ test_that("clusteringTask + computeClusters2 behave as expected", wf = "haar" ctype = "absolute" getContribs = function(indices) curvesToContribs(series[indices,],wf,ctype) - medoids_K1 = getSeries( clusteringTask(1:n, getContribs, K1, 75, 4) ) - medoids_K2 = computeClusters2(medoids_K1, K2, getSeries, 120) + medoids_K1 = getSeries( clusteringTask1(1:n, getContribs, K1, 75, + verbose=TRUE, parll=FALSE) ) + medoids_K2 = computeClusters2(medoids_K1, K2, getSeries, n, 120, + verbose=TRUE, parll=FALSE) expect_equal(dim(medoids_K1), c(K1,L)) expect_equal(dim(medoids_K2), c(K2,L)) diff --git a/epclust/tests/testthat/test.de_serialize.R b/epclust/tests/testthat/test.de_serialize.R index a2fae5e..8403e6d 100644 --- a/epclust/tests/testthat/test.de_serialize.R +++ b/epclust/tests/testthat/test.de_serialize.R @@ -32,6 +32,33 @@ test_that("serialization + getDataInFile retrieve original data / from matrix", unlink(data_bin_file) }) +test_that("serialization + transform + getDataInFile retrieve original transforms", +{ + data_bin_file = "/tmp/epclust_test_t.bin" + unlink(data_bin_file) + + #dataset 200 lignes / 30 columns + data_ascii = matrix(runif(200*30,-10,10),ncol=30) + nbytes = 8 + endian = "little" + + binarize(data_ascii, data_bin_file, 500, ",", nbytes, endian) + # Serialize transformation (just compute range) into a new binary file + trans_bin_file = "/tmp/epclust_test_t_trans.bin" + unlink(trans_bin_file) + getSeries = function(inds) getDataInFile(inds, data_bin_file, nbytes, endian) + binarizeTransform(getSeries, function(series) t(apply(series, 1, range)), + trans_bin_file, 250, nbytes, endian) + unlink(data_bin_file) + expect_equal(file.info(trans_bin_file)$size, 2*nrow(data_ascii)*nbytes+8) + for (indices in list(c(1,3,5), 3:13, c(5,20,50), c(75,130:135), 196:200)) + { + trans_lines = getDataInFile(indices, trans_bin_file, nbytes, endian) + expect_equal(trans_lines, t(apply(data_ascii[indices,],1,range)), tolerance=1e-6) + } + unlink(trans_bin_file) +}) + test_that("serialization + getDataInFile retrieve original data / from connection", { data_bin_file = "/tmp/epclust_test_c.bin"