X-Git-Url: https://git.auder.net/?a=blobdiff_plain;f=code%2Fdraft_R_pkg%2FR%2Fmain.R;h=19729ed5bbc0367615463088e9ce08f7cab91bda;hb=6ecf5c2d4c84eb3c0c70d3f2ce44b900699cc0b4;hp=695b9282945444db8dff267efc291edf2e95dc87;hpb=7f0781b723158cf8e6b25dcab0bae18acae40be8;p=epclust.git diff --git a/code/draft_R_pkg/R/main.R b/code/draft_R_pkg/R/main.R index 695b928..19729ed 100644 --- a/code/draft_R_pkg/R/main.R +++ b/code/draft_R_pkg/R/main.R @@ -30,7 +30,17 @@ epclust = function(data, K, nbSeriesPerChunk, writeTmp=ref_writeTmp, readTmp=ref #0) check arguments if (!is.data.frame(data) && !is.function(data)) - tryCatch({dataCon = open(data)}, + tryCatch( + { + if (is.character(data)) + { + dataCon = file(data, open="r") + } else if (!isOpen(data)) + { + open(data) + dataCon = data + } + }, error="data should be a data.frame, a function or a valid connection") if (!is.integer(K) || K < 2) stop("K should be an integer greater or equal to 2") @@ -44,17 +54,17 @@ epclust = function(data, K, nbSeriesPerChunk, writeTmp=ref_writeTmp, readTmp=ref #1) acquire data (process curves, get as coeffs) index = 1 - nbCurves = nrow(data) - while (index < nbCurves) + nbCurves = 0 + repeat { if (is.data.frame(data)) { #full data matrix - writeTmp( getCoeffs( data[index:(min(index+nbSeriesPerChunk-1,nbCurves)),] ) ) + error = writeTmp( getCoeffs( data[index:(min(index+nbSeriesPerChunk-1,nrow(data))),] ) ) } else if (is.function(data)) { #custom user function to retrieve next n curves, probably to read from DB - writeTmp( getCoeffs( data(index, nbSeriesPerChunk) ) ) + error = writeTmp( getCoeffs( data(index, nbSeriesPerChunk) ) ) } else { #incremental connection @@ -62,7 +72,7 @@ epclust = function(data, K, nbSeriesPerChunk, writeTmp=ref_writeTmp, readTmp=ref ascii_lines = readLines(dataCon, nbSeriesPerChunk) seriesChunkFile = ".tmp/seriesChunk" writeLines(ascii_lines, seriesChunkFile) - writeTmp( getCoeffs( read.csv(seriesChunkFile) ) ) + error = writeTmp( getCoeffs( read.csv(seriesChunkFile) ) ) } index = index + nbSeriesPerChunk } @@ -73,9 +83,11 @@ epclust = function(data, K, nbSeriesPerChunk, writeTmp=ref_writeTmp, readTmp=ref ncores = ifelse(is.integer(ncores), ncores, parallel::detectCores()) cl = parallel::makeCluster(ncores) parallel::clusterExport(cl=cl, varlist=c("X", "Y", "K", "p"), envir=environment()) + library(cluster) li = parallel::parLapply(cl, 1:B, getParamsAtIndex) - #2) process coeffs (by nbSeriesPerChunk) and cluster in parallel (just launch async task, wait for them to complete, and re-do if necessary) + #2) process coeffs (by nbSeriesPerChunk) and cluster them in parallel + #TODO: be careful of writing to a new temp file, then flush initial one, then re-use it... repeat { completed = rep(FALSE, ............) @@ -85,7 +97,7 @@ epclust = function(data, K, nbSeriesPerChunk, writeTmp=ref_writeTmp, readTmp=ref #C) flush tmp file (current parallel processes will write in it) #always check "complete" flag (array, as I did in MPI) to know if "slaves" finished } - +pam(x, k parallel::stopCluster(cl) #3) readTmp last results, apply PAM on it, and return medoids + identifiers