From: Benjamin Auder Date: Tue, 14 Mar 2017 02:23:59 +0000 (+0100) Subject: several fixes; still some issues db,bin != ascii,csv X-Git-Url: https://git.auder.net/%7B%7B%20asset%28%27mixstore/css/img/pieces/cr.svg?a=commitdiff_plain;h=dc86eb0c992e6e4ab119d48398d040c4cf3a75fd;p=epclust.git several fixes; still some issues db,bin != ascii,csv --- diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R index 1774b19..3e7fd38 100644 --- a/epclust/R/clustering.R +++ b/epclust/R/clustering.R @@ -25,6 +25,12 @@ NULL clusteringTask1 <- function(indices, getContribs, K1, algoClust1, nb_items_clust, ncores_clust=3, verbose=FALSE, parll=TRUE) { + if (verbose) + cat(paste("*** Clustering task 1 on ",length(indices)," series\n", sep="")) + + if (length(indices) <= K1) + return (indices) + if (parll) { # outfile=="" to see stderr/stdout on terminal @@ -40,8 +46,6 @@ clusteringTask1 <- function(indices, getContribs, K1, algoClust1, nb_items_clust { # Balance tasks by splitting the indices set - as evenly as possible indices_workers <- .splitIndices(indices, nb_items_clust, min_size=K1+1) - if (verbose) - cat(paste("*** [iterated] Clustering task 1 on ",length(indices)," series\n", sep="")) indices <- if (parll) { @@ -56,6 +60,11 @@ clusteringTask1 <- function(indices, getContribs, K1, algoClust1, nb_items_clust inds[ algoClust1(getContribs(inds), K1) ] ) ) } + if (verbose) + { + cat(paste("*** [iterated] Clustering task 1: now ", + length(indices)," medoids\n", sep="")) + } } if (parll) parallel::stopCluster(cl) diff --git a/epclust/R/computeSynchrones.R b/epclust/R/computeSynchrones.R index f8d7a06..3c1959a 100644 --- a/epclust/R/computeSynchrones.R +++ b/epclust/R/computeSynchrones.R @@ -4,30 +4,19 @@ #' using euclidian distance. #' #' @param medoids matrix of K medoids curves in columns -#' @param getSeries Function to retrieve series (argument: 'indices', integer vector), -#' as columns of a matrix #' @param nb_curves How many series? (this is known, at this stage) #' @inheritParams claws +#' @inheritParams computeWerDists #' #' @return A matrix of K synchrones in columns (same length as the series) #' #' @export computeSynchrones <- function(medoids, getSeries, nb_curves, - nb_series_per_chunk, ncores_clust=3, verbose=FALSE, parll=TRUE) + nb_series_per_chunk, ncores=3, verbose=FALSE, parll=TRUE) { # Synchrones computation is embarassingly parallel: compute it by chunks of series computeSynchronesChunk <- function(indices) { - if (parll) - { - require("epclust", quietly=TRUE) - requireNamespace("synchronicity", quietly=TRUE) - # The big.matrix objects need to be attached to be usable on the workers - synchrones <- bigmemory::attach.big.matrix(synchrones_desc) - medoids <- bigmemory::attach.big.matrix(medoids_desc) - m <- synchronicity::attach.mutex(m_desc) - } - # Obtain a chunk of reference series series_chunk <- getSeries(indices) nb_series_chunk <- ncol(series_chunk) @@ -56,24 +45,9 @@ computeSynchrones <- function(medoids, getSeries, nb_curves, # NOTE: synchronicity is only for Linux & MacOS; on Windows: run sequentially parll <- (parll && requireNamespace("synchronicity",quietly=TRUE) && Sys.info()['sysname'] != "Windows") + if (parll) - { m <- synchronicity::boost.mutex() #for lock/unlock, see computeSynchronesChunk - # mutex and big.matrix objects cannot be passed directly: - # they will be accessed from their description - m_desc <- synchronicity::describe(m) - synchrones_desc <- bigmemory::describe(synchrones) - medoids <- bigmemory::as.big.matrix(medoids) - medoids_desc <- bigmemory::describe(medoids) - # outfile=="" to see stderr/stdout on terminal - cl <- - if (verbose) - parallel::makeCluster(ncores_clust, outfile="") - else - parallel::makeCluster(ncores_clust) - parallel::clusterExport(cl, envir=environment(), - varlist=c("synchrones_desc","m_desc","medoids_desc","getSeries")) - } if (verbose) cat(paste("--- Compute ",K," synchrones with ",nb_curves," series\n", sep="")) @@ -82,12 +56,12 @@ computeSynchrones <- function(medoids, getSeries, nb_curves, indices_workers <- .splitIndices(seq_len(nb_curves), nb_series_per_chunk) ignored <- if (parll) - parallel::parLapply(cl, indices_workers, computeSynchronesChunk) + { + parallel::mclapply(indices_workers, + function(inds) computeSynchronesChunk(inds), mc.cores=ncores) + } else lapply(indices_workers, computeSynchronesChunk) - if (parll) - parallel::stopCluster(cl) - return (synchrones[,]) } diff --git a/epclust/R/computeWerDists.R b/epclust/R/computeWerDists.R index 568a826..8eb755c 100644 --- a/epclust/R/computeWerDists.R +++ b/epclust/R/computeWerDists.R @@ -4,14 +4,16 @@ #' obtaind by \code{getSeries(indices)} #' #' @param indices Range of series indices to cluster +#' @param getSeries Function to retrieve series (argument: 'indices', integer vector), +#' as columns of a matrix +#' @param ncores Number of cores for parallel runs #' @inheritParams claws -#' @inheritParams computeSynchrones #' #' @return A distances matrix of size K x K where K == length(indices) #' #' @export computeWerDists <- function(indices, getSeries, nb_series_per_chunk, smooth_lvl, nvoice, - nbytes, endian, ncores_clust=3, verbose=FALSE, parll=TRUE) + nbytes, endian, ncores=3, verbose=FALSE, parll=TRUE) { n <- length(indices) L <- length(getSeries(1)) #TODO: not very neat way to get L @@ -27,12 +29,6 @@ computeWerDists <- function(indices, getSeries, nb_series_per_chunk, smooth_lvl, # Compute the getSeries(indices) CWT, and store the results in the binary file computeSaveCWT <- function(indices) { - if (parll) - { - # parallel workers start with an empty environment - require("epclust", quietly=TRUE) - } - # Obtain CWT as big vectors of real part + imaginary part (concatenate) ts_cwt <- sapply(indices, function(i) { ts <- scale(ts(getSeries(i)), center=TRUE, scale=FALSE) @@ -54,7 +50,7 @@ computeWerDists <- function(indices, getSeries, nb_series_per_chunk, smooth_lvl, re_part + 1i * im_part } - # Compute distance between columns i and j for j>i + # Compute distances between columns i and j for j>i computeDistances <- function(i) { if (parll) @@ -87,30 +83,30 @@ computeWerDists <- function(indices, getSeries, nb_series_per_chunk, smooth_lvl, Xwer_dist[i,i] <- 0. } + if (verbose) + cat(paste("--- Precompute and serialize synchrones CWT\n", sep="")) + + # Split indices by packets of length at most nb_cwt_per_chunk + indices_cwt <- .splitIndices(seq_len(n), nb_cwt_per_chunk) + # NOTE: next loop could potentially be run in //. Indices would be permuted (by + # serialization order), and synchronicity would be required because of concurrent + # writes. Probably not worth the effort - but possible to gain some bits of speed. + for (inds in indices_cwt) + computeSaveCWT(inds) + if (parll) { # outfile=="" to see stderr/stdout on terminal cl <- if (verbose) - parallel::makeCluster(ncores_clust, outfile="") + parallel::makeCluster(ncores, outfile="") else - parallel::makeCluster(ncores_clust) + parallel::makeCluster(ncores) Xwer_dist_desc <- bigmemory::describe(Xwer_dist) - parallel::clusterExport(cl, varlist=c("parll","nb_cwt_per_chunk","n","L", - "Xwer_dist_desc","noctave","nvoice","getCWT"), envir=environment()) + parallel::clusterExport(cl, envir=environment(), + varlist=c("parll","n","L","Xwer_dist_desc","getCWT","verbose")) } - if (verbose) - cat(paste("--- Precompute and serialize synchrones CWT\n", sep="")) - - # Split indices by packets of length at most nb_cwt_per_chunk - indices_cwt <- .splitIndices(seq_len(n), nb_cwt_per_chunk) - ignored <- - if (parll) - parallel::parLapply(cl, indices_cwt, computeSaveCWT) - else - lapply(indices_cwt, computeSaveCWT) - if (verbose) cat(paste("--- Compute WER distances\n", sep="")) diff --git a/epclust/R/main.R b/epclust/R/main.R index 6d3c842..e3fa807 100644 --- a/epclust/R/main.R +++ b/epclust/R/main.R @@ -71,6 +71,7 @@ #' @param endian Endianness for (de)serialization: "little" or "big" #' @param verbose FALSE: nothing printed; TRUE: some execution traces #' @param parll TRUE: run in parallel. FALSE: run sequentially +#' @param reuse_bin Re-use previously stored binary series and contributions #' #' @return A list: #' \itemize{ @@ -95,21 +96,24 @@ #' library(wmtsa) #' series <- do.call( cbind, lapply( 1:6, function(i) #' do.call(cbind, wmtsa::wavBootstrap(ref_series[,i], n.realization=40)) ) ) +#' # Mix series so that all groups are evenly spread +#' permut <- (0:239)%%6 * 40 + (0:239)%/%6 + 1 +#' series = series[,permut] #' #dim(series) #c(240,1001) -#' res_ascii <- claws(series, K1=30, K2=6, 100, verbose=TRUE) +#' res_ascii <- claws(series, K1=30, K2=6, 100, random=FALSE, verbose=TRUE) #' #' # Same example, from CSV file #' csv_file <- tempfile(pattern="epclust_series.csv_") #' write.table(t(series), csv_file, sep=",", row.names=FALSE, col.names=FALSE) -#' res_csv <- claws(csv_file, K1=30, K2=6, 100) +#' res_csv <- claws(csv_file, K1=30, K2=6, 100, random=FALSE) #' #' # Same example, from binary file #' bin_file <- tempfile(pattern="epclust_series.bin_") #' nbytes <- 8 #' endian <- "little" -#' binarize(csv_file, bin_file, 500, nbytes, endian) +#' binarize(csv_file, bin_file, 500, ",", nbytes, endian) #' getSeries <- function(indices) getDataInFile(indices, bin_file, nbytes, endian) -#' res_bin <- claws(getSeries, K1=30, K2=6, 100) +#' res_bin <- claws(getSeries, K1=30, K2=6, 100, random=FALSE) #' unlink(csv_file) #' unlink(bin_file) #' @@ -117,37 +121,39 @@ #' library(DBI) #' series_db <- dbConnect(RSQLite::SQLite(), "file::memory:") #' # Prepare data.frame in DB-format -#' n <- nrow(series) -#' time_values <- data.frame( -#' id <- rep(1:n,each=L), -#' time <- rep( as.POSIXct(1800*(0:n),"GMT",origin="2001-01-01"), L ), -#' value <- as.double(t(series)) ) +#' n <- ncol(series) +#' times_values <- data.frame( +#' id = rep(1:n,each=L), +#' time = rep( as.POSIXct(1800*(1:L),"GMT",origin="2001-01-01"), n ), +#' value = as.double(series) ) #' dbWriteTable(series_db, "times_values", times_values) #' # Fill associative array, map index to identifier #' indexToID_inDB <- as.character( -#' dbGetQuery(series_db, 'SELECT DISTINCT id FROM time_values')[,"id"] ) +#' dbGetQuery(series_db, 'SELECT DISTINCT id FROM times_values')[,"id"] ) #' serie_length <- as.integer( dbGetQuery(series_db, -#' paste("SELECT COUNT * FROM time_values WHERE id == ",indexToID_inDB[1],sep="")) ) +#' paste("SELECT COUNT(*) FROM times_values WHERE id == ",indexToID_inDB[1],sep="")) ) #' getSeries <- function(indices) { #' request <- "SELECT id,value FROM times_values WHERE id in (" -#' for (i in indices) -#' request <- paste(request, indexToID_inDB[i], ",", sep="") +#' for (i in seq_along(indices)) { +#' request <- paste(request, indexToID_inDB[ indices[i] ], sep="") +#' if (i < length(indices)) +#' request <- paste(request, ",", sep="") +#' } #' request <- paste(request, ")", sep="") #' df_series <- dbGetQuery(series_db, request) -#' if (length(df_series) >= 1) -#' as.matrix(df_series[,"value"], nrow=serie_length) +#' if (nrow(df_series) >= 1) +#' matrix(df_series[,"value"], nrow=serie_length) #' else #' NULL #' } -#' res_db <- claws(getSeries, K1=30, K2=6, 100)) +#' # reuse_bin==FALSE: DB do not garantee ordering +#' res_db <- claws(getSeries, K1=30, K2=6, 100, random=FALSE, reuse_bin=FALSE) #' dbDisconnect(series_db) #' -#' # All results should be the same: -#' library(digest) -#' digest::sha1(res_ascii) -#' digest::sha1(res_csv) -#' digest::sha1(res_bin) -#' digest::sha1(res_db) +#' # All results should be equal: +#' all(res_ascii$ranks == res_csv$ranks +#' & res_ascii$ranks == res_bin$ranks +#' & res_ascii$ranks == res_db$ranks) #' } #' @export claws <- function(series, K1, K2, nb_series_per_chunk, nb_items_clust=7*K1, @@ -155,8 +161,13 @@ claws <- function(series, K1, K2, nb_series_per_chunk, nb_items_clust=7*K1, algoClust2=function(dists,K) cluster::pam(dists,K,diss=TRUE,pamonce=1)$id.med, wav_filt="d8", contrib_type="absolute", WER="end", smooth_lvl=3, nvoice=4, random=TRUE, ntasks=1, ncores_tasks=1, ncores_clust=3, sep=",", nbytes=4, - endian=.Platform$endian, verbose=FALSE, parll=TRUE) + endian=.Platform$endian, verbose=FALSE, parll=TRUE, reuse_bin=TRUE) { + + +#TODO: comprendre differences.......... debuguer getSeries for DB + + # Check/transform arguments if (!is.matrix(series) && !bigmemory::is.big.matrix(series) && !is.function(series) @@ -195,8 +206,11 @@ claws <- function(series, K1, K2, nb_series_per_chunk, nb_items_clust=7*K1, if (verbose) cat("...Serialize time-series (or retrieve past binary file)\n") series_file <- ".series.epclust.bin" - if (!file.exists(series_file)) + if (!file.exists(series_file) || !reuse_bin) + { + unlink(series_file,) binarize(series, series_file, nb_series_per_chunk, sep, nbytes, endian) + } getSeries <- function(inds) getDataInFile(inds, series_file, nbytes, endian) } else @@ -204,12 +218,11 @@ claws <- function(series, K1, K2, nb_series_per_chunk, nb_items_clust=7*K1, # Serialize all computed wavelets contributions into a file contribs_file <- ".contribs.epclust.bin" - index <- 1 - nb_curves <- 0 if (verbose) cat("...Compute contributions and serialize them (or retrieve past binary file)\n") - if (!file.exists(contribs_file)) + if (!file.exists(contribs_file) || !reuse_bin) { + unlink(contribs_file,) nb_curves <- binarizeTransform(getSeries, function(curves) curvesToContribs(curves, wav_filt, contrib_type), contribs_file, nb_series_per_chunk, nbytes, endian) @@ -305,18 +318,13 @@ claws <- function(series, K1, K2, nb_series_per_chunk, nb_items_clust=7*K1, # it's better to just re-use ncores_clust ncores_last_stage <- ncores_clust - - -#TODO: here, save all inputs to clusteringTask2 and compare :: must have differences... - - - # Run last clustering tasks to obtain only K2 medoids indices if (verbose) cat("...Run final // stage 1 + stage 2\n") indices_medoids <- clusteringTask1(indices_medoids_all, getContribs, K1, algoClust1, nb_items_clust, ncores_tasks*ncores_clust, verbose, parll) - indices_medoids <- clusteringTask2(indices_medoids, getContribs, K2, algoClust2, + + indices_medoids <- clusteringTask2(indices_medoids, getSeries, K2, algoClust2, nb_series_per_chunk,smooth_lvl,nvoice,nbytes,endian,ncores_last_stage,verbose,parll) # Compute synchrones, that is to say the cumulated power consumptions for each of the K2