X-Git-Url: https://git.auder.net/?a=blobdiff_plain;f=epclust%2FR%2Fmain.R;h=2af6f90c00c85afbdc74a7e7237426ed4bd2cfa3;hb=9f05a4a0b703deffd7bdb9cd99b0aaa2246a5c83;hp=cbc92b140a6c18b4bba865f81f588834887402ff;hpb=2b9f5356793c245a5e10229a74ac0dabd8f62508;p=epclust.git

diff --git a/epclust/R/main.R b/epclust/R/main.R
index cbc92b1..2af6f90 100644
--- a/epclust/R/main.R
+++ b/epclust/R/main.R
@@ -11,7 +11,7 @@
 #' \item Divide series into \code{ntasks} groups to process in parallel. In each task:
 #' \enumerate{
 #'   \item iterate the first clustering algorithm on its aggregated outputs,
-#'   on inputs of size \code{nb_items_clust}
+#'   on inputs of size \code{nb_items_clust1}
 #'   \item optionally, if WER=="mix":
 #'   a) compute the K1 synchrones curves,
 #'   b) compute WER distances (K1xK1 matrix) between synchrones and
@@ -39,20 +39,23 @@
 #' }
 #' @param K1 Number of clusters to be found after stage 1 (K1 << N [number of series])
 #' @param K2 Number of clusters to be found after stage 2 (K2 << K1)
-#' @param nb_per_chunk (Maximum) number of items to retrieve in one batch, for both types of
-#' retrieval: resp. series and contributions; a vector of size 2
-#' @param algo_clust1 Clustering algorithm for stage 1. A function which takes (data, K)
+#' @param nb_series_per_chunk (Maximum) number of series to retrieve in one batch
+#' @param algoClust1 Clustering algorithm for stage 1. A function which takes (data, K)
 #' as argument, where data is a matrix with series in columns and K the desired number of clusters,
-#' and outputs K medoid ranks. Default: PAM
-#' @param algo_clust2 Clustering algorithm for stage 2. A function which takes (dists, K)
+#' and outputs K medoid ranks. Default: PAM. In our method, this function is called
+#' on iterated medoids during stage 1
+#' @param algoClust2 Clustering algorithm for stage 2. A function which takes (dists, K)
 #' as argument, where dists is a matrix of distances and K the desired number of clusters,
-#' and outputs K cluster representatives (curves). Default: k-means
-#' @param nb_items_clust1 (Maximum) number of items in input of the clustering algorithm
-#' for stage 1
+#' and outputs K medoid ranks. Default: PAM. In our method, this function is called
+#' on a matrix of K1 x K1 (WER) distances computed between synchrones
+#' @param nb_items_clust1 (~Maximum) number of items in input of the clustering algorithm
+#' for stage 1. At worst, a clustering algorithm might be called with ~2*nb_items_clust1
+#' items; but this could only happen during the last few iterations.
 #' @param wav_filt Wavelet transform filter; see ?wavelets::wt.filter
 #' @param contrib_type Type of contribution: "relative", "logit" or "absolute" (any prefix)
 #' @param WER "end" to apply stage 2 after stage 1 has fully iterated, or "mix" to apply
 #' stage 2 at the end of each task
+#' @param sync_mean TRUE to compute a synchrone as a mean curve, FALSE for a sum
 #' @param random TRUE (default) for random chunks repartition
 #' @param ntasks Number of tasks (parallel iterations to obtain K1 [if WER=="end"]
 #' or K2 [if WER=="mix"] medoids); default: 1.
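To make the (data, K) / (dists, K) contracts documented above concrete, here is a
minimal sketch of custom callbacks that could be passed as algoClust1/algoClust2.
The names myAlgoClust1/myAlgoClust2 are hypothetical, and the k-means variant is an
alternative to the PAM default, shown under the assumption that data holds series
in columns and both callbacks must return K medoid ranks:

# Hypothetical stage-1 callback: k-means on the transposed matrix (series in
# rows), then report, for each cluster, the index of the member series closest
# to the cluster center -- a "medoid rank" in the documented sense
myAlgoClust1 <- function(data, K)
{
	km <- stats::kmeans(t(data), K, iter.max=50, nstart=3)
	sapply(seq_len(K), function(k) {
		inds <- which(km$cluster == k)
		# squared distances of member series (columns of data) to the center
		d <- colSums((data[, inds, drop=FALSE] - km$centers[k, ])^2)
		inds[which.min(d)]
	})
}

# Hypothetical stage-2 callback: PAM on a precomputed distance matrix,
# matching the new default above
myAlgoClust2 <- function(dists, K)
	cluster::pam(dists, K, diss=TRUE)$id.med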
@@ -84,12 +87,12 @@
 #' series = do.call( cbind, lapply( 1:6, function(i)
 #'   do.call(cbind, wmtsa::wavBootstrap(ref_series[i,], n.realization=400)) ) )
 #' #dim(series) #c(2400,10001)
-#' medoids_ascii = claws(series, K1=60, K2=6, nb_per_chunk=c(200,500), verbose=TRUE)
+#' medoids_ascii = claws(series, K1=60, K2=6, 200, verbose=TRUE)
 #'
 #' # Same example, from CSV file
 #' csv_file = "/tmp/epclust_series.csv"
 #' write.table(series, csv_file, sep=",", row.names=FALSE, col.names=FALSE)
-#' medoids_csv = claws(csv_file, K1=60, K2=6, nb_per_chunk=c(200,500))
+#' medoids_csv = claws(csv_file, K1=60, K2=6, 200)
 #'
 #' # Same example, from binary file
 #' bin_file <- "/tmp/epclust_series.bin"
@@ -97,7 +100,7 @@
 #' endian <- "little"
 #' binarize(csv_file, bin_file, 500, nbytes, endian)
 #' getSeries <- function(indices) getDataInFile(indices, bin_file, nbytes, endian)
-#' medoids_bin <- claws(getSeries, K1=60, K2=6, nb_per_chunk=c(200,500))
+#' medoids_bin <- claws(getSeries, K1=60, K2=6, 200)
 #' unlink(csv_file)
 #' unlink(bin_file)
 #'
@@ -124,7 +127,7 @@
 #' df_series <- dbGetQuery(series_db, request)
 #' as.matrix(df_series[,"value"], nrow=serie_length)
 #' }
-#' medoids_db = claws(getSeries, K1=60, K2=6, nb_per_chunk=c(200,500))
+#' medoids_db = claws(getSeries, K1=60, K2=6, 200)
 #' dbDisconnect(series_db)
 #'
 #' # All computed medoids should be the same:
@@ -134,12 +137,12 @@
 #' digest::sha1(medoids_db)
 #' }
 #' @export
-claws <- function(getSeries, K1, K2, nb_per_chunk,
+claws <- function(getSeries, K1, K2, nb_series_per_chunk,
 	nb_items_clust1=7*K1,
-	algo_clust1=function(data,K) cluster::pam(data,K,diss=FALSE),
-	algo_clust2=function(dists,K) stats::kmeans(dists,K,iter.max=50,nstart=3),
-	wav_filt="d8",contrib_type="absolute",
-	WER="end",
+	algoClust1=function(data,K) cluster::pam(t(data),K,diss=FALSE)$id.med,
+	algoClust2=function(dists,K) cluster::pam(dists,K,diss=TRUE)$id.med,
+	wav_filt="d8", contrib_type="absolute",
+	WER="end", sync_mean=TRUE,
 	random=TRUE,
 	ntasks=1, ncores_tasks=1, ncores_clust=4,
 	sep=",",
@@ -155,25 +158,22 @@ claws <- function(getSeries, K1, K2, nb_per_chunk,
 	}
 	K1 <- .toInteger(K1, function(x) x>=2)
 	K2 <- .toInteger(K2, function(x) x>=2)
-	if (!is.numeric(nb_per_chunk) || length(nb_per_chunk)!=2)
-		stop("'nb_per_chunk': numeric, size 2")
-	nb_per_chunk[1] <- .toInteger(nb_per_chunk[1], function(x) x>=1)
-	# A batch of contributions should have at least as many elements as a batch of series,
-	# because it always contains far fewer values
-	nb_per_chunk[2] <- max(.toInteger(nb_per_chunk[2],function(x) x>=1), nb_per_chunk[1])
+	nb_series_per_chunk <- .toInteger(nb_series_per_chunk, function(x) x>=1)
+	# K1 (number of clusters at step 1) cannot exceed nb_series_per_chunk, because we
+	# will need to load K1 series in memory for clustering stage 2.
+	if (K1 > nb_series_per_chunk)
+		stop("'K1' cannot exceed 'nb_series_per_chunk'")
 	nb_items_clust1 <- .toInteger(nb_items_clust1, function(x) x>K1)
 	random <- .toLogical(random)
-	tryCatch
-	(
-		{ignored <- wavelets::wt.filter(wav_filt)},
-		error = function(e) stop("Invalid wavelet filter; see ?wavelets::wt.filter")
-	)
+	tryCatch( {ignored <- wavelets::wt.filter(wav_filt)},
+		error = function(e) stop("Invalid wavelet filter; see ?wavelets::wt.filter") )
 	ctypes = c("relative","absolute","logit")
 	contrib_type = ctypes[ pmatch(contrib_type,ctypes) ]
 	if (is.na(contrib_type))
 		stop("'contrib_type' in {'relative','absolute','logit'}")
 	if (WER!="end" && WER!="mix")
 		stop("'WER': in {'end','mix'}")
+	sync_mean <- .toLogical(sync_mean)
 	random <- .toLogical(random)
 	ntasks <- .toInteger(ntasks, function(x) x>=1)
 	ncores_tasks <- .toInteger(ncores_tasks, function(x) x>=1)
@@ -210,7 +210,7 @@ claws <- function(getSeries, K1, K2, nb_per_chunk,
 	if (verbose)
 		cat("...Compute contributions and serialize them\n")
 	nb_curves = binarizeTransform(getSeries,
-		function(series) curvesToContribs(series, wf, ctype),
+		function(series) curvesToContribs(series, wav_filt, contrib_type),
 		contribs_file, nb_series_per_chunk, nbytes, endian)
 	getContribs = function(indices) getDataInFile(indices, contribs_file, nbytes, endian)
@@ -235,9 +235,9 @@ claws <- function(getSeries, K1, K2, nb_per_chunk,
 	# Initialize parallel runs: outfile="" allows verbose traces to be output to the
 	# console under Linux. All necessary variables are passed to the workers.
 	cl = parallel::makeCluster(ncores_tasks, outfile="")
-	varlist = c("getSeries","getContribs","K1","K2","algo_clust1","algo_clust2",
-		"nb_per_chunk","nb_items_clust","ncores_clust","sep","nbytes","endian",
-		"verbose","parll")
+	varlist = c("getSeries","getContribs","K1","K2","algoClust1","algoClust2",
+		"nb_series_per_chunk","nb_items_clust1","ncores_clust","sep",
+		"nbytes","endian","verbose","parll")
 	if (WER=="mix")
 		varlist = c(varlist, "medoids_file")
 	parallel::clusterExport(cl, varlist, envir = environment())
@@ -254,14 +254,14 @@ claws <- function(getSeries, K1, K2, nb_per_chunk,
 		if (parll && ntasks>1)
 			require("epclust", quietly=TRUE)
 		indices_medoids = clusteringTask1(
-			inds, getContribs, K1, nb_series_per_chunk, ncores_clust, verbose, parll)
+			inds, getContribs, K1, algoClust1, nb_series_per_chunk, ncores_clust, verbose, parll)
 		if (WER=="mix")
 		{
 			if (parll && ntasks>1)
 				require("bigmemory", quietly=TRUE)
 			medoids1 = bigmemory::as.big.matrix( getSeries(indices_medoids) )
-			medoids2 = clusteringTask2(medoids1, K2, getSeries, nb_curves, nb_series_per_chunk,
-				nbytes, endian, ncores_clust, verbose, parll)
+			medoids2 = clusteringTask2(medoids1, K2, algoClust2, getSeries, nb_curves,
+				nb_series_per_chunk, sync_mean, nbytes, endian, ncores_clust, verbose, parll)
 			binarize(medoids2, medoids_file, nb_series_per_chunk, sep, nbytes, endian)
 			return (vector("integer",0))
 		}
@@ -304,26 +304,29 @@ claws <- function(getSeries, K1, K2, nb_per_chunk,
 	if (WER=="mix")
 	{
 		indices = seq_len(ntasks*K2)
-		# Now series must be retrieved from synchrones_file
-		getSeries = function(inds) getDataInFile(inds, synchrones_file, nbytes, endian)
+		# Now series (synchrones) must be retrieved from medoids_file
+		getSeries = function(inds) getDataInFile(inds, medoids_file, nbytes, endian)
 		# Contributions must be re-computed
 		unlink(contribs_file)
 		index = 1
 		if (verbose)
 			cat("...Serialize contributions computed on synchrones\n")
 		ignored = binarizeTransform(getSeries,
-			function(series) curvesToContribs(series, wf, ctype),
+			function(series) curvesToContribs(series, wav_filt, contrib_type),
 			contribs_file, nb_series_per_chunk, nbytes, endian)
 	}
+#TODO: check THAT
+
+
 	# Run step2 on resulting indices or series (from file)
 	if (verbose)
 		cat("...Run final // stage 1 + stage 2\n")
-	indices_medoids = clusteringTask1(
-		indices, getContribs, K1, nb_series_per_chunk, ncores_tasks*ncores_clust, verbose, parll)
+	indices_medoids = clusteringTask1(indices, getContribs, K1, algoClust1,
+		nb_series_per_chunk, ncores_tasks*ncores_clust, verbose, parll)
 	medoids1 = bigmemory::as.big.matrix( getSeries(indices_medoids) )
-	medoids2 = clusteringTask2(medoids1, K2, getRefSeries, nb_curves, nb_series_per_chunk,
-		nbytes, endian, ncores_tasks*ncores_clust, verbose, parll)
+	medoids2 = clusteringTask2(medoids1, K2, algoClust2, getRefSeries, nb_curves,
+		nb_series_per_chunk, sync_mean, nbytes, endian, ncores_tasks*ncores_clust, verbose, parll)
 
 	# Cleanup: remove temporary binary files and their folder
 	unlink(bin_dir, recursive=TRUE)
@@ -344,14 +347,15 @@
 #' @return A [big.]matrix of size log(L) x n containing contributions in columns
 #'
 #' @export
-curvesToContribs = function(series, wav_filt, contrib_type)
+curvesToContribs = function(series, wav_filt, contrib_type, coin=FALSE)
 {
+	series = as.matrix(series) # a 1D series could occur
 	L = nrow(series)
 	D = ceiling( log2(L) )
 	nb_sample_points = 2^D
 	apply(series, 2, function(x)
 	{
 		interpolated_curve = spline(1:L, x, n=nb_sample_points)$y
-		W = wavelets::dwt(interpolated_curve, filter=wf, D)@W
+		W = wavelets::dwt(interpolated_curve, filter=wav_filt, D)@W
 		nrj = rev( sapply( W, function(v) ( sqrt( sum(v^2) ) ) ) )
 		if (contrib_type!="absolute")
 			nrj = nrj / sum(nrj)
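Assuming the body of curvesToContribs (truncated above) ends by returning the nrj
vector for each column, as its @return field documents, a small usage sketch with
toy data (not from the package) would be:

# 5 toy series of length 200, in columns; "d8" is the default wavelet filter
set.seed(1)
series <- matrix(rnorm(200 * 5), nrow=200)
contribs <- curvesToContribs(series, "d8", "absolute")
dim(contribs)  # ceiling(log2(200)) = 8 rows (wavelet levels), 5 columns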
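The makeCluster/clusterExport scaffolding used inside claws above follows the
standard pattern of the 'parallel' package. The self-contained sketch below
(runTask and its toy body are stand-ins, not epclust code) shows the same
export-then-dispatch sequence:

library(parallel)

# Export the variables the workers need, run one toy task per index, clean up
K1 <- 3
runTask <- function(i) i * K1          # each worker sees the exported K1
cl <- makeCluster(2, outfile="")       # outfile="" prints worker traces (Linux)
clusterExport(cl, c("K1", "runTask"), envir=environment())
res <- parSapply(cl, 1:4, function(i) runTask(i))
stopCluster(cl)
res  # 3 6 9 12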