From 2b9f5356793c245a5e10229a74ac0dabd8f62508 Mon Sep 17 00:00:00 2001
From: Benjamin Auder <benjamin.auder@somewhere>
Date: Fri, 10 Mar 2017 12:09:09 +0100
Subject: [PATCH] Add comments to make the code easier to read

---
 epclust/R/clustering.R |  18 +++++--
 epclust/R/main.R       | 106 +++++++++++++++++++++++++++++------------
 2 files changed, 90 insertions(+), 34 deletions(-)

diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R
index 14915ab..70d263e 100644
--- a/epclust/R/clustering.R
+++ b/epclust/R/clustering.R
@@ -11,8 +11,8 @@
 #'   and then WER distances computations, before applying the clustering algorithm.
 #'   \code{computeClusters1()} and \code{computeClusters2()} correspond to the atomic
 #'   clustering procedures respectively for stage 1 and 2. The former applies the
-#'   clustering algorithm (PAM) on a contributions matrix, while the latter clusters
-#'   a chunk of series inside one task (~max nb_series_per_chunk)
+#'   first clustering algorithm on a contributions matrix, while the latter clusters
+#'   a set of series inside one task (~nb_items_clust)
 #'
 #' @param indices Range of series indices to cluster in parallel (initial data)
 #' @param getContribs Function to retrieve contributions from initial series indices:
@@ -31,11 +31,16 @@ NULL
 #' @rdname clustering
 #' @export
 clusteringTask1 = function(
-	indices, getContribs, K1, nb_items_per_chunk, ncores_clust=1, verbose=FALSE, parll=TRUE)
+	indices, getContribs, K1, nb_per_chunk, nb_items_clust, ncores_clust=1,
+	verbose=FALSE, parll=TRUE)
 {
 	if (verbose)
 		cat(paste("*** Clustering task 1 on ",length(indices)," lines\n", sep=""))
 
+
+	##TODO: revise spreadIndices, taking nb_items_clust into account
+	##TODO: revise / harmonize with getContribs, which may retrieve a few more or fewer!!
+
 	if (parll)
 	{
 		cl = parallel::makeCluster(ncores_clust)
diff --git a/epclust/R/main.R b/epclust/R/main.R
index 86dac64..cbc92b1 100644
--- a/epclust/R/main.R
+++ b/epclust/R/main.R
@@ -41,6 +41,12 @@
 #' @param K2 Number of clusters to be found after stage 2 (K2 << K1)
 #' @param nb_per_chunk (Maximum) number of items to retrieve in one batch, for both types of
 #'   retrieval: resp. series and contribution; in a vector of size 2
+#' @param algo_clust1 Clustering algorithm for stage 1. A function taking (data, K) as
+#'   arguments, where data is a matrix with series in columns and K is the desired
+#'   number of clusters; it outputs the ranks of the K medoids. Default: PAM
+#' @param algo_clust2 Clustering algorithm for stage 2. A function taking (dists, K) as
+#'   arguments, where dists is a matrix of distances and K is the desired number of
+#'   clusters; it outputs K cluster representatives (curves). Default: k-means
 #' @param nb_items_clust1 (Maximum) number of items in input of the clustering algorithm
 #'   for stage 1
 #' @param wav_filt Wavelet transform filter; see ?wavelets::wt.filter
@@ -128,14 +134,19 @@
 #' digest::sha1(medoids_db)
 #' }
 #' @export
-claws <- function(getSeries, K1, K2,
-	nb_per_chunk,nb_items_clust1=7*K1 #volumes of data
-	wav_filt="d8",contrib_type="absolute", #stage 1
-	WER="end", #stage 2
-	random=TRUE, #randomize series order?
-	ntasks=1, ncores_tasks=1, ncores_clust=4, #parallelism
-	sep=",", #ASCII input separator
-	nbytes=4, endian=.Platform$endian, #serialization (write,read)
+claws <- function(getSeries, K1, K2, nb_per_chunk,
+	nb_items_clust1=7*K1,
+	algo_clust1=function(data,K) cluster::pam(data,K,diss=FALSE)$id.med,
+	algo_clust2=function(dists,K) stats::kmeans(dists,K,iter.max=50,nstart=3)$centers,
+	wav_filt="d8",contrib_type="absolute",
+	WER="end",
+	random=TRUE,
+	ntasks=1, ncores_tasks=1, ncores_clust=4,
+	sep=",",
+	nbytes=4, endian=.Platform$endian,
 	verbose=FALSE, parll=TRUE)
 {
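+	# Note (illustrative sketch only): algo_clust1/algo_clust2 may be user-supplied,
+	# e.g. a CLARA-based stage 1 with the same "K medoids ranks" output contract:
+	#   algo_clust1 = function(data,K) cluster::clara(data,K)$i.med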
 	# Check/transform arguments
@@ -176,9 +187,18 @@ claws <- function(getSeries, K1, K2,
 	verbose <- .toLogical(verbose)
 	parll <- .toLogical(parll)
 
-	# Serialize series if required, to always use a function
+	# Since we make no assumptions on the initial data, contributions or synchrones may
+	# not fit in RAM even when serialized. For example, 30e6 series of length 100,000
+	# would lead to a contribution matrix of over 4GB. It is therefore safer to place
+	# these in (binary) files, located in the following folder.
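+	# (Order-of-magnitude check, assuming ~log2(100,000) ~ 17 contribution coefficients
+	# per series: 30e6 series * 17 coefficients * 8 bytes ~= 4GB.)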
 	bin_dir <- ".epclust_bin/"
 	dir.create(bin_dir, showWarnings=FALSE, mode="0755")
+
+	# Binarize series if getSeries is not a function; the aim is to always work through
+	# a function, to keep the processing uniform. An equally good alternative would be a
+	# file-backed bigmemory::big.matrix, but it would break this uniformity.
 	if (!is.function(getSeries))
 	{
 		if (verbose)
@@ -199,14 +219,47 @@ claws <- function(getSeries, K1, K2,
 		contribs_file, nb_series_per_chunk, nbytes, endian)
 	getContribs = function(indices) getDataInFile(indices, contribs_file, nbytes, endian)
 
+	# A few sanity checks: do not continue if too little data is available.
 	if (nb_curves < K2)
 		stop("Not enough data: less series than final number of clusters")
 	nb_series_per_task = round(nb_curves / ntasks)
 	if (nb_series_per_task < K2)
 		stop("Too many tasks: less series in one task than final number of clusters")
 
+	# Generate a random permutation of 1:N (if random==TRUE); otherwise just use arrival
+	# (storage) order.
+	indices_all = if (random) sample(nb_curves) else seq_len(nb_curves)
+	# Split (all) indices into ntasks groups of ~same size
+	indices_tasks = lapply(seq_len(ntasks), function(i) {
+		upper_bound = ifelse( i<ntasks, min(nb_series_per_task*i,nb_curves), nb_curves )
+		indices_all[((i-1)*nb_series_per_task+1):upper_bound]
+	})
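+	# Illustration: nb_curves=10 and ntasks=3 give nb_series_per_task=3 and the groups
+	# indices_all[1:3], indices_all[4:6], indices_all[7:10] (the last takes the remainder).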
+
+	if (parll && ntasks>1)
+	{
+		# Initialize parallel runs: outfile="" allows verbose traces to be printed to the
+		# console under Linux. All necessary variables are passed to the workers.
+		cl = parallel::makeCluster(ncores_tasks, outfile="")
+		varlist = c("getSeries","getContribs","K1","K2","algo_clust1","algo_clust2",
+			"nb_per_chunk","nb_items_clust1","nb_series_per_chunk","ncores_clust",
+			"sep","nbytes","endian","verbose","parll")
+		if (WER=="mix")
+			varlist = c(varlist, "medoids_file")
+		parallel::clusterExport(cl, varlist, envir = environment())
+	}
+
+	# This function performs one complete clustering task, divided into stage 1 + stage 2.
+	# stage 1: n indices  --> clusteringTask1(...) --> K1 medoids
+	# stage 2: K1 medoids --> clusteringTask2(...) --> K2 medoids,
+	# where n = N / ntasks, N being the total number of curves.
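+	# For instance (illustrative figures only): N=2e6 curves and ntasks=20 mean each
+	# task reduces n=1e5 curves to K1 medoids, then stage 2 reduces those to K2.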
 	runTwoStepClustering = function(inds)
 	{
+		# When running in parallel, the worker environment is blank: required packages
+		# must be loaded there too (the variables were passed via clusterExport above).
 		if (parll && ntasks>1)
 			require("epclust", quietly=TRUE)
 		indices_medoids = clusteringTask1(
@@ -218,18 +271,18 @@ claws <- function(getSeries, K1, K2,
 			medoids1 = bigmemory::as.big.matrix( getSeries(indices_medoids) )
 			medoids2 = clusteringTask2(medoids1, K2, getSeries, nb_curves, nb_series_per_chunk,
 				nbytes, endian, ncores_clust, verbose, parll)
-			binarize(medoids2, synchrones_file, nb_series_per_chunk, sep, nbytes, endian)
+			binarize(medoids2, medoids_file, nb_series_per_chunk, sep, nbytes, endian)
 			return (vector("integer",0))
 		}
 		indices_medoids
 	}
 
-	# Cluster contributions in parallel (by nb_series_per_chunk)
-	indices_all = if (random) sample(nb_curves) else seq_len(nb_curves)
-	indices_tasks = lapply(seq_len(ntasks), function(i) {
-		upper_bound = ifelse( i<ntasks, min(nb_series_per_task*i,nb_curves), nb_curves )
-		indices_all[((i-1)*nb_series_per_task+1):upper_bound]
-	})
+	# Synchrones (medoids) need to be stored only if WER=="mix"; indeed, in this case
+	# every task outputs a set of new (medoid) curves. If WER=="end" however, the output
+	# is just a set of indices referencing some of the initial series.
+	if (WER=="mix")
+		{medoids_file = paste(bin_dir,"medoids",sep="") ; unlink(medoids_file)}
+
 	if (verbose)
 	{
 		message = paste("...Run ",ntasks," x stage 1", sep="")
@@ -237,19 +290,10 @@ claws <- function(getSeries, K1, K2,
 			message = paste(message," + stage 2", sep="")
 		cat(paste(message,"\n", sep=""))
 	}
-	if (WER=="mix")
-		{synchrones_file = paste(bin_dir,"synchrones",sep="") ; unlink(synchrones_file)}
-	if (parll && ntasks>1)
-	{
-		cl = parallel::makeCluster(ncores_tasks, outfile="")
-		varlist = c("getSeries","getContribs","K1","K2","verbose","parll",
-			"nb_series_per_chunk","ntasks","ncores_clust","sep","nbytes","endian")
-		if (WER=="mix")
-			varlist = c(varlist, "synchrones_file")
-		parallel::clusterExport(cl, varlist=varlist, envir = environment())
-	}
 
-	# 1000*K1 indices [if WER=="end"], or empty vector [if WER=="mix"] --> series on file
+	# As explained above, the tasks yield ntasks*K1 medoid indices [if WER=="end"], or
+	# nothing (an empty vector) [if WER=="mix"]; in the latter case, the medoids
+	# (synchrones) are stored in a file.
 	indices <-
 		if (parll && ntasks>1)
 			unlist( parallel::parLapply(cl, indices_tasks, runTwoStepClustering) )
@@ -258,13 +302,20 @@ claws <- function(getSeries, K1, K2,
 	if (parll && ntasks>1)
 		parallel::stopCluster(cl)
 
+	# Right before the final stage, two situations are possible:
+	#  a. the data to be processed now sits in binary format in medoids_file (WER=="mix")
+	#  b. the data is still the initial set of curves, referenced by ntasks*K1 indices
+	# So the function getSeries() may change. However, computeSynchrones() requires a
+	# function retrieving the initial series; saving it in getRefSeries now avoids
+	# conditional instructions later.
 	getRefSeries = getSeries
+
 	if (WER=="mix")
 	{
 		indices = seq_len(ntasks*K2)
-#Now series must be retrieved from synchrones_file
-	getSeries = function(inds) getDataInFile(inds, synchrones_file, nbytes, endian)
+# Now series must be retrieved from medoids_file
+	getSeries = function(inds) getDataInFile(inds, medoids_file, nbytes, endian)
-		#Contributions must be re-computed
+		# Contributions must be re-computed
 		unlink(contribs_file)
 		index = 1
 		if (verbose)
@@ -283,9 +334,11 @@ claws <- function(getSeries, K1, K2,
 	medoids2 = clusteringTask2(medoids1, K2, getRefSeries, nb_curves, nb_series_per_chunk,
 		nbytes, endian, ncores_tasks*ncores_clust, verbose, parll)
 
-	# Cleanup
+	# Cleanup: remove temporary binary files and their folder
 	unlink(bin_dir, recursive=TRUE)
 
+	# Return medoids as a standard matrix, since K2 series have to fit in RAM
+	# (the stage 2 clustering already took K1 > K2 of them as input)
 	medoids2[,]
 }
 
-- 
2.44.0