From 8702eb86906bd6d59e07bb887e690a20f29be63f Mon Sep 17 00:00:00 2001
From: Benjamin Auder <benjamin.auder@somewhere>
Date: Mon, 6 Mar 2017 20:18:22 +0100
Subject: [PATCH] Fix unit tests

---
 .gitignore                                    |   2 +-
 TODO                                          |  12 ++
 epclust/DESCRIPTION                           |   1 -
 epclust/R/clustering.R                        |  40 +++---
 epclust/R/de_serialize.R                      |   8 +-
 epclust/R/main.R                              |  60 +++++----
 epclust/data/TODO_example.RData               |   0
 .../inst/testdata/TODO_clusteringInput.csv    |   0
 epclust/tests/testthat/test.clustering.R      | 121 +++++++++++++++++-
 9 files changed, 193 insertions(+), 51 deletions(-)
 delete mode 100644 epclust/data/TODO_example.RData
 delete mode 100644 epclust/inst/testdata/TODO_clusteringInput.csv

diff --git a/.gitignore b/.gitignore
index 8db5c77..e735132 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,7 +2,7 @@
 /data/*
 !/data/README
 !/data/preprocessing/
-/data/prrprocessing/*
+/data/preprocessing/*
 !/data/preprocessing/convert.c
 !/data/preprocessing/Makefile
 
diff --git a/TODO b/TODO
index f5e0015..275c10d 100644
--- a/TODO
+++ b/TODO
@@ -26,3 +26,15 @@ utiliser Rcpp ?
 #fct qui pour deux series (ID, medoides) renvoie distance WER (Rwave ou à moi)
 #transformee croisee , smoothing lissage 3 composantes , + calcul pour WER
 #determiner nvoice noctave (entre octave + petit et + grand)
+
+#TODO: load some dataset ASCII CSV
+#data_bin_file <<- "/tmp/epclust_test.bin"
+#unlink(data_bin_file)
+
+#https://stat.ethz.ch/pipermail/r-help/2011-June/280133.html
+#randCov = function(d)
+#{
+#	x <- matrix(rnorm(d*d), nrow=d)
+#	x <- x / sqrt(rowSums(x^2))
+#	x %*% t(x)
+#}
diff --git a/epclust/DESCRIPTION b/epclust/DESCRIPTION
index c607000..030e944 100644
--- a/epclust/DESCRIPTION
+++ b/epclust/DESCRIPTION
@@ -23,7 +23,6 @@ Suggests:
 License: MIT + file LICENSE
 RoxygenNote: 5.0.1
 Collate:
-    'utils.R'
     'clustering.R'
     'de_serialize.R'
     'main.R'
diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R
index fce1b1c..493f90f 100644
--- a/epclust/R/clustering.R
+++ b/epclust/R/clustering.R
@@ -2,16 +2,24 @@
 clusteringTask = function(indices, getCoefs, K1, nb_series_per_chunk, ncores)
 {
 	cl = parallel::makeCluster(ncores)
+	parallel::clusterExport(cl, varlist=c("getCoefs","K1"), envir=environment())
 	repeat
 	{
-		nb_workers = max( 1, round( length(indices) / nb_series_per_chunk ) )
-		indices_workers = lapply(seq_len(nb_workers), function(i) {
-			upper_bound = ifelse( i<nb_workers,
-				min(nb_series_per_chunk*i,length(indices)), length(indices) )
-			indices[(nb_series_per_chunk*(i-1)+1):upper_bound]
-		})
-		indices = unlist( parallel::parLapply(cl, indices_workers, function(inds)
-			computeClusters1(getCoefs(inds), K1)) )
+		nb_workers = max( 1, floor( length(indices) / nb_series_per_chunk ) )
+		indices_workers = lapply( seq_len(nb_workers), function(i)
+			indices[(nb_series_per_chunk*(i-1)+1):(nb_series_per_chunk*i)] )
+		# Spread the remaining load among the workers
+		rem = length(indices) %% nb_series_per_chunk
+		while (rem > 0)
+		{
+			index = rem%%nb_workers + 1
+			indices_workers[[index]] = c(indices_workers[[index]], tail(indices,rem))
+			rem = rem - 1
+		}
+		indices = unlist( parallel::parLapply( cl, indices_workers, function(inds) {
+			require("epclust", quietly=TRUE)
+			inds[ computeClusters1(getCoefs(inds), K1) ]
+		} ) )
 		if (length(indices) == K1)
 			break
 	}
@@ -21,20 +29,18 @@ clusteringTask = function(indices, getCoefs, K1, nb_series_per_chunk, ncores)
 
 # Apply the clustering algorithm (PAM) on a coeffs or distances matrix
 computeClusters1 = function(coefs, K1)
-	indices[ cluster::pam(coefs, K1, diss=FALSE)$id.med ]
+	cluster::pam(coefs, K1, diss=FALSE)$id.med
 
 # Cluster a chunk of series inside one task (~max nb_series_per_chunk)
 computeClusters2 = function(medoids, K2, getRefSeries, nb_series_per_chunk)
 {
 	synchrones = computeSynchrones(medoids, getRefSeries, nb_series_per_chunk)
-	cluster::pam(computeWerDists(synchrones), K2, diss=TRUE)$medoids
+	medoids[ cluster::pam(computeWerDists(synchrones), K2, diss=TRUE)$medoids , ]
 }
 
 # Compute the synchrones curves (sum of clusters elements) from a clustering result
 computeSynchrones = function(medoids, getRefSeries, nb_series_per_chunk)
 {
-	#les getSeries(indices) sont les medoides --> init vect nul pour chacun, puis incr avec les
-	#courbes (getSeriesForSynchrones) les plus proches... --> au sens de la norme L2 ?
 	K = nrow(medoids)
 	synchrones = matrix(0, nrow=K, ncol=ncol(medoids))
 	counts = rep(0,K)
@@ -48,18 +54,20 @@ computeSynchrones = function(medoids, getRefSeries, nb_series_per_chunk)
 		#get medoids indices for this chunk of series
 		for (i in seq_len(nrow(ref_series)))
 		{
-			j = which.min( rowSums( sweep(medoids, 2, series[i,], '-')^2 ) )
-			synchrones[j,] = synchrones[j,] + series[i,]
+			j = which.min( rowSums( sweep(medoids, 2, ref_series[i,], '-')^2 ) )
+			synchrones[j,] = synchrones[j,] + ref_series[i,]
 			counts[j] = counts[j] + 1
 		}
 		index = index + nb_series_per_chunk
 	}
 	#NOTE: odds for some clusters to be empty? (when series already come from stage 2)
-	sweep(synchrones, 1, counts, '/')
+	#      ...maybe; but let's hope resulting K1' be still quite bigger than K2
+	synchrones = sweep(synchrones, 1, counts, '/')
+	synchrones[ sapply(seq_len(K), function(i) all(!is.nan(synchrones[i,]))) , ]
 }
 
 # Compute the WER distance between the synchrones curves (in rows)
-computeWerDist = function(curves)
+computeWerDists = function(curves)
 {
 	if (!require("Rwave", quietly=TRUE))
 		stop("Unable to load Rwave library")
diff --git a/epclust/R/de_serialize.R b/epclust/R/de_serialize.R
index 682db53..242e23a 100644
--- a/epclust/R/de_serialize.R
+++ b/epclust/R/de_serialize.R
@@ -62,12 +62,16 @@ serialize = function(data_ascii, data_bin_file, nb_per_chunk,
 getDataInFile = function(indices, data_bin_file, nbytes=4, endian=.Platform$endian)
 {
 	data_bin = file(data_bin_file, "rb")
+	data_size = file.info(data_bin)$size
 	data_length = readBin(data_bin, "integer", 1, 8, endian)
 	#Ou t(sapply(...)) (+ rapide ?)
 	data_ascii = do.call( rbind, lapply( indices, function(i) {
-		ignored = seek(data_bin, 8+((i-1)*data_length*nbytes))
+		offset = 8+(i-1)*data_length*nbytes
+		if (offset > data_size)
+			return (vector("double",0))
+		ignored = seek(data_bin, offset)
 		readBin(data_bin, "double", n=data_length, size=nbytes)
 	} ) )
 	close(data_bin)
-	data_ascii
+	if (ncol(data_ascii)>0) data_ascii else NULL
 }
diff --git a/epclust/R/main.R b/epclust/R/main.R
index 280cc17..5e47f19 100644
--- a/epclust/R/main.R
+++ b/epclust/R/main.R
@@ -1,34 +1,38 @@
-#' @include utils.R
+#' @include de_serialize.R
 #' @include clustering.R
 NULL
 
-#' Cluster power curves with PAM in parallel CLAWS: CLustering with wAvelets and Wer distanceS
+#' CLAWS: CLustering with wAvelets and Wer distanceS
 #'
 #' Groups electricity power curves (or any series of similar nature) by applying PAM
-#' algorithm in parallel to chunks of size \code{nb_series_per_chunk}
+#' algorithm in parallel to chunks of size \code{nb_series_per_chunk}. Input series
+#' must be sampled on the same time grid, no missing values.
 #'
-#' @param data Access to the data, which can be of one of the three following types:
-#' \itemize{
-#'   \item data.frame: each line contains its ID in the first cell, and all values after
-#'   \item connection: any R connection object (e.g. a file) providing lines as described above
-#'   \item function: a custom way to retrieve the curves; it has two arguments: the ranks to be
-#'     retrieved, and the IDs - at least one of them must be present (priority: ranks).
-#' }
+#' @param getSeries Access to the (time-)series, which can be of one of the three
+#'   following types:
+#'   \itemize{
+#'     \item matrix: each line contains all the values for one time-serie, ordered by time
+#'     \item connection: any R connection object (e.g. a file) providing lines as described above
+#'     \item function: a custom way to retrieve the curves; it has only one argument:
+#'       the indices of the series to be retrieved. See examples
+#'   }
 #' @param K1 Number of super-consumers to be found after stage 1 (K1 << N)
 #' @param K2 Number of clusters to be found after stage 2 (K2 << K1)
+#' @param random TRUE (default) for random chunks repartition
+#' @param wf Wavelet transform filter; see ?wavelets::wt.filter. Default: haar
+#' @param WER "end" to apply stage 2 after stage 1 has fully iterated, or "mix" to apply stage 2
+#'   at the end of each task
 #' @param ntasks Number of tasks (parallel iterations to obtain K1 medoids); default: 1.
 #'   Note: ntasks << N, so that N is "roughly divisible" by N (number of series)
-#' @param nb_series_per_chunk (Maximum) number of series in each group, inside a task
-#' @param min_series_per_chunk Minimum number of series in each group
-#' @param wf Wavelet transform filter; see ?wt.filter. Default: haar
-#' @param WER "end" to apply stage 2 after stage 1 has iterated and finished, or "mix"
-#'   to apply it after every stage 1
 #' @param ncores_tasks "MPI" number of parallel tasks (1 to disable: sequential tasks)
 #' @param ncores_clust "OpenMP" number of parallel clusterings in one task
-#' @param random Randomize chunks repartition
-#' @param ... Other arguments to be passed to \code{data} function
+#' @param nb_series_per_chunk (~Maximum) number of series in each group, inside a task
+#' @param min_series_per_chunk Minimum number of series in each group
+#' @param sep Separator in CSV input file (relevant only if getSeries is a file name)
+#' @param nbytes Number of bytes to serialize a floating-point number; 4 or 8
+#' @param endian Endianness to use for (de)serialization. Use "little" or "big" for portability
 #'
-#' @return A data.frame of the final medoids curves (identifiers + values)
+#' @return A matrix of the final medoids curves
 #'
 #' @examples
 #' getData = function(start, n) {
@@ -95,10 +99,10 @@ claws = function(getSeries, K1, K2,
 		series = getSeries((index-1)+seq_len(nb_series_per_chunk))
 		if (is.null(series))
 			break
-		coeffs_chunk = curvesToCoeffs(series, wf)
-		serialize(coeffs_chunk, coefs_file, nb_series_per_chunk, sep, nbytes, endian)
+		coefs_chunk = curvesToCoefs(series, wf)
+		serialize(coefs_chunk, coefs_file, nb_series_per_chunk, sep, nbytes, endian)
 		index = index + nb_series_per_chunk
-		nb_curves = nb_curves + nrow(coeffs_chunk)
+		nb_curves = nb_curves + nrow(coefs_chunk)
 	}
 	getCoefs = function(indices) getDataInFile(indices, coefs_file, nbytes, endian)
 
@@ -129,7 +133,7 @@ claws = function(getSeries, K1, K2,
 	}) )
 	parallel::stopCluster(cl)
 
-	getSeriesForSynchrones = getSeries
+	getRefSeries = getSeries
 	synchrones_file = paste(bin_dir,"synchrones",sep="") ; unlink(synchrones_file)
 	if (WER=="mix")
 	{
@@ -144,8 +148,8 @@ claws = function(getSeries, K1, K2,
 			series = getSeries((index-1)+seq_len(nb_series_per_chunk))
 			if (is.null(series))
 				break
-			coeffs_chunk = curvesToCoeffs(series, wf)
-			serialize(coeffs_chunk, coefs_file, nb_series_per_chunk, sep, nbytes, endian)
+			coefs_chunk = curvesToCoefs(series, wf)
+			serialize(coefs_chunk, coefs_file, nb_series_per_chunk, sep, nbytes, endian)
 			index = index + nb_series_per_chunk
 		}
 	}
@@ -153,20 +157,20 @@ claws = function(getSeries, K1, K2,
 	# Run step2 on resulting indices or series (from file)
 	indices_medoids = clusteringTask(
 		indices, getCoefs, K1, nb_series_per_chunk, ncores_tasks*ncores_clust)
-	computeClusters2(getSeries(indices_medoids),K2,getSeriesForSynchrones,nb_series_per_chunk)
+	computeClusters2(getSeries(indices_medoids),K2,getRefSeries,nb_series_per_chunk)
 }
 
 # helper
-curvesToCoeffs = function(series, wf)
+curvesToCoefs = function(series, wf)
 {
 	L = length(series[1,])
 	D = ceiling( log2(L) )
 	nb_sample_points = 2^D
-	apply(series, 1, function(x) {
+	t( apply(series, 1, function(x) {
 		interpolated_curve = spline(1:L, x, n=nb_sample_points)$y
 		W = wavelets::dwt(interpolated_curve, filter=wf, D)@W
 		rev( sapply( W, function(v) ( sqrt( sum(v^2) ) ) ) )
-	})
+	}) )
 }
 
 # helper
diff --git a/epclust/data/TODO_example.RData b/epclust/data/TODO_example.RData
deleted file mode 100644
index e69de29..0000000
diff --git a/epclust/inst/testdata/TODO_clusteringInput.csv b/epclust/inst/testdata/TODO_clusteringInput.csv
deleted file mode 100644
index e69de29..0000000
diff --git a/epclust/tests/testthat/test.clustering.R b/epclust/tests/testthat/test.clustering.R
index 527f6bd..a4d59d9 100644
--- a/epclust/tests/testthat/test.clustering.R
+++ b/epclust/tests/testthat/test.clustering.R
@@ -1,25 +1,140 @@
 context("clustering")
 
-#TODO: load some dataset ASCII CSV
-#data_bin_file <<- "/tmp/epclust_test.bin"
-#unlink(data_bin_file)
+#shorthand: map 1->1, 2->2, 3->3, 4->1, ..., 149->2, 150->3, ... (is base==3)
+I = function(i, base)
+	(i-1) %% base + 1
 
 test_that("computeClusters1 behave as expected",
 {
+	require("MASS", quietly=TRUE)
+	require("clue", quietly=TRUE)
 
+	# 3 gaussian clusters, 300 items; and then 7 gaussian clusters, 490 items
+	n = 300
+	d = 5
+	K = 3
+	for (ndK in list( c(300,5,3), c(490,10,7) ))
+	{
+		n = ndK[1] ; d = ndK[2] ; K = ndK[3]
+		cs = n/K #cluster size
+		Id = diag(d)
+		coefs = do.call(rbind,
+			lapply(1:K, function(i) MASS::mvrnorm(cs, c(rep(0,(i-1)),5,rep(0,d-i)), Id)))
+		indices_medoids = computeClusters1(coefs, K)
+		# Get coefs assignments (to medoids)
+		assignment = sapply(seq_len(n), function(i)
+			which.min( rowSums( sweep(coefs[indices_medoids,],2,coefs[i,],'-')^2 ) ) )
+		for (i in 1:K)
+			expect_equal(sum(assignment==i), cs, tolerance=5)
+
+		costs_matrix = matrix(nrow=K,ncol=K)
+		for (i in 1:K)
+		{
+			for (j in 1:K)
+			{
+				# assign i (in result) to j (order 1,2,3)
+				costs_matrix[i,j] = abs( mean(assignment[((i-1)*cs+1):(i*cs)]) - j )
+			}
+		}
+		permutation = as.integer( clue::solve_LSAP(costs_matrix) )
+		for (i in 1:K)
+		{
+			expect_equal(
+				mean(assignment[((i-1)*cs+1):(i*cs)]), permutation[i], tolerance=0.05)
+		}
+	}
 })
 
 test_that("computeSynchrones behave as expected",
 {
+	n = 300
+	x = seq(0,9.5,0.1)
+	L = length(x) #96 1/4h
+	K = 3
+	s1 = cos(x)
+	s2 = sin(x)
+	s3 = c( s1[1:(L%/%2)] , s2[(L%/%2+1):L] )
+	#sum((s1-s2)^2) == 96
+	#sum((s1-s3)^2) == 58
+	#sum((s2-s3)^2) == 38
+	s = list(s1, s2, s3)
+	series = matrix(nrow=n, ncol=L)
+	for (i in seq_len(n))
+		series[i,] = s[[I(i,K)]] + rnorm(L,sd=0.01)
+	getRefSeries = function(indices) {
+		indices = indices[indices < n]
+		if (length(indices)>0) series[indices,] else NULL
+	}
+	synchrones = computeSynchrones(rbind(s1,s2,s3), getRefSeries, 100)
 
+	expect_equal(dim(synchrones), c(K,L))
+	for (i in 1:K)
+		expect_equal(synchrones[i,], s[[i]], tolerance=0.01)
 })
 
+computeDistortion = function(series, medoids)
+{
+	n = nrow(series) ; L = ncol(series)
+	distortion = 0.
+	for (i in seq_len(n))
+		distortion = distortion + min( rowSums( sweep(medoids,2,series[i,],'-')^2 ) / L )
+	distortion / n
+}
+
 test_that("computeClusters2 behave as expected",
 {
+	n = 900
+	x = seq(0,9.5,0.1)
+	L = length(x) #96 1/4h
+	K1 = 60
+	K2 = 3
+	#for (i in 1:60) {plot(x^(1+i/30)*cos(x+i),type="l",col=i,ylim=c(-50,50)); par(new=TRUE)}
+	s = lapply( seq_len(K1), function(i) x^(1+i/30)*cos(x+i) )
+	series = matrix(nrow=n, ncol=L)
+	for (i in seq_len(n))
+		series[i,] = s[[I(i,K1)]] + rnorm(L,sd=0.01)
+	getRefSeries = function(indices) {
+		indices = indices[indices < n]
+		if (length(indices)>0) series[indices,] else NULL
+	}
+	# Artificially simulate 60 medoids - perfect situation, all equal to one of the refs
+	medoids_K1 = do.call(rbind, lapply( 1:K1, function(i) s[[I(i,K1)]] ) )
+	medoids_K2 = computeClusters2(medoids_K1, K2, getRefSeries, 75)
 
+	expect_equal(dim(medoids_K2), c(K2,L))
+	# Not easy to evaluate result: at least we expect it to be better than random selection of
+	# medoids within 1...K1 (among references)
+	
+	distorGood = computeDistortion(series, medoids_K2)
+	for (i in 1:3)
+		expect_lte( distorGood, computeDistortion(series,medoids_K1[sample(1:K1, K2),]) )
 })
 
 test_that("clusteringTask + computeClusters2 behave as expected",
 {
+	n = 900
+	x = seq(0,9.5,0.1)
+	L = length(x) #96 1/4h
+	K1 = 60
+	K2 = 3
+	s = lapply( seq_len(K1), function(i) x^(1+i/30)*cos(x+i) )
+	series = matrix(nrow=n, ncol=L)
+	for (i in seq_len(n))
+		series[i,] = s[[I(i,K1)]] + rnorm(L,sd=0.01)
+	getSeries = function(indices) {
+		indices = indices[indices <= n]
+		if (length(indices)>0) series[indices,] else NULL
+	}
+	wf = "haar"
+	getCoefs = function(indices) curvesToCoefs(series[indices,],wf)
+	medoids_K1 = getSeries( clusteringTask(1:n, getCoefs, K1, 75, 4) )
+	medoids_K2 = computeClusters2(medoids_K1, K2, getSeries, 120)
 
+	expect_equal(dim(medoids_K1), c(K1,L))
+	expect_equal(dim(medoids_K2), c(K2,L))
+	# Not easy to evaluate result: at least we expect it to be better than random selection of
+	# medoids within 1...K1 (among references)
+	distorGood = computeDistortion(series, medoids_K2)
+	for (i in 1:3)
+		expect_lte( distorGood, computeDistortion(series,medoids_K1[sample(1:K1, K2),]) )
 })
-- 
2.44.0