parallel version running; TODO: check==sequential, plotting routines, parser; check...
[epclust.git] / epclust / R / clustering.R
index 74d009e..9a55495 100644 (file)
@@ -93,6 +93,8 @@ computeSynchrones = function(medoids, getRefSeries,
 {
        computeSynchronesChunk = function(indices)
        {
+               if (verbose)
+                       cat(paste("--- Compute synchrones for ",length(indices)," lines\n", sep=""))
                ref_series = getRefSeries(indices)
                #get medoids indices for this chunk of series
                for (i in seq_len(nrow(ref_series)))
@@ -118,20 +120,14 @@ computeSynchrones = function(medoids, getRefSeries,
                m <- synchronicity::boost.mutex()
 
        indices_workers = .spreadIndices(seq_len(nb_ref_curves), nb_series_per_chunk)
-       for (inds in indices_workers)
-       {
-               if (verbose)
+       ignored <-
+               if (parll)
                {
-                       cat(paste("--- Compute synchrones for indices range ",
-                               min(inds)," -> ",max(inds),"\n", sep=""))
+                       parallel::mclapply(indices_workers, computeSynchronesChunk,
+                               mc.cores=ncores_clust, mc.allow.recursive=FALSE)
                }
-               if (parll)
-                       ignored <- parallel::mcparallel(computeSynchronesChunk(inds))
                else
-                       computeSynchronesChunk(inds)
-       }
-       if (parll)
-               parallel::mccollect()
+                       lapply(indices_workers, computeSynchronesChunk)
 
        mat_syncs = matrix(nrow=K, ncol=ncol(medoids))
        vec_count = rep(NA, K)
@@ -184,14 +180,16 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE)
                ts.cwt = totts.cwt[,s0log:(s0log+noctave*nvoice)]
                #Normalization
                sqs <- sqrt(2^(0:(noctave*nvoice)/nvoice)*s0)
-               sqres <- sweep(ts.cwt,MARGIN=2,sqs,'*')
+               sqres <- sweep(ts.cwt,2,sqs,'*')
                sqres / max(Mod(sqres))
        }
 
        if (parll)
        {
                cl = parallel::makeCluster(ncores_clust)
-               parallel::clusterExport(cl, varlist=c("getContribs","K1","verbose"), envir=environment())
+               parallel::clusterExport(cl,
+                       varlist=c("synchrones","totnoct","nvoice","w0","s0log","noctave","s0","verbose"),
+                       envir=environment())
        }
 
        # (normalized) observations node with CWT
@@ -235,18 +233,16 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE)
        if (parll)
                m <- synchronicity::boost.mutex()
 
-       for (i in 1:(n-1))
-       {
+       ignored <-
                if (parll)
-                       ignored <- parallel::mcparallel(computeDistancesLineI(i))
+               {
+                       parallel::mclapply(seq_len(n-1), computeDistancesLineI,
+                               mc.cores=ncores_clust, mc.allow.recursive=FALSE)
+               }
                else
-                       computeDistancesLineI(i)
-       }
+                       lapply(seq_len(n-1), computeDistancesLineI)
        Xwer_dist[n,n] = 0.
 
-       if (parll)
-               parallel::mccollect()
-
        mat_dists = matrix(nrow=n, ncol=n)
        #TODO: avoid this loop?
        for (i in 1:n)