#Read the input by chunks of nbSeriesPerChunk series, compute their
#(wavelet) coefficients via getCoeffs() and append them to temporary
#storage via writeTmp(); break out of the enclosing loop once the
#input is exhausted. Three input kinds are supported: a full data
#matrix, a user chunk-fetching function, and a text connection.
if (is.data.frame(data))
{
	#full data matrix: slice the next chunk of rows
	#NOTE(review): was 'index < nrow(data)', which skipped the last row
	#when exactly one row remained; '<=' processes every row (the
	#min(..., nrow(data)) bound keeps the slice in range either way).
	if (index <= nrow(data))
	{
		writeTmp( getCoeffs( data[index:(min(index+nbSeriesPerChunk-1,nrow(data))),] ) )
	} else
	{
		break
	}
} else if (is.function(data))
{
	#custom user function to retrieve next n curves, probably to read from DB
	#a NULL chunk is the user function's end-of-data signal
	coeffs_chunk = getCoeffs( data(index, nbSeriesPerChunk) )
	if (!is.null(coeffs_chunk))
	{
		writeTmp(coeffs_chunk)
	} else
	{
		break
	}
} else
{
	#incremental connection
	#TODO: find a better way to parse than using a temp file
	ascii_lines = readLines(dataCon, nbSeriesPerChunk)
	#NOTE(review): was 'length(ascii_lines > 0)' — the length of a
	#logical vector, which is nonzero whenever any line was ever read;
	#the comparison belongs outside the length() call so an empty read
	#actually terminates the loop.
	if (length(ascii_lines) > 0)
	{
		seriesChunkFile = ".tmp/seriesChunk"
		writeLines(ascii_lines, seriesChunkFile)
		writeTmp( getCoeffs( read.csv(seriesChunkFile) ) )
	} else
	{
		break
	}
}
index = index + nbSeriesPerChunk
}
#Launch the parallel estimation: one PSOCK worker per core, each
#evaluating getParamsAtIndex over its share of the B replicate indices.
cl = parallel::makeCluster(ncores)
#workers are fresh R sessions: ship them the objects the task reads
parallel::clusterExport(cl=cl, varlist=c("X", "Y", "K", "p"), envir=environment())
library(cluster)
#NOTE(review): the '+' side of the patch dropped the FUN argument,
#leaving 'parLapply(cl, 1:B, )' — a syntax error. Restored
#getParamsAtIndex from the '-' side; confirm it is still the intended
#worker function.
li = parallel::parLapply(cl, 1:B, getParamsAtIndex)
#2) process coeffs (by nbSeriesPerChunk) and cluster them in parallel
#TODO: be careful of writing to a new temp file, then flush initial one, then re-use it...
#C) flush tmp file (current parallel processes will write in it)
#always check "complete" flag (array, as I did in MPI) to know if "slaves" finished
}
#Final stage: medoid extraction via PAM on the accumulated
#coefficients, then worker teardown.
#NOTE(review): the '-' side 'pam(x, k' was missing its closing
#parenthesis; resolved to the '+' side. 'x' and 'k' are not defined in
#this view — presumably the coefficient matrix read back from temp
#storage and the number of clusters; TODO confirm, and capture pam()'s
#return value, which is currently discarded.
pam(x, k)
parallel::stopCluster(cl)
#3) readTmp last results, apply PAM on it, and return medoids + identifiers
#from center curves, apply stage 2...
}
}
#Compute the (wavelet) coefficients of a chunk of series.
#NOTE(review): the patch deletes this stub, but getCoeffs() is still
#called in the chunk-reading loop above — deleting it here would
#orphan those calls unless the function now lives in another file;
#kept (returning NULL implicitly) until that is confirmed.
#@param series a chunk of series (data frame rows / user-function output)
#TODO: return wavelets coeffs — compute in parallel!
getCoeffs = function(series)
{
	#... return wavelets coeffs : compute in parallel !
}