Commit | Line | Data |
---|---|---|
#' computeSynchrones
#'
#' Compute the synchrones curves (sums of the elements of each cluster) from a
#' matrix of medoids, using the Euclidean distance.
#'
#' The series are processed by chunks: each chunk is retrieved through
#' \code{getSeries}, every series of the chunk is assigned to a medoid by
#' \code{assignMedoids}, and the series are then added to the column of the
#' synchrones matrix matching their medoid. When \code{parll} is TRUE, the
#' 'synchronicity' package is available and the OS is not Windows, the chunks
#' are dispatched on a cluster of \code{ncores_clust} workers sharing the
#' synchrones matrix; otherwise the chunks are processed sequentially.
#'
#' @param medoids matrix of medoids in columns (curves of same length as the series)
#' @param getSeries Function to retrieve series (argument: 'indices', integer vector)
#' @param nb_curves How many series? (this is known, at this stage)
#' @inheritParams claws
#'
#' @return A matrix of K synchrones in columns (same length as the series)
#'
#' @export
computeSynchrones <- function(medoids, getSeries, nb_curves,
  nb_series_per_chunk, ncores_clust=1,verbose=FALSE,parll=TRUE)
{
  # Synchrones computation is embarrassingly parallel: compute it by chunks of series
  computeSynchronesChunk <- function(indices)
  {
    if (parll)
    {
      # NOTE: require() is kept on purpose (it does not stop if a package
      # cannot be attached on the worker).
      require("bigmemory", quietly=TRUE)
      requireNamespace("synchronicity", quietly=TRUE)
      require("epclust", quietly=TRUE)
      # The big.matrix objects need to be attached to be usable on the workers
      synchrones <- bigmemory::attach.big.matrix(synchrones_desc)
      medoids <- bigmemory::attach.big.matrix(medoids_desc)
      m <- synchronicity::attach.mutex(m_desc)
    }

    # Obtain a chunk of reference series
    series_chunk <- getSeries(indices)

    # Get medoids indices for this chunk of series
    mi <- assignMedoids(series_chunk, medoids[,])

    # Update synchrones using mi above, grouping it by values of mi (in 1...K)
    # to avoid too many lock/unlock
    for (i in seq_len(K))
    {
      # Sum the series assigned to medoid i *before* taking the lock, so that the
      # critical section is as short as possible. drop=FALSE keeps a matrix even
      # when zero or one series is selected (or when series have length 1).
      partial_sum <- rowSums(series_chunk[, mi == i, drop=FALSE])

      if (parll)
      {
        # lock / unlock required because several writes at the same time
        synchronicity::lock(m)
        # 'finally' guarantees the mutex is released even if the update fails,
        # so that the other workers are not blocked.
        tryCatch(
          {
            synchrones[,i] <- synchrones[,i] + partial_sum
          },
          finally = synchronicity::unlock(m))
      }
      else
        synchrones[,i] <- synchrones[,i] + partial_sum
    }
    NULL
  }

  K <- ncol(medoids)
  L <- nrow(medoids)
  # Use bigmemory (shared==TRUE by default) + synchronicity to fill synchrones in //
  synchrones <- bigmemory::big.matrix(nrow=L, ncol=K, type="double", init=0.)
  # NOTE: synchronicity is only for Linux & MacOS; on Windows: run sequentially
  parll <- (parll && requireNamespace("synchronicity",quietly=TRUE)
    && Sys.info()[["sysname"]] != "Windows")
  if (parll)
  {
    m <- synchronicity::boost.mutex() #for lock/unlock, see computeSynchronesChunk
    # mutex and big.matrix objects cannot be passed directly:
    # they will be accessed from their description
    m_desc <- synchronicity::describe(m)
    synchrones_desc <- bigmemory::describe(synchrones)
    medoids <- bigmemory::as.big.matrix(medoids)
    medoids_desc <- bigmemory::describe(medoids)
    # outfile=="" to see stderr/stdout on terminal
    cl <- parallel::makeCluster(ncores_clust, outfile="")
    # Always release the workers, including when a chunk computation fails
    on.exit(parallel::stopCluster(cl), add=TRUE)
    parallel::clusterExport(cl, envir=environment(),
      varlist=c("synchrones_desc","m_desc","medoids_desc","getSeries"))
  }

  if (verbose)
    cat(paste0("--- Compute ",K," synchrones with ",nb_curves," series\n"))

  # Balance tasks by splitting 1:nb_curves into groups of size <= nb_series_per_chunk
  indices_workers <- .splitIndices(seq_len(nb_curves), nb_series_per_chunk)
  # The chunks only act by side effect on 'synchrones': their results are discarded
  ignored <-
    if (parll)
      parallel::parLapply(cl, indices_workers, computeSynchronesChunk)
    else
      lapply(indices_workers, computeSynchronesChunk)

  # Copy the shared big.matrix content into a regular R object
  return (synchrones[,])
}