From b9f1c0c7e2b96b52dadc5c4fcc77372f8c7dd8d8 Mon Sep 17 00:00:00 2001
From: Benjamin Auder <benjamin.auder@somewhere>
Date: Mon, 9 Jan 2017 15:56:00 +0100
Subject: [PATCH] progress in main.R: handle end of data; move getCoeffs/getClusters to algorithms.R

---
 code/draft_R_pkg/R/algorithms.R | 13 +++++++++++++
 code/draft_R_pkg/R/main.R       | 44 ++++++++++++++++++++++++++++++++--------
 2 files changed, 45 insertions(+), 12 deletions(-)
 create mode 100644 code/draft_R_pkg/R/algorithms.R

diff --git a/code/draft_R_pkg/R/algorithms.R b/code/draft_R_pkg/R/algorithms.R
new file mode 100644
index 0000000..e27a235
--- /dev/null
+++ b/code/draft_R_pkg/R/algorithms.R
@@ -0,0 +1,13 @@
+getCoeffs = function(series)
+{
+	#return wavelet coefficients, one row per input series
+	#draft sketch assuming the 'wavelets' package; TODO: compute in parallel !
+	t( apply(series, 1, function(s) unlist( wavelets::dwt(as.numeric(s))@W ) ) )
+}
+
+getClusters = function(data, K)
+{
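+	#PAM (partitioning around medoids) from the 'cluster' package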
+	pam_output = cluster::pam(data, K)
+	return ( list(clusts=pam_output$clustering, medoids=pam_output$medoids) )
+}
diff --git a/code/draft_R_pkg/R/main.R b/code/draft_R_pkg/R/main.R
index 19729ed..6dca708 100644
--- a/code/draft_R_pkg/R/main.R
+++ b/code/draft_R_pkg/R/main.R
@@ -60,19 +60,44 @@ epclust = function(data, K, nbSeriesPerChunk, writeTmp=ref_writeTmp, readTmp=ref
 		if (is.data.frame(data))
 		{
 			#full data matrix
-			error = writeTmp( getCoeffs( data[index:(min(index+nbSeriesPerChunk-1,nrow(data))),] ) )
+			if (index <= nrow(data))
+			{
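+				#process rows index .. min(index+nbSeriesPerChunk-1, nrow(data))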
+				writeTmp( getCoeffs( data[index:(min(index+nbSeriesPerChunk-1,nrow(data))),] ) )
+			} else
+			{
+				break
+			}
 		} else if (is.function(data))
 		{
 			#custom user function to retrieve next n curves, probably to read from DB
-			error = writeTmp( getCoeffs( data(index, nbSeriesPerChunk) ) )
+			coeffs_chunk = getCoeffs( data(index, nbSeriesPerChunk) )
+			if (!is.null(coeffs_chunk))
+			{
+				writeTmp(coeffs_chunk)
+			} else
+			{
+				break
+			}
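+			#hypothetical sketch (not part of this draft) of such a user function:
+			#  data = function(index, n)
+			#    dbGetQuery(con, paste("SELECT * FROM series LIMIT", n, "OFFSET", index-1))
+			#getCoeffs() is then expected to return NULL on an empty chunk, ending the loop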
 		} else
 		{
 			#incremental connection
 			#TODO: find a better way to parse than using a temp file
 			ascii_lines = readLines(dataCon, nbSeriesPerChunk)
-			seriesChunkFile = ".tmp/seriesChunk"
-			writeLines(ascii_lines, seriesChunkFile)
-			error = writeTmp( getCoeffs( read.csv(seriesChunkFile) ) )
+			if (length(ascii_lines) > 0)
+			{
+				seriesChunkFile = ".tmp/seriesChunk"
+				writeLines(ascii_lines, seriesChunkFile)
+				writeTmp( getCoeffs( read.csv(seriesChunkFile) ) )
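+				#note: read.csv(text=paste(ascii_lines, collapse="\n")) would avoid the temp file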
+			} else
+			{
+				break
+			}
 		}
 		index = index + nbSeriesPerChunk
 	}
@@ -84,7 +109,7 @@ epclust = function(data, K, nbSeriesPerChunk, writeTmp=ref_writeTmp, readTmp=ref
 	cl = parallel::makeCluster(ncores)
 	parallel::clusterExport(cl=cl, varlist=c("X", "Y", "K", "p"), envir=environment())
 	library(cluster)
-	li = parallel::parLapply(cl, 1:B, getParamsAtIndex)
+	li = parallel::parLapply(cl, 1:B, getParamsAtIndex) #TODO: revise this worker function
 
 	#2) process coeffs (by nbSeriesPerChunk) and cluster them in parallel
 	#TODO: be careful of writing to a new temp file, then flush initial one, then re-use it...
@@ -97,7 +122,7 @@ epclust = function(data, K, nbSeriesPerChunk, writeTmp=ref_writeTmp, readTmp=ref
 		#C) flush tmp file (current parallel processes will write in it)
 		#always check "complete" flag (array, as I did in MPI) to know if "slaves" finished
 	}
-pam(x, k
+#pam(x, k) #draft leftover: 'x' and 'k' are not defined at this point
 	parallel::stopCluster(cl)
 
 	#3) readTmp last results, apply PAM on it, and return medoids + identifiers
@@ -108,8 +133,3 @@ pam(x, k
 		#from center curves, apply stage 2...
 	}
 }
-
-getCoeffs = function(series)
-{
-	#... return wavelets coeffs : compute in parallel !
-}
-- 
2.44.0