From 3eef8d3df59ded9a281cff51f79fe824198a7427 Mon Sep 17 00:00:00 2001
From: Benjamin Auder <benjamin.auder@somewhere>
Date: Sun, 5 Mar 2017 15:36:26 +0100
Subject: [PATCH] complete first draft of package

---
 epclust/R/clustering.R | 34 ++++++++++++++-----
 epclust/R/main.R       | 14 ++++----
 epclust/R/utils.R      | 74 +++++++++++++++++++++++++++++-------------
 3 files changed, 84 insertions(+), 38 deletions(-)

diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R
index c8bad66..87a5f91 100644
--- a/epclust/R/clustering.R
+++ b/epclust/R/clustering.R
@@ -1,6 +1,6 @@
 # Cluster one full task (nb_curves / ntasks series)
 clusteringTask = function(indices,getSeries,getSeriesForSynchrones,synchrones_file,
-	getCoefs,K1,K2,nb_series_per_chunk,ncores,to_file)
+	getCoefs,K1,K2,nb_series_per_chunk,ncores,to_file,ftype)
 {
 	cl = parallel::makeCluster(ncores)
 	repeat
@@ -19,7 +19,8 @@ clusteringTask = function(indices,getSeries,getSeriesForSynchrones,synchrones_fi
 	parallel::stopCluster(cl)
 	if (K2 == 0)
 		return (indices)
-	computeClusters2(indices, K2, getSeries, getSeriesForSynchrones, to_file)
+	computeClusters2(indices, K2, getSeries, getSeriesForSynchrones, to_file,
+									 nb_series_per_chunk,ftype)
 	vector("integer",0)
 }
 
@@ -31,27 +32,42 @@ computeClusters1 = function(indices, getCoefs, K1)
 }
 
 # Cluster a chunk of series inside one task (~max nb_series_per_chunk)
-computeClusters2 = function(indices, K2, getSeries, getSeriesForSynchrones, to_file)
+computeClusters2 = function(indices, K2, getSeries, getSeriesForSynchrones, to_file,
+														nb_series_per_chunk, ftype)
 {
-	curves = computeSynchrones(indices, getSeries, getSeriesForSynchrones)
+	curves = computeSynchrones(indices, getSeries, getSeriesForSynchrones, nb_series_per_chunk)
 	dists = computeWerDists(curves)
 	medoids = cluster::pam(dists, K2, diss=TRUE)$medoids
 	if (to_file)
 	{
-		serialize(medoids, synchrones_file)
+		serialize(medoids, synchrones_file, ftype, nb_series_per_chunk)
 		return (NULL)
 	}
 	medoids
 }
 
 # Compute the synchrones curves (sum of clusters elements) from a clustering result
-computeSynchrones = function(indices, getSeries, getSeriesForSynchrones)
+computeSynchrones = function(indices, getSeries, getSeriesForSynchrones, nb_series_per_chunk)
 {
 	#les getSeries(indices) sont les medoides --> init vect nul pour chacun, puis incr avec les
 	#courbes (getSeriesForSynchrones) les plus proches... --> au sens de la norme L2 ?
-	series = getSeries(indices)
-	#...........
-	#sapply(seq_along(inds), colMeans(getSeries(inds[[i]]$indices,inds[[i]]$ids)))
+	medoids = getSeries(indices)
+	K = nrow(medoids)
+	synchrones = matrix(0, nrow=K, ncol=ncol(medoids))
+	counts = rep(0,K)
+	index = 1
+	repeat
+	{
+		series = getSeriesForSynchrones((index-1)+seq_len(nb_series_per_chunk))
+		if (is.null(series))
+			break
+		#get medoids indices for this chunk of series
+		index = which.min( rowSums( sweep(medoids, 2, series[i,], '-')^2 ) )
+		synchrones[index,] = synchrones[index,] + series[i,]
+		counts[index] = counts[index] + 1
+	}
+	#NOTE: odds for some clusters to be empty? (when series already come from stage 2)
+	synchrones = sweep(synchrones, 1, counts, '/')
 }
 
 # Compute the WER distance between the synchrones curves (in rows)
diff --git a/epclust/R/main.R b/epclust/R/main.R
index 0b59832..27fbb74 100644
--- a/epclust/R/main.R
+++ b/epclust/R/main.R
@@ -40,7 +40,7 @@
 #' cl = epclust(getData, K1=200, K2=15, ntasks=1000, nb_series_per_chunk=5000, WER="mix")
 #' @export
 epclust = function(series,K1,K2,ntasks=1,nb_series_per_chunk=50*K1,min_series_per_chunk=5*K1,
-	wf="haar",WER="end",ncores_tasks=1,ncores_clust=4,random=TRUE,...)
+	wf="haar",WER="end",ncores_tasks=1,ncores_clust=4,random=TRUE,ftype="float",...)
 {
 	# Check/transform arguments
 	bin_dir = "epclust.bin/"
@@ -51,7 +51,7 @@ epclust = function(series,K1,K2,ntasks=1,nb_series_per_chunk=50*K1,min_series_pe
 		unlink(series_file)
 	}
 	if (is.matrix(series))
-		serialize(series, series_file)
+		serialize(series, series_file, ftype, nb_series_per_chunk)
 	else if (!is.function(series))
 	{
 		tryCatch(
@@ -63,7 +63,7 @@ epclust = function(series,K1,K2,ntasks=1,nb_series_per_chunk=50*K1,min_series_pe
 					open(series)
 					series_con = series
 				}
-				serialize(series_con, series_file)
+				serialize(series_con, series_file, ftype, nb_series_per_chunk)
 				close(series_con)
 			},
 			error=function(e) "series should be a data.frame, a function or a valid connection"
@@ -94,7 +94,7 @@ epclust = function(series,K1,K2,ntasks=1,nb_series_per_chunk=50*K1,min_series_pe
 		if (is.null(series))
 			break
 		coeffs_chunk = curvesToCoeffs(series, wf)
-		serialize(coeffs_chunk, coefs_file)
+		serialize(coeffs_chunk, coefs_file, ftype, nb_series_per_chunk)
 		index = index + nb_series_per_chunk
 		nb_curves = nb_curves + nrow(coeffs_chunk)
 	}
@@ -116,7 +116,7 @@ epclust = function(series,K1,K2,ntasks=1,nb_series_per_chunk=50*K1,min_series_pe
 	#1000*K1 (or K2) indices (or NOTHING--> series on file)
 	indices = unlist( parallel::parLapply(cl, indices_tasks, function(inds) {
 		clusteringTask(inds, getSeries, getSeries, getCoefs, K1, K2*(WER=="mix"),
-			nb_series_per_chunk,ncores_clust,to_file=TRUE)
+			nb_series_per_chunk,ncores_clust,to_file=TRUE, ftype)
 	}) )
 	parallel::stopCluster(cl)
 
@@ -136,12 +136,12 @@ epclust = function(series,K1,K2,ntasks=1,nb_series_per_chunk=50*K1,min_series_pe
 			if (is.null(series))
 				break
 			coeffs_chunk = curvesToCoeffs(series, wf)
-			serialize(coeffs_chunk, coefs_file)
+			serialize(coeffs_chunk, coefs_file, ftype, nb_series_per_chunk)
 			index = index + nb_series_per_chunk
 		}
 	}
 
 	# Run step2 on resulting indices or series (from file)
 	clusteringTask(indices, getSeries, getSeriesForSynchrones, getCoefs, K1, K2,
-		nb_series_per_chunk, ncores_tasks*ncores_clust, to_file=FALSE)
+		nb_series_per_chunk, ncores_tasks*ncores_clust, to_file=FALSE, ftype)
 }
diff --git a/epclust/R/utils.R b/epclust/R/utils.R
index 7083674..40b0a18 100644
--- a/epclust/R/utils.R
+++ b/epclust/R/utils.R
@@ -10,28 +10,6 @@ toInteger <- function(x, condition)
 	x
 }
 
-writeCoeffs = function(coeffs)
-{
-	file = ".coeffs"
-	#.........
-	#C function (from data.frame, type of IDs ??! force integers ? [yes])
-	#return raw vector
-	#take raw vector, append it (binary mode) to a file
-#TODO: appendCoeffs() en C --> serialize et append to file
-}
-
-readCoeffs = function(indices)
-{
-	#......
-	file = ".coeffs"
-	#C function (from file name)
-}
-
-getSeries(data, rank=NULL, id=NULL)
-{
-	#TODO:
-}
-
 curvesToCoeffs = function(series, wf)
 {
 	L = length(series[1,])
@@ -43,3 +21,55 @@ curvesToCoeffs = function(series, wf)
 		rev( sapply( W, function(v) ( sqrt( sum(v^2) ) ) ) )
 	})
 }
+
+#data: matrix of double or connection
+serialize = function(data, file, type, nb_per_chunk)
+{
+	bin_data = file(file, "ab")
+	#write data length on first call
+	nbytes = ifelse(type=="double",8,4)
+	first_write = FALSE
+	if (file.info(file)$size == 0)
+	{
+		#number of items always on 8 bytes
+		writeBin(0L, bin_data, size=8) #,endian="little")
+		first_write = TRUE
+	}
+	if (is.matrix(data))
+	{
+		writeBin(t(data), bin_data, size=nbytes)
+		data_length = ncol(data)
+	}
+	else #if (is(data, "connection"))
+	{
+		if (first_write)
+		{
+			data_line = scan(data, double(), sep=",", nlines=1)
+			writeBin(data_line, bin_data, size=nbytes)
+			data_length = length(data_line)
+		}
+		repeat
+		{
+			data_chunk = scan(data, double(), sep=",", nlines=nb_per_chunk)
+			if (length(data_chunk)==0)
+				break
+			writeBin(data_chunk, bin_data, size=nbytes)
+		}
+	}
+	if (first_write)
+	{
+		#ecrire file_size-1 / (nbytes*nbWritten) en 0 dans bin_data ! ignored == file_size
+		ignored = seek(bin_data, 0)
+		writeBin(data_length, bin_data, size=8)
+	}
+	close(bin_data)
+}
+
+#TODO: read in binary file, always same structure
+getDataFromFile(indices, file, type)
+{
+	bin_data = file(file, "rb")
+	nbytes = ifelse(type=="double",8,4)
+	data_length = readBin(bin_data,"double",1,nbytes) #,endian="little")
+	t(sapply(indices, function(i) readBin(bin_data,"double",n=data_length,size=nbytes)))
+}
-- 
2.44.0