- library(parallel, quietly=TRUE)
- cl_tasks = parallel::makeCluster(ncores_tasks)
- parallel::clusterExport(cl_tasks,
- varlist=c("K1","K2","WER","nb_series_per_chunk","ncores_clust"),#TODO: pass also
- #nb_coeffs...and filename (in a list... ?)
- envir=environment())
- indices = parallel::parLapply(cl_tasks, indices_tasks, clusteringTask)
- parallel::stopCluster(cl_tasks)
-
- # Run step1+2 step on resulting ranks
- indices = clusterChunk(indices, K1, K2)
- return (list("indices"=indices, "medoids"=getSeries(data, indices)))
+ if (verbose)
+ cat(paste("...Run ",ntasks," x stage 1 in parallel\n",sep=""))
+ if (WER=="mix")
+ {synchrones_file = paste(bin_dir,"synchrones",sep="") ; unlink(synchrones_file)}
+ if (parll)
+ {
+ cl = parallel::makeCluster(ncores_tasks)
+ varlist = c("getSeries","getContribs","K1","K2","verbose","parll",
+ "nb_series_per_chunk","ncores_clust","sep","nbytes","endian")
+ if (WER=="mix")
+ varlist = c(varlist, "synchrones_file")
+ parallel::clusterExport(cl, varlist=varlist, envir = environment())
+ }
+
+ # 1000*K1 indices [if WER=="end"], or empty vector [if WER=="mix"] --> series on file
+ if (parll)
+ indices = unlist( parallel::parLapply(cl, indices_tasks, runTwoStepClustering) )
+ else
+ indices = unlist( lapply(indices_tasks, runTwoStepClustering) )
+ if (parll)
+ parallel::stopCluster(cl)
+
+ getRefSeries = getSeries
+ if (WER=="mix")
+ {
+ indices = seq_len(ntasks*K2)
+ #Now series must be retrieved from synchrones_file
+ getSeries = function(inds) getDataInFile(inds, synchrones_file, nbytes, endian)
+ #Contributions must be re-computed
+ unlink(contribs_file)
+ index = 1
+ if (verbose)
+ cat("...Serialize contributions computed on synchrones\n")
+ ignored = binarizeTransform(getSeries,
+ function(series) curvesToContribs(series, wf, ctype),
+ contribs_file, nb_series_per_chunk, nbytes, endian)
+ }
+
+
+
+#TODO: if ntasks==1, the work is already done at this point (skip the final stage)
+
+ # Run step2 on resulting indices or series (from file)
+ if (verbose)
+ cat("...Run final // stage 1 + stage 2\n")
+ indices_medoids = clusteringTask1(
+ indices, getContribs, K1, nb_series_per_chunk, ncores_tasks*ncores_clust, verbose, parll)
+ medoids = computeClusters2(getSeries(indices_medoids), K2,
+ getRefSeries, nb_curves, nb_series_per_chunk, ncores_tasks*ncores_clust, verbose, parll)
+
+ # Cleanup
+ unlink(bin_dir, recursive=TRUE)
+
+ medoids
+}
+
+#' curvesToContribs
+#'
+#' Compute the discrete wavelet coefficients for each series, and aggregate them in
+#' energy contribution across scales as described in https://arxiv.org/abs/1101.4744v2
+#'
+#' @param series Matrix of series (in rows), of size n x L
+#' @inheritParams claws
+#'
+#' @return A matrix of size n x D, with D = ceiling(log2(L)): one energy
+#'   contribution per wavelet scale in each row
+#'
+#' @export
+curvesToContribs = function(series, wf, ctype)
+{
+	# Series are stored in rows; L = number of sample points per series
+	L = length(series[1,])
+	# D = number of DWT decomposition levels; curves are resampled to 2^D points
+	# so that a D-level transform is well defined
+	D = ceiling( log2(L) )
+	nb_sample_points = 2^D
+	# pmatch() allows abbreviated contribution types (e.g. "rel" -> "relative");
+	# an unrecognized ctype yields NA here — assumed validated upstream (TODO confirm)
+	cont_types = c("relative","absolute")
+	ctype = cont_types[ pmatch(ctype,cont_types) ]
+	# One row of contributions per input series; t() restores n x D orientation
+	t( apply(series, 1, function(x) {
+		# Interpolate the curve onto 2^D equally spaced points
+		interpolated_curve = spline(1:L, x, n=nb_sample_points)$y
+		# @W = list of detail-coefficient vectors, one per decomposition level
+		W = wavelets::dwt(interpolated_curve, filter=wf, D)@W
+		# Energy per scale = L2 norm of each level's coefficients, in reversed
+		# level order
+		nrj = rev( sapply( W, function(v) ( sqrt( sum(v^2) ) ) ) )
+		# "relative": normalize energies to sum to 1; "absolute": raw energies
+		if (ctype=="relative") nrj / sum(nrj) else nrj
+	}) )
+}
+
+# Check integer arguments with functional conditions
+.toInteger <- function(x, condition)
+{
+ if (!is.integer(x))
+ tryCatch(
+ {x = as.integer(x)[1]},
+ error = function(e) paste("Cannot convert argument",substitute(x),"to integer")
+ )
+ if (!condition(x))
+ stop(paste("Argument",substitute(x),"does not verify condition",body(condition)))
+ x