From 3c5a4b0880db63367a474a568e1322b3999932fe Mon Sep 17 00:00:00 2001
From: Benjamin Auder <benjamin.auder@somewhere>
Date: Mon, 13 Mar 2017 12:42:59 +0100
Subject: [PATCH] Pass medoid indices (not synchrones) to stage 2; introduce nb_items_clust

---
 epclust/R/clustering.R        | 53 +++++++++++++++--------------------
 epclust/R/computeSynchrones.R |  6 ++--
 epclust/R/computeWerDists.R   | 41 +++++++++++++--------------
 epclust/R/de_serialize.R      | 32 ++++++++++-----------
 epclust/R/main.R              | 53 ++++++++++++++++++++---------------
 epclust/R/utils.R             | 49 +++++++++++++++++---------------
 6 files changed, 117 insertions(+), 117 deletions(-)

diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R
index bea073a..b91d512 100644
--- a/epclust/R/clustering.R
+++ b/epclust/R/clustering.R
@@ -1,30 +1,28 @@
-#' @name clustering
-#' @rdname clustering
-#' @aliases clusteringTask1 clusteringTask2 computeClusters1 computeClusters2
+#' Two-stage clustering, within one task (see \code{claws()})
 #'
-#' @title Two-stage clustering, withing one task (see \code{claws()})
+#' \code{clusteringTask1()} runs one full stage-1 task, which consists of iterated
+#' stage-1 clustering on nb_curves / ntasks energy contributions, computed through
+#' discrete wavelet coefficients.
+#' \code{clusteringTask2()} runs a full stage-2 task, which consists of computing WER
+#' distances between the medoids (indices) output from stage 1, then applying the
+#' second clustering algorithm to the distance matrix.
 #'
-#' @description \code{clusteringTask1()} runs one full stage-1 task, which consists in
-#'   iterated stage 1 clustering on nb_curves / ntasks energy contributions, computed
-#'   through discrete wavelets coefficients.
-#'   \code{clusteringTask2()} runs a full stage-2 task, which consists in
-#'   WER distances computations between medoids indices output from stage 1,
-#'   before applying the second clustering algorithm, on the distances matrix.
-#'
-#' @param indices Range of series indices to cluster
 #' @param getContribs Function to retrieve contributions from initial series indices:
 #'   \code{getContribs(indices)} outputs a contributions matrix
 #' @inheritParams claws
 #' @inheritParams computeSynchrones
+#' @inheritParams computeWerDists
+#'
+#' @return The indices of the computed medoids (respectively K1 and K2 of them).
 #'
-#' @return For \code{clusteringTask1()}, the indices of the computed (K1) medoids.
-#'   Indices are irrelevant for stage 2 clustering, thus \code{clusteringTask2()}
-#'   outputs a big.matrix of medoids (of size LxK2, K2 = final number of clusters)
+#' @name clustering
+#' @rdname clustering
+#' @aliases clusteringTask1 clusteringTask2
 NULL
 
 #' @rdname clustering
 #' @export
-clusteringTask1 = function(indices, getContribs, K1, algoClust1, nb_series_per_chunk,
+clusteringTask1 = function(indices, getContribs, K1, algoClust1, nb_items_clust,
 	ncores_clust=1, verbose=FALSE, parll=TRUE)
 {
 	if (parll)
@@ -36,7 +34,7 @@ clusteringTask1 = function(indices, getContribs, K1, algoClust1, nb_series_per_c
 	while (length(indices) > K1)
 	{
 		# Balance tasks by splitting the indices set - as evenly as possible
-		indices_workers = .splitIndices(indices, nb_items_clust1)
+		indices_workers = .splitIndices(indices, nb_items_clust, min_size=K1+1)
 		if (verbose)
 			cat(paste("*** [iterated] Clustering task 1 on ",length(indices)," series\n", sep=""))
 		indices <-
@@ -66,22 +64,17 @@ clusteringTask2 = function(indices, getSeries, K2, algoClust2, nb_series_per_chu
 	nvoice, nbytes, endian, ncores_clust=1, verbose=FALSE, parll=TRUE)
 {
 	if (verbose)
-		cat(paste("*** Clustering task 2 on ",ncol(medoids)," synchrones\n", sep=""))
-
-	if (ncol(medoids) <= K2)
-		return (medoids)
+		cat(paste("*** Clustering task 2 on ",length(indices)," medoids\n", sep=""))
 
-	# A) Obtain synchrones, that is to say the cumulated power consumptions
-	#    for each of the K1 initial groups
-	synchrones = computeSynchrones(medoids, getRefSeries, nb_ref_curves,
-		nb_series_per_chunk, ncores_clust, verbose, parll)
+	if (length(indices) <= K2)
+		return (indices)
 
-	# B) Compute the WER distances (Wavelets Extended coefficient of deteRmination)
-	distances = computeWerDists(
-		synchrones, nvoice, nbytes, endian, ncores_clust, verbose, parll)
+	# A) Compute the WER distances (Wavelets Extended coefficient of deteRmination)
+	distances = computeWerDists(indices, getSeries, nb_series_per_chunk,
+		nvoice, nbytes, endian, ncores_clust, verbose, parll)
 
-	# C) Apply clustering algorithm 2 on the WER distances matrix
+	# B) Apply clustering algorithm 2 on the WER distances matrix
 	if (verbose)
 		cat(paste("*** algoClust2() on ",nrow(distances)," items\n", sep=""))
-	medoids[ ,algoClust2(distances,K2) ]
+	indices[ algoClust2(distances,K2) ]
 }
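+
+# A minimal sketch of how the two tasks chain within one task of claws()
+# (all names and values below are illustrative, not prescriptive):
+#   inds1 <- clusteringTask1(1:300, getContribs, K1=20, algoClust1,
+#     nb_items_clust=140, ncores_clust=1, verbose=TRUE, parll=FALSE)
+#   inds2 <- clusteringTask2(inds1, getSeries, K2=5, algoClust2,
+#     nb_series_per_chunk=100, nvoice=4, nbytes=8, endian=.Platform$endian,
+#     ncores_clust=1, verbose=TRUE, parll=FALSE)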
diff --git a/epclust/R/computeSynchrones.R b/epclust/R/computeSynchrones.R
index f73e64e..09ff3a0 100644
--- a/epclust/R/computeSynchrones.R
+++ b/epclust/R/computeSynchrones.R
@@ -68,14 +68,14 @@ computeSynchrones = function(medoids, getSeries, nb_curves,
 		medoids_desc <- bigmemory::describe(medoids)
 		cl = parallel::makeCluster(ncores_clust)
 		parallel::clusterExport(cl, envir=environment(),
-			varlist=c("synchrones_desc","m_desc","medoids_desc","getRefSeries"))
+			varlist=c("synchrones_desc","m_desc","medoids_desc","getSeries"))
 	}
 
 	if (verbose)
 		cat(paste("--- Compute ",K," synchrones with ",nb_curves," series\n", sep=""))
 
-	# Balance tasks by splitting 1:nb_ref_curves into groups of size <= nb_series_per_chunk
-	indices_workers = .splitIndices(seq_len(nb_ref_curves), nb_series_per_chunk)
+	# Balance tasks by splitting 1:nb_curves into groups of size <= nb_series_per_chunk
+	indices_workers = .splitIndices(seq_len(nb_curves), nb_series_per_chunk)
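+	# For instance (illustrative values), nb_curves=250 with nb_series_per_chunk=100
+	# yields chunks of sizes 100, 100 and 50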
 	ignored <-
 		if (parll)
 			parallel::parLapply(cl, indices_workers, computeSynchronesChunk)
diff --git a/epclust/R/computeWerDists.R b/epclust/R/computeWerDists.R
index aae1cc1..a813b8f 100644
--- a/epclust/R/computeWerDists.R
+++ b/epclust/R/computeWerDists.R
@@ -3,21 +3,24 @@
 #' Compute the WER distances between the series designated by the given indices,
 #' which are retrieved (e.g.) through \code{getSeries()}
 #'
-#' @param synchrones A big.matrix of synchrones, in columns. The series have same
-#'   length as the series in the initial dataset
+#' @param indices Range of series indices to cluster
 #' @inheritParams claws
+#' @inheritParams computeSynchrones
 #'
-#' @return A distances matrix of size K1 x K1
+#' @return A distance matrix of size K x K, where K = length(indices)
 #'
 #' @export
-computeWerDists = function(synchrones, nvoice, nbytes,endian,ncores_clust=1,
-	verbose=FALSE,parll=TRUE)
+computeWerDists = function(indices, getSeries, nb_series_per_chunk, nvoice, nbytes, endian,
+	ncores_clust=1, verbose=FALSE, parll=TRUE)
 {
-	n <- ncol(synchrones)
-	L <- nrow(synchrones)
+	n <- length(indices)
+	L <- length(getSeries(1)) #TODO: not a very nice way to get L
 	noctave = ceiling(log2(L)) #min power of 2 to cover series range
+	# Since a CWT contains noctave*nvoice complex series, we deduce the number of CWTs
+	# to retrieve/store in one chunk.
+	nb_cwt_per_chunk = max(1, floor(nb_series_per_chunk / (nvoice*noctave*2)))
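+	# For instance (illustrative values): L=100 gives noctave=7, so with nvoice=4 and
+	# nb_series_per_chunk=100 we get floor(100 / (4*7*2)) = 1 CWT per chunk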
 
-	# Initialize result as a square big.matrix of size 'number of synchrones'
+	# Initialize result as a square big.matrix of size 'number of medoids'
 	Xwer_dist <- bigmemory::big.matrix(nrow=n, ncol=n, type="double")
 
 	# Generate n(n-1)/2 pairs for WER distances computations
@@ -30,7 +33,7 @@ computeWerDists = function(synchrones, nvoice, nbytes,endian,ncores_clust=1,
 	}
 
 	cwt_file = ".cwt.bin"
-	# Compute the synchrones[,indices] CWT, and store the results in the binary file
+	# Compute the getSeries(indices) CWT, and store the results in the binary file
 	computeSaveCWT = function(indices)
 	{
 		if (parll)
@@ -38,27 +41,25 @@ computeWerDists = function(synchrones, nvoice, nbytes,endian,ncores_clust=1,
 			require("bigmemory", quietly=TRUE)
 			require("Rwave", quietly=TRUE)
 			require("epclust", quietly=TRUE)
-			synchrones <- bigmemory::attach.big.matrix(synchrones_desc)
 		}
 
 		# Obtain CWT as big vectors of real part + imaginary part (concatenate)
 		ts_cwt <- sapply(indices, function(i) {
-			ts <- scale(ts(synchrones[,i]), center=TRUE, scale=FALSE)
+			ts <- scale(ts(getSeries(i)), center=TRUE, scale=FALSE)
 			ts_cwt <- Rwave::cwt(ts, noctave, nvoice, w0=2*pi, twoD=TRUE, plot=FALSE)
 			c( as.double(Re(ts_cwt)),as.double(Im(ts_cwt)) )
 		})
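+		# (assuming Rwave::cwt returns an L x (noctave*nvoice) complex matrix, each
+		#  column of ts_cwt holds 2*L*noctave*nvoice doubles: real parts, then imaginary)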
 
 		# Serialization
-		binarize(ts_cwt, cwt_file, 1, ",", nbytes, endian)
+		binarize(ts_cwt, cwt_file, nb_cwt_per_chunk, ",", nbytes, endian)
 	}
 
 	if (parll)
 	{
 		cl = parallel::makeCluster(ncores_clust)
-		synchrones_desc <- bigmemory::describe(synchrones)
 		Xwer_dist_desc <- bigmemory::describe(Xwer_dist)
-		parallel::clusterExport(cl, varlist=c("parll","synchrones_desc","Xwer_dist_desc",
-			"noctave","nvoice","verbose","getCWT"), envir=environment())
+		parallel::clusterExport(cl, varlist=c("parll","nb_cwt_per_chunk","L",
+			"Xwer_dist_desc","noctave","nvoice","getCWT"), envir=environment())
 	}
 
 	if (verbose)
@@ -71,7 +72,7 @@ computeWerDists = function(synchrones, nvoice, nbytes,endian,ncores_clust=1,
 			lapply(1:n, computeSaveCWT)
 
 	# Function to retrieve a series CWT from the (binary) file
-	getSynchroneCWT = function(index, L)
+	getCWT = function(index, L)
 	{
 		flat_cwt <- getDataInFile(index, cwt_file, nbytes, endian)
 		cwt_length = length(flat_cwt) / 2
@@ -84,8 +85,6 @@ computeWerDists = function(synchrones, nvoice, nbytes,endian,ncores_clust=1,
 
 
 #TODO: better repartition here
-	#better code in .splitIndices :: never exceed nb_per_chunk; arg: min_per_chunk (default: 1)
-###TODO: reintroduce nb_items_clust ======> the other one is typically larger !!! (no relation between them!)
 
 
 
@@ -97,7 +96,6 @@ computeWerDists = function(synchrones, nvoice, nbytes,endian,ncores_clust=1,
 			# parallel workers start with an empty environment
 			require("bigmemory", quietly=TRUE)
 			require("epclust", quietly=TRUE)
-			synchrones <- bigmemory::attach.big.matrix(synchrones_desc)
 			Xwer_dist <- bigmemory::attach.big.matrix(Xwer_dist_desc)
 		}
 
@@ -106,9 +104,8 @@ computeWerDists = function(synchrones, nvoice, nbytes,endian,ncores_clust=1,
 			cat(paste("   Distances (",i,",",j,"), (",i,",",j+1,") ...\n", sep=""))
 
 		# Retrieve the CWTs of series i and j
-		L = nrow(synchrones)
-		cwt_i <- getSynchroneCWT(i, L)
-		cwt_j <- getSynchroneCWT(j, L)
+		cwt_i <- getCWT(i, L)
+		cwt_j <- getCWT(j, L)
 
 		# Compute the ratio of integrals formula 5.6 for WER^2
 		# in https://arxiv.org/abs/1101.4744v2 §5.3
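+		# (schematically, WER^2 compares the energy of the cross CWT of i and j to
+		#  the product of their individual CWT energies; see the paper for details)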
diff --git a/epclust/R/de_serialize.R b/epclust/R/de_serialize.R
index a49990b..f28038b 100644
--- a/epclust/R/de_serialize.R
+++ b/epclust/R/de_serialize.R
@@ -1,28 +1,28 @@
-#' @name de_serialize
-#' @rdname de_serialize
-#' @aliases binarize binarizeTransform getDataInFile
+#' (De)Serialization of a [big]matrix or data stream
 #'
-#' @title (De)Serialization of a [big]matrix or data stream
+#' \code{binarize()} serializes a matrix or CSV file into a binary file, with minimal
+#' overhead. \code{getDataInFile()} achieves the inverse task: it retrieves (ASCII)
+#' data rows from indices in the binary file. Finally, \code{binarizeTransform()}
+#' serializes transformations of all data chunks; to use it, a data-retrieval function
+#' must be provided -- thus \code{binarize} will most likely be used first
+#' (and then a function defined to seek in the generated binary file)
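+#'
+#' A minimal sketch of the round trip (file name, sizes and argument values are
+#' illustrative only):
+#' \preformatted{
+#' series <- matrix(runif(50), nrow=10) # 5 series of length 10, in columns
+#' binarize(series, "series.bin", 3, ",", 8, .Platform$endian)
+#' chunk <- getDataInFile(c(2,5), "series.bin", 8, .Platform$endian)
+#' }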
 #'
-#' @description \code{binarize()} serializes a matrix or CSV file with minimal overhead,
-#'   into a binary file. \code{getDataInFile()} achieves the inverse task: she retrieves
-#'   (ASCII) data rows from indices in the binary file. Finally,
-#'   \code{binarizeTransform()} serialize transformations of all data chunks; to use it,
-#'   a data-retrieval function must be provided, thus \code{binarize} will most likely be
-#'   used first (and then a function defined to seek in generated binary file)
-#'
-#' @param data_ascii Either a matrix or CSV file, with items in rows
-#' @param indices Indices of the lines to retrieve
-#' @param data_bin_file Name of binary file on output (\code{binarize})
-#'   or input (\code{getDataInFile})
+#' @param data_ascii Either a matrix (by columns) or CSV file or connection (by rows)
+#' @param data_bin_file Name of the binary file, on output (\code{binarize})
+#'   or input (\code{getDataInFile})
 #' @param nb_per_chunk Number of lines to process in one batch (big.matrix or connection)
-#' @inheritParams claws
 #' @param getData Function to retrieve data chunks
 #' @param transform Transformation function to apply on data chunks
+#' @param indices Indices of the lines to retrieve
+#' @inheritParams claws
 #'
 #' @return For \code{getDataInFile()}, the matrix with rows corresponding to the
 #'   requested indices. \code{binarizeTransform} returns the number of processed lines.
 #'   \code{binarize} is designed to serialize in several calls, thus returns nothing.
+#'
+#' @name de_serialize
+#' @rdname de_serialize
+#' @aliases binarize binarizeTransform getDataInFile
 NULL
 
 #' @rdname de_serialize
diff --git a/epclust/R/main.R b/epclust/R/main.R
index 603f7bf..ce650ff 100644
--- a/epclust/R/main.R
+++ b/epclust/R/main.R
@@ -45,8 +45,8 @@
 #'   }
 #' @param K1 Number of clusters to be found after stage 1 (K1 << N [number of series])
 #' @param K2 Number of clusters to be found after stage 2 (K2 << K1)
-#' @param nb_series_per_chunk (Maximum) number of series to retrieve in one batch;
-#'   this value is also used for the (maximum) number of series to cluster at a time
+#' @param nb_series_per_chunk (Maximum) number of series to retrieve in one batch
+#' @param nb_items_clust (Approximate maximum) number of items in the input of
+#'   clustering algorithm 1
 #' @param algoClust1 Clustering algorithm for stage 1. A function which takes (data, K)
 #'   as argument where data is a matrix in columns and K the desired number of clusters,
 #'   and outputs K medoids ranks. Default: PAM. In our method, this function is called
@@ -150,7 +150,7 @@
 #' digest::sha1(res_db)
 #' }
 #' @export
-claws <- function(getSeries, K1, K2, nb_series_per_chunk,
+claws <- function(series, K1, K2, nb_series_per_chunk, nb_items_clust=7*K1,
 	algoClust1=function(data,K) cluster::pam(t(data),K,diss=FALSE,pamonce=1)$id.med,
 	algoClust2=function(dists,K) cluster::pam(dists,K,diss=TRUE,pamonce=1)$id.med,
 	wav_filt="d8", contrib_type="absolute", WER="end", nvoice=4, random=TRUE,
@@ -167,10 +167,7 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk,
 	K1 <- .toInteger(K1, function(x) x>=2)
 	K2 <- .toInteger(K2, function(x) x>=2)
 	nb_series_per_chunk <- .toInteger(nb_series_per_chunk, function(x) x>=1)
-	# K1 (number of clusters at step 1) cannot exceed nb_series_per_chunk, because we will need
-	# to load K1 series in memory for clustering stage 2.
-	if (K1 > nb_series_per_chunk)
-		stop("'K1' cannot exceed 'nb_series_per_chunk'")
+	nb_items_clust <- .toInteger(nb_items_clust, function(x) x>K1)
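+	# (illustration only) algoClust1/algoClust2 may be any functions matching the
+	# documented interface; e.g. a k-means-based stage-1 alternative to PAM:
+	#   algoClust1 <- function(data, K) {
+	#     km <- stats::kmeans(t(data), K) # series are in columns
+	#     # rank, for each cluster, of the series closest to its center
+	#     sapply(seq_len(K), function(k)
+	#       which.min(colSums((data - km$centers[k,])^2)))
+	#   }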
 	random <- .toLogical(random)
 	tryCatch( {ignored <- wavelets::wt.filter(wav_filt)},
 		error = function(e) stop("Invalid wavelet filter; see ?wavelets::wt.filter") )
@@ -247,14 +244,20 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk,
 		# Initialize parallel runs: outfile="" allows outputting verbose traces in the console
 		# under Linux. All necessary variables are passed to the workers.
 		cl = parallel::makeCluster(ncores_tasks, outfile="")
-		parallel::clusterExport(cl, envir = environment(),
-			varlist = c("getSeries","getContribs","K1","K2","algoClust1","algoClust2",
-			"nb_series_per_chunk","ncores_clust","nvoice","nbytes","endian","verbose","parll"))
+		varlist = c("ncores_clust","verbose","parll", #task 1 & 2
+			"K1","getContribs","algoClust1","nb_items_clust") #task 1
+		if (WER=="mix")
+		{
+			# Add variables for task 2
+			varlist = c(varlist, "K2","getSeries","algoClust2","nb_series_per_chunk",
+				"nvoice","nbytes","endian")
+		}
+		parallel::clusterExport(cl, varlist, envir = environment())
 	}
 
 	# This function achieves one complete clustering task, divided in stage 1 + stage 2.
-	# stage 1: n indices  --> clusteringTask1(...) --> K1 medoids
-	# stage 2: K1 medoids --> clusteringTask2(...) --> K2 medoids,
+	# stage 1: n indices  --> clusteringTask1(...) --> K1 medoids (indices)
+	# stage 2: K1 indices --> K1xK1 WER distances --> clusteringTask2(...) --> K2 medoids,
 	# where n = N / ntasks, N being the total number of curves.
 	runTwoStepClustering = function(inds)
 	{
@@ -263,7 +266,7 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk,
 		if (parll && ntasks>1)
 			require("epclust", quietly=TRUE)
 		indices_medoids = clusteringTask1(inds, getContribs, K1, algoClust1,
-			nb_series_per_chunk, ncores_clust, verbose, parll)
+			nb_items_clust, ncores_clust, verbose, parll)
 		if (WER=="mix")
 		{
 			indices_medoids = clusteringTask2(indices_medoids, getSeries, K2, algoClust2,
@@ -280,8 +283,8 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk,
 		cat(paste(message,"\n", sep=""))
 	}
 
-	# As explained above, indices will be assigned to ntasks*K1 medoids indices [if WER=="end"],
-	# or nothing (empty vector) if WER=="mix"; in this case, synchrones are stored in a file.
+	# As explained above, after all runs we obtain ntasks*[K1 or K2] medoid indices,
+	# depending on whether WER=="end" or "mix", respectively.
 	indices_medoids_all <-
 		if (parll && ntasks>1)
 			unlist( parallel::parLapply(cl, indices_tasks, runTwoStepClustering) )
@@ -291,22 +294,26 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk,
 	if (parll && ntasks>1)
 		parallel::stopCluster(cl)
 
-	# Right before the final stage, input data still is the initial set of curves, referenced
-	# by the ntasks*[K1 or K2] medoids indices.
+	# For the last stage, ncores_tasks*(ncores_clust+1) cores should be available:
+	#  - ncores_tasks for level-1 parallelism
+	#  - ncores_tasks*ncores_clust for level-2 parallelism,
+	# but since an extension MPI <--> tasks / OpenMP <--> sub-tasks is on the way,
+	# it's better to just re-use ncores_clust
+	ncores_last_stage <- ncores_clust
 
-	# Run last clustering tasks to obtain only K2 medoids indices, from ntasks*[K1 or K2]
-	# indices, depending wether WER=="end" or WER=="mix"
+	# Run the last clustering tasks to obtain only K2 medoid indices
 	if (verbose)
 		cat("...Run final // stage 1 + stage 2\n")
 	indices_medoids = clusteringTask1(indices_medoids_all, getContribs, K1, algoClust1,
-		nb_series_per_chunk, ncores_tasks*ncores_clust, verbose, parll)
+		nb_items_clust, ncores_tasks*ncores_clust, verbose, parll)
 	indices_medoids = clusteringTask2(indices_medoids, getSeries, K2, algoClust2,
-		nb_series_per_chunk, nvoice, nbytes, endian, ncores_tasks*ncores_clust, verbose, parll)
+		nb_series_per_chunk, nvoice, nbytes, endian, ncores_last_stage, verbose, parll)
 
-	# Compute synchrones
+	# Compute synchrones, that is to say the cumulated power consumptions for each of the K2
+	# final groups.
 	medoids = getSeries(indices_medoids)
 	synchrones = computeSynchrones(medoids, getSeries, nb_curves, nb_series_per_chunk,
-		ncores_tasks*ncores_clust, verbose, parll)
+		ncores_last_stage, verbose, parll)
 
 	# NOTE: no need to use big.matrix here, since there are only K2 << K1 << N remaining curves
 	list("medoids"=medoids, "ranks"=indices_medoids, "synchrones"=synchrones)
diff --git a/epclust/R/utils.R b/epclust/R/utils.R
index ba643d0..e79c009 100644
--- a/epclust/R/utils.R
+++ b/epclust/R/utils.R
@@ -30,13 +30,13 @@
 #' Compute the discrete wavelet coefficients for each series, and aggregate them in
 #' energy contribution across scales as described in https://arxiv.org/abs/1101.4744v2
 #'
-#' @param series [big.]matrix of series (in columns), of size L x n
+#' @param series [big.]matrix of series (in columns), of size L x n
 #' @inheritParams claws
 #'
 #' @return A matrix of size log(L) x n containing contributions in columns
 #'
 #' @export
-curvesToContribs = function(series, wav_filt, contrib_type, coin=FALSE)
+curvesToContribs = function(series, wav_filt, contrib_type)
 {
 	L = nrow(series)
 	D = ceiling( log2(L) )
@@ -55,9 +55,9 @@ curvesToContribs = function(series, wav_filt, contrib_type, coin=FALSE)
 	})
 }
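+
+# Example (illustrative): contributions of 5 series of length 64, using the d8
+# filter and absolute contributions; the output is a 6 x 5 matrix, since
+# ceiling(log2(64)) == 6:
+#   contribs <- curvesToContribs(matrix(rnorm(320), nrow=64), "d8", "absolute")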
 
-# Helper function to divide indices into balanced sets
-# If max == TRUE, sets sizes cannot exceed nb_per_set
-.splitIndices = function(indices, nb_per_set, max=FALSE)
+# Helper function to divide indices into balanced sets.
+# Ensures that every index set has at least min_size elements.
+.splitIndices = function(indices, nb_per_set, min_size=1)
 {
 	L = length(indices)
 	nb_workers = floor( L / nb_per_set )
@@ -65,29 +65,32 @@ curvesToContribs = function(series, wav_filt, contrib_type, coin=FALSE)
 	if (nb_workers == 0 || (nb_workers==1 && rem==0))
 	{
 		# L <= nb_per_set, simple case
-		indices_workers = list(indices)
+		return (list(indices))
 	}
-	else
-	{
-		indices_workers = lapply( seq_len(nb_workers), function(i)
-			indices[(nb_per_set*(i-1)+1):(nb_per_set*i)] )
 
-		if (max)
-		{
-			# Sets are not so well balanced, but size is supposed to be critical
-			return ( c( indices_workers, if (rem>0) list((L-rem+1):L) else NULL ) )
-		}
+	indices_workers = lapply( seq_len(nb_workers), function(i)
+		indices[(nb_per_set*(i-1)+1):(nb_per_set*i)] )
 
-		# Spread the remaining load among the workers
-		rem = L %% nb_per_set
-		while (rem > 0)
+	rem = L %% nb_per_set #number of remaining unassigned items
+	if (rem == 0)
+		return (indices_workers)
+
+	rem <- (L-rem+1):L
+	# If the remainder is smaller than min_size, feed it with indices from the other
+	# sets until either its size reaches min_size (success) or the other sets' sizes
+	# would drop below min_size (failure).
+	while (length(rem) < min_size)
+	{
+		index = length(rem) %% nb_workers + 1
+		if (length(indices_workers[[index]]) <= min_size)
 		{
-			index = rem%%nb_workers + 1
-			indices_workers[[index]] = c(indices_workers[[index]], indices[L-rem+1])
-			rem = rem - 1
+			stop("Impossible to split indices properly for clustering.
+				Try increasing nb_items_clust or decreasing K1")
 		}
+		rem = c(rem, tail(indices_workers[[index]],1))
+		indices_workers[[index]] = head( indices_workers[[index]], -1)
 	}
-	indices_workers
+	return ( c(indices_workers, list(rem) ) )
 }
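+
+# Worked example (illustrative): .splitIndices(1:11, 5, min_size=3) first builds
+# {1..5} and {6..10}, leaving the remainder {11}; since 1 < min_size, it borrows
+# 10 from the second set, then 5 from the first, and returns {1..4}, {6..9}, {11,10,5}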
 
 #' filterMA
@@ -98,7 +101,7 @@ curvesToContribs = function(series, wav_filt, contrib_type, coin=FALSE)
 #' @param M_ A real matrix of size LxD
 #' @param w_ The (odd) number of values to average
 #'
-#' @return The filtered matrix, of same size as the input
+#' @return The filtered matrix (in columns), of the same size as the input
 #' @export
 filterMA = function(M_, w_)
 	.Call("filterMA", M_, w_, PACKAGE="epclust")
-- 
2.44.0