Remove parll arg (redundant with ncores_XX)
[epclust.git] / epclust / R / computeSynchrones.R
CommitLineData
40f12a2f
BA
1#' computeSynchrones
2#'
3fb6e823 3#' Compute the synchrones curves (sums of clusters elements) from a matrix of medoids,
40f12a2f
BA
4#' using euclidian distance.
5#'
3fb6e823 6#' @param medoids matrix of K medoids curves in columns
40f12a2f
BA
7#' @param nb_curves How many series? (this is known, at this stage)
8#' @inheritParams claws
dc86eb0c 9#' @inheritParams computeWerDists
40f12a2f
BA
10#'
11#' @return A matrix of K synchrones in columns (same length as the series)
12#'
13#' @export
282342ba 14computeSynchrones <- function(medoids, getSeries, nb_curves,
074a48c4 15 nb_series_per_chunk, ncores=3, verbose=FALSE)
40f12a2f
BA
16{
17 # Synchrones computation is embarassingly parallel: compute it by chunks of series
282342ba 18 computeSynchronesChunk <- function(indices)
40f12a2f 19 {
40f12a2f 20 # Obtain a chunk of reference series
282342ba
BA
21 series_chunk <- getSeries(indices)
22 nb_series_chunk <- ncol(series_chunk)
40f12a2f
BA
23
24 # Get medoids indices for this chunk of series
282342ba 25 mi <- assignMedoids(series_chunk, medoids[,])
40f12a2f
BA
26
27 # Update synchrones using mi above, grouping it by values of mi (in 1...K)
28 # to avoid too many lock/unlock
29 for (i in seq_len(K))
30 {
31 # lock / unlock required because several writes at the same time
32 if (parll)
33 synchronicity::lock(m)
282342ba 34 synchrones[,i] <- synchrones[,i] + rowSums(as.matrix(series_chunk[,mi==i]))
40f12a2f
BA
35 if (parll)
36 synchronicity::unlock(m)
37 }
38 NULL
39 }
40
282342ba
BA
41 K <- ncol(medoids)
42 L <- nrow(medoids)
40f12a2f 43 # Use bigmemory (shared==TRUE by default) + synchronicity to fill synchrones in //
282342ba 44 synchrones <- bigmemory::big.matrix(nrow=L, ncol=K, type="double", init=0.)
40f12a2f 45 # NOTE: synchronicity is only for Linux & MacOS; on Windows: run sequentially
074a48c4 46 parll <- (ncores > 1 && requireNamespace("synchronicity",quietly=TRUE)
40f12a2f 47 && Sys.info()['sysname'] != "Windows")
dc86eb0c 48
40f12a2f 49 if (parll)
40f12a2f 50 m <- synchronicity::boost.mutex() #for lock/unlock, see computeSynchronesChunk
40f12a2f
BA
51
52 if (verbose)
53 cat(paste("--- Compute ",K," synchrones with ",nb_curves," series\n", sep=""))
54
3c5a4b08 55 # Balance tasks by splitting 1:nb_curves into groups of size <= nb_series_per_chunk
282342ba 56 indices_workers <- .splitIndices(seq_len(nb_curves), nb_series_per_chunk)
40f12a2f
BA
57 ignored <-
58 if (parll)
dc86eb0c
BA
59 {
60 parallel::mclapply(indices_workers,
61 function(inds) computeSynchronesChunk(inds), mc.cores=ncores)
62 }
40f12a2f
BA
63 else
64 lapply(indices_workers, computeSynchronesChunk)
65
40f12a2f
BA
66 return (synchrones[,])
67}