-curvesToCoeffs = function(series)
+#NOTE: always keep ID in first column
+curvesToCoeffs = function(series, wf)
{
- #... return wavelets coeffs : compute in parallel !
- #TODO: always keep ID in first column
-}
-
-coeffsToCurves = function(coeffs)
-{
- #re-expand on wavelet basis
+ library(wavelets)
+ L = length(series[1,])
+ D = ceiling( log2(L-1) ) #log2: nb_sample_points must be a power of 2 for dwt
+ nb_sample_points = 2^D
+ #TODO: parallel::parApply() ?!
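+ #a sketch of the parallel variant (assumes a cluster object 'cl' created just
+ #for this call; parApply() has the same row-wise semantics as apply()):
+ # cl = parallel::makeCluster(parallel::detectCores())
+ # res = parallel::parApply(cl, series, 1, <same per-row function as below>)
+ # parallel::stopCluster(cl)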
+ res = apply(series, 1, function(x) {
+ interpolated_curve = spline(1:(L-1), x[2:L], n=nb_sample_points)$y
+ W = wavelets::dwt(interpolated_curve, filter=wf, n.levels=D)@W
+ nrj_coeffs = rev( sapply( W, function(v) ( sqrt( sum(v^2) ) ) ) )
+ return ( c(x[1], nrj_coeffs) )
+ })
+ return (as.data.frame( t(res) )) #transpose so series are rows, IDs in column 1
}
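+
+#NOTE: coeffsToCurves() is still called from epclust() below, so its stub is
+#kept (a sketch; exact reconstruction from per-level energies alone is not
+#possible, hence the body stays a TODO)
+coeffsToCurves = function(coeffs, wf)
+{
+ #TODO: re-expand on wavelet basis
+}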
getClusters = function(data, K)
{
- pam_output = pam(data, K)
+ library(cluster)
+ pam_output = cluster::pam(data, K)
return ( list( clusts=pam_output$clustering, medoids=pam_output$medoids,
ranks=pam_output$id.med ) )
}
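+#Usage sketch (hypothetical data: 20 points in R^3, 2 clusters):
+# getClusters(matrix(rnorm(60), ncol=3), K=2)$clusts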
#' @param writeTmp Function to write temporary wavelets coefficients (+ identifiers);
#' see defaults in defaults.R
#' @param readTmp Function to read temporary wavelets coefficients (see defaults.R)
+#' @param wf Wavelet transform filter; see ?wt.filter. Default: "haar"
#' @param WER "end" to apply stage 2 after stage 1 has iterated and finished, or "mix"
#' to apply it after every stage 1
#' @param ncores number of parallel processes; if NULL, use parallel::detectCores()
#'
#' @return A data.frame of the final medoids curves (identifiers + values)
epclust = function(data, K, nb_series_per_chunk, min_series_per_chunk=10*K,
- writeTmp=defaultWriteTmp, readTmp=defaultReadTmp, WER="end", ncores=NULL)
+ writeTmp=defaultWriteTmp, readTmp=defaultReadTmp, wf="haar", WER="end", ncores=NULL)
{
#TODO: setRefClass(...) to avoid copy data:
#http://stackoverflow.com/questions/2603184/r-pass-by-reference
if (index < nrow(data))
{
coeffs_chunk = curvesToCoeffs(
- data[index:(min(index+nb_series_per_chunk-1,nrow(data))),])
+ data[index:(min(index+nb_series_per_chunk-1,nrow(data))),], wf)
}
} else if (is.function(data))
{
 #custom user function to retrieve the next n curves, e.g. to read from a DB
- coeffs_chunk = curvesToCoeffs( data(index, nb_series_per_chunk) )
+ coeffs_chunk = curvesToCoeffs( data(index, nb_series_per_chunk), wf )
} else
{
#incremental connection
{
series_chunk_file = ".tmp/series_chunk"
writeLines(ascii_lines, series_chunk_file)
- coeffs_chunk = curvesToCoeffs( read.csv(series_chunk_file) )
+ coeffs_chunk = curvesToCoeffs( read.csv(series_chunk_file), wf )
}
}
if (is.null(coeffs_chunk))
 ncores = ifelse(is.null(ncores), parallel::detectCores(), ncores)
cl = parallel::makeCluster(ncores)
parallel::clusterExport(cl=cl, varlist=c("X", "Y", "K", "p"), envir=environment())
- library(cluster)
#TODO: be careful of writing to a new temp file, then flush initial one, then re-use it...
repeat
{
 #while there are jobs to do (i.e. the tmp "file" holds more than nb_series_per_chunk series)
nb_workers = nb_curves %/% nb_series_per_chunk
indices = list()
- #incides[[i]] == (start_index,number_of_elements)
+ #indices[[i]] == (start_index,number_of_elements)
for (i in 1:nb_workers)
indices[[i]] = c(nb_series_per_chunk*(i-1)+1, nb_series_per_chunk)
remainder = nb_curves %% nb_series_per_chunk
#spread the load among other workers
}
- li = parallel::parLapply(cl, indices, processChunk, WER=="mix")
+ li = parallel::parLapply(cl, indices, processChunk, K, WER=="mix")
#C) flush tmp file (current parallel processes will write in it)
}
parallel::stopCluster(cl)
ids=final_coeffs[,1] ) )
}
pam_output = getClusters(as.matrix(final_coeffs[,2:ncol(final_coeffs)]), K)
- medoids = coeffsToCurves(pam_output$medoids)
+ medoids = coeffsToCurves(pam_output$medoids, wf)
ids = final_coeffs[,1] [pam_output$ranks]
- return (list(medoids=medoids, ids=ids))
#4) apply stage 2 (in parallel? inside task 2?)
if (WER == "end")
{
#from center curves, apply stage 2...
+ #TODO:
}
+
+ return (list(medoids=medoids, ids=ids))
}
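+
+#Usage sketch (hypothetical 'series': a data.frame with IDs in the first column):
+# epclust(series, K=5, nb_series_per_chunk=100, wf="haar", WER="end")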
-processChunk = function(indice, WER)
+processChunk = function(indice, K, WER)
{
#1) retrieve data
+ coeffs = readTmp(indice[1], indice[2])
#2) cluster
+ cl = getClusters(as.matrix(coeffs[,2:ncol(coeffs)]), K)
#3) WER (optional)
+ #TODO:
}
+
+#TODO: difficulty: retrieve a curve from its identifier (fine with a DB, but what about the rest?)
+#also: what do we pass to the nodes? curvesToCoeffs in parallel?
+#finally: WER?!