use Rcpp; ongoing debug for parallel synchrones computation
authorBenjamin Auder <benjamin.auder@somewhere>
Thu, 9 Mar 2017 03:25:51 +0000 (04:25 +0100)
committerBenjamin Auder <benjamin.auder@somewhere>
Thu, 9 Mar 2017 03:25:51 +0000 (04:25 +0100)
.gitignore
epclust/DESCRIPTION
epclust/R/a_NAMESPACE.R
epclust/R/clustering.R
epclust/R/main.R
epclust/TOTO [new file with mode: 0644]
epclust/src/computeMedoidsIndices.c [deleted file]
epclust/src/computeMedoidsIndices.cpp [new file with mode: 0644]
epclust/src/filter.c [deleted file]
epclust/src/filter.cpp [new file with mode: 0644]

index 3a326ea..c946480 100644 (file)
@@ -36,3 +36,7 @@
 #ignore object files
 *.o
 *.so
+
+#ignore RcppExports, generated by Rcpp::compileAttributes
+/epclust/R/RcppExports.R
+/epclust/src/RcppExports.cpp
index 1f2b5ea..300f86e 100644 (file)
@@ -18,19 +18,25 @@ Imports:
     parallel,
     cluster,
     wavelets,
-               bigmemory,
-               Rwave
+    bigmemory,
+    Rwave,
+    Rcpp
+LinkingTo:
+    Rcpp,
+    BH,
+    bigmemory
 Suggests:
     synchronicity,
     devtools,
     testthat,
-               MASS,
-               clue,
+    MASS,
+    clue,
     wmtsa,
     DBI
 License: MIT + file LICENSE
 RoxygenNote: 6.0.1
 Collate: 
+    'RcppExports.R'
     'main.R'
     'clustering.R'
     'de_serialize.R'
index e9aa830..8407d23 100644 (file)
@@ -4,6 +4,7 @@
 #'
 #' @useDynLib epclust
 #'
+#' @importFrom Rcpp evalCpp sourceCpp
 #' @importFrom Rwave cwt
 #' @importFrom cluster pam
 #' @importFrom parallel makeCluster clusterExport parLapply stopCluster
index 0d37c24..4d43b2b 100644 (file)
@@ -123,17 +123,46 @@ computeSynchrones = function(medoids, getRefSeries,
        {
                ref_series = getRefSeries(indices)
                nb_series = nrow(ref_series)
-               #get medoids indices for this chunk of series
 
-               #TODO: debug this (address is OK but values are garbage: why?)
-#               mi = .Call("computeMedoidsIndices", medoids@address, ref_series, PACKAGE="epclust")
+               if (parll)
+               {
+                       require("bigmemory", quietly=TRUE)
+                       require("synchronicity", quietly=TRUE)
+                       require("epclust", quietly=TRUE)
+                       synchrones <- bigmemory::attach.big.matrix(synchrones_desc)
+                       medoids <- bigmemory::attach.big.matrix(medoids_desc)
+                       m <- synchronicity::attach.mutex(m_desc)
+               }
+
+
 
-               #R-equivalent, requiring a matrix (thus potentially breaking "fit-in-memory" hope)
-               mat_meds = medoids[,]
-               mi = rep(NA,nb_series)
-               for (i in 1:nb_series)
-                       mi[i] <- which.min( rowSums( sweep(mat_meds, 2, ref_series[i,], '-')^2 ) )
-               rm(mat_meds); gc()
+#TODO: use dbs(),
+               #https://www.r-bloggers.com/debugging-parallel-code-with-dbs/
+               #http://gforge.se/2015/02/how-to-go-parallel-in-r-basics-tips/
+
+#OK ::
+#write(length(indices), file="TOTO")
+#write( computeMedoidsIndices(medoids@address, getRefSeries(indices[1:600])), file="TOTO")
+#stop()
+
+#              write(indices, file="TOTO", ncolumns=10, append=TRUE)
+#write("medoids", file = "TOTO", ncolumns=1, append=TRUE)
+#write(medoids[1,1:3], file = "TOTO", ncolumns=1, append=TRUE)
+#write("synchrones", file = "TOTO", ncolumns=1, append=TRUE)
+#write(synchrones[1,1:3], file = "TOTO", ncolumns=1, append=TRUE)
+
+#NOT OK :: (should just be "ref_series") ...or yes ? race problems mutex then ? ?!
+               #get medoids indices for this chunk of series
+               mi = computeMedoidsIndices(medoids@address, getRefSeries(indices[1:600]))  #ref_series)
+write("MI ::::", file = "TOTO", ncolumns=1, append=TRUE)
+write(mi[1:3], file = "TOTO", ncolumns=1, append=TRUE)
+
+#              #R-equivalent, requiring a matrix (thus potentially breaking "fit-in-memory" hope)
+#              mat_meds = medoids[,]
+#              mi = rep(NA,nb_series)
+#              for (i in 1:nb_series)
+#                      mi[i] <- which.min( rowSums( sweep(mat_meds, 2, ref_series[i,], '-')^2 ) )
+#              rm(mat_meds); gc()
 
                for (i in seq_len(nb_series))
                {
@@ -155,18 +184,19 @@ computeSynchrones = function(medoids, getRefSeries,
        parll = (requireNamespace("synchronicity",quietly=TRUE)
                && parll && Sys.info()['sysname'] != "Windows")
        if (parll)
+       {
                m <- synchronicity::boost.mutex()
+               m_desc <- synchronicity::describe(m)
+               synchrones_desc = bigmemory::describe(synchrones)
+               medoids_desc = bigmemory::describe(medoids)
 
-       if (parll)
-       {
                cl = parallel::makeCluster(ncores_clust)
                parallel::clusterExport(cl,
-                       varlist=c("synchrones","counts","verbose","medoids","getRefSeries"),
+                       varlist=c("synchrones_desc","counts","verbose","m_desc","medoids_desc","getRefSeries"),
                        envir=environment())
        }
 
        indices_workers = .spreadIndices(seq_len(nb_ref_curves), nb_series_per_chunk)
-#browser()
        ignored <-
                if (parll)
                        parallel::parLapply(cl, indices_workers, computeSynchronesChunk)
@@ -233,28 +263,33 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE)
                pairs = c(pairs, lapply(V, function(v) c(i,v)))
        }
 
-       computeCWT = function(i)
-       {
-               ts <- scale(ts(synchrones[i,]), center=TRUE, scale=scaled)
-               totts.cwt = Rwave::cwt(ts, totnoct, nvoice, w0, plot=FALSE)
-               ts.cwt = totts.cwt[,s0log:(s0log+noctave*nvoice)]
-               #Normalization
-               sqs <- sqrt(2^(0:(noctave*nvoice)/nvoice)*s0)
-               sqres <- sweep(ts.cwt,2,sqs,'*')
-               sqres / max(Mod(sqres))
-       }
-
        # Distance between rows i and j
        computeDistancesIJ = function(pair)
        {
+               require("bigmemory", quietly=TRUE)
+               require("epclust", quietly=TRUE)
+               synchrones <- bigmemory::attach.big.matrix(synchrones_desc)
+               Xwer_dist <- bigmemory::attach.big.matrix(Xwer_dist_desc)
+       
+               computeCWT = function(i)
+               {
+                       ts <- scale(ts(synchrones[i,]), center=TRUE, scale=scaled)
+                       totts.cwt = Rwave::cwt(ts, totnoct, nvoice, w0, plot=FALSE)
+                       ts.cwt = totts.cwt[,s0log:(s0log+noctave*nvoice)]
+                       #Normalization
+                       sqs <- sqrt(2^(0:(noctave*nvoice)/nvoice)*s0)
+                       sqres <- sweep(ts.cwt,2,sqs,'*')
+                       sqres / max(Mod(sqres))
+               }
+
                i = pair[1] ; j = pair[2]
                if (verbose && j==i+1)
                        cat(paste("   Distances (",i,",",j,"), (",i,",",j+1,") ...\n", sep=""))
                cwt_i = computeCWT(i)
                cwt_j = computeCWT(j)
-               num <- .Call("filter", Mod(cwt_i * Conj(cwt_j)), PACKAGE="epclust")
-               WX  <- .Call("filter", Mod(cwt_i * Conj(cwt_i)), PACKAGE="epclust")
-               WY  <- .Call("filter", Mod(cwt_j * Conj(cwt_j)), PACKAGE="epclust")
+               num <- epclustFilter(Mod(cwt_i * Conj(cwt_j)))
+               WX  <- epclustFilter(Mod(cwt_i * Conj(cwt_i)))
+               WY  <- epclustFilter(Mod(cwt_j * Conj(cwt_j)))
                wer2 <- sum(colSums(num)^2) / sum(colSums(WX) * colSums(WY))
                Xwer_dist[i,j] <- sqrt(delta * ncol(cwt_i) * (1 - wer2))
                Xwer_dist[j,i] <- Xwer_dist[i,j]
@@ -264,9 +299,11 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE)
        if (parll)
        {
                cl = parallel::makeCluster(ncores_clust)
-               parallel::clusterExport(cl,
-                       varlist=c("synchrones","totnoct","nvoice","w0","s0log","noctave","s0","verbose"),
-                       envir=environment())
+               synchrones_desc <- bigmemory::describe(synchrones)
+               Xwer_dist_desc_desc <- bigmemory::describe(Xwer_dist)
+
+               parallel::clusterExport(cl, varlist=c("synchrones_desc","Xwer_dist_desc","totnoct",
+                       "nvoice","w0","s0log","noctave","s0","verbose"), envir=environment())
        }
 
        ignored <-
index 977e61b..9ba23ae 100644 (file)
@@ -170,6 +170,7 @@ claws = function(getSeries, K1, K2,
                        inds, getContribs, K1, nb_series_per_chunk, ncores_clust, verbose, parll)
                if (WER=="mix")
                {
+                       require("bigmemory", quietly=TRUE)
                        medoids1 = bigmemory::as.big.matrix( getSeries(indices_medoids) )
                        medoids2 = clusteringTask2(medoids1,
                                K2, getSeries, nb_curves, nb_series_per_chunk, ncores_clust, verbose, parll)
diff --git a/epclust/TOTO b/epclust/TOTO
new file mode 100644 (file)
index 0000000..c6836a6
--- /dev/null
@@ -0,0 +1,16 @@
+MI ::::
+14
+52
+55
+MI ::::
+56
+21
+47
+MI ::::
+45
+41
+5
+MI ::::
+48
+58
+28
diff --git a/epclust/src/computeMedoidsIndices.c b/epclust/src/computeMedoidsIndices.c
deleted file mode 100644 (file)
index 98a0111..0000000
+++ /dev/null
@@ -1,51 +0,0 @@
-#include <stdlib.h>
-#include <float.h>
-#include <R.h>
-#include <Rinternals.h>
-#include <Rmath.h>
-
-#include <stdio.h>
-
-// (K,L): dim(medoids)
-// mi: medoids indices
-SEXP computeMedoidsIndices(SEXP medoids_, SEXP ref_series_)
-{
-       double *medoids = (double*) R_ExternalPtrAddr(medoids_),
-               *ref_series = REAL(ref_series_);
-       int nb_series = INTEGER(getAttrib(ref_series_, R_DimSymbol))[0],
-               K = INTEGER(getAttrib(medoids_, R_DimSymbol))[0],
-               L = INTEGER(getAttrib(ref_series_, R_DimSymbol))[1],
-               *mi = (int*)malloc(nb_series*sizeof(int));
-
-       //TODO: debug this: medoids have same addresses on both sides, but this side fails
-       printf("MED: %x\n",medoids);
-
-       for (int i=0; i<nb_series ; i++)
-       {
-               // mi[i] <- which.min( rowSums( sweep(medoids, 2, ref_series[i,], '-')^2 ) )
-               mi[i] = 0;
-               double best_dist = DBL_MAX;
-               for (int j=0; j<K; j++)
-               {
-                       double dist_ij = 0.;
-                       for (int k=0; k<L; k++)
-                       {
-                               double delta = ref_series[k*K+i] - medoids[k*K+j];
-                               dist_ij += delta * delta;
-                       }
-                       if (dist_ij < best_dist)
-                       {
-                               mi[i] = j+1; //R indices start at 1
-                               best_dist = dist_ij;
-                       }
-               }
-       }
-
-       SEXP R_mi;
-       PROTECT(R_mi = allocVector(INTSXP, nb_series));
-       for (int i = 0; i < nb_series; i++)
-               INTEGER(R_mi)[i] = mi[i];
-       UNPROTECT(1);
-       free(mi);
-       return R_mi;
-}
diff --git a/epclust/src/computeMedoidsIndices.cpp b/epclust/src/computeMedoidsIndices.cpp
new file mode 100644 (file)
index 0000000..6934181
--- /dev/null
@@ -0,0 +1,51 @@
+#include <Rcpp.h>
+
+// [[Rcpp::depends(BH, bigmemory)]]
+#include <bigmemory/MatrixAccessor.hpp>
+
+#include <numeric>
+#include <cfloat>
+
+using namespace Rcpp;
+
+//' computeMedoidsIndices
+//'
+//' Compute medoids indices
+//'
+//' @param pMedoids External pointer
+//' @param ref_series reference series
+//'
+//' @return A map serie number -> medoid index
+// [[Rcpp::export]]
+IntegerVector computeMedoidsIndices(SEXP pMedoids, NumericMatrix ref_series)
+{
+       XPtr<BigMatrix> pMed(pMedoids);
+       MatrixAccessor<double> medoids = MatrixAccessor<double>(*pMed);
+       int nb_series = ref_series.nrow(),
+               K = pMed->nrow(),
+               L = pMed->ncol();
+       IntegerVector mi(nb_series);
+
+       for (int i=0; i<nb_series ; i++)
+       {
+               // mi[i] <- which.min( rowSums( sweep(medoids, 2, ref_series[i,], '-')^2 ) )
+               mi[i] = 0;
+               double best_dist = DBL_MAX;
+               for (int j=0; j<K; j++)
+               {
+                       double dist_ij = 0.;
+                       for (int k=0; k<L; k++)
+                       {
+                               double delta = ref_series(i,k) - *(medoids[k]+j);
+                               dist_ij += delta * delta;
+                       }
+                       if (dist_ij < best_dist)
+                       {
+                               mi[i] = j+1; //R indices start at 1
+                               best_dist = dist_ij;
+                       }
+               }
+       }
+
+       return mi;
+}
diff --git a/epclust/src/filter.c b/epclust/src/filter.c
deleted file mode 100644 (file)
index 382910a..0000000
+++ /dev/null
@@ -1,37 +0,0 @@
-#include <stdlib.h>
-#include <R.h>
-#include <Rinternals.h>
-#include <Rmath.h>
-
-#include <stdio.h>
-
-SEXP filter(SEXP cwt_)
-{
-       int L = INTEGER(getAttrib(cwt_, R_DimSymbol))[0],
-               D = INTEGER(getAttrib(cwt_, R_DimSymbol))[1];
-       double *cwt = REAL(cwt_);
-       SEXP R_fcwt;
-       PROTECT(R_fcwt = allocMatrix(REALSXP, L, D));
-       double* fcwt = REAL(R_fcwt);
-
-       //TODO: coding style is terrible... no time for now.
-       for (int col=0; col<D; col++)
-       {
-               double v1 = cwt[0];
-               double ma = v1 + cwt[1] + cwt[2];
-               for (int i=1; i<L-2; i++)
-               {
-                       fcwt[i] = ma / 3.;
-                       ma = ma - v1 + cwt[i+2];
-                       v1 = cwt[i];
-               }
-               fcwt[0] = cwt[0];
-               fcwt[L-2] = ma / 3.;
-               fcwt[L-1] = cwt[L-1];
-               cwt = cwt + L;
-               fcwt = fcwt + L;
-       }
-
-       UNPROTECT(1);
-       return R_fcwt;
-}
diff --git a/epclust/src/filter.cpp b/epclust/src/filter.cpp
new file mode 100644 (file)
index 0000000..ee24af6
--- /dev/null
@@ -0,0 +1,40 @@
+#include <Rcpp.h>
+
+using namespace Rcpp;
+
+//' filter
+//'
+//' Filter time-series
+//'
+//' @param cwt Continuous wavelets transform
+//'
+//' @return The filtered CWT
+// [[Rcpp::export]]
+NumericMatrix epclustFilter(NumericMatrix cwt)
+{
+       int L = cwt.nrow(),
+               D = cwt.ncol();
+       NumericMatrix fcwt(L, D); //fill with 0... TODO: back to SEXP C-style?
+       double *cwt_c = cwt.begin(),
+               *fcwt_c = fcwt.begin();
+
+       //TODO: coding style is terrible... no time for now.
+       for (int col=0; col<D; col++)
+       {
+               double v1 = cwt_c[0];
+               double ma = v1 + cwt[1] + cwt_c[2];
+               for (int i=1; i<L-2; i++)
+               {
+                       fcwt_c[i] = ma / 3.;
+                       ma = ma - v1 + cwt_c[i+2];
+                       v1 = cwt_c[i];
+               }
+               fcwt_c[0] = cwt_c[0];
+               fcwt_c[L-2] = ma / 3.;
+               fcwt_c[L-1] = cwt_c[L-1];
+               cwt_c = cwt_c + L;
+               fcwt_c = fcwt_c + L;
+       }
+
+       return fcwt;
+}