X-Git-Url: https://git.auder.net/?a=blobdiff_plain;f=code%2Fdraft_R_pkg%2FR%2Fmain.R;h=6746d88a3d14b3e83b53f97c39b70d0d3ab5cfd2;hb=aa7daeaacfda268c392adf1c5efbccea77be9fe0;hp=3411720a01c234dce67a3f68203673be0b2fd543;hpb=3dcbfeef0dc92444287dd78a16c80e58a98a6ee7;p=epclust.git diff --git a/code/draft_R_pkg/R/main.R b/code/draft_R_pkg/R/main.R index 3411720..6746d88 100644 --- a/code/draft_R_pkg/R/main.R +++ b/code/draft_R_pkg/R/main.R @@ -1,39 +1,161 @@ -#TODO: setRefClass... to avoid copy data !! -#http://stackoverflow.com/questions/2603184/r-pass-by-reference - -#fields: data (can be NULL or provided by user), coeffs (will be computed -#con can be a character string naming a file; see readLines() -#data can be in DB format, on one column : TODO: guess (from header, or col. length...) -epclust = function(data=NULL, con=NULL, raw=FALSE, K, nbPerChunk, ..., where_to_store_tmp_data, and how ?) -#options for tmp files: in RAM, on disk, on DB (can be distributed) -{ +#' @include defaults.R +#' @title Cluster power curves with PAM in parallel +#' +#' @description Groups electricity power curves (or any series of similar nature) by applying PAM +#' algorithm in parallel to chunks of size \code{nb_series_per_chunk} +#' +#' @param data Access to the data, which can be of one of the three following types: +#' \itemize{ +#' \item data.frame: each line contains its ID in the first cell, and all values after +#' \item connection: any R connection object (e.g. a file) providing lines as described above +#' \item function: a custom way to retrieve the curves; it has two arguments: the start index +#' (start) and number of curves (n); see example in package vignette. +#' } +#' @param K Number of clusters +#' @param nb_series_per_chunk (Maximum) number of series in each group +#' @param min_series_per_chunk Minimum number of series in each group +#' @param writeTmp Function to write temporary wavelets coefficients (+ identifiers); +#' see defaults in defaults.R +#' @param readTmp Function to read temporary wavelets coefficients (see defaults.R) +#' @param wf Wavelet transform filter; see ?wt.filter. Default: haar +#' @param WER "end" to apply stage 2 after stage 1 has iterated and finished, or "mix" +#' to apply it after every stage 1 +#' @param ncores number of parallel processes; if NULL, use parallel::detectCores() +#' +#' @return A data.frame of the final medoids curves (identifiers + values) +epclust = function(data, K, nb_series_per_chunk, min_series_per_chunk=10*K, + writeTmp=defaultWriteTmp, readTmp=defaultReadTmp, wf="haar", WER="end", ncores=NULL) +{ + #TODO: setRefClass(...) to avoid copy data: + #http://stackoverflow.com/questions/2603184/r-pass-by-reference - #on input: can be data or con; data handled by writing it to file (ascii or bin ?!), + #0) check arguments + if (!is.data.frame(data) && !is.function(data)) + tryCatch( + { + if (is.character(data)) + { + data_con = file(data, open="r") + } else if (!isOpen(data)) + { + open(data) + data_con = data + } + }, + error="data should be a data.frame, a function or a valid connection") + if (!is.integer(K) || K < 2) + stop("K should be an integer greater or equal to 2") + if (!is.integer(nb_series_per_chunk) || nb_series_per_chunk < K) + stop("nb_series_per_chunk should be an integer greater or equal to K") + if (!is.function(writeTmp) || !is.function(readTmp)) + stop("read/writeTmp should be functional (see defaults.R)") + if (WER!="end" && WER!="mix") + stop("WER takes values in {'end','mix'}") + #concerning ncores, any non-integer type will be treated as "use parallel:detectCores()" + #1) acquire data (process curves, get as coeffs) + #TODO: for data.frame and custom function, run in parallel (connections are sequential[?!]) + index = 1 + nb_curves = 0 + repeat + { + coeffs_chunk = NULL + if (is.data.frame(data)) + { + #full data matrix + if (index < nrow(data)) + { + coeffs_chunk = curvesToCoeffs( + data[index:(min(index+nb_series_per_chunk-1,nrow(data))),], wf) + } + } else if (is.function(data)) + { + #custom user function to retrieve next n curves, probably to read from DB + coeffs_chunk = curvesToCoeffs( data(index, nb_series_per_chunk), wf ) + } else + { + #incremental connection + #TODO: find a better way to parse than using a temp file + ascii_lines = readLines(data_con, nb_series_per_chunk) + if (length(ascii_lines > 0)) + { + series_chunk_file = ".tmp/series_chunk" + writeLines(ascii_lines, series_chunk_file) + coeffs_chunk = curvesToCoeffs( read.csv(series_chunk_file), wf ) + } + } + if (is.null(coeffs_chunk)) + break + writeTmp(coeffs_chunk) + nb_curves = nb_curves + nrow(coeffs_chunk) + index = index + nb_series_per_chunk + } + if (exists(data_con)) + close(data_con) + if (nb_curves < min_series_per_chunk) + stop("Not enough data: less rows than min_series_per_chunk!") - if (!is.null(data)) + #2) process coeffs (by nb_series_per_chunk) and cluster them in parallel + library(parallel) + ncores = ifelse(is.integer(ncores), ncores, parallel::detectCores()) + cl = parallel::makeCluster(ncores) + parallel::clusterExport(cl=cl, varlist=c("TODO:", "what", "to", "export?"), envir=environment()) + #TODO: be careful of writing to a new temp file, then flush initial one, then re-use it... + repeat { - #full data matrix - index = 1 - n = nrow(data) - while (index < n) + #while there is jobs to do (i.e. size of tmp "file" is greater than nb_series_per_chunk) + nb_workers = nb_curves %/% nb_series_per_chunk + indices = list() + #indices[[i]] == (start_index,number_of_elements) + for (i in 1:nb_workers) + indices[[i]] = c(nb_series_per_chunk*(i-1)+1, nb_series_per_chunk) + remainder = nb_curves %% nb_series_per_chunk + if (remainder >= min_series_per_chunk) + { + nb_workers = nb_workers + 1 + indices[[nb_workers]] = c(nb_curves-remainder+1, nb_curves) + } else if (remainder > 0) { - getCoeffs(data - index = index + nbSeriesPerChunk + #spread the load among other workers + #... } - } else if (!is.null(con)) + li = parallel::parLapply(cl, indices, processChunk, K, WER=="mix") + #C) flush tmp file (current parallel processes will write in it) + } + parallel::stopCluster(cl) + + #3) readTmp last results, apply PAM on it, and return medoids + identifiers + final_coeffs = readTmp(1, nb_series_per_chunk) + if (nrow(final_coeffs) == K) { - #incremental connection - #read it one by one and get coeffs until nbSeriesPerChunk - #then launch a clustering task............ - readLines() - } else - stop("at least 'data' or 'con' argument must be present") + return ( list( medoids=coeffsToCurves(final_coeffs[,2:ncol(final_coeffs)]), + ids=final_coeffs[,1] ) ) + } + pam_output = getClusters(as.matrix(final_coeffs[,2:ncol(final_coeffs)]), K) + medoids = coeffsToCurves(pam_output$medoids, wf) + ids = final_coeffs[,1] [pam_output$ranks] + #4) apply stage 2 (in parallel ? inside task 2) ?) + if (WER == "end") + { + #from center curves, apply stage 2... + #TODO: + } + + return (list(medoids=medoids, ids=ids)) } -getCoeffs = function(series) +processChunk = function(indice, K, WER) { - #... return wavelets coeffs : compute in parallel ! + #1) retrieve data + coeffs = readTmp(indice[1], indice[2]) + #2) cluster + cl = getClusters(as.matrix(coeffs[,2:ncol(coeffs)]), K) + #3) WER (optional) + #TODO: } + +#TODO: difficulté : retrouver courbe à partir de l'identifiant (DB ok mais le reste ?) +#aussi : que passe-t-on aux noeuds ? curvesToCoeffs en // ? +#enfin : WER ?!