-#' @include utils.R
+#' @include de_serialize.R
#' @include clustering.R
NULL
-#' Cluster power curves with PAM in parallel CLAWS: CLustering with wAvelets and Wer distanceS
+#' CLAWS: CLustering with wAvelets and Wer distanceS
#'
#' Groups electricity power curves (or any series of similar nature) by applying PAM
-#' algorithm in parallel to chunks of size \code{nb_series_per_chunk}
+#' algorithm in parallel to chunks of size \code{nb_series_per_chunk}. Input series
+#' must be sampled on the same time grid, no missing values.
#'
-#' @param data Access to the data, which can be of one of the three following types:
-#' \itemize{
-#' \item data.frame: each line contains its ID in the first cell, and all values after
-#' \item connection: any R connection object (e.g. a file) providing lines as described above
-#' \item function: a custom way to retrieve the curves; it has two arguments: the ranks to be
-#' retrieved, and the IDs - at least one of them must be present (priority: ranks).
-#' }
+#' @param getSeries Access to the (time-)series, which can be of one of the three
+#' following types:
+#' \itemize{
+#' \item matrix: each line contains all the values for one time-serie, ordered by time
+#' \item connection: any R connection object (e.g. a file) providing lines as described above
+#' \item function: a custom way to retrieve the curves; it has only one argument:
+#' the indices of the series to be retrieved. See examples
+#' }
#' @param K1 Number of super-consumers to be found after stage 1 (K1 << N)
#' @param K2 Number of clusters to be found after stage 2 (K2 << K1)
+#' @param random TRUE (default) for random chunks repartition
+#' @param wf Wavelet transform filter; see ?wavelets::wt.filter. Default: haar
+#' @param WER "end" to apply stage 2 after stage 1 has fully iterated, or "mix" to apply stage 2
+#' at the end of each task
#' @param ntasks Number of tasks (parallel iterations to obtain K1 medoids); default: 1.
#' Note: ntasks << N, so that N is "roughly divisible" by N (number of series)
-#' @param nb_series_per_chunk (Maximum) number of series in each group, inside a task
-#' @param min_series_per_chunk Minimum number of series in each group
-#' @param wf Wavelet transform filter; see ?wt.filter. Default: haar
-#' @param WER "end" to apply stage 2 after stage 1 has iterated and finished, or "mix"
-#' to apply it after every stage 1
#' @param ncores_tasks "MPI" number of parallel tasks (1 to disable: sequential tasks)
#' @param ncores_clust "OpenMP" number of parallel clusterings in one task
-#' @param random Randomize chunks repartition
-#' @param ... Other arguments to be passed to \code{data} function
+#' @param nb_series_per_chunk (~Maximum) number of series in each group, inside a task
+#' @param min_series_per_chunk Minimum number of series in each group
+#' @param sep Separator in CSV input file (relevant only if getSeries is a file name)
+#' @param nbytes Number of bytes to serialize a floating-point number; 4 or 8
+#' @param endian Endianness to use for (de)serialization. Use "little" or "big" for portability
#'
-#' @return A data.frame of the final medoids curves (identifiers + values)
+#' @return A matrix of the final medoids curves
#'
#' @examples
#' getData = function(start, n) {
series = getSeries((index-1)+seq_len(nb_series_per_chunk))
if (is.null(series))
break
- coeffs_chunk = curvesToCoeffs(series, wf)
- serialize(coeffs_chunk, coefs_file, nb_series_per_chunk, sep, nbytes, endian)
+ coefs_chunk = curvesToCoefs(series, wf)
+ serialize(coefs_chunk, coefs_file, nb_series_per_chunk, sep, nbytes, endian)
index = index + nb_series_per_chunk
- nb_curves = nb_curves + nrow(coeffs_chunk)
+ nb_curves = nb_curves + nrow(coefs_chunk)
}
getCoefs = function(indices) getDataInFile(indices, coefs_file, nbytes, endian)
}) )
parallel::stopCluster(cl)
- getSeriesForSynchrones = getSeries
+ getRefSeries = getSeries
synchrones_file = paste(bin_dir,"synchrones",sep="") ; unlink(synchrones_file)
if (WER=="mix")
{
series = getSeries((index-1)+seq_len(nb_series_per_chunk))
if (is.null(series))
break
- coeffs_chunk = curvesToCoeffs(series, wf)
- serialize(coeffs_chunk, coefs_file, nb_series_per_chunk, sep, nbytes, endian)
+ coefs_chunk = curvesToCoefs(series, wf)
+ serialize(coefs_chunk, coefs_file, nb_series_per_chunk, sep, nbytes, endian)
index = index + nb_series_per_chunk
}
}
# Run step2 on resulting indices or series (from file)
indices_medoids = clusteringTask(
indices, getCoefs, K1, nb_series_per_chunk, ncores_tasks*ncores_clust)
- computeClusters2(getSeries(indices_medoids),K2,getSeriesForSynchrones,nb_series_per_chunk)
+ computeClusters2(getSeries(indices_medoids),K2,getRefSeries,nb_series_per_chunk)
}
# helper
-curvesToCoeffs = function(series, wf)
+curvesToCoefs = function(series, wf)
{
L = length(series[1,])
D = ceiling( log2(L) )
nb_sample_points = 2^D
- apply(series, 1, function(x) {
+ t( apply(series, 1, function(x) {
interpolated_curve = spline(1:L, x, n=nb_sample_points)$y
W = wavelets::dwt(interpolated_curve, filter=wf, D)@W
rev( sapply( W, function(v) ( sqrt( sum(v^2) ) ) ) )
- })
+ }) )
}
# helper