From: Benjamin Auder <benjamin.auder@somewhere>
Date: Mon, 9 Jan 2017 11:06:39 +0000 (+0100)
Subject: progress in main.R
X-Git-Url: https://git.auder.net/js/doc/%7B%7B%20asset%28%27mixstore/css/form.css%27%29%20%7D%7D?a=commitdiff_plain;h=6ecf5c2d4c84eb3c0c70d3f2ce44b900699cc0b4;p=epclust.git

progress in main.R
---

diff --git a/code/draft_R_pkg/DESCRIPTION b/code/draft_R_pkg/DESCRIPTION
index ce9129b..669e8c0 100644
--- a/code/draft_R_pkg/DESCRIPTION
+++ b/code/draft_R_pkg/DESCRIPTION
@@ -10,12 +10,11 @@ Author:
 	Jairo Cugliari <Jairo.Cugliari@univ-lyon2.fr> [aut]
 Maintainer: Benjamin Auder <Benjamin.Auder@math.u-psud.fr>
 Depends:
-    R (>= 3.0.0)
-Imports:
-		MASS
+    R (>= 3.0.0),
+		parallel,
+		cluster
 Suggests:
     testthat,
-    parallel,
     knitr
 License: MIT + file LICENSE
 VignetteBuilder: knitr
diff --git a/code/draft_R_pkg/R/main.R b/code/draft_R_pkg/R/main.R
index 695b928..19729ed 100644
--- a/code/draft_R_pkg/R/main.R
+++ b/code/draft_R_pkg/R/main.R
@@ -30,7 +30,17 @@ epclust = function(data, K, nbSeriesPerChunk, writeTmp=ref_writeTmp, readTmp=ref
 
 	#0) check arguments
 	if (!is.data.frame(data) && !is.function(data))
-		tryCatch({dataCon = open(data)},
+		tryCatch(
+			{
+				if (is.character(data))
+				{
+					dataCon = file(data, open="r")
+				} else if (!isOpen(data))
+				{
+					open(data)
+					dataCon = data
+				}
+			},
 			error="data should be a data.frame, a function or a valid connection")
 	if (!is.integer(K) || K < 2)
 		stop("K should be an integer greater or equal to 2")
@@ -44,17 +54,17 @@ epclust = function(data, K, nbSeriesPerChunk, writeTmp=ref_writeTmp, readTmp=ref
 
 	#1) acquire data (process curves, get as coeffs)
 	index = 1
-	nbCurves = nrow(data)
-	while (index < nbCurves)
+	nbCurves = 0
+	repeat
 	{
 		if (is.data.frame(data))
 		{
 			#full data matrix
-			writeTmp( getCoeffs( data[index:(min(index+nbSeriesPerChunk-1,nbCurves)),] ) )
+			error = writeTmp( getCoeffs( data[index:(min(index+nbSeriesPerChunk-1,nrow(data))),] ) )
 		} else if (is.function(data))
 		{
 			#custom user function to retrieve next n curves, probably to read from DB
-			writeTmp( getCoeffs( data(index, nbSeriesPerChunk) ) )
+			error = writeTmp( getCoeffs( data(index, nbSeriesPerChunk) ) )
 		} else
 		{
 			#incremental connection
@@ -62,7 +72,7 @@ epclust = function(data, K, nbSeriesPerChunk, writeTmp=ref_writeTmp, readTmp=ref
 			ascii_lines = readLines(dataCon, nbSeriesPerChunk)
 			seriesChunkFile = ".tmp/seriesChunk"
 			writeLines(ascii_lines, seriesChunkFile)
-			writeTmp( getCoeffs( read.csv(seriesChunkFile) ) )
+			error = writeTmp( getCoeffs( read.csv(seriesChunkFile) ) )
 		}
 		index = index + nbSeriesPerChunk
 	}
@@ -73,9 +83,11 @@ epclust = function(data, K, nbSeriesPerChunk, writeTmp=ref_writeTmp, readTmp=ref
 	ncores = ifelse(is.integer(ncores), ncores, parallel::detectCores())
 	cl = parallel::makeCluster(ncores)
 	parallel::clusterExport(cl=cl, varlist=c("X", "Y", "K", "p"), envir=environment())
+	library(cluster)
 	li = parallel::parLapply(cl, 1:B, getParamsAtIndex)
 
-	#2) process coeffs (by nbSeriesPerChunk) and cluster in parallel (just launch async task, wait for them to complete, and re-do if necessary)
+	#2) process coeffs (by nbSeriesPerChunk) and cluster them in parallel
+	#TODO: be careful of writing to a new temp file, then flush initial one, then re-use it...
 	repeat
 	{
 		completed = rep(FALSE, ............)
@@ -85,7 +97,7 @@ epclust = function(data, K, nbSeriesPerChunk, writeTmp=ref_writeTmp, readTmp=ref
 		#C) flush tmp file (current parallel processes will write in it)
 		#always check "complete" flag (array, as I did in MPI) to know if "slaves" finished
 	}
-
+pam(x, k
 	parallel::stopCluster(cl)
 
 	#3) readTmp last results, apply PAM on it, and return medoids + identifiers