From: Benjamin Auder <benjamin.auder@somewhere>
Date: Wed, 8 Mar 2017 10:52:39 +0000 (+0100)
Subject: add some TODOs
X-Git-Url: https://git.auder.net/variants/Dynamo/%7B%7B%20asset%28%27mixstore/css/scripts/R.css?a=commitdiff_plain;h=24ed5d835e2eebaaa4d5f8296f8d2e2132cc6398;p=epclust.git

add some TODOs
---

diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R
index 9a55495..3993e76 100644
--- a/epclust/R/clustering.R
+++ b/epclust/R/clustering.R
@@ -21,7 +21,7 @@
 #'
 #' @return For \code{clusteringTask1()} and \code{computeClusters1()}, the indices of the
 #'   computed (K1) medoids. Indices are irrelevant for stage 2 clustering, thus
-#'   \code{computeClusters2()} outputs a matrix of medoids
+#'   \code{computeClusters2()} outputs a big.matrix of medoids
 #'   (of size limited by nb_series_per_chunk)
 NULL
 
@@ -73,6 +73,7 @@ computeClusters2 = function(medoids, K2,
 	synchrones = computeSynchrones(medoids,
 		getRefSeries, nb_ref_curves, nb_series_per_chunk, ncores_clust, verbose, parll)
 	distances = computeWerDists(synchrones, ncores_clust, verbose, parll)
+	#TODO: if PAM cannot take big.matrix in input, cast it before... (more than OK in RAM)
 	medoids[ cluster::pam(distances, K2, diss=TRUE)$medoids , ]
 }
 
@@ -81,16 +82,27 @@ computeClusters2 = function(medoids, K2,
 #' Compute the synchrones curves (sum of clusters elements) from a matrix of medoids,
 #' using L2 distances.
 #'
-#' @param medoids Matrix of medoids (curves of same legnth as initial series)
+#' @param medoids big.matrix of medoids (curves of same length as initial series)
 #' @param getRefSeries Function to retrieve initial series (e.g. in stage 2 after series
 #'   have been replaced by stage-1 medoids)
 #' @param nb_ref_curves How many reference series? (This number is known at this stage)
 #' @inheritParams claws
 #'
+#' @return A big.matrix of size K1 x L where L = data_length
+#'
 #' @export
 computeSynchrones = function(medoids, getRefSeries,
 	nb_ref_curves, nb_series_per_chunk, ncores_clust=1,verbose=FALSE,parll=TRUE)
 {
+
+
+
+#TODO: si parll, getMedoids + serialization, pass only getMedoids to nodes
+# --> BOF... chaque node chargera tous les medoids (efficacité) :/ ==> faut que ça tienne en RAM
+#au pire :: C-ifier et charger medoids 1 by 1...
+
+	#MIEUX :: medoids DOIT etre une big.matrix partagée !
+
 	computeSynchronesChunk = function(indices)
 	{
 		if (verbose)
@@ -111,36 +123,43 @@ computeSynchrones = function(medoids, getRefSeries,
 
 	K = nrow(medoids)
 	# Use bigmemory (shared==TRUE by default) + synchronicity to fill synchrones in //
+	# TODO: if size > RAM (not our case), use file-backed big.matrix
 	synchrones = bigmemory::big.matrix(nrow=K,ncol=ncol(medoids),type="double",init=0.)
 	counts = bigmemory::big.matrix(nrow=K,ncol=1,type="double",init=0)
-	# Fork (// run) only on Linux & MacOS; on Windows: run sequentially
+	# synchronicity is only for Linux & MacOS; on Windows: run sequentially
 	parll = (requireNamespace("synchronicity",quietly=TRUE)
 		&& parll && Sys.info()['sysname'] != "Windows")
 	if (parll)
 		m <- synchronicity::boost.mutex()
 
+	if (parll)
+	{
+		cl = parallel::makeCluster(ncores_clust)
+		parallel::clusterExport(cl,
+			varlist=c("synchrones","counts","verbose","medoids","getRefSeries"),
+			envir=environment())
+	}
+
 	indices_workers = .spreadIndices(seq_len(nb_ref_curves), nb_series_per_chunk)
 	ignored <-
 		if (parll)
-		{
-			parallel::mclapply(indices_workers, computeSynchronesChunk,
-				mc.cores=ncores_clust, mc.allow.recursive=FALSE)
-		}
+			parallel::parLapply(indices_workers, computeSynchronesChunk)
 		else
 			lapply(indices_workers, computeSynchronesChunk)
 
-	mat_syncs = matrix(nrow=K, ncol=ncol(medoids))
-	vec_count = rep(NA, K)
-	#TODO: can we avoid this loop?
+	if (parll)
+		parallel::stopCluster(cl)
+
+	#TODO: can we avoid this loop? ( synchrones = sweep(synchrones, 1, counts, '/') )
 	for (i in seq_len(K))
-	{
-		mat_syncs[i,] = synchrones[i,]
-		vec_count[i] = counts[i,1]
-	}
+		synchrones[i,] = synchrones[i,] / counts[i,1]
 	#NOTE: odds for some clusters to be empty? (when series already come from stage 2)
 	#      ...maybe; but let's hope resulting K1' be still quite bigger than K2
-	mat_syncs = sweep(mat_syncs, 1, vec_count, '/')
-	mat_syncs[ sapply(seq_len(K), function(i) all(!is.nan(mat_syncs[i,]))) , ]
+	noNA_rows = sapply(seq_len(K), function(i) all(!is.nan(synchrones[i,])))
+	if (all(noNA_rows))
+		return (synchrones)
+	# Else: some clusters are empty, need to slice synchrones
+	synchrones[noNA_rows,]
 }
 
 #' computeWerDists
@@ -148,13 +167,21 @@ computeSynchrones = function(medoids, getRefSeries,
 #' Compute the WER distances between the synchrones curves (in rows), which are
 #' returned (e.g.) by \code{computeSynchrones()}
 #'
-#' @param synchrones A matrix of synchrones, in rows. The series have same length as the
-#'   series in the initial dataset
+#' @param synchrones A big.matrix of synchrones, in rows. The series have same length
+#'   as the series in the initial dataset
 #' @inheritParams claws
 #'
+#' @return A big.matrix of size K1 x K1
+#'
 #' @export
 computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE)
 {
+
+
+
+#TODO: re-organize to call computeWerDist(x,y) [C] (in //?) from two indices + big.matrix
+
+
 	n <- nrow(synchrones)
 	delta <- ncol(synchrones)
 	#TODO: automatic tune of all these parameters ? (for other users)
@@ -163,7 +190,7 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE)
 	noctave = 13
 	# 4 here represent 2^5 = 32 half-hours ~ 1 day
 	#NOTE: default scalevector == 2^(0:(noctave * nvoice) / nvoice) * s0 (?)
-	scalevector  <- 2^(4:(noctave * nvoice) / nvoice) * 2
+	scalevector  <- 2^(4:(noctave * nvoice) / nvoice + 1)
 	#condition: ( log2(s0*w0/(2*pi)) - 1 ) * nvoice + 1.5 >= 1
 	s0=2
 	w0=2*pi
@@ -176,7 +203,7 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE)
 		if (verbose)
 			cat(paste("+++ Compute Rwave::cwt() on serie ",i,"\n", sep=""))
 		ts <- scale(ts(synchrones[i,]), center=TRUE, scale=scaled)
-		totts.cwt = Rwave::cwt(ts,totnoct,nvoice,w0,plot=0)
+		totts.cwt = Rwave::cwt(ts, totnoct, nvoice, w0, plot=FALSE)
 		ts.cwt = totts.cwt[,s0log:(s0log+noctave*nvoice)]
 		#Normalization
 		sqs <- sqrt(2^(0:(noctave*nvoice)/nvoice)*s0)
@@ -192,7 +219,8 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE)
 			envir=environment())
 	}
 
-	# (normalized) observations node with CWT
+	# list of CWT from synchrones
+	# TODO: fit in RAM, OK? If not, 2 options: serialize, compute individual distances
 	Xcwt4 <-
 		if (parll)
 			parallel::parLapply(cl, seq_len(n), computeCWT)
@@ -207,6 +235,9 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE)
 	if (verbose)
 		cat("*** Compute WER distances from CWT\n")
 
+	#TODO: computeDistances(i,j), et répartir les n(n-1)/2 couples d'indices
+	#là c'est trop déséquilibré
+
 	computeDistancesLineI = function(i)
 	{
 		if (verbose)
@@ -217,7 +248,7 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE)
 			num <- filter(Mod(Xcwt4[[i]] * Conj(Xcwt4[[j]])), fcoefs, circular=TRUE)
 			WX <- filter(Mod(Xcwt4[[i]] * Conj(Xcwt4[[i]])), fcoefs, circular=TRUE)
 			WY <- filter(Mod(Xcwt4[[j]] * Conj(Xcwt4[[j]])), fcoefs, circular=TRUE)
-			wer2    <- sum(colSums(num)^2) / sum( sum(colSums(WX) * colSums(WY)) )
+			wer2 <- sum(colSums(num)^2) / sum( sum(colSums(WX) * colSums(WY)) )
 			if (parll)
 				synchronicity::lock(m)
 			Xwer_dist[i,j] <- sqrt(delta * ncol(Xcwt4[[1]]) * (1 - wer2))
@@ -242,12 +273,7 @@ computeWerDists = function(synchrones, ncores_clust=1,verbose=FALSE,parll=TRUE)
 		else
 			lapply(seq_len(n-1), computeDistancesLineI)
 	Xwer_dist[n,n] = 0.
-
-	mat_dists = matrix(nrow=n, ncol=n)
-	#TODO: avoid this loop?
-	for (i in 1:n)
-		mat_dists[i,] = Xwer_dist[i,]
-	mat_dists
+	Xwer_dist
 }
 
 # Helper function to divide indices into balanced sets
diff --git a/epclust/R/main.R b/epclust/R/main.R
index bba0618..f1e435c 100644
--- a/epclust/R/main.R
+++ b/epclust/R/main.R
@@ -169,6 +169,15 @@ claws = function(getSeries, K1, K2,
 			inds, getContribs, K1, nb_series_per_chunk, ncores_clust, verbose, parll)
 		if (WER=="mix")
 		{
+
+
+
+
+#TODO: getSeries(indices_medoids) BAD ; il faudrait une big.matrix de medoids en entree
+			#OK en RAM il y en aura 1000 (donc 1000*K1*17519... OK)
+			#...mais du coup chaque process ne re-dupliquera pas medoids
+
+
 			medoids2 = computeClusters2(getSeries(indices_medoids),
 				K2, getSeries, nb_curves, nb_series_per_chunk, ncores_clust, verbose, parll)
 			binarize(medoids2, synchrones_file, nb_series_per_chunk, sep, nbytes, endian)
diff --git a/epclust/src/WER.c b/epclust/src/WER.c
new file mode 100644
index 0000000..36bfba7
--- /dev/null
+++ b/epclust/src/WER.c
@@ -0,0 +1,117 @@
+#include <stdlib.h>
+#include <math.h>
+#include <stdbool.h>
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+// n: number of synchrones, m: length of a synchrone
+float computeWerDist(float* s1, float* s2, int n, int m)
+{
+	//TODO: automatic tune of all these parameters ? (for other users)
+	int nvoice = 4;
+	//noctave 2^13 = 8192 half hours ~ 180 days ; ~log2(ncol(synchrones))
+	int noctave = 13
+	// 4 here represent 2^5 = 32 half-hours ~ 1 day
+	//NOTE: default scalevector == 2^(0:(noctave * nvoice) / nvoice) * s0 (?)
+	//R: scalevector <- 2^(4:(noctave * nvoice) / nvoice + 1)
+	int* scalevector = (int*)malloc( (noctave*nvoice-4 + 1) * sizeof(int))
+	for (int i=4; i<=noctave*nvoice; i++)
+		scalevector[i-4] = pow(2., (float)i/nvoice + 1.);
+	//condition: ( log2(s0*w0/(2*pi)) - 1 ) * nvoice + 1.5 >= 1
+	int s0 = 2;
+	double w0 = 2*M_PI;
+	bool scaled = false;
+	int s0log = as.integer( (log2( s0*w0/(2*pi) ) - 1) * nvoice + 1.5 )
+	int totnoct = noctave + as.integer(s0log/nvoice) + 1
+
+
+
+
+
+///TODO: continue
+
+
+
+	computeCWT = function(i)
+	{
+		if (verbose)
+			cat(paste("+++ Compute Rwave::cwt() on serie ",i,"\n", sep=""))
+		ts <- scale(ts(synchrones[i,]), center=TRUE, scale=scaled)
+		totts.cwt = Rwave::cwt(ts,totnoct,nvoice,w0,plot=0)
+		ts.cwt = totts.cwt[,s0log:(s0log+noctave*nvoice)]
+		#Normalization
+		sqs <- sqrt(2^(0:(noctave*nvoice)/nvoice)*s0)
+		sqres <- sweep(ts.cwt,2,sqs,'*')
+		sqres / max(Mod(sqres))
+	}
+
+	if (parll)
+	{
+		cl = parallel::makeCluster(ncores_clust)
+		parallel::clusterExport(cl,
+			varlist=c("synchrones","totnoct","nvoice","w0","s0log","noctave","s0","verbose"),
+			envir=environment())
+	}
+
+	# (normalized) observations node with CWT
+	Xcwt4 <-
+		if (parll)
+			parallel::parLapply(cl, seq_len(n), computeCWT)
+		else
+			lapply(seq_len(n), computeCWT)
+
+	if (parll)
+		parallel::stopCluster(cl)
+
+	Xwer_dist <- bigmemory::big.matrix(nrow=n, ncol=n, type="double")
+	fcoefs = rep(1/3, 3) #moving average on 3 values (TODO: very slow! correct?!)
+	if (verbose)
+		cat("*** Compute WER distances from CWT\n")
+
+	#TODO: computeDistances(i,j), et répartir les n(n-1)/2 couples d'indices
+	#là c'est trop déséquilibré
+
+	computeDistancesLineI = function(i)
+	{
+		if (verbose)
+			cat(paste("   Line ",i,"\n", sep=""))
+		for (j in (i+1):n)
+		{
+			#TODO: 'circular=TRUE' is wrong, should just take values on the sides; to rewrite in C
+			num <- filter(Mod(Xcwt4[[i]] * Conj(Xcwt4[[j]])), fcoefs, circular=TRUE)
+			WX <- filter(Mod(Xcwt4[[i]] * Conj(Xcwt4[[i]])), fcoefs, circular=TRUE)
+			WY <- filter(Mod(Xcwt4[[j]] * Conj(Xcwt4[[j]])), fcoefs, circular=TRUE)
+			wer2    <- sum(colSums(num)^2) / sum( sum(colSums(WX) * colSums(WY)) )
+			if (parll)
+				synchronicity::lock(m)
+			Xwer_dist[i,j] <- sqrt(delta * ncol(Xcwt4[[1]]) * (1 - wer2))
+			Xwer_dist[j,i] <- Xwer_dist[i,j]
+			if (parll)
+				synchronicity::unlock(m)
+		}
+		Xwer_dist[i,i] = 0.
+	}
+
+	parll = (requireNamespace("synchronicity",quietly=TRUE)
+		&& parll && Sys.info()['sysname'] != "Windows")
+	if (parll)
+		m <- synchronicity::boost.mutex()
+
+	ignored <-
+		if (parll)
+		{
+			parallel::mclapply(seq_len(n-1), computeDistancesLineI,
+				mc.cores=ncores_clust, mc.allow.recursive=FALSE)
+		}
+		else
+			lapply(seq_len(n-1), computeDistancesLineI)
+	Xwer_dist[n,n] = 0.
+
+	mat_dists = matrix(nrow=n, ncol=n)
+	#TODO: avoid this loop?
+	for (i in 1:n)
+		mat_dists[i,] = Xwer_dist[i,]
+	mat_dists
+