From: Benjamin Auder Date: Mon, 9 Jan 2017 02:17:35 +0000 (+0100) Subject: advance on main.R X-Git-Url: https://git.auder.net/doc/html/%7B%7B%20path%28%27mixstore_static_about%27%29%20%7D%7D?a=commitdiff_plain;h=8e6accca063927f6864a9cc6b01e2e77085bab95;p=epclust.git advance on main.R --- diff --git a/code/draft_R_pkg/R/main.R b/code/draft_R_pkg/R/main.R index a01385e..c3e4b54 100644 --- a/code/draft_R_pkg/R/main.R +++ b/code/draft_R_pkg/R/main.R @@ -10,7 +10,11 @@ writeTmp(curves [uncompressed coeffs, limited number - nbSeriesPerChunk], last=F readTmp(..., from index, n curves) #careful: connection must remain open #TODO: write read/write tmp reference ( on file in .tmp/ folder ... ) -epclust = function(data=NULL, K, nbPerChunk, ..., writeTmp=ref_writeTmp, readTmp=ref_readTmp) #where to put/retrieve intermediate results; if not provided, use file on disk +#data: +#stop("Unrecognizable 'data' argument (must be numeric, functional or connection)") + +#WER: "end" to apply stage 2 after stage 1 iterated, "mix" (or anything else...?!) to apply it after every stage 1 +epclust = function(data, K, nbPerChunk, WER="end", ncores=NULL, writeTmp=ref_writeTmp, readTmp=ref_readTmp) #where to put/retrieve intermediate results; if not provided, use file on disk { @@ -37,17 +41,39 @@ epclust = function(data=NULL, K, nbPerChunk, ..., writeTmp=ref_writeTmp, readTmp #incremental connection #read it one by one and get coeffs until nbSeriesPerChunk #then launch a clustering task............ + #TODO: find a better way to parse than using a temp file ascii_lines = readLines(data, nbSeriesPerChunk) - seriesChunkFile = ".tmp/seriesChunk" #TODO: find a better way + seriesChunkFile = ".tmp/seriesChunk" writeLines(ascii_lines, seriesChunkFile) writeTmp( getCoeffs( read.csv(seriesChunkFile) ) ) - } else - stop("Unrecognizable 'data' argument (must be numeric, functional or connection)") + } + + library(parallel) + ncores = ifelse(is.numeric(ncores), ncores, parallel::detectCores()) + cl = parallel::makeCluster(ncores) +115 parallel::clusterExport(cl=cl, varlist=c("X", "Y", "K", "p"), envir=environment()) +116 li = parallel::parLapply(cl, 1:B, getParamsAtIndex) #2) process coeffs (by nbSeriesPerChunk) and cluster in parallel (just launch async task, wait for them to complete, and re-do if necessary) + repeat + { + completed = rep(FALSE, ............) + #while there is jobs to do (i.e. size of tmp "file" is greater than nbSeriesPerChunk), + #A) determine which tasks which processor will do (OK) + #B) send each (sets of) tasks in parallel + #C) flush tmp file (current parallel processes will write in it) + #always check "complete" flag (array, as I did in MPI) to know if "slaves" finished + } + +parallel::stopCluster(cl) + #3) readTmp last results, apply PAM on it, and return medoids + identifiers - #3) apply stage 2 (in parallel ? inside task 2) ?) + #4) apply stage 2 (in parallel ? inside task 2) ?) + if (WER == "end") + { + #from center curves, apply stage 2... + } } getCoeffs = function(series)