MASS,
clue,
wmtsa,
- DBI
+ DBI,
+ digest
License: MIT + file LICENSE
RoxygenNote: 6.0.1
Collate:
#'
#' @useDynLib epclust
#'
-#' @importFrom Rcpp evalCpp sourceCpp
#' @importFrom Rwave cwt
#' @importFrom cluster pam
#' @importFrom parallel makeCluster clusterExport parLapply stopCluster
#' @title Two-stage clustering, within one task (see \code{claws()})
#'
#' @description \code{clusteringTask1()} runs one full stage-1 task, which consists of
-#' iterated stage 1 clustering (on nb_curves / ntasks energy contributions, computed
-#' through discrete wavelets coefficients).
-#' \code{clusteringTask2()} runs a full stage-2 task, which consists in synchrones
-#' and then WER distances computations, before applying the clustering algorithm.
-#' \code{computeClusters1()} and \code{computeClusters2()} correspond to the atomic
-#' clustering procedures respectively for stage 1 and 2. The former applies the
-#' first clustering algorithm on a contributions matrix, while the latter clusters
-#' a set of series inside one task (~nb_items_clust1)
+#' iterated stage 1 clustering on nb_curves / ntasks energy contributions, computed
+#' from the discrete wavelet coefficients.
+#' \code{clusteringTask2()} runs a full stage-2 task, which consists of computing
+#' WER distances between the medoids output by stage 1, before applying the second
+#' clustering algorithm to the distances matrix.
#'
-#' @param indices Range of series indices to cluster in parallel (initial data)
+#' @param indices Range of series indices to cluster
#' @param getContribs Function to retrieve contributions from initial series indices:
-#' \code{getContribs(indices)} outpus a contributions matrix
-#' @inheritParams computeSynchrones
+#' \code{getContribs(indices)} outputs a contributions matrix
#' @inheritParams claws
+#' @inheritParams computeSynchrones
#'
-#' @return For \code{clusteringTask1()}, the indices of the computed (K1) medoids.
-#' Indices are irrelevant for stage 2 clustering, thus \code{clusteringTask2()}
+#' @return For \code{clusteringTask1()}, the indices of the computed (K1) medoids;
+#' for \code{clusteringTask2()}, the indices of the K2 medoids in the initial dataset
#' @rdname clustering
#' @export
-clusteringTask1 = function(indices, getContribs, K1, algoClust1, nb_items_clust1,
+clusteringTask1 = function(indices, getContribs, K1, algoClust1, nb_series_per_chunk,
ncores_clust=1, verbose=FALSE, parll=TRUE)
{
if (parll)
while (length(indices) > K1)
{
# Balance tasks by splitting the indices set - as evenly as possible
- indices_workers = .spreadIndices(indices, nb_items_clust1)
+		indices_workers = .splitIndices(indices, nb_series_per_chunk)
if (verbose)
cat(paste("*** [iterated] Clustering task 1 on ",length(indices)," series\n", sep=""))
indices <-
#' @rdname clustering
#' @export
-clusteringTask2 = function(medoids, K2, algoClust2, getRefSeries, nb_ref_curves,
- nb_series_per_chunk, nvoice, nbytes,endian,ncores_clust=1,verbose=FALSE,parll=TRUE)
+clusteringTask2 = function(indices, getSeries, K2, algoClust2, nb_series_per_chunk,
+ nvoice, nbytes, endian, ncores_clust=1, verbose=FALSE, parll=TRUE)
{
if (verbose)
cat(paste("*** Clustering task 2 on ",ncol(medoids)," synchrones\n", sep=""))
cat(paste("*** algoClust2() on ",nrow(distances)," items\n", sep=""))
-	medoids[ ,algoClust2(distances,K2) ]
+	indices[ algoClust2(distances,K2) ]
}
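+
+# Example (sketch): how the two stages chain within one task; getContribs() and
+# getSeries() are assumed to be accessors as built in claws():
+#   indices1 = clusteringTask1(seq_len(n), getContribs, K1, algoClust1,
+#     nb_series_per_chunk)
+#   indices2 = clusteringTask2(indices1, getSeries, K2, algoClust2,
+#     nb_series_per_chunk, nvoice, nbytes, endian)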
-
-#' computeSynchrones
-#'
-#' Compute the synchrones curves (sum of clusters elements) from a matrix of medoids,
-#' using euclidian distance.
-#'
-#' @param medoids big.matrix of medoids (curves of same length as initial series)
-#' @param getRefSeries Function to retrieve initial series (e.g. in stage 2 after series
-#' have been replaced by stage-1 medoids)
-#' @param nb_ref_curves How many reference series? (This number is known at this stage)
-#' @inheritParams claws
-#'
-#' @return A big.matrix of size L x K1 where L = length of a serie
-#'
-#' @export
-computeSynchrones = function(medoids, getRefSeries, nb_ref_curves,
- nb_series_per_chunk, ncores_clust=1,verbose=FALSE,parll=TRUE)
-{
- # Synchrones computation is embarassingly parallel: compute it by chunks of series
- computeSynchronesChunk = function(indices)
- {
- if (parll)
- {
- require("bigmemory", quietly=TRUE)
- requireNamespace("synchronicity", quietly=TRUE)
- require("epclust", quietly=TRUE)
- # The big.matrix objects need to be attached to be usable on the workers
- synchrones <- bigmemory::attach.big.matrix(synchrones_desc)
- medoids <- bigmemory::attach.big.matrix(medoids_desc)
- m <- synchronicity::attach.mutex(m_desc)
- }
-
- # Obtain a chunk of reference series
- ref_series = getRefSeries(indices)
- nb_series = ncol(ref_series)
-
- # Get medoids indices for this chunk of series
- mi = computeMedoidsIndices(medoids@address, ref_series)
-
- # Update synchrones using mi above
- for (i in seq_len(nb_series))
- {
- if (parll)
- synchronicity::lock(m) #locking required because several writes at the same time
- synchrones[, mi[i] ] = synchrones[, mi[i] ] + ref_series[,i]
- if (parll)
- synchronicity::unlock(m)
- }
- NULL
- }
-
- K = ncol(medoids) ; L = nrow(medoids)
- # Use bigmemory (shared==TRUE by default) + synchronicity to fill synchrones in //
- synchrones = bigmemory::big.matrix(nrow=L, ncol=K, type="double", init=0.)
- # NOTE: synchronicity is only for Linux & MacOS; on Windows: run sequentially
- parll = (parll && requireNamespace("synchronicity",quietly=TRUE)
- && Sys.info()['sysname'] != "Windows")
- if (parll)
- {
- m <- synchronicity::boost.mutex() #for lock/unlock, see computeSynchronesChunk
- # mutex and big.matrix objects cannot be passed directly:
- # they will be accessed from their description
- m_desc <- synchronicity::describe(m)
- synchrones_desc = bigmemory::describe(synchrones)
- medoids_desc = bigmemory::describe(medoids)
- cl = parallel::makeCluster(ncores_clust)
- parallel::clusterExport(cl, envir=environment(),
- varlist=c("synchrones_desc","m_desc","medoids_desc","getRefSeries"))
- }
-
- if (verbose)
- cat(paste("--- Compute ",K," synchrones with ",nb_ref_curves," series\n", sep=""))
-
- # Balance tasks by splitting the indices set - maybe not so evenly, but
- # max==TRUE in next call ensures that no set has more than nb_series_per_chunk items.
- indices_workers = .spreadIndices(seq_len(nb_ref_curves), nb_series_per_chunk, max=TRUE)
- ignored <-
- if (parll)
- parallel::parLapply(cl, indices_workers, computeSynchronesChunk)
- else
- lapply(indices_workers, computeSynchronesChunk)
-
- if (parll)
- parallel::stopCluster(cl)
-
- return (synchrones)
-}
-
-#' computeWerDists
-#'
-#' Compute the WER distances between the synchrones curves (in columns), which are
-#' returned (e.g.) by \code{computeSynchrones()}
-#'
-#' @param synchrones A big.matrix of synchrones, in columns. The series have same
-#' length as the series in the initial dataset
-#' @inheritParams claws
-#'
-#' @return A distances matrix of size K1 x K1
-#'
-#' @export
-computeWerDists = function(synchrones, nvoice, nbytes,endian,ncores_clust=1,
- verbose=FALSE,parll=TRUE)
-{
- n <- ncol(synchrones)
- L <- nrow(synchrones)
- noctave = ceiling(log2(L)) #min power of 2 to cover serie range
-
- # Initialize result as a square big.matrix of size 'number of synchrones'
- Xwer_dist <- bigmemory::big.matrix(nrow=n, ncol=n, type="double")
-
- # Generate n(n-1)/2 pairs for WER distances computations
- pairs = list()
- V = seq_len(n)
- for (i in 1:n)
- {
- V = V[-1]
- pairs = c(pairs, lapply(V, function(v) c(i,v)))
- }
-
- cwt_file = ".cwt.bin"
- # Compute the synchrones[,index] CWT, and store it in the binary file above
- computeSaveCWT = function(index)
- {
- if (parll && !exists(synchrones)) #avoid going here after first call on a worker
- {
- require("bigmemory", quietly=TRUE)
- require("Rwave", quietly=TRUE)
- require("epclust", quietly=TRUE)
- synchrones <- bigmemory::attach.big.matrix(synchrones_desc)
- }
- ts <- scale(ts(synchrones[,index]), center=TRUE, scale=FALSE)
- ts_cwt = Rwave::cwt(ts, noctave, nvoice, w0=2*pi, twoD=TRUE, plot=FALSE)
-
- # Serialization
- binarize(as.matrix(c(as.double(Re(ts_cwt)),as.double(Im(ts_cwt)))), cwt_file, 1,
- ",", nbytes, endian)
- }
-
- if (parll)
- {
- cl = parallel::makeCluster(ncores_clust)
- synchrones_desc <- bigmemory::describe(synchrones)
- Xwer_dist_desc <- bigmemory::describe(Xwer_dist)
- parallel::clusterExport(cl, varlist=c("parll","synchrones_desc","Xwer_dist_desc",
- "noctave","nvoice","verbose","getCWT"), envir=environment())
- }
-
- if (verbose)
- cat(paste("--- Precompute and serialize synchrones CWT\n", sep=""))
-
- ignored <-
- if (parll)
- parallel::parLapply(cl, 1:n, computeSaveCWT)
- else
- lapply(1:n, computeSaveCWT)
-
- # Function to retrieve a synchrone CWT from (binary) file
- getSynchroneCWT = function(index, L)
- {
- flat_cwt <- getDataInFile(index, cwt_file, nbytes, endian)
- cwt_length = length(flat_cwt) / 2
- re_part = as.matrix(flat_cwt[1:cwt_length], nrow=L)
- im_part = as.matrix(flat_cwt[(cwt_length+1):(2*cwt_length)], nrow=L)
- re_part + 1i * im_part
- }
-
- # Compute distance between columns i and j in synchrones
- computeDistanceIJ = function(pair)
- {
- if (parll)
- {
- # parallel workers start with an empty environment
- require("bigmemory", quietly=TRUE)
- require("epclust", quietly=TRUE)
- synchrones <- bigmemory::attach.big.matrix(synchrones_desc)
- Xwer_dist <- bigmemory::attach.big.matrix(Xwer_dist_desc)
- }
-
- i = pair[1] ; j = pair[2]
- if (verbose && j==i+1 && !parll)
- cat(paste(" Distances (",i,",",j,"), (",i,",",j+1,") ...\n", sep=""))
-
- # Compute CWT of columns i and j in synchrones
- L = nrow(synchrones)
- cwt_i <- getSynchroneCWT(i, L)
- cwt_j <- getSynchroneCWT(j, L)
-
- # Compute the ratio of integrals formula 5.6 for WER^2
- # in https://arxiv.org/abs/1101.4744v2 §5.3
- num <- filterMA(Mod(cwt_i * Conj(cwt_j)))
- WX <- filterMA(Mod(cwt_i * Conj(cwt_i)))
- WY <- filterMA(Mod(cwt_j * Conj(cwt_j)))
- wer2 <- sum(colSums(num)^2) / sum(colSums(WX) * colSums(WY))
-
- Xwer_dist[i,j] <- sqrt(L * ncol(cwt_i) * (1 - wer2))
- Xwer_dist[j,i] <- Xwer_dist[i,j]
- Xwer_dist[i,i] <- 0.
- }
-
- if (verbose)
- cat(paste("--- Compute WER distances\n", sep=""))
-
- ignored <-
- if (parll)
- parallel::parLapply(cl, pairs, computeDistanceIJ)
- else
- lapply(pairs, computeDistanceIJ)
-
- if (parll)
- parallel::stopCluster(cl)
-
- unlink(cwt_file)
-
- Xwer_dist[n,n] = 0.
- Xwer_dist[,] #~small matrix K1 x K1
-}
-
-# Helper function to divide indices into balanced sets
-# If max == TRUE, sets sizes cannot exceed nb_per_set
-.spreadIndices = function(indices, nb_per_set, max=FALSE)
-{
- L = length(indices)
- nb_workers = floor( L / nb_per_set )
- rem = L %% nb_per_set
- if (nb_workers == 0 || (nb_workers==1 && rem==0))
- {
- # L <= nb_per_set, simple case
- indices_workers = list(indices)
- }
- else
- {
- indices_workers = lapply( seq_len(nb_workers), function(i)
- indices[(nb_per_set*(i-1)+1):(nb_per_set*i)] )
-
- if (max)
- {
- # Sets are not so well balanced, but size is supposed to be critical
- return ( c( indices_workers, if (rem>0) list((L-rem+1):L) else NULL ) )
- }
-
- # Spread the remaining load among the workers
- rem = L %% nb_per_set
- while (rem > 0)
- {
- index = rem%%nb_workers + 1
- indices_workers[[index]] = c(indices_workers[[index]], indices[L-rem+1])
- rem = rem - 1
- }
- }
- indices_workers
-}
--- /dev/null
+#' computeSynchrones
+#'
+#' Compute the synchrone curves (sums of cluster elements) from a matrix of medoids,
+#' using euclidean distance.
+#'
+#' @param medoids matrix of medoids in columns (curves of same length as the series)
+#' @param getSeries Function to retrieve series (argument: 'indices', integer vector)
+#' @param nb_curves Total number of series (known at this stage)
+#' @inheritParams claws
+#'
+#' @return A matrix of K synchrones in columns (same length as the series)
+#'
+#' @export
+computeSynchrones = function(medoids, getSeries, nb_curves,
+	nb_series_per_chunk, ncores_clust=1, verbose=FALSE, parll=TRUE)
+{
+	# Synchrones computation is embarrassingly parallel: compute it by chunks of series
+ computeSynchronesChunk = function(indices)
+ {
+ if (parll)
+ {
+ require("bigmemory", quietly=TRUE)
+ requireNamespace("synchronicity", quietly=TRUE)
+ require("epclust", quietly=TRUE)
+ # The big.matrix objects need to be attached to be usable on the workers
+ synchrones <- bigmemory::attach.big.matrix(synchrones_desc)
+ medoids <- bigmemory::attach.big.matrix(medoids_desc)
+ m <- synchronicity::attach.mutex(m_desc)
+ }
+
+		# Obtain a chunk of reference series
+		series_chunk = getSeries(indices)
+		nb_series_chunk = ncol(series_chunk)
+
+		# Get the index of the closest medoid for each series in the chunk;
+		# medoids[,] extracts a standard matrix (whether medoids is a big.matrix or not)
+		mi = integer(nb_series_chunk)
+		for (i in seq_len(nb_series_chunk))
+			mi[i] <- which.min( colSums( sweep(medoids[,], 1, series_chunk[,i], '-')^2 ) )
+
+ # Update synchrones using mi above, grouping it by values of mi (in 1...K)
+ # to avoid too many lock/unlock
+ for (i in seq_len(K))
+ {
+ # lock / unlock required because several writes at the same time
+ if (parll)
+ synchronicity::lock(m)
+			synchrones[,i] = synchrones[,i] + rowSums(series_chunk[, mi==i, drop=FALSE])
+ if (parll)
+ synchronicity::unlock(m)
+ }
+ NULL
+ }
+
+ K = ncol(medoids)
+ L = nrow(medoids)
+ # Use bigmemory (shared==TRUE by default) + synchronicity to fill synchrones in //
+ synchrones = bigmemory::big.matrix(nrow=L, ncol=K, type="double", init=0.)
+ # NOTE: synchronicity is only for Linux & MacOS; on Windows: run sequentially
+ parll = (parll && requireNamespace("synchronicity",quietly=TRUE)
+ && Sys.info()['sysname'] != "Windows")
+ if (parll)
+ {
+ m <- synchronicity::boost.mutex() #for lock/unlock, see computeSynchronesChunk
+ # mutex and big.matrix objects cannot be passed directly:
+ # they will be accessed from their description
+ m_desc <- synchronicity::describe(m)
+ synchrones_desc = bigmemory::describe(synchrones)
+ medoids <- bigmemory::as.big.matrix(medoids)
+ medoids_desc <- bigmemory::describe(medoids)
+ cl = parallel::makeCluster(ncores_clust)
+ parallel::clusterExport(cl, envir=environment(),
+			varlist=c("synchrones_desc","m_desc","medoids_desc","getSeries"))
+ }
+
+ if (verbose)
+ cat(paste("--- Compute ",K," synchrones with ",nb_curves," series\n", sep=""))
+
+	# Balance tasks by splitting 1:nb_curves into groups of size ~nb_series_per_chunk
+	indices_workers = .splitIndices(seq_len(nb_curves), nb_series_per_chunk)
+ ignored <-
+ if (parll)
+ parallel::parLapply(cl, indices_workers, computeSynchronesChunk)
+ else
+ lapply(indices_workers, computeSynchronesChunk)
+
+ if (parll)
+ parallel::stopCluster(cl)
+
+ return (synchrones[,])
+}
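+
+# NOTE: the closest-medoid computation above is also what the new unit tests call
+# as epclust:::assignMedoids(), not shown elsewhere in this patch excerpt. A minimal
+# definition, consistent with the tests' calls (argument order: medoids, series):
+assignMedoids = function(medoids, series)
+{
+	nb_series = ncol(series) #series in columns
+	mi = rep(NA_integer_, nb_series)
+	for (i in seq_len(nb_series))
+		mi[i] <- which.min( colSums( sweep(medoids[,], 1, series[,i], '-')^2 ) )
+	mi
+}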
--- /dev/null
+#' computeWerDists
+#'
+#' Compute the WER distances between the synchrone curves (in columns), which are
+#' returned (e.g.) by \code{computeSynchrones()}
+#'
+#' @param synchrones A big.matrix of synchrones, in columns. The series have the same
+#' length as the series in the initial dataset
+#' @inheritParams claws
+#'
+#' @return A distances matrix of size K1 x K1
+#'
+#' @export
+computeWerDists = function(synchrones, nvoice, nbytes, endian, ncores_clust=1,
+	verbose=FALSE, parll=TRUE)
+{
+ n <- ncol(synchrones)
+ L <- nrow(synchrones)
+	noctave = ceiling(log2(L)) #min number of octaves to cover the series range
+
+ # Initialize result as a square big.matrix of size 'number of synchrones'
+ Xwer_dist <- bigmemory::big.matrix(nrow=n, ncol=n, type="double")
+
+ # Generate n(n-1)/2 pairs for WER distances computations
+ pairs = list()
+ V = seq_len(n)
+ for (i in 1:n)
+ {
+ V = V[-1]
+ pairs = c(pairs, lapply(V, function(v) c(i,v)))
+ }
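+	# e.g. for n=3: pairs == list( c(1,2), c(1,3), c(2,3) )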
+
+ cwt_file = ".cwt.bin"
+ # Compute the synchrones[,indices] CWT, and store the results in the binary file
+ computeSaveCWT = function(indices)
+ {
+ if (parll)
+ {
+ require("bigmemory", quietly=TRUE)
+ require("Rwave", quietly=TRUE)
+ require("epclust", quietly=TRUE)
+ synchrones <- bigmemory::attach.big.matrix(synchrones_desc)
+ }
+
+		# Obtain each CWT as one long vector: real part concatenated with imaginary part
+ ts_cwt <- sapply(indices, function(i) {
+ ts <- scale(ts(synchrones[,i]), center=TRUE, scale=FALSE)
+ ts_cwt <- Rwave::cwt(ts, noctave, nvoice, w0=2*pi, twoD=TRUE, plot=FALSE)
+ c( as.double(Re(ts_cwt)),as.double(Im(ts_cwt)) )
+ })
+
+ # Serialization
+ binarize(ts_cwt, cwt_file, 1, ",", nbytes, endian)
+ }
+
+ if (parll)
+ {
+ cl = parallel::makeCluster(ncores_clust)
+ synchrones_desc <- bigmemory::describe(synchrones)
+ Xwer_dist_desc <- bigmemory::describe(Xwer_dist)
+		parallel::clusterExport(cl, varlist=c("parll","synchrones_desc","Xwer_dist_desc",
+			"noctave","nvoice","verbose"), envir=environment())
+ }
+
+ if (verbose)
+ cat(paste("--- Precompute and serialize synchrones CWT\n", sep=""))
+
+ ignored <-
+ if (parll)
+ parallel::parLapply(cl, 1:n, computeSaveCWT)
+ else
+ lapply(1:n, computeSaveCWT)
+
+ # Function to retrieve a synchrone CWT from (binary) file
+ getSynchroneCWT = function(index, L)
+ {
+ flat_cwt <- getDataInFile(index, cwt_file, nbytes, endian)
+ cwt_length = length(flat_cwt) / 2
+		re_part = matrix(flat_cwt[1:cwt_length], nrow=L)
+		im_part = matrix(flat_cwt[(cwt_length+1):(2*cwt_length)], nrow=L)
+ re_part + 1i * im_part
+ }
+
+
+	# TODO: better repartition here; improve .splitIndices so that chunks never
+	# exceed nb_per_chunk (new argument: min_per_chunk, default 1)
+	# TODO: reintroduce nb_items_clust ==> it is typically much larger than
+	# nb_series_per_chunk (there is no relation between the two!)
+
+ # Compute distance between columns i and j in synchrones
+ computeDistanceIJ = function(pair)
+ {
+ if (parll)
+ {
+ # parallel workers start with an empty environment
+ require("bigmemory", quietly=TRUE)
+ require("epclust", quietly=TRUE)
+ synchrones <- bigmemory::attach.big.matrix(synchrones_desc)
+ Xwer_dist <- bigmemory::attach.big.matrix(Xwer_dist_desc)
+ }
+
+ i = pair[1] ; j = pair[2]
+ if (verbose && j==i+1 && !parll)
+ cat(paste(" Distances (",i,",",j,"), (",i,",",j+1,") ...\n", sep=""))
+
+ # Compute CWT of columns i and j in synchrones
+ L = nrow(synchrones)
+ cwt_i <- getSynchroneCWT(i, L)
+ cwt_j <- getSynchroneCWT(j, L)
+
+ # Compute the ratio of integrals formula 5.6 for WER^2
+ # in https://arxiv.org/abs/1101.4744v2 §5.3
+		# (moving-average width w=3, matching the filter width previously hardcoded in C)
+		num <- filterMA(Mod(cwt_i * Conj(cwt_j)), 3)
+		WX <- filterMA(Mod(cwt_i * Conj(cwt_i)), 3)
+		WY <- filterMA(Mod(cwt_j * Conj(cwt_j)), 3)
+ wer2 <- sum(colSums(num)^2) / sum(colSums(WX) * colSums(WY))
+
+ Xwer_dist[i,j] <- sqrt(L * ncol(cwt_i) * (1 - wer2))
+ Xwer_dist[j,i] <- Xwer_dist[i,j]
+ Xwer_dist[i,i] <- 0.
+ }
+
+ if (verbose)
+ cat(paste("--- Compute WER distances\n", sep=""))
+
+ ignored <-
+ if (parll)
+ parallel::parLapply(cl, pairs, computeDistanceIJ)
+ else
+ lapply(pairs, computeDistanceIJ)
+
+ if (parll)
+ parallel::stopCluster(cl)
+
+ unlink(cwt_file)
+
+ Xwer_dist[n,n] = 0.
+ Xwer_dist[,] #~small matrix K1 x K1
+}
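+
+# Reference (unsmoothed) form of the WER distance computed in computeDistanceIJ()
+# above, for two complex CWT matrices of size L x (noctave*nvoice); a readable
+# sketch of formula 5.6 in https://arxiv.org/abs/1101.4744v2:
+.werDistRef = function(cwt_i, cwt_j)
+{
+	num = Mod(cwt_i * Conj(cwt_j))
+	WX = Mod(cwt_i * Conj(cwt_i))
+	WY = Mod(cwt_j * Conj(cwt_j))
+	wer2 = sum(colSums(num)^2) / sum(colSums(WX) * colSums(WY))
+	sqrt(nrow(cwt_i) * ncol(cwt_i) * (1 - wer2))
+}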
#' \item Divide series into \code{ntasks} groups to process in parallel. In each task:
#' \enumerate{
#' \item iterate the first clustering algorithm on its aggregated outputs,
-#' on inputs of size \code{nb_items_clust1}
+#' on inputs of size \code{nb_series_per_chunk}
#' \item optionally, if WER=="mix":
-#' a) compute the K1 synchrones curves,
-#' b) compute WER distances (K1xK1 matrix) between synchrones and
-#' c) apply the second clustering algorithm
+#' a) compute WER distances (K1xK1 matrix) between medoids and
+#' b) apply the second clustering algorithm (output: K2 indices)
#' }
#' \item Launch a final task on the aggregated outputs of all previous tasks:
-#' in the case WER=="end" this task takes indices in input, otherwise
-#' (medoid) curves
+#' ntasks*K1 indices if WER=="end", ntasks*K2 indices otherwise
+#' \item Compute synchrones (sum of series within each final group)
#' }
#' \cr
-#' The main argument -- \code{getSeries} -- has a quite misleading name, since it can be
-#' either a [big.]matrix, a CSV file, a connection or a user function to retrieve
-#' series; the name was chosen because all types of arguments are converted to a function.
-#' When \code{getSeries} is given as a function, it must take a single argument,
+#' The main argument -- \code{series} -- can be either a [big.]matrix, a CSV file,
+#' a connection or a user function to retrieve series.
+#' When \code{series} is given as a function, it must take a single argument,
#' 'indices', integer vector equal to the indices of the curves to retrieve;
-#' see SQLite example. The nature and role of other arguments should be clear.
+#' see SQLite example.
#' WARNING: the return value must be a matrix (in columns), or NULL if no matches.
#' \cr
#' Note: Since we don't make assumptions on initial data, there is a possibility that
-#' even when serialized, contributions or synchrones do not fit in RAM. For example,
+#' even when serialized, contributions do not fit in RAM. For example,
#' 30e6 series of length 100,000 would lead to a ~4GB contribution matrix. Therefore,
#' it's safer to place these in (binary) files; that's what we do.
#'
-#' @param getSeries Access to the (time-)series, which can be of one of the three
+#' @param series Access to the (time-)series, which can be of one of the three
#' following types:
#' \itemize{
#' \item [big.]matrix: each column contains the (time-ordered) values of one time series
#' }
#' @param K1 Number of clusters to be found after stage 1 (K1 << N [number of series])
#' @param K2 Number of clusters to be found after stage 2 (K2 << K1)
-#' @param nb_series_per_chunk (Maximum) number of series to retrieve in one batch
+#' @param nb_series_per_chunk (Maximum) number of series to retrieve in one batch;
+#' this value is also used for the (maximum) number of series to cluster at a time
#' @param algoClust1 Clustering algorithm for stage 1. A function which takes (data, K)
#' as argument where data is a matrix in columns and K the desired number of clusters,
#' and outputs K medoids ranks. Default: PAM. In our method, this function is called
#' @param algoClust2 Clustering algorithm for stage 2. A function which takes (dists, K)
#' as argument where dists is a matrix of distances and K the desired number of clusters,
#' and outputs K medoids ranks. Default: PAM. In our method, this function is called
-#' on a matrix of K1 x K1 (WER) distances computed between synchrones
-#' @param nb_items_clust1 (~Maximum) number of items in input of the clustering algorithm
-#' for stage 1. At worst, a clustering algorithm might be called with ~2*nb_items_clust1
-#' items; but this could only happen at the last few iterations.
+#' on a matrix of K1 x K1 (WER) distances computed between medoids after algorithm 1
#' @param wav_filt Wavelet transform filter; see ?wavelets::wt.filter
#' @param contrib_type Type of contribution: "relative", "logit" or "absolute" (any prefix)
#' @param WER "end" to apply stage 2 after stage 1 has fully iterated, or "mix" to apply
#' or K2 [if WER=="mix"] medoids); default: 1.
#' Note: ntasks << N (number of series), so that N is "roughly divisible" by ntasks
#' @param ncores_tasks Number of parallel tasks (1 to disable: sequential tasks)
-#' @param ncores_clust Number of parallel clusterings in one task (4 should be a minimum)
+#' @param ncores_clust Number of parallel clusterings in one task (3 should be a minimum)
#' @param sep Separator in CSV input file (if any provided)
#' @param nbytes Number of bytes to serialize a floating-point number; 4 or 8
#' @param endian Endianness for (de)serialization ("little" or "big")
#' @param verbose Level of verbosity (0/FALSE for nothing or 1/TRUE for all; devel stage)
#' @param parll TRUE to fully parallelize; otherwise run sequentially (debug, comparison)
#'
-#' @return A matrix of the final K2 medoids curves, in columns
+#' @return A list:
+#' \itemize{
+#'   \item medoids: a matrix of the final K2 medoid curves, in columns
+#'   \item ranks: the corresponding indices in the dataset
+#'   \item synchrones: a matrix of the K2 sums of series within each final group
+#' }
#'
#' @references Clustering functional data using Wavelets [2013];
#' A. Antoniadis, X. Brossat, J. Cugliari & J.-M. Poggi.
#' series = do.call( cbind, lapply( 1:6, function(i)
#' do.call(cbind, wmtsa::wavBootstrap(ref_series[,i], n.realization=400)) ) )
#' #dim(series) #c(2400,10001)
-#' medoids_ascii = claws(series, K1=60, K2=6, 200, verbose=TRUE)
+#' res_ascii = claws(series, K1=60, K2=6, 200, verbose=TRUE)
#'
#' # Same example, from CSV file
#' csv_file = "/tmp/epclust_series.csv"
#' write.table(series, csv_file, sep=",", row.names=FALSE, col.names=FALSE)
-#' medoids_csv = claws(csv_file, K1=60, K2=6, 200)
+#' res_csv = claws(csv_file, K1=60, K2=6, 200)
#'
#' # Same example, from binary file
#' bin_file <- "/tmp/epclust_series.bin"
#' endian <- "little"
#' binarize(csv_file, bin_file, 500, nbytes, endian)
#' getSeries <- function(indices) getDataInFile(indices, bin_file, nbytes, endian)
-#' medoids_bin <- claws(getSeries, K1=60, K2=6, 200)
+#' res_bin <- claws(getSeries, K1=60, K2=6, 200)
#' unlink(csv_file)
#' unlink(bin_file)
#'
#' else
#' NULL
#' }
-#' medoids_db = claws(getSeries, K1=60, K2=6, 200))
+#' res_db = claws(getSeries, K1=60, K2=6, 200)
#' dbDisconnect(series_db)
#'
-#' # All computed medoids should be the same:
-#' digest::sha1(medoids_ascii)
-#' digest::sha1(medoids_csv)
-#' digest::sha1(medoids_bin)
-#' digest::sha1(medoids_db)
+#' # All results should be the same:
+#' library(digest)
+#' digest::sha1(res_ascii)
+#' digest::sha1(res_csv)
+#' digest::sha1(res_bin)
+#' digest::sha1(res_db)
#' }
#' @export
-claws <- function(getSeries, K1, K2, nb_series_per_chunk, nb_items_clust1=7*K1,
- algoClust1=function(data,K) cluster::pam(t(data),K,diss=FALSE)$id.med,
- algoClust2=function(dists,K) cluster::pam(dists,K,diss=TRUE)$id.med,
+claws <- function(series, K1, K2, nb_series_per_chunk,
+ algoClust1=function(data,K) cluster::pam(t(data),K,diss=FALSE,pamonce=1)$id.med,
+ algoClust2=function(dists,K) cluster::pam(dists,K,diss=TRUE,pamonce=1)$id.med,
wav_filt="d8", contrib_type="absolute", WER="end", nvoice=4, random=TRUE,
ntasks=1, ncores_tasks=1, ncores_clust=4, sep=",", nbytes=4,
endian=.Platform$endian, verbose=FALSE, parll=TRUE)
{
# Check/transform arguments
- if (!is.matrix(getSeries) && !bigmemory::is.big.matrix(getSeries)
- && !is.function(getSeries)
- && !methods::is(getSeries,"connection") && !is.character(getSeries))
+ if (!is.matrix(series) && !bigmemory::is.big.matrix(series)
+ && !is.function(series)
+ && !methods::is(series,"connection") && !is.character(series))
{
- stop("'getSeries': [big]matrix, function, file or valid connection (no NA)")
+ stop("'series': [big]matrix, function, file or valid connection (no NA)")
}
K1 <- .toInteger(K1, function(x) x>=2)
K2 <- .toInteger(K2, function(x) x>=2)
# to load K1 series in memory for clustering stage 2.
if (K1 > nb_series_per_chunk)
stop("'K1' cannot exceed 'nb_series_per_chunk'")
- nb_items_clust1 <- .toInteger(nb_items_clust1, function(x) x>K1)
random <- .toLogical(random)
tryCatch( {ignored <- wavelets::wt.filter(wav_filt)},
error = function(e) stop("Invalid wavelet filter; see ?wavelets::wt.filter") )
verbose <- .toLogical(verbose)
parll <- .toLogical(parll)
- # Binarize series if getSeries is not a function; the aim is to always use a function,
+ # Binarize series if it is not a function; the aim is to always use a function,
# to uniformize treatments. An equally good alternative would be to use a file-backed
# bigmemory::big.matrix, but it would break the "all-is-function" pattern.
- if (!is.function(getSeries))
+ if (!is.function(series))
{
if (verbose)
cat("...Serialize time-series (or retrieve past binary file)\n")
- series_file = ".series.bin"
+ series_file = ".series.epclust.bin"
if (!file.exists(series_file))
- binarize(getSeries, series_file, nb_series_per_chunk, sep, nbytes, endian)
+ binarize(series, series_file, nb_series_per_chunk, sep, nbytes, endian)
getSeries = function(inds) getDataInFile(inds, series_file, nbytes, endian)
}
+ else
+ getSeries = series
# Serialize all computed wavelets contributions into a file
- contribs_file = ".contribs.bin"
+ contribs_file = ".contribs.epclust.bin"
index = 1
nb_curves = 0
if (verbose)
if (!file.exists(contribs_file))
{
nb_curves = binarizeTransform(getSeries,
- function(series) curvesToContribs(series, wav_filt, contrib_type),
+ function(curves) curvesToContribs(curves, wav_filt, contrib_type),
contribs_file, nb_series_per_chunk, nbytes, endian)
}
else
# Initialize parallel runs: outfile="" allow to output verbose traces in the console
# under Linux. All necessary variables are passed to the workers.
cl = parallel::makeCluster(ncores_tasks, outfile="")
- varlist = c("getSeries","getContribs","K1","K2","algoClust1","algoClust2",
- "nb_series_per_chunk","nb_items_clust1","ncores_clust",
- "nvoice","sep","nbytes","endian","verbose","parll")
- if (WER=="mix" && ntasks>1)
- varlist = c(varlist, "medoids_file")
- parallel::clusterExport(cl, varlist, envir = environment())
+ parallel::clusterExport(cl, envir = environment(),
+ varlist = c("getSeries","getContribs","K1","K2","algoClust1","algoClust2",
+ "nb_series_per_chunk","ncores_clust","nvoice","nbytes","endian","verbose","parll"))
}
# This function achieves one complete clustering task, divided in stage 1 + stage 2.
# packages, and pass useful variables.
if (parll && ntasks>1)
require("epclust", quietly=TRUE)
- indices_medoids = clusteringTask1(
- inds, getContribs, K1, algoClust1, nb_series_per_chunk, ncores_clust, verbose, parll)
- if (WER=="mix" && ntasks>1)
+ indices_medoids = clusteringTask1(inds, getContribs, K1, algoClust1,
+ nb_series_per_chunk, ncores_clust, verbose, parll)
+ if (WER=="mix")
{
- if (parll)
- require("bigmemory", quietly=TRUE)
- medoids1 = bigmemory::as.big.matrix( getSeries(indices_medoids) )
- medoids2 = clusteringTask2(medoids1, K2, algoClust2, getSeries, nb_curves,
+ indices_medoids = clusteringTask2(indices_medoids, getSeries, K2, algoClust2,
nb_series_per_chunk, nvoice, nbytes, endian, ncores_clust, verbose, parll)
- binarize(medoids2, medoids_file, nb_series_per_chunk, sep, nbytes, endian)
- return (vector("integer",0))
}
indices_medoids
}
- # Synchrones (medoids) need to be stored only if WER=="mix"; indeed in this case, every
- # task output is a set of new (medoids) curves. If WER=="end" however, output is just a
- # set of indices, representing some initial series.
- if (WER=="mix" && ntasks>1)
- {medoids_file = ".medoids.bin" ; unlink(medoids_file)}
-
if (verbose)
{
message = paste("...Run ",ntasks," x stage 1", sep="")
-	# As explained above, indices will be assigned to ntasks*K1 medoids indices [if WER=="end"],
-	# or nothing (empty vector) if WER=="mix"; in this case, synchrones are stored in a file.
+	# Aggregate the medoids indices returned by the tasks: ntasks*K1 indices if
+	# WER=="end", ntasks*K2 indices if WER=="mix"
- indices <-
+ indices_medoids_all <-
if (parll && ntasks>1)
unlist( parallel::parLapply(cl, indices_tasks, runTwoStepClustering) )
else
unlist( lapply(indices_tasks, runTwoStepClustering) )
+
if (parll && ntasks>1)
parallel::stopCluster(cl)
- # Right before the final stage, two situations are possible:
- # a. data to be processed now sit in a binary format in medoids_file (if WER=="mix")
- # b. data still is the initial set of curves, referenced by the ntasks*K1 indices
- # So, the function getSeries() will potentially change. However, computeSynchrones()
- # requires a function retrieving the initial series. Thus, the next line saves future
- # conditional instructions.
- getRefSeries = getSeries
-
- if (WER=="mix" && ntasks>1)
- {
- indices = seq_len(ntasks*K2)
- # Now series (synchrones) must be retrieved from medoids_file
- getSeries = function(inds) getDataInFile(inds, medoids_file, nbytes, endian)
- # Contributions must be re-computed
- unlink(contribs_file)
- index = 1
- if (verbose)
- cat("...Serialize contributions computed on synchrones\n")
- ignored = binarizeTransform(getSeries,
- function(series) curvesToContribs(series, wav_filt, contrib_type),
- contribs_file, nb_series_per_chunk, nbytes, endian)
- }
+	# Right before the final stage, the input data is still the initial set of curves,
+	# referenced by the ntasks*[K1 or K2] medoids indices.
- # Run step2 on resulting indices or series (from file)
+	# Run last clustering tasks to obtain only K2 medoids indices, from ntasks*[K1 or K2]
+	# indices, depending whether WER=="end" or WER=="mix"
if (verbose)
cat("...Run final // stage 1 + stage 2\n")
- indices_medoids = clusteringTask1(indices, getContribs, K1, algoClust1,
+ indices_medoids = clusteringTask1(indices_medoids_all, getContribs, K1, algoClust1,
nb_series_per_chunk, ncores_tasks*ncores_clust, verbose, parll)
- medoids1 = bigmemory::as.big.matrix( getSeries(indices_medoids) )
- medoids2 = clusteringTask2(medoids1, K2, algoClust2, getRefSeries, nb_curves,
+	indices_medoids = clusteringTask2(indices_medoids, getSeries, K2, algoClust2,
nb_series_per_chunk, nvoice, nbytes, endian, ncores_tasks*ncores_clust, verbose, parll)
- # Cleanup: remove temporary binary files
- tryCatch(
- {unlink(series_file); unlink(contribs_file); unlink(medoids_file)},
- error = function(e) {})
+ # Compute synchrones
+ medoids = getSeries(indices_medoids)
+ synchrones = computeSynchrones(medoids, getSeries, nb_curves, nb_series_per_chunk,
+ ncores_tasks*ncores_clust, verbose, parll)
- # Return medoids as a standard matrix, since K2 series have to fit in RAM
- # (clustering algorithm 1 takes K1 > K2 of them as input)
- medoids2[,]
-}
-
-#' curvesToContribs
-#'
-#' Compute the discrete wavelet coefficients for each series, and aggregate them in
-#' energy contribution across scales as described in https://arxiv.org/abs/1101.4744v2
-#'
-#' @param series [big.]matrix of series (in columns), of size L x n
-#' @inheritParams claws
-#'
-#' @return A [big.]matrix of size log(L) x n containing contributions in columns
-#'
-#' @export
-curvesToContribs = function(series, wav_filt, contrib_type, coin=FALSE)
-{
- series = as.matrix(series) #1D serie could occur
- L = nrow(series)
- D = ceiling( log2(L) )
- # Series are interpolated to all have length 2^D
- nb_sample_points = 2^D
- apply(series, 2, function(x) {
- interpolated_curve = spline(1:L, x, n=nb_sample_points)$y
- W = wavelets::dwt(interpolated_curve, filter=wav_filt, D)@W
- # Compute the sum of squared discrete wavelet coefficients, for each scale
- nrj = rev( sapply( W, function(v) ( sqrt( sum(v^2) ) ) ) )
- if (contrib_type!="absolute")
- nrj = nrj / sum(nrj)
- if (contrib_type=="logit")
- nrj = - log(1 - nrj)
- nrj
- })
-}
-
-# Check integer arguments with functional conditions
-.toInteger <- function(x, condition)
-{
- errWarn <- function(ignored)
- paste("Cannot convert argument' ",substitute(x),"' to integer", sep="")
- if (!is.integer(x))
- tryCatch({x = as.integer(x)[1]; if (is.na(x)) stop()},
- warning = errWarn, error = errWarn)
- if (!condition(x))
- {
- stop(paste("Argument '",substitute(x),
- "' does not verify condition ",body(condition), sep=""))
- }
- x
-}
-
-# Check logical arguments
-.toLogical <- function(x)
-{
- errWarn <- function(ignored)
- paste("Cannot convert argument' ",substitute(x),"' to logical", sep="")
- if (!is.logical(x))
- tryCatch({x = as.logical(x)[1]; if (is.na(x)) stop()},
- warning = errWarn, error = errWarn)
- x
+ # NOTE: no need to use big.matrix here, since there are only K2 << K1 << N remaining curves
+ list("medoids"=medoids, "ranks"=indices_medoids, "synchrones"=synchrones)
}
--- /dev/null
+# Check integer arguments with functional conditions
+.toInteger <- function(x, condition)
+{
+ errWarn <- function(ignored)
+		paste("Cannot convert argument '",substitute(x),"' to integer", sep="")
+ if (!is.integer(x))
+ tryCatch({x = as.integer(x)[1]; if (is.na(x)) stop()},
+ warning = errWarn, error = errWarn)
+ if (!condition(x))
+ {
+ stop(paste("Argument '",substitute(x),
+ "' does not verify condition ",body(condition), sep=""))
+ }
+ x
+}
+
+# Check logical arguments
+.toLogical <- function(x)
+{
+ errWarn <- function(ignored)
+		paste("Cannot convert argument '",substitute(x),"' to logical", sep="")
+ if (!is.logical(x))
+ tryCatch({x = as.logical(x)[1]; if (is.na(x)) stop()},
+ warning = errWarn, error = errWarn)
+ x
+}
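+
+# Examples (sketch):
+#   .toInteger("5", function(x) x>=2) #returns 5L
+#   .toLogical(1) #returns TRUE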
+
+#' curvesToContribs
+#'
+#' Compute the discrete wavelet coefficients for each series, and aggregate them in
+#' energy contribution across scales as described in https://arxiv.org/abs/1101.4744v2
+#'
+#' @param series matrix of series (in columns), of size L x n
+#' @inheritParams claws
+#'
+#' @return A matrix of size ceiling(log2(L)) x n containing contributions in columns
+#'
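+#' @examples
+#' # Two toy series of length 100; the result has ceiling(log2(100)) = 7 rows
+#' series = cbind(sin(1:100), cos(1:100))
+#' contribs = curvesToContribs(series, "d8", "absolute")
+#' dim(contribs) #7 x 2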
+#' @export
+curvesToContribs = function(series, wav_filt, contrib_type)
+{
+ L = nrow(series)
+ D = ceiling( log2(L) )
+ # Series are interpolated to all have length 2^D
+ nb_sample_points = 2^D
+ apply(series, 2, function(x) {
+ interpolated_curve = spline(1:L, x, n=nb_sample_points)$y
+ W = wavelets::dwt(interpolated_curve, filter=wav_filt, D)@W
+ # Compute the sum of squared discrete wavelet coefficients, for each scale
+ nrj = rev( sapply( W, function(v) ( sqrt( sum(v^2) ) ) ) )
+ if (contrib_type!="absolute")
+ nrj = nrj / sum(nrj)
+ if (contrib_type=="logit")
+ nrj = - log(1 - nrj)
+ nrj
+ })
+}
+
+# Helper function to divide indices into balanced sets
+# If max == TRUE, sets sizes cannot exceed nb_per_set
+.splitIndices = function(indices, nb_per_set, max=FALSE)
+{
+ L = length(indices)
+ nb_workers = floor( L / nb_per_set )
+ rem = L %% nb_per_set
+ if (nb_workers == 0 || (nb_workers==1 && rem==0))
+ {
+ # L <= nb_per_set, simple case
+ indices_workers = list(indices)
+ }
+ else
+ {
+ indices_workers = lapply( seq_len(nb_workers), function(i)
+ indices[(nb_per_set*(i-1)+1):(nb_per_set*i)] )
+
+ if (max)
+ {
+ # Sets are not so well balanced, but size is supposed to be critical
+ return ( c( indices_workers, if (rem>0) list((L-rem+1):L) else NULL ) )
+ }
+
+ # Spread the remaining load among the workers
+ rem = L %% nb_per_set
+ while (rem > 0)
+ {
+ index = rem%%nb_workers + 1
+ indices_workers[[index]] = c(indices_workers[[index]], indices[L-rem+1])
+ rem = rem - 1
+ }
+ }
+ indices_workers
+}
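+
+# Example of the remainder spreading (sets may slightly exceed nb_per_set
+# unless max==TRUE):
+#   .splitIndices(1:10, 4) #returns list( c(1:4, 9), c(5:8, 10) )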
+
+#' filterMA
+#'
+#' Filter [time-]series by replacing each value by the moving average of the values
+#' centered around the current one. Border values are averaged with the available data.
+#'
+#' @param M_ A real matrix of size LxD
+#' @param w_ The (odd) number of values to average
+#'
+#' @return The filtered matrix, of same size as the input
+#' @export
+filterMA = function(M_, w_)
+ .Call("filterMA", M_, w_, PACKAGE="epclust")
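+
+# Pure-R reference implementation of filterMA, with the same semantics as the C
+# routine (centered moving average of width w_, borders averaged over the
+# available values only); useful to cross-check the compiled version:
+.filterMA_R = function(M_, w_)
+{
+	half_w = (w_ - 1) %/% 2
+	L = nrow(M_)
+	apply(M_, 2, function(x)
+		sapply(seq_len(L), function(i) mean(x[ max(1,i-half_w) : min(L,i+half_w) ])))
+}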
+
+#' cleanBin
+#'
+#' Remove binary files to re-generate them at next run of \code{claws()}.
+#' Note: run it in the folder where the computations occurred (otherwise it has no effect).
+#'
+#' @export
+cleanBin <- function()
+{
+	bin_files = list.files(pattern="\\.epclust\\.bin$", all.files=TRUE)
+ for (file in bin_files)
+ unlink(file)
+}
+++ /dev/null
-citHeader("To cite epclust in publications use:")
-
-citEntry(entry = "Manual",
- title = ".",
- author = personList(as.person("Benjamin Auder"),
- as.person("Jairo Cugliari"),
- as.person("Yannig Goude"),
- as.person("Jean-Michel Poggi")),
- organization = "Paris-Sud, Saclay & Lyon 2",
- address = "Orsay, Saclay & Lyon, France",
- year = "2017",
- url = "https://git.auder.net/?p=edfclust.git",
-
- textVersion =
- paste("Benjamin Auder, Jairo Cugliari, Yannig Goude, Jean-Michel Poggi (2017).",
- "EPCLUST: Electric Power curves CLUSTering.",
- "URL https://git.auder.net/?p=edfclust.git")
-)
+++ /dev/null
-#include <Rcpp.h>
-
-// [[Rcpp::depends(BH, bigmemory)]]
-#include <bigmemory/MatrixAccessor.hpp>
-
-#include <numeric>
-#include <cfloat>
-
-using namespace Rcpp;
-
-//' computeMedoidsIndices
-//'
-//' For each column of the 'series' matrix input, search for the closest medoid
-//' (euclidian distance) and store its index
-//'
-//' @param pMedoids External pointer (a big.matrix 'address' slot in R)
-//' @param series (reference) series, a matrix of size Lxn
-//'
-//' @return An integer vector of the closest medoids indices, for each (column) serie
-// [[Rcpp::export]]
-IntegerVector computeMedoidsIndices(SEXP pMedoids, NumericMatrix series)
-{
- // Turn SEXP external pointer into BigMatrix (description) object
- XPtr<BigMatrix> pMed(pMedoids);
- // medoids: access to the content of the BigMatrix object
- MatrixAccessor<double> medoids = MatrixAccessor<double>(*pMed);
-
- int nb_series = series.ncol(),
- K = pMed->ncol(),
- L = pMed->nrow();
- IntegerVector mi(nb_series);
-
- for (int i=0; i<nb_series ; i++) //column index in series
- {
- // In R: mi[i] <- which.min( rowSums( sweep(medoids, 2, series[i,], '-')^2 ) )
- // In C(++), computations must be unrolled
- mi[i] = 0;
- double best_dist = DBL_MAX;
- for (int j=0; j<K; j++) //column index in medoids
- {
- double dist_ij = 0.;
- for (int k=0; k<L; k++) //common row index (medoids, series)
- {
- // Accessing values for a big matrix is a bit weird; see Rcpp gallery/bigmemory
- double delta = series(k,i) - *(medoids[j]+k);
- dist_ij += delta * delta;
- }
- if (dist_ij < best_dist)
- {
- mi[i] = j+1; //R indices start at 1
- best_dist = dist_ij;
- }
- }
- }
-
- return mi;
-}
--- /dev/null
+#include <R.h>
+#include <Rdefines.h>
+
+// filterMA
+//
+// Filter [time-]series by replacing all values by the moving average of values
+// centered around current one. Border values are averaged with available data.
+//
+// @param M_ A real matrix of size LxD
+// @param w_ The (odd) number of values to average
+//
+// @return The filtered matrix, of same size as the input
+SEXP filterMA(SEXP M_, SEXP w_)
+{
+	int L = INTEGER(getAttrib(M_, R_DimSymbol))[0],
+		D = INTEGER(getAttrib(M_, R_DimSymbol))[1],
+ w = INTEGER_VALUE(w_),
+ half_w = (w-1)/2,
+ i,
+ nt; //number of terms in the current sum (max: w)
+	double *cwt = REAL(M_),
+ cs, //current sum (max: w terms)
+ left; //leftward term in the current moving sum
+
+ SEXP fcwt_; //the filtered result
+ PROTECT(fcwt_ = allocMatrix(REALSXP, L, D));
+ double* fcwt = REAL(fcwt_); //pointer to the encapsulated vector
+
+ // NOTE: unused loop parameter: shifting at the end of the loop is more efficient
+ for (int col=D-1; col>=0; col--)
+ {
+ // Initialization
+ nt = half_w + 1;
+ left = cwt[0];
+ cs = 0.;
+ for (i=half_w; i>=0; i--)
+ cs += cwt[i];
+
+ // Left border
+ for (i=0; i<half_w; i++)
+ {
+ fcwt[i] = cs / nt; //(partial) moving average at current index i
+ cs += cwt[i+half_w+1];
+ nt++;
+ }
+
+ // Middle: now nt == w, i == half_w
+ for (; i<L-half_w-1; i++)
+ {
+ fcwt[i] = cs / w; //moving average at current index i
+			cs = cs - left + cwt[i+half_w+1]; //remove oldest item, add next
+ left = cwt[i-half_w+1]; //update first value for next iteration
+ }
+
+ // (Last "fully averaged" index +) Right border
+ for (; i<L; i++)
+ {
+ fcwt[i] = cs / nt; //(partial) moving average at current index i
+ cs -= cwt[i-half_w];
+ nt--;
+ }
+
+ // Shift by L == increment column index by 1 (storage per column)
+ cwt = cwt + L;
+ fcwt = fcwt + L;
+ }
+
+ UNPROTECT(1);
+ return fcwt_;
+}
+++ /dev/null
-#include <Rcpp.h>
-
-using namespace Rcpp;
-
-//' filter
-//'
-//' Filter [time-]series by replacing all values by the moving average of previous, current and
-//' next value. Possible extensions: larger window, gaussian kernel... (but would be costly!).
-//' Note: border values are unchanged.
-//'
-//' @param cwt Continuous wavelets transform (in columns): a matrix of size LxD
-//'
-//' @return The filtered CWT, in a matrix of same size (LxD)
-// [[Rcpp::export]]
-RcppExport SEXP filterMA(SEXP cwt_)
-{
- // NOTE: C-style for better efficiency (this must be as fast as possible)
- int L = INTEGER(Rf_getAttrib(cwt_, R_DimSymbol))[0],
- D = INTEGER(Rf_getAttrib(cwt_, R_DimSymbol))[1];
- double *cwt = REAL(cwt_);
-
- SEXP fcwt_; //the filtered result
- PROTECT(fcwt_ = Rf_allocMatrix(REALSXP, L, D));
- double* fcwt = REAL(fcwt_); //pointer to the encapsulated vector
-
- // NOTE: unused loop parameter: shifting at the end of the loop is more efficient
- for (int col=D-1; col>=0; col--)
- {
- double v1 = cwt[0]; //first value
- double ms = v1 + cwt[1] + cwt[2]; //moving sum at second value
- for (int i=1; i<L-2; i++)
- {
- fcwt[i] = ms / 3.; //ms / 3: moving average at current index i
- ms = ms - v1 + cwt[i+2]; //update ms: remove oldest items, add next
- v1 = cwt[i]; //update first value for next iteration
- }
-
- // Fill a few border values not computed in the loop
- fcwt[0] = cwt[0];
- fcwt[L-2] = ms / 3.;
- fcwt[L-1] = cwt[L-1];
-
- // Shift by L == increment column index by 1 (storage per column)
- cwt = cwt + L;
- fcwt = fcwt + L;
- }
-
- UNPROTECT(1);
- return fcwt_;
-}
--- /dev/null
+# Shorthand: map 1->1, 2->2, 3->3, 4->1, ..., 149->2, 150->3, ... (if base==3)
+I = function(i, base)
+ (i-1) %% base + 1
+++ /dev/null
-# Shorthand: map 1->1, 2->2, 3->3, 4->1, ..., 149->2, 150->3, ... (is base==3)
-I = function(i, base)
- (i-1) %% base + 1
-
-# Compute the sum of (normalized) sum of squares of closest distances to a medoid.
-# Note: medoids can be a big.matrix
-computeDistortion = function(series, medoids)
-{
- if (bigmemory::is.big.matrix(medoids))
- medoids = medoids[,] #extract standard matrix
-
- n = ncol(series) ; L = nrow(series)
- distortion = 0.
- for (i in seq_len(n))
- distortion = distortion + min( colSums( sweep(medoids,1,series[,i],'-')^2 ) / L )
-
- sqrt( distortion / n )
-}
+++ /dev/null
-# R-equivalent of computeMedoidsIndices, requiring a matrix
-# (thus potentially breaking "fit-in-memory" hope)
-R_computeMedoidsIndices <- function(medoids, series)
-{
- nb_series = ncol(series) #series in columns
-
- mi = rep(NA,nb_series)
- for (i in 1:nb_series)
- mi[i] <- which.min( colSums( sweep(medoids, 1, series[,i], '-')^2 ) )
-
- mi
-}
--- /dev/null
+context("assignMedoids")
+
+test_that("assignMedoids behave as expected",
+{
+ # Generate a gaussian mixture
+ n = 999
+ L = 7
+ medoids = cbind( rep(0,L), rep(-5,L), rep(5,L) )
+ # short series...
+ series = t( rbind( MASS::mvrnorm(n/3, medoids[,1], diag(L)),
+ MASS::mvrnorm(n/3, medoids[,2], diag(L)),
+ MASS::mvrnorm(n/3, medoids[,3], diag(L)) ) )
+
+ # With high probability, medoids indices should resemble 1,1,1,...,2,2,2,...,3,3,3,...
+ mi = epclust:::assignMedoids(medoids, series)
+ mi_ref = rep(1:3, each=n/3)
+ expect_lt( mean(mi != mi_ref), 0.01 )
+
+ # Now with a random matrix, compare with (~trusted) R version
+ series = matrix(runif(n*L, min=-7, max=7), nrow=L)
+ mi = epclust:::assignMedoids(medoids, series)
+ mi_ref = R_assignMedoids(medoids, series)
+ expect_equal(mi, mi_ref)
+})
+
+# R-equivalent of assignMedoids, requiring a matrix
+# (thus potentially breaking "fit-in-memory" hope)
+R_assignMedoids <- function(medoids, series)
+{
+ nb_series = ncol(series) #series in columns
+
+ mi = rep(NA,nb_series)
+ for (i in 1:nb_series)
+ mi[i] <- which.min( colSums( sweep(medoids, 1, series[,i], '-')^2 ) )
+
+ mi
+}
context("clustering")
-test_that("computeSynchrones behave as expected",
-{
- # Generate 300 sinusoïdal series of 3 kinds: all series of indices == 0 mod 3 are the same
- # (plus noise), all series of indices == 1 mod 3 are the same (plus noise) ...
- n = 300
- x = seq(0,9.5,0.1)
- L = length(x) #96 1/4h
- K = 3
- s1 = cos(x)
- s2 = sin(x)
- s3 = c( s1[1:(L%/%2)] , s2[(L%/%2+1):L] )
- #sum((s1-s2)^2) == 96
- #sum((s1-s3)^2) == 58
- #sum((s2-s3)^2) == 38
- s = list(s1, s2, s3)
- series = matrix(nrow=L, ncol=n)
- for (i in seq_len(n))
- series[,i] = s[[I(i,K)]] + rnorm(L,sd=0.01)
-
- getRefSeries = function(indices) {
- indices = indices[indices <= n]
- if (length(indices)>0) as.matrix(series[,indices]) else NULL
- }
-
- synchrones = computeSynchrones(bigmemory::as.big.matrix(cbind(s1,s2,s3)), getRefSeries,
- n, 100, verbose=TRUE, parll=FALSE)
-
- expect_equal(dim(synchrones), c(L,K))
- for (i in 1:K)
- {
- # Synchrones are (for each medoid) sums of closest curves.
- # Here, we expect exactly 100 curves of each kind to be assigned respectively to
- # synchrone 1, 2 and 3 => division by 100 should be very close to the ref curve
- expect_equal(synchrones[,i]/100, s[[i]], tolerance=0.01)
- }
-})
-
-test_that("Helper function to spread indices work properly",
-{
- indices <- 1:400
-
- # bigger nb_per_set than length(indices)
- expect_equal(epclust:::.spreadIndices(indices,500), list(indices))
-
- # nb_per_set == length(indices)
- expect_equal(epclust:::.spreadIndices(indices,400), list(indices))
-
- # length(indices) %% nb_per_set == 0
- expect_equal(epclust:::.spreadIndices(indices,200),
- c( list(indices[1:200]), list(indices[201:400]) ))
- expect_equal(epclust:::.spreadIndices(indices,100),
- c( list(indices[1:100]), list(indices[101:200]),
- list(indices[201:300]), list(indices[301:400]) ))
-
- # length(indices) / nb_per_set == 1, length(indices) %% nb_per_set == 100
- expect_equal(epclust:::.spreadIndices(indices,300), list(indices))
- # length(indices) / nb_per_set == 2, length(indices) %% nb_per_set == 42
- repartition <- epclust:::.spreadIndices(indices,179)
- expect_equal(length(repartition), 2)
- expect_equal(length(repartition[[1]]), 179 + 21)
- expect_equal(length(repartition[[1]]), 179 + 21)
-})
-
test_that("clusteringTask1 behave as expected",
{
# Generate 60 reference sinusoïdal series (medoids to be found),
wf = "haar"
ctype = "absolute"
- getContribs = function(indices) curvesToContribs(series[,indices],wf,ctype)
+ getContribs = function(indices) curvesToContribs(as.matrix(series[,indices]),wf,ctype)
require("cluster", quietly=TRUE)
algoClust1 = function(contribs,K) cluster::pam(t(contribs),K,diss=FALSE)$id.med
for (i in 1:3)
expect_lte( distor_good, computeDistortion(synchrones, synchrones[,sample(1:K1,3)]) )
})
+
+# Compute the sum of (normalized) sum of squares of closest distances to a medoid.
+# Note: medoids can be a big.matrix
+computeDistortion = function(series, medoids)
+{
+ if (bigmemory::is.big.matrix(medoids))
+ medoids = medoids[,] #extract standard matrix
+
+ n = ncol(series) ; L = nrow(series)
+ distortion = 0.
+ for (i in seq_len(n))
+ distortion = distortion + min( colSums( sweep(medoids,1,series[,i],'-')^2 ) / L )
+
+ sqrt( distortion / n )
+}
--- /dev/null
+context("computeSynchrones")
+
+test_that("computeSynchrones behave as expected",
+{
+ # Generate 300 sinusoïdal series of 3 kinds: all series of indices == 0 mod 3 are the same
+ # (plus noise), all series of indices == 1 mod 3 are the same (plus noise) ...
+ n = 300
+ x = seq(0,9.5,0.1)
+ L = length(x) #96 1/4h
+ K = 3
+ s1 = cos(x)
+ s2 = sin(x)
+ s3 = c( s1[1:(L%/%2)] , s2[(L%/%2+1):L] )
+ #sum((s1-s2)^2) == 96
+ #sum((s1-s3)^2) == 58
+ #sum((s2-s3)^2) == 38
+ s = list(s1, s2, s3)
+ series = matrix(nrow=L, ncol=n)
+ for (i in seq_len(n))
+ series[,i] = s[[I(i,K)]] + rnorm(L,sd=0.01)
+
+ getRefSeries = function(indices) {
+ indices = indices[indices <= n]
+ if (length(indices)>0) as.matrix(series[,indices]) else NULL
+ }
+
+ synchrones = computeSynchrones(bigmemory::as.big.matrix(cbind(s1,s2,s3)), getRefSeries,
+ n, 100, verbose=TRUE, parll=FALSE)
+
+ expect_equal(dim(synchrones), c(L,K))
+ for (i in 1:K)
+ {
+ # Synchrones are (for each medoid) sums of closest curves.
+ # Here, we expect exactly 100 curves of each kind to be assigned respectively to
+ # synchrone 1, 2 and 3 => division by 100 should be very close to the ref curve
+ expect_equal(synchrones[,i]/100, s[[i]], tolerance=0.01)
+ }
+})
-context("wavelets discrete & continuous")
-
-test_that("curvesToContribs behave as expected",
-{
-# curvesToContribs(...)
-})
+context("computeWerDists")
test_that("computeWerDists output correct results",
{
# On two constant series
# TODO:...
})
+
--- /dev/null
+context("utils functions")
+
+test_that("Helper function to split indices work properly",
+{
+ indices <- 1:400
+
+ # bigger nb_per_set than length(indices)
+ expect_equal(epclust:::.splitIndices(indices,500), list(indices))
+
+ # nb_per_set == length(indices)
+ expect_equal(epclust:::.splitIndices(indices,400), list(indices))
+
+ # length(indices) %% nb_per_set == 0
+ expect_equal(epclust:::.splitIndices(indices,200),
+ c( list(indices[1:200]), list(indices[201:400]) ))
+ expect_equal(epclust:::.splitIndices(indices,100),
+ c( list(indices[1:100]), list(indices[101:200]),
+ list(indices[201:300]), list(indices[301:400]) ))
+
+ # length(indices) / nb_per_set == 1, length(indices) %% nb_per_set == 100
+ expect_equal(epclust:::.splitIndices(indices,300), list(indices))
+ # length(indices) / nb_per_set == 2, length(indices) %% nb_per_set == 42
+ repartition <- epclust:::.splitIndices(indices,179)
+ expect_equal(length(repartition), 2)
+ expect_equal(length(repartition[[1]]), 179 + 21)
+	expect_equal(length(repartition[[2]]), 179 + 21)
+})
+
+test_that("curvesToContribs output correct results",
+{
+# curvesToContribs(...)
+})
+++ /dev/null
-context("computeMedoidsIndices")
-
-test_that("computeMedoidsIndices behave as expected",
-{
- # Generate a gaussian mixture
- n = 999
- L = 7
- medoids = cbind( rep(0,L), rep(-5,L), rep(5,L) )
- # short series...
- series = t( rbind( MASS::mvrnorm(n/3, medoids[,1], diag(L)),
- MASS::mvrnorm(n/3, medoids[,2], diag(L)),
- MASS::mvrnorm(n/3, medoids[,3], diag(L)) ) )
-
- # With high probability, medoids indices should resemble 1,1,1,...,2,2,2,...,3,3,3,...
- require("bigmemory", quietly=TRUE)
- mi = epclust:::computeMedoidsIndices(bigmemory::as.big.matrix(medoids)@address, series)
- mi_ref = rep(1:3, each=n/3)
- expect_lt( mean(mi != mi_ref), 0.01 )
-
- # Now with a random matrix, compare with (trusted) R version
- series = matrix(runif(n*L, min=-7, max=7), nrow=L)
- mi = epclust:::computeMedoidsIndices(bigmemory::as.big.matrix(medoids)@address, series)
- mi_ref = R_computeMedoidsIndices(medoids, series)
- expect_equal(mi, mi_ref)
-})