de_serialize works. Variable names improved. Code beautified. TODO: clustering tests
[epclust.git] / epclust / R / main.R
index 27fbb74..280cc17 100644
@@ -1,6 +1,10 @@
-#' @title Cluster power curves with PAM in parallel
+#' @include utils.R
+#' @include clustering.R
+NULL
+
+#' CLAWS: CLustering with wAvelets and Wer distanceS (cluster power curves with PAM in parallel)
 #'
-#' @description Groups electricity power curves (or any series of similar nature) by applying PAM
+#' Groups electricity power curves (or any series of similar nature) by applying the PAM
 #' algorithm in parallel to chunks of size \code{nb_series_per_chunk}
 #'
 #' @param data Access to the data, which can be of one of the three following types:
 #'   + sampleCurves : wavBootstrap from the wmtsa package
-#' cl = epclust(getData, K1=200, K2=15, ntasks=1000, nb_series_per_chunk=5000, WER="mix")
+#' cl = claws(getData, K1=200, K2=15, ntasks=1000, nb_series_per_chunk=5000, WER="mix")
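+#' # A minimal sketch of the same call on an in-memory matrix (one series per row);
+#' # 'series_matrix' is a hypothetical object, not provided by the package:
+#' # cl = claws(series_matrix, K1=50, K2=5, nb_series_per_chunk=500)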
 #' @export
-epclust = function(series,K1,K2,ntasks=1,nb_series_per_chunk=50*K1,min_series_per_chunk=5*K1,
-       wf="haar",WER="end",ncores_tasks=1,ncores_clust=4,random=TRUE,ftype="float",...)
+claws = function(getSeries, K1, K2,
+       random=TRUE, #randomize series order?
+       wf="haar", #stage 1
+       WER="end", #stage 2
+       ntasks=1, ncores_tasks=1, ncores_clust=4, #control parallelism
+       nb_series_per_chunk=50*K1, min_series_per_chunk=5*K1, #chunk size
+       sep=",", #ASCII input separator
+       nbytes=4, endian=.Platform$endian) #serialization (write,read)
 {
        # Check/transform arguments
-       bin_dir = "epclust.bin/"
-       dir.create(bin_dir, showWarnings=FALSE, mode="0755")
-       if (!is.function(series))
+       if (!is.matrix(getSeries) && !is.function(getSeries) &&
+               !is(getSeries, "connection") && !is.character(getSeries))
        {
-               series_file = paste(bin_dir,"data",sep="")
-               unlink(series_file)
-       }
-       if (is.matrix(series))
-               serialize(series, series_file, ftype, nb_series_per_chunk)
-       else if (!is.function(series))
-       {
-               tryCatch(
-                       {
-                               if (is.character(series))
-                                       series_con = file(series, open="r")
-                               else if (!isOpen(series))
-                               {
-                                       open(series)
-                                       series_con = series
-                               }
-                               serialize(series_con, series_file, ftype, nb_series_per_chunk)
-                               close(series_con)
-                       },
-                       error=function(e) "series should be a data.frame, a function or a valid connection"
-               )
+               stop("'getSeries': matrix, function, file or valid connection (no NA)")
        }
-       if (!is.function(series))
-               series = function(indices) getDataInFile(indices, series_file)
-       getSeries = series
-
-       K1 = toInteger(K1, function(x) x>=2)
-       K2 = toInteger(K2, function(x) x>=2)
-       ntasks = toInteger(ntasks, function(x) x>=1)
-       nb_series_per_chunk = toInteger(nb_series_per_chunk, function(x) x>=K1)
-       min_series_per_chunk = toInteger(K1, function(x) x>=K1 && x<=nb_series_per_chunk)
-       ncores_tasks = toInteger(ncores_tasks, function(x) x>=1)
-       ncores_clust = toInteger(ncores_clust, function(x) x>=1)
+       K1 = .toInteger(K1, function(x) x>=2)
+       K2 = .toInteger(K2, function(x) x>=2)
+       if (!is.logical(random))
+               stop("'random': logical")
+       tryCatch(
+               {ignored <- wavelets::wt.filter(wf)},
+               error = function(e) stop("Invalid wavelet filter; see ?wavelets::wt.filter"))
        if (WER!="end" && WER!="mix")
                stop("WER takes values in {'end','mix'}")
+       ntasks = .toInteger(ntasks, function(x) x>=1)
+       ncores_tasks = .toInteger(ncores_tasks, function(x) x>=1)
+       ncores_clust = .toInteger(ncores_clust, function(x) x>=1)
+       nb_series_per_chunk = .toInteger(nb_series_per_chunk, function(x) x>=K1)
+       min_series_per_chunk = .toInteger(min_series_per_chunk, function(x) x>=K1 && x<=nb_series_per_chunk)
+       if (!is.character(sep))
+               stop("'sep': character")
+       nbytes = .toInteger(nbytes, function(x) x==4 || x==8)
+
+       # Serialize series if required, to always use a function
+       bin_dir = "epclust.bin/"
+       dir.create(bin_dir, showWarnings=FALSE, mode="0755")
+       if (!is.function(getSeries))
+       {
+               series_file = paste(bin_dir,"data",sep="") ; unlink(series_file)
+               serialize(getSeries, series_file, nb_series_per_chunk, sep, nbytes, endian)
+               getSeries = function(indices) getDataInFile(indices, series_file, nbytes, endian)
+       }
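+       # From here on, getSeries(indices) is always a function; it is expected to return
+       # the requested series (one per row), whatever the original input type was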
 
        # Serialize all wavelet coefficients (+ IDs) onto a file
-       coefs_file = paste(bin_dir,"coefs",sep="")
-       unlink(coefs_file)
+       coefs_file = paste(bin_dir,"coefs",sep="") ; unlink(coefs_file)
        index = 1
        nb_curves = 0
        repeat
@@ -94,11 +96,11 @@ epclust = function(series,K1,K2,ntasks=1,nb_series_per_chunk=50*K1,min_series_pe
                if (is.null(series))
                        break
                coeffs_chunk = curvesToCoeffs(series, wf)
-               serialize(coeffs_chunk, coefs_file, ftype, nb_series_per_chunk)
+               serialize(coeffs_chunk, coefs_file, nb_series_per_chunk, sep, nbytes, endian)
                index = index + nb_series_per_chunk
                nb_curves = nb_curves + nrow(coeffs_chunk)
        }
-       getCoefs = function(indices) getDataInFile(indices, coefs_file)
+       getCoefs = function(indices) getDataInFile(indices, coefs_file, nbytes, endian)
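+       # getCoefs(indices) reads back, for the given indices, the coefficient rows
+       # that were just serialized to coefs_file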
 
        if (nb_curves < min_series_per_chunk)
                stop("Not enough data: less rows than min_series_per_chunk!")
@@ -107,26 +109,33 @@ epclust = function(series,K1,K2,ntasks=1,nb_series_per_chunk=50*K1,min_series_pe
                stop("Too many tasks: less series in one task than min_series_per_chunk!")
 
        # Cluster coefficients in parallel (by nb_series_per_chunk)
-       indices = if (random) sample(nb_curves) else seq_len(nb_curves)
+       indices_all = if (random) sample(nb_curves) else seq_len(nb_curves)
        indices_tasks = lapply(seq_len(ntasks), function(i) {
                upper_bound = ifelse( i<ntasks, min(nb_series_per_task*i,nb_curves), nb_curves )
-               indices[((i-1)*nb_series_per_task+1):upper_bound]
+               indices_all[((i-1)*nb_series_per_task+1):upper_bound]
        })
+       synchrones_file = paste(bin_dir,"synchrones",sep="") ; unlink(synchrones_file)
        cl = parallel::makeCluster(ncores_tasks)
-       #1000*K1 (or K2) indices (or NOTHING--> series on file)
+       # ntasks*K1 indices [if WER=="end"], or empty vector [if WER=="mix"] --> synchrones on file
        indices = unlist( parallel::parLapply(cl, indices_tasks, function(inds) {
-               clusteringTask(inds, getSeries, getSeries, getCoefs, K1, K2*(WER=="mix"),
-                       nb_series_per_chunk,ncores_clust,to_file=TRUE, ftype)
+               indices_medoids = clusteringTask(inds, getCoefs, K1, nb_series_per_chunk, ncores_clust)
+               if (WER=="mix")
+               {
+                       medoids2 = computeClusters2(
+                               getSeries(indices_medoids), K2, getSeries, nb_series_per_chunk)
+                       serialize(medoids2, synchrones_file, nb_series_per_chunk, sep, nbytes, endian)
+                       return (vector("integer",0))
+               }
+               indices_medoids
        }) )
        parallel::stopCluster(cl)
 
        getSeriesForSynchrones = getSeries
-       synchrones_file = paste(bin_dir,"synchrones",sep="")
        if (WER=="mix")
        {
                indices = seq_len(ntasks*K2)
                #Now series must be retrieved from synchrones_file
-               getSeries = function(inds) getDataInFile(inds, synchrones_file)
+               getSeries = function(inds) getDataInFile(inds, synchrones_file, nbytes, endian)
                #Coefs must be re-computed
                unlink(coefs_file)
                index = 1
@@ -136,12 +145,39 @@ epclust = function(series,K1,K2,ntasks=1,nb_series_per_chunk=50*K1,min_series_pe
                        if (is.null(series))
                                break
                        coeffs_chunk = curvesToCoeffs(series, wf)
-                       serialize(coeffs_chunk, coefs_file, ftype, nb_series_per_chunk)
+                       serialize(coeffs_chunk, coefs_file, nb_series_per_chunk, sep, nbytes, endian)
                        index = index + nb_series_per_chunk
                }
        }
 
        # Run step2 on resulting indices or series (from file)
-       clusteringTask(indices, getSeries, getSeriesForSynchrones, getCoefs, K1, K2,
-               nb_series_per_chunk, ncores_tasks*ncores_clust, to_file=FALSE, ftype)
+       indices_medoids = clusteringTask(
+               indices, getCoefs, K1, nb_series_per_chunk, ncores_tasks*ncores_clust)
+       computeClusters2(getSeries(indices_medoids), K2, getSeriesForSynchrones, nb_series_per_chunk)
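+       # Note: the value of computeClusters2() above (expected: the K2 final medoids)
+       # is the return value of claws()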
+}
+
+# Helper: compute wavelet coefficients for each series (one row per series in the result)
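+# Usage sketch on hypothetical data: 10 series of length 200 give, per series,
+# ceiling(log2(200)) = 8 coefficients (one L2 norm per wavelet resolution level):
+#   coeffs = curvesToCoeffs( matrix(rnorm(10*200), nrow=10), "haar" )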
+curvesToCoeffs = function(series, wf)
+{
+       L = length(series[1,])
+       D = ceiling( log2(L) )
+       nb_sample_points = 2^D
+       t( apply(series, 1, function(x) {
+               interpolated_curve = spline(1:L, x, n=nb_sample_points)$y
+               W = wavelets::dwt(interpolated_curve, filter=wf, D)@W
+               rev( sapply( W, function(v) ( sqrt( sum(v^2) ) ) ) )
+       }) )
+}
+
+# Helper: coerce x to an integer and check that it satisfies 'condition'
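+# Example: .toInteger(3.0, function(x) x>=2) returns 3L, while
+# .toInteger(1, function(x) x>=2) stops with an informative error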
+.toInteger <- function(x, condition)
+{
+       if (!is.integer(x))
+               tryCatch(
+                       {x = as.integer(x)[1]},
+                       error = function(e) stop(paste("Cannot convert argument",substitute(x),"to integer"))
+               )
+       if (!condition(x))
+               stop(paste("Argument",substitute(x),"does not satisfy condition",body(condition)))
+       x
 }