parallel version running; TODO: check==sequential, plotting routines, parser; check...
[epclust.git] / epclust / R / main.R
index bf31c87..bba0618 100644 (file)
@@ -161,22 +161,6 @@ claws = function(getSeries, K1, K2,
        if (nb_series_per_task < min_series_per_chunk)
                stop("Too many tasks: less series in one task than min_series_per_chunk!")
 
-       # Cluster contributions in parallel (by nb_series_per_chunk)
-       indices_all = if (random) sample(nb_curves) else seq_len(nb_curves)
-       indices_tasks = lapply(seq_len(ntasks), function(i) {
-               upper_bound = ifelse( i<ntasks, min(nb_series_per_task*i,nb_curves), nb_curves )
-               indices_all[((i-1)*nb_series_per_task+1):upper_bound]
-       })
-       if (verbose)
-               cat(paste("...Run ",ntasks," x stage 1 in parallel\n",sep=""))
-       if (parll)
-       {
-               cl = parallel::makeCluster(ncores_tasks)
-               parallel::clusterExport(cl, varlist=c("getSeries","getContribs","K1","K2","verbose","parll",
-                       "nb_series_per_chunk","ncores_clust","synchrones_file","sep","nbytes","endian"),
-                       envir = environment())
-       }
-
        runTwoStepClustering = function(inds)
        {
                if (parll)
@@ -193,6 +177,26 @@ claws = function(getSeries, K1, K2,
                indices_medoids
        }
 
+       # Cluster contributions in parallel (by nb_series_per_chunk)
+       indices_all = if (random) sample(nb_curves) else seq_len(nb_curves)
+       indices_tasks = lapply(seq_len(ntasks), function(i) {
+               upper_bound = ifelse( i<ntasks, min(nb_series_per_task*i,nb_curves), nb_curves )
+               indices_all[((i-1)*nb_series_per_task+1):upper_bound]
+       })
+       if (verbose)
+               cat(paste("...Run ",ntasks," x stage 1 in parallel\n",sep=""))
+       if (WER=="mix")
+               {synchrones_file = paste(bin_dir,"synchrones",sep="") ; unlink(synchrones_file)}
+       if (parll)
+       {
+               cl = parallel::makeCluster(ncores_tasks)
+               varlist = c("getSeries","getContribs","K1","K2","verbose","parll",
+                       "nb_series_per_chunk","ncores_clust","sep","nbytes","endian")
+               if (WER=="mix")
+                       varlist = c(varlist, "synchrones_file")
+               parallel::clusterExport(cl, varlist=varlist, envir = environment())
+       }
+
        # 1000*K1 indices [if WER=="end"], or empty vector [if WER=="mix"] --> series on file
        if (parll)
                indices = unlist( parallel::parLapply(cl, indices_tasks, runTwoStepClustering) )
@@ -202,7 +206,6 @@ claws = function(getSeries, K1, K2,
                parallel::stopCluster(cl)
 
        getRefSeries = getSeries
-       synchrones_file = paste(bin_dir,"synchrones",sep="") ; unlink(synchrones_file)
        if (WER=="mix")
        {
                indices = seq_len(ntasks*K2)