From 37c82bbafbffc19e8b47a521952bac58f189e9ea Mon Sep 17 00:00:00 2001
From: Benjamin Auder <benjamin.auder@somewhere>
Date: Fri, 10 Mar 2017 16:43:00 +0100
Subject: [PATCH] save state: wrong idea for indices repartition

---
 epclust/R/clustering.R                        | 43 +++++++++----------
 epclust/R/main.R                              | 35 ++++++++-------
 epclust/tests/testthat/test.clustering.R      | 40 ++++++++---------
 epclust/tests/testthat/test.de_serialize.R    | 36 ++++++++--------
 .../testthat/{test.Filter.R => test.filter.R} |  0
 5 files changed, 75 insertions(+), 79 deletions(-)
 rename epclust/tests/testthat/{test.Filter.R => test.filter.R} (100%)

diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R
index 70d263e..36b4769 100644
--- a/epclust/R/clustering.R
+++ b/epclust/R/clustering.R
@@ -30,32 +30,20 @@ NULL
 
 #' @rdname clustering
 #' @export
-clusteringTask1 = function(
-	indices, getContribs, K1, nb_per_chunk, nb_items_clust, ncores_clust=1,
-	verbose=FALSE, parll=TRUE)
+clusteringTask1 = function(indices, getContribs, K1, nb_items_clust1,
+	ncores_clust=1, verbose=FALSE, parll=TRUE)
 {
 	if (verbose)
 		cat(paste("*** Clustering task 1 on ",length(indices)," lines\n", sep=""))
 
-
-
-
-
-
-##TODO: reviser le spreadIndices, tenant compte de nb_items_clust
-
-	##TODO: reviser / harmoniser avec getContribs qui en récupère pt'et + pt'et - !!
-
-
-
 	if (parll)
 	{
-		cl = parallel::makeCluster(ncores_clust)
+		cl = parallel::makeCluster(ncores_clust, outfile = "")
 		parallel::clusterExport(cl, varlist=c("getContribs","K1","verbose"), envir=environment())
 	}
 	while (length(indices) > K1)
 	{
-		indices_workers = .spreadIndices(indices, nb_series_per_chunk)
+		indices_workers = .spreadIndices(indices, nb_items_clust1, K1+1)
 		indices <-
 			if (parll)
 			{
@@ -329,20 +317,31 @@ computeWerDists = function(synchrones, nbytes,endian,ncores_clust=1,verbose=FALS
 }
 
 # Helper function to divide indices into balanced sets
-.spreadIndices = function(indices, nb_per_chunk)
+.spreadIndices = function(indices, max_per_set, min_nb_per_set = 1)
 {
 	L = length(indices)
-	nb_workers = floor( L / nb_per_chunk )
-	if (nb_workers == 0)
+	min_nb_workers = floor( L / max_per_set )
+	rem = L %% max_per_set
+	if (nb_workers == 0 || (nb_workers==1 && rem==0))
 	{
-		# L < nb_series_per_chunk, simple case
+		# L <= max_nb_per_set, simple case
 		indices_workers = list(indices)
 	}
 	else
 	{
 		indices_workers = lapply( seq_len(nb_workers), function(i)
-			indices[(nb_per_chunk*(i-1)+1):(nb_per_chunk*i)] )
-		# Spread the remaining load among the workers
+			indices[(max_nb_per_set*(i-1)+1):(max_per_set*i)] )
+		# Two cases: remainder is >= min_per_set (easy)...
+		if (rem >= min_nb_per_set)
+			indices_workers = c( indices_workers, list(tail(indices,rem)) )
+		#...or < min_per_set: harder, need to remove indices from current sets to feed
+		# the too-small remainder. It may fail: then fallback to "slightly bigger sets"
+		else
+		{
+			save_indices_workers = indices_workers
+			small_set = tail(indices,rem)
+			# Try feeding small_set until it reaches min_per_set, whle keeping the others big enough
+			# Spread the remaining load among the workers
 		rem = L %% nb_per_chunk
 		while (rem > 0)
 		{
diff --git a/epclust/R/main.R b/epclust/R/main.R
index 31ce390..a039d1c 100644
--- a/epclust/R/main.R
+++ b/epclust/R/main.R
@@ -39,16 +39,16 @@
 #'   }
 #' @param K1 Number of clusters to be found after stage 1 (K1 << N [number of series])
 #' @param K2 Number of clusters to be found after stage 2 (K2 << K1)
-#' @param nb_per_chunk (Maximum) number of items to retrieve in one batch, for both types of
-#'   retrieval: resp. series and contribution; in a vector of size 2
+#' @param nb_series_per_chunk (Maximum) number of series to retrieve in one batch
 #' @param algo_clust1 Clustering algorithm for stage 1. A function which takes (data, K)
 #'   as argument where data is a matrix in columns and K the desired number of clusters,
 #'   and outputs K medoids ranks. Default: PAM
 #' @param algo_clust2 Clustering algorithm for stage 2. A function which takes (dists, K)
 #'   as argument where dists is a matrix of distances and K the desired number of clusters,
 #'   and outputs K clusters representatives (curves). Default: k-means
-#' @param nb_items_clust1 (Maximum) number of items in input of the clustering algorithm
-#'   for stage 1
+#' @param nb_items_clust1 (~Maximum) number of items in input of the clustering algorithm
+#'   for stage 1. At worst, a clustering algorithm might be called with ~2*nb_items_clust1
+#'   items; but this could only happen at the last few iterations.
 #' @param wav_filt Wavelet transform filter; see ?wavelets::wt.filter
 #' @param contrib_type Type of contribution: "relative", "logit" or "absolute" (any prefix)
 #' @param WER "end" to apply stage 2 after stage 1 has fully iterated, or "mix" to apply
@@ -84,12 +84,12 @@
 #' series = do.call( cbind, lapply( 1:6, function(i)
 #'   do.call(cbind, wmtsa::wavBootstrap(ref_series[i,], n.realization=400)) ) )
 #' #dim(series) #c(2400,10001)
-#' medoids_ascii = claws(series, K1=60, K2=6, nb_per_chunk=c(200,500), verbose=TRUE)
+#' medoids_ascii = claws(series, K1=60, K2=6, 200, verbose=TRUE)
 #'
 #' # Same example, from CSV file
 #' csv_file = "/tmp/epclust_series.csv"
 #' write.table(series, csv_file, sep=",", row.names=FALSE, col.names=FALSE)
-#' medoids_csv = claws(csv_file, K1=60, K2=6, nb_per_chunk=c(200,500))
+#' medoids_csv = claws(csv_file, K1=60, K2=6, 200)
 #'
 #' # Same example, from binary file
 #' bin_file <- "/tmp/epclust_series.bin"
@@ -97,7 +97,7 @@
 #' endian <- "little"
 #' binarize(csv_file, bin_file, 500, nbytes, endian)
 #' getSeries <- function(indices) getDataInFile(indices, bin_file, nbytes, endian)
-#' medoids_bin <- claws(getSeries, K1=60, K2=6, nb_per_chunk=c(200,500))
+#' medoids_bin <- claws(getSeries, K1=60, K2=6, 200)
 #' unlink(csv_file)
 #' unlink(bin_file)
 #'
@@ -124,7 +124,7 @@
 #'   df_series <- dbGetQuery(series_db, request)
 #'   as.matrix(df_series[,"value"], nrow=serie_length)
 #' }
-#' medoids_db = claws(getSeries, K1=60, K2=6, nb_per_chunk=c(200,500))
+#' medoids_db = claws(getSeries, K1=60, K2=6, 200))
 #' dbDisconnect(series_db)
 #'
 #' # All computed medoids should be the same:
@@ -134,11 +134,11 @@
 #' digest::sha1(medoids_db)
 #' }
 #' @export
-claws <- function(getSeries, K1, K2, nb_per_chunk,
+claws <- function(getSeries, K1, K2, nb_series_per_chunk,
 	nb_items_clust1=7*K1,
 	algo_clust1=function(data,K) cluster::pam(data,K,diss=FALSE),
 	algo_clust2=function(dists,K) stats::kmeans(dists,K,iter.max=50,nstart=3),
-	wav_filt="d8",contrib_type="absolute",
+	wav_filt="d8", contrib_type="absolute",
 	WER="end",
 	random=TRUE,
 	ntasks=1, ncores_tasks=1, ncores_clust=4,
@@ -155,12 +155,11 @@ claws <- function(getSeries, K1, K2, nb_per_chunk,
 	}
 	K1 <- .toInteger(K1, function(x) x>=2)
 	K2 <- .toInteger(K2, function(x) x>=2)
-	if (!is.numeric(nb_per_chunk) || length(nb_per_chunk)!=2)
-		stop("'nb_per_chunk': numeric, size 2")
-	nb_per_chunk[1] <- .toInteger(nb_per_chunk[1], function(x) x>=1)
-	# A batch of contributions should have at least as many elements as a batch of series,
-	# because it always contains much less values
-	nb_per_chunk[2] <- max(.toInteger(nb_per_chunk[2],function(x) x>=1), nb_per_chunk[1])
+	nb_series_per_chunk <- .toInteger(nb_series_per_chunk, function(x) x>=1)
+	# K1 (number of clusters at step 1) cannot exceed nb_series_per_chunk, because we will need
+	# to load K1 series in memory for clustering stage 2.
+	if (K1 > nb_series_per_chunk)
+		stop("'K1' cannot exceed 'nb_series_per_chunk'")
 	nb_items_clust1 <- .toInteger(nb_items_clust1, function(x) x>K1)
 	random <- .toLogical(random)
 	tryCatch
@@ -236,8 +235,8 @@ claws <- function(getSeries, K1, K2, nb_per_chunk,
 		# under Linux. All necessary variables are passed to the workers.
 		cl = parallel::makeCluster(ncores_tasks, outfile="")
 		varlist = c("getSeries","getContribs","K1","K2","algo_clust1","algo_clust2",
-			"nb_per_chunk","nb_items_clust","ncores_clust","sep","nbytes","endian",
-			"verbose","parll")
+			"nb_series_per_chunk","nb_items_clust","ncores_clust","sep",
+			"nbytes","endian","verbose","parll")
 		if (WER=="mix")
 			varlist = c(varlist, "medoids_file")
 		parallel::clusterExport(cl, varlist, envir = environment())
diff --git a/epclust/tests/testthat/test.clustering.R b/epclust/tests/testthat/test.clustering.R
index 6c94f92..7116e73 100644
--- a/epclust/tests/testthat/test.clustering.R
+++ b/epclust/tests/testthat/test.clustering.R
@@ -19,15 +19,14 @@ test_that("computeClusters1&2 behave as expected",
 		n = ndK[1] ; d = ndK[2] ; K = ndK[3]
 		cs = n/K #cluster size
 		Id = diag(d)
-		coefs = do.call(rbind,
-			lapply(1:K, function(i) MASS::mvrnorm(cs, c(rep(0,(i-1)),5,rep(0,d-i)), Id)))
+		coefs = sapply(1:K, function(i) MASS::mvrnorm(cs, c(rep(0,(i-1)),5,rep(0,d-i)), Id))
 		indices_medoids1 = computeClusters1(coefs, K, verbose=TRUE)
 		indices_medoids2 = computeClusters2(dist(coefs), K, verbose=TRUE)
 		# Get coefs assignments (to medoids)
 		assignment1 = sapply(seq_len(n), function(i)
-			which.min( rowSums( sweep(coefs[indices_medoids1,],2,coefs[i,],'-')^2 ) ) )
+			which.min( colSums( sweep(coefs[,indices_medoids1],1,coefs[,i],'-')^2 ) ) )
 		assignment2 = sapply(seq_len(n), function(i)
-			which.min( rowSums( sweep(coefs[indices_medoids2,],2,coefs[i,],'-')^2 ) ) )
+			which.min( colSums( sweep(coefs[,indices_medoids2],1,coefs[,i],'-')^2 ) ) )
 		for (i in 1:K)
 		{
 			expect_equal(sum(assignment1==i), cs, tolerance=5)
@@ -70,19 +69,19 @@ test_that("computeSynchrones behave as expected",
 	#sum((s1-s3)^2) == 58
 	#sum((s2-s3)^2) == 38
 	s = list(s1, s2, s3)
-	series = matrix(nrow=n, ncol=L)
+	series = matrix(nrow=L, ncol=n)
 	for (i in seq_len(n))
-		series[i,] = s[[I(i,K)]] + rnorm(L,sd=0.01)
+		series[,i] = s[[I(i,K)]] + rnorm(L,sd=0.01)
 	getRefSeries = function(indices) {
 		indices = indices[indices <= n]
-		if (length(indices)>0) series[indices,] else NULL
+		if (length(indices)>0) series[,indices] else NULL
 	}
-	synchrones = computeSynchrones(bigmemory::as.big.matrix(rbind(s1,s2,s3)), getRefSeries,
+	synchrones = computeSynchrones(bigmemory::as.big.matrix(cbind(s1,s2,s3)), getRefSeries,
 		n, 100, verbose=TRUE, parll=FALSE)
 
-	expect_equal(dim(synchrones), c(K,L))
+	expect_equal(dim(synchrones), c(L,K))
 	for (i in 1:K)
-		expect_equal(synchrones[i,], s[[i]], tolerance=0.01)
+		expect_equal(synchrones[,i], s[[i]], tolerance=0.01)
 })
 
 # NOTE: medoids can be a big.matrix
@@ -93,7 +92,7 @@ computeDistortion = function(series, medoids)
 	if (bigmemory::is.big.matrix(medoids))
 		medoids = medoids[,]
 	for (i in seq_len(n))
-		distortion = distortion + min( rowSums( sweep(medoids,2,series[i,],'-')^2 ) / L )
+		distortion = distortion + min( colSums( sweep(medoids,1,series[,i],'-')^2 ) / L )
 	distortion / n
 }
 
@@ -106,23 +105,23 @@ test_that("clusteringTask1 behave as expected",
 	s = lapply( seq_len(K1), function(i) x^(1+i/30)*cos(x+i) )
 	series = matrix(nrow=n, ncol=L)
 	for (i in seq_len(n))
-		series[i,] = s[[I(i,K1)]] + rnorm(L,sd=0.01)
+		series[,i] = s[[I(i,K1)]] + rnorm(L,sd=0.01)
 	getSeries = function(indices) {
 		indices = indices[indices <= n]
-		if (length(indices)>0) series[indices,] else NULL
+		if (length(indices)>0) series[,indices] else NULL
 	}
 	wf = "haar"
 	ctype = "absolute"
-	getContribs = function(indices) curvesToContribs(series[indices,],wf,ctype)
+	getContribs = function(indices) curvesToContribs(series[,indices],wf,ctype)
 	indices1 = clusteringTask1(1:n, getContribs, K1, 75, verbose=TRUE, parll=FALSE)
 	medoids_K1 = getSeries(indices1)
 
-	expect_equal(dim(medoids_K1), c(K1,L))
+	expect_equal(dim(medoids_K1), c(L,K1))
 	# Not easy to evaluate result: at least we expect it to be better than random selection of
 	# medoids within initial series
 	distorGood = computeDistortion(series, medoids_K1)
 	for (i in 1:3)
-		expect_lte( distorGood, computeDistortion(series,series[sample(1:n, K1),]) )
+		expect_lte( distorGood, computeDistortion(series,series[,sample(1:n, K1)]) )
 })
 
 test_that("clusteringTask2 behave as expected",
@@ -139,19 +138,18 @@ test_that("clusteringTask2 behave as expected",
 		series[i,] = s[[I(i,K1)]] + rnorm(L,sd=0.01)
 	getRefSeries = function(indices) {
 		indices = indices[indices <= n]
-		if (length(indices)>0) series[indices,] else NULL
+		if (length(indices)>0) series[,indices] else NULL
 	}
 	# Artificially simulate 60 medoids - perfect situation, all equal to one of the refs
-	medoids_K1 = bigmemory::as.big.matrix(
-		do.call(rbind, lapply( 1:K1, function(i) s[[I(i,K1)]] ) ) )
+	medoids_K1 = bigmemory::as.big.matrix( sapply( 1:K1, function(i) s[[I(i,K1)]] ) )
 	medoids_K2 = clusteringTask2(medoids_K1, K2, getRefSeries, n, 75, verbose=TRUE, parll=FALSE)
 
-	expect_equal(dim(medoids_K2), c(K2,L))
+	expect_equal(dim(medoids_K2), c(L,K2))
 	# Not easy to evaluate result: at least we expect it to be better than random selection of
 	# medoids within 1...K1 (among references)
 	distorGood = computeDistortion(series, medoids_K2)
 	for (i in 1:3)
-		expect_lte( distorGood, computeDistortion(series,medoids_K1[sample(1:K1, K2),]) )
+		expect_lte( distorGood, computeDistortion(series,medoids_K1[,sample(1:K1, K2)]) )
 })
 
 #NOTE: rather redundant test
diff --git a/epclust/tests/testthat/test.de_serialize.R b/epclust/tests/testthat/test.de_serialize.R
index 8403e6d..25f2c2b 100644
--- a/epclust/tests/testthat/test.de_serialize.R
+++ b/epclust/tests/testthat/test.de_serialize.R
@@ -2,11 +2,11 @@ context("de_serialize")
 
 test_that("serialization + getDataInFile retrieve original data / from matrix",
 {
-	data_bin_file = "/tmp/epclust_test_m.bin"
+	data_bin_file = ".epclust_test_m.bin"
 	unlink(data_bin_file)
 
-	#dataset 200 lignes / 30 columns
-	data_ascii = matrix(runif(200*30,-10,10),ncol=30)
+	#dataset 200 cols / 30 rows
+	data_ascii = matrix(runif(200*30,-10,10),nrow=30)
 	nbytes = 4 #lead to a precision of 1e-7 / 1e-8
 	endian = "little"
 
@@ -16,7 +16,7 @@ test_that("serialization + getDataInFile retrieve original data / from matrix",
 	for (indices in list(c(1,3,5), 3:13, c(5,20,50), c(75,130:135), 196:200))
 	{
 		data_lines = getDataInFile(indices, data_bin_file, nbytes, endian)
-		expect_equal(data_lines, data_ascii[indices,], tolerance=1e-6)
+		expect_equal(data_lines, data_ascii[,indices], tolerance=1e-6)
 	}
 	unlink(data_bin_file)
 
@@ -27,44 +27,44 @@ test_that("serialization + getDataInFile retrieve original data / from matrix",
 	for (indices in list(c(1,3,5), 3:13, c(5,20,50), c(75,130:135), 196:200))
 	{
 		data_lines = getDataInFile(indices, data_bin_file, nbytes, endian)
-		expect_equal(data_lines, data_ascii[indices,], tolerance=1e-6)
+		expect_equal(data_lines, data_ascii[,indices], tolerance=1e-6)
 	}
 	unlink(data_bin_file)
 })
 
 test_that("serialization + transform + getDataInFile retrieve original transforms",
 {
-	data_bin_file = "/tmp/epclust_test_t.bin"
+	data_bin_file = ".epclust_test_t.bin"
 	unlink(data_bin_file)
 
-	#dataset 200 lignes / 30 columns
-	data_ascii = matrix(runif(200*30,-10,10),ncol=30)
+	#dataset 200 cols / 30 rows
+	data_ascii = matrix(runif(200*30,-10,10),nrow=30)
 	nbytes = 8
 	endian = "little"
 
 	binarize(data_ascii, data_bin_file, 500, ",", nbytes, endian)
 	# Serialize transformation (just compute range) into a new binary file
-	trans_bin_file = "/tmp/epclust_test_t_trans.bin"
+	trans_bin_file = ".epclust_test_t_trans.bin"
 	unlink(trans_bin_file)
 	getSeries = function(inds) getDataInFile(inds, data_bin_file, nbytes, endian)
-	binarizeTransform(getSeries, function(series) t(apply(series, 1, range)),
+	binarizeTransform(getSeries, function(series) apply(series, 2, range),
 		trans_bin_file, 250, nbytes, endian)
 	unlink(data_bin_file)
 	expect_equal(file.info(trans_bin_file)$size, 2*nrow(data_ascii)*nbytes+8)
 	for (indices in list(c(1,3,5), 3:13, c(5,20,50), c(75,130:135), 196:200))
 	{
-		trans_lines = getDataInFile(indices, trans_bin_file, nbytes, endian)
-		expect_equal(trans_lines, t(apply(data_ascii[indices,],1,range)), tolerance=1e-6)
+		trans_cols = getDataInFile(indices, trans_bin_file, nbytes, endian)
+		expect_equal(trans_cols, apply(data_ascii[indices,],2,range), tolerance=1e-6)
 	}
 	unlink(trans_bin_file)
 })
 
 test_that("serialization + getDataInFile retrieve original data / from connection",
 {
-	data_bin_file = "/tmp/epclust_test_c.bin"
+	data_bin_file = ".epclust_test_c.bin"
 	unlink(data_bin_file)
 
-	#dataset 300 lignes / 50 columns
+	#dataset 300 cols / 50 rows
 	data_csv = system.file("testdata","de_serialize.csv",package="epclust")
 	nbytes = 8
 	endian = "big"
@@ -76,8 +76,8 @@ test_that("serialization + getDataInFile retrieve original data / from connectio
 	for (indices in list(c(1,3,5), 3:13, c(5,20,50), c(75,130:135), 196:200))
 	{
 		#HACK: as.matrix(as.data.frame( )) required to match (ref) data structure
-		data_lines = as.matrix(as.data.frame( getDataInFile(indices,data_bin_file,nbytes,endian) ))
-		expect_equal(data_lines, data_ascii[indices,])
+		data_cols = as.matrix(as.data.frame( getDataInFile(indices,data_bin_file,nbytes,endian) ))
+		expect_equal(data_cols, data_ascii[,indices])
 	}
 	unlink(data_bin_file)
 
@@ -87,8 +87,8 @@ test_that("serialization + getDataInFile retrieve original data / from connectio
 	expect_equal(file.info(data_bin_file)$size, 300*50*8+8)
 	for (indices in list(c(1,3,5), 3:13, c(5,20,50), c(75,130:135), 196:200))
 	{
-		data_lines = as.matrix(as.data.frame( getDataInFile(indices,data_bin_file,nbytes,endian) ))
-		expect_equal(data_lines, data_ascii[indices,])
+		data_cols = as.matrix(as.data.frame( getDataInFile(indices,data_bin_file,nbytes,endian) ))
+		expect_equal(data_cols, data_ascii[,indices])
 	}
 	unlink(data_bin_file)
 	#close(data_con) --> done in binarize()
diff --git a/epclust/tests/testthat/test.Filter.R b/epclust/tests/testthat/test.filter.R
similarity index 100%
rename from epclust/tests/testthat/test.Filter.R
rename to epclust/tests/testthat/test.filter.R
-- 
2.44.0