From 7f0781b723158cf8e6b25dcab0bae18acae40be8 Mon Sep 17 00:00:00 2001 From: Benjamin Auder Date: Mon, 9 Jan 2017 11:32:45 +0100 Subject: [PATCH] progress on main.R --- code/draft_R_pkg/R/defaults.R | 4 ++ code/draft_R_pkg/R/main.R | 109 ++++++++++++++++++++-------------- 2 files changed, 69 insertions(+), 44 deletions(-) create mode 100644 code/draft_R_pkg/R/defaults.R diff --git a/code/draft_R_pkg/R/defaults.R b/code/draft_R_pkg/R/defaults.R new file mode 100644 index 0000000..7880ddd --- /dev/null +++ b/code/draft_R_pkg/R/defaults.R @@ -0,0 +1,4 @@ +#TODO: ascii format (default) (+ binary format?) +writeTmp(curves [uncompressed coeffs, limited number - nbSeriesPerChunk], last=FALSE) #if last=TRUE, close the conn +readTmp(..., from index, n curves) #careful: connection must remain open + diff --git a/code/draft_R_pkg/R/main.R b/code/draft_R_pkg/R/main.R index c3e4b54..695b928 100644 --- a/code/draft_R_pkg/R/main.R +++ b/code/draft_R_pkg/R/main.R @@ -1,58 +1,79 @@ -#TODO: setRefClass... to avoid copy data !! -#http://stackoverflow.com/questions/2603184/r-pass-by-reference +#' @include defaults.R -#fields: data (can be NULL or provided by user), coeffs (will be computed -#con can be a character string naming a file; see readLines() -#data can be in DB format, on one column : TODO: guess (from header, or col. length...) - - -writeTmp(curves [uncompressed coeffs, limited number - nbSeriesPerChunk], last=FALSE) #if last=TRUE, close the conn -readTmp(..., from index, n curves) #careful: connection must remain open -#TODO: write read/write tmp reference ( on file in .tmp/ folder ... ) - -#data: -#stop("Unrecognizable 'data' argument (must be numeric, functional or connection)") - -#WER: "end" to apply stage 2 after stage 1 iterated, "mix" (or anything else...?!) to apply it after every stage 1 -epclust = function(data, K, nbPerChunk, WER="end", ncores=NULL, writeTmp=ref_writeTmp, readTmp=ref_readTmp) #where to put/retrieve intermediate results; if not provided, use file on disk +#' @title Cluster power curves with PAM in parallel +#' +#' @description Groups electricity power curves (or any series of similar nature) by applying PAM +#' algorithm in parallel to chunks of size \code{nbSeriesPerChunk} +#' +#' @param data Access to the data, which can be of one of the three following types: +#' \itemize{ +#' \item data.frame: each line contains its ID in the first cell, and all values after +#' \item connection: any R connection object (e.g. a file) providing lines as described above +#' \item function: a custom way to retrieve the curves; it has two arguments: the start index +#' (start) and number of curves (n); see example in package vignette. +#' } +#' @param K Number of clusters +#' @param nbSeriesPerChunk Number of series in each group +#' @param writeTmp Function to write temporary wavelets coefficients (+ identifiers); +#' see defaults in defaults.R +#' @param readTmp Function to read temporary wavelets coefficients (see defaults.R) +#' @param WER "end" to apply stage 2 after stage 1 has iterated and finished, or "mix" +#' to apply it after every stage 1 +#' @param ncores number of parallel processes; if NULL, use parallel::detectCores() +#' +#' @return A data.frame of the final medoids curves (identifiers + values) +epclust = function(data, K, nbSeriesPerChunk, writeTmp=ref_writeTmp, readTmp=ref_readTmp, + WER="end", ncores=NULL) { + #TODO: setRefClass(...) to avoid copy data: + #http://stackoverflow.com/questions/2603184/r-pass-by-reference - - #on input: can be data or con; data handled by writing it to file (ascii or bin ?!), -#data: con or matrix or DB + #0) check arguments + if (!is.data.frame(data) && !is.function(data)) + tryCatch({dataCon = open(data)}, + error="data should be a data.frame, a function or a valid connection") + if (!is.integer(K) || K < 2) + stop("K should be an integer greater or equal to 2") + if (!is.integer(nbSeriesPerChunk) || nbSeriesPerChunk < K) + stop("nbSeriesPerChunk should be an integer greater or equal to K") + if (!is.function(writeTmp) || !is.function(readTmp)) + stop("read/writeTmp should be functional (see defaults.R)") + if (WER!="end" && WER!="mix") + stop("WER takes values in {'end','mix'}") + #concerning ncores, any non-integer type will be treated as "use parallel:detectCores()" #1) acquire data (process curves, get as coeffs) - if (is.numeric(data)) + index = 1 + nbCurves = nrow(data) + while (index < nbCurves) { - #full data matrix - index = 1 - n = nrow(data) - while (index < n) + if (is.data.frame(data)) { - writeTmp( getCoeffs(data) ) - index = index + nbSeriesPerChunk + #full data matrix + writeTmp( getCoeffs( data[index:(min(index+nbSeriesPerChunk-1,nbCurves)),] ) ) + } else if (is.function(data)) + { + #custom user function to retrieve next n curves, probably to read from DB + writeTmp( getCoeffs( data(index, nbSeriesPerChunk) ) ) + } else + { + #incremental connection + #TODO: find a better way to parse than using a temp file + ascii_lines = readLines(dataCon, nbSeriesPerChunk) + seriesChunkFile = ".tmp/seriesChunk" + writeLines(ascii_lines, seriesChunkFile) + writeTmp( getCoeffs( read.csv(seriesChunkFile) ) ) } - } else if (is.function(data)) - { - #custom user function to retrieve next n curves, probably to read from DB - writeTmp( getCoeffs( data(nbPerChunk) ) ) - } else - { - #incremental connection - #read it one by one and get coeffs until nbSeriesPerChunk - #then launch a clustering task............ - #TODO: find a better way to parse than using a temp file - ascii_lines = readLines(data, nbSeriesPerChunk) - seriesChunkFile = ".tmp/seriesChunk" - writeLines(ascii_lines, seriesChunkFile) - writeTmp( getCoeffs( read.csv(seriesChunkFile) ) ) + index = index + nbSeriesPerChunk } + if (exists(dataCon)) + close(dataCon) library(parallel) - ncores = ifelse(is.numeric(ncores), ncores, parallel::detectCores()) + ncores = ifelse(is.integer(ncores), ncores, parallel::detectCores()) cl = parallel::makeCluster(ncores) -115 parallel::clusterExport(cl=cl, varlist=c("X", "Y", "K", "p"), envir=environment()) -116 li = parallel::parLapply(cl, 1:B, getParamsAtIndex) + parallel::clusterExport(cl=cl, varlist=c("X", "Y", "K", "p"), envir=environment()) + li = parallel::parLapply(cl, 1:B, getParamsAtIndex) #2) process coeffs (by nbSeriesPerChunk) and cluster in parallel (just launch async task, wait for them to complete, and re-do if necessary) repeat @@ -65,7 +86,7 @@ epclust = function(data, K, nbPerChunk, WER="end", ncores=NULL, writeTmp=ref_wri #always check "complete" flag (array, as I did in MPI) to know if "slaves" finished } -parallel::stopCluster(cl) + parallel::stopCluster(cl) #3) readTmp last results, apply PAM on it, and return medoids + identifiers -- 2.44.0