several fixes; still some issues db,bin != ascii,csv
authorBenjamin Auder <benjamin.auder@somewhere>
Tue, 14 Mar 2017 02:23:59 +0000 (03:23 +0100)
committerBenjamin Auder <benjamin.auder@somewhere>
Tue, 14 Mar 2017 02:23:59 +0000 (03:23 +0100)
epclust/R/clustering.R
epclust/R/computeSynchrones.R
epclust/R/computeWerDists.R
epclust/R/main.R

index 1774b19..3e7fd38 100644 (file)
@@ -25,6 +25,12 @@ NULL
 clusteringTask1 <- function(indices, getContribs, K1, algoClust1, nb_items_clust,
        ncores_clust=3, verbose=FALSE, parll=TRUE)
 {
+       if (verbose)
+               cat(paste("*** Clustering task 1 on ",length(indices)," series\n", sep=""))
+
+       if (length(indices) <= K1)
+               return (indices)
+
        if (parll)
        {
                # outfile=="" to see stderr/stdout on terminal
@@ -40,8 +46,6 @@ clusteringTask1 <- function(indices, getContribs, K1, algoClust1, nb_items_clust
        {
                # Balance tasks by splitting the indices set - as evenly as possible
                indices_workers <- .splitIndices(indices, nb_items_clust, min_size=K1+1)
-               if (verbose)
-                       cat(paste("*** [iterated] Clustering task 1 on ",length(indices)," series\n", sep=""))
                indices <-
                        if (parll)
                        {
@@ -56,6 +60,11 @@ clusteringTask1 <- function(indices, getContribs, K1, algoClust1, nb_items_clust
                                        inds[ algoClust1(getContribs(inds), K1) ]
                                ) )
                        }
+               if (verbose)
+               {
+                       cat(paste("*** [iterated] Clustering task 1: now ",
+                               length(indices)," medoids\n", sep=""))
+               }
        }
        if (parll)
                parallel::stopCluster(cl)
index f8d7a06..3c1959a 100644 (file)
@@ -4,30 +4,19 @@
 #' using euclidian distance.
 #'
 #' @param medoids matrix of K medoids curves in columns
-#' @param getSeries Function to retrieve series (argument: 'indices', integer vector),
-#'   as columns of a matrix
 #' @param nb_curves How many series? (this is known, at this stage)
 #' @inheritParams claws
+#' @inheritParams computeWerDists
 #'
 #' @return A matrix of K synchrones in columns (same length as the series)
 #'
 #' @export
 computeSynchrones <- function(medoids, getSeries, nb_curves,
-       nb_series_per_chunk, ncores_clust=3, verbose=FALSE, parll=TRUE)
+       nb_series_per_chunk, ncores=3, verbose=FALSE, parll=TRUE)
 {
        # Synchrones computation is embarassingly parallel: compute it by chunks of series
        computeSynchronesChunk <- function(indices)
        {
-               if (parll)
-               {
-                       require("epclust", quietly=TRUE)
-                       requireNamespace("synchronicity", quietly=TRUE)
-                       # The big.matrix objects need to be attached to be usable on the workers
-                       synchrones <- bigmemory::attach.big.matrix(synchrones_desc)
-                       medoids <- bigmemory::attach.big.matrix(medoids_desc)
-                       m <- synchronicity::attach.mutex(m_desc)
-               }
-
                # Obtain a chunk of reference series
                series_chunk <- getSeries(indices)
                nb_series_chunk <- ncol(series_chunk)
@@ -56,24 +45,9 @@ computeSynchrones <- function(medoids, getSeries, nb_curves,
        # NOTE: synchronicity is only for Linux & MacOS; on Windows: run sequentially
        parll <- (parll && requireNamespace("synchronicity",quietly=TRUE)
                && Sys.info()['sysname'] != "Windows")
+
        if (parll)
-       {
                m <- synchronicity::boost.mutex() #for lock/unlock, see computeSynchronesChunk
-               # mutex and big.matrix objects cannot be passed directly:
-               # they will be accessed from their description
-               m_desc <- synchronicity::describe(m)
-               synchrones_desc <- bigmemory::describe(synchrones)
-               medoids <- bigmemory::as.big.matrix(medoids)
-               medoids_desc <- bigmemory::describe(medoids)
-               # outfile=="" to see stderr/stdout on terminal
-               cl <-
-                       if (verbose)
-                               parallel::makeCluster(ncores_clust, outfile="")
-                       else
-                               parallel::makeCluster(ncores_clust)
-               parallel::clusterExport(cl, envir=environment(),
-                       varlist=c("synchrones_desc","m_desc","medoids_desc","getSeries"))
-       }
 
        if (verbose)
                cat(paste("--- Compute ",K," synchrones with ",nb_curves," series\n", sep=""))
@@ -82,12 +56,12 @@ computeSynchrones <- function(medoids, getSeries, nb_curves,
        indices_workers <- .splitIndices(seq_len(nb_curves), nb_series_per_chunk)
        ignored <-
                if (parll)
-                       parallel::parLapply(cl, indices_workers, computeSynchronesChunk)
+               {
+                       parallel::mclapply(indices_workers,
+                               function(inds) computeSynchronesChunk(inds), mc.cores=ncores)
+               }
                else
                        lapply(indices_workers, computeSynchronesChunk)
 
-       if (parll)
-               parallel::stopCluster(cl)
-
        return (synchrones[,])
 }
index 568a826..8eb755c 100644 (file)
@@ -4,14 +4,16 @@
 #' obtaind by \code{getSeries(indices)}
 #'
 #' @param indices Range of series indices to cluster
+#' @param getSeries Function to retrieve series (argument: 'indices', integer vector),
+#'   as columns of a matrix
+#' @param ncores Number of cores for parallel runs
 #' @inheritParams claws
-#' @inheritParams computeSynchrones
 #'
 #' @return A distances matrix of size K x K where K == length(indices)
 #'
 #' @export
 computeWerDists <- function(indices, getSeries, nb_series_per_chunk, smooth_lvl, nvoice,
-       nbytes, endian, ncores_clust=3, verbose=FALSE, parll=TRUE)
+       nbytes, endian, ncores=3, verbose=FALSE, parll=TRUE)
 {
        n <- length(indices)
        L <- length(getSeries(1)) #TODO: not very neat way to get L
@@ -27,12 +29,6 @@ computeWerDists <- function(indices, getSeries, nb_series_per_chunk, smooth_lvl,
        # Compute the getSeries(indices) CWT, and store the results in the binary file
        computeSaveCWT <- function(indices)
        {
-               if (parll)
-               {
-                       # parallel workers start with an empty environment
-                       require("epclust", quietly=TRUE)
-               }
-
                # Obtain CWT as big vectors of real part + imaginary part (concatenate)
                ts_cwt <- sapply(indices, function(i) {
                        ts <- scale(ts(getSeries(i)), center=TRUE, scale=FALSE)
@@ -54,7 +50,7 @@ computeWerDists <- function(indices, getSeries, nb_series_per_chunk, smooth_lvl,
                re_part + 1i * im_part
        }
 
-       # Compute distance between columns i and j for j>i
+       # Compute distances between columns i and j for j>i
        computeDistances <- function(i)
        {
                if (parll)
@@ -87,30 +83,30 @@ computeWerDists <- function(indices, getSeries, nb_series_per_chunk, smooth_lvl,
                Xwer_dist[i,i] <- 0.
        }
 
+       if (verbose)
+               cat(paste("--- Precompute and serialize synchrones CWT\n", sep=""))
+
+       # Split indices by packets of length at most nb_cwt_per_chunk
+       indices_cwt <- .splitIndices(seq_len(n), nb_cwt_per_chunk)
+       # NOTE: next loop could potentially be run in //. Indices would be permuted (by
+       # serialization order), and synchronicity would be required because of concurrent
+       # writes. Probably not worth the effort - but possible to gain some bits of speed.
+       for (inds in indices_cwt)
+               computeSaveCWT(inds)
+
        if (parll)
        {
                # outfile=="" to see stderr/stdout on terminal
                cl <-
                        if (verbose)
-                               parallel::makeCluster(ncores_clust, outfile="")
+                               parallel::makeCluster(ncores, outfile="")
                        else
-                               parallel::makeCluster(ncores_clust)
+                               parallel::makeCluster(ncores)
                Xwer_dist_desc <- bigmemory::describe(Xwer_dist)
-               parallel::clusterExport(cl, varlist=c("parll","nb_cwt_per_chunk","n","L",
-                       "Xwer_dist_desc","noctave","nvoice","getCWT"), envir=environment())
+               parallel::clusterExport(cl, envir=environment(),
+                       varlist=c("parll","n","L","Xwer_dist_desc","getCWT","verbose"))
        }
 
-       if (verbose)
-               cat(paste("--- Precompute and serialize synchrones CWT\n", sep=""))
-
-       # Split indices by packets of length at most nb_cwt_per_chunk
-       indices_cwt <- .splitIndices(seq_len(n), nb_cwt_per_chunk)
-       ignored <-
-               if (parll)
-                       parallel::parLapply(cl, indices_cwt, computeSaveCWT)
-               else
-                       lapply(indices_cwt, computeSaveCWT)
-
        if (verbose)
                cat(paste("--- Compute WER distances\n", sep=""))
 
index 6d3c842..e3fa807 100644 (file)
@@ -71,6 +71,7 @@
 #' @param endian Endianness for (de)serialization: "little" or "big"
 #' @param verbose FALSE: nothing printed; TRUE: some execution traces
 #' @param parll TRUE: run in parallel. FALSE: run sequentially
+#' @param reuse_bin Re-use previously stored binary series and contributions
 #'
 #' @return A list:
 #' \itemize{
 #' library(wmtsa)
 #' series <- do.call( cbind, lapply( 1:6, function(i)
 #'   do.call(cbind, wmtsa::wavBootstrap(ref_series[,i], n.realization=40)) ) )
+#' # Mix series so that all groups are evenly spread
+#' permut <- (0:239)%%6 * 40 + (0:239)%/%6 + 1
+#' series = series[,permut]
 #' #dim(series) #c(240,1001)
-#' res_ascii <- claws(series, K1=30, K2=6, 100, verbose=TRUE)
+#' res_ascii <- claws(series, K1=30, K2=6, 100, random=FALSE, verbose=TRUE)
 #'
 #' # Same example, from CSV file
 #' csv_file <- tempfile(pattern="epclust_series.csv_")
 #' write.table(t(series), csv_file, sep=",", row.names=FALSE, col.names=FALSE)
-#' res_csv <- claws(csv_file, K1=30, K2=6, 100)
+#' res_csv <- claws(csv_file, K1=30, K2=6, 100, random=FALSE)
 #'
 #' # Same example, from binary file
 #' bin_file <- tempfile(pattern="epclust_series.bin_")
 #' nbytes <- 8
 #' endian <- "little"
-#' binarize(csv_file, bin_file, 500, nbytes, endian)
+#' binarize(csv_file, bin_file, 500, ",", nbytes, endian)
 #' getSeries <- function(indices) getDataInFile(indices, bin_file, nbytes, endian)
-#' res_bin <- claws(getSeries, K1=30, K2=6, 100)
+#' res_bin <- claws(getSeries, K1=30, K2=6, 100, random=FALSE)
 #' unlink(csv_file)
 #' unlink(bin_file)
 #'
 #' library(DBI)
 #' series_db <- dbConnect(RSQLite::SQLite(), "file::memory:")
 #' # Prepare data.frame in DB-format
-#' n <- nrow(series)
-#' time_values <- data.frame(
-#'   id <- rep(1:n,each=L),
-#'   time <- rep( as.POSIXct(1800*(0:n),"GMT",origin="2001-01-01"), L ),
-#'   value <- as.double(t(series)) )
+#' n <- ncol(series)
+#' times_values <- data.frame(
+#'   id = rep(1:n,each=L),
+#'   time = rep( as.POSIXct(1800*(1:L),"GMT",origin="2001-01-01"), n ),
+#'   value = as.double(series) )
 #' dbWriteTable(series_db, "times_values", times_values)
 #' # Fill associative array, map index to identifier
 #' indexToID_inDB <- as.character(
-#'   dbGetQuery(series_db, 'SELECT DISTINCT id FROM time_values')[,"id"] )
+#'   dbGetQuery(series_db, 'SELECT DISTINCT id FROM times_values')[,"id"] )
 #' serie_length <- as.integer( dbGetQuery(series_db,
-#'   paste("SELECT COUNT * FROM time_values WHERE id == ",indexToID_inDB[1],sep="")) )
+#'   paste("SELECT COUNT(*) FROM times_values WHERE id == ",indexToID_inDB[1],sep="")) )
 #' getSeries <- function(indices) {
 #'   request <- "SELECT id,value FROM times_values WHERE id in ("
-#'   for (i in indices)
-#'     request <- paste(request, indexToID_inDB[i], ",", sep="")
+#'   for (i in seq_along(indices)) {
+#'     request <- paste(request, indexToID_inDB[ indices[i] ],  sep="")
+#'     if (i < length(indices))
+#'       request <- paste(request, ",", sep="")
+#'   }
 #'   request <- paste(request, ")", sep="")
 #'   df_series <- dbGetQuery(series_db, request)
-#'   if (length(df_series) >= 1)
-#'     as.matrix(df_series[,"value"], nrow=serie_length)
+#'   if (nrow(df_series) >= 1)
+#'     matrix(df_series[,"value"], nrow=serie_length)
 #'   else
 #'     NULL
 #' }
-#' res_db <- claws(getSeries, K1=30, K2=6, 100))
+#' # reuse_bin==FALSE: DB do not garantee ordering
+#' res_db <- claws(getSeries, K1=30, K2=6, 100, random=FALSE, reuse_bin=FALSE)
 #' dbDisconnect(series_db)
 #'
-#' # All results should be the same:
-#' library(digest)
-#' digest::sha1(res_ascii)
-#' digest::sha1(res_csv)
-#' digest::sha1(res_bin)
-#' digest::sha1(res_db)
+#' # All results should be equal:
+#' all(res_ascii$ranks == res_csv$ranks
+#'   & res_ascii$ranks == res_bin$ranks
+#'   & res_ascii$ranks == res_db$ranks)
 #' }
 #' @export
 claws <- function(series, K1, K2, nb_series_per_chunk, nb_items_clust=7*K1,
@@ -155,8 +161,13 @@ claws <- function(series, K1, K2, nb_series_per_chunk, nb_items_clust=7*K1,
        algoClust2=function(dists,K) cluster::pam(dists,K,diss=TRUE,pamonce=1)$id.med,
        wav_filt="d8", contrib_type="absolute", WER="end", smooth_lvl=3, nvoice=4,
        random=TRUE, ntasks=1, ncores_tasks=1, ncores_clust=3, sep=",", nbytes=4,
-       endian=.Platform$endian, verbose=FALSE, parll=TRUE)
+       endian=.Platform$endian, verbose=FALSE, parll=TRUE, reuse_bin=TRUE)
 {
+
+
+#TODO: comprendre differences.......... debuguer getSeries for DB
+
+
        # Check/transform arguments
        if (!is.matrix(series) && !bigmemory::is.big.matrix(series)
                && !is.function(series)
@@ -195,8 +206,11 @@ claws <- function(series, K1, K2, nb_series_per_chunk, nb_items_clust=7*K1,
                if (verbose)
                        cat("...Serialize time-series (or retrieve past binary file)\n")
                series_file <- ".series.epclust.bin"
-               if (!file.exists(series_file))
+               if (!file.exists(series_file) || !reuse_bin)
+               {
+                       unlink(series_file,)
                        binarize(series, series_file, nb_series_per_chunk, sep, nbytes, endian)
+               }
                getSeries <- function(inds) getDataInFile(inds, series_file, nbytes, endian)
        }
        else
@@ -204,12 +218,11 @@ claws <- function(series, K1, K2, nb_series_per_chunk, nb_items_clust=7*K1,
 
        # Serialize all computed wavelets contributions into a file
        contribs_file <- ".contribs.epclust.bin"
-       index <- 1
-       nb_curves <- 0
        if (verbose)
                cat("...Compute contributions and serialize them (or retrieve past binary file)\n")
-       if (!file.exists(contribs_file))
+       if (!file.exists(contribs_file) || !reuse_bin)
        {
+               unlink(contribs_file,)
                nb_curves <- binarizeTransform(getSeries,
                        function(curves) curvesToContribs(curves, wav_filt, contrib_type),
                        contribs_file, nb_series_per_chunk, nbytes, endian)
@@ -305,18 +318,13 @@ claws <- function(series, K1, K2, nb_series_per_chunk, nb_items_clust=7*K1,
        # it's better to just re-use ncores_clust
        ncores_last_stage <- ncores_clust
 
-
-
-#TODO: here, save all inputs to clusteringTask2 and compare :: must have differences...
-
-
-
        # Run last clustering tasks to obtain only K2 medoids indices
        if (verbose)
                cat("...Run final // stage 1 + stage 2\n")
        indices_medoids <- clusteringTask1(indices_medoids_all, getContribs, K1, algoClust1,
                nb_items_clust, ncores_tasks*ncores_clust, verbose, parll)
-       indices_medoids <- clusteringTask2(indices_medoids, getContribs, K2, algoClust2,
+
+       indices_medoids <- clusteringTask2(indices_medoids, getSeries, K2, algoClust2,
                nb_series_per_chunk,smooth_lvl,nvoice,nbytes,endian,ncores_last_stage,verbose,parll)
 
        # Compute synchrones, that is to say the cumulated power consumptions for each of the K2