X-Git-Url: https://git.auder.net/?a=blobdiff_plain;f=epclust%2FR%2FcomputeSynchrones.R;fp=epclust%2FR%2FcomputeSynchrones.R;h=f73e64ef29e2f20435bc0c16f10dca8ca7459212;hb=40f12a2f66d06fd77183ea02b996f5c66f90761c;hp=0000000000000000000000000000000000000000;hpb=a52836b23adb4bfa6722642ec6426fb7b5f39650;p=epclust.git diff --git a/epclust/R/computeSynchrones.R b/epclust/R/computeSynchrones.R new file mode 100644 index 0000000..f73e64e --- /dev/null +++ b/epclust/R/computeSynchrones.R @@ -0,0 +1,89 @@ +#' computeSynchrones +#' +#' Compute the synchrones curves (sum of clusters elements) from a matrix of medoids, +#' using euclidian distance. +#' +#' @param medoids matrix of medoids in columns (curves of same length as the series) +#' @param getSeries Function to retrieve series (argument: 'indices', integer vector) +#' @param nb_curves How many series? (this is known, at this stage) +#' @inheritParams claws +#' +#' @return A matrix of K synchrones in columns (same length as the series) +#' +#' @export +computeSynchrones = function(medoids, getSeries, nb_curves, + nb_series_per_chunk, ncores_clust=1,verbose=FALSE,parll=TRUE) +{ + # Synchrones computation is embarassingly parallel: compute it by chunks of series + computeSynchronesChunk = function(indices) + { + if (parll) + { + require("bigmemory", quietly=TRUE) + requireNamespace("synchronicity", quietly=TRUE) + require("epclust", quietly=TRUE) + # The big.matrix objects need to be attached to be usable on the workers + synchrones <- bigmemory::attach.big.matrix(synchrones_desc) + medoids <- bigmemory::attach.big.matrix(medoids_desc) + m <- synchronicity::attach.mutex(m_desc) + } + + # Obtain a chunk of reference series + series_chunk = getSeries(indices) + nb_series_chunk = ncol(series_chunk) + + # Get medoids indices for this chunk of series + for (i in seq_len(nb_series_chunk)) + mi[i] <- which.min( colSums( sweep(medoids, 1, series_chunk[,i], '-')^2 ) ) + + # Update synchrones using mi above, grouping it by values of mi (in 1...K) + # to avoid too many lock/unlock + for (i in seq_len(K)) + { + # lock / unlock required because several writes at the same time + if (parll) + synchronicity::lock(m) + synchrones[,i] = synchrones[,i] + rowSums(series_chunk[,mi==i]) + if (parll) + synchronicity::unlock(m) + } + NULL + } + + K = ncol(medoids) + L = nrow(medoids) + # Use bigmemory (shared==TRUE by default) + synchronicity to fill synchrones in // + synchrones = bigmemory::big.matrix(nrow=L, ncol=K, type="double", init=0.) + # NOTE: synchronicity is only for Linux & MacOS; on Windows: run sequentially + parll = (parll && requireNamespace("synchronicity",quietly=TRUE) + && Sys.info()['sysname'] != "Windows") + if (parll) + { + m <- synchronicity::boost.mutex() #for lock/unlock, see computeSynchronesChunk + # mutex and big.matrix objects cannot be passed directly: + # they will be accessed from their description + m_desc <- synchronicity::describe(m) + synchrones_desc = bigmemory::describe(synchrones) + medoids <- bigmemory::as.big.matrix(medoids) + medoids_desc <- bigmemory::describe(medoids) + cl = parallel::makeCluster(ncores_clust) + parallel::clusterExport(cl, envir=environment(), + varlist=c("synchrones_desc","m_desc","medoids_desc","getRefSeries")) + } + + if (verbose) + cat(paste("--- Compute ",K," synchrones with ",nb_curves," series\n", sep="")) + + # Balance tasks by splitting 1:nb_ref_curves into groups of size <= nb_series_per_chunk + indices_workers = .splitIndices(seq_len(nb_ref_curves), nb_series_per_chunk) + ignored <- + if (parll) + parallel::parLapply(cl, indices_workers, computeSynchronesChunk) + else + lapply(indices_workers, computeSynchronesChunk) + + if (parll) + parallel::stopCluster(cl) + + return (synchrones[,]) +}