From c45fd66342e40c8b5387fc6f0059c4d3a9718340 Mon Sep 17 00:00:00 2001
From: Benjamin Auder <benjamin.auder@somewhere>
Date: Wed, 8 Mar 2017 03:12:36 +0100
Subject: [PATCH] parallel version running; TODO: check==sequential, plotting
 routines, parser; check memory

---
 epclust/R/clustering.R | 31 ++++++++++++++-----------------
 epclust/R/main.R       | 37 ++++++++++++++++++++-----------------
 epclust/R/plot.R       |  3 +++
 3 files changed, 37 insertions(+), 34 deletions(-)
 create mode 100644 epclust/R/plot.R

diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R
index 14e1f83..9a55495 100644
--- a/epclust/R/clustering.R
+++ b/epclust/R/clustering.R
@@ -93,6 +93,8 @@ computeSynchrones = function(medoids, getRefSeries,
 {
 	computeSynchronesChunk = function(indices)
 	{
+		if (verbose)
+			cat(paste("--- Compute synchrones for ",length(indices)," lines\n", sep=""))
 		ref_series = getRefSeries(indices)
 		#get medoids indices for this chunk of series
 		for (i in seq_len(nrow(ref_series)))
@@ -118,17 +120,14 @@ computeSynchrones = function(medoids, getRefSeries,
 		m <- synchronicity::boost.mutex()
 
 	indices_workers = .spreadIndices(seq_len(nb_ref_curves), nb_series_per_chunk)
-	for (inds in indices_workers)
-	{
-		if (verbose)
-			cat(paste("--- Compute synchrones for ",length(inds)," lines\n", sep=""))
+	ignored <-
 		if (parll)
-			ignored <- parallel::mcparallel(computeSynchronesChunk(inds))
+		{
+			parallel::mclapply(indices_workers, computeSynchronesChunk,
+				mc.cores=ncores_clust, mc.allow.recursive=FALSE)
+		}
 		else
-			computeSynchronesChunk(inds)
-	}
-	if (parll)
-		parallel::mccollect()
+			lapply(indices_workers, computeSynchronesChunk)
 
 	mat_syncs = matrix(nrow=K, ncol=ncol(medoids))
 	vec_count = rep(NA, K)
@@ -234,18 +233,16 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE)
 	if (parll)
 		m <- synchronicity::boost.mutex()
 
-	for (i in 1:(n-1))
-	{
+	ignored <-
 		if (parll)
-			ignored <- parallel::mcparallel(computeDistancesLineI(i))
+		{
+			parallel::mclapply(seq_len(n-1), computeDistancesLineI,
+				mc.cores=ncores_clust, mc.allow.recursive=FALSE)
+		}
 		else
-			computeDistancesLineI(i)
-	}
+			lapply(seq_len(n-1), computeDistancesLineI)
 	Xwer_dist[n,n] = 0.
 
-	if (parll)
-		parallel::mccollect()
-
 	mat_dists = matrix(nrow=n, ncol=n)
 	#TODO: avoid this loop?
 	for (i in 1:n)
diff --git a/epclust/R/main.R b/epclust/R/main.R
index bf31c87..bba0618 100644
--- a/epclust/R/main.R
+++ b/epclust/R/main.R
@@ -161,22 +161,6 @@ claws = function(getSeries, K1, K2,
 	if (nb_series_per_task < min_series_per_chunk)
 		stop("Too many tasks: less series in one task than min_series_per_chunk!")
 
-	# Cluster contributions in parallel (by nb_series_per_chunk)
-	indices_all = if (random) sample(nb_curves) else seq_len(nb_curves)
-	indices_tasks = lapply(seq_len(ntasks), function(i) {
-		upper_bound = ifelse( i<ntasks, min(nb_series_per_task*i,nb_curves), nb_curves )
-		indices_all[((i-1)*nb_series_per_task+1):upper_bound]
-	})
-	if (verbose)
-		cat(paste("...Run ",ntasks," x stage 1 in parallel\n",sep=""))
-	if (parll)
-	{
-		cl = parallel::makeCluster(ncores_tasks)
-		parallel::clusterExport(cl, varlist=c("getSeries","getContribs","K1","K2","verbose","parll",
-			"nb_series_per_chunk","ncores_clust","synchrones_file","sep","nbytes","endian"),
-			envir = environment())
-	}
-
 	runTwoStepClustering = function(inds)
 	{
 		if (parll)
@@ -193,6 +177,26 @@ claws = function(getSeries, K1, K2,
 		indices_medoids
 	}
 
+	# Cluster contributions in parallel (by nb_series_per_chunk)
+	indices_all = if (random) sample(nb_curves) else seq_len(nb_curves)
+	indices_tasks = lapply(seq_len(ntasks), function(i) {
+		upper_bound = ifelse( i<ntasks, min(nb_series_per_task*i,nb_curves), nb_curves )
+		indices_all[((i-1)*nb_series_per_task+1):upper_bound]
+	})
+	if (verbose)
+		cat(paste("...Run ",ntasks," x stage 1 in parallel\n",sep=""))
+	if (WER=="mix")
+		{synchrones_file = paste(bin_dir,"synchrones",sep="") ; unlink(synchrones_file)}
+	if (parll)
+	{
+		cl = parallel::makeCluster(ncores_tasks)
+		varlist = c("getSeries","getContribs","K1","K2","verbose","parll",
+			"nb_series_per_chunk","ncores_clust","sep","nbytes","endian")
+		if (WER=="mix")
+			varlist = c(varlist, "synchrones_file")
+		parallel::clusterExport(cl, varlist=varlist, envir = environment())
+	}
+
 	# 1000*K1 indices [if WER=="end"], or empty vector [if WER=="mix"] --> series on file
 	if (parll)
 		indices = unlist( parallel::parLapply(cl, indices_tasks, runTwoStepClustering) )
@@ -202,7 +206,6 @@ claws = function(getSeries, K1, K2,
 		parallel::stopCluster(cl)
 
 	getRefSeries = getSeries
-	synchrones_file = paste(bin_dir,"synchrones",sep="") ; unlink(synchrones_file)
 	if (WER=="mix")
 	{
 		indices = seq_len(ntasks*K2)
diff --git a/epclust/R/plot.R b/epclust/R/plot.R
new file mode 100644
index 0000000..096159d
--- /dev/null
+++ b/epclust/R/plot.R
@@ -0,0 +1,3 @@
+#TODO: some visualization
+#for (i in 1:6) {plot(medoids_ascii[i,1:200],type="l",ylim=c(-5,5),col=i);par(new=TRUE)}
+#...
-- 
2.44.0