From c45fd66342e40c8b5387fc6f0059c4d3a9718340 Mon Sep 17 00:00:00 2001 From: Benjamin Auder <benjamin.auder@somewhere> Date: Wed, 8 Mar 2017 03:12:36 +0100 Subject: [PATCH] parallel version running; TODO: check==sequential, plotting routines, parser; check memory --- epclust/R/clustering.R | 31 ++++++++++++++----------------- epclust/R/main.R | 37 ++++++++++++++++++++----------------- epclust/R/plot.R | 3 +++ 3 files changed, 37 insertions(+), 34 deletions(-) create mode 100644 epclust/R/plot.R diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R index 14e1f83..9a55495 100644 --- a/epclust/R/clustering.R +++ b/epclust/R/clustering.R @@ -93,6 +93,8 @@ computeSynchrones = function(medoids, getRefSeries, { computeSynchronesChunk = function(indices) { + if (verbose) + cat(paste("--- Compute synchrones for ",length(indices)," lines\n", sep="")) ref_series = getRefSeries(indices) #get medoids indices for this chunk of series for (i in seq_len(nrow(ref_series))) @@ -118,17 +120,14 @@ computeSynchrones = function(medoids, getRefSeries, m <- synchronicity::boost.mutex() indices_workers = .spreadIndices(seq_len(nb_ref_curves), nb_series_per_chunk) - for (inds in indices_workers) - { - if (verbose) - cat(paste("--- Compute synchrones for ",length(inds)," lines\n", sep="")) + ignored <- if (parll) - ignored <- parallel::mcparallel(computeSynchronesChunk(inds)) + { + parallel::mclapply(indices_workers, computeSynchronesChunk, + mc.cores=ncores_clust, mc.allow.recursive=FALSE) + } else - computeSynchronesChunk(inds) - } - if (parll) - parallel::mccollect() + lapply(indices_workers, computeSynchronesChunk) mat_syncs = matrix(nrow=K, ncol=ncol(medoids)) vec_count = rep(NA, K) @@ -234,18 +233,16 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE) if (parll) m <- synchronicity::boost.mutex() - for (i in 1:(n-1)) - { + ignored <- if (parll) - ignored <- parallel::mcparallel(computeDistancesLineI(i)) + { + parallel::mclapply(seq_len(n-1), computeDistancesLineI, + mc.cores=ncores_clust, mc.allow.recursive=FALSE) + } else - computeDistancesLineI(i) - } + lapply(seq_len(n-1), computeDistancesLineI) Xwer_dist[n,n] = 0. - if (parll) - parallel::mccollect() - mat_dists = matrix(nrow=n, ncol=n) #TODO: avoid this loop? for (i in 1:n) diff --git a/epclust/R/main.R b/epclust/R/main.R index bf31c87..bba0618 100644 --- a/epclust/R/main.R +++ b/epclust/R/main.R @@ -161,22 +161,6 @@ claws = function(getSeries, K1, K2, if (nb_series_per_task < min_series_per_chunk) stop("Too many tasks: less series in one task than min_series_per_chunk!") - # Cluster contributions in parallel (by nb_series_per_chunk) - indices_all = if (random) sample(nb_curves) else seq_len(nb_curves) - indices_tasks = lapply(seq_len(ntasks), function(i) { - upper_bound = ifelse( i<ntasks, min(nb_series_per_task*i,nb_curves), nb_curves ) - indices_all[((i-1)*nb_series_per_task+1):upper_bound] - }) - if (verbose) - cat(paste("...Run ",ntasks," x stage 1 in parallel\n",sep="")) - if (parll) - { - cl = parallel::makeCluster(ncores_tasks) - parallel::clusterExport(cl, varlist=c("getSeries","getContribs","K1","K2","verbose","parll", - "nb_series_per_chunk","ncores_clust","synchrones_file","sep","nbytes","endian"), - envir = environment()) - } - runTwoStepClustering = function(inds) { if (parll) @@ -193,6 +177,26 @@ claws = function(getSeries, K1, K2, indices_medoids } + # Cluster contributions in parallel (by nb_series_per_chunk) + indices_all = if (random) sample(nb_curves) else seq_len(nb_curves) + indices_tasks = lapply(seq_len(ntasks), function(i) { + upper_bound = ifelse( i<ntasks, min(nb_series_per_task*i,nb_curves), nb_curves ) + indices_all[((i-1)*nb_series_per_task+1):upper_bound] + }) + if (verbose) + cat(paste("...Run ",ntasks," x stage 1 in parallel\n",sep="")) + if (WER=="mix") + {synchrones_file = paste(bin_dir,"synchrones",sep="") ; unlink(synchrones_file)} + if (parll) + { + cl = parallel::makeCluster(ncores_tasks) + varlist = c("getSeries","getContribs","K1","K2","verbose","parll", + "nb_series_per_chunk","ncores_clust","sep","nbytes","endian") + if (WER=="mix") + varlist = c(varlist, "synchrones_file") + parallel::clusterExport(cl, varlist=varlist, envir = environment()) + } + # 1000*K1 indices [if WER=="end"], or empty vector [if WER=="mix"] --> series on file if (parll) indices = unlist( parallel::parLapply(cl, indices_tasks, runTwoStepClustering) ) @@ -202,7 +206,6 @@ claws = function(getSeries, K1, K2, parallel::stopCluster(cl) getRefSeries = getSeries - synchrones_file = paste(bin_dir,"synchrones",sep="") ; unlink(synchrones_file) if (WER=="mix") { indices = seq_len(ntasks*K2) diff --git a/epclust/R/plot.R b/epclust/R/plot.R new file mode 100644 index 0000000..096159d --- /dev/null +++ b/epclust/R/plot.R @@ -0,0 +1,3 @@ +#TODO: some visualization +#for (i in 1:6) {plot(medoids_ascii[i,1:200],type="l",ylim=c(-5,5),col=i);par(new=TRUE)} +#... -- 2.44.0