projects
/
epclust.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
drop enercast submodule; drop Rcpp requirement; fix doc, complete code, fix fix fix
[epclust.git]
/
epclust
/
R
/
computeSynchrones.R
diff --git
a/epclust/R/computeSynchrones.R
b/epclust/R/computeSynchrones.R
index
f73e64e
..
16bf0b4
100644
(file)
--- a/
epclust/R/computeSynchrones.R
+++ b/
epclust/R/computeSynchrones.R
@@
-11,11
+11,11
@@
#' @return A matrix of K synchrones in columns (same length as the series)
#'
#' @export
#' @return A matrix of K synchrones in columns (same length as the series)
#'
#' @export
-computeSynchrones
=
function(medoids, getSeries, nb_curves,
+computeSynchrones
<-
function(medoids, getSeries, nb_curves,
nb_series_per_chunk, ncores_clust=1,verbose=FALSE,parll=TRUE)
{
# Synchrones computation is embarassingly parallel: compute it by chunks of series
nb_series_per_chunk, ncores_clust=1,verbose=FALSE,parll=TRUE)
{
# Synchrones computation is embarassingly parallel: compute it by chunks of series
- computeSynchronesChunk
=
function(indices)
+ computeSynchronesChunk
<-
function(indices)
{
if (parll)
{
{
if (parll)
{
@@
-29,12
+29,11
@@
computeSynchrones = function(medoids, getSeries, nb_curves,
}
# Obtain a chunk of reference series
}
# Obtain a chunk of reference series
- series_chunk
=
getSeries(indices)
- nb_series_chunk
=
ncol(series_chunk)
+ series_chunk
<-
getSeries(indices)
+ nb_series_chunk
<-
ncol(series_chunk)
# Get medoids indices for this chunk of series
# Get medoids indices for this chunk of series
- for (i in seq_len(nb_series_chunk))
- mi[i] <- which.min( colSums( sweep(medoids, 1, series_chunk[,i], '-')^2 ) )
+ mi <- assignMedoids(series_chunk, medoids[,])
# Update synchrones using mi above, grouping it by values of mi (in 1...K)
# to avoid too many lock/unlock
# Update synchrones using mi above, grouping it by values of mi (in 1...K)
# to avoid too many lock/unlock
@@
-43,19
+42,19
@@
computeSynchrones = function(medoids, getSeries, nb_curves,
# lock / unlock required because several writes at the same time
if (parll)
synchronicity::lock(m)
# lock / unlock required because several writes at the same time
if (parll)
synchronicity::lock(m)
- synchrones[,i]
= synchrones[,i] + rowSums(series_chunk[,mi==i]
)
+ synchrones[,i]
<- synchrones[,i] + rowSums(as.matrix(series_chunk[,mi==i])
)
if (parll)
synchronicity::unlock(m)
}
NULL
}
if (parll)
synchronicity::unlock(m)
}
NULL
}
- K
=
ncol(medoids)
- L
=
nrow(medoids)
+ K
<-
ncol(medoids)
+ L
<-
nrow(medoids)
# Use bigmemory (shared==TRUE by default) + synchronicity to fill synchrones in //
# Use bigmemory (shared==TRUE by default) + synchronicity to fill synchrones in //
- synchrones
=
bigmemory::big.matrix(nrow=L, ncol=K, type="double", init=0.)
+ synchrones
<-
bigmemory::big.matrix(nrow=L, ncol=K, type="double", init=0.)
# NOTE: synchronicity is only for Linux & MacOS; on Windows: run sequentially
# NOTE: synchronicity is only for Linux & MacOS; on Windows: run sequentially
- parll
=
(parll && requireNamespace("synchronicity",quietly=TRUE)
+ parll
<-
(parll && requireNamespace("synchronicity",quietly=TRUE)
&& Sys.info()['sysname'] != "Windows")
if (parll)
{
&& Sys.info()['sysname'] != "Windows")
if (parll)
{
@@
-63,19
+62,20
@@
computeSynchrones = function(medoids, getSeries, nb_curves,
# mutex and big.matrix objects cannot be passed directly:
# they will be accessed from their description
m_desc <- synchronicity::describe(m)
# mutex and big.matrix objects cannot be passed directly:
# they will be accessed from their description
m_desc <- synchronicity::describe(m)
- synchrones_desc
=
bigmemory::describe(synchrones)
+ synchrones_desc
<-
bigmemory::describe(synchrones)
medoids <- bigmemory::as.big.matrix(medoids)
medoids_desc <- bigmemory::describe(medoids)
medoids <- bigmemory::as.big.matrix(medoids)
medoids_desc <- bigmemory::describe(medoids)
- cl = parallel::makeCluster(ncores_clust)
+ # outfile=="" to see stderr/stdout on terminal
+ cl <- parallel::makeCluster(ncores_clust, outfile="")
parallel::clusterExport(cl, envir=environment(),
parallel::clusterExport(cl, envir=environment(),
- varlist=c("synchrones_desc","m_desc","medoids_desc","get
Ref
Series"))
+ varlist=c("synchrones_desc","m_desc","medoids_desc","getSeries"))
}
if (verbose)
cat(paste("--- Compute ",K," synchrones with ",nb_curves," series\n", sep=""))
}
if (verbose)
cat(paste("--- Compute ",K," synchrones with ",nb_curves," series\n", sep=""))
- # Balance tasks by splitting 1:nb_
ref_
curves into groups of size <= nb_series_per_chunk
- indices_workers
= .splitIndices(seq_len(nb_ref
_curves), nb_series_per_chunk)
+ # Balance tasks by splitting 1:nb_curves into groups of size <= nb_series_per_chunk
+ indices_workers
<- .splitIndices(seq_len(nb
_curves), nb_series_per_chunk)
ignored <-
if (parll)
parallel::parLapply(cl, indices_workers, computeSynchronesChunk)
ignored <-
if (parll)
parallel::parLapply(cl, indices_workers, computeSynchronesChunk)