-#' @include utils.R
+#' @include de_serialize.R
#' @include clustering.R
NULL
-#' Cluster power curves with PAM in parallel CLAWS: CLustering with wAvelets and Wer distanceS
+#' CLAWS: CLustering with wAvelets and Wer distanceS
#'
#' Groups electricity power curves (or any series of similar nature) by applying PAM
-#' algorithm in parallel to chunks of size \code{nb_series_per_chunk}
+#' algorithm in parallel to chunks of size \code{nb_series_per_chunk}. Input series
+#' must be sampled on the same time grid, no missing values.
#'
-#' @param data Access to the data, which can be of one of the three following types:
-#' \itemize{
-#' \item data.frame: each line contains its ID in the first cell, and all values after
-#' \item connection: any R connection object (e.g. a file) providing lines as described above
-#' \item function: a custom way to retrieve the curves; it has two arguments: the ranks to be
-#' retrieved, and the IDs - at least one of them must be present (priority: ranks).
-#' }
+#' @param getSeries Access to the (time-)series, which can be of one of the three
+#' following types:
+#' \itemize{
+#' \item matrix: each line contains all the values for one time series, ordered by time
+#' \item connection: any R connection object (e.g. a file) providing lines as described above
+#' \item function: a custom way to retrieve the curves; it has only one argument:
+#' the indices of the series to be retrieved. See examples
+#' }
#' @param K1 Number of super-consumers to be found after stage 1 (K1 << N)
#' @param K2 Number of clusters to be found after stage 2 (K2 << K1)
+#' @param random TRUE (default) for random chunks repartition
+#' @param wf Wavelet transform filter; see ?wavelets::wt.filter. Default: haar
+#' @param WER "end" to apply stage 2 after stage 1 has fully iterated, or "mix" to apply stage 2
+#' at the end of each task
#' @param ntasks Number of tasks (parallel iterations to obtain K1 medoids); default: 1.
#' Note: ntasks << N (number of series), so that N is "roughly divisible" by ntasks
-#' @param nb_series_per_chunk (Maximum) number of series in each group, inside a task
-#' @param min_series_per_chunk Minimum number of series in each group
-#' @param wf Wavelet transform filter; see ?wt.filter. Default: haar
-#' @param WER "end" to apply stage 2 after stage 1 has iterated and finished, or "mix"
-#' to apply it after every stage 1
#' @param ncores_tasks "MPI" number of parallel tasks (1 to disable: sequential tasks)
#' @param ncores_clust "OpenMP" number of parallel clusterings in one task
-#' @param random Randomize chunks repartition
-#' @param ... Other arguments to be passed to \code{data} function
+#' @param nb_series_per_chunk (~Maximum) number of series in each group, inside a task
+#' @param min_series_per_chunk Minimum number of series in each group
+#' @param sep Separator in CSV input file (relevant only if getSeries is a file name)
+#' @param nbytes Number of bytes to serialize a floating-point number; 4 or 8
+#' @param endian Endianness to use for (de)serialization. Use "little" or "big" for portability
#'
-#' @return A data.frame of the final medoids curves (identifiers + values)
+#' @return A matrix of the final medoids curves (K2) in rows
#'
#' @examples
-#' getData = function(start, n) {
-#' con = dbConnect(drv = RSQLite::SQLite(), dbname = "mydata.sqlite")
-#' df = dbGetQuery(con, paste(
-#' "SELECT * FROM times_values GROUP BY id OFFSET ",start,
+#' \dontrun{
+#' # WER distances computations are a bit too long for CRAN (for now)
+#'
+#' # Random series around cos(x,2x,3x)/sin(x,2x,3x)
+#' x = seq(0,500,0.05)
+#' L = length(x) #10001
+#' ref_series = matrix( c(cos(x), cos(2*x), cos(3*x), sin(x), sin(2*x), sin(3*x)),
+#'   byrow=TRUE, ncol=L )
+#' library(wmtsa)
+#' series = do.call( rbind, lapply( 1:6, function(i)
+#' do.call(rbind, wmtsa::wavBootstrap(ref_series[i,], n.realization=400)) ) )
+#' #dim(series) #c(2400,10001)
+#' medoids_ascii = claws(series, K1=60, K2=6, wf="d8", nb_series_per_chunk=500)
+#'
+#' # Same example, from CSV file
+#' csv_file = "/tmp/epclust_series.csv"
+#' write.table(series, csv_file, sep=",", row.names=FALSE, col.names=FALSE)
+#' medoids_csv = claws(csv_file, K1=60, K2=6, wf="d8", nb_series_per_chunk=500)
+#'
+#' # Same example, from binary file
+#' bin_file = "/tmp/epclust_series.bin"
+#' nbytes = 8
+#' endian = "little"
+#' epclust::serialize(csv_file, bin_file, 500, nbytes, endian)
+#' getSeries = function(indices) getDataInFile(indices, bin_file, nbytes, endian)
+#' medoids_bin = claws(getSeries, K1=60, K2=6, wf="d8", nb_series_per_chunk=500)
+#' unlink(csv_file)
+#' unlink(bin_file)
+#'
+#' # Same example, from SQLite database
+#' library(DBI)
+#' series_db <- dbConnect(RSQLite::SQLite(), "file::memory:")
+#' # Prepare data.frame in DB-format
+#' n = nrow(series)
+#' formatted_series = data.frame(
+#' ID = rep(1:n,each=L),
+#'   time = rep( as.POSIXct(1800*seq_len(L), "GMT", origin="2001-01-01"), n ),
+#'   value = as.double(t(series)) )
+#' dbWriteTable(series_db, "times_values", formatted_series)
+#' # NOTE: assume that DB internal data is not reorganized when computing coefficients
+#' indexToID_inDB <<- list()
+#' getSeries = function(indices) {
+#' con = dbConnect(drv = RSQLite::SQLite(), dbname = db_file)
+#'   if (all(indices %in% indexToID_inDB))
+#' {
+#'     df = dbGetQuery(con, paste(
+#'       "SELECT value FROM times_values WHERE ID IN (",
+#'       paste(indices, collapse=","), ") ORDER BY ID, time", sep=""))
+#'     return (matrix(df$value, nrow=length(indices), byrow=TRUE))
+#' }
+#' else
+#' {
+#' ...
+#' }
+#' }
#' dbDisconnect(series_db)
#' }
-#' #####TODO: if DB, array rank --> ID at first retrieval, when computing coeffs; so:: NO use of IDs !
-#' #TODO: 3 examples, data.frame / binary file / DB sqLite
-#' + sampleCurves : wavBootstrap de package wmtsa
-#' cl = epclust(getData, K1=200, K2=15, ntasks=1000, nb_series_per_chunk=5000, WER="mix")
#' @export
# Main CLAWS routine: two-stage PAM clustering of series through their
# wavelet coefficients.
# NOTE(review): this span is a diff fragment -- the rest of the parameter
# list and parts of the body (loop headers, initialisation of `index`,
# `nb_curves`, `coefs_file`, `indices_tasks`, `bin_dir`) are not visible
# here; the comments below describe only the visible steps.
claws = function(getSeries, K1, K2,
random=TRUE, #randomize series order?
# Fetch the next chunk of series; presumably getSeries returns NULL when
# the input is exhausted -- TODO confirm against the full definition
series = getSeries((index-1)+seq_len(nb_series_per_chunk))
if (is.null(series))
break
# Wavelet-energy coefficients of the chunk, appended to the on-disk
# coefficients file so they can be re-read later by index
-	coeffs_chunk = curvesToCoeffs(series, wf)
-	serialize(coeffs_chunk, coefs_file, nb_series_per_chunk, sep, nbytes, endian)
+	coefs_chunk = curvesToCoefs(series, wf)
+	serialize(coefs_chunk, coefs_file, nb_series_per_chunk, sep, nbytes, endian)
index = index + nb_series_per_chunk
-	nb_curves = nb_curves + nrow(coeffs_chunk)
+	nb_curves = nb_curves + nrow(coefs_chunk)
}
# Random access to the serialized coefficients, by series indices
getCoefs = function(indices) getDataInFile(indices, coefs_file, nbytes, endian)
# "MPI"-level parallelism: one worker per task
cl = parallel::makeCluster(ncores_tasks)
# ntasks*K1 indices [if WER=="end"], or empty vector [if WER=="mix"] --> series on file
indices = unlist( parallel::parLapply(cl, indices_tasks, function(inds) {
+	require("epclust", quietly=TRUE)
# Stage 1 on this task's chunk: reduce its series to K1 medoid indices
indices_medoids = clusteringTask(inds,getCoefs,K1,nb_series_per_chunk,ncores_clust)
if (WER=="mix")
{
}) )
parallel::stopCluster(cl)
# Keep a handle on the original series: stage 2 builds synchrones from them
-	getSeriesForSynchrones = getSeries
+	getRefSeries = getSeries
synchrones_file = paste(bin_dir,"synchrones",sep="") ; unlink(synchrones_file)
if (WER=="mix")
{
# WER=="mix": re-serialize coefficients (of the intermediate series)
# before running the final stage 2
series = getSeries((index-1)+seq_len(nb_series_per_chunk))
if (is.null(series))
break
-	coeffs_chunk = curvesToCoeffs(series, wf)
-	serialize(coeffs_chunk, coefs_file, nb_series_per_chunk, sep, nbytes, endian)
+	coefs_chunk = curvesToCoefs(series, wf)
+	serialize(coefs_chunk, coefs_file, nb_series_per_chunk, sep, nbytes, endian)
index = index + nb_series_per_chunk
}
}
# Run step2 on resulting indices or series (from file)
indices_medoids = clusteringTask(
indices, getCoefs, K1, nb_series_per_chunk, ncores_tasks*ncores_clust)
# Stage 2: reduce the K1 medoids to the final K2 clusters (WER distances)
-	computeClusters2(getSeries(indices_medoids),K2,getSeriesForSynchrones,nb_series_per_chunk)
+	computeClusters2(getSeries(indices_medoids),K2,getRefSeries,nb_series_per_chunk)
}
# helper -- compute one row of wavelet "energy" coefficients per input series.
# Each curve (a row of `series`) is interpolated onto 2^D sample points,
# D = ceiling(log2(L)), so the DWT operates on a dyadic length; then the
# L2 norm of each of the D detail-coefficient vectors is kept, reversed so
# the coarsest scale comes first.
-curvesToCoeffs = function(series, wf)
+curvesToCoefs = function(series, wf)
{
L = length(series[1,])
D = ceiling( log2(L) )
nb_sample_points = 2^D
# t(): apply() stacks its per-row results as columns; the transpose makes
# each ROW of the returned matrix correspond to one input series
-	apply(series, 1, function(x) {
+	t( apply(series, 1, function(x) {
interpolated_curve = spline(1:L, x, n=nb_sample_points)$y
W = wavelets::dwt(interpolated_curve, filter=wf, D)@W
rev( sapply( W, function(v) ( sqrt( sum(v^2) ) ) ) )
-	})
+	}) )
}
# helper