readTmp(..., from index, n curves) #careful: connection must remain open
#TODO: write read/write tmp reference ( on file in .tmp/ folder ... )
-epclust = function(data=NULL, K, nbPerChunk, ..., writeTmp=ref_writeTmp, readTmp=ref_readTmp) #where to put/retrieve intermediate results; if not provided, use file on disk
+#data:
+#stop("Unrecognizable 'data' argument (must be numeric, functional or connection)")
+
+#WER: "end" to apply stage 2 after stage 1 iterated, "mix" (or anything else...?!) to apply it after every stage 1
+epclust = function(data, K, nbPerChunk, WER="end", ncores=NULL, writeTmp=ref_writeTmp, readTmp=ref_readTmp) #where to put/retrieve intermediate results; if not provided, use file on disk
{
#incremental connection
#read it one by one and get coeffs until nbSeriesPerChunk
#then launch a clustering task............
+ #TODO: find a better way to parse than using a temp file
ascii_lines = readLines(data, nbSeriesPerChunk)
- seriesChunkFile = ".tmp/seriesChunk" #TODO: find a better way
+ seriesChunkFile = ".tmp/seriesChunk"
writeLines(ascii_lines, seriesChunkFile)
writeTmp( getCoeffs( read.csv(seriesChunkFile) ) )
- } else
- stop("Unrecognizable 'data' argument (must be numeric, functional or connection)")
+ }
+
+ library(parallel)
+ ncores = ifelse(is.numeric(ncores), ncores, parallel::detectCores())
+ cl = parallel::makeCluster(ncores)
+115 parallel::clusterExport(cl=cl, varlist=c("X", "Y", "K", "p"), envir=environment())
+116 li = parallel::parLapply(cl, 1:B, getParamsAtIndex)
#2) process coeffs (by nbSeriesPerChunk) and cluster in parallel (just launch async task, wait for them to complete, and re-do if necessary)
+ repeat
+ {
+ completed = rep(FALSE, ............)
+ #while there is jobs to do (i.e. size of tmp "file" is greater than nbSeriesPerChunk),
+ #A) determine which tasks which processor will do (OK)
+ #B) send each (sets of) tasks in parallel
+ #C) flush tmp file (current parallel processes will write in it)
+ #always check "complete" flag (array, as I did in MPI) to know if "slaves" finished
+ }
+
+parallel::stopCluster(cl)
+ #3) readTmp last results, apply PAM on it, and return medoids + identifiers
- #3) apply stage 2 (in parallel ? inside task 2) ?)
+ #4) apply stage 2 (in parallel ? inside task 2) ?)
+ if (WER == "end")
+ {
+ #from center curves, apply stage 2...
+ }
}
getCoeffs = function(series)