From 363ae13430cdee6ba76b42b7316aa4b292b04d93 Mon Sep 17 00:00:00 2001
From: Benjamin Auder <benjamin.auder@somewhere>
Date: Thu, 9 Mar 2017 04:25:51 +0100
Subject: [PATCH] use Rcpp; ongoing debug for parallel synchrones computation

---
 .gitignore                            |  4 ++
 epclust/DESCRIPTION                   | 14 ++--
 epclust/R/a_NAMESPACE.R               |  1 +
 epclust/R/clustering.R                | 97 ++++++++++++++++++---------
 epclust/R/main.R                      |  1 +
 epclust/TOTO                          | 16 +++++
 epclust/src/computeMedoidsIndices.c   | 51 --------------
 epclust/src/computeMedoidsIndices.cpp | 51 ++++++++++++++
 epclust/src/filter.c                  | 37 ----------
 epclust/src/filter.cpp                | 40 +++++++++++
 10 files changed, 190 insertions(+), 122 deletions(-)
 create mode 100644 epclust/TOTO
 delete mode 100644 epclust/src/computeMedoidsIndices.c
 create mode 100644 epclust/src/computeMedoidsIndices.cpp
 delete mode 100644 epclust/src/filter.c
 create mode 100644 epclust/src/filter.cpp

diff --git a/.gitignore b/.gitignore
index 3a326ea..c946480 100644
--- a/.gitignore
+++ b/.gitignore
@@ -36,3 +36,7 @@
 #ignore object files
 *.o
 *.so
+
+#ignore RcppExports, generated by Rcpp::compileAttributes
+/epclust/R/RcppExports.R
+/epclust/src/RcppExports.cpp
diff --git a/epclust/DESCRIPTION b/epclust/DESCRIPTION
index 1f2b5ea..300f86e 100644
--- a/epclust/DESCRIPTION
+++ b/epclust/DESCRIPTION
@@ -18,19 +18,25 @@ Imports:
     parallel,
     cluster,
     wavelets,
-		bigmemory,
-		Rwave
+    bigmemory,
+    Rwave,
+    Rcpp
+LinkingTo:
+    Rcpp,
+    BH,
+    bigmemory
 Suggests:
     synchronicity,
     devtools,
     testthat,
-		MASS,
-		clue,
+    MASS,
+    clue,
     wmtsa,
     DBI
 License: MIT + file LICENSE
 RoxygenNote: 6.0.1
 Collate: 
+    'RcppExports.R'
     'main.R'
     'clustering.R'
     'de_serialize.R'
diff --git a/epclust/R/a_NAMESPACE.R b/epclust/R/a_NAMESPACE.R
index e9aa830..8407d23 100644
--- a/epclust/R/a_NAMESPACE.R
+++ b/epclust/R/a_NAMESPACE.R
@@ -4,6 +4,7 @@
 #'
 #' @useDynLib epclust
 #'
+#' @importFrom Rcpp evalCpp sourceCpp
 #' @importFrom Rwave cwt
 #' @importFrom cluster pam
 #' @importFrom parallel makeCluster clusterExport parLapply stopCluster
diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R
index 0d37c24..4d43b2b 100644
--- a/epclust/R/clustering.R
+++ b/epclust/R/clustering.R
@@ -123,17 +123,46 @@ computeSynchrones = function(medoids, getRefSeries,
 	{
 		ref_series = getRefSeries(indices)
 		nb_series = nrow(ref_series)
-		#get medoids indices for this chunk of series
 
-		#TODO: debug this (address is OK but values are garbage: why?)
-#		 mi = .Call("computeMedoidsIndices", medoids@address, ref_series, PACKAGE="epclust")
+		if (parll)
+		{
+			require("bigmemory", quietly=TRUE)
+			require("synchronicity", quietly=TRUE)
+			require("epclust", quietly=TRUE)
+			synchrones <- bigmemory::attach.big.matrix(synchrones_desc)
+			medoids <- bigmemory::attach.big.matrix(medoids_desc)
+			m <- synchronicity::attach.mutex(m_desc)
+		}
+
+
 
-		#R-equivalent, requiring a matrix (thus potentially breaking "fit-in-memory" hope)
-		mat_meds = medoids[,]
-		mi = rep(NA,nb_series)
-		for (i in 1:nb_series)
-			mi[i] <- which.min( rowSums( sweep(mat_meds, 2, ref_series[i,], '-')^2 ) )
-		rm(mat_meds); gc()
+#TODO: use dbs(),
+		#https://www.r-bloggers.com/debugging-parallel-code-with-dbs/
+		#http://gforge.se/2015/02/how-to-go-parallel-in-r-basics-tips/
+
+#OK ::
+#write(length(indices), file="TOTO")
+#write( computeMedoidsIndices(medoids@address, getRefSeries(indices[1:600])), file="TOTO")
+#stop()
+
+#		write(indices, file="TOTO", ncolumns=10, append=TRUE)
+#write("medoids", file = "TOTO", ncolumns=1, append=TRUE)
+#write(medoids[1,1:3], file = "TOTO", ncolumns=1, append=TRUE)
+#write("synchrones", file = "TOTO", ncolumns=1, append=TRUE)
+#write(synchrones[1,1:3], file = "TOTO", ncolumns=1, append=TRUE)
+
+#NOT OK :: (should just be "ref_series") ...or yes ? race problems mutex then ? ?!
+		#get medoids indices for this chunk of series
+		mi = computeMedoidsIndices(medoids@address, getRefSeries(indices[1:600]))  #ref_series)
+write("MI ::::", file = "TOTO", ncolumns=1, append=TRUE)
+write(mi[1:3], file = "TOTO", ncolumns=1, append=TRUE)
+
+#		#R-equivalent, requiring a matrix (thus potentially breaking "fit-in-memory" hope)
+#		mat_meds = medoids[,]
+#		mi = rep(NA,nb_series)
+#		for (i in 1:nb_series)
+#			mi[i] <- which.min( rowSums( sweep(mat_meds, 2, ref_series[i,], '-')^2 ) )
+#		rm(mat_meds); gc()
 
 		for (i in seq_len(nb_series))
 		{
@@ -155,18 +184,19 @@ computeSynchrones = function(medoids, getRefSeries,
 	parll = (requireNamespace("synchronicity",quietly=TRUE)
 		&& parll && Sys.info()['sysname'] != "Windows")
 	if (parll)
+	{
 		m <- synchronicity::boost.mutex()
+		m_desc <- synchronicity::describe(m)
+		synchrones_desc = bigmemory::describe(synchrones)
+		medoids_desc = bigmemory::describe(medoids)
 
-	if (parll)
-	{
 		cl = parallel::makeCluster(ncores_clust)
 		parallel::clusterExport(cl,
-			varlist=c("synchrones","counts","verbose","medoids","getRefSeries"),
+			varlist=c("synchrones_desc","counts","verbose","m_desc","medoids_desc","getRefSeries"),
 			envir=environment())
 	}
 
 	indices_workers = .spreadIndices(seq_len(nb_ref_curves), nb_series_per_chunk)
-#browser()
 	ignored <-
 		if (parll)
 			parallel::parLapply(cl, indices_workers, computeSynchronesChunk)
@@ -233,28 +263,33 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE)
 		pairs = c(pairs, lapply(V, function(v) c(i,v)))
 	}
 
-	computeCWT = function(i)
-	{
-		ts <- scale(ts(synchrones[i,]), center=TRUE, scale=scaled)
-		totts.cwt = Rwave::cwt(ts, totnoct, nvoice, w0, plot=FALSE)
-		ts.cwt = totts.cwt[,s0log:(s0log+noctave*nvoice)]
-		#Normalization
-		sqs <- sqrt(2^(0:(noctave*nvoice)/nvoice)*s0)
-		sqres <- sweep(ts.cwt,2,sqs,'*')
-		sqres / max(Mod(sqres))
-	}
-
 	# Distance between rows i and j
 	computeDistancesIJ = function(pair)
 	{
+		require("bigmemory", quietly=TRUE)
+		require("epclust", quietly=TRUE)
+		synchrones <- bigmemory::attach.big.matrix(synchrones_desc)
+		Xwer_dist <- bigmemory::attach.big.matrix(Xwer_dist_desc)
+	
+		computeCWT = function(i)
+		{
+			ts <- scale(ts(synchrones[i,]), center=TRUE, scale=scaled)
+			totts.cwt = Rwave::cwt(ts, totnoct, nvoice, w0, plot=FALSE)
+			ts.cwt = totts.cwt[,s0log:(s0log+noctave*nvoice)]
+			#Normalization
+			sqs <- sqrt(2^(0:(noctave*nvoice)/nvoice)*s0)
+			sqres <- sweep(ts.cwt,2,sqs,'*')
+			sqres / max(Mod(sqres))
+		}
+
 		i = pair[1] ; j = pair[2]
 		if (verbose && j==i+1)
 			cat(paste("   Distances (",i,",",j,"), (",i,",",j+1,") ...\n", sep=""))
 		cwt_i = computeCWT(i)
 		cwt_j = computeCWT(j)
-		num <- .Call("filter", Mod(cwt_i * Conj(cwt_j)), PACKAGE="epclust")
-		WX  <- .Call("filter", Mod(cwt_i * Conj(cwt_i)), PACKAGE="epclust")
-		WY  <- .Call("filter", Mod(cwt_j * Conj(cwt_j)), PACKAGE="epclust")
+		num <- epclustFilter(Mod(cwt_i * Conj(cwt_j)))
+		WX  <- epclustFilter(Mod(cwt_i * Conj(cwt_i)))
+		WY  <- epclustFilter(Mod(cwt_j * Conj(cwt_j)))
 		wer2 <- sum(colSums(num)^2) / sum(colSums(WX) * colSums(WY))
 		Xwer_dist[i,j] <- sqrt(delta * ncol(cwt_i) * (1 - wer2))
 		Xwer_dist[j,i] <- Xwer_dist[i,j]
@@ -264,9 +299,11 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE)
 	if (parll)
 	{
 		cl = parallel::makeCluster(ncores_clust)
-		parallel::clusterExport(cl,
-			varlist=c("synchrones","totnoct","nvoice","w0","s0log","noctave","s0","verbose"),
-			envir=environment())
+		synchrones_desc <- bigmemory::describe(synchrones)
+		Xwer_dist_desc_desc <- bigmemory::describe(Xwer_dist)
+
+		parallel::clusterExport(cl, varlist=c("synchrones_desc","Xwer_dist_desc","totnoct",
+			"nvoice","w0","s0log","noctave","s0","verbose"), envir=environment())
 	}
 
 	ignored <-
diff --git a/epclust/R/main.R b/epclust/R/main.R
index 977e61b..9ba23ae 100644
--- a/epclust/R/main.R
+++ b/epclust/R/main.R
@@ -170,6 +170,7 @@ claws = function(getSeries, K1, K2,
 			inds, getContribs, K1, nb_series_per_chunk, ncores_clust, verbose, parll)
 		if (WER=="mix")
 		{
+			require("bigmemory", quietly=TRUE)
 			medoids1 = bigmemory::as.big.matrix( getSeries(indices_medoids) )
 			medoids2 = clusteringTask2(medoids1,
 				K2, getSeries, nb_curves, nb_series_per_chunk, ncores_clust, verbose, parll)
diff --git a/epclust/TOTO b/epclust/TOTO
new file mode 100644
index 0000000..c6836a6
--- /dev/null
+++ b/epclust/TOTO
@@ -0,0 +1,16 @@
+MI ::::
+14
+52
+55
+MI ::::
+56
+21
+47
+MI ::::
+45
+41
+5
+MI ::::
+48
+58
+28
diff --git a/epclust/src/computeMedoidsIndices.c b/epclust/src/computeMedoidsIndices.c
deleted file mode 100644
index 98a0111..0000000
--- a/epclust/src/computeMedoidsIndices.c
+++ /dev/null
@@ -1,51 +0,0 @@
-#include <stdlib.h>
-#include <float.h>
-#include <R.h>
-#include <Rinternals.h>
-#include <Rmath.h>
-
-#include <stdio.h>
-
-// (K,L): dim(medoids)
-// mi: medoids indices
-SEXP computeMedoidsIndices(SEXP medoids_, SEXP ref_series_)
-{
-	double *medoids = (double*) R_ExternalPtrAddr(medoids_),
-		*ref_series = REAL(ref_series_);
-	int nb_series = INTEGER(getAttrib(ref_series_, R_DimSymbol))[0],
-		K = INTEGER(getAttrib(medoids_, R_DimSymbol))[0],
-		L = INTEGER(getAttrib(ref_series_, R_DimSymbol))[1],
-		*mi = (int*)malloc(nb_series*sizeof(int));
-
-	//TODO: debug this: medoids have same addresses on both sides, but this side fails
-	printf("MED: %x\n",medoids);
-
-	for (int i=0; i<nb_series ; i++)
-	{
-		// mi[i] <- which.min( rowSums( sweep(medoids, 2, ref_series[i,], '-')^2 ) )
-		mi[i] = 0;
-		double best_dist = DBL_MAX;
-		for (int j=0; j<K; j++)
-		{
-			double dist_ij = 0.;
-			for (int k=0; k<L; k++)
-			{
-				double delta = ref_series[k*K+i] - medoids[k*K+j];
-				dist_ij += delta * delta;
-			}
-			if (dist_ij < best_dist)
-			{
-				mi[i] = j+1; //R indices start at 1
-				best_dist = dist_ij;
-			}
-		}
-	}
-
-	SEXP R_mi;
-	PROTECT(R_mi = allocVector(INTSXP, nb_series));
-	for (int i = 0; i < nb_series; i++)
-		INTEGER(R_mi)[i] = mi[i];
-	UNPROTECT(1);
-	free(mi);
-	return R_mi;
-}
diff --git a/epclust/src/computeMedoidsIndices.cpp b/epclust/src/computeMedoidsIndices.cpp
new file mode 100644
index 0000000..6934181
--- /dev/null
+++ b/epclust/src/computeMedoidsIndices.cpp
@@ -0,0 +1,51 @@
+#include <Rcpp.h>
+
+// [[Rcpp::depends(BH, bigmemory)]]
+#include <bigmemory/MatrixAccessor.hpp>
+
+#include <numeric>
+#include <cfloat>
+
+using namespace Rcpp;
+
+//' computeMedoidsIndices
+//'
+//' Compute medoids indices
+//'
+//' @param pMedoids External pointer
+//' @param ref_series reference series
+//'
+//' @return A map serie number -> medoid index
+// [[Rcpp::export]]
+IntegerVector computeMedoidsIndices(SEXP pMedoids, NumericMatrix ref_series)
+{
+	XPtr<BigMatrix> pMed(pMedoids);
+	MatrixAccessor<double> medoids = MatrixAccessor<double>(*pMed);
+	int nb_series = ref_series.nrow(),
+		K = pMed->nrow(),
+		L = pMed->ncol();
+	IntegerVector mi(nb_series);
+
+	for (int i=0; i<nb_series ; i++)
+	{
+		// mi[i] <- which.min( rowSums( sweep(medoids, 2, ref_series[i,], '-')^2 ) )
+		mi[i] = 0;
+		double best_dist = DBL_MAX;
+		for (int j=0; j<K; j++)
+		{
+			double dist_ij = 0.;
+			for (int k=0; k<L; k++)
+			{
+				double delta = ref_series(i,k) - *(medoids[k]+j);
+				dist_ij += delta * delta;
+			}
+			if (dist_ij < best_dist)
+			{
+				mi[i] = j+1; //R indices start at 1
+				best_dist = dist_ij;
+			}
+		}
+	}
+
+	return mi;
+}
diff --git a/epclust/src/filter.c b/epclust/src/filter.c
deleted file mode 100644
index 382910a..0000000
--- a/epclust/src/filter.c
+++ /dev/null
@@ -1,37 +0,0 @@
-#include <stdlib.h>
-#include <R.h>
-#include <Rinternals.h>
-#include <Rmath.h>
-
-#include <stdio.h>
-
-SEXP filter(SEXP cwt_)
-{
-	int L = INTEGER(getAttrib(cwt_, R_DimSymbol))[0],
-		D = INTEGER(getAttrib(cwt_, R_DimSymbol))[1];
-	double *cwt = REAL(cwt_);
-	SEXP R_fcwt;
-	PROTECT(R_fcwt = allocMatrix(REALSXP, L, D));
-	double* fcwt = REAL(R_fcwt);
-
-	//TODO: coding style is terrible... no time for now.
-	for (int col=0; col<D; col++)
-	{
-		double v1 = cwt[0];
-		double ma = v1 + cwt[1] + cwt[2];
-		for (int i=1; i<L-2; i++)
-		{
-			fcwt[i] = ma / 3.;
-			ma = ma - v1 + cwt[i+2];
-			v1 = cwt[i];
-		}
-		fcwt[0] = cwt[0];
-		fcwt[L-2] = ma / 3.;
-		fcwt[L-1] = cwt[L-1];
-		cwt = cwt + L;
-		fcwt = fcwt + L;
-	}
-
-	UNPROTECT(1);
-	return R_fcwt;
-}
diff --git a/epclust/src/filter.cpp b/epclust/src/filter.cpp
new file mode 100644
index 0000000..ee24af6
--- /dev/null
+++ b/epclust/src/filter.cpp
@@ -0,0 +1,40 @@
+#include <Rcpp.h>
+
+using namespace Rcpp;
+
+//' filter
+//'
+//' Filter time-series
+//'
+//' @param cwt Continuous wavelets transform
+//'
+//' @return The filtered CWT
+// [[Rcpp::export]]
+NumericMatrix epclustFilter(NumericMatrix cwt)
+{
+	int L = cwt.nrow(),
+		D = cwt.ncol();
+	NumericMatrix fcwt(L, D); //fill with 0... TODO: back to SEXP C-style?
+	double *cwt_c = cwt.begin(),
+		*fcwt_c = fcwt.begin();
+
+	//TODO: coding style is terrible... no time for now.
+	for (int col=0; col<D; col++)
+	{
+		double v1 = cwt_c[0];
+		double ma = v1 + cwt[1] + cwt_c[2];
+		for (int i=1; i<L-2; i++)
+		{
+			fcwt_c[i] = ma / 3.;
+			ma = ma - v1 + cwt_c[i+2];
+			v1 = cwt_c[i];
+		}
+		fcwt_c[0] = cwt_c[0];
+		fcwt_c[L-2] = ma / 3.;
+		fcwt_c[L-1] = cwt_c[L-1];
+		cwt_c = cwt_c + L;
+		fcwt_c = fcwt_c + L;
+	}
+
+	return fcwt;
+}
-- 
2.44.0