parallel version running; TODO: check==sequential, plotting routines, parser; check...
author		Benjamin Auder <benjamin.auder@somewhere>
Wed, 8 Mar 2017 02:12:36 +0000 (03:12 +0100)
committer	Benjamin Auder <benjamin.auder@somewhere>
Wed, 8 Mar 2017 02:12:36 +0000 (03:12 +0100)
epclust/R/clustering.R
epclust/R/main.R
epclust/R/plot.R [new file with mode: 0644]

diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R
index 14e1f83..9a55495 100644
@@ -93,6 +93,8 @@ computeSynchrones = function(medoids, getRefSeries,
 {
        computeSynchronesChunk = function(indices)
        {
+               if (verbose)
+                       cat(paste("--- Compute synchrones for ",length(indices)," lines\n", sep=""))
                ref_series = getRefSeries(indices)
                #get medoids indices for this chunk of series
                for (i in seq_len(nrow(ref_series)))
@@ -118,17 +120,14 @@ computeSynchrones = function(medoids, getRefSeries,
                m <- synchronicity::boost.mutex()
 
        indices_workers = .spreadIndices(seq_len(nb_ref_curves), nb_series_per_chunk)
-       for (inds in indices_workers)
-       {
-               if (verbose)
-                       cat(paste("--- Compute synchrones for ",length(inds)," lines\n", sep=""))
+       ignored <-
                if (parll)
-                       ignored <- parallel::mcparallel(computeSynchronesChunk(inds))
+               {
+                       parallel::mclapply(indices_workers, computeSynchronesChunk,
+                               mc.cores=ncores_clust, mc.allow.recursive=FALSE)
+               }
                else
-                       computeSynchronesChunk(inds)
-       }
-       if (parll)
-               parallel::mccollect()
+                       lapply(indices_workers, computeSynchronesChunk)
 
        mat_syncs = matrix(nrow=K, ncol=ncol(medoids))
        vec_count = rep(NA, K)
@@ -234,18 +233,16 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE)
        if (parll)
                m <- synchronicity::boost.mutex()
 
-       for (i in 1:(n-1))
-       {
+       ignored <-
                if (parll)
-                       ignored <- parallel::mcparallel(computeDistancesLineI(i))
+               {
+                       parallel::mclapply(seq_len(n-1), computeDistancesLineI,
+                               mc.cores=ncores_clust, mc.allow.recursive=FALSE)
+               }
                else
-                       computeDistancesLineI(i)
-       }
+                       lapply(seq_len(n-1), computeDistancesLineI)
        Xwer_dist[n,n] = 0.
 
-       if (parll)
-               parallel::mccollect()
-
        mat_dists = matrix(nrow=n, ncol=n)
        #TODO: avoid this loop?
        for (i in 1:n)
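Note on the two hunks above: both replace hand-rolled parallel::mcparallel()/parallel::mccollect() fork bookkeeping with a single parallel::mclapply() call, falling back to lapply() when parll is FALSE. A minimal standalone sketch of that pattern follows; processChunk, the toy chunk list and the core count are illustrative placeholders, not part of the package.

	library(parallel)

	# Illustrative stand-in for computeSynchronesChunk / computeDistancesLineI
	processChunk = function(indices)
		sum(indices)  # placeholder work on one chunk of row indices

	indices_workers = list(1:10, 11:20, 21:30)  # toy chunks
	parll = TRUE
	ncores_clust = 2

	# One mclapply call replaces the former for + mcparallel + mccollect loop;
	# results are deliberately discarded, mirroring the `ignored <-` in the patch
	ignored <- if (parll) {
		# fork-based, so this branch is Unix-only; no nested forking allowed
		parallel::mclapply(indices_workers, processChunk,
			mc.cores=ncores_clust, mc.allow.recursive=FALSE)
	} else {
		lapply(indices_workers, processChunk)
	}

Compared to the removed loop, which forked one child per chunk with no limit before collecting them all, mclapply() caps the number of simultaneous children at mc.cores.
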
diff --git a/epclust/R/main.R b/epclust/R/main.R
index bf31c87..bba0618 100644
@@ -161,22 +161,6 @@ claws = function(getSeries, K1, K2,
        if (nb_series_per_task < min_series_per_chunk)
                stop("Too many tasks: less series in one task than min_series_per_chunk!")
 
-       # Cluster contributions in parallel (by nb_series_per_chunk)
-       indices_all = if (random) sample(nb_curves) else seq_len(nb_curves)
-       indices_tasks = lapply(seq_len(ntasks), function(i) {
-               upper_bound = ifelse( i<ntasks, min(nb_series_per_task*i,nb_curves), nb_curves )
-               indices_all[((i-1)*nb_series_per_task+1):upper_bound]
-       })
-       if (verbose)
-               cat(paste("...Run ",ntasks," x stage 1 in parallel\n",sep=""))
-       if (parll)
-       {
-               cl = parallel::makeCluster(ncores_tasks)
-               parallel::clusterExport(cl, varlist=c("getSeries","getContribs","K1","K2","verbose","parll",
-                       "nb_series_per_chunk","ncores_clust","synchrones_file","sep","nbytes","endian"),
-                       envir = environment())
-       }
-
        runTwoStepClustering = function(inds)
        {
                if (parll)
@@ -193,6 +177,26 @@ claws = function(getSeries, K1, K2,
                indices_medoids
        }
 
+       # Cluster contributions in parallel (by nb_series_per_chunk)
+       indices_all = if (random) sample(nb_curves) else seq_len(nb_curves)
+       indices_tasks = lapply(seq_len(ntasks), function(i) {
+               upper_bound = ifelse( i<ntasks, min(nb_series_per_task*i,nb_curves), nb_curves )
+               indices_all[((i-1)*nb_series_per_task+1):upper_bound]
+       })
+       if (verbose)
+               cat(paste("...Run ",ntasks," x stage 1 in parallel\n",sep=""))
+       if (WER=="mix")
+               {synchrones_file = paste(bin_dir,"synchrones",sep="") ; unlink(synchrones_file)}
+       if (parll)
+       {
+               cl = parallel::makeCluster(ncores_tasks)
+               varlist = c("getSeries","getContribs","K1","K2","verbose","parll",
+                       "nb_series_per_chunk","ncores_clust","sep","nbytes","endian")
+               if (WER=="mix")
+                       varlist = c(varlist, "synchrones_file")
+               parallel::clusterExport(cl, varlist=varlist, envir = environment())
+       }
+
        # 1000*K1 indices [if WER=="end"], or empty vector [if WER=="mix"] --> series on file
        if (parll)
                indices = unlist( parallel::parLapply(cl, indices_tasks, runTwoStepClustering) )
@@ -202,7 +206,6 @@ claws = function(getSeries, K1, K2,
                parallel::stopCluster(cl)
 
        getRefSeries = getSeries
-       synchrones_file = paste(bin_dir,"synchrones",sep="") ; unlink(synchrones_file)
        if (WER=="mix")
        {
                indices = seq_len(ntasks*K2)
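
For context, the cluster setup added in this file's hunks follows the standard makeCluster() / clusterExport() / parLapply() / stopCluster() sequence, with a varlist that only includes synchrones_file when WER=="mix", presumably so clusterExport() is never asked for a variable that is only defined in that mode. A self-contained sketch of the same pattern, with placeholder names (runTask, some_param) standing in for the real exports:

	library(parallel)

	WER = "mix"            # illustrative
	ncores_tasks = 2
	some_param = 42        # stands in for getSeries, K1, K2, ... in the patch
	synchrones_file = "synchrones.bin"  # only meaningful when WER=="mix"

	# Placeholder for runTwoStepClustering: workers read the exported globals
	runTask = function(inds)
		length(inds) * some_param

	indices_tasks = list(1:5, 6:10)  # toy task split

	cl = parallel::makeCluster(ncores_tasks)
	# Export only what the workers need; add synchrones_file when WER=="mix"
	varlist = c("some_param")
	if (WER=="mix")
		varlist = c(varlist, "synchrones_file")
	parallel::clusterExport(cl, varlist=varlist, envir=environment())
	indices = unlist( parallel::parLapply(cl, indices_tasks, runTask) )
	parallel::stopCluster(cl)

The task split itself (indices_tasks in the patch) is simply 1:nb_curves cut into ntasks contiguous blocks, optionally shuffled beforehand when random is TRUE.
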
diff --git a/epclust/R/plot.R b/epclust/R/plot.R
new file mode 100644
index 0000000..096159d
--- /dev/null
+++ b/epclust/R/plot.R
@@ -0,0 +1,3 @@
+#TODO: some visualization
+#for (i in 1:6) {plot(medoids_ascii[i,1:200],type="l",ylim=c(-5,5),col=i);par(new=TRUE)}
+#...
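
A possible starting point for the visualization TODO above, kept here only as a sketch: assuming the medoids are available as a numeric matrix with one series per row (as the commented medoids_ascii line suggests), graphics::matplot() draws all of them in one call instead of looping over plot() with par(new=TRUE).

	# Sketch only, not part of the commit.
	# 'medoids' is assumed to be a numeric matrix, one series per row.
	plotMedoids = function(medoids, nb_points=200, ylim=c(-5,5))
	{
		nb_points = min(nb_points, ncol(medoids))
		# matplot() expects one series per column, hence the transpose
		matplot(t(medoids[, seq_len(nb_points), drop=FALSE]), type="l", lty=1,
			col=seq_len(nrow(medoids)), ylim=ylim, xlab="time", ylab="value")
	}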