add some TODOs
[epclust.git] / epclust / R / main.R
index 9064dfa..f1e435c 100644 (file)
@@ -161,22 +161,6 @@ claws = function(getSeries, K1, K2,
        if (nb_series_per_task < min_series_per_chunk)
                stop("Too many tasks: less series in one task than min_series_per_chunk!")
 
-       # Cluster contributions in parallel (by nb_series_per_chunk)
-       indices_all = if (random) sample(nb_curves) else seq_len(nb_curves)
-       indices_tasks = lapply(seq_len(ntasks), function(i) {
-               upper_bound = ifelse( i<ntasks, min(nb_series_per_task*i,nb_curves), nb_curves )
-               indices_all[((i-1)*nb_series_per_task+1):upper_bound]
-       })
-       if (verbose)
-               cat(paste("...Run ",ntasks," x stage 1 in parallel\n",sep=""))
-       if (parll)
-       {
-               cl = parallel::makeCluster(ncores_tasks)
-               parallel::clusterExport(cl, varlist=c("getSeries","getContribs","K1","K2","verbose","parll",
-                       "nb_series_per_chunk","ncores_clust","synchrones_file","sep","nbytes","endian"),
-                       envir = environment())
-       }
-
        runTwoStepClustering = function(inds)
        {
                if (parll)
@@ -185,6 +169,15 @@ claws = function(getSeries, K1, K2,
                        inds, getContribs, K1, nb_series_per_chunk, ncores_clust, verbose, parll)
                if (WER=="mix")
                {
+
+
+
+
+#TODO: getSeries(indices_medoids) BAD ; il faudrait une big.matrix de medoids en entree
+                       #OK en RAM il y en aura 1000 (donc 1000*K1*17519... OK)
+                       #...mais du coup chaque process ne re-dupliquera pas medoids
+
+
                        medoids2 = computeClusters2(getSeries(indices_medoids),
                                K2, getSeries, nb_curves, nb_series_per_chunk, ncores_clust, verbose, parll)
                        binarize(medoids2, synchrones_file, nb_series_per_chunk, sep, nbytes, endian)
@@ -193,6 +186,26 @@ claws = function(getSeries, K1, K2,
                indices_medoids
        }
 
+       # Cluster contributions in parallel (by nb_series_per_chunk)
+       indices_all = if (random) sample(nb_curves) else seq_len(nb_curves)
+       indices_tasks = lapply(seq_len(ntasks), function(i) {
+               upper_bound = ifelse( i<ntasks, min(nb_series_per_task*i,nb_curves), nb_curves )
+               indices_all[((i-1)*nb_series_per_task+1):upper_bound]
+       })
+       if (verbose)
+               cat(paste("...Run ",ntasks," x stage 1 in parallel\n",sep=""))
+       if (WER=="mix")
+               {synchrones_file = paste(bin_dir,"synchrones",sep="") ; unlink(synchrones_file)}
+       if (parll)
+       {
+               cl = parallel::makeCluster(ncores_tasks)
+               varlist = c("getSeries","getContribs","K1","K2","verbose","parll",
+                       "nb_series_per_chunk","ncores_clust","sep","nbytes","endian")
+               if (WER=="mix")
+                       varlist = c(varlist, "synchrones_file")
+               parallel::clusterExport(cl, varlist=varlist, envir = environment())
+       }
+
        # 1000*K1 indices [if WER=="end"], or empty vector [if WER=="mix"] --> series on file
        if (parll)
                indices = unlist( parallel::parLapply(cl, indices_tasks, runTwoStepClustering) )
@@ -202,7 +215,6 @@ claws = function(getSeries, K1, K2,
                parallel::stopCluster(cl)
 
        getRefSeries = getSeries
-       synchrones_file = paste(bin_dir,"synchrones",sep="") ; unlink(synchrones_file)
        if (WER=="mix")
        {
                indices = seq_len(ntasks*K2)
@@ -222,9 +234,9 @@ claws = function(getSeries, K1, K2,
        if (verbose)
                cat("...Run final // stage 1 + stage 2\n")
        indices_medoids = clusteringTask1(
-               indices, getContribs, K1, nb_series_per_chunk, ncores_tasks*ncores_clust, verbose)
-       medoids = computeClusters2(getSeries(indices_medoids),
-               K2, getRefSeries, nb_curves, nb_series_per_chunk, ncores_tasks*ncores_clust, verbose)
+               indices, getContribs, K1, nb_series_per_chunk, ncores_tasks*ncores_clust, verbose, parll)
+       medoids = computeClusters2(getSeries(indices_medoids), K2,
+               getRefSeries, nb_curves, nb_series_per_chunk, ncores_tasks*ncores_clust, verbose, parll)
 
        # Cleanup
        unlink(bin_dir, recursive=TRUE)