Commit | Line | Data |
---|---|---|
40f12a2f BA |
#' computeSynchrones
#'
#' Compute the synchrones curves (sum of clusters elements) from a matrix of medoids,
#' using Euclidean distance: each series is assigned to its closest medoid, and the
#' series assigned to a same medoid are summed.
#'
#' @param medoids matrix of medoids in columns (curves of same length as the series)
#' @param getSeries Function to retrieve series (argument: 'indices', integer vector)
#' @param nb_curves How many series? (this is known, at this stage)
#' @inheritParams claws
#'
#' @return A matrix of K synchrones in columns (same length as the series)
#'
#' @export
computeSynchrones = function(medoids, getSeries, nb_curves,
  nb_series_per_chunk, ncores_clust=1,verbose=FALSE,parll=TRUE)
{
  # Synchrones computation is embarrassingly parallel: compute it by chunks of series.
  # Each call adds the contribution of one chunk to the shared 'synchrones' matrix
  # and returns NULL (it is run only for this side effect).
  computeSynchronesChunk <- function(indices)
  {
    if (parll)
    {
      require("bigmemory", quietly=TRUE)
      requireNamespace("synchronicity", quietly=TRUE)
      require("epclust", quietly=TRUE)
      # The big.matrix objects need to be attached to be usable on the workers
      synchrones <- bigmemory::attach.big.matrix(synchrones_desc)
      medoids <- bigmemory::attach.big.matrix(medoids_desc)
      m <- synchronicity::attach.mutex(m_desc)
    }

    # Work on an ordinary R matrix: in parallel mode 'medoids' is a big.matrix,
    # while the distance computation below relies on base matrix arithmetic.
    medoids_mat <- medoids[, , drop=FALSE]
    nb_medoids <- ncol(medoids_mat)

    # Obtain a chunk of reference series
    series_chunk <- getSeries(indices)
    nb_series_chunk <- ncol(series_chunk)

    # Get medoids indices for this chunk of series: index of the closest medoid
    # (squared Euclidean distance). The series is recycled down each column of
    # medoids_mat, since its length equals the number of rows.
    mi <- vapply(seq_len(nb_series_chunk), function(i)
      which.min( colSums( (medoids_mat - series_chunk[,i])^2 ) ), integer(1))

    # Update synchrones using mi above, grouping it by values of mi (in 1...K)
    # to avoid too many lock/unlock
    for (k in seq_len(nb_medoids))
    {
      members <- which(mi == k)
      if (length(members) == 0)
        next #nothing to add for this cluster: do not take the lock at all
      # drop=FALSE: keep a matrix even if only one series belongs to cluster k;
      # computed before locking to keep the critical section short
      increment <- rowSums(series_chunk[, members, drop=FALSE])
      # lock / unlock required because several writes at the same time
      if (parll)
        synchronicity::lock(m)
      synchrones[,k] <- synchrones[,k] + increment
      if (parll)
        synchronicity::unlock(m)
    }
    NULL
  }

  K <- ncol(medoids)
  L <- nrow(medoids)
  # Use bigmemory (shared==TRUE by default) + synchronicity to fill synchrones in //
  synchrones <- bigmemory::big.matrix(nrow=L, ncol=K, type="double", init=0.)
  # NOTE: synchronicity is only for Linux & MacOS; on Windows: run sequentially
  parll <- (parll && requireNamespace("synchronicity",quietly=TRUE)
    && Sys.info()['sysname'] != "Windows")
  if (parll)
  {
    m <- synchronicity::boost.mutex() #for lock/unlock, see computeSynchronesChunk
    # mutex and big.matrix objects cannot be passed directly:
    # they will be accessed from their description
    m_desc <- synchronicity::describe(m)
    synchrones_desc <- bigmemory::describe(synchrones)
    medoids <- bigmemory::as.big.matrix(medoids)
    medoids_desc <- bigmemory::describe(medoids)
    cl <- parallel::makeCluster(ncores_clust)
    # Always release the workers, even if a chunk fails
    on.exit(parallel::stopCluster(cl), add=TRUE)
    parallel::clusterExport(cl, envir=environment(),
      varlist=c("synchrones_desc","m_desc","medoids_desc","getSeries"))
  }

  if (verbose)
    cat(paste0("--- Compute ",K," synchrones with ",nb_curves," series\n"))

  # Balance tasks by splitting 1:nb_curves into groups of size <= nb_series_per_chunk
  indices_workers <- .splitIndices(seq_len(nb_curves), nb_series_per_chunk)
  # Results are discarded: chunks only write into 'synchrones'
  if (parll)
    parallel::parLapply(cl, indices_workers, computeSynchronesChunk)
  else
    lapply(indices_workers, computeSynchronesChunk)

  # Copy the shared big.matrix into a regular R matrix (kept as a matrix even if K == 1)
  synchrones[, , drop=FALSE]
}