From dc86eb0c992e6e4ab119d48398d040c4cf3a75fd Mon Sep 17 00:00:00 2001
From: Benjamin Auder <benjamin.auder@somewhere>
Date: Tue, 14 Mar 2017 03:23:59 +0100
Subject: [PATCH] several fixes; still some issues db,bin != ascii,csv

---
 epclust/R/clustering.R        | 13 +++++-
 epclust/R/computeSynchrones.R | 40 ++++--------------
 epclust/R/computeWerDists.R   | 44 +++++++++-----------
 epclust/R/main.R              | 76 +++++++++++++++++++----------------
 4 files changed, 80 insertions(+), 93 deletions(-)

diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R
index 1774b19..3e7fd38 100644
--- a/epclust/R/clustering.R
+++ b/epclust/R/clustering.R
@@ -25,6 +25,12 @@ NULL
 clusteringTask1 <- function(indices, getContribs, K1, algoClust1, nb_items_clust,
 	ncores_clust=3, verbose=FALSE, parll=TRUE)
 {
+	if (verbose)
+		cat(paste("*** Clustering task 1 on ",length(indices)," series\n", sep=""))
+
+	if (length(indices) <= K1)
+		return (indices)
+
 	if (parll)
 	{
 		# outfile=="" to see stderr/stdout on terminal
@@ -40,8 +46,6 @@ clusteringTask1 <- function(indices, getContribs, K1, algoClust1, nb_items_clust
 	{
 		# Balance tasks by splitting the indices set - as evenly as possible
 		indices_workers <- .splitIndices(indices, nb_items_clust, min_size=K1+1)
-		if (verbose)
-			cat(paste("*** [iterated] Clustering task 1 on ",length(indices)," series\n", sep=""))
 		indices <-
 			if (parll)
 			{
@@ -56,6 +60,11 @@ clusteringTask1 <- function(indices, getContribs, K1, algoClust1, nb_items_clust
 					inds[ algoClust1(getContribs(inds), K1) ]
 				) )
 			}
+		if (verbose)
+		{
+			cat(paste("*** [iterated] Clustering task 1: now ",
+				length(indices)," medoids\n", sep=""))
+		}
 	}
 	if (parll)
 		parallel::stopCluster(cl)
diff --git a/epclust/R/computeSynchrones.R b/epclust/R/computeSynchrones.R
index f8d7a06..3c1959a 100644
--- a/epclust/R/computeSynchrones.R
+++ b/epclust/R/computeSynchrones.R
@@ -4,30 +4,19 @@
 #' using euclidian distance.
 #'
 #' @param medoids matrix of K medoids curves in columns
-#' @param getSeries Function to retrieve series (argument: 'indices', integer vector),
-#'   as columns of a matrix
 #' @param nb_curves How many series? (this is known, at this stage)
 #' @inheritParams claws
+#' @inheritParams computeWerDists
 #'
 #' @return A matrix of K synchrones in columns (same length as the series)
 #'
 #' @export
 computeSynchrones <- function(medoids, getSeries, nb_curves,
-	nb_series_per_chunk, ncores_clust=3, verbose=FALSE, parll=TRUE)
+	nb_series_per_chunk, ncores=3, verbose=FALSE, parll=TRUE)
 {
 	# Synchrones computation is embarassingly parallel: compute it by chunks of series
 	computeSynchronesChunk <- function(indices)
 	{
-		if (parll)
-		{
-			require("epclust", quietly=TRUE)
-			requireNamespace("synchronicity", quietly=TRUE)
-			# The big.matrix objects need to be attached to be usable on the workers
-			synchrones <- bigmemory::attach.big.matrix(synchrones_desc)
-			medoids <- bigmemory::attach.big.matrix(medoids_desc)
-			m <- synchronicity::attach.mutex(m_desc)
-		}
-
 		# Obtain a chunk of reference series
 		series_chunk <- getSeries(indices)
 		nb_series_chunk <- ncol(series_chunk)
@@ -56,24 +45,9 @@ computeSynchrones <- function(medoids, getSeries, nb_curves,
 	# NOTE: synchronicity is only for Linux & MacOS; on Windows: run sequentially
 	parll <- (parll && requireNamespace("synchronicity",quietly=TRUE)
 		&& Sys.info()['sysname'] != "Windows")
+
 	if (parll)
-	{
 		m <- synchronicity::boost.mutex() #for lock/unlock, see computeSynchronesChunk
-		# mutex and big.matrix objects cannot be passed directly:
-		# they will be accessed from their description
-		m_desc <- synchronicity::describe(m)
-		synchrones_desc <- bigmemory::describe(synchrones)
-		medoids <- bigmemory::as.big.matrix(medoids)
-		medoids_desc <- bigmemory::describe(medoids)
-		# outfile=="" to see stderr/stdout on terminal
-		cl <-
-			if (verbose)
-				parallel::makeCluster(ncores_clust, outfile="")
-			else
-				parallel::makeCluster(ncores_clust)
-		parallel::clusterExport(cl, envir=environment(),
-			varlist=c("synchrones_desc","m_desc","medoids_desc","getSeries"))
-	}
 
 	if (verbose)
 		cat(paste("--- Compute ",K," synchrones with ",nb_curves," series\n", sep=""))
@@ -82,12 +56,12 @@ computeSynchrones <- function(medoids, getSeries, nb_curves,
 	indices_workers <- .splitIndices(seq_len(nb_curves), nb_series_per_chunk)
 	ignored <-
 		if (parll)
-			parallel::parLapply(cl, indices_workers, computeSynchronesChunk)
+		{
+			parallel::mclapply(indices_workers,
+				function(inds) computeSynchronesChunk(inds), mc.cores=ncores)
+		}
 		else
 			lapply(indices_workers, computeSynchronesChunk)
 
-	if (parll)
-		parallel::stopCluster(cl)
-
 	return (synchrones[,])
 }
diff --git a/epclust/R/computeWerDists.R b/epclust/R/computeWerDists.R
index 568a826..8eb755c 100644
--- a/epclust/R/computeWerDists.R
+++ b/epclust/R/computeWerDists.R
@@ -4,14 +4,16 @@
 #' obtaind by \code{getSeries(indices)}
 #'
 #' @param indices Range of series indices to cluster
+#' @param getSeries Function to retrieve series (argument: 'indices', integer vector),
+#'   as columns of a matrix
+#' @param ncores Number of cores for parallel runs
 #' @inheritParams claws
-#' @inheritParams computeSynchrones
 #'
 #' @return A distances matrix of size K x K where K == length(indices)
 #'
 #' @export
 computeWerDists <- function(indices, getSeries, nb_series_per_chunk, smooth_lvl, nvoice,
-	nbytes, endian, ncores_clust=3, verbose=FALSE, parll=TRUE)
+	nbytes, endian, ncores=3, verbose=FALSE, parll=TRUE)
 {
 	n <- length(indices)
 	L <- length(getSeries(1)) #TODO: not very neat way to get L
@@ -27,12 +29,6 @@ computeWerDists <- function(indices, getSeries, nb_series_per_chunk, smooth_lvl,
 	# Compute the getSeries(indices) CWT, and store the results in the binary file
 	computeSaveCWT <- function(indices)
 	{
-		if (parll)
-		{
-			# parallel workers start with an empty environment
-			require("epclust", quietly=TRUE)
-		}
-
 		# Obtain CWT as big vectors of real part + imaginary part (concatenate)
 		ts_cwt <- sapply(indices, function(i) {
 			ts <- scale(ts(getSeries(i)), center=TRUE, scale=FALSE)
@@ -54,7 +50,7 @@ computeWerDists <- function(indices, getSeries, nb_series_per_chunk, smooth_lvl,
 		re_part + 1i * im_part
 	}
 
-	# Compute distance between columns i and j for j>i
+	# Compute distances between columns i and j for j>i
 	computeDistances <- function(i)
 	{
 		if (parll)
@@ -87,30 +83,30 @@ computeWerDists <- function(indices, getSeries, nb_series_per_chunk, smooth_lvl,
 		Xwer_dist[i,i] <- 0.
 	}
 
+	if (verbose)
+		cat(paste("--- Precompute and serialize synchrones CWT\n", sep=""))
+
+	# Split indices by packets of length at most nb_cwt_per_chunk
+	indices_cwt <- .splitIndices(seq_len(n), nb_cwt_per_chunk)
+	# NOTE: next loop could potentially be run in //. Indices would be permuted (by
+	# serialization order), and synchronicity would be required because of concurrent
+	# writes. Probably not worth the effort - but possible to gain some bits of speed.
+	for (inds in indices_cwt)
+		computeSaveCWT(inds)
+
 	if (parll)
 	{
 		# outfile=="" to see stderr/stdout on terminal
 		cl <-
 			if (verbose)
-				parallel::makeCluster(ncores_clust, outfile="")
+				parallel::makeCluster(ncores, outfile="")
 			else
-				parallel::makeCluster(ncores_clust)
+				parallel::makeCluster(ncores)
 		Xwer_dist_desc <- bigmemory::describe(Xwer_dist)
-		parallel::clusterExport(cl, varlist=c("parll","nb_cwt_per_chunk","n","L",
-			"Xwer_dist_desc","noctave","nvoice","getCWT"), envir=environment())
+		parallel::clusterExport(cl, envir=environment(),
+			varlist=c("parll","n","L","Xwer_dist_desc","getCWT","verbose"))
 	}
 
-	if (verbose)
-		cat(paste("--- Precompute and serialize synchrones CWT\n", sep=""))
-
-	# Split indices by packets of length at most nb_cwt_per_chunk
-	indices_cwt <- .splitIndices(seq_len(n), nb_cwt_per_chunk)
-	ignored <-
-		if (parll)
-			parallel::parLapply(cl, indices_cwt, computeSaveCWT)
-		else
-			lapply(indices_cwt, computeSaveCWT)
-
 	if (verbose)
 		cat(paste("--- Compute WER distances\n", sep=""))
 
diff --git a/epclust/R/main.R b/epclust/R/main.R
index 6d3c842..e3fa807 100644
--- a/epclust/R/main.R
+++ b/epclust/R/main.R
@@ -71,6 +71,7 @@
 #' @param endian Endianness for (de)serialization: "little" or "big"
 #' @param verbose FALSE: nothing printed; TRUE: some execution traces
 #' @param parll TRUE: run in parallel. FALSE: run sequentially
+#' @param reuse_bin Re-use previously stored binary series and contributions
 #'
 #' @return A list:
 #' \itemize{
@@ -95,21 +96,24 @@
 #' library(wmtsa)
 #' series <- do.call( cbind, lapply( 1:6, function(i)
 #'   do.call(cbind, wmtsa::wavBootstrap(ref_series[,i], n.realization=40)) ) )
+#' # Mix series so that all groups are evenly spread
+#' permut <- (0:239)%%6 * 40 + (0:239)%/%6 + 1
+#' series = series[,permut]
 #' #dim(series) #c(240,1001)
-#' res_ascii <- claws(series, K1=30, K2=6, 100, verbose=TRUE)
+#' res_ascii <- claws(series, K1=30, K2=6, 100, random=FALSE, verbose=TRUE)
 #'
 #' # Same example, from CSV file
 #' csv_file <- tempfile(pattern="epclust_series.csv_")
 #' write.table(t(series), csv_file, sep=",", row.names=FALSE, col.names=FALSE)
-#' res_csv <- claws(csv_file, K1=30, K2=6, 100)
+#' res_csv <- claws(csv_file, K1=30, K2=6, 100, random=FALSE)
 #'
 #' # Same example, from binary file
 #' bin_file <- tempfile(pattern="epclust_series.bin_")
 #' nbytes <- 8
 #' endian <- "little"
-#' binarize(csv_file, bin_file, 500, nbytes, endian)
+#' binarize(csv_file, bin_file, 500, ",", nbytes, endian)
 #' getSeries <- function(indices) getDataInFile(indices, bin_file, nbytes, endian)
-#' res_bin <- claws(getSeries, K1=30, K2=6, 100)
+#' res_bin <- claws(getSeries, K1=30, K2=6, 100, random=FALSE)
 #' unlink(csv_file)
 #' unlink(bin_file)
 #'
@@ -117,37 +121,39 @@
 #' library(DBI)
 #' series_db <- dbConnect(RSQLite::SQLite(), "file::memory:")
 #' # Prepare data.frame in DB-format
-#' n <- nrow(series)
-#' time_values <- data.frame(
-#'   id <- rep(1:n,each=L),
-#'   time <- rep( as.POSIXct(1800*(0:n),"GMT",origin="2001-01-01"), L ),
-#'   value <- as.double(t(series)) )
+#' n <- ncol(series)
+#' times_values <- data.frame(
+#'   id = rep(1:n,each=L),
+#'   time = rep( as.POSIXct(1800*(1:L),"GMT",origin="2001-01-01"), n ),
+#'   value = as.double(series) )
 #' dbWriteTable(series_db, "times_values", times_values)
 #' # Fill associative array, map index to identifier
 #' indexToID_inDB <- as.character(
-#'   dbGetQuery(series_db, 'SELECT DISTINCT id FROM time_values')[,"id"] )
+#'   dbGetQuery(series_db, 'SELECT DISTINCT id FROM times_values')[,"id"] )
 #' serie_length <- as.integer( dbGetQuery(series_db,
-#'   paste("SELECT COUNT * FROM time_values WHERE id == ",indexToID_inDB[1],sep="")) )
+#'   paste("SELECT COUNT(*) FROM times_values WHERE id == ",indexToID_inDB[1],sep="")) )
 #' getSeries <- function(indices) {
 #'   request <- "SELECT id,value FROM times_values WHERE id in ("
-#'   for (i in indices)
-#'     request <- paste(request, indexToID_inDB[i], ",", sep="")
+#'   for (i in seq_along(indices)) {
+#'     request <- paste(request, indexToID_inDB[ indices[i] ],  sep="")
+#'     if (i < length(indices))
+#'       request <- paste(request, ",", sep="")
+#'   }
 #'   request <- paste(request, ")", sep="")
 #'   df_series <- dbGetQuery(series_db, request)
-#'   if (length(df_series) >= 1)
-#'     as.matrix(df_series[,"value"], nrow=serie_length)
+#'   if (nrow(df_series) >= 1)
+#'     matrix(df_series[,"value"], nrow=serie_length)
 #'   else
 #'     NULL
 #' }
-#' res_db <- claws(getSeries, K1=30, K2=6, 100))
+#' # reuse_bin==FALSE: DB do not garantee ordering
+#' res_db <- claws(getSeries, K1=30, K2=6, 100, random=FALSE, reuse_bin=FALSE)
 #' dbDisconnect(series_db)
 #'
-#' # All results should be the same:
-#' library(digest)
-#' digest::sha1(res_ascii)
-#' digest::sha1(res_csv)
-#' digest::sha1(res_bin)
-#' digest::sha1(res_db)
+#' # All results should be equal:
+#' all(res_ascii$ranks == res_csv$ranks
+#'   & res_ascii$ranks == res_bin$ranks
+#'   & res_ascii$ranks == res_db$ranks)
 #' }
 #' @export
 claws <- function(series, K1, K2, nb_series_per_chunk, nb_items_clust=7*K1,
@@ -155,8 +161,13 @@ claws <- function(series, K1, K2, nb_series_per_chunk, nb_items_clust=7*K1,
 	algoClust2=function(dists,K) cluster::pam(dists,K,diss=TRUE,pamonce=1)$id.med,
 	wav_filt="d8", contrib_type="absolute", WER="end", smooth_lvl=3, nvoice=4,
 	random=TRUE, ntasks=1, ncores_tasks=1, ncores_clust=3, sep=",", nbytes=4,
-	endian=.Platform$endian, verbose=FALSE, parll=TRUE)
+	endian=.Platform$endian, verbose=FALSE, parll=TRUE, reuse_bin=TRUE)
 {
+
+
+#TODO: comprendre differences.......... debuguer getSeries for DB
+
+
 	# Check/transform arguments
 	if (!is.matrix(series) && !bigmemory::is.big.matrix(series)
 		&& !is.function(series)
@@ -195,8 +206,11 @@ claws <- function(series, K1, K2, nb_series_per_chunk, nb_items_clust=7*K1,
 		if (verbose)
 			cat("...Serialize time-series (or retrieve past binary file)\n")
 		series_file <- ".series.epclust.bin"
-		if (!file.exists(series_file))
+		if (!file.exists(series_file) || !reuse_bin)
+		{
+			unlink(series_file,)
 			binarize(series, series_file, nb_series_per_chunk, sep, nbytes, endian)
+		}
 		getSeries <- function(inds) getDataInFile(inds, series_file, nbytes, endian)
 	}
 	else
@@ -204,12 +218,11 @@ claws <- function(series, K1, K2, nb_series_per_chunk, nb_items_clust=7*K1,
 
 	# Serialize all computed wavelets contributions into a file
 	contribs_file <- ".contribs.epclust.bin"
-	index <- 1
-	nb_curves <- 0
 	if (verbose)
 		cat("...Compute contributions and serialize them (or retrieve past binary file)\n")
-	if (!file.exists(contribs_file))
+	if (!file.exists(contribs_file) || !reuse_bin)
 	{
+		unlink(contribs_file,)
 		nb_curves <- binarizeTransform(getSeries,
 			function(curves) curvesToContribs(curves, wav_filt, contrib_type),
 			contribs_file, nb_series_per_chunk, nbytes, endian)
@@ -305,18 +318,13 @@ claws <- function(series, K1, K2, nb_series_per_chunk, nb_items_clust=7*K1,
 	# it's better to just re-use ncores_clust
 	ncores_last_stage <- ncores_clust
 
-
-
-#TODO: here, save all inputs to clusteringTask2 and compare :: must have differences...
-
-
-
 	# Run last clustering tasks to obtain only K2 medoids indices
 	if (verbose)
 		cat("...Run final // stage 1 + stage 2\n")
 	indices_medoids <- clusteringTask1(indices_medoids_all, getContribs, K1, algoClust1,
 		nb_items_clust, ncores_tasks*ncores_clust, verbose, parll)
-	indices_medoids <- clusteringTask2(indices_medoids, getContribs, K2, algoClust2,
+
+	indices_medoids <- clusteringTask2(indices_medoids, getSeries, K2, algoClust2,
 		nb_series_per_chunk,smooth_lvl,nvoice,nbytes,endian,ncores_last_stage,verbose,parll)
 
 	# Compute synchrones, that is to say the cumulated power consumptions for each of the K2
-- 
2.44.0