computeCoeffs = function(data, index, nb_series_per_chunk, wf) {
  coeffs_chunk = NULL
  if (is.data.frame(data) && index <= nrow(data)) {
    #full data matrix: take the next chunk of at most nb_series_per_chunk rows
    coeffs_chunk = curvesToCoeffs(
      data[index:min(index+nb_series_per_chunk-1, nrow(data)),], wf)
  } else if (is.function(data)) {
    #custom user function to retrieve the next n curves, probably to read from a DB
    coeffs_chunk = curvesToCoeffs(data(rank=(index-1)+seq_len(nb_series_per_chunk)), wf)
  } else if (inherits(data, "connection")) {
    #incremental connection; TODO: more efficient way to parse than using a temp file
    ascii_lines = readLines(data, nb_series_per_chunk)
    if (length(ascii_lines) > 0) {
      series_chunk_file = ".series_chunk"
      writeLines(ascii_lines, series_chunk_file)
      coeffs_chunk = curvesToCoeffs(read.csv(series_chunk_file), wf)
      unlink(series_chunk_file)
    }
  }
  coeffs_chunk
}

#NOTE: always keep the ID in the first column (...? is it good ?!)
curvesToCoeffs = function(series, wf) {
  if (!require(wavelets, quietly=TRUE))
    stop("Couldn't load wavelets library")
  L = ncol(series)  #first column is the ID, the curve occupies columns 2..L
  D = ceiling(log2(L-1))
  nb_sample_points = 2^D
  #TODO: parallel::parApply() ?!
  res = apply(series, 1, function(x) {
    #resample the curve onto 2^D regularly spaced points
    interpolated_curve = spline(1:(L-1), x[2:L], n=nb_sample_points)$y
    #discrete wavelet transform; keep the energy (L2 norm) of each level's coefficients
    W = wavelets::dwt(interpolated_curve, filter=wf, n.levels=D)@W
    nrj_coeffs = rev(sapply(W, function(v) sqrt(sum(v^2))))
    c(x[1], nrj_coeffs)
  })
  #apply() returns one column per input series: transpose so IDs end up in the first column
  as.data.frame(t(res))
}
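
#Example (hypothetical usage sketch, not part of the original code): feeding computeCoeffs
#an in-memory data frame chunk by chunk until it returns NULL. The toy data, the chunk size
#of 5 and the "haar" filter are illustrative assumptions only; the block is wrapped in
#`if (FALSE)` so that sourcing this file does not execute it.
if (FALSE) {
  curves = data.frame(ID=1:10, matrix(rnorm(10*64), nrow=10))  #10 curves sampled at 64 points
  index = 1
  all_coeffs = NULL
  repeat {
    chunk = computeCoeffs(curves, index, nb_series_per_chunk=5, wf="haar")
    if (is.null(chunk)) break
    all_coeffs = rbind(all_coeffs, chunk)  #IDs in column 1, per-level wavelet energies after
    index = index + 5
  }
}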