Fix unit tests
authorBenjamin Auder <benjamin.auder@somewhere>
Mon, 6 Mar 2017 19:18:22 +0000 (20:18 +0100)
committerBenjamin Auder <benjamin.auder@somewhere>
Mon, 6 Mar 2017 19:18:22 +0000 (20:18 +0100)
.gitignore
TODO
epclust/DESCRIPTION
epclust/R/clustering.R
epclust/R/de_serialize.R
epclust/R/main.R
epclust/data/TODO_example.RData [deleted file]
epclust/inst/testdata/TODO_clusteringInput.csv [deleted file]
epclust/tests/testthat/test.clustering.R

index 8db5c77..e735132 100644 (file)
@@ -2,7 +2,7 @@
 /data/*
 !/data/README
 !/data/preprocessing/
-/data/prrprocessing/*
+/data/preprocessing/*
 !/data/preprocessing/convert.c
 !/data/preprocessing/Makefile
 
diff --git a/TODO b/TODO
index f5e0015..275c10d 100644 (file)
--- a/TODO
+++ b/TODO
@@ -26,3 +26,15 @@ utiliser Rcpp ?
 #fct qui pour deux series (ID, medoides) renvoie distance WER (Rwave ou à moi)
 #transformee croisee , smoothing lissage 3 composantes , + calcul pour WER
 #determiner nvoice noctave (entre octave + petit et + grand)
+
+#TODO: load some dataset ASCII CSV
+#data_bin_file <<- "/tmp/epclust_test.bin"
+#unlink(data_bin_file)
+
+#https://stat.ethz.ch/pipermail/r-help/2011-June/280133.html
+#randCov = function(d)
+#{
+#      x <- matrix(rnorm(d*d), nrow=d)
+#      x <- x / sqrt(rowSums(x^2))
+#      x %*% t(x)
+#}
index c607000..030e944 100644 (file)
@@ -23,7 +23,6 @@ Suggests:
 License: MIT + file LICENSE
 RoxygenNote: 5.0.1
 Collate:
-    'utils.R'
     'clustering.R'
     'de_serialize.R'
     'main.R'
index fce1b1c..493f90f 100644 (file)
@@ -2,16 +2,24 @@
 clusteringTask = function(indices, getCoefs, K1, nb_series_per_chunk, ncores)
 {
        cl = parallel::makeCluster(ncores)
+       parallel::clusterExport(cl, varlist=c("getCoefs","K1"), envir=environment())
        repeat
        {
-               nb_workers = max( 1, round( length(indices) / nb_series_per_chunk ) )
-               indices_workers = lapply(seq_len(nb_workers), function(i) {
-                       upper_bound = ifelse( i<nb_workers,
-                               min(nb_series_per_chunk*i,length(indices)), length(indices) )
-                       indices[(nb_series_per_chunk*(i-1)+1):upper_bound]
-               })
-               indices = unlist( parallel::parLapply(cl, indices_workers, function(inds)
-                       computeClusters1(getCoefs(inds), K1)) )
+               nb_workers = max( 1, floor( length(indices) / nb_series_per_chunk ) )
+               indices_workers = lapply( seq_len(nb_workers), function(i)
+                       indices[(nb_series_per_chunk*(i-1)+1):(nb_series_per_chunk*i)] )
+               # Spread the remaining load among the workers
+               rem = length(indices) %% nb_series_per_chunk
+               while (rem > 0)
+               {
+                       index = rem%%nb_workers + 1
+                       indices_workers[[index]] = c(indices_workers[[index]], tail(indices,rem))
+                       rem = rem - 1
+               }
+               indices = unlist( parallel::parLapply( cl, indices_workers, function(inds) {
+                       require("epclust", quietly=TRUE)
+                       inds[ computeClusters1(getCoefs(inds), K1) ]
+               } ) )
                if (length(indices) == K1)
                        break
        }
@@ -21,20 +29,18 @@ clusteringTask = function(indices, getCoefs, K1, nb_series_per_chunk, ncores)
 
 # Apply the clustering algorithm (PAM) on a coeffs or distances matrix
 computeClusters1 = function(coefs, K1)
-       indices[ cluster::pam(coefs, K1, diss=FALSE)$id.med ]
+       cluster::pam(coefs, K1, diss=FALSE)$id.med
 
 # Cluster a chunk of series inside one task (~max nb_series_per_chunk)
 computeClusters2 = function(medoids, K2, getRefSeries, nb_series_per_chunk)
 {
        synchrones = computeSynchrones(medoids, getRefSeries, nb_series_per_chunk)
-       cluster::pam(computeWerDists(synchrones), K2, diss=TRUE)$medoids
+       medoids[ cluster::pam(computeWerDists(synchrones), K2, diss=TRUE)$medoids , ]
 }
 
 # Compute the synchrones curves (sum of clusters elements) from a clustering result
 computeSynchrones = function(medoids, getRefSeries, nb_series_per_chunk)
 {
-       #les getSeries(indices) sont les medoides --> init vect nul pour chacun, puis incr avec les
-       #courbes (getSeriesForSynchrones) les plus proches... --> au sens de la norme L2 ?
        K = nrow(medoids)
        synchrones = matrix(0, nrow=K, ncol=ncol(medoids))
        counts = rep(0,K)
@@ -48,18 +54,20 @@ computeSynchrones = function(medoids, getRefSeries, nb_series_per_chunk)
                #get medoids indices for this chunk of series
                for (i in seq_len(nrow(ref_series)))
                {
-                       j = which.min( rowSums( sweep(medoids, 2, series[i,], '-')^2 ) )
-                       synchrones[j,] = synchrones[j,] + series[i,]
+                       j = which.min( rowSums( sweep(medoids, 2, ref_series[i,], '-')^2 ) )
+                       synchrones[j,] = synchrones[j,] + ref_series[i,]
                        counts[j] = counts[j] + 1
                }
                index = index + nb_series_per_chunk
        }
        #NOTE: odds for some clusters to be empty? (when series already come from stage 2)
-       sweep(synchrones, 1, counts, '/')
+       #      ...maybe; but let's hope resulting K1' be still quite bigger than K2
+       synchrones = sweep(synchrones, 1, counts, '/')
+       synchrones[ sapply(seq_len(K), function(i) all(!is.nan(synchrones[i,]))) , ]
 }
 
 # Compute the WER distance between the synchrones curves (in rows)
-computeWerDist = function(curves)
+computeWerDists = function(curves)
 {
        if (!require("Rwave", quietly=TRUE))
                stop("Unable to load Rwave library")
index 682db53..242e23a 100644 (file)
@@ -62,12 +62,16 @@ serialize = function(data_ascii, data_bin_file, nb_per_chunk,
 getDataInFile = function(indices, data_bin_file, nbytes=4, endian=.Platform$endian)
 {
        data_bin = file(data_bin_file, "rb")
+       data_size = file.info(data_bin)$size
        data_length = readBin(data_bin, "integer", 1, 8, endian)
        #Ou t(sapply(...)) (+ rapide ?)
        data_ascii = do.call( rbind, lapply( indices, function(i) {
-               ignored = seek(data_bin, 8+((i-1)*data_length*nbytes))
+               offset = 8+(i-1)*data_length*nbytes
+               if (offset > data_size)
+                       return (vector("double",0))
+               ignored = seek(data_bin, offset)
                readBin(data_bin, "double", n=data_length, size=nbytes)
        } ) )
        close(data_bin)
-       data_ascii
+       if (ncol(data_ascii)>0) data_ascii else NULL
 }
index 280cc17..5e47f19 100644 (file)
@@ -1,34 +1,38 @@
-#' @include utils.R
+#' @include de_serialize.R
 #' @include clustering.R
 NULL
 
-#' Cluster power curves with PAM in parallel CLAWS: CLustering with wAvelets and Wer distanceS
+#' CLAWS: CLustering with wAvelets and Wer distanceS
 #'
 #' Groups electricity power curves (or any series of similar nature) by applying PAM
-#' algorithm in parallel to chunks of size \code{nb_series_per_chunk}
+#' algorithm in parallel to chunks of size \code{nb_series_per_chunk}. Input series
+#' must be sampled on the same time grid, no missing values.
 #'
-#' @param data Access to the data, which can be of one of the three following types:
-#' \itemize{
-#'   \item data.frame: each line contains its ID in the first cell, and all values after
-#'   \item connection: any R connection object (e.g. a file) providing lines as described above
-#'   \item function: a custom way to retrieve the curves; it has two arguments: the ranks to be
-#'     retrieved, and the IDs - at least one of them must be present (priority: ranks).
-#' }
+#' @param getSeries Access to the (time-)series, which can be of one of the three
+#'   following types:
+#'   \itemize{
+#'     \item matrix: each line contains all the values for one time-serie, ordered by time
+#'     \item connection: any R connection object (e.g. a file) providing lines as described above
+#'     \item function: a custom way to retrieve the curves; it has only one argument:
+#'       the indices of the series to be retrieved. See examples
+#'   }
 #' @param K1 Number of super-consumers to be found after stage 1 (K1 << N)
 #' @param K2 Number of clusters to be found after stage 2 (K2 << K1)
+#' @param random TRUE (default) for random chunks repartition
+#' @param wf Wavelet transform filter; see ?wavelets::wt.filter. Default: haar
+#' @param WER "end" to apply stage 2 after stage 1 has fully iterated, or "mix" to apply stage 2
+#'   at the end of each task
 #' @param ntasks Number of tasks (parallel iterations to obtain K1 medoids); default: 1.
 #'   Note: ntasks << N, so that N is "roughly divisible" by N (number of series)
-#' @param nb_series_per_chunk (Maximum) number of series in each group, inside a task
-#' @param min_series_per_chunk Minimum number of series in each group
-#' @param wf Wavelet transform filter; see ?wt.filter. Default: haar
-#' @param WER "end" to apply stage 2 after stage 1 has iterated and finished, or "mix"
-#'   to apply it after every stage 1
 #' @param ncores_tasks "MPI" number of parallel tasks (1 to disable: sequential tasks)
 #' @param ncores_clust "OpenMP" number of parallel clusterings in one task
-#' @param random Randomize chunks repartition
-#' @param ... Other arguments to be passed to \code{data} function
+#' @param nb_series_per_chunk (~Maximum) number of series in each group, inside a task
+#' @param min_series_per_chunk Minimum number of series in each group
+#' @param sep Separator in CSV input file (relevant only if getSeries is a file name)
+#' @param nbytes Number of bytes to serialize a floating-point number; 4 or 8
+#' @param endian Endianness to use for (de)serialization. Use "little" or "big" for portability
 #'
-#' @return A data.frame of the final medoids curves (identifiers + values)
+#' @return A matrix of the final medoids curves
 #'
 #' @examples
 #' getData = function(start, n) {
@@ -95,10 +99,10 @@ claws = function(getSeries, K1, K2,
                series = getSeries((index-1)+seq_len(nb_series_per_chunk))
                if (is.null(series))
                        break
-               coeffs_chunk = curvesToCoeffs(series, wf)
-               serialize(coeffs_chunk, coefs_file, nb_series_per_chunk, sep, nbytes, endian)
+               coefs_chunk = curvesToCoefs(series, wf)
+               serialize(coefs_chunk, coefs_file, nb_series_per_chunk, sep, nbytes, endian)
                index = index + nb_series_per_chunk
-               nb_curves = nb_curves + nrow(coeffs_chunk)
+               nb_curves = nb_curves + nrow(coefs_chunk)
        }
        getCoefs = function(indices) getDataInFile(indices, coefs_file, nbytes, endian)
 
@@ -129,7 +133,7 @@ claws = function(getSeries, K1, K2,
        }) )
        parallel::stopCluster(cl)
 
-       getSeriesForSynchrones = getSeries
+       getRefSeries = getSeries
        synchrones_file = paste(bin_dir,"synchrones",sep="") ; unlink(synchrones_file)
        if (WER=="mix")
        {
@@ -144,8 +148,8 @@ claws = function(getSeries, K1, K2,
                        series = getSeries((index-1)+seq_len(nb_series_per_chunk))
                        if (is.null(series))
                                break
-                       coeffs_chunk = curvesToCoeffs(series, wf)
-                       serialize(coeffs_chunk, coefs_file, nb_series_per_chunk, sep, nbytes, endian)
+                       coefs_chunk = curvesToCoefs(series, wf)
+                       serialize(coefs_chunk, coefs_file, nb_series_per_chunk, sep, nbytes, endian)
                        index = index + nb_series_per_chunk
                }
        }
@@ -153,20 +157,20 @@ claws = function(getSeries, K1, K2,
        # Run step2 on resulting indices or series (from file)
        indices_medoids = clusteringTask(
                indices, getCoefs, K1, nb_series_per_chunk, ncores_tasks*ncores_clust)
-       computeClusters2(getSeries(indices_medoids),K2,getSeriesForSynchrones,nb_series_per_chunk)
+       computeClusters2(getSeries(indices_medoids),K2,getRefSeries,nb_series_per_chunk)
 }
 
 # helper
-curvesToCoeffs = function(series, wf)
+curvesToCoefs = function(series, wf)
 {
        L = length(series[1,])
        D = ceiling( log2(L) )
        nb_sample_points = 2^D
-       apply(series, 1, function(x) {
+       t( apply(series, 1, function(x) {
                interpolated_curve = spline(1:L, x, n=nb_sample_points)$y
                W = wavelets::dwt(interpolated_curve, filter=wf, D)@W
                rev( sapply( W, function(v) ( sqrt( sum(v^2) ) ) ) )
-       })
+       }) )
 }
 
 # helper
diff --git a/epclust/data/TODO_example.RData b/epclust/data/TODO_example.RData
deleted file mode 100644 (file)
index e69de29..0000000
diff --git a/epclust/inst/testdata/TODO_clusteringInput.csv b/epclust/inst/testdata/TODO_clusteringInput.csv
deleted file mode 100644 (file)
index e69de29..0000000
index 527f6bd..a4d59d9 100644 (file)
 context("clustering")
 
-#TODO: load some dataset ASCII CSV
-#data_bin_file <<- "/tmp/epclust_test.bin"
-#unlink(data_bin_file)
+#shorthand: map 1->1, 2->2, 3->3, 4->1, ..., 149->2, 150->3, ... (is base==3)
+I = function(i, base)
+       (i-1) %% base + 1
 
 test_that("computeClusters1 behave as expected",
 {
+       require("MASS", quietly=TRUE)
+       require("clue", quietly=TRUE)
 
+       # 3 gaussian clusters, 300 items; and then 7 gaussian clusters, 490 items
+       n = 300
+       d = 5
+       K = 3
+       for (ndK in list( c(300,5,3), c(490,10,7) ))
+       {
+               n = ndK[1] ; d = ndK[2] ; K = ndK[3]
+               cs = n/K #cluster size
+               Id = diag(d)
+               coefs = do.call(rbind,
+                       lapply(1:K, function(i) MASS::mvrnorm(cs, c(rep(0,(i-1)),5,rep(0,d-i)), Id)))
+               indices_medoids = computeClusters1(coefs, K)
+               # Get coefs assignments (to medoids)
+               assignment = sapply(seq_len(n), function(i)
+                       which.min( rowSums( sweep(coefs[indices_medoids,],2,coefs[i,],'-')^2 ) ) )
+               for (i in 1:K)
+                       expect_equal(sum(assignment==i), cs, tolerance=5)
+
+               costs_matrix = matrix(nrow=K,ncol=K)
+               for (i in 1:K)
+               {
+                       for (j in 1:K)
+                       {
+                               # assign i (in result) to j (order 1,2,3)
+                               costs_matrix[i,j] = abs( mean(assignment[((i-1)*cs+1):(i*cs)]) - j )
+                       }
+               }
+               permutation = as.integer( clue::solve_LSAP(costs_matrix) )
+               for (i in 1:K)
+               {
+                       expect_equal(
+                               mean(assignment[((i-1)*cs+1):(i*cs)]), permutation[i], tolerance=0.05)
+               }
+       }
 })
 
 test_that("computeSynchrones behave as expected",
 {
+       n = 300
+       x = seq(0,9.5,0.1)
+       L = length(x) #96 1/4h
+       K = 3
+       s1 = cos(x)
+       s2 = sin(x)
+       s3 = c( s1[1:(L%/%2)] , s2[(L%/%2+1):L] )
+       #sum((s1-s2)^2) == 96
+       #sum((s1-s3)^2) == 58
+       #sum((s2-s3)^2) == 38
+       s = list(s1, s2, s3)
+       series = matrix(nrow=n, ncol=L)
+       for (i in seq_len(n))
+               series[i,] = s[[I(i,K)]] + rnorm(L,sd=0.01)
+       getRefSeries = function(indices) {
+               indices = indices[indices < n]
+               if (length(indices)>0) series[indices,] else NULL
+       }
+       synchrones = computeSynchrones(rbind(s1,s2,s3), getRefSeries, 100)
 
+       expect_equal(dim(synchrones), c(K,L))
+       for (i in 1:K)
+               expect_equal(synchrones[i,], s[[i]], tolerance=0.01)
 })
 
+computeDistortion = function(series, medoids)
+{
+       n = nrow(series) ; L = ncol(series)
+       distortion = 0.
+       for (i in seq_len(n))
+               distortion = distortion + min( rowSums( sweep(medoids,2,series[i,],'-')^2 ) / L )
+       distortion / n
+}
+
 test_that("computeClusters2 behave as expected",
 {
+       n = 900
+       x = seq(0,9.5,0.1)
+       L = length(x) #96 1/4h
+       K1 = 60
+       K2 = 3
+       #for (i in 1:60) {plot(x^(1+i/30)*cos(x+i),type="l",col=i,ylim=c(-50,50)); par(new=TRUE)}
+       s = lapply( seq_len(K1), function(i) x^(1+i/30)*cos(x+i) )
+       series = matrix(nrow=n, ncol=L)
+       for (i in seq_len(n))
+               series[i,] = s[[I(i,K1)]] + rnorm(L,sd=0.01)
+       getRefSeries = function(indices) {
+               indices = indices[indices < n]
+               if (length(indices)>0) series[indices,] else NULL
+       }
+       # Artificially simulate 60 medoids - perfect situation, all equal to one of the refs
+       medoids_K1 = do.call(rbind, lapply( 1:K1, function(i) s[[I(i,K1)]] ) )
+       medoids_K2 = computeClusters2(medoids_K1, K2, getRefSeries, 75)
 
+       expect_equal(dim(medoids_K2), c(K2,L))
+       # Not easy to evaluate result: at least we expect it to be better than random selection of
+       # medoids within 1...K1 (among references)
+       
+       distorGood = computeDistortion(series, medoids_K2)
+       for (i in 1:3)
+               expect_lte( distorGood, computeDistortion(series,medoids_K1[sample(1:K1, K2),]) )
 })
 
 test_that("clusteringTask + computeClusters2 behave as expected",
 {
+       n = 900
+       x = seq(0,9.5,0.1)
+       L = length(x) #96 1/4h
+       K1 = 60
+       K2 = 3
+       s = lapply( seq_len(K1), function(i) x^(1+i/30)*cos(x+i) )
+       series = matrix(nrow=n, ncol=L)
+       for (i in seq_len(n))
+               series[i,] = s[[I(i,K1)]] + rnorm(L,sd=0.01)
+       getSeries = function(indices) {
+               indices = indices[indices <= n]
+               if (length(indices)>0) series[indices,] else NULL
+       }
+       wf = "haar"
+       getCoefs = function(indices) curvesToCoefs(series[indices,],wf)
+       medoids_K1 = getSeries( clusteringTask(1:n, getCoefs, K1, 75, 4) )
+       medoids_K2 = computeClusters2(medoids_K1, K2, getSeries, 120)
 
+       expect_equal(dim(medoids_K1), c(K1,L))
+       expect_equal(dim(medoids_K2), c(K2,L))
+       # Not easy to evaluate result: at least we expect it to be better than random selection of
+       # medoids within 1...K1 (among references)
+       distorGood = computeDistortion(series, medoids_K2)
+       for (i in 1:3)
+               expect_lte( distorGood, computeDistortion(series,medoids_K1[sample(1:K1, K2),]) )
 })