X-Git-Url: https://git.auder.net/variants/Dynamo/style.css?a=blobdiff_plain;f=code%2Fdraft_R_pkg%2FR%2Fmain.R;h=c3e4b5401be28ec4ad4262c0f6082610a390aa7a;hb=8e6accca063927f6864a9cc6b01e2e77085bab95;hp=3411720a01c234dce67a3f68203673be0b2fd543;hpb=3dcbfeef0dc92444287dd78a16c80e58a98a6ee7;p=epclust.git diff --git a/code/draft_R_pkg/R/main.R b/code/draft_R_pkg/R/main.R index 3411720..c3e4b54 100644 --- a/code/draft_R_pkg/R/main.R +++ b/code/draft_R_pkg/R/main.R @@ -4,33 +4,76 @@ #fields: data (can be NULL or provided by user), coeffs (will be computed #con can be a character string naming a file; see readLines() #data can be in DB format, on one column : TODO: guess (from header, or col. length...) -epclust = function(data=NULL, con=NULL, raw=FALSE, K, nbPerChunk, ..., where_to_store_tmp_data, and how ?) -#options for tmp files: in RAM, on disk, on DB (can be distributed) + + +writeTmp(curves [uncompressed coeffs, limited number - nbSeriesPerChunk], last=FALSE) #if last=TRUE, close the conn +readTmp(..., from index, n curves) #careful: connection must remain open +#TODO: write read/write tmp reference ( on file in .tmp/ folder ... ) + +#data: +#stop("Unrecognizable 'data' argument (must be numeric, functional or connection)") + +#WER: "end" to apply stage 2 after stage 1 iterated, "mix" (or anything else...?!) to apply it after every stage 1 +epclust = function(data, K, nbPerChunk, WER="end", ncores=NULL, writeTmp=ref_writeTmp, readTmp=ref_readTmp) #where to put/retrieve intermediate results; if not provided, use file on disk { #on input: can be data or con; data handled by writing it to file (ascii or bin ?!), +#data: con or matrix or DB - - if (!is.null(data)) + #1) acquire data (process curves, get as coeffs) + if (is.numeric(data)) { #full data matrix index = 1 n = nrow(data) while (index < n) { - getCoeffs(data + writeTmp( getCoeffs(data) ) index = index + nbSeriesPerChunk } - } else if (!is.null(con)) + } else if (is.function(data)) + { + #custom user function to retrieve next n curves, probably to read from DB + writeTmp( getCoeffs( data(nbPerChunk) ) ) + } else { #incremental connection #read it one by one and get coeffs until nbSeriesPerChunk #then launch a clustering task............ - readLines() - } else - stop("at least 'data' or 'con' argument must be present") + #TODO: find a better way to parse than using a temp file + ascii_lines = readLines(data, nbSeriesPerChunk) + seriesChunkFile = ".tmp/seriesChunk" + writeLines(ascii_lines, seriesChunkFile) + writeTmp( getCoeffs( read.csv(seriesChunkFile) ) ) + } + + library(parallel) + ncores = ifelse(is.numeric(ncores), ncores, parallel::detectCores()) + cl = parallel::makeCluster(ncores) +115 parallel::clusterExport(cl=cl, varlist=c("X", "Y", "K", "p"), envir=environment()) +116 li = parallel::parLapply(cl, 1:B, getParamsAtIndex) + + #2) process coeffs (by nbSeriesPerChunk) and cluster in parallel (just launch async task, wait for them to complete, and re-do if necessary) + repeat + { + completed = rep(FALSE, ............) + #while there is jobs to do (i.e. size of tmp "file" is greater than nbSeriesPerChunk), + #A) determine which tasks which processor will do (OK) + #B) send each (sets of) tasks in parallel + #C) flush tmp file (current parallel processes will write in it) + #always check "complete" flag (array, as I did in MPI) to know if "slaves" finished + } +parallel::stopCluster(cl) + + #3) readTmp last results, apply PAM on it, and return medoids + identifiers + + #4) apply stage 2 (in parallel ? inside task 2) ?) + if (WER == "end") + { + #from center curves, apply stage 2... + } } getCoeffs = function(series)