From: Benjamin Auder <benjamin.auder@somewhere>
Date: Mon, 9 Jan 2017 02:17:35 +0000 (+0100)
Subject: advance on main.R
X-Git-Url: https://git.auder.net/%7B%7B%20asset%28%27mixstore/images/assets/current/doc/%7B%7B?a=commitdiff_plain;h=8e6accca063927f6864a9cc6b01e2e77085bab95;p=epclust.git

advance on main.R
---

diff --git a/code/draft_R_pkg/R/main.R b/code/draft_R_pkg/R/main.R
index a01385e..c3e4b54 100644
--- a/code/draft_R_pkg/R/main.R
+++ b/code/draft_R_pkg/R/main.R
@@ -10,7 +10,11 @@ writeTmp(curves [uncompressed coeffs, limited number - nbSeriesPerChunk], last=F
 readTmp(..., from index, n curves) #careful: connection must remain open
 #TODO: write read/write tmp reference ( on file in .tmp/ folder ... )
 
-epclust = function(data=NULL, K, nbPerChunk, ..., writeTmp=ref_writeTmp, readTmp=ref_readTmp) #where to put/retrieve intermediate results; if not provided, use file on disk
+#data:
+#stop("Unrecognizable 'data' argument (must be numeric, functional or connection)")
+
+#WER: "end" to apply stage 2 after stage 1 iterated, "mix" (or anything else...?!) to apply it after every stage 1
+epclust = function(data, K, nbPerChunk, WER="end", ncores=NULL, writeTmp=ref_writeTmp, readTmp=ref_readTmp) #where to put/retrieve intermediate results; if not provided, use file on disk
 {
 
 
@@ -37,17 +41,39 @@ epclust = function(data=NULL, K, nbPerChunk, ..., writeTmp=ref_writeTmp, readTmp
 		#incremental connection
 		#read it one by one and get coeffs until nbSeriesPerChunk
 		#then launch a clustering task............
+		#TODO: find a better way to parse than using a temp file
 		ascii_lines = readLines(data, nbSeriesPerChunk)
-		seriesChunkFile = ".tmp/seriesChunk" #TODO: find a better way
+		seriesChunkFile = ".tmp/seriesChunk"
 		writeLines(ascii_lines, seriesChunkFile)
 		writeTmp( getCoeffs( read.csv(seriesChunkFile) ) )
-	} else
-		stop("Unrecognizable 'data' argument (must be numeric, functional or connection)")
+	}
+
+	library(parallel)
+	ncores = ifelse(is.numeric(ncores), ncores, parallel::detectCores())
+	cl = parallel::makeCluster(ncores)
+115     parallel::clusterExport(cl=cl, varlist=c("X", "Y", "K", "p"), envir=environment())
+116     li = parallel::parLapply(cl, 1:B, getParamsAtIndex)
 
 	#2) process coeffs (by nbSeriesPerChunk) and cluster in parallel (just launch async task, wait for them to complete, and re-do if necessary)
+	repeat
+	{
+		completed = rep(FALSE, ............)
+		#while there is jobs to do (i.e. size of tmp "file" is greater than nbSeriesPerChunk),
+		#A) determine which tasks which processor will do (OK)
+		#B) send each (sets of) tasks in parallel
+		#C) flush tmp file (current parallel processes will write in it)
+		#always check "complete" flag (array, as I did in MPI) to know if "slaves" finished
+	}
+
+parallel::stopCluster(cl)
 
+	#3) readTmp last results, apply PAM on it, and return medoids + identifiers
 
-	#3) apply stage 2 (in parallel ? inside task 2) ?)
+	#4) apply stage 2 (in parallel ? inside task 2) ?)
+	if (WER == "end")
+	{
+		#from center curves, apply stage 2...
+	}
 }
 
 getCoeffs = function(series)