From: Benjamin Auder Date: Mon, 9 Jan 2017 14:56:00 +0000 (+0100) Subject: progress in main.R X-Git-Url: https://git.auder.net/game/%7B%7B%20path%28%27mixstore_store_package_upsert%27%2C%20%7B%20id:%20pkg.id%20%7D%29%20%7D%7D?a=commitdiff_plain;h=b9f1c0c7e2b96b52dadc5c4fcc77372f8c7dd8d8;p=epclust.git progress in main.R --- diff --git a/code/draft_R_pkg/R/algorithms.R b/code/draft_R_pkg/R/algorithms.R new file mode 100644 index 0000000..e27a235 --- /dev/null +++ b/code/draft_R_pkg/R/algorithms.R @@ -0,0 +1,10 @@ +getCoeffs = function(series) +{ + #... return wavelets coeffs : compute in parallel ! +} + +getClusters = function(data, K) +{ + pam_output = pam(data, K) + return ( list(clusts=pam_output$clustering, medoids=pam_output$medoids) ) +} diff --git a/code/draft_R_pkg/R/main.R b/code/draft_R_pkg/R/main.R index 19729ed..6dca708 100644 --- a/code/draft_R_pkg/R/main.R +++ b/code/draft_R_pkg/R/main.R @@ -60,19 +60,38 @@ epclust = function(data, K, nbSeriesPerChunk, writeTmp=ref_writeTmp, readTmp=ref if (is.data.frame(data)) { #full data matrix - error = writeTmp( getCoeffs( data[index:(min(index+nbSeriesPerChunk-1,nrow(data))),] ) ) + if (index < nrow(data)) + { + writeTmp( getCoeffs( data[index:(min(index+nbSeriesPerChunk-1,nrow(data))),] ) ) + } else + { + break + } } else if (is.function(data)) { #custom user function to retrieve next n curves, probably to read from DB - error = writeTmp( getCoeffs( data(index, nbSeriesPerChunk) ) ) + coeffs_chunk = getCoeffs( data(index, nbSeriesPerChunk) ) + if (!is.null(coeffs_chunk)) + { + writeTmp(coeffs_chunk) + } else + { + break + } } else { #incremental connection #TODO: find a better way to parse than using a temp file ascii_lines = readLines(dataCon, nbSeriesPerChunk) - seriesChunkFile = ".tmp/seriesChunk" - writeLines(ascii_lines, seriesChunkFile) - error = writeTmp( getCoeffs( read.csv(seriesChunkFile) ) ) + if (length(ascii_lines > 0)) + { + seriesChunkFile = ".tmp/seriesChunk" + writeLines(ascii_lines, seriesChunkFile) + writeTmp( getCoeffs( read.csv(seriesChunkFile) ) ) + } else + { + break + } } index = index + nbSeriesPerChunk } @@ -84,7 +103,7 @@ epclust = function(data, K, nbSeriesPerChunk, writeTmp=ref_writeTmp, readTmp=ref cl = parallel::makeCluster(ncores) parallel::clusterExport(cl=cl, varlist=c("X", "Y", "K", "p"), envir=environment()) library(cluster) - li = parallel::parLapply(cl, 1:B, getParamsAtIndex) + li = parallel::parLapply(cl, 1:B, ) #2) process coeffs (by nbSeriesPerChunk) and cluster them in parallel #TODO: be careful of writing to a new temp file, then flush initial one, then re-use it... @@ -97,7 +116,7 @@ epclust = function(data, K, nbSeriesPerChunk, writeTmp=ref_writeTmp, readTmp=ref #C) flush tmp file (current parallel processes will write in it) #always check "complete" flag (array, as I did in MPI) to know if "slaves" finished } -pam(x, k +pam(x, k) parallel::stopCluster(cl) #3) readTmp last results, apply PAM on it, and return medoids + identifiers @@ -108,8 +127,3 @@ pam(x, k #from center curves, apply stage 2... } } - -getCoeffs = function(series) -{ - #... return wavelets coeffs : compute in parallel ! -}