X-Git-Url: https://git.auder.net/?p=epclust.git;a=blobdiff_plain;f=epclust%2FR%2Fmain.R;h=867843b813ff561f17588cb2ec0a18b7e35b0329;hp=eded9523e4c2cc1c8316ed01617e35dfa13c6b18;hb=1c6f223e4dc7f7f022fd18b1c99deff0da022387;hpb=dc1aa85a96bbf815b0d896c22a9b4a539a9e8a9c diff --git a/epclust/R/main.R b/epclust/R/main.R index eded952..867843b 100644 --- a/epclust/R/main.R +++ b/epclust/R/main.R @@ -12,8 +12,11 @@ #' \item function: a custom way to retrieve the curves; it has two arguments: the start index #' (start) and number of curves (n); see example in package vignette. #' } -#' @param K Number of clusters -#' @param nb_series_per_chunk (Maximum) number of series in each group +#' @param K1 Number of super-consumers to be found after stage 1 (K1 << N) +#' @param K2 Number of clusters to be found after stage 2 (K2 << K1) +#' @param ntasks Number of tasks (parallel iterations to obtain K1 medoids); default: 1. +#' Note: ntasks << N, so that N is "roughly divisible" by N (number of series) +#' @param nb_series_per_chunk (Maximum) number of series in each group, inside a task #' @param min_series_per_chunk Minimum number of series in each group #' @param writeTmp Function to write temporary wavelets coefficients (+ identifiers); #' see defaults in defaults.R @@ -21,11 +24,25 @@ #' @param wf Wavelet transform filter; see ?wt.filter. Default: haar #' @param WER "end" to apply stage 2 after stage 1 has iterated and finished, or "mix" #' to apply it after every stage 1 -#' @param ncores number of parallel processes; if NULL, use parallel::detectCores() +#' @param ncores_tasks number of parallel tasks (1 to disable: sequential tasks) +#' @param ncores_clust number of parallel clusterings in one task #' #' @return A data.frame of the final medoids curves (identifiers + values) -epclust = function(data, K, nb_series_per_chunk, min_series_per_chunk=10*K, - writeTmp=defaultWriteTmp, readTmp=defaultReadTmp, wf="haar", WER="end", ncores=NULL) +#' +#' @examples +#' getData = function(start, n) { +#' con = dbConnect(drv = RSQLite::SQLite(), dbname = "mydata.sqlite") +#' df = dbGetQuery(con, paste( +#' "SELECT * FROM times_values GROUP BY id OFFSET ",start, +#' "LIMIT ", n, " ORDER BY date", sep="")) +#' return (df) +#' } +#' cl = epclust(getData, K1=200, K2=15, ntasks=1000, nb_series_per_chunk=5000, WER="mix") +#' @export +epclust = function(data, K1, K2, + ntasks=1, nb_series_per_chunk=50*K1, min_series_per_chunk=5*K1, + writeTmp=defaultWriteTmp, readTmp=defaultReadTmp, wf="haar", WER="end", + ncores_tasks=1, ncores_clust=4) { #TODO: setRefClass(...) to avoid copy data: #http://stackoverflow.com/questions/2603184/r-pass-by-reference @@ -98,32 +115,37 @@ epclust = function(data, K, nb_series_per_chunk, min_series_per_chunk=10*K, #2) process coeffs (by nb_series_per_chunk) and cluster them in parallel library(parallel) - ncores = ifelse(is.integer(ncores), ncores, parallel::detectCores()%/%4) - cl = parallel::makeCluster(ncores) - parallel::clusterExport(cl=cl, varlist=c("TODO:", "what", "to", "export?"), envir=environment()) + cl_tasks = parallel::makeCluster(ncores_tasks) + #Nothing to export because each worker retrieve and put data from/on files (or DB) + #parallel::clusterExport(cl=cl, varlist=c("nothing","to","export"), envir=environment()) #TODO: be careful of writing to a new temp file, then flush initial one, then re-use it... - repeat - { - #while there is jobs to do (i.e. size of tmp "file" is greater than nb_series_per_chunk) - nb_workers = nb_curves %/% nb_series_per_chunk - indices = list() - #indices[[i]] == (start_index,number_of_elements) - for (i in 1:nb_workers) - indices[[i]] = c(nb_series_per_chunk*(i-1)+1, nb_series_per_chunk) - remainder = nb_curves %% nb_series_per_chunk - if (remainder >= min_series_per_chunk) - { - nb_workers = nb_workers + 1 - indices[[nb_workers]] = c(nb_curves-remainder+1, nb_curves) - } else if (remainder > 0) + res_tasks = parallel::parSapply(cl_tasks, 1:ntasks, function() { + cl_clust = parallel::makeCluster(ncores_clust) + repeat { - #spread the load among other workers - #... + #while there are jobs to do + #(i.e. size of tmp "file" is greater than ntasks * nb_series_per_chunk) + nb_workers = nb_curves %/% nb_series_per_chunk + indices = list() + #indices[[i]] == (start_index,number_of_elements) + for (i in 1:nb_workers) + indices[[i]] = c(nb_series_per_chunk*(i-1)+1, nb_series_per_chunk) + remainder = nb_curves %% nb_series_per_chunk + if (remainder >= min_series_per_chunk) + { + nb_workers = nb_workers + 1 + indices[[nb_workers]] = c(nb_curves-remainder+1, nb_curves) + } else if (remainder > 0) + { + #spread the load among other workers + #... + } + res_clust = parallel::parSapply(cl, indices, processChunk, K, WER=="mix") + #C) flush tmp file (current parallel processes will write in it) } - li = parallel::parLapply(cl, indices, processChunk, K, WER=="mix") - #C) flush tmp file (current parallel processes will write in it) - } - parallel::stopCluster(cl) + parallel:stopCluster(cl_clust) + }) + parallel::stopCluster(cl_tasks) #3) readTmp last results, apply PAM on it, and return medoids + identifiers final_coeffs = readTmp(1, nb_series_per_chunk)