From 48108c3999d28d973443fa5e78f73a0a9f2bfc07 Mon Sep 17 00:00:00 2001 From: Benjamin Auder Date: Fri, 3 Mar 2017 15:29:08 +0100 Subject: [PATCH] export vars to nodes --- epclust/R/clustering.R | 25 ++++++++++++------------- epclust/R/main.R | 28 +++++++++++++--------------- epclust/R/utils.R | 11 ++++------- 3 files changed, 29 insertions(+), 35 deletions(-) diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R index 42e894c..578b2f3 100644 --- a/epclust/R/clustering.R +++ b/epclust/R/clustering.R @@ -1,20 +1,19 @@ # Cluster one full task (nb_curves / ntasks series) -clusteringTask = function(K1, K2, WER, nb_series_per_chunk, indices_tasks, ncores_clust) +clusteringTask = function(indices_clust) { cl_clust = parallel::makeCluster(ncores_clust) - #parallel::clusterExport(cl=cl_clust, varlist=c("fonctions_du_package"), envir=environment()) - indices_clust = indices_task[[i]] + parallel::clusterExport(cl_clust, + varlist=c("K1","K2","WER"), + envir=environment()) repeat { nb_workers = max( 1, round( length(indices_clust) / nb_series_per_chunk ) ) - indices_workers = list() - for (i in 1:nb_workers) - { + indices_workers = lapply(seq_len(nb_workers), function(i) { upper_bound = ifelse( i 0) + if (WER=="mix" > 0) { curves = computeSynchrones(cl) dists = computeWerDists(curves) cl = computeClusters(dists, K2, diss=TRUE) } - indices[cl] + indices_chunk[cl] } # Apply the clustering algorithm (PAM) on a coeffs or distances matrix diff --git a/epclust/R/main.R b/epclust/R/main.R index f5ad81a..75041a4 100644 --- a/epclust/R/main.R +++ b/epclust/R/main.R @@ -67,8 +67,7 @@ epclust = function(data, K1, K2, ntasks=1, nb_series_per_chunk=50*K1, min_series stop("WER takes values in {'end','mix'}") # Serialize all wavelets coefficients (+ IDs) onto a file - coeffs_file = ".coeffs" - ids_files = ".ids" + unlink(".coeffs") index = 1 nb_curves = 0 nb_coeffs = NA @@ -77,7 +76,7 @@ epclust = function(data, K1, K2, ntasks=1, nb_series_per_chunk=50*K1, min_series coeffs_chunk = computeCoeffs(data, index, nb_series_per_chunk, wf) if (is.null(coeffs_chunk)) break - serialize(coeffs_chunk, coeffs_file, append=TRUE) + writeCoeffs(coeffs_chunk) index = index + nb_series_per_chunk nb_curves = nb_curves + nrow(coeffs_chunk) if (is.na(nb_coeffs)) @@ -91,22 +90,21 @@ epclust = function(data, K1, K2, ntasks=1, nb_series_per_chunk=50*K1, min_series stop("Too many tasks: less series in one task than min_series_per_chunk!") # Cluster coefficients in parallel (by nb_series_per_chunk) - indices = if (random) sample(nb_curves) else seq_len(nb_curves) #all indices - indices_tasks = list() #indices to be processed in each task - for (i in seq_len(ntasks)) - { + indices = if (random) sample(nb_curves) else seq_len(nb_curves) + indices_tasks = lapply(seq_len(ntasks), function(i) { upper_bound = ifelse( i serialize et append to file } -deserialize = function(file, range, ncoefs) +readCoeffs = function(indices) { #...... + file = ".coeffs" #C function (from file name) } @@ -29,8 +31,3 @@ getSeries(data, rank=NULL, id=NULL) { #TODO: } - -getCoeffs(.....) #FROM BINARY FILE !!! -{ - -} -- 2.44.0