-#' @name clustering
-#' @rdname clustering
-#' @aliases clusteringTask1 clusteringTask2 computeClusters1 computeClusters2
+#' Two-stage clustering, within one task (see \code{claws()})
#'
-#' @title Two-stage clustering, withing one task (see \code{claws()})
+#' \code{clusteringTask1()} runs one full stage-1 task, which consists in iterated
+#' stage 1 clustering on nb_curves / ntasks energy contributions, computed through
+#' discrete wavelets coefficients.
+#' \code{clusteringTask2()} runs a full stage-2 task, which consists in WER distances
+#' computations between medoids (indices) output from stage 1, before applying
+#' the second clustering algorithm on the distances matrix.
#'
-#' @description \code{clusteringTask1()} runs one full stage-1 task, which consists in
-#' iterated stage 1 clustering on nb_curves / ntasks energy contributions, computed
-#' through discrete wavelets coefficients.
-#' \code{clusteringTask2()} runs a full stage-2 task, which consists in
-#' WER distances computations between medoids indices output from stage 1,
-#' before applying the second clustering algorithm, on the distances matrix.
-#'
-#' @param indices Range of series indices to cluster
#' @param getContribs Function to retrieve contributions from initial series indices:
#' \code{getContribs(indices)} outputs a contributions matrix
#' @inheritParams claws
#' @inheritParams computeSynchrones
+#' @inheritParams computeWerDists
+#'
+#' @return The indices of the computed (resp. K1 and K2) medoids.
#'
-#' @return For \code{clusteringTask1()}, the indices of the computed (K1) medoids.
-#' Indices are irrelevant for stage 2 clustering, thus \code{clusteringTask2()}
-#' outputs a big.matrix of medoids (of size LxK2, K2 = final number of clusters)
+#' @name clustering
+#' @rdname clustering
+#' @aliases clusteringTask1 clusteringTask2
NULL
#' @rdname clustering
#' @export
-clusteringTask1 = function(indices, getContribs, K1, algoClust1, nb_series_per_chunk,
+clusteringTask1 = function(indices, getContribs, K1, algoClust1, nb_items_clust,
ncores_clust=1, verbose=FALSE, parll=TRUE)
{
if (parll)
while (length(indices) > K1)
{
# Balance tasks by splitting the indices set - as evenly as possible
- indices_workers = .splitIndices(indices, nb_items_clust1)
+ indices_workers = .splitIndices(indices, nb_items_clust, min_size=K1+1)
if (verbose)
cat(paste("*** [iterated] Clustering task 1 on ",length(indices)," series\n", sep=""))
indices <-
nvoice, nbytes, endian, ncores_clust=1, verbose=FALSE, parll=TRUE)
{
if (verbose)
- cat(paste("*** Clustering task 2 on ",ncol(medoids)," synchrones\n", sep=""))
-
- if (ncol(medoids) <= K2)
- return (medoids)
+ cat(paste("*** Clustering task 2 on ",length(indices)," medoids\n", sep=""))
- # A) Obtain synchrones, that is to say the cumulated power consumptions
- # for each of the K1 initial groups
- synchrones = computeSynchrones(medoids, getRefSeries, nb_ref_curves,
- nb_series_per_chunk, ncores_clust, verbose, parll)
+ if (length(indices) <= K2)
+ return (indices)
- # B) Compute the WER distances (Wavelets Extended coefficient of deteRmination)
- distances = computeWerDists(
- synchrones, nvoice, nbytes, endian, ncores_clust, verbose, parll)
+ # A) Compute the WER distances (Wavelets Extended coefficient of deteRmination)
+ distances = computeWerDists(indices, getSeries, nb_series_per_chunk,
+ nvoice, nbytes, endian, ncores_clust, verbose, parll)
- # C) Apply clustering algorithm 2 on the WER distances matrix
+ # B) Apply clustering algorithm 2 on the WER distances matrix
if (verbose)
cat(paste("*** algoClust2() on ",nrow(distances)," items\n", sep=""))
- medoids[ ,algoClust2(distances,K2) ]
+ indices[ algoClust2(distances,K2) ]
}
medoids_desc <- bigmemory::describe(medoids)
cl = parallel::makeCluster(ncores_clust)
parallel::clusterExport(cl, envir=environment(),
- varlist=c("synchrones_desc","m_desc","medoids_desc","getRefSeries"))
+ varlist=c("synchrones_desc","m_desc","medoids_desc","getSeries"))
}
if (verbose)
cat(paste("--- Compute ",K," synchrones with ",nb_curves," series\n", sep=""))
- # Balance tasks by splitting 1:nb_ref_curves into groups of size <= nb_series_per_chunk
- indices_workers = .splitIndices(seq_len(nb_ref_curves), nb_series_per_chunk)
+ # Balance tasks by splitting 1:nb_curves into groups of size <= nb_series_per_chunk
+ indices_workers = .splitIndices(seq_len(nb_curves), nb_series_per_chunk)
ignored <-
if (parll)
parallel::parLapply(cl, indices_workers, computeSynchronesChunk)
#' Compute the WER distances between the synchrones curves (in columns), which are
#' returned (e.g.) by \code{computeSynchrones()}
#'
-#' @param synchrones A big.matrix of synchrones, in columns. The series have same
-#' length as the series in the initial dataset
+#' @param indices Range of series indices to cluster
#' @inheritParams claws
+#' @inheritParams computeSynchrones
#'
-#' @return A distances matrix of size K1 x K1
+#' @return A distances matrix of size K x K where K == length(indices)
#'
#' @export
-computeWerDists = function(synchrones, nvoice, nbytes,endian,ncores_clust=1,
- verbose=FALSE,parll=TRUE)
+computeWerDists = function(indices, getSeries, nb_series_per_chunk, nvoice, nbytes, endian,
+ ncores_clust=1, verbose=FALSE, parll=TRUE)
{
- n <- ncol(synchrones)
- L <- nrow(synchrones)
+ n <- length(indices)
+ L <- length(getSeries(1)) #TODO: not very nice way to get L
noctave = ceiling(log2(L)) #min power of 2 to cover serie range
+ # Since a CWT contains noctave*nvoice complex series, we deduce the number of CWT to
+ # retrieve/put in one chunk.
+ nb_cwt_per_chunk = max(1, floor(nb_series_per_chunk / (nvoice*noctave*2)))
- # Initialize result as a square big.matrix of size 'number of synchrones'
+ # Initialize result as a square big.matrix of size 'number of medoids'
Xwer_dist <- bigmemory::big.matrix(nrow=n, ncol=n, type="double")
# Generate n(n-1)/2 pairs for WER distances computations
}
cwt_file = ".cwt.bin"
- # Compute the synchrones[,indices] CWT, and store the results in the binary file
+ # Compute the getSeries(indices) CWT, and store the results in the binary file
computeSaveCWT = function(indices)
{
if (parll)
require("bigmemory", quietly=TRUE)
require("Rwave", quietly=TRUE)
require("epclust", quietly=TRUE)
- synchrones <- bigmemory::attach.big.matrix(synchrones_desc)
}
# Obtain CWT as big vectors of real part + imaginary part (concatenate)
ts_cwt <- sapply(indices, function(i) {
- ts <- scale(ts(synchrones[,i]), center=TRUE, scale=FALSE)
+ ts <- scale(ts(getSeries(i)), center=TRUE, scale=FALSE)
ts_cwt <- Rwave::cwt(ts, noctave, nvoice, w0=2*pi, twoD=TRUE, plot=FALSE)
c( as.double(Re(ts_cwt)),as.double(Im(ts_cwt)) )
})
# Serialization
- binarize(ts_cwt, cwt_file, 1, ",", nbytes, endian)
+ binarize(ts_cwt, cwt_file, nb_cwt_per_chunk, ",", nbytes, endian)
}
if (parll)
{
cl = parallel::makeCluster(ncores_clust)
- synchrones_desc <- bigmemory::describe(synchrones)
Xwer_dist_desc <- bigmemory::describe(Xwer_dist)
- parallel::clusterExport(cl, varlist=c("parll","synchrones_desc","Xwer_dist_desc",
- "noctave","nvoice","verbose","getCWT"), envir=environment())
+ parallel::clusterExport(cl, varlist=c("parll","nb_cwt_per_chunk","L",
+ "Xwer_dist_desc","noctave","nvoice","getCWT"), envir=environment())
}
if (verbose)
lapply(1:n, computeSaveCWT)
# Function to retrieve a synchrone CWT from (binary) file
- getSynchroneCWT = function(index, L)
+ getCWT = function(index, L)
{
flat_cwt <- getDataInFile(index, cwt_file, nbytes, endian)
cwt_length = length(flat_cwt) / 2
#TODO: better repartition here,
- #better code in .splitIndices :: never exceed nb_per_chunk; arg: min_per_chunk (default: 1)
-###TODO: reintroduire nb_items_clust ======> l'autre est typiquement + grand !!! (pas de relation !)
# parallel workers start with an empty environment
require("bigmemory", quietly=TRUE)
require("epclust", quietly=TRUE)
- synchrones <- bigmemory::attach.big.matrix(synchrones_desc)
Xwer_dist <- bigmemory::attach.big.matrix(Xwer_dist_desc)
}
cat(paste(" Distances (",i,",",j,"), (",i,",",j+1,") ...\n", sep=""))
# Compute CWT of columns i and j in synchrones
- L = nrow(synchrones)
- cwt_i <- getSynchroneCWT(i, L)
- cwt_j <- getSynchroneCWT(j, L)
+ cwt_i <- getCWT(i, L)
+ cwt_j <- getCWT(j, L)
# Compute the ratio of integrals formula 5.6 for WER^2
# in https://arxiv.org/abs/1101.4744v2 §5.3
-#' @name de_serialize
-#' @rdname de_serialize
-#' @aliases binarize binarizeTransform getDataInFile
+#' (De)Serialization of a [big]matrix or data stream
#'
-#' @title (De)Serialization of a [big]matrix or data stream
+#' \code{binarize()} serializes a matrix or CSV file with minimal overhead, into a
+#' binary file. \code{getDataInFile()} achieves the inverse task: she retrieves (ASCII)
+#' data rows from indices in the binary file. Finally, \code{binarizeTransform()}
+#' serialize transformations of all data chunks. To use it a data-retrieval function
+#' must be provided -- thus \code{binarize} will most likely be used first
+#' (and then a function defined to seek in generated binary file)
#'
-#' @description \code{binarize()} serializes a matrix or CSV file with minimal overhead,
-#' into a binary file. \code{getDataInFile()} achieves the inverse task: she retrieves
-#' (ASCII) data rows from indices in the binary file. Finally,
-#' \code{binarizeTransform()} serialize transformations of all data chunks; to use it,
-#' a data-retrieval function must be provided, thus \code{binarize} will most likely be
-#' used first (and then a function defined to seek in generated binary file)
-#'
-#' @param data_ascii Either a matrix or CSV file, with items in rows
-#' @param indices Indices of the lines to retrieve
-#' @param data_bin_file Name of binary file on output (\code{binarize})
-#' or input (\code{getDataInFile})
+#' @param data_ascii Either a matrix (by columns) or CSV file or connection (by rows)
+#' @param data_bin_file Name of binary file on output of (\code{binarize})
+#' or input of (\code{getDataInFile})
#' @param nb_per_chunk Number of lines to process in one batch (big.matrix or connection)
-#' @inheritParams claws
#' @param getData Function to retrieve data chunks
#' @param transform Transformation function to apply on data chunks
+#' @param indices Indices of the lines to retrieve
+#' @inheritParams claws
#'
#' @return For \code{getDataInFile()}, the matrix with rows corresponding to the
#' requested indices. \code{binarizeTransform} returns the number of processed lines.
#' \code{binarize} is designed to serialize in several calls, thus returns nothing.
+#'
+#' @name de_serialize
+#' @rdname de_serialize
+#' @aliases binarize binarizeTransform getDataInFile
NULL
#' @rdname de_serialize
#' }
#' @param K1 Number of clusters to be found after stage 1 (K1 << N [number of series])
#' @param K2 Number of clusters to be found after stage 2 (K2 << K1)
-#' @param nb_series_per_chunk (Maximum) number of series to retrieve in one batch;
-#' this value is also used for the (maximum) number of series to cluster at a time
+#' @param nb_series_per_chunk (Maximum) number of series to retrieve in one batch
+#' @param nb_items_clust (~Maximum) number of items in clustering algorithm 1 input
#' @param algoClust1 Clustering algorithm for stage 1. A function which takes (data, K)
#' as argument where data is a matrix in columns and K the desired number of clusters,
#' and outputs K medoids ranks. Default: PAM. In our method, this function is called
#' digest::sha1(res_db)
#' }
#' @export
-claws <- function(getSeries, K1, K2, nb_series_per_chunk,
+claws <- function(series, K1, K2, nb_series_per_chunk, nb_items_clust=7*K1,
algoClust1=function(data,K) cluster::pam(t(data),K,diss=FALSE,pamonce=1)$id.med,
algoClust2=function(dists,K) cluster::pam(dists,K,diss=TRUE,pamonce=1)$id.med,
wav_filt="d8", contrib_type="absolute", WER="end", nvoice=4, random=TRUE,
K1 <- .toInteger(K1, function(x) x>=2)
K2 <- .toInteger(K2, function(x) x>=2)
nb_series_per_chunk <- .toInteger(nb_series_per_chunk, function(x) x>=1)
- # K1 (number of clusters at step 1) cannot exceed nb_series_per_chunk, because we will need
- # to load K1 series in memory for clustering stage 2.
- if (K1 > nb_series_per_chunk)
- stop("'K1' cannot exceed 'nb_series_per_chunk'")
+ nb_items_clust <- .toInteger(nb_items_clust, function(x) x>K1)
random <- .toLogical(random)
tryCatch( {ignored <- wavelets::wt.filter(wav_filt)},
error = function(e) stop("Invalid wavelet filter; see ?wavelets::wt.filter") )
# Initialize parallel runs: outfile="" allow to output verbose traces in the console
# under Linux. All necessary variables are passed to the workers.
cl = parallel::makeCluster(ncores_tasks, outfile="")
- parallel::clusterExport(cl, envir = environment(),
- varlist = c("getSeries","getContribs","K1","K2","algoClust1","algoClust2",
- "nb_series_per_chunk","ncores_clust","nvoice","nbytes","endian","verbose","parll"))
+ varlist = c("ncores_clust","verbose","parll", #task 1 & 2
+ "K1","getContribs","algoClust1","nb_items_clust") #task 1
+ if (WER=="mix")
+ {
+ # Add variables for task 2
+ varlist = c(varlist, "K2","getSeries","algoClust2","nb_series_per_chunk",
+ "nvoice","nbytes","endian")
+ }
+ parallel::clusterExport(cl, varlist, envir = environment())
}
# This function achieves one complete clustering task, divided in stage 1 + stage 2.
- # stage 1: n indices --> clusteringTask1(...) --> K1 medoids
- # stage 2: K1 medoids --> clusteringTask2(...) --> K2 medoids,
+ # stage 1: n indices --> clusteringTask1(...) --> K1 medoids (indices)
+ # stage 2: K1 indices --> K1xK1 WER distances --> clusteringTask2(...) --> K2 medoids,
# where n = N / ntasks, N being the total number of curves.
runTwoStepClustering = function(inds)
{
if (parll && ntasks>1)
require("epclust", quietly=TRUE)
indices_medoids = clusteringTask1(inds, getContribs, K1, algoClust1,
- nb_series_per_chunk, ncores_clust, verbose, parll)
+ nb_items_clust, ncores_clust, verbose, parll)
if (WER=="mix")
{
indices_medoids = clusteringTask2(indices_medoids, getSeries, K2, algoClust2,
cat(paste(message,"\n", sep=""))
}
- # As explained above, indices will be assigned to ntasks*K1 medoids indices [if WER=="end"],
- # or nothing (empty vector) if WER=="mix"; in this case, synchrones are stored in a file.
+ # As explained above, we obtain after all runs ntasks*[K1 or K2] medoids indices,
+ # depending wether WER=="end" or "mix", respectively.
indices_medoids_all <-
if (parll && ntasks>1)
unlist( parallel::parLapply(cl, indices_tasks, runTwoStepClustering) )
if (parll && ntasks>1)
parallel::stopCluster(cl)
- # Right before the final stage, input data still is the initial set of curves, referenced
- # by the ntasks*[K1 or K2] medoids indices.
+ # For the last stage, ncores_tasks*(ncores_clusts+1) cores should be available:
+ # - ntasks for level 1 parallelism
+ # - ntasks*ncores_clust for level 2 parallelism,
+ # but since an extension MPI <--> tasks / OpenMP <--> sub-tasks is on the way,
+ # it's better to just re-use ncores_clust
+ ncores_last_stage <- ncores_clust
- # Run last clustering tasks to obtain only K2 medoids indices, from ntasks*[K1 or K2]
- # indices, depending wether WER=="end" or WER=="mix"
+ # Run last clustering tasks to obtain only K2 medoids indices
if (verbose)
cat("...Run final // stage 1 + stage 2\n")
indices_medoids = clusteringTask1(indices_medoids_all, getContribs, K1, algoClust1,
- nb_series_per_chunk, ncores_tasks*ncores_clust, verbose, parll)
+ nb_items_clust, ncores_tasks*ncores_clust, verbose, parll)
indices_medoids = clusteringTask2(indices_medoids, getContribs, K2, algoClust2,
- nb_series_per_chunk, nvoice, nbytes, endian, ncores_tasks*ncores_clust, verbose, parll)
+ nb_series_per_chunk, nvoice, nbytes, endian, ncores_last_stage, verbose, parll)
- # Compute synchrones
+ # Compute synchrones, that is to say the cumulated power consumptions for each of the K2
+ # final groups.
medoids = getSeries(indices_medoids)
synchrones = computeSynchrones(medoids, getSeries, nb_curves, nb_series_per_chunk,
- ncores_tasks*ncores_clust, verbose, parll)
+ ncores_last_stage, verbose, parll)
# NOTE: no need to use big.matrix here, since there are only K2 << K1 << N remaining curves
list("medoids"=medoids, "ranks"=indices_medoids, "synchrones"=synchrones)
#' Compute the discrete wavelet coefficients for each series, and aggregate them in
#' energy contribution across scales as described in https://arxiv.org/abs/1101.4744v2
#'
-#' @param series [big.]matrix of series (in columns), of size L x n
+#' @param curves [big.]matrix of series (in columns), of size L x n
#' @inheritParams claws
#'
#' @return A matrix of size log(L) x n containing contributions in columns
#'
#' @export
-curvesToContribs = function(series, wav_filt, contrib_type, coin=FALSE)
+curvesToContribs = function(series, wav_filt, contrib_type)
{
L = nrow(series)
D = ceiling( log2(L) )
})
}
-# Helper function to divide indices into balanced sets
-# If max == TRUE, sets sizes cannot exceed nb_per_set
-.splitIndices = function(indices, nb_per_set, max=FALSE)
+# Helper function to divide indices into balanced sets.
+# Ensure that all indices sets have at least min_size elements.
+.splitIndices = function(indices, nb_per_set, min_size=1)
{
L = length(indices)
nb_workers = floor( L / nb_per_set )
if (nb_workers == 0 || (nb_workers==1 && rem==0))
{
# L <= nb_per_set, simple case
- indices_workers = list(indices)
+ return (list(indices))
}
- else
- {
- indices_workers = lapply( seq_len(nb_workers), function(i)
- indices[(nb_per_set*(i-1)+1):(nb_per_set*i)] )
- if (max)
- {
- # Sets are not so well balanced, but size is supposed to be critical
- return ( c( indices_workers, if (rem>0) list((L-rem+1):L) else NULL ) )
- }
+ indices_workers = lapply( seq_len(nb_workers), function(i)
+ indices[(nb_per_set*(i-1)+1):(nb_per_set*i)] )
- # Spread the remaining load among the workers
- rem = L %% nb_per_set
- while (rem > 0)
+ rem = L %% nb_per_set #number of remaining unassigned items
+ if (rem == 0)
+ return (indices_workers)
+
+ rem <- (L-rem+1):L
+ # If remainder is smaller than min_size, feed it with indices from other sets
+ # until either its size exceed min_size (success) or other sets' size
+ # get lower min_size (failure).
+ while (length(rem) < min_size)
+ {
+ index = length(rem) %% nb_workers + 1
+ if (length(indices_workers[[index]]) <= min_size)
{
- index = rem%%nb_workers + 1
- indices_workers[[index]] = c(indices_workers[[index]], indices[L-rem+1])
- rem = rem - 1
+ stop("Impossible to split indices properly for clustering.
+ Try increasing nb_items_clust or decreasing K1")
}
+ rem = c(rem, tail(indices_workers[[index]],1))
+ indices_workers[[index]] = head( indices_workers[[index]], -1)
}
- indices_workers
+ return ( c(indices_workers, list(rem) ) )
}
#' filterMA
#' @param M_ A real matrix of size LxD
#' @param w_ The (odd) number of values to average
#'
-#' @return The filtered matrix, of same size as the input
+#' @return The filtered matrix (in columns), of same size as the input
#' @export
filterMA = function(M_, w_)
.Call("filterMA", M_, w_, PACKAGE="epclust")