+ indices_all[((i-1)*nb_series_per_task+1):upper_bound]
+ })
+ if (verbose)
+ {
+ message = paste("...Run ",ntasks," x stage 1", sep="")
+ if (WER=="mix")
+ message = paste(message," + stage 2", sep="")
+ cat(paste(message,"\n", sep=""))
+ }
+ if (WER=="mix")
+ {synchrones_file = paste(bin_dir,"synchrones",sep="") ; unlink(synchrones_file)}
+ if (parll && ntasks>1)
+ {
+ cl = parallel::makeCluster(ncores_tasks, outfile="")
+ varlist = c("getSeries","getContribs","K1","K2","verbose","parll",
+ "nb_series_per_chunk","ntasks","ncores_clust","sep","nbytes","endian")
+ if (WER=="mix")
+ varlist = c(varlist, "synchrones_file")
+ parallel::clusterExport(cl, varlist=varlist, envir = environment())
+ }
+
+ # 1000*K1 indices [if WER=="end"], or empty vector [if WER=="mix"] --> series on file
+ indices <-
+ if (parll && ntasks>1)
+ unlist( parallel::parLapply(cl, indices_tasks, runTwoStepClustering) )
+ else
+ unlist( lapply(indices_tasks, runTwoStepClustering) )
+ if (parll && ntasks>1)
+ parallel::stopCluster(cl)
+
+ getRefSeries = getSeries
+ if (WER=="mix")
+ {
+ indices = seq_len(ntasks*K2)
+ #Now series must be retrieved from synchrones_file
+ getSeries = function(inds) getDataInFile(inds, synchrones_file, nbytes, endian)
+ #Contributions must be re-computed
+ unlink(contribs_file)
+ index = 1
+ if (verbose)
+ cat("...Serialize contributions computed on synchrones\n")
+ ignored = binarizeTransform(getSeries,
+ function(series) curvesToContribs(series, wf, ctype),
+ contribs_file, nb_series_per_chunk, nbytes, endian)
+ }
+
+ # Run step2 on resulting indices or series (from file)
+ if (verbose)
+ cat("...Run final // stage 1 + stage 2\n")
+ indices_medoids = clusteringTask1(
+ indices, getContribs, K1, nb_series_per_chunk, ncores_tasks*ncores_clust, verbose, parll)
+ medoids1 = bigmemory::as.big.matrix( getSeries(indices_medoids) )
+ medoids2 = clusteringTask2(medoids1, K2, getRefSeries, nb_curves, nb_series_per_chunk,
+ nbytes, endian, ncores_tasks*ncores_clust, verbose, parll)
+
+ # Cleanup
+ unlink(bin_dir, recursive=TRUE)
+
+ medoids2[,]
+}
+
+#' curvesToContribs
+#'
+#' Compute the discrete wavelet coefficients for each series, and aggregate them in
+#' energy contribution across scales as described in https://arxiv.org/abs/1101.4744v2
+#'
+#' @param series [big.]matrix of series (in columns), of size L x n
+#' @inheritParams claws
+#'
+#' @return A [big.]matrix of size log(L) x n containing contributions in columns
+#'
+#' @export
+curvesToContribs = function(series, wav_filt, contrib_type)
+{
+ L = nrow(series)
+ D = ceiling( log2(L) )
+ nb_sample_points = 2^D
+ apply(series, 2, function(x) {
+ interpolated_curve = spline(1:L, x, n=nb_sample_points)$y
+ W = wavelets::dwt(interpolated_curve, filter=wf, D)@W
+ nrj = rev( sapply( W, function(v) ( sqrt( sum(v^2) ) ) ) )
+ if (contrib_type!="absolute")
+ nrj = nrj / sum(nrj)
+ if (contrib_type=="logit")
+ nrj = - log(1 - nrj)
+ nrj
+ })
+}
+
+# Check integer arguments with functional conditions
+.toInteger <- function(x, condition)
+{
+ errWarn <- function(ignored)
+ paste("Cannot convert argument' ",substitute(x),"' to integer", sep="")
+ if (!is.integer(x))
+ tryCatch({x = as.integer(x)[1]; if (is.na(x)) stop()},
+ warning = errWarn, error = errWarn)
+ if (!condition(x))
+ {
+ stop(paste("Argument '",substitute(x),
+ "' does not verify condition ",body(condition), sep=""))