X-Git-Url: https://git.auder.net/?a=blobdiff_plain;f=code%2Fdraft_R_pkg%2FR%2Fmain.R;h=6746d88a3d14b3e83b53f97c39b70d0d3ab5cfd2;hb=aa7daeaacfda268c392adf1c5efbccea77be9fe0;hp=4120b392e22fc3713b8e3418afa243e33e4e69ea;hpb=ac1d423158fc0f625af1d3ab8b0a509fc3ae015c;p=epclust.git
diff --git a/code/draft_R_pkg/R/main.R b/code/draft_R_pkg/R/main.R
index 4120b39..6746d88 100644
--- a/code/draft_R_pkg/R/main.R
+++ b/code/draft_R_pkg/R/main.R
@@ -1,26 +1,161 @@
-epclust = function(data=NULL, con=NULL, raw=FALSE, K, nbPerChunk, ...)
-{
-
-
-#TODO: just a wrapper which calls ppam.exe (system("...")) and reads output (binary) file to retrieve medoids + IDs
-	#on input: can be data or con; data handled by writing it to file (ascii or bin ?!),
-	#con handled
+#' @include defaults.R
+#' @title Cluster power curves with PAM in parallel
+#'
+#' @description Groups electricity power curves (or any series of a similar nature) by applying
+#'   the PAM algorithm in parallel to chunks of size \code{nb_series_per_chunk}
+#'
+#' @param data Access to the data, which can be one of the following three types:
+#'   \itemize{
+#'     \item data.frame: each row contains its ID in the first cell, and all values after
+#'     \item connection: any R connection object (e.g. a file) providing lines as described above
+#'     \item function: a custom way to retrieve the curves; it has two arguments: the start index
+#'       (start) and the number of curves (n); see the example in the package vignette.
+#'   }
+#' @param K Number of clusters
+#' @param nb_series_per_chunk (Maximum) number of series in each group
+#' @param min_series_per_chunk Minimum number of series in each group
+#' @param writeTmp Function to write temporary wavelet coefficients (+ identifiers);
+#'   see defaults in defaults.R
+#' @param readTmp Function to read temporary wavelet coefficients (see defaults.R)
+#' @param wf Wavelet transform filter; see ?wt.filter. Default: haar
+#' @param WER "end" to apply stage 2 after stage 1 has iterated and finished, or "mix"
+#'   to apply it after every stage 1
+#' @param ncores Number of parallel processes; if NULL, use parallel::detectCores()
+#'
+#' @return A list with \code{medoids} (the final medoid curves) and \code{ids} (their identifiers)
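+#'
+#' @examples
+#' #A minimal sketch of the "function" input type, assuming toy in-memory data;
+#' #any reader with the same (start, n) signature (e.g. a DB query) would work the same way.
+#' series = matrix(rnorm(60*100), nrow=60) #60 toy series of length 100
+#' getSeries = function(start, n)
+#' {
+#' 	if (start > nrow(series)) return (NULL)
+#' 	rows = start:min(start+n-1, nrow(series))
+#' 	data.frame(ID=rows, series[rows, , drop=FALSE]) #ID in first cell, values after
+#' }
+#' \dontrun{epclust(getSeries, K=3, nb_series_per_chunk=20)}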
+epclust = function(data, K, nb_series_per_chunk, min_series_per_chunk=10*K,
+	writeTmp=defaultWriteTmp, readTmp=defaultReadTmp, wf="haar", WER="end", ncores=NULL)
+{
+	#TODO: setRefClass(...) to avoid copying the data:
+	#http://stackoverflow.com/questions/2603184/r-pass-by-reference

-	#options for tmp files: in RAM, on disk, on DB (can be distributed)
+	#0) check arguments
+	if (!is.data.frame(data) && !is.function(data))
+	{
+		tryCatch(
+			{
+				if (is.character(data))
+				{
+					data_con = file(data, open="r")
+				} else
+				{
+					#also keep a handle on connections that are already open
+					if (!isOpen(data))
+						open(data)
+					data_con = data
+				}
+			},
+			#tryCatch handlers must be functions, not character strings
+			error = function(e)
+				stop("data should be a data.frame, a function or a valid connection"))
+	}
+	#is.integer() would reject plain numeric input like K=3; test for whole numbers instead
+	if (!is.numeric(K) || K %% 1 != 0 || K < 2)
+		stop("K should be an integer greater than or equal to 2")
+	if (!is.numeric(nb_series_per_chunk) || nb_series_per_chunk %% 1 != 0 || nb_series_per_chunk < K)
+		stop("nb_series_per_chunk should be an integer greater than or equal to K")
+	if (!is.function(writeTmp) || !is.function(readTmp))
+		stop("writeTmp and readTmp should be functions (see defaults.R)")
+	if (WER!="end" && WER!="mix")
+		stop("WER takes values in {'end','mix'}")
+	#concerning ncores, any non-integer type will be treated as "use parallel::detectCores()"

+	#1) acquire data (process curves, get as coeffs)
+	#TODO: for data.frame and custom function, run in parallel (connections are sequential[?!])
+	index = 1
+	nb_curves = 0
+	repeat
+	{
+		coeffs_chunk = NULL
+		if (is.data.frame(data))
+		{
+			#full data matrix; <= so that the last row is not skipped
+			if (index <= nrow(data))
+			{
+				coeffs_chunk = curvesToCoeffs(
+					data[index:(min(index+nb_series_per_chunk-1,nrow(data))),], wf)
+			}
+		} else if (is.function(data))
+		{
+			#custom user function to retrieve next n curves, probably to read from DB
+			coeffs_chunk = curvesToCoeffs( data(index, nb_series_per_chunk), wf )
+		} else
+		{
+			#incremental connection
+			#TODO: find a better way to parse than using a temp file
+			ascii_lines = readLines(data_con, nb_series_per_chunk)
+			if (length(ascii_lines) > 0)
+			{
+				series_chunk_file = ".tmp/series_chunk"
+				writeLines(ascii_lines, series_chunk_file)
+				#header=FALSE: every line is a series, as described in ?epclust
+				coeffs_chunk = curvesToCoeffs( read.csv(series_chunk_file, header=FALSE), wf )
+			}
+		}
+		if (is.null(coeffs_chunk))
+			break
+		writeTmp(coeffs_chunk)
+		nb_curves = nb_curves + nrow(coeffs_chunk)
+		index = index + nb_series_per_chunk
+	}
+	if (exists("data_con", inherits=FALSE))
+		close(data_con)
+	if (nb_curves < min_series_per_chunk)
+		stop("Not enough data: fewer rows than min_series_per_chunk!")

+	#2) process coeffs (by nb_series_per_chunk) and cluster them in parallel
+	library(parallel)
+	ncores = ifelse(is.integer(ncores), ncores, parallel::detectCores())
+	cl = parallel::makeCluster(ncores)
+	parallel::clusterExport(cl=cl, varlist=c("TODO:", "what", "to", "export?"), envir=environment())
+	#TODO: be careful of writing to a new temp file, then flush initial one, then re-use it...
+	repeat
+	{
+		#while there are jobs to do (i.e. size of tmp "file" is greater than nb_series_per_chunk)
+		#TODO: this repeat has no break yet; add the actual stop condition
+		nb_workers = nb_curves %/% nb_series_per_chunk
+		indices = list()
+		#indices[[i]] == (start_index,number_of_elements)
+		for (i in 1:nb_workers)
+			indices[[i]] = c(nb_series_per_chunk*(i-1)+1, nb_series_per_chunk)
+		remainder = nb_curves %% nb_series_per_chunk
+		if (remainder >= min_series_per_chunk)
+		{
+			nb_workers = nb_workers + 1
+			#(start_index, number_of_elements), as documented above
+			indices[[nb_workers]] = c(nb_curves-remainder+1, remainder)
+		} else if (remainder > 0)
+		{
+			#spread the load among other workers; a possible scheme is sketched below
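+			#(a sketch, not a final choice: rebuild the contiguous ranges so the
+			#remainder is spread as evenly as possible; note that chunks may then
+			#slightly exceed nb_series_per_chunk)
+			extra = rep(remainder %/% nb_workers, nb_workers)
+			extra[seq_len(remainder %% nb_workers)] =
+				extra[seq_len(remainder %% nb_workers)] + 1
+			counts = rep(nb_series_per_chunk, nb_workers) + extra
+			starts = cumsum(c(1, counts[-nb_workers]))
+			for (i in 1:nb_workers)
+				indices[[i]] = c(starts[i], counts[i])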
+		}
+		li = parallel::parLapply(cl, indices, processChunk, K, WER=="mix")
+		#C) flush tmp file (current parallel processes will write in it)
+	}
+	parallel::stopCluster(cl)

-	if (!is.null(data))
+	#3) readTmp last results, apply PAM on it, and return medoids + identifiers
+	final_coeffs = readTmp(1, nb_series_per_chunk)
+	if (nrow(final_coeffs) == K)
 	{
-		#full data matrix
-		
-	} else if (!is.null(con))
+		return ( list( medoids=coeffsToCurves(final_coeffs[,2:ncol(final_coeffs)], wf),
+			ids=final_coeffs[,1] ) )
+	}
+	pam_output = getClusters(as.matrix(final_coeffs[,2:ncol(final_coeffs)]), K)
+	medoids = coeffsToCurves(pam_output$medoids, wf)
+	ids = final_coeffs[,1][pam_output$ranks]
+
+	#4) apply stage 2 (in parallel? inside task 2?)
+	if (WER == "end")
 	{
-		#incremental connection
-		#read it one by one and get coeffs until nbSeriesPerChunk
-		#then launch a clustering task............
-	} else
-		stop("at least 'data' or 'con' argument must be present")
+		#from center curves, apply stage 2...
+		#TODO:
+	}
+	return (list(medoids=medoids, ids=ids))
 }
+
+processChunk = function(indice, K, WER)
+{
+	#1) retrieve data
+	coeffs = readTmp(indice[1], indice[2])
+	#2) cluster ('clusters' rather than 'cl', to avoid confusion with the parallel cluster above)
+	clusters = getClusters(as.matrix(coeffs[,2:ncol(coeffs)]), K)
+	#3) WER (optional)
+	#TODO:
+}
+
+#TODO: difficulty: retrieving a curve from its identifier (fine for a DB, but what about the other input types?)
+#also: what do we pass to the nodes? curvesToCoeffs in parallel?
+#finally: WER?!
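+
+#A possible getClusters, sketched here under the assumption that stage 1 relies on
+#cluster::pam; $ranks holds the medoid row indices, which is how pam_output$ranks is
+#used above to map medoids back to their identifiers.
+getClusters = function(coeffs, K)
+{
+	pam_output = cluster::pam(coeffs, K)
+	list( medoids=pam_output$medoids, ranks=pam_output$id.med )
+}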