X-Git-Url: https://git.auder.net/pieces/Cwda/n_black_bishop.svg?a=blobdiff_plain;f=code%2Fdraft_R_pkg%2FR%2Fmain.R;h=695b9282945444db8dff267efc291edf2e95dc87;hb=7f0781b723158cf8e6b25dcab0bae18acae40be8;hp=3411720a01c234dce67a3f68203673be0b2fd543;hpb=3dcbfeef0dc92444287dd78a16c80e58a98a6ee7;p=epclust.git diff --git a/code/draft_R_pkg/R/main.R b/code/draft_R_pkg/R/main.R index 3411720..695b928 100644 --- a/code/draft_R_pkg/R/main.R +++ b/code/draft_R_pkg/R/main.R @@ -1,36 +1,100 @@ -#TODO: setRefClass... to avoid copy data !! -#http://stackoverflow.com/questions/2603184/r-pass-by-reference - -#fields: data (can be NULL or provided by user), coeffs (will be computed -#con can be a character string naming a file; see readLines() -#data can be in DB format, on one column : TODO: guess (from header, or col. length...) -epclust = function(data=NULL, con=NULL, raw=FALSE, K, nbPerChunk, ..., where_to_store_tmp_data, and how ?) -#options for tmp files: in RAM, on disk, on DB (can be distributed) -{ - +#' @include defaults.R - #on input: can be data or con; data handled by writing it to file (ascii or bin ?!), +#' @title Cluster power curves with PAM in parallel +#' +#' @description Groups electricity power curves (or any series of similar nature) by applying PAM +#' algorithm in parallel to chunks of size \code{nbSeriesPerChunk} +#' +#' @param data Access to the data, which can be of one of the three following types: +#' \itemize{ +#' \item data.frame: each line contains its ID in the first cell, and all values after +#' \item connection: any R connection object (e.g. a file) providing lines as described above +#' \item function: a custom way to retrieve the curves; it has two arguments: the start index +#' (start) and number of curves (n); see example in package vignette. +#' } +#' @param K Number of clusters +#' @param nbSeriesPerChunk Number of series in each group +#' @param writeTmp Function to write temporary wavelets coefficients (+ identifiers); +#' see defaults in defaults.R +#' @param readTmp Function to read temporary wavelets coefficients (see defaults.R) +#' @param WER "end" to apply stage 2 after stage 1 has iterated and finished, or "mix" +#' to apply it after every stage 1 +#' @param ncores number of parallel processes; if NULL, use parallel::detectCores() +#' +#' @return A data.frame of the final medoids curves (identifiers + values) +epclust = function(data, K, nbSeriesPerChunk, writeTmp=ref_writeTmp, readTmp=ref_readTmp, + WER="end", ncores=NULL) +{ + #TODO: setRefClass(...) to avoid copy data: + #http://stackoverflow.com/questions/2603184/r-pass-by-reference + #0) check arguments + if (!is.data.frame(data) && !is.function(data)) + tryCatch({dataCon = open(data)}, + error="data should be a data.frame, a function or a valid connection") + if (!is.integer(K) || K < 2) + stop("K should be an integer greater or equal to 2") + if (!is.integer(nbSeriesPerChunk) || nbSeriesPerChunk < K) + stop("nbSeriesPerChunk should be an integer greater or equal to K") + if (!is.function(writeTmp) || !is.function(readTmp)) + stop("read/writeTmp should be functional (see defaults.R)") + if (WER!="end" && WER!="mix") + stop("WER takes values in {'end','mix'}") + #concerning ncores, any non-integer type will be treated as "use parallel:detectCores()" - if (!is.null(data)) + #1) acquire data (process curves, get as coeffs) + index = 1 + nbCurves = nrow(data) + while (index < nbCurves) { - #full data matrix - index = 1 - n = nrow(data) - while (index < n) + if (is.data.frame(data)) + { + #full data matrix + writeTmp( getCoeffs( data[index:(min(index+nbSeriesPerChunk-1,nbCurves)),] ) ) + } else if (is.function(data)) + { + #custom user function to retrieve next n curves, probably to read from DB + writeTmp( getCoeffs( data(index, nbSeriesPerChunk) ) ) + } else { - getCoeffs(data - index = index + nbSeriesPerChunk + #incremental connection + #TODO: find a better way to parse than using a temp file + ascii_lines = readLines(dataCon, nbSeriesPerChunk) + seriesChunkFile = ".tmp/seriesChunk" + writeLines(ascii_lines, seriesChunkFile) + writeTmp( getCoeffs( read.csv(seriesChunkFile) ) ) } - } else if (!is.null(con)) + index = index + nbSeriesPerChunk + } + if (exists(dataCon)) + close(dataCon) + + library(parallel) + ncores = ifelse(is.integer(ncores), ncores, parallel::detectCores()) + cl = parallel::makeCluster(ncores) + parallel::clusterExport(cl=cl, varlist=c("X", "Y", "K", "p"), envir=environment()) + li = parallel::parLapply(cl, 1:B, getParamsAtIndex) + + #2) process coeffs (by nbSeriesPerChunk) and cluster in parallel (just launch async task, wait for them to complete, and re-do if necessary) + repeat { - #incremental connection - #read it one by one and get coeffs until nbSeriesPerChunk - #then launch a clustering task............ - readLines() - } else - stop("at least 'data' or 'con' argument must be present") + completed = rep(FALSE, ............) + #while there is jobs to do (i.e. size of tmp "file" is greater than nbSeriesPerChunk), + #A) determine which tasks which processor will do (OK) + #B) send each (sets of) tasks in parallel + #C) flush tmp file (current parallel processes will write in it) + #always check "complete" flag (array, as I did in MPI) to know if "slaves" finished + } + + parallel::stopCluster(cl) + #3) readTmp last results, apply PAM on it, and return medoids + identifiers + + #4) apply stage 2 (in parallel ? inside task 2) ?) + if (WER == "end") + { + #from center curves, apply stage 2... + } } getCoeffs = function(series)