#0) check arguments
if (!is.data.frame(data) && !is.function(data))
- tryCatch({dataCon = open(data)},
+ tryCatch(
+ {
+ if (is.character(data))
+ {
+ dataCon = file(data, open="r")
+ } else
+ {
+ #an already existing connection: open it if necessary, then read from it directly
+ if (!isOpen(data))
+ open(data)
+ dataCon = data
+ }
+ },
- error="data should be a data.frame, a function or a valid connection")
+ error = function(e)
+ stop("data should be a data.frame, a function or a valid connection"))
-if (!is.integer(K) || K < 2)
-stop("K should be an integer greater or equal to 2")
+#accept K given as a plain number (a literal like 3 is a double in R, not an integer)
+if (!is.numeric(K) || K != floor(K) || K < 2)
+stop("K should be an integer greater than or equal to 2")
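+ #for reference, the kinds of 'data' input accepted by the checks above (illustrative values;
+ #"curves.csv" and getNextCurves() are hypothetical, not part of this code):
+ # data = read.csv("curves.csv") #a full data.frame already in memory
+ # data = function(index, n) getNextCurves(index, n) #a function returning the next n series from 'index'
+ # data = "curves.csv" #a file path, opened above as a text connection
+ # data = file("curves.csv", open="r") #or an existing connection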
#1) acquire data (process curves, get as coeffs)
index = 1
- nbCurves = nrow(data)
- while (index < nbCurves)
+ nbCurves = 0
+ repeat
{
if (is.data.frame(data))
{
#full data matrix
- writeTmp( getCoeffs( data[index:(min(index+nbSeriesPerChunk-1,nbCurves)),] ) )
+ error = writeTmp( getCoeffs( data[index:(min(index+nbSeriesPerChunk-1,nrow(data))),] ) )
} else if (is.function(data))
{
#custom user function to retrieve next n curves, probably to read from DB
- writeTmp( getCoeffs( data(index, nbSeriesPerChunk) ) )
+ error = writeTmp( getCoeffs( data(index, nbSeriesPerChunk) ) )
} else
{
#incremental connection
ascii_lines = readLines(dataCon, nbSeriesPerChunk)
+ #stop as soon as the connection is exhausted (read.csv would fail on an empty chunk file)
+ if (length(ascii_lines) == 0)
+ break
seriesChunkFile = ".tmp/seriesChunk"
writeLines(ascii_lines, seriesChunkFile)
- writeTmp( getCoeffs( read.csv(seriesChunkFile) ) )
+ error = writeTmp( getCoeffs( read.csv(seriesChunkFile) ) )
}
+ #assumption: writeTmp() returns TRUE on error (or when nothing more can be written), FALSE otherwise
+ if (isTRUE(error) || (is.data.frame(data) && index + nbSeriesPerChunk > nrow(data)))
+ break
index = index + nbSeriesPerChunk
}
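+ #hedged sketch of the contract assumed for the two helpers used in the loop above;
+ #the real getCoeffs()/writeTmp() live elsewhere, the bodies below are placeholders only:
+ # getCoeffs = function(chunk) {
+ # 	#map each series (one row of 'chunk') to its vector of coefficients
+ # 	as.matrix(chunk) #placeholder: identity "coefficients"
+ # }
+ # writeTmp = function(coeffs) {
+ # 	#append a chunk of coefficients to the temporary storage;
+ # 	#return FALSE on success, TRUE on failure (the acquisition loop breaks on TRUE)
+ # 	tryCatch({ write.table(coeffs, ".tmp/coeffs.csv", append=TRUE, sep=",",
+ # 		row.names=FALSE, col.names=FALSE); FALSE }, error=function(e) TRUE)
+ # }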
-ncores = ifelse(is.integer(ncores), ncores, parallel::detectCores())
+#is.numeric rather than is.integer: a literal core count like 4 is a double in R
+ncores = if (is.numeric(ncores)) as.integer(ncores) else parallel::detectCores()
cl = parallel::makeCluster(ncores)
parallel::clusterExport(cl=cl, varlist=c("X", "Y", "K", "p"), envir=environment())
+ library(cluster)
li = parallel::parLapply(cl, 1:B, getParamsAtIndex)
- #2) process coeffs (by nbSeriesPerChunk) and cluster in parallel (just launch async task, wait for them to complete, and re-do if necessary)
+ #2) process coeffs (by nbSeriesPerChunk) and cluster them in parallel
+ #TODO: be careful here: write to a new temp file, then flush the initial one, then re-use it...
repeat
{
completed = rep(FALSE, ............)
#C) flush tmp file (current parallel processes will write in it)
-#always check "complete" flag (array, as I did in MPI) to know if "slaves" finished
+#always check the "completed" flags (an array, as I did with MPI) to know whether the "slaves" have finished,
+#and break out of this repeat once all(completed) is TRUE
}
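+ #hedged sketch of one way to drive this step with the existing cluster 'cl': process the
+ #coefficient chunks in waves of at most 'ncores' tasks, marking each wave as completed when
+ #its (blocking) parLapply call returns; nbChunks (number of chunks written in step #1) and
+ #clusterChunk() are illustrative placeholders, not existing helpers.
+ # completed = rep(FALSE, nbChunks)
+ # while (!all(completed))
+ # {
+ # 	todo = which(!completed)
+ # 	todo = todo[1:min(ncores, length(todo))]
+ # 	parallel::parLapply(cl, todo, function(i) clusterChunk(i, K))
+ # 	completed[todo] = TRUE
+ # }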
-
+#reminder: the final PAM call, cluster::pam(x, k), is applied to the last results (see #3 below)
parallel::stopCluster(cl)
#3) readTmp last results, apply PAM on it, and return medoids + identifiers
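+ #hedged sketch of step #3, assuming readTmp() (named in the comment above) returns the matrix
+ #of remaining coefficients when called without arguments:
+ # lastCoeffs = readTmp()
+ # finalPam = cluster::pam(lastCoeffs, K)
+ # return (list(medoids=finalPam$medoids, ids=finalPam$id.med))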