From: Benjamin Auder Date: Mon, 9 Jan 2017 11:06:39 +0000 (+0100) Subject: progress in main.R X-Git-Url: https://git.auder.net/%7B%7B%20path%28%27fos_user_resetting_request%27%29%20%7D%7D?a=commitdiff_plain;h=6ecf5c2d4c84eb3c0c70d3f2ce44b900699cc0b4;p=epclust.git progress in main.R --- diff --git a/code/draft_R_pkg/DESCRIPTION b/code/draft_R_pkg/DESCRIPTION index ce9129b..669e8c0 100644 --- a/code/draft_R_pkg/DESCRIPTION +++ b/code/draft_R_pkg/DESCRIPTION @@ -10,12 +10,11 @@ Author: Jairo Cugliari [aut] Maintainer: Benjamin Auder Depends: - R (>= 3.0.0) -Imports: - MASS + R (>= 3.0.0), + parallel, + cluster Suggests: testthat, - parallel, knitr License: MIT + file LICENSE VignetteBuilder: knitr diff --git a/code/draft_R_pkg/R/main.R b/code/draft_R_pkg/R/main.R index 695b928..19729ed 100644 --- a/code/draft_R_pkg/R/main.R +++ b/code/draft_R_pkg/R/main.R @@ -30,7 +30,17 @@ epclust = function(data, K, nbSeriesPerChunk, writeTmp=ref_writeTmp, readTmp=ref #0) check arguments if (!is.data.frame(data) && !is.function(data)) - tryCatch({dataCon = open(data)}, + tryCatch( + { + if (is.character(data)) + { + dataCon = file(data, open="r") + } else if (!isOpen(data)) + { + open(data) + dataCon = data + } + }, error="data should be a data.frame, a function or a valid connection") if (!is.integer(K) || K < 2) stop("K should be an integer greater or equal to 2") @@ -44,17 +54,17 @@ epclust = function(data, K, nbSeriesPerChunk, writeTmp=ref_writeTmp, readTmp=ref #1) acquire data (process curves, get as coeffs) index = 1 - nbCurves = nrow(data) - while (index < nbCurves) + nbCurves = 0 + repeat { if (is.data.frame(data)) { #full data matrix - writeTmp( getCoeffs( data[index:(min(index+nbSeriesPerChunk-1,nbCurves)),] ) ) + error = writeTmp( getCoeffs( data[index:(min(index+nbSeriesPerChunk-1,nrow(data))),] ) ) } else if (is.function(data)) { #custom user function to retrieve next n curves, probably to read from DB - writeTmp( getCoeffs( data(index, nbSeriesPerChunk) ) ) + error = writeTmp( getCoeffs( data(index, nbSeriesPerChunk) ) ) } else { #incremental connection @@ -62,7 +72,7 @@ epclust = function(data, K, nbSeriesPerChunk, writeTmp=ref_writeTmp, readTmp=ref ascii_lines = readLines(dataCon, nbSeriesPerChunk) seriesChunkFile = ".tmp/seriesChunk" writeLines(ascii_lines, seriesChunkFile) - writeTmp( getCoeffs( read.csv(seriesChunkFile) ) ) + error = writeTmp( getCoeffs( read.csv(seriesChunkFile) ) ) } index = index + nbSeriesPerChunk } @@ -73,9 +83,11 @@ epclust = function(data, K, nbSeriesPerChunk, writeTmp=ref_writeTmp, readTmp=ref ncores = ifelse(is.integer(ncores), ncores, parallel::detectCores()) cl = parallel::makeCluster(ncores) parallel::clusterExport(cl=cl, varlist=c("X", "Y", "K", "p"), envir=environment()) + library(cluster) li = parallel::parLapply(cl, 1:B, getParamsAtIndex) - #2) process coeffs (by nbSeriesPerChunk) and cluster in parallel (just launch async task, wait for them to complete, and re-do if necessary) + #2) process coeffs (by nbSeriesPerChunk) and cluster them in parallel + #TODO: be careful of writing to a new temp file, then flush initial one, then re-use it... repeat { completed = rep(FALSE, ............) @@ -85,7 +97,7 @@ epclust = function(data, K, nbSeriesPerChunk, writeTmp=ref_writeTmp, readTmp=ref #C) flush tmp file (current parallel processes will write in it) #always check "complete" flag (array, as I did in MPI) to know if "slaves" finished } - +pam(x, k parallel::stopCluster(cl) #3) readTmp last results, apply PAM on it, and return medoids + identifiers