# Announce the parallel stage-1 runs and, if requested, spawn a cluster of
# ncores_tasks workers for the per-task two-step clustering below.
if (verbose)
cat(paste("...Run ",ntasks," x stage 1 in parallel\n",sep=""))
if (parll)
{
cl = parallel::makeCluster(ncores_tasks)
# Ship every closure variable that runTwoStepClustering reads into the
# workers' global environments (PSOCK workers share nothing by default).
# NOTE(review): "synchrones_file" is assigned only later in this chunk (after
# the parallel run) — confirm it is also defined above this point, otherwise
# this export fails / workers see an undefined object when WER=="mix".
parallel::clusterExport(cl, varlist=c("getSeries","getContribs","K1","K2","verbose","parll",
"nb_series_per_chunk","ncores_clust","synchrones_file","sep","nbytes","endian"),
envir = environment())
}
+
# Run clustering stage 1 (and stage 2 as well when WER=="mix") on one task's
# chunk of series indices.
#
# @param inds Integer vector of series indices assigned to this task.
# @return The medoid indices from stage 1 [when WER=="end"], or an empty
#   integer vector after the stage-2 medoids ("synchrones") have been
#   serialized to synchrones_file [when WER=="mix"].
#
# All other inputs (getContribs, K1, K2, WER, ...) are captured from the
# enclosing environment; on parallel workers they arrive via clusterExport.
runTwoStepClustering = function(inds)
{
# Worker processes start with a bare environment: load the package there.
if (parll)
require("epclust", quietly=TRUE)
indices_medoids = clusteringTask1(
inds, getContribs, K1, nb_series_per_chunk, ncores_clust, verbose, parll)
if (WER=="mix")
{
# Stage 2 refines the stage-1 medoids, then the result is written to
# disk instead of being returned through memory.
medoids2 = computeClusters2(getSeries(indices_medoids),
K2, getSeries, nb_curves, nb_series_per_chunk, ncores_clust, verbose, parll)
binarize(medoids2, synchrones_file, nb_series_per_chunk, sep, nbytes, endian)
return (vector("integer",0))
}
indices_medoids
}
+
# Run the per-task clustering, in parallel or sequentially.
# Result: 1000*K1 indices [if WER=="end"], or empty vector [if WER=="mix"] --> series on file
if (parll)
indices = unlist( parallel::parLapply(cl, indices_tasks, runTwoStepClustering) )
else
indices = unlist( lapply(indices_tasks, runTwoStepClustering) )
# The cluster is no longer needed; the final stage below builds its own pool.
if (parll)
parallel::stopCluster(cl)
+
# Keep a handle on the original data accessor: stage 2 below always computes
# distances against the *original* curves, even when medoids come from file.
getRefSeries = getSeries
# NOTE(review): this (re)assignment happens *after* runTwoStepClustering has
# already written to synchrones_file and after it was clusterExport'ed —
# confirm an identical assignment exists earlier (above this chunk);
# otherwise this line must move before the cluster setup.
synchrones_file = paste(bin_dir,"synchrones",sep="") ; unlink(synchrones_file)
if (WER=="mix")
{
# Each of the ntasks tasks contributed K2 synchrones to the file.
indices = seq_len(ntasks*K2)
#Now series must be retrieved from synchrones_file
getSeries = function(inds) getDataInFile(inds, synchrones_file, nbytes, endian)
#Contributions must be re-computed
unlink(contribs_file)
index = 1
if (verbose)
cat("...Serialize contributions computed on synchrones\n")
# Stream the synchrones through curvesToContribs and back to disk;
# the returned value is irrelevant, only the file side effect matters.
ignored = binarizeTransform(getSeries,
function(series) curvesToContribs(series, wf, ctype),
contribs_file, nb_series_per_chunk, nbytes, endian)
}
+
# Run step2 on resulting indices or series (from file), using the full core
# budget (ncores_tasks*ncores_clust) since tasks are no longer split.
if (verbose)
cat("...Run final // stage 1 + stage 2\n")
indices_medoids = clusteringTask1(
indices, getContribs, K1, nb_series_per_chunk, ncores_tasks*ncores_clust, verbose, parll)
# Final refinement against the original curves (getRefSeries), yielding the
# K2 medoids that the enclosing function returns.
medoids = computeClusters2(getSeries(indices_medoids), K2,
getRefSeries, nb_curves, nb_series_per_chunk, ncores_tasks*ncores_clust, verbose, parll)

# Cleanup: remove all intermediate binary files.
unlink(bin_dir, recursive=TRUE)

medoids
}
+
#' curvesToContribs
#'
#' Compute the discrete wavelet coefficients for each series, and aggregate them in
#' energy contribution across scales as described in https://arxiv.org/abs/1101.4744v2
#'
#' @param series Matrix of series (in rows), of size n x L
#' @param wf Wavelet filter name, forwarded to \code{wavelets::dwt}
#' @param ctype Contribution type, one of "relative" (energies normalized to
#'   sum to 1 per series) or "absolute"; partial matching is accepted, and an
#'   unrecognized value raises an immediate, explicit error
#' @inheritParams claws
#'
#' @return A matrix of size n x ceiling(log2(L)) containing contributions in rows
#'
#' @export
curvesToContribs = function(series, wf, ctype)
{
	L = ncol(series)
	# Number of DWT decomposition levels; curves are resampled to 2^D points
	# because the dyadic wavelet transform requires a power-of-two length.
	D = ceiling( log2(L) )
	nb_sample_points = 2^D
	# Validate ctype up front (same partial matching as the former pmatch call,
	# but an invalid value now fails here with a clear message instead of
	# surfacing later as a cryptic 'missing value where TRUE/FALSE needed').
	ctype = match.arg(ctype, c("relative","absolute"))
	t( apply(series, 1, function(x) {
		# Interpolate the curve onto the dyadic grid before transforming.
		interpolated_curve = spline(1:L, x, n=nb_sample_points)$y
		W = wavelets::dwt(interpolated_curve, filter=wf, n.levels=D)@W
		# Energy (L2 norm) of the detail coefficients at each scale,
		# reversed so that the coarsest scale comes first.
		nrj = rev( vapply( W, function(v) sqrt( sum(v^2) ), numeric(1) ) )
		if (ctype=="relative") nrj / sum(nrj) else nrj
	}) )
}