advance on main.R
authorBenjamin Auder <benjamin.auder@somewhere>
Mon, 9 Jan 2017 02:17:35 +0000 (03:17 +0100)
committerBenjamin Auder <benjamin.auder@somewhere>
Mon, 9 Jan 2017 02:17:35 +0000 (03:17 +0100)
code/draft_R_pkg/R/main.R

index a01385e..c3e4b54 100644 (file)
@@ -10,7 +10,11 @@ writeTmp(curves [uncompressed coeffs, limited number - nbSeriesPerChunk], last=F
 readTmp(..., from index, n curves) #careful: connection must remain open
 #TODO: write read/write tmp reference ( on file in .tmp/ folder ... )
 
 readTmp(..., from index, n curves) #careful: connection must remain open
 #TODO: write read/write tmp reference ( on file in .tmp/ folder ... )
 
-epclust = function(data=NULL, K, nbPerChunk, ..., writeTmp=ref_writeTmp, readTmp=ref_readTmp) #where to put/retrieve intermediate results; if not provided, use file on disk
+#data:
+#stop("Unrecognizable 'data' argument (must be numeric, functional or connection)")
+
+#WER: "end" to apply stage 2 after stage 1 iterated, "mix" (or anything else...?!) to apply it after every stage 1
+epclust = function(data, K, nbPerChunk, WER="end", ncores=NULL, writeTmp=ref_writeTmp, readTmp=ref_readTmp) #where to put/retrieve intermediate results; if not provided, use file on disk
 {
 
 
 {
 
 
@@ -37,17 +41,39 @@ epclust = function(data=NULL, K, nbPerChunk, ..., writeTmp=ref_writeTmp, readTmp
                #incremental connection
                #read it one by one and get coeffs until nbSeriesPerChunk
                #then launch a clustering task............
                #incremental connection
                #read it one by one and get coeffs until nbSeriesPerChunk
                #then launch a clustering task............
+               #TODO: find a better way to parse than using a temp file
                ascii_lines = readLines(data, nbSeriesPerChunk)
                ascii_lines = readLines(data, nbSeriesPerChunk)
-               seriesChunkFile = ".tmp/seriesChunk" #TODO: find a better way
+               seriesChunkFile = ".tmp/seriesChunk"
                writeLines(ascii_lines, seriesChunkFile)
                writeTmp( getCoeffs( read.csv(seriesChunkFile) ) )
                writeLines(ascii_lines, seriesChunkFile)
                writeTmp( getCoeffs( read.csv(seriesChunkFile) ) )
-       } else
-               stop("Unrecognizable 'data' argument (must be numeric, functional or connection)")
+       }
+
+       library(parallel)
+       ncores = ifelse(is.numeric(ncores), ncores, parallel::detectCores())
+       cl = parallel::makeCluster(ncores)
+115     parallel::clusterExport(cl=cl, varlist=c("X", "Y", "K", "p"), envir=environment())
+116     li = parallel::parLapply(cl, 1:B, getParamsAtIndex)
 
        #2) process coeffs (by nbSeriesPerChunk) and cluster in parallel (just launch async task, wait for them to complete, and re-do if necessary)
 
        #2) process coeffs (by nbSeriesPerChunk) and cluster in parallel (just launch async task, wait for them to complete, and re-do if necessary)
+       repeat
+       {
+               completed = rep(FALSE, ............)
+               #while there is jobs to do (i.e. size of tmp "file" is greater than nbSeriesPerChunk),
+               #A) determine which tasks which processor will do (OK)
+               #B) send each (sets of) tasks in parallel
+               #C) flush tmp file (current parallel processes will write in it)
+               #always check "complete" flag (array, as I did in MPI) to know if "slaves" finished
+       }
+
+parallel::stopCluster(cl)
 
 
+       #3) readTmp last results, apply PAM on it, and return medoids + identifiers
 
 
-       #3) apply stage 2 (in parallel ? inside task 2) ?)
+       #4) apply stage 2 (in parallel ? inside task 2) ?)
+       if (WER == "end")
+       {
+               #from center curves, apply stage 2...
+       }
 }
 
 getCoeffs = function(series)
 }
 
 getCoeffs = function(series)