From c45fd66342e40c8b5387fc6f0059c4d3a9718340 Mon Sep 17 00:00:00 2001 From: Benjamin Auder Date: Wed, 8 Mar 2017 03:12:36 +0100 Subject: [PATCH] parallel version running; TODO: check==sequential, plotting routines, parser; check memory --- epclust/R/clustering.R | 31 ++++++++++++++----------------- epclust/R/main.R | 37 ++++++++++++++++++++----------------- epclust/R/plot.R | 3 +++ 3 files changed, 37 insertions(+), 34 deletions(-) create mode 100644 epclust/R/plot.R diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R index 14e1f83..9a55495 100644 --- a/epclust/R/clustering.R +++ b/epclust/R/clustering.R @@ -93,6 +93,8 @@ computeSynchrones = function(medoids, getRefSeries, { computeSynchronesChunk = function(indices) { + if (verbose) + cat(paste("--- Compute synchrones for ",length(indices)," lines\n", sep="")) ref_series = getRefSeries(indices) #get medoids indices for this chunk of series for (i in seq_len(nrow(ref_series))) @@ -118,17 +120,14 @@ computeSynchrones = function(medoids, getRefSeries, m <- synchronicity::boost.mutex() indices_workers = .spreadIndices(seq_len(nb_ref_curves), nb_series_per_chunk) - for (inds in indices_workers) - { - if (verbose) - cat(paste("--- Compute synchrones for ",length(inds)," lines\n", sep="")) + ignored <- if (parll) - ignored <- parallel::mcparallel(computeSynchronesChunk(inds)) + { + parallel::mclapply(indices_workers, computeSynchronesChunk, + mc.cores=ncores_clust, mc.allow.recursive=FALSE) + } else - computeSynchronesChunk(inds) - } - if (parll) - parallel::mccollect() + lapply(indices_workers, computeSynchronesChunk) mat_syncs = matrix(nrow=K, ncol=ncol(medoids)) vec_count = rep(NA, K) @@ -234,18 +233,16 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) if (parll) m <- synchronicity::boost.mutex() - for (i in 1:(n-1)) - { + ignored <- if (parll) - ignored <- parallel::mcparallel(computeDistancesLineI(i)) + { + parallel::mclapply(seq_len(n-1), computeDistancesLineI, + mc.cores=ncores_clust, mc.allow.recursive=FALSE) + } else - computeDistancesLineI(i) - } + lapply(seq_len(n-1), computeDistancesLineI) Xwer_dist[n,n] = 0. - if (parll) - parallel::mccollect() - mat_dists = matrix(nrow=n, ncol=n) #TODO: avoid this loop? for (i in 1:n) diff --git a/epclust/R/main.R b/epclust/R/main.R index bf31c87..bba0618 100644 --- a/epclust/R/main.R +++ b/epclust/R/main.R @@ -161,22 +161,6 @@ claws = function(getSeries, K1, K2, if (nb_series_per_task < min_series_per_chunk) stop("Too many tasks: less series in one task than min_series_per_chunk!") - # Cluster contributions in parallel (by nb_series_per_chunk) - indices_all = if (random) sample(nb_curves) else seq_len(nb_curves) - indices_tasks = lapply(seq_len(ntasks), function(i) { - upper_bound = ifelse( i series on file if (parll) indices = unlist( parallel::parLapply(cl, indices_tasks, runTwoStepClustering) ) @@ -202,7 +206,6 @@ claws = function(getSeries, K1, K2, parallel::stopCluster(cl) getRefSeries = getSeries - synchrones_file = paste(bin_dir,"synchrones",sep="") ; unlink(synchrones_file) if (WER=="mix") { indices = seq_len(ntasks*K2) diff --git a/epclust/R/plot.R b/epclust/R/plot.R new file mode 100644 index 0000000..096159d --- /dev/null +++ b/epclust/R/plot.R @@ -0,0 +1,3 @@ +#TODO: some visualization +#for (i in 1:6) {plot(medoids_ascii[i,1:200],type="l",ylim=c(-5,5),col=i);par(new=TRUE)} +#... -- 2.44.0