'update'
authorBenjamin Auder <benjamin.auder@somewhere>
Mon, 13 Mar 2017 03:51:37 +0000 (04:51 +0100)
committerBenjamin Auder <benjamin.auder@somewhere>
Mon, 13 Mar 2017 03:51:37 +0000 (04:51 +0100)
22 files changed:
epclust/DESCRIPTION
epclust/R/A_NAMESPACE.R
epclust/R/clustering.R
epclust/R/computeSynchrones.R [new file with mode: 0644]
epclust/R/computeWerDists.R [new file with mode: 0644]
epclust/R/main.R
epclust/R/utils.R [new file with mode: 0644]
epclust/inst/CITATION [deleted file]
epclust/src/computeMedoidsIndices.cpp [deleted file]
epclust/src/filter.c [new file with mode: 0644]
epclust/src/filter.cpp [deleted file]
epclust/tests/testthat/helper-common.R [new file with mode: 0644]
epclust/tests/testthat/helper.clustering.R [deleted file]
epclust/tests/testthat/helper.computeMedoidsIndices.R [deleted file]
epclust/tests/testthat/test-assignMedoids.R [new file with mode: 0644]
epclust/tests/testthat/test-clustering.R [moved from epclust/tests/testthat/test.clustering.R with 55% similarity]
epclust/tests/testthat/test-computeSynchrones.R [new file with mode: 0644]
epclust/tests/testthat/test-computeWerDists.R [moved from epclust/tests/testthat/test.wavelets.R with 74% similarity]
epclust/tests/testthat/test-de_serialize.R [moved from epclust/tests/testthat/test.de_serialize.R with 100% similarity]
epclust/tests/testthat/test-filterMA.R [moved from epclust/tests/testthat/test.filter.R with 100% similarity]
epclust/tests/testthat/test-utils.R [new file with mode: 0644]
epclust/tests/testthat/test.computeMedoidsIndices.R [deleted file]

index 1c6c51d..7fd3410 100644 (file)
@@ -32,7 +32,8 @@ Suggests:
     MASS,
     clue,
     wmtsa,
-    DBI
+    DBI,
+               digest
 License: MIT + file LICENSE
 RoxygenNote: 6.0.1
 Collate: 
index 8407d23..e9aa830 100644 (file)
@@ -4,7 +4,6 @@
 #'
 #' @useDynLib epclust
 #'
-#' @importFrom Rcpp evalCpp sourceCpp
 #' @importFrom Rwave cwt
 #' @importFrom cluster pam
 #' @importFrom parallel makeCluster clusterExport parLapply stopCluster
index 8be8715..bea073a 100644 (file)
@@ -5,20 +5,17 @@
 #' @title Two-stage clustering, withing one task (see \code{claws()})
 #'
 #' @description \code{clusteringTask1()} runs one full stage-1 task, which consists in
-#'   iterated stage 1 clustering (on nb_curves / ntasks energy contributions, computed
-#'   through discrete wavelets coefficients).
-#'   \code{clusteringTask2()} runs a full stage-2 task, which consists in synchrones
-#'   and then WER distances computations, before applying the clustering algorithm.
-#'   \code{computeClusters1()} and \code{computeClusters2()} correspond to the atomic
-#'   clustering procedures respectively for stage 1 and 2. The former applies the
-#'   first clustering algorithm on a contributions matrix, while the latter clusters
-#'   a set of series inside one task (~nb_items_clust1)
+#'   iterated stage 1 clustering on nb_curves / ntasks energy contributions, computed
+#'   through discrete wavelets coefficients.
+#'   \code{clusteringTask2()} runs a full stage-2 task, which consists in
+#'   WER distances computations between medoids indices output from stage 1,
+#'   before applying the second clustering algorithm, on the distances matrix.
 #'
-#' @param indices Range of series indices to cluster in parallel (initial data)
+#' @param indices Range of series indices to cluster
 #' @param getContribs Function to retrieve contributions from initial series indices:
-#'   \code{getContribs(indices)} outpus a contributions matrix
-#' @inheritParams computeSynchrones
+#'   \code{getContribs(indices)} outputs a contributions matrix
 #' @inheritParams claws
+#' @inheritParams computeSynchrones
 #'
 #' @return For \code{clusteringTask1()}, the indices of the computed (K1) medoids.
 #'   Indices are irrelevant for stage 2 clustering, thus \code{clusteringTask2()}
@@ -27,7 +24,7 @@ NULL
 
 #' @rdname clustering
 #' @export
-clusteringTask1 = function(indices, getContribs, K1, algoClust1, nb_items_clust1,
+clusteringTask1 = function(indices, getContribs, K1, algoClust1, nb_series_per_chunk,
        ncores_clust=1, verbose=FALSE, parll=TRUE)
 {
        if (parll)
@@ -39,7 +36,7 @@ clusteringTask1 = function(indices, getContribs, K1, algoClust1, nb_items_clust1
        while (length(indices) > K1)
        {
                # Balance tasks by splitting the indices set - as evenly as possible
-               indices_workers = .spreadIndices(indices, nb_items_clust1)
+               indices_workers = .splitIndices(indices, nb_items_clust1)
                if (verbose)
                        cat(paste("*** [iterated] Clustering task 1 on ",length(indices)," series\n", sep=""))
                indices <-
@@ -65,8 +62,8 @@ clusteringTask1 = function(indices, getContribs, K1, algoClust1, nb_items_clust1
 
 #' @rdname clustering
 #' @export
-clusteringTask2 = function(medoids, K2, algoClust2, getRefSeries, nb_ref_curves,
-       nb_series_per_chunk, nvoice, nbytes,endian,ncores_clust=1,verbose=FALSE,parll=TRUE)
+clusteringTask2 = function(indices, getSeries, K2, algoClust2, nb_series_per_chunk,
+       nvoice, nbytes, endian, ncores_clust=1, verbose=FALSE, parll=TRUE)
 {
        if (verbose)
                cat(paste("*** Clustering task 2 on ",ncol(medoids)," synchrones\n", sep=""))
@@ -88,254 +85,3 @@ clusteringTask2 = function(medoids, K2, algoClust2, getRefSeries, nb_ref_curves,
                cat(paste("*** algoClust2() on ",nrow(distances)," items\n", sep=""))
        medoids[ ,algoClust2(distances,K2) ]
 }
-
-#' computeSynchrones
-#'
-#' Compute the synchrones curves (sum of clusters elements) from a matrix of medoids,
-#' using euclidian distance.
-#'
-#' @param medoids big.matrix of medoids (curves of same length as initial series)
-#' @param getRefSeries Function to retrieve initial series (e.g. in stage 2 after series
-#'   have been replaced by stage-1 medoids)
-#' @param nb_ref_curves How many reference series? (This number is known at this stage)
-#' @inheritParams claws
-#'
-#' @return A big.matrix of size L x K1 where L = length of a serie
-#'
-#' @export
-computeSynchrones = function(medoids, getRefSeries, nb_ref_curves,
-       nb_series_per_chunk, ncores_clust=1,verbose=FALSE,parll=TRUE)
-{
-       # Synchrones computation is embarassingly parallel: compute it by chunks of series
-       computeSynchronesChunk = function(indices)
-       {
-               if (parll)
-               {
-                       require("bigmemory", quietly=TRUE)
-                       requireNamespace("synchronicity", quietly=TRUE)
-                       require("epclust", quietly=TRUE)
-                       # The big.matrix objects need to be attached to be usable on the workers
-                       synchrones <- bigmemory::attach.big.matrix(synchrones_desc)
-                       medoids <- bigmemory::attach.big.matrix(medoids_desc)
-                       m <- synchronicity::attach.mutex(m_desc)
-               }
-
-               # Obtain a chunk of reference series
-               ref_series = getRefSeries(indices)
-               nb_series = ncol(ref_series)
-
-               # Get medoids indices for this chunk of series
-               mi = computeMedoidsIndices(medoids@address, ref_series)
-
-               # Update synchrones using mi above
-               for (i in seq_len(nb_series))
-               {
-                       if (parll)
-                               synchronicity::lock(m) #locking required because several writes at the same time
-                       synchrones[, mi[i] ] = synchrones[, mi[i] ] + ref_series[,i]
-                       if (parll)
-                               synchronicity::unlock(m)
-               }
-               NULL
-       }
-
-       K = ncol(medoids) ; L = nrow(medoids)
-       # Use bigmemory (shared==TRUE by default) + synchronicity to fill synchrones in //
-       synchrones = bigmemory::big.matrix(nrow=L, ncol=K, type="double", init=0.)
-       # NOTE: synchronicity is only for Linux & MacOS; on Windows: run sequentially
-       parll = (parll && requireNamespace("synchronicity",quietly=TRUE)
-               && Sys.info()['sysname'] != "Windows")
-       if (parll)
-       {
-               m <- synchronicity::boost.mutex() #for lock/unlock, see computeSynchronesChunk
-               # mutex and big.matrix objects cannot be passed directly:
-               # they will be accessed from their description
-               m_desc <- synchronicity::describe(m)
-               synchrones_desc = bigmemory::describe(synchrones)
-               medoids_desc = bigmemory::describe(medoids)
-               cl = parallel::makeCluster(ncores_clust)
-               parallel::clusterExport(cl, envir=environment(),
-                       varlist=c("synchrones_desc","m_desc","medoids_desc","getRefSeries"))
-       }
-
-       if (verbose)
-               cat(paste("--- Compute ",K," synchrones with ",nb_ref_curves," series\n", sep=""))
-
-       # Balance tasks by splitting the indices set - maybe not so evenly, but
-       # max==TRUE in next call ensures that no set has more than nb_series_per_chunk items.
-       indices_workers = .spreadIndices(seq_len(nb_ref_curves), nb_series_per_chunk, max=TRUE)
-       ignored <-
-               if (parll)
-                       parallel::parLapply(cl, indices_workers, computeSynchronesChunk)
-               else
-                       lapply(indices_workers, computeSynchronesChunk)
-
-       if (parll)
-               parallel::stopCluster(cl)
-
-       return (synchrones)
-}
-
-#' computeWerDists
-#'
-#' Compute the WER distances between the synchrones curves (in columns), which are
-#' returned (e.g.) by \code{computeSynchrones()}
-#'
-#' @param synchrones A big.matrix of synchrones, in columns. The series have same
-#'   length as the series in the initial dataset
-#' @inheritParams claws
-#'
-#' @return A distances matrix of size K1 x K1
-#'
-#' @export
-computeWerDists = function(synchrones, nvoice, nbytes,endian,ncores_clust=1,
-       verbose=FALSE,parll=TRUE)
-{
-       n <- ncol(synchrones)
-       L <- nrow(synchrones)
-       noctave = ceiling(log2(L)) #min power of 2 to cover serie range
-
-       # Initialize result as a square big.matrix of size 'number of synchrones'
-       Xwer_dist <- bigmemory::big.matrix(nrow=n, ncol=n, type="double")
-
-       # Generate n(n-1)/2 pairs for WER distances computations
-       pairs = list()
-       V = seq_len(n)
-       for (i in 1:n)
-       {
-               V = V[-1]
-               pairs = c(pairs, lapply(V, function(v) c(i,v)))
-       }
-
-       cwt_file = ".cwt.bin"
-       # Compute the synchrones[,index] CWT, and store it in the binary file above
-       computeSaveCWT = function(index)
-       {
-               if (parll && !exists(synchrones)) #avoid going here after first call on a worker
-               {
-                       require("bigmemory", quietly=TRUE)
-                       require("Rwave", quietly=TRUE)
-                       require("epclust", quietly=TRUE)
-                       synchrones <- bigmemory::attach.big.matrix(synchrones_desc)
-               }
-               ts <- scale(ts(synchrones[,index]), center=TRUE, scale=FALSE)
-               ts_cwt = Rwave::cwt(ts, noctave, nvoice, w0=2*pi, twoD=TRUE, plot=FALSE)
-
-               # Serialization
-               binarize(as.matrix(c(as.double(Re(ts_cwt)),as.double(Im(ts_cwt)))), cwt_file, 1,
-                       ",", nbytes, endian)
-       }
-
-       if (parll)
-       {
-               cl = parallel::makeCluster(ncores_clust)
-               synchrones_desc <- bigmemory::describe(synchrones)
-               Xwer_dist_desc <- bigmemory::describe(Xwer_dist)
-               parallel::clusterExport(cl, varlist=c("parll","synchrones_desc","Xwer_dist_desc",
-                       "noctave","nvoice","verbose","getCWT"), envir=environment())
-       }
-
-       if (verbose)
-               cat(paste("--- Precompute and serialize synchrones CWT\n", sep=""))
-
-       ignored <-
-               if (parll)
-                       parallel::parLapply(cl, 1:n, computeSaveCWT)
-               else
-                       lapply(1:n, computeSaveCWT)
-
-       # Function to retrieve a synchrone CWT from (binary) file
-       getSynchroneCWT = function(index, L)
-       {
-               flat_cwt <- getDataInFile(index, cwt_file, nbytes, endian)
-               cwt_length = length(flat_cwt) / 2
-               re_part = as.matrix(flat_cwt[1:cwt_length], nrow=L)
-               im_part = as.matrix(flat_cwt[(cwt_length+1):(2*cwt_length)], nrow=L)
-               re_part + 1i * im_part
-       }
-
-       # Compute distance between columns i and j in synchrones
-       computeDistanceIJ = function(pair)
-       {
-               if (parll)
-               {
-                       # parallel workers start with an empty environment
-                       require("bigmemory", quietly=TRUE)
-                       require("epclust", quietly=TRUE)
-                       synchrones <- bigmemory::attach.big.matrix(synchrones_desc)
-                       Xwer_dist <- bigmemory::attach.big.matrix(Xwer_dist_desc)
-               }
-
-               i = pair[1] ; j = pair[2]
-               if (verbose && j==i+1 && !parll)
-                       cat(paste("   Distances (",i,",",j,"), (",i,",",j+1,") ...\n", sep=""))
-
-               # Compute CWT of columns i and j in synchrones
-               L = nrow(synchrones)
-               cwt_i <- getSynchroneCWT(i, L)
-               cwt_j <- getSynchroneCWT(j, L)
-
-               # Compute the ratio of integrals formula 5.6 for WER^2
-               # in https://arxiv.org/abs/1101.4744v2 §5.3
-               num <- filterMA(Mod(cwt_i * Conj(cwt_j)))
-               WX  <- filterMA(Mod(cwt_i * Conj(cwt_i)))
-               WY <- filterMA(Mod(cwt_j * Conj(cwt_j)))
-               wer2 <- sum(colSums(num)^2) / sum(colSums(WX) * colSums(WY))
-
-               Xwer_dist[i,j] <- sqrt(L * ncol(cwt_i) * (1 - wer2))
-               Xwer_dist[j,i] <- Xwer_dist[i,j]
-               Xwer_dist[i,i] <- 0.
-       }
-
-       if (verbose)
-               cat(paste("--- Compute WER distances\n", sep=""))
-
-       ignored <-
-               if (parll)
-                       parallel::parLapply(cl, pairs, computeDistanceIJ)
-               else
-                       lapply(pairs, computeDistanceIJ)
-
-       if (parll)
-               parallel::stopCluster(cl)
-
-       unlink(cwt_file)
-
-       Xwer_dist[n,n] = 0.
-       Xwer_dist[,] #~small matrix K1 x K1
-}
-
-# Helper function to divide indices into balanced sets
-# If max == TRUE, sets sizes cannot exceed nb_per_set
-.spreadIndices = function(indices, nb_per_set, max=FALSE)
-{
-       L = length(indices)
-       nb_workers = floor( L / nb_per_set )
-       rem = L %% nb_per_set
-       if (nb_workers == 0 || (nb_workers==1 && rem==0))
-       {
-               # L <= nb_per_set, simple case
-               indices_workers = list(indices)
-       }
-       else
-       {
-               indices_workers = lapply( seq_len(nb_workers), function(i)
-                       indices[(nb_per_set*(i-1)+1):(nb_per_set*i)] )
-
-               if (max)
-               {
-                       # Sets are not so well balanced, but size is supposed to be critical
-                       return ( c( indices_workers, if (rem>0) list((L-rem+1):L) else NULL ) )
-               }
-
-               # Spread the remaining load among the workers
-               rem = L %% nb_per_set
-               while (rem > 0)
-               {
-                       index = rem%%nb_workers + 1
-                       indices_workers[[index]] = c(indices_workers[[index]], indices[L-rem+1])
-                       rem = rem - 1
-               }
-       }
-       indices_workers
-}
diff --git a/epclust/R/computeSynchrones.R b/epclust/R/computeSynchrones.R
new file mode 100644 (file)
index 0000000..f73e64e
--- /dev/null
@@ -0,0 +1,89 @@
+#' computeSynchrones
+#'
+#' Compute the synchrones curves (sum of clusters elements) from a matrix of medoids,
+#' using euclidian distance.
+#'
+#' @param medoids matrix of medoids in columns (curves of same length as the series)
+#' @param getSeries Function to retrieve series (argument: 'indices', integer vector)
+#' @param nb_curves How many series? (this is known, at this stage)
+#' @inheritParams claws
+#'
+#' @return A matrix of K synchrones in columns (same length as the series)
+#'
+#' @export
+computeSynchrones = function(medoids, getSeries, nb_curves,
+       nb_series_per_chunk, ncores_clust=1,verbose=FALSE,parll=TRUE)
+{
+       # Synchrones computation is embarassingly parallel: compute it by chunks of series
+       computeSynchronesChunk = function(indices)
+       {
+               if (parll)
+               {
+                       require("bigmemory", quietly=TRUE)
+                       requireNamespace("synchronicity", quietly=TRUE)
+                       require("epclust", quietly=TRUE)
+                       # The big.matrix objects need to be attached to be usable on the workers
+                       synchrones <- bigmemory::attach.big.matrix(synchrones_desc)
+                       medoids <- bigmemory::attach.big.matrix(medoids_desc)
+                       m <- synchronicity::attach.mutex(m_desc)
+               }
+
+               # Obtain a chunk of reference series
+               series_chunk = getSeries(indices)
+               nb_series_chunk = ncol(series_chunk)
+
+               # Get medoids indices for this chunk of series
+               for (i in seq_len(nb_series_chunk))
+                       mi[i] <- which.min( colSums( sweep(medoids, 1, series_chunk[,i], '-')^2 ) )
+
+               # Update synchrones using mi above, grouping it by values of mi (in 1...K)
+               # to avoid too many lock/unlock
+               for (i in seq_len(K))
+               {
+                       # lock / unlock required because several writes at the same time
+                       if (parll)
+                               synchronicity::lock(m)
+                       synchrones[,i] = synchrones[,i] + rowSums(series_chunk[,mi==i])
+                       if (parll)
+                               synchronicity::unlock(m)
+               }
+               NULL
+       }
+
+       K = ncol(medoids)
+       L = nrow(medoids)
+       # Use bigmemory (shared==TRUE by default) + synchronicity to fill synchrones in //
+       synchrones = bigmemory::big.matrix(nrow=L, ncol=K, type="double", init=0.)
+       # NOTE: synchronicity is only for Linux & MacOS; on Windows: run sequentially
+       parll = (parll && requireNamespace("synchronicity",quietly=TRUE)
+               && Sys.info()['sysname'] != "Windows")
+       if (parll)
+       {
+               m <- synchronicity::boost.mutex() #for lock/unlock, see computeSynchronesChunk
+               # mutex and big.matrix objects cannot be passed directly:
+               # they will be accessed from their description
+               m_desc <- synchronicity::describe(m)
+               synchrones_desc = bigmemory::describe(synchrones)
+               medoids <- bigmemory::as.big.matrix(medoids)
+               medoids_desc <- bigmemory::describe(medoids)
+               cl = parallel::makeCluster(ncores_clust)
+               parallel::clusterExport(cl, envir=environment(),
+                       varlist=c("synchrones_desc","m_desc","medoids_desc","getRefSeries"))
+       }
+
+       if (verbose)
+               cat(paste("--- Compute ",K," synchrones with ",nb_curves," series\n", sep=""))
+
+       # Balance tasks by splitting 1:nb_ref_curves into groups of size <= nb_series_per_chunk
+       indices_workers = .splitIndices(seq_len(nb_ref_curves), nb_series_per_chunk)
+       ignored <-
+               if (parll)
+                       parallel::parLapply(cl, indices_workers, computeSynchronesChunk)
+               else
+                       lapply(indices_workers, computeSynchronesChunk)
+
+       if (parll)
+               parallel::stopCluster(cl)
+
+       return (synchrones[,])
+}
diff --git a/epclust/R/computeWerDists.R b/epclust/R/computeWerDists.R
new file mode 100644 (file)
index 0000000..aae1cc1
--- /dev/null
@@ -0,0 +1,141 @@
+#' computeWerDists
+#'
+#' Compute the WER distances between the synchrones curves (in columns), which are
+#' returned (e.g.) by \code{computeSynchrones()}
+#'
+#' @param synchrones A big.matrix of synchrones, in columns. The series have same
+#'   length as the series in the initial dataset
+#' @inheritParams claws
+#'
+#' @return A distances matrix of size K1 x K1
+#'
+#' @export
+computeWerDists = function(synchrones, nvoice, nbytes,endian,ncores_clust=1,
+       verbose=FALSE,parll=TRUE)
+{
+       n <- ncol(synchrones)
+       L <- nrow(synchrones)
+       noctave = ceiling(log2(L)) #min power of 2 to cover serie range
+
+       # Initialize result as a square big.matrix of size 'number of synchrones'
+       Xwer_dist <- bigmemory::big.matrix(nrow=n, ncol=n, type="double")
+
+       # Generate n(n-1)/2 pairs for WER distances computations
+       pairs = list()
+       V = seq_len(n)
+       for (i in 1:n)
+       {
+               V = V[-1]
+               pairs = c(pairs, lapply(V, function(v) c(i,v)))
+       }
+
+       cwt_file = ".cwt.bin"
+       # Compute the synchrones[,indices] CWT, and store the results in the binary file
+       computeSaveCWT = function(indices)
+       {
+               if (parll)
+               {
+                       require("bigmemory", quietly=TRUE)
+                       require("Rwave", quietly=TRUE)
+                       require("epclust", quietly=TRUE)
+                       synchrones <- bigmemory::attach.big.matrix(synchrones_desc)
+               }
+
+               # Obtain CWT as big vectors of real part + imaginary part (concatenate)
+               ts_cwt <- sapply(indices, function(i) {
+                       ts <- scale(ts(synchrones[,i]), center=TRUE, scale=FALSE)
+                       ts_cwt <- Rwave::cwt(ts, noctave, nvoice, w0=2*pi, twoD=TRUE, plot=FALSE)
+                       c( as.double(Re(ts_cwt)),as.double(Im(ts_cwt)) )
+               })
+
+               # Serialization
+               binarize(ts_cwt, cwt_file, 1, ",", nbytes, endian)
+       }
+
+       if (parll)
+       {
+               cl = parallel::makeCluster(ncores_clust)
+               synchrones_desc <- bigmemory::describe(synchrones)
+               Xwer_dist_desc <- bigmemory::describe(Xwer_dist)
+               parallel::clusterExport(cl, varlist=c("parll","synchrones_desc","Xwer_dist_desc",
+                       "noctave","nvoice","verbose","getCWT"), envir=environment())
+       }
+
+       if (verbose)
+               cat(paste("--- Precompute and serialize synchrones CWT\n", sep=""))
+
+       ignored <-
+               if (parll)
+                       parallel::parLapply(cl, 1:n, computeSaveCWT)
+               else
+                       lapply(1:n, computeSaveCWT)
+
+       # Function to retrieve a synchrone CWT from (binary) file
+       getSynchroneCWT = function(index, L)
+       {
+               flat_cwt <- getDataInFile(index, cwt_file, nbytes, endian)
+               cwt_length = length(flat_cwt) / 2
+               re_part = as.matrix(flat_cwt[1:cwt_length], nrow=L)
+               im_part = as.matrix(flat_cwt[(cwt_length+1):(2*cwt_length)], nrow=L)
+               re_part + 1i * im_part
+       }
+
+
+
+
+#TODO: better repartition here, 
+       #better code in .splitIndices :: never exceed nb_per_chunk; arg: min_per_chunk (default: 1)
+###TODO: reintroduire nb_items_clust ======> l'autre est typiquement + grand !!! (pas de relation !)
+
+
+
+       # Compute distance between columns i and j in synchrones
+       computeDistanceIJ = function(pair)
+       {
+               if (parll)
+               {
+                       # parallel workers start with an empty environment
+                       require("bigmemory", quietly=TRUE)
+                       require("epclust", quietly=TRUE)
+                       synchrones <- bigmemory::attach.big.matrix(synchrones_desc)
+                       Xwer_dist <- bigmemory::attach.big.matrix(Xwer_dist_desc)
+               }
+
+               i = pair[1] ; j = pair[2]
+               if (verbose && j==i+1 && !parll)
+                       cat(paste("   Distances (",i,",",j,"), (",i,",",j+1,") ...\n", sep=""))
+
+               # Compute CWT of columns i and j in synchrones
+               L = nrow(synchrones)
+               cwt_i <- getSynchroneCWT(i, L)
+               cwt_j <- getSynchroneCWT(j, L)
+
+               # Compute the ratio of integrals formula 5.6 for WER^2
+               # in https://arxiv.org/abs/1101.4744v2 §5.3
+               num <- filterMA(Mod(cwt_i * Conj(cwt_j)))
+               WX  <- filterMA(Mod(cwt_i * Conj(cwt_i)))
+               WY <- filterMA(Mod(cwt_j * Conj(cwt_j)))
+               wer2 <- sum(colSums(num)^2) / sum(colSums(WX) * colSums(WY))
+
+               Xwer_dist[i,j] <- sqrt(L * ncol(cwt_i) * (1 - wer2))
+               Xwer_dist[j,i] <- Xwer_dist[i,j]
+               Xwer_dist[i,i] <- 0.
+       }
+
+       if (verbose)
+               cat(paste("--- Compute WER distances\n", sep=""))
+
+       ignored <-
+               if (parll)
+                       parallel::parLapply(cl, pairs, computeDistanceIJ)
+               else
+                       lapply(pairs, computeDistanceIJ)
+
+       if (parll)
+               parallel::stopCluster(cl)
+
+       unlink(cwt_file)
+
+       Xwer_dist[n,n] = 0.
+       Xwer_dist[,] #~small matrix K1 x K1
+}
index f666267..603f7bf 100644 (file)
 #'   \item Divide series into \code{ntasks} groups to process in parallel. In each task:
 #'   \enumerate{
 #'     \item iterate the first clustering algorithm on its aggregated outputs,
-#'       on inputs of size \code{nb_items_clust1}
+#'       on inputs of size \code{nb_series_per_chunk}
 #'     \item optionally, if WER=="mix":
 #'       a) compute the K1 synchrones curves,
-#'       b) compute WER distances (K1xK1 matrix) between synchrones and
-#'       c) apply the second clustering algorithm
+#'       a) compute WER distances (K1xK1 matrix) between medoids and
+#'       b) apply the second clustering algorithm (output: K2 indices)
 #'   }
 #'   \item Launch a final task on the aggregated outputs of all previous tasks:
-#'     in the case WER=="end" this task takes indices in input, otherwise
-#'     (medoid) curves
+#'     ntasks*K1 if WER=="end", ntasks*K2 otherwise
+#'   \item Compute synchrones (sum of series within each final group)
 #' }
 #' \cr
-#' The main argument -- \code{getSeries} -- has a quite misleading name, since it can be
-#' either a [big.]matrix, a CSV file, a connection or a user function to retrieve
-#' series; the name was chosen because all types of arguments are converted to a function.
-#' When \code{getSeries} is given as a function, it must take a single argument,
+#' The main argument -- \code{series} -- has a quite misleading name, since it can be
+#' either a [big.]matrix, a CSV file, a connection or a user function to retrieve series.
+#' When \code{series} is given as a function, it must take a single argument,
 #' 'indices', integer vector equal to the indices of the curves to retrieve;
-#' see SQLite example. The nature and role of other arguments should be clear.
+#' see SQLite example.
 #' WARNING: the return value must be a matrix (in columns), or NULL if no matches.
 #' \cr
 #' Note: Since we don't make assumptions on initial data, there is a possibility that
-#' even when serialized, contributions or synchrones do not fit in RAM. For example,
+#' even when serialized, contributions do not fit in RAM. For example,
 #' 30e6 series of length 100,000 would lead to a +4Go contribution matrix. Therefore,
 #' it's safer to place these in (binary) files; that's what we do.
 #'
-#' @param getSeries Access to the (time-)series, which can be of one of the three
+#' @param series Access to the (time-)series, which can be of one of the three
 #'   following types:
 #'   \itemize{
 #'     \item [big.]matrix: each column contains the (time-ordered) values of one time-serie
@@ -46,7 +45,8 @@
 #'   }
 #' @param K1 Number of clusters to be found after stage 1 (K1 << N [number of series])
 #' @param K2 Number of clusters to be found after stage 2 (K2 << K1)
-#' @param nb_series_per_chunk (Maximum) number of series to retrieve in one batch
+#' @param nb_series_per_chunk (Maximum) number of series to retrieve in one batch;
+#'   this value is also used for the (maximum) number of series to cluster at a time
 #' @param algoClust1 Clustering algorithm for stage 1. A function which takes (data, K)
 #'   as argument where data is a matrix in columns and K the desired number of clusters,
 #'   and outputs K medoids ranks. Default: PAM. In our method, this function is called
 #' @param algoClust2 Clustering algorithm for stage 2. A function which takes (dists, K)
 #'   as argument where dists is a matrix of distances and K the desired number of clusters,
 #'   and outputs K medoids ranks. Default: PAM.  In our method, this function is called
-#'   on a matrix of K1 x K1 (WER) distances computed between synchrones
-#' @param nb_items_clust1 (~Maximum) number of items in input of the clustering algorithm
-#'   for stage 1. At worst, a clustering algorithm might be called with ~2*nb_items_clust1
-#'   items; but this could only happen at the last few iterations.
+#'   on a matrix of K1 x K1 (WER) distances computed between medoids after algorithm 1
 #' @param wav_filt Wavelet transform filter; see ?wavelets::wt.filter
 #' @param contrib_type Type of contribution: "relative", "logit" or "absolute" (any prefix)
 #' @param WER "end" to apply stage 2 after stage 1 has fully iterated, or "mix" to apply
 #'   or K2 [if WER=="mix"] medoids); default: 1.
 #'   Note: ntasks << N (number of series), so that N is "roughly divisible" by ntasks
 #' @param ncores_tasks Number of parallel tasks (1 to disable: sequential tasks)
-#' @param ncores_clust Number of parallel clusterings in one task (4 should be a minimum)
+#' @param ncores_clust Number of parallel clusterings in one task (3 should be a minimum)
 #' @param sep Separator in CSV input file (if any provided)
 #' @param nbytes Number of bytes to serialize a floating-point number; 4 or 8
 #' @param endian Endianness for (de)serialization ("little" or "big")
 #' @param verbose Level of verbosity (0/FALSE for nothing or 1/TRUE for all; devel stage)
 #' @param parll TRUE to fully parallelize; otherwise run sequentially (debug, comparison)
 #'
-#' @return A matrix of the final K2 medoids curves, in columns
+#' @return A list with
+#' \itemize{
+#'   medoids: a matrix of the final K2 medoids curves, in columns
+#'   ranks: corresponding indices in the dataset
+#'   synchrones: a matrix of the K2 sum of series within each final group
+#' }
 #'
 #' @references Clustering functional data using Wavelets [2013];
 #'   A. Antoniadis, X. Brossat, J. Cugliari & J.-M. Poggi.
 #' series = do.call( cbind, lapply( 1:6, function(i)
 #'   do.call(cbind, wmtsa::wavBootstrap(ref_series[,i], n.realization=400)) ) )
 #' #dim(series) #c(2400,10001)
-#' medoids_ascii = claws(series, K1=60, K2=6, 200, verbose=TRUE)
+#' res_ascii = claws(series, K1=60, K2=6, 200, verbose=TRUE)
 #'
 #' # Same example, from CSV file
 #' csv_file = "/tmp/epclust_series.csv"
 #' write.table(series, csv_file, sep=",", row.names=FALSE, col.names=FALSE)
-#' medoids_csv = claws(csv_file, K1=60, K2=6, 200)
+#' res_csv = claws(csv_file, K1=60, K2=6, 200)
 #'
 #' # Same example, from binary file
 #' bin_file <- "/tmp/epclust_series.bin"
 #' endian <- "little"
 #' binarize(csv_file, bin_file, 500, nbytes, endian)
 #' getSeries <- function(indices) getDataInFile(indices, bin_file, nbytes, endian)
-#' medoids_bin <- claws(getSeries, K1=60, K2=6, 200)
+#' res_bin <- claws(getSeries, K1=60, K2=6, 200)
 #' unlink(csv_file)
 #' unlink(bin_file)
 #'
 #'   else
 #'     NULL
 #' }
-#' medoids_db = claws(getSeries, K1=60, K2=6, 200))
+#' res_db = claws(getSeries, K1=60, K2=6, 200))
 #' dbDisconnect(series_db)
 #'
-#' # All computed medoids should be the same:
-#' digest::sha1(medoids_ascii)
-#' digest::sha1(medoids_csv)
-#' digest::sha1(medoids_bin)
-#' digest::sha1(medoids_db)
+#' # All results should be the same:
+#' library(digest)
+#' digest::sha1(res_ascii)
+#' digest::sha1(res_csv)
+#' digest::sha1(res_bin)
+#' digest::sha1(res_db)
 #' }
 #' @export
-claws <- function(getSeries, K1, K2, nb_series_per_chunk, nb_items_clust1=7*K1,
-       algoClust1=function(data,K) cluster::pam(t(data),K,diss=FALSE)$id.med,
-       algoClust2=function(dists,K) cluster::pam(dists,K,diss=TRUE)$id.med,
+claws <- function(getSeries, K1, K2, nb_series_per_chunk,
+       algoClust1=function(data,K) cluster::pam(t(data),K,diss=FALSE,pamonce=1)$id.med,
+       algoClust2=function(dists,K) cluster::pam(dists,K,diss=TRUE,pamonce=1)$id.med,
        wav_filt="d8", contrib_type="absolute", WER="end", nvoice=4, random=TRUE,
        ntasks=1, ncores_tasks=1, ncores_clust=4, sep=",", nbytes=4,
        endian=.Platform$endian, verbose=FALSE, parll=TRUE)
 {
        # Check/transform arguments
-       if (!is.matrix(getSeries) && !bigmemory::is.big.matrix(getSeries)
-               && !is.function(getSeries)
-               && !methods::is(getSeries,"connection") && !is.character(getSeries))
+       if (!is.matrix(series) && !bigmemory::is.big.matrix(series)
+               && !is.function(series)
+               && !methods::is(series,"connection") && !is.character(series))
        {
-               stop("'getSeries': [big]matrix, function, file or valid connection (no NA)")
+               stop("'series': [big]matrix, function, file or valid connection (no NA)")
        }
        K1 <- .toInteger(K1, function(x) x>=2)
        K2 <- .toInteger(K2, function(x) x>=2)
@@ -168,7 +171,6 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, nb_items_clust1=7*K1,
        # to load K1 series in memory for clustering stage 2.
        if (K1 > nb_series_per_chunk)
                stop("'K1' cannot exceed 'nb_series_per_chunk'")
-       nb_items_clust1 <- .toInteger(nb_items_clust1, function(x) x>K1)
        random <- .toLogical(random)
        tryCatch( {ignored <- wavelets::wt.filter(wav_filt)},
                error = function(e) stop("Invalid wavelet filter; see ?wavelets::wt.filter") )
@@ -188,21 +190,23 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, nb_items_clust1=7*K1,
        verbose <- .toLogical(verbose)
        parll <- .toLogical(parll)
 
-       # Binarize series if getSeries is not a function; the aim is to always use a function,
+       # Binarize series if it is not a function; the aim is to always use a function,
        # to uniformize treatments. An equally good alternative would be to use a file-backed
        # bigmemory::big.matrix, but it would break the "all-is-function" pattern.
-       if (!is.function(getSeries))
+       if (!is.function(series))
        {
                if (verbose)
                        cat("...Serialize time-series (or retrieve past binary file)\n")
-               series_file = ".series.bin"
+               series_file = ".series.epclust.bin"
                if (!file.exists(series_file))
-                       binarize(getSeries, series_file, nb_series_per_chunk, sep, nbytes, endian)
+                       binarize(series, series_file, nb_series_per_chunk, sep, nbytes, endian)
                getSeries = function(inds) getDataInFile(inds, series_file, nbytes, endian)
        }
+       else
+               getSeries = series
 
        # Serialize all computed wavelets contributions into a file
-       contribs_file = ".contribs.bin"
+       contribs_file = ".contribs.epclust.bin"
        index = 1
        nb_curves = 0
        if (verbose)
@@ -210,7 +214,7 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, nb_items_clust1=7*K1,
        if (!file.exists(contribs_file))
        {
                nb_curves = binarizeTransform(getSeries,
-                       function(series) curvesToContribs(series, wav_filt, contrib_type),
+                       function(curves) curvesToContribs(curves, wav_filt, contrib_type),
                        contribs_file, nb_series_per_chunk, nbytes, endian)
        }
        else
@@ -243,12 +247,9 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, nb_items_clust1=7*K1,
                # Initialize parallel runs: outfile="" allow to output verbose traces in the console
                # under Linux. All necessary variables are passed to the workers.
                cl = parallel::makeCluster(ncores_tasks, outfile="")
-               varlist = c("getSeries","getContribs","K1","K2","algoClust1","algoClust2",
-                       "nb_series_per_chunk","nb_items_clust1","ncores_clust",
-                       "nvoice","sep","nbytes","endian","verbose","parll")
-               if (WER=="mix" && ntasks>1)
-                       varlist = c(varlist, "medoids_file")
-               parallel::clusterExport(cl, varlist, envir = environment())
+               parallel::clusterExport(cl, envir = environment(),
+                       varlist = c("getSeries","getContribs","K1","K2","algoClust1","algoClust2",
+                       "nb_series_per_chunk","ncores_clust","nvoice","nbytes","endian","verbose","parll"))
        }
 
        # This function achieves one complete clustering task, divided in stage 1 + stage 2.
@@ -261,27 +262,16 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, nb_items_clust1=7*K1,
                # packages, and pass useful variables.
                if (parll && ntasks>1)
                        require("epclust", quietly=TRUE)
-               indices_medoids = clusteringTask1(
-                       inds, getContribs, K1, algoClust1, nb_series_per_chunk, ncores_clust, verbose, parll)
-               if (WER=="mix" && ntasks>1)
+               indices_medoids = clusteringTask1(inds, getContribs, K1, algoClust1,
+                       nb_series_per_chunk, ncores_clust, verbose, parll)
+               if (WER=="mix")
                {
-                       if (parll)
-                               require("bigmemory", quietly=TRUE)
-                       medoids1 = bigmemory::as.big.matrix( getSeries(indices_medoids) )
-                       medoids2 = clusteringTask2(medoids1, K2, algoClust2, getSeries, nb_curves,
+                       indices_medoids = clusteringTask2(indices_medoids, getSeries, K2, algoClust2,
                                nb_series_per_chunk, nvoice, nbytes, endian, ncores_clust, verbose, parll)
-                       binarize(medoids2, medoids_file, nb_series_per_chunk, sep, nbytes, endian)
-                       return (vector("integer",0))
                }
                indices_medoids
        }
 
-       # Synchrones (medoids) need to be stored only if WER=="mix"; indeed in this case, every
-       # task output is a set of new (medoids) curves. If WER=="end" however, output is just a
-       # set of indices, representing some initial series.
-       if (WER=="mix" && ntasks>1)
-               {medoids_file = ".medoids.bin" ; unlink(medoids_file)}
-
        if (verbose)
        {
                message = paste("...Run ",ntasks," x stage 1", sep="")
@@ -292,110 +282,32 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk, nb_items_clust1=7*K1,
 
        # As explained above, indices will be assigned to ntasks*K1 medoids indices [if WER=="end"],
        # or nothing (empty vector) if WER=="mix"; in this case, synchrones are stored in a file.
-       indices <-
+       indices_medoids_all <-
                if (parll && ntasks>1)
                        unlist( parallel::parLapply(cl, indices_tasks, runTwoStepClustering) )
                else
                        unlist( lapply(indices_tasks, runTwoStepClustering) )
+
        if (parll && ntasks>1)
                parallel::stopCluster(cl)
 
-       # Right before the final stage, two situations are possible:
-       #  a. data to be processed now sit in a binary format in medoids_file (if WER=="mix")
-       #  b. data still is the initial set of curves, referenced by the ntasks*K1 indices
-       # So, the function getSeries() will potentially change. However, computeSynchrones()
-       # requires a function retrieving the initial series. Thus, the next line saves future
-       # conditional instructions.
-       getRefSeries = getSeries
-
-       if (WER=="mix" && ntasks>1)
-       {
-               indices = seq_len(ntasks*K2)
-               # Now series (synchrones) must be retrieved from medoids_file
-               getSeries = function(inds) getDataInFile(inds, medoids_file, nbytes, endian)
-               # Contributions must be re-computed
-               unlink(contribs_file)
-               index = 1
-               if (verbose)
-                       cat("...Serialize contributions computed on synchrones\n")
-               ignored = binarizeTransform(getSeries,
-                       function(series) curvesToContribs(series, wav_filt, contrib_type),
-                       contribs_file, nb_series_per_chunk, nbytes, endian)
-       }
+       # Right before the final stage, input data still is the initial set of curves, referenced
+       # by the ntasks*[K1 or K2] medoids indices.
 
-       # Run step2 on resulting indices or series (from file)
+       # Run last clustering tasks to obtain only K2 medoids indices, from ntasks*[K1 or K2]
+       # indices, depending wether WER=="end" or WER=="mix"
        if (verbose)
                cat("...Run final // stage 1 + stage 2\n")
-       indices_medoids = clusteringTask1(indices, getContribs, K1, algoClust1,
+       indices_medoids = clusteringTask1(indices_medoids_all, getContribs, K1, algoClust1,
                nb_series_per_chunk, ncores_tasks*ncores_clust, verbose, parll)
-       medoids1 = bigmemory::as.big.matrix( getSeries(indices_medoids) )
-       medoids2 = clusteringTask2(medoids1, K2, algoClust2, getRefSeries, nb_curves,
+       indices_medoids = clusteringTask2(indices_medoids, getContribs, K2, algoClust2,
                nb_series_per_chunk, nvoice, nbytes, endian, ncores_tasks*ncores_clust, verbose, parll)
 
-       # Cleanup: remove temporary binary files
-       tryCatch(
-               {unlink(series_file); unlink(contribs_file); unlink(medoids_file)},
-               error = function(e) {})
+       # Compute synchrones
+       medoids = getSeries(indices_medoids)
+       synchrones = computeSynchrones(medoids, getSeries, nb_curves, nb_series_per_chunk,
+               ncores_tasks*ncores_clust, verbose, parll)
 
-       # Return medoids as a standard matrix, since K2 series have to fit in RAM
-       # (clustering algorithm 1 takes K1 > K2 of them as input)
-       medoids2[,]
-}
-
-#' curvesToContribs
-#'
-#' Compute the discrete wavelet coefficients for each series, and aggregate them in
-#' energy contribution across scales as described in https://arxiv.org/abs/1101.4744v2
-#'
-#' @param series [big.]matrix of series (in columns), of size L x n
-#' @inheritParams claws
-#'
-#' @return A [big.]matrix of size log(L) x n containing contributions in columns
-#'
-#' @export
-curvesToContribs = function(series, wav_filt, contrib_type, coin=FALSE)
-{
-       series = as.matrix(series) #1D serie could occur
-       L = nrow(series)
-       D = ceiling( log2(L) )
-       # Series are interpolated to all have length 2^D
-       nb_sample_points = 2^D
-       apply(series, 2, function(x) {
-               interpolated_curve = spline(1:L, x, n=nb_sample_points)$y
-               W = wavelets::dwt(interpolated_curve, filter=wav_filt, D)@W
-               # Compute the sum of squared discrete wavelet coefficients, for each scale
-               nrj = rev( sapply( W, function(v) ( sqrt( sum(v^2) ) ) ) )
-               if (contrib_type!="absolute")
-                       nrj = nrj / sum(nrj)
-               if (contrib_type=="logit")
-                       nrj = - log(1 - nrj)
-               nrj
-       })
-}
-
-# Check integer arguments with functional conditions
-.toInteger <- function(x, condition)
-{
-       errWarn <- function(ignored)
-               paste("Cannot convert argument' ",substitute(x),"' to integer", sep="")
-       if (!is.integer(x))
-               tryCatch({x = as.integer(x)[1]; if (is.na(x)) stop()},
-                       warning = errWarn, error = errWarn)
-       if (!condition(x))
-       {
-               stop(paste("Argument '",substitute(x),
-                       "' does not verify condition ",body(condition), sep=""))
-       }
-       x
-}
-
-# Check logical arguments
-.toLogical <- function(x)
-{
-       errWarn <- function(ignored)
-               paste("Cannot convert argument' ",substitute(x),"' to logical", sep="")
-       if (!is.logical(x))
-               tryCatch({x = as.logical(x)[1]; if (is.na(x)) stop()},
-                       warning = errWarn, error = errWarn)
-       x
+       # NOTE: no need to use big.matrix here, since there are only K2 << K1 << N remaining curves
+       list("medoids"=medoids, "ranks"=indices_medoids, "synchrones"=synchrones)
 }
diff --git a/epclust/R/utils.R b/epclust/R/utils.R
new file mode 100644 (file)
index 0000000..ba643d0
--- /dev/null
@@ -0,0 +1,117 @@
+# Check integer arguments with functional conditions
+.toInteger <- function(x, condition)
+{
+       errWarn <- function(ignored)
+               paste("Cannot convert argument' ",substitute(x),"' to integer", sep="")
+       if (!is.integer(x))
+               tryCatch({x = as.integer(x)[1]; if (is.na(x)) stop()},
+                       warning = errWarn, error = errWarn)
+       if (!condition(x))
+       {
+               stop(paste("Argument '",substitute(x),
+                       "' does not verify condition ",body(condition), sep=""))
+       }
+       x
+}
+
+# Check logical arguments
+.toLogical <- function(x)
+{
+       errWarn <- function(ignored)
+               paste("Cannot convert argument' ",substitute(x),"' to logical", sep="")
+       if (!is.logical(x))
+               tryCatch({x = as.logical(x)[1]; if (is.na(x)) stop()},
+                       warning = errWarn, error = errWarn)
+       x
+}
+
+#' curvesToContribs
+#'
+#' Compute the discrete wavelet coefficients for each series, and aggregate them in
+#' energy contribution across scales as described in https://arxiv.org/abs/1101.4744v2
+#'
+#' @param series [big.]matrix of series (in columns), of size L x n
+#' @inheritParams claws
+#'
+#' @return A matrix of size log(L) x n containing contributions in columns
+#'
+#' @export
+curvesToContribs = function(series, wav_filt, contrib_type, coin=FALSE)
+{
+       L = nrow(series)
+       D = ceiling( log2(L) )
+       # Series are interpolated to all have length 2^D
+       nb_sample_points = 2^D
+       apply(series, 2, function(x) {
+               interpolated_curve = spline(1:L, x, n=nb_sample_points)$y
+               W = wavelets::dwt(interpolated_curve, filter=wav_filt, D)@W
+               # Compute the sum of squared discrete wavelet coefficients, for each scale
+               nrj = rev( sapply( W, function(v) ( sqrt( sum(v^2) ) ) ) )
+               if (contrib_type!="absolute")
+                       nrj = nrj / sum(nrj)
+               if (contrib_type=="logit")
+                       nrj = - log(1 - nrj)
+               nrj
+       })
+}
+
+# Helper function to divide indices into balanced sets
+# If max == TRUE, sets sizes cannot exceed nb_per_set
+.splitIndices = function(indices, nb_per_set, max=FALSE)
+{
+       L = length(indices)
+       nb_workers = floor( L / nb_per_set )
+       rem = L %% nb_per_set
+       if (nb_workers == 0 || (nb_workers==1 && rem==0))
+       {
+               # L <= nb_per_set, simple case
+               indices_workers = list(indices)
+       }
+       else
+       {
+               indices_workers = lapply( seq_len(nb_workers), function(i)
+                       indices[(nb_per_set*(i-1)+1):(nb_per_set*i)] )
+
+               if (max)
+               {
+                       # Sets are not so well balanced, but size is supposed to be critical
+                       return ( c( indices_workers, if (rem>0) list((L-rem+1):L) else NULL ) )
+               }
+
+               # Spread the remaining load among the workers
+               rem = L %% nb_per_set
+               while (rem > 0)
+               {
+                       index = rem%%nb_workers + 1
+                       indices_workers[[index]] = c(indices_workers[[index]], indices[L-rem+1])
+                       rem = rem - 1
+               }
+       }
+       indices_workers
+}
+
+#' filterMA
+#'
+#' Filter [time-]series by replacing all values by the moving average of values
+#' centered around current one. Border values are averaged with available data.
+#'
+#' @param M_ A real matrix of size LxD
+#' @param w_ The (odd) number of values to average
+#'
+#' @return The filtered matrix, of same size as the input
+#' @export
+filterMA = function(M_, w_)
+       .Call("filterMA", M_, w_, PACKAGE="epclust")
+
+#' cleanBin
+#'
+#' Remove binary files to re-generate them at next run of \code{claws()}.
+#' Note: run it in the folder where the computations occurred (or no effect).
+#'
+#' @export
+cleanBin <- function()
+{
+       bin_files = list.files(pattern = "*.epclust.bin", all.files=TRUE)
+       for (file in bin_files)
+               unlink(file)
+}
diff --git a/epclust/inst/CITATION b/epclust/inst/CITATION
deleted file mode 100644 (file)
index b6f2f6d..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-citHeader("To cite epclust in publications use:")
-
-citEntry(entry = "Manual",
-  title        = ".",
-  author       = personList(as.person("Benjamin Auder"),
-                   as.person("Jairo Cugliari"),
-                   as.person("Yannig Goude"),
-                   as.person("Jean-Michel Poggi")),
-  organization = "Paris-Sud, Saclay & Lyon 2",
-  address      = "Orsay, Saclay & Lyon, France",
-  year         = "2017",
-  url          = "https://git.auder.net/?p=edfclust.git",
-
-  textVersion  =
-  paste("Benjamin Auder, Jairo Cugliari, Yannig Goude, Jean-Michel Poggi (2017).",
-        "EPCLUST: Electric Power curves CLUSTering.",
-        "URL https://git.auder.net/?p=edfclust.git")
-)
diff --git a/epclust/src/computeMedoidsIndices.cpp b/epclust/src/computeMedoidsIndices.cpp
deleted file mode 100644 (file)
index 895031a..0000000
+++ /dev/null
@@ -1,57 +0,0 @@
-#include <Rcpp.h>
-
-// [[Rcpp::depends(BH, bigmemory)]]
-#include <bigmemory/MatrixAccessor.hpp>
-
-#include <numeric>
-#include <cfloat>
-
-using namespace Rcpp;
-
-//' computeMedoidsIndices
-//'
-//' For each column of the 'series' matrix input, search for the closest medoid
-//' (euclidian distance) and store its index
-//'
-//' @param pMedoids External pointer (a big.matrix 'address' slot in R)
-//' @param series (reference) series, a matrix of size Lxn
-//'
-//' @return An integer vector of the closest medoids indices, for each (column) serie
-// [[Rcpp::export]]
-IntegerVector computeMedoidsIndices(SEXP pMedoids, NumericMatrix series)
-{
-       // Turn SEXP external pointer into BigMatrix (description) object
-       XPtr<BigMatrix> pMed(pMedoids);
-       // medoids: access to the content of the BigMatrix object
-       MatrixAccessor<double> medoids = MatrixAccessor<double>(*pMed);
-
-       int nb_series = series.ncol(),
-               K = pMed->ncol(),
-               L = pMed->nrow();
-       IntegerVector mi(nb_series);
-
-       for (int i=0; i<nb_series ; i++) //column index in series
-       {
-               // In R: mi[i] <- which.min( rowSums( sweep(medoids, 2, series[i,], '-')^2 ) )
-               // In C(++), computations must be unrolled
-               mi[i] = 0;
-               double best_dist = DBL_MAX;
-               for (int j=0; j<K; j++) //column index in medoids
-               {
-                       double dist_ij = 0.;
-                       for (int k=0; k<L; k++) //common row index (medoids, series)
-                       {
-                               // Accessing values for a big matrix is a bit weird; see Rcpp gallery/bigmemory
-                               double delta = series(k,i) - *(medoids[j]+k);
-                               dist_ij += delta * delta;
-                       }
-                       if (dist_ij < best_dist)
-                       {
-                               mi[i] = j+1; //R indices start at 1
-                               best_dist = dist_ij;
-                       }
-               }
-       }
-
-       return mi;
-}
diff --git a/epclust/src/filter.c b/epclust/src/filter.c
new file mode 100644 (file)
index 0000000..97dbef2
--- /dev/null
@@ -0,0 +1,70 @@
+#include <R.h>
+#include <Rdefines.h>
+
+// filterMA
+//
+// Filter [time-]series by replacing all values by the moving average of values
+// centered around current one. Border values are averaged with available data.
+//
+// @param M_ A real matrix of size LxD
+// @param w_ The (odd) number of values to average
+//
+// @return The filtered matrix, of same size as the input
+SEXP filterMA(SEXP M_, SEXP w_)
+{
+       int L = INTEGER(getAttrib(cwt_, R_DimSymbol))[0],
+               D = INTEGER(getAttrib(cwt_, R_DimSymbol))[1],
+               w = INTEGER_VALUE(w_),
+               half_w = (w-1)/2,
+               i,
+               nt; //number of terms in the current sum (max: w)
+       double *cwt = REAL(cwt_),
+               cs, //current sum (max: w terms)
+               left; //leftward term in the current moving sum
+
+       SEXP fcwt_; //the filtered result
+       PROTECT(fcwt_ = allocMatrix(REALSXP, L, D));
+       double* fcwt = REAL(fcwt_); //pointer to the encapsulated vector
+
+       // NOTE: unused loop parameter: shifting at the end of the loop is more efficient
+       for (int col=D-1; col>=0; col--)
+       {
+               // Initialization
+               nt = half_w + 1;
+               left = cwt[0];
+               cs = 0.;
+               for (i=half_w; i>=0; i--)
+                       cs += cwt[i];
+
+               // Left border
+               for (i=0; i<half_w; i++)
+               {
+                       fcwt[i] = cs / nt; //(partial) moving average at current index i
+                       cs += cwt[i+half_w+1];
+                       nt++;
+               }
+
+               // Middle: now nt == w, i == half_w
+               for (; i<L-half_w-1; i++)
+               {
+                       fcwt[i] = cs / w; //moving average at current index i
+                       cs = ms - left + cwt[i+half_w+1]; //remove oldest items, add next
+                       left = cwt[i-half_w+1]; //update first value for next iteration
+               }
+
+               // (Last "fully averaged" index +) Right border
+               for (; i<L; i++)
+               {
+                       fcwt[i] = cs / nt; //(partial) moving average at current index i
+                       cs -= cwt[i-half_w];
+                       nt--;
+               }
+
+               // Shift by L == increment column index by 1 (storage per column)
+               cwt = cwt + L;
+               fcwt = fcwt + L;
+       }
+
+       UNPROTECT(1);
+       return fcwt_;
+}
diff --git a/epclust/src/filter.cpp b/epclust/src/filter.cpp
deleted file mode 100644 (file)
index 1750000..0000000
+++ /dev/null
@@ -1,50 +0,0 @@
-#include <Rcpp.h>
-
-using namespace Rcpp;
-
-//' filter
-//'
-//' Filter [time-]series by replacing all values by the moving average of previous, current and
-//' next value. Possible extensions: larger window, gaussian kernel... (but would be costly!).
-//' Note: border values are unchanged.
-//'
-//' @param cwt Continuous wavelets transform (in columns): a matrix of size LxD
-//'
-//' @return The filtered CWT, in a matrix of same size (LxD)
-// [[Rcpp::export]]
-RcppExport SEXP filterMA(SEXP cwt_)
-{
-       // NOTE: C-style for better efficiency (this must be as fast as possible)
-       int L = INTEGER(Rf_getAttrib(cwt_, R_DimSymbol))[0],
-               D = INTEGER(Rf_getAttrib(cwt_, R_DimSymbol))[1];
-       double *cwt = REAL(cwt_);
-
-       SEXP fcwt_; //the filtered result
-       PROTECT(fcwt_ = Rf_allocMatrix(REALSXP, L, D));
-       double* fcwt = REAL(fcwt_); //pointer to the encapsulated vector
-
-       // NOTE: unused loop parameter: shifting at the end of the loop is more efficient
-       for (int col=D-1; col>=0; col--)
-       {
-               double v1 = cwt[0]; //first value
-               double ms = v1 + cwt[1] + cwt[2]; //moving sum at second value
-               for (int i=1; i<L-2; i++)
-               {
-                       fcwt[i] = ms / 3.; //ms / 3: moving average at current index i
-                       ms = ms - v1 + cwt[i+2]; //update ms: remove oldest items, add next
-                       v1 = cwt[i]; //update first value for next iteration
-               }
-
-               // Fill a few border values not computed in the loop
-               fcwt[0] = cwt[0];
-               fcwt[L-2] = ms / 3.;
-               fcwt[L-1] = cwt[L-1];
-
-               // Shift by L == increment column index by 1 (storage per column)
-               cwt = cwt + L;
-               fcwt = fcwt + L;
-       }
-
-       UNPROTECT(1);
-       return fcwt_;
-}
diff --git a/epclust/tests/testthat/helper-common.R b/epclust/tests/testthat/helper-common.R
new file mode 100644 (file)
index 0000000..f442a44
--- /dev/null
@@ -0,0 +1,3 @@
+# Shorthand: map 1->1, 2->2, 3->3, 4->1, ..., 149->2, 150->3, ... (if base==3)
+I = function(i, base)
+       (i-1) %% base + 1
diff --git a/epclust/tests/testthat/helper.clustering.R b/epclust/tests/testthat/helper.clustering.R
deleted file mode 100644 (file)
index f39257e..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-# Shorthand: map 1->1, 2->2, 3->3, 4->1, ..., 149->2, 150->3, ... (is base==3)
-I = function(i, base)
-       (i-1) %% base + 1
-
-# Compute the sum of (normalized) sum of squares of closest distances to a medoid.
-# Note: medoids can be a big.matrix
-computeDistortion = function(series, medoids)
-{
-       if (bigmemory::is.big.matrix(medoids))
-               medoids = medoids[,] #extract standard matrix
-
-       n = ncol(series) ; L = nrow(series)
-       distortion = 0.
-       for (i in seq_len(n))
-               distortion = distortion + min( colSums( sweep(medoids,1,series[,i],'-')^2 ) / L )
-
-       sqrt( distortion / n )
-}
diff --git a/epclust/tests/testthat/helper.computeMedoidsIndices.R b/epclust/tests/testthat/helper.computeMedoidsIndices.R
deleted file mode 100644 (file)
index cd6a30e..0000000
+++ /dev/null
@@ -1,12 +0,0 @@
-# R-equivalent of computeMedoidsIndices, requiring a matrix
-# (thus potentially breaking "fit-in-memory" hope)
-R_computeMedoidsIndices <- function(medoids, series)
-{
-       nb_series = ncol(series) #series in columns
-
-       mi = rep(NA,nb_series)
-       for (i in 1:nb_series)
-               mi[i] <- which.min( colSums( sweep(medoids, 1, series[,i], '-')^2 ) )
-
-       mi
-}
diff --git a/epclust/tests/testthat/test-assignMedoids.R b/epclust/tests/testthat/test-assignMedoids.R
new file mode 100644 (file)
index 0000000..0192563
--- /dev/null
@@ -0,0 +1,37 @@
+context("assignMedoids")
+
+test_that("assignMedoids behave as expected",
+{
+       # Generate a gaussian mixture
+       n = 999
+       L = 7
+       medoids = cbind( rep(0,L), rep(-5,L), rep(5,L) )
+       # short series...
+       series = t( rbind( MASS::mvrnorm(n/3, medoids[,1], diag(L)),
+               MASS::mvrnorm(n/3, medoids[,2], diag(L)),
+               MASS::mvrnorm(n/3, medoids[,3], diag(L)) ) )
+
+       # With high probability, medoids indices should resemble 1,1,1,...,2,2,2,...,3,3,3,...
+       mi = epclust:::assignMedoids(medoids, series)
+       mi_ref = rep(1:3, each=n/3)
+       expect_lt( mean(mi != mi_ref), 0.01 )
+
+       # Now with a random matrix, compare with (~trusted) R version
+       series = matrix(runif(n*L, min=-7, max=7), nrow=L)
+       mi = epclust:::assignMedoids(medoids, series)
+       mi_ref = R_assignMedoids(medoids, series)
+       expect_equal(mi, mi_ref)
+})
+
+# R-equivalent of , requiring a matrix
+# (thus potentially breaking "fit-in-memory" hope)
+R_assignMedoids <- function(medoids, series)
+{
+       nb_series = ncol(series) #series in columns
+
+       mi = rep(NA,nb_series)
+       for (i in 1:nb_series)
+               mi[i] <- which.min( colSums( sweep(medoids, 1, series[,i], '-')^2 ) )
+
+       mi
+}
similarity index 55%
rename from epclust/tests/testthat/test.clustering.R
rename to epclust/tests/testthat/test-clustering.R
index 2f24d08..fa22dff 100644 (file)
@@ -1,68 +1,5 @@
 context("clustering")
 
-test_that("computeSynchrones behave as expected",
-{
-       # Generate 300 sinusoïdal series of 3 kinds: all series of indices == 0 mod 3 are the same
-       # (plus noise), all series of indices == 1 mod 3 are the same (plus noise) ...
-       n = 300
-       x = seq(0,9.5,0.1)
-       L = length(x) #96 1/4h
-       K = 3
-       s1 = cos(x)
-       s2 = sin(x)
-       s3 = c( s1[1:(L%/%2)] , s2[(L%/%2+1):L] )
-       #sum((s1-s2)^2) == 96
-       #sum((s1-s3)^2) == 58
-       #sum((s2-s3)^2) == 38
-       s = list(s1, s2, s3)
-       series = matrix(nrow=L, ncol=n)
-       for (i in seq_len(n))
-               series[,i] = s[[I(i,K)]] + rnorm(L,sd=0.01)
-
-       getRefSeries = function(indices) {
-               indices = indices[indices <= n]
-               if (length(indices)>0) as.matrix(series[,indices]) else NULL
-       }
-
-       synchrones = computeSynchrones(bigmemory::as.big.matrix(cbind(s1,s2,s3)), getRefSeries,
-               n, 100, verbose=TRUE, parll=FALSE)
-
-       expect_equal(dim(synchrones), c(L,K))
-       for (i in 1:K)
-       {
-               # Synchrones are (for each medoid) sums of closest curves.
-               # Here, we expect exactly 100 curves of each kind to be assigned respectively to
-               # synchrone 1, 2 and 3 => division by 100 should be very close to the ref curve
-               expect_equal(synchrones[,i]/100, s[[i]], tolerance=0.01)
-       }
-})
-
-test_that("Helper function to spread indices work properly",
-{
-       indices <- 1:400
-
-       # bigger nb_per_set than length(indices)
-       expect_equal(epclust:::.spreadIndices(indices,500), list(indices))
-
-       # nb_per_set == length(indices)
-       expect_equal(epclust:::.spreadIndices(indices,400), list(indices))
-
-       # length(indices) %% nb_per_set == 0
-       expect_equal(epclust:::.spreadIndices(indices,200),
-               c( list(indices[1:200]), list(indices[201:400]) ))
-       expect_equal(epclust:::.spreadIndices(indices,100),
-               c( list(indices[1:100]), list(indices[101:200]),
-                       list(indices[201:300]), list(indices[301:400]) ))
-
-       # length(indices) / nb_per_set == 1, length(indices) %% nb_per_set == 100
-       expect_equal(epclust:::.spreadIndices(indices,300), list(indices))
-       # length(indices) / nb_per_set == 2, length(indices) %% nb_per_set == 42
-       repartition <- epclust:::.spreadIndices(indices,179)
-       expect_equal(length(repartition), 2)
-       expect_equal(length(repartition[[1]]), 179 + 21)
-       expect_equal(length(repartition[[1]]), 179 + 21)
-})
-
 test_that("clusteringTask1 behave as expected",
 {
        # Generate 60 reference sinusoïdal series (medoids to be found),
@@ -83,7 +20,7 @@ test_that("clusteringTask1 behave as expected",
 
        wf = "haar"
        ctype = "absolute"
-       getContribs = function(indices) curvesToContribs(series[,indices],wf,ctype)
+       getContribs = function(indices) curvesToContribs(as.matrix(series[,indices]),wf,ctype)
 
        require("cluster", quietly=TRUE)
        algoClust1 = function(contribs,K) cluster::pam(t(contribs),K,diss=FALSE)$id.med
@@ -135,3 +72,18 @@ test_that("clusteringTask2 behave as expected",
        for (i in 1:3)
                expect_lte( distor_good, computeDistortion(synchrones, synchrones[,sample(1:K1,3)]) )
 })
+
+# Compute the sum of (normalized) sum of squares of closest distances to a medoid.
+# Note: medoids can be a big.matrix
+computeDistortion = function(series, medoids)
+{
+       if (bigmemory::is.big.matrix(medoids))
+               medoids = medoids[,] #extract standard matrix
+
+       n = ncol(series) ; L = nrow(series)
+       distortion = 0.
+       for (i in seq_len(n))
+               distortion = distortion + min( colSums( sweep(medoids,1,series[,i],'-')^2 ) / L )
+
+       sqrt( distortion / n )
+}
diff --git a/epclust/tests/testthat/test-computeSynchrones.R b/epclust/tests/testthat/test-computeSynchrones.R
new file mode 100644 (file)
index 0000000..db139ed
--- /dev/null
@@ -0,0 +1,38 @@
+context("computeSynchrones")
+
+test_that("computeSynchrones behave as expected",
+{
+       # Generate 300 sinusoïdal series of 3 kinds: all series of indices == 0 mod 3 are the same
+       # (plus noise), all series of indices == 1 mod 3 are the same (plus noise) ...
+       n = 300
+       x = seq(0,9.5,0.1)
+       L = length(x) #96 1/4h
+       K = 3
+       s1 = cos(x)
+       s2 = sin(x)
+       s3 = c( s1[1:(L%/%2)] , s2[(L%/%2+1):L] )
+       #sum((s1-s2)^2) == 96
+       #sum((s1-s3)^2) == 58
+       #sum((s2-s3)^2) == 38
+       s = list(s1, s2, s3)
+       series = matrix(nrow=L, ncol=n)
+       for (i in seq_len(n))
+               series[,i] = s[[I(i,K)]] + rnorm(L,sd=0.01)
+
+       getRefSeries = function(indices) {
+               indices = indices[indices <= n]
+               if (length(indices)>0) as.matrix(series[,indices]) else NULL
+       }
+
+       synchrones = computeSynchrones(bigmemory::as.big.matrix(cbind(s1,s2,s3)), getRefSeries,
+               n, 100, verbose=TRUE, parll=FALSE)
+
+       expect_equal(dim(synchrones), c(L,K))
+       for (i in 1:K)
+       {
+               # Synchrones are (for each medoid) sums of closest curves.
+               # Here, we expect exactly 100 curves of each kind to be assigned respectively to
+               # synchrone 1, 2 and 3 => division by 100 should be very close to the ref curve
+               expect_equal(synchrones[,i]/100, s[[i]], tolerance=0.01)
+       }
+})
similarity index 74%
rename from epclust/tests/testthat/test.wavelets.R
rename to epclust/tests/testthat/test-computeWerDists.R
index f29312d..d83c590 100644 (file)
@@ -1,9 +1,4 @@
-context("wavelets discrete & continuous")
-
-test_that("curvesToContribs behave as expected",
-{
-#      curvesToContribs(...)
-})
+context("computeWerDists")
 
 test_that("computeWerDists output correct results",
 {
@@ -19,3 +14,4 @@ test_that("computeWerDists output correct results",
        # On two constant series
        # TODO:...
 })
+
diff --git a/epclust/tests/testthat/test-utils.R b/epclust/tests/testthat/test-utils.R
new file mode 100644 (file)
index 0000000..4448df0
--- /dev/null
@@ -0,0 +1,32 @@
+context("utils functions")
+
+test_that("Helper function to split indices work properly",
+{
+       indices <- 1:400
+
+       # bigger nb_per_set than length(indices)
+       expect_equal(epclust:::.splitIndices(indices,500), list(indices))
+
+       # nb_per_set == length(indices)
+       expect_equal(epclust:::.splitIndices(indices,400), list(indices))
+
+       # length(indices) %% nb_per_set == 0
+       expect_equal(epclust:::.splitIndices(indices,200),
+               c( list(indices[1:200]), list(indices[201:400]) ))
+       expect_equal(epclust:::.splitIndices(indices,100),
+               c( list(indices[1:100]), list(indices[101:200]),
+                       list(indices[201:300]), list(indices[301:400]) ))
+
+       # length(indices) / nb_per_set == 1, length(indices) %% nb_per_set == 100
+       expect_equal(epclust:::.splitIndices(indices,300), list(indices))
+       # length(indices) / nb_per_set == 2, length(indices) %% nb_per_set == 42
+       repartition <- epclust:::.splitIndices(indices,179)
+       expect_equal(length(repartition), 2)
+       expect_equal(length(repartition[[1]]), 179 + 21)
+       expect_equal(length(repartition[[1]]), 179 + 21)
+})
+
+test_that("curvesToContribs output correct results",
+{
+#      curvesToContribs(...)
+})
diff --git a/epclust/tests/testthat/test.computeMedoidsIndices.R b/epclust/tests/testthat/test.computeMedoidsIndices.R
deleted file mode 100644 (file)
index 8347fb6..0000000
+++ /dev/null
@@ -1,25 +0,0 @@
-context("computeMedoidsIndices")
-
-test_that("computeMedoidsIndices behave as expected",
-{
-       # Generate a gaussian mixture
-       n = 999
-       L = 7
-       medoids = cbind( rep(0,L), rep(-5,L), rep(5,L) )
-       # short series...
-       series = t( rbind( MASS::mvrnorm(n/3, medoids[,1], diag(L)),
-               MASS::mvrnorm(n/3, medoids[,2], diag(L)),
-               MASS::mvrnorm(n/3, medoids[,3], diag(L)) ) )
-
-       # With high probability, medoids indices should resemble 1,1,1,...,2,2,2,...,3,3,3,...
-       require("bigmemory", quietly=TRUE)
-       mi = epclust:::computeMedoidsIndices(bigmemory::as.big.matrix(medoids)@address, series)
-       mi_ref = rep(1:3, each=n/3)
-       expect_lt( mean(mi != mi_ref), 0.01 )
-
-       # Now with a random matrix, compare with (trusted) R version
-       series = matrix(runif(n*L, min=-7, max=7), nrow=L)
-       mi = epclust:::computeMedoidsIndices(bigmemory::as.big.matrix(medoids)@address, series)
-       mi_ref = R_computeMedoidsIndices(medoids, series)
-       expect_equal(mi, mi_ref)
-})