#' computeSynchrones
#'
-#' Compute the synchrones curves (sum of clusters elements) from a matrix of medoids,
+#' Compute the synchrones curves (sums of clusters elements) from a matrix of medoids,
#' using euclidian distance.
#'
-#' @param medoids matrix of medoids in columns (curves of same length as the series)
-#' @param getSeries Function to retrieve series (argument: 'indices', integer vector)
+#' @param medoids matrix of K medoids curves in columns
#' @param nb_curves How many series? (this is known, at this stage)
#' @inheritParams claws
+#' @inheritParams computeWerDists
#'
#' @return A matrix of K synchrones in columns (same length as the series)
#'
#' @export
computeSynchrones <- function(medoids, getSeries, nb_curves,
- nb_series_per_chunk, ncores_clust=1,verbose=FALSE,parll=TRUE)
+ nb_series_per_chunk, ncores=3, verbose=FALSE)
{
# Synchrones computation is embarassingly parallel: compute it by chunks of series
computeSynchronesChunk <- function(indices)
{
- if (parll)
- {
- require("bigmemory", quietly=TRUE)
- requireNamespace("synchronicity", quietly=TRUE)
- require("epclust", quietly=TRUE)
- # The big.matrix objects need to be attached to be usable on the workers
- synchrones <- bigmemory::attach.big.matrix(synchrones_desc)
- medoids <- bigmemory::attach.big.matrix(medoids_desc)
- m <- synchronicity::attach.mutex(m_desc)
- }
-
# Obtain a chunk of reference series
series_chunk <- getSeries(indices)
nb_series_chunk <- ncol(series_chunk)
# Use bigmemory (shared==TRUE by default) + synchronicity to fill synchrones in //
synchrones <- bigmemory::big.matrix(nrow=L, ncol=K, type="double", init=0.)
# NOTE: synchronicity is only for Linux & MacOS; on Windows: run sequentially
- parll <- (parll && requireNamespace("synchronicity",quietly=TRUE)
+ parll <- (ncores > 1 && requireNamespace("synchronicity",quietly=TRUE)
&& Sys.info()['sysname'] != "Windows")
+
if (parll)
- {
m <- synchronicity::boost.mutex() #for lock/unlock, see computeSynchronesChunk
- # mutex and big.matrix objects cannot be passed directly:
- # they will be accessed from their description
- m_desc <- synchronicity::describe(m)
- synchrones_desc <- bigmemory::describe(synchrones)
- medoids <- bigmemory::as.big.matrix(medoids)
- medoids_desc <- bigmemory::describe(medoids)
- # outfile=="" to see stderr/stdout on terminal
- cl <- parallel::makeCluster(ncores_clust, outfile="")
- parallel::clusterExport(cl, envir=environment(),
- varlist=c("synchrones_desc","m_desc","medoids_desc","getSeries"))
- }
if (verbose)
cat(paste("--- Compute ",K," synchrones with ",nb_curves," series\n", sep=""))
indices_workers <- .splitIndices(seq_len(nb_curves), nb_series_per_chunk)
ignored <-
if (parll)
- parallel::parLapply(cl, indices_workers, computeSynchronesChunk)
+ {
+ parallel::mclapply(indices_workers,
+ function(inds) computeSynchronesChunk(inds), mc.cores=ncores)
+ }
else
lapply(indices_workers, computeSynchronesChunk)
- if (parll)
- parallel::stopCluster(cl)
-
return (synchrones[,])
}