Commit | Line | Data |
---|---|---|
40f12a2f BA |
#' computeSynchrones
#'
#' Compute the synchrones curves (sums of clusters elements) from a matrix of medoids,
#' using Euclidean distance.
#'
#' @param medoids matrix of K medoids curves in columns
#' @param getSeries Function to retrieve series (argument: 'indices', integer vector),
#'   as columns of a matrix
#' @param nb_curves How many series? (this is known, at this stage)
#' @inheritParams claws
#'
#' @return A matrix of K synchrones in columns (same length as the series)
#'
#' @details Series are fetched by chunks of at most \code{nb_series_per_chunk}
#'   columns; each series is assigned to a medoid via \code{assignMedoids}, and
#'   synchrone k accumulates the element-wise sum of the series assigned to medoid k.
#'   When \code{parll} is TRUE, the synchronicity package is available and the OS
#'   is not Windows, chunks are processed on a \code{parallel} cluster of
#'   \code{ncores_clust} workers writing into a shared big.matrix protected by a
#'   mutex; otherwise chunks are processed sequentially.
#'
#' @export
computeSynchrones <- function(medoids, getSeries, nb_curves,
	nb_series_per_chunk, ncores_clust=3, verbose=FALSE, parll=TRUE)
{
	# Synchrones computation is embarrassingly parallel: compute it by chunks of series
	computeSynchronesChunk <- function(indices)
	{
		if (parll)
		{
			require("epclust", quietly=TRUE)
			requireNamespace("synchronicity", quietly=TRUE)
			# The big.matrix objects need to be attached to be usable on the workers
			synchrones <- bigmemory::attach.big.matrix(synchrones_desc)
			medoids <- bigmemory::attach.big.matrix(medoids_desc)
			m <- synchronicity::attach.mutex(m_desc)
		}

		# Obtain a chunk of reference series (one series per column)
		series_chunk <- getSeries(indices)

		# Get medoids indices for this chunk of series
		mi <- assignMedoids(series_chunk, medoids[,])

		# Update synchrones using mi above, grouping it by values of mi (in 1...K)
		# to avoid too many lock/unlock
		for (i in seq_len(K))
		{
			cluster_cols <- which(mi == i)
			# An empty cluster would only add zeros: skip it (and its lock/unlock)
			if (length(cluster_cols) == 0)
				next
			# Sum outside the critical section to keep the lock short;
			# drop=FALSE keeps a matrix even for a single series or a single time step
			contrib <- rowSums(series_chunk[, cluster_cols, drop=FALSE])
			if (parll)
			{
				# lock / unlock required because several workers write at the same time.
				# 'finally' releases the mutex even if the update fails; otherwise the
				# other workers would wait on it forever.
				synchronicity::lock(m)
				tryCatch(synchrones[,i] <- synchrones[,i] + contrib,
					finally=synchronicity::unlock(m))
			}
			else
				synchrones[,i] <- synchrones[,i] + contrib
		}
		NULL
	}

	K <- ncol(medoids)
	L <- nrow(medoids)
	# Use bigmemory (shared==TRUE by default) + synchronicity to fill synchrones in //
	synchrones <- bigmemory::big.matrix(nrow=L, ncol=K, type="double", init=0.)
	# NOTE: synchronicity is only for Linux & MacOS; on Windows: run sequentially
	parll <- (parll && requireNamespace("synchronicity",quietly=TRUE)
		&& Sys.info()['sysname'] != "Windows")
	if (parll)
	{
		m <- synchronicity::boost.mutex() #for lock/unlock, see computeSynchronesChunk
		# mutex and big.matrix objects cannot be passed directly:
		# they will be accessed from their description
		m_desc <- synchronicity::describe(m)
		synchrones_desc <- bigmemory::describe(synchrones)
		medoids <- bigmemory::as.big.matrix(medoids)
		medoids_desc <- bigmemory::describe(medoids)
		# outfile=="" to see stderr/stdout on terminal
		cl <-
			if (verbose)
				parallel::makeCluster(ncores_clust, outfile="")
			else
				parallel::makeCluster(ncores_clust)
		# Shut the workers down on every exit path, including errors in parLapply
		on.exit(parallel::stopCluster(cl), add=TRUE)
		parallel::clusterExport(cl, envir=environment(),
			varlist=c("synchrones_desc","m_desc","medoids_desc","getSeries"))
	}

	if (verbose)
		cat(paste0("--- Compute ",K," synchrones with ",nb_curves," series\n"))

	# Balance tasks by splitting 1:nb_curves into groups of size <= nb_series_per_chunk
	indices_workers <- .splitIndices(seq_len(nb_curves), nb_series_per_chunk)
	# Chunks write into 'synchrones' directly: their return values are not needed
	if (parll)
		parallel::parLapply(cl, indices_workers, computeSynchronesChunk)
	else
		lapply(indices_workers, computeSynchronesChunk)

	synchrones[,]
}