From 48108c3999d28d973443fa5e78f73a0a9f2bfc07 Mon Sep 17 00:00:00 2001 From: Benjamin Auder <benjamin.auder@somewhere> Date: Fri, 3 Mar 2017 15:29:08 +0100 Subject: [PATCH] export vars to nodes --- epclust/R/clustering.R | 25 ++++++++++++------------- epclust/R/main.R | 28 +++++++++++++--------------- epclust/R/utils.R | 11 ++++------- 3 files changed, 29 insertions(+), 35 deletions(-) diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R index 42e894c..578b2f3 100644 --- a/epclust/R/clustering.R +++ b/epclust/R/clustering.R @@ -1,20 +1,19 @@ # Cluster one full task (nb_curves / ntasks series) -clusteringTask = function(K1, K2, WER, nb_series_per_chunk, indices_tasks, ncores_clust) +clusteringTask = function(indices_clust) { cl_clust = parallel::makeCluster(ncores_clust) - #parallel::clusterExport(cl=cl_clust, varlist=c("fonctions_du_package"), envir=environment()) - indices_clust = indices_task[[i]] + parallel::clusterExport(cl_clust, + varlist=c("K1","K2","WER"), + envir=environment()) repeat { nb_workers = max( 1, round( length(indices_clust) / nb_series_per_chunk ) ) - indices_workers = list() - for (i in 1:nb_workers) - { + indices_workers = lapply(seq_len(nb_workers), function(i) { upper_bound = ifelse( i<nb_workers, min(nb_series_per_chunk*i,length(indices_clust)), length(indices_clust) ) - indices_workers[[i]] = indices_clust[(nb_series_per_chunk*(i-1)+1):upper_bound] - } - indices_clust = parallel::parLapply(cl, indices_workers, clusterChunk, K1, K2*(WER=="mix")) + indices_clust[(nb_series_per_chunk*(i-1)+1):upper_bound] + }) + indices_clust = parallel::parLapply(cl, indices_workers, clusterChunk) # TODO: soft condition between K2 and K1, before applying final WER step if ((WER=="end" && length(indices_clust)==K1) || (WER=="mix" && length(indices_clust)==K2)) break @@ -24,17 +23,17 @@ clusteringTask = function(K1, K2, WER, nb_series_per_chunk, indices_tasks, ncore } # Cluster a chunk of series inside one task (~max nb_series_per_chunk) -clusterChunk = function(indices, K1, K2) +clusterChunk = function(indices_chunk) { - coeffs = getCoeffs(indices) + coeffs = readCoeffs(indices_chunk) cl = computeClusters(as.matrix(coeffs[,2:ncol(coeffs)]), K1, diss=FALSE) - if (K2 > 0) + if (WER=="mix" > 0) { curves = computeSynchrones(cl) dists = computeWerDists(curves) cl = computeClusters(dists, K2, diss=TRUE) } - indices[cl] + indices_chunk[cl] } # Apply the clustering algorithm (PAM) on a coeffs or distances matrix diff --git a/epclust/R/main.R b/epclust/R/main.R index f5ad81a..75041a4 100644 --- a/epclust/R/main.R +++ b/epclust/R/main.R @@ -67,8 +67,7 @@ epclust = function(data, K1, K2, ntasks=1, nb_series_per_chunk=50*K1, min_series stop("WER takes values in {'end','mix'}") # Serialize all wavelets coefficients (+ IDs) onto a file - coeffs_file = ".coeffs" - ids_files = ".ids" + unlink(".coeffs") index = 1 nb_curves = 0 nb_coeffs = NA @@ -77,7 +76,7 @@ epclust = function(data, K1, K2, ntasks=1, nb_series_per_chunk=50*K1, min_series coeffs_chunk = computeCoeffs(data, index, nb_series_per_chunk, wf) if (is.null(coeffs_chunk)) break - serialize(coeffs_chunk, coeffs_file, append=TRUE) + writeCoeffs(coeffs_chunk) index = index + nb_series_per_chunk nb_curves = nb_curves + nrow(coeffs_chunk) if (is.na(nb_coeffs)) @@ -91,22 +90,21 @@ epclust = function(data, K1, K2, ntasks=1, nb_series_per_chunk=50*K1, min_series stop("Too many tasks: less series in one task than min_series_per_chunk!") # Cluster coefficients in parallel (by nb_series_per_chunk) - indices = if (random) sample(nb_curves) else seq_len(nb_curves) #all indices - indices_tasks = list() #indices to be processed in each task - for (i in seq_len(ntasks)) - { + indices = if (random) sample(nb_curves) else seq_len(nb_curves) + indices_tasks = lapply(seq_len(ntasks), function(i) { upper_bound = ifelse( i<ntasks, min(nb_series_per_task*i,nb_curves), nb_curves ) - indices_task[[i]] = indices[((i-1)*nb_series_per_task+1):upper_bound] - } + indices[((i-1)*nb_series_per_task+1):upper_bound] + }) library(parallel, quietly=TRUE) cl_tasks = parallel::makeCluster(ncores_tasks) - #parallel::clusterExport(cl=cl_tasks, varlist=c("ncores_clust", ...), envir=environment()) - indices = parallel::parLapply(cl_tasks, indices_tasks, clusteringStep12, ) + parallel::clusterExport(cl_tasks, + varlist=c("K1","K2","WER","nb_series_per_chunk","ncores_clust"),#TODO: pass also + #nb_coeffs...and filename (in a list... ?) + envir=environment()) + indices = parallel::parLapply(cl_tasks, indices_tasks, clusteringTask) parallel::stopCluster(cl_tasks) -##TODO: passer data ?! - # Run step1+2 step on resulting ranks - ranks = clusteringStep12() - return (list("ranks"=ranks, "medoids"=getSeries(data, ranks))) + indices = clusterChunk(indices, K1, K2) + return (list("indices"=indices, "medoids"=getSeries(data, indices))) } diff --git a/epclust/R/utils.R b/epclust/R/utils.R index 347c2c6..e0f25ec 100644 --- a/epclust/R/utils.R +++ b/epclust/R/utils.R @@ -10,8 +10,9 @@ toInteger <- function(x, condition) x } -serialize = function(coeffs, file, append) +writeCoeffs = function(coeffs) { + file = ".coeffs" #......... #C function (from data.frame, type of IDs ??! force integers ? [yes]) #return raw vector @@ -19,9 +20,10 @@ serialize = function(coeffs, file, append) #TODO: appendCoeffs() en C --> serialize et append to file } -deserialize = function(file, range, ncoefs) +readCoeffs = function(indices) { #...... + file = ".coeffs" #C function (from file name) } @@ -29,8 +31,3 @@ getSeries(data, rank=NULL, id=NULL) { #TODO: } - -getCoeffs(.....) #FROM BINARY FILE !!! -{ - -} -- 2.44.0