/data/*
!/data/README
!/data/preprocessing/
-/data/prrprocessing/*
+/data/preprocessing/*
!/data/preprocessing/convert.c
!/data/preprocessing/Makefile
#function which, for two series (ID, medoids), returns the WER distance (via Rwave or my own code)
#cross wavelet transform, smoothing of the 3 components, + computation for WER
#determine nvoice and noctave (between the smallest and the largest octave)
+
+#TODO: load some ASCII CSV dataset
+#data_bin_file <<- "/tmp/epclust_test.bin"
+#unlink(data_bin_file)
+
+#https://stat.ethz.ch/pipermail/r-help/2011-June/280133.html
+#randCov = function(d)
+#{
+# x <- matrix(rnorm(d*d), nrow=d)
+# x <- x / sqrt(rowSums(x^2))
+# x %*% t(x)
+#}
License: MIT + file LICENSE
RoxygenNote: 5.0.1
Collate:
- 'utils.R'
'clustering.R'
'de_serialize.R'
'main.R'
clusteringTask = function(indices, getCoefs, K1, nb_series_per_chunk, ncores)
{
cl = parallel::makeCluster(ncores)
+ parallel::clusterExport(cl, varlist=c("getCoefs","K1"), envir=environment())
repeat
{
- nb_workers = max( 1, round( length(indices) / nb_series_per_chunk ) )
- indices_workers = lapply(seq_len(nb_workers), function(i) {
- upper_bound = ifelse( i<nb_workers,
- min(nb_series_per_chunk*i,length(indices)), length(indices) )
- indices[(nb_series_per_chunk*(i-1)+1):upper_bound]
- })
- indices = unlist( parallel::parLapply(cl, indices_workers, function(inds)
- computeClusters1(getCoefs(inds), K1)) )
+ nb_workers = max( 1, floor( length(indices) / nb_series_per_chunk ) )
+ indices_workers = lapply( seq_len(nb_workers), function(i)
+ indices[(nb_series_per_chunk*(i-1)+1):(nb_series_per_chunk*i)] )
+ # Spread the remaining load among the workers
+ rem = length(indices) %% nb_series_per_chunk
+ while (rem > 0)
+ {
+ index = rem%%nb_workers + 1
+ indices_workers[[index]] = c(indices_workers[[index]], tail(indices,rem)[1])
+ rem = rem - 1
+ }
+ indices = unlist( parallel::parLapply( cl, indices_workers, function(inds) {
+ require("epclust", quietly=TRUE)
+ inds[ computeClusters1(getCoefs(inds), K1) ]
+ } ) )
if (length(indices) == K1)
break
}
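# Worked illustration of the splitting above (arbitrary toy values, for intuition only):
# with indices = 101:111 and nb_series_per_chunk = 3, we get nb_workers = floor(11/3) = 3,
# full chunks 101:103, 104:106, 107:109 and rem = 2; the while loop then hands 110 to
# worker 2%%3+1 = 3 and 111 to worker 1%%3+1 = 2, so indices_workers becomes
# list( 101:103, c(104:106,111), c(107:109,110) ) -- every index assigned exactly once.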
# Apply the clustering algorithm (PAM) on a coeffs or distances matrix
computeClusters1 = function(coefs, K1)
- indices[ cluster::pam(coefs, K1, diss=FALSE)$id.med ]
+ cluster::pam(coefs, K1, diss=FALSE)$id.med
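# Illustration (hypothetical data): the values returned by pam()$id.med are row positions
# *within* the submitted coefficients matrix, not global series indices; callers map them
# back, as clusteringTask does above with inds[ computeClusters1(getCoefs(inds), K1) ]:
#   coefs = matrix(rnorm(500), nrow=100)
#   computeClusters1(coefs, 5) # 5 integers in 1..100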
# Stage 2: cluster the stage-1 medoids into K2 groups, using the WER distance on synchrones
computeClusters2 = function(medoids, K2, getRefSeries, nb_series_per_chunk)
{
synchrones = computeSynchrones(medoids, getRefSeries, nb_series_per_chunk)
- cluster::pam(computeWerDists(synchrones), K2, diss=TRUE)$medoids
+ medoids[ cluster::pam(computeWerDists(synchrones), K2, diss=TRUE)$medoids , ]
}
# Compute the synchrones curves (normalized sums, i.e. means, of each cluster's elements) from a clustering result
computeSynchrones = function(medoids, getRefSeries, nb_series_per_chunk)
{
- #les getSeries(indices) sont les medoides --> init vect nul pour chacun, puis incr avec les
- #courbes (getSeriesForSynchrones) les plus proches... --> au sens de la norme L2 ?
K = nrow(medoids)
synchrones = matrix(0, nrow=K, ncol=ncol(medoids))
counts = rep(0,K)
#get medoids indices for this chunk of series
for (i in seq_len(nrow(ref_series)))
{
- j = which.min( rowSums( sweep(medoids, 2, series[i,], '-')^2 ) )
- synchrones[j,] = synchrones[j,] + series[i,]
+ j = which.min( rowSums( sweep(medoids, 2, ref_series[i,], '-')^2 ) )
+ synchrones[j,] = synchrones[j,] + ref_series[i,]
counts[j] = counts[j] + 1
}
index = index + nb_series_per_chunk
}
#NOTE: could some clusters end up empty? (when the series already come from stage 2)
- sweep(synchrones, 1, counts, '/')
+ # ...maybe; but let's hope the resulting K1' remains noticeably bigger than K2
+ synchrones = sweep(synchrones, 1, counts, '/')
+ synchrones[ sapply(seq_len(K), function(i) all(!is.nan(synchrones[i,]))) , ]
}
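# Toy usage sketch (hypothetical values): 2 medoids in dimension 3, 4 reference series
# processed in chunks of 2; each synchrone should equal the mean of the series assigned
# to its medoid, i.e. roughly rbind( c(0,0,0), c(10,10,10) ):
#   medoids = rbind( c(0,0,0), c(10,10,10) )
#   refs = rbind( c(0.1,0,0), c(-0.1,0,0), c(9.9,10,10), c(10.1,10,10) )
#   getRefSeries = function(indices) {
#     indices = indices[indices <= 4]
#     if (length(indices)>0) refs[indices,,drop=FALSE] else NULL
#   }
#   computeSynchrones(medoids, getRefSeries, 2)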
# Compute the WER distance between the synchrones curves (in rows)
-computeWerDist = function(curves)
+computeWerDists = function(curves)
{
if (!require("Rwave", quietly=TRUE))
stop("Unable to load Rwave library")
getDataInFile = function(indices, data_bin_file, nbytes=4, endian=.Platform$endian)
{
data_bin = file(data_bin_file, "rb")
+ data_size = file.info(data_bin_file)$size
data_length = readBin(data_bin, "integer", 1, 8, endian)
#Or t(sapply(...)) (faster?)
data_ascii = do.call( rbind, lapply( indices, function(i) {
- ignored = seek(data_bin, 8+((i-1)*data_length*nbytes))
+ offset = 8+(i-1)*data_length*nbytes
+ if (offset > data_size)
+ return (vector("double",0))
+ ignored = seek(data_bin, offset)
readBin(data_bin, "double", n=data_length, size=nbytes)
} ) )
close(data_bin)
- data_ascii
+ if (ncol(data_ascii)>0) data_ascii else NULL
}
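# Minimal writer sketch matching the layout read by getDataInFile above: an 8-byte integer
# header holding the series length, then each series as fixed-size doubles. Illustrative
# only -- the package's actual serialize() routine may differ; the function name is made up.
writeDataToFile_sketch = function(data_matrix, data_bin_file, nbytes=4, endian=.Platform$endian)
{
	data_bin = file(data_bin_file, "ab")
	if (file.info(data_bin_file)$size == 0)
		writeBin(ncol(data_matrix), data_bin, size=8, endian=endian)
	for (i in seq_len(nrow(data_matrix)))
		writeBin(as.double(data_matrix[i,]), data_bin, size=nbytes, endian=endian)
	close(data_bin)
}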
-#' @include utils.R
+#' @include de_serialize.R
#' @include clustering.R
NULL
-#' Cluster power curves with PAM in parallel CLAWS: CLustering with wAvelets and Wer distanceS
+#' CLAWS: CLustering with wAvelets and Wer distanceS
#'
#' Groups electricity power curves (or any series of similar nature) by applying PAM
-#' algorithm in parallel to chunks of size \code{nb_series_per_chunk}
+#' algorithm in parallel to chunks of size \code{nb_series_per_chunk}. Input series
+#' must be sampled on the same time grid, with no missing values allowed.
#'
-#' @param data Access to the data, which can be of one of the three following types:
-#' \itemize{
-#' \item data.frame: each line contains its ID in the first cell, and all values after
-#' \item connection: any R connection object (e.g. a file) providing lines as described above
-#' \item function: a custom way to retrieve the curves; it has two arguments: the ranks to be
-#' retrieved, and the IDs - at least one of them must be present (priority: ranks).
-#' }
+#' @param getSeries Access to the (time-)series, which can be of one of the three
+#' following types:
+#' \itemize{
+#' \item matrix: each row contains all the values for one time series, ordered by time
+#' \item connection: any R connection object (e.g. a file) providing lines as described above
+#' \item function: a custom way to retrieve the curves; it has only one argument:
+#' the indices of the series to be retrieved. See examples
+#' }
#' @param K1 Number of super-consumers to be found after stage 1 (K1 << N)
#' @param K2 Number of clusters to be found after stage 2 (K2 << K1)
+#' @param random TRUE (default) to randomize the assignment of series to chunks
+#' @param wf Wavelet transform filter; see ?wavelets::wt.filter. Default: haar
+#' @param WER "end" to apply stage 2 after stage 1 has fully iterated, or "mix" to apply stage 2
+#' at the end of each task
#' @param ntasks Number of tasks (parallel iterations to obtain K1 medoids); default: 1.
#' Note: ntasks << N, so that N is "roughly divisible" by ntasks (N = number of series)
-#' @param nb_series_per_chunk (Maximum) number of series in each group, inside a task
-#' @param min_series_per_chunk Minimum number of series in each group
-#' @param wf Wavelet transform filter; see ?wt.filter. Default: haar
-#' @param WER "end" to apply stage 2 after stage 1 has iterated and finished, or "mix"
-#' to apply it after every stage 1
#' @param ncores_tasks "MPI" number of parallel tasks (1 to disable: sequential tasks)
#' @param ncores_clust "OpenMP" number of parallel clusterings in one task
-#' @param random Randomize chunks repartition
-#' @param ... Other arguments to be passed to \code{data} function
+#' @param nb_series_per_chunk (~Maximum) number of series in each group, inside a task
+#' @param min_series_per_chunk Minimum number of series in each group
+#' @param sep Separator in CSV input file (relevant only if getSeries is a file name)
+#' @param nbytes Number of bytes to serialize a floating-point number; 4 or 8
+#' @param endian Endianness to use for (de)serialization. Use "little" or "big" for portability
#'
-#' @return A data.frame of the final medoids curves (identifiers + values)
+#' @return A matrix of the final medoids curves
#'
#' @examples
#' getData = function(start, n) {
series = getSeries((index-1)+seq_len(nb_series_per_chunk))
if (is.null(series))
break
- coeffs_chunk = curvesToCoeffs(series, wf)
- serialize(coeffs_chunk, coefs_file, nb_series_per_chunk, sep, nbytes, endian)
+ coefs_chunk = curvesToCoefs(series, wf)
+ serialize(coefs_chunk, coefs_file, nb_series_per_chunk, sep, nbytes, endian)
index = index + nb_series_per_chunk
- nb_curves = nb_curves + nrow(coeffs_chunk)
+ nb_curves = nb_curves + nrow(coefs_chunk)
}
getCoefs = function(indices) getDataInFile(indices, coefs_file, nbytes, endian)
}) )
parallel::stopCluster(cl)
- getSeriesForSynchrones = getSeries
+ getRefSeries = getSeries
synchrones_file = paste(bin_dir,"synchrones",sep="") ; unlink(synchrones_file)
if (WER=="mix")
{
series = getSeries((index-1)+seq_len(nb_series_per_chunk))
if (is.null(series))
break
- coeffs_chunk = curvesToCoeffs(series, wf)
- serialize(coeffs_chunk, coefs_file, nb_series_per_chunk, sep, nbytes, endian)
+ coefs_chunk = curvesToCoefs(series, wf)
+ serialize(coefs_chunk, coefs_file, nb_series_per_chunk, sep, nbytes, endian)
index = index + nb_series_per_chunk
}
}
# Run step2 on resulting indices or series (from file)
indices_medoids = clusteringTask(
indices, getCoefs, K1, nb_series_per_chunk, ncores_tasks*ncores_clust)
- computeClusters2(getSeries(indices_medoids),K2,getSeriesForSynchrones,nb_series_per_chunk)
+ computeClusters2(getSeries(indices_medoids),K2,getRefSeries,nb_series_per_chunk)
}
# helper
-curvesToCoeffs = function(series, wf)
+curvesToCoefs = function(series, wf)
{
L = length(series[1,])
D = ceiling( log2(L) )
nb_sample_points = 2^D
- apply(series, 1, function(x) {
+ t( apply(series, 1, function(x) {
interpolated_curve = spline(1:L, x, n=nb_sample_points)$y
W = wavelets::dwt(interpolated_curve, filter=wf, D)@W
rev( sapply( W, function(v) ( sqrt( sum(v^2) ) ) ) )
- })
+ }) )
}
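# Quick shape check for curvesToCoefs (hypothetical input): two curves of length 100 are
# interpolated to 2^7 = 128 points, so D = 7 wavelet levels and the result is a 2 x 7
# matrix of per-level energies:
#   x = seq(0, 2*pi, length.out=100)
#   dim( curvesToCoefs(rbind(sin(x), cos(x)), "haar") ) # 2 7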
# helper
context("clustering")
-#TODO: load some dataset ASCII CSV
-#data_bin_file <<- "/tmp/epclust_test.bin"
-#unlink(data_bin_file)
+#shorthand: map 1->1, 2->2, 3->3, 4->1, ..., 149->2, 150->3, ... (if base==3)
+I = function(i, base)
+ (i-1) %% base + 1
test_that("computeClusters1 behave as expected",
{
+ require("MASS", quietly=TRUE)
+ require("clue", quietly=TRUE)
+ # 3 gaussian clusters, 300 items; and then 7 gaussian clusters, 490 items
+ for (ndK in list( c(300,5,3), c(490,10,7) ))
+ {
+ n = ndK[1] ; d = ndK[2] ; K = ndK[3]
+ cs = n/K #cluster size
+ Id = diag(d)
+ coefs = do.call(rbind,
+ lapply(1:K, function(i) MASS::mvrnorm(cs, c(rep(0,(i-1)),5,rep(0,d-i)), Id)))
+ indices_medoids = computeClusters1(coefs, K)
+ # Get coefs assignments (to medoids)
+ assignment = sapply(seq_len(n), function(i)
+ which.min( rowSums( sweep(coefs[indices_medoids,],2,coefs[i,],'-')^2 ) ) )
+ for (i in 1:K)
+ expect_equal(sum(assignment==i), cs, tolerance=5)
+
+ costs_matrix = matrix(nrow=K,ncol=K)
+ for (i in 1:K)
+ {
+ for (j in 1:K)
+ {
+ # assign i (in result) to j (order 1,2,3)
+ costs_matrix[i,j] = abs( mean(assignment[((i-1)*cs+1):(i*cs)]) - j )
+ }
+ }
+ permutation = as.integer( clue::solve_LSAP(costs_matrix) )
+ for (i in 1:K)
+ {
+ expect_equal(
+ mean(assignment[((i-1)*cs+1):(i*cs)]), permutation[i], tolerance=0.05)
+ }
+ }
})
test_that("computeSynchrones behave as expected",
{
+ n = 300
+ x = seq(0,9.5,0.1)
+ L = length(x) #96 1/4h
+ K = 3
+ s1 = cos(x)
+ s2 = sin(x)
+ s3 = c( s1[1:(L%/%2)] , s2[(L%/%2+1):L] )
+ #sum((s1-s2)^2) == 96
+ #sum((s1-s3)^2) == 58
+ #sum((s2-s3)^2) == 38
+ s = list(s1, s2, s3)
+ series = matrix(nrow=n, ncol=L)
+ for (i in seq_len(n))
+ series[i,] = s[[I(i,K)]] + rnorm(L,sd=0.01)
+ getRefSeries = function(indices) {
+ indices = indices[indices <= n]
+ if (length(indices)>0) series[indices,] else NULL
+ }
+ synchrones = computeSynchrones(rbind(s1,s2,s3), getRefSeries, 100)
+ expect_equal(dim(synchrones), c(K,L))
+ for (i in 1:K)
+ expect_equal(synchrones[i,], s[[i]], tolerance=0.01)
})
+computeDistortion = function(series, medoids)
+{
+ n = nrow(series) ; L = ncol(series)
+ distortion = 0.
+ for (i in seq_len(n))
+ distortion = distortion + min( rowSums( sweep(medoids,2,series[i,],'-')^2 ) / L )
+ distortion / n
+}
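# quick numeric check of computeDistortion (toy values): with series = rbind(s1,s2),
# medoids = rbind(s1), s1 = c(1,2,3,4), s2 = c(2,2,3,4) and L = 4, the distortion is
# (0 + sum((s2-s1)^2)/4) / 2 = (0 + 1/4) / 2 = 0.125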
+
test_that("computeClusters2 behave as expected",
{
+ n = 900
+ x = seq(0,9.5,0.1)
+ L = length(x) #96 1/4h
+ K1 = 60
+ K2 = 3
+ #for (i in 1:60) {plot(x^(1+i/30)*cos(x+i),type="l",col=i,ylim=c(-50,50)); par(new=TRUE)}
+ s = lapply( seq_len(K1), function(i) x^(1+i/30)*cos(x+i) )
+ series = matrix(nrow=n, ncol=L)
+ for (i in seq_len(n))
+ series[i,] = s[[I(i,K1)]] + rnorm(L,sd=0.01)
+ getRefSeries = function(indices) {
+ indices = indices[indices <= n]
+ if (length(indices)>0) series[indices,] else NULL
+ }
+ # Artificially simulate 60 medoids - perfect situation, all equal to one of the refs
+ medoids_K1 = do.call(rbind, lapply( 1:K1, function(i) s[[I(i,K1)]] ) )
+ medoids_K2 = computeClusters2(medoids_K1, K2, getRefSeries, 75)
+ expect_equal(dim(medoids_K2), c(K2,L))
+ # Not easy to evaluate result: at least we expect it to be better than random selection of
+ # medoids within 1...K1 (among references)
+
+ distorGood = computeDistortion(series, medoids_K2)
+ for (i in 1:3)
+ expect_lte( distorGood, computeDistortion(series,medoids_K1[sample(1:K1, K2),]) )
})
test_that("clusteringTask + computeClusters2 behave as expected",
{
+ n = 900
+ x = seq(0,9.5,0.1)
+ L = length(x) #96 1/4h
+ K1 = 60
+ K2 = 3
+ s = lapply( seq_len(K1), function(i) x^(1+i/30)*cos(x+i) )
+ series = matrix(nrow=n, ncol=L)
+ for (i in seq_len(n))
+ series[i,] = s[[I(i,K1)]] + rnorm(L,sd=0.01)
+ getSeries = function(indices) {
+ indices = indices[indices <= n]
+ if (length(indices)>0) series[indices,] else NULL
+ }
+ wf = "haar"
+ getCoefs = function(indices) curvesToCoefs(series[indices,],wf)
+ medoids_K1 = getSeries( clusteringTask(1:n, getCoefs, K1, 75, 4) )
+ medoids_K2 = computeClusters2(medoids_K1, K2, getSeries, 120)
+ expect_equal(dim(medoids_K1), c(K1,L))
+ expect_equal(dim(medoids_K2), c(K2,L))
+ # Not easy to evaluate result: at least we expect it to be better than random selection of
+ # medoids within 1...K1 (among references)
+ distorGood = computeDistortion(series, medoids_K2)
+ for (i in 1:3)
+ expect_lte( distorGood, computeDistortion(series,medoids_K1[sample(1:K1, K2),]) )
})