X-Git-Url: https://git.auder.net/?p=epclust.git;a=blobdiff_plain;f=epclust%2FR%2Fmain.R;h=0b598329b87935a3ea668f8953e6967f8bd9ea5f;hp=ac4ea8ddc40567b72d84c240743fbc38d4e57971;hb=e205f2187f0ccdff00bffc47642392ec5e33214d;hpb=0e2dce80a3fddaca50c96c6c27a8b32468095d6c diff --git a/epclust/R/main.R b/epclust/R/main.R index ac4ea8d..0b59832 100644 --- a/epclust/R/main.R +++ b/epclust/R/main.R @@ -34,6 +34,7 @@ #' "LIMIT ", n, " ORDER BY date", sep="")) #' return (df) #' } +#' #####TODO: if DB, array rank --> ID at first retrieval, when computing coeffs; so:: NO use of IDs ! #' #TODO: 3 examples, data.frame / binary file / DB sqLite #' + sampleCurves : wavBootstrap de package wmtsa #' cl = epclust(getData, K1=200, K2=15, ntasks=1000, nb_series_per_chunk=5000, WER="mix") @@ -98,7 +99,6 @@ epclust = function(series,K1,K2,ntasks=1,nb_series_per_chunk=50*K1,min_series_pe nb_curves = nb_curves + nrow(coeffs_chunk) } getCoefs = function(indices) getDataInFile(indices, coefs_file) -######TODO: if DB, array rank --> ID at first retrieval, when computing coeffs; so:: NO use of IDs ! if (nb_curves < min_series_per_chunk) stop("Not enough data: less rows than min_series_per_chunk!") @@ -112,17 +112,36 @@ epclust = function(series,K1,K2,ntasks=1,nb_series_per_chunk=50*K1,min_series_pe upper_bound = ifelse( i series on file) - indices = parallel::parLapply(cl_tasks, indices_tasks, clusteringTask) - parallel::stopCluster(cl_tasks) + indices = unlist( parallel::parLapply(cl, indices_tasks, function(inds) { + clusteringTask(inds, getSeries, getSeries, getCoefs, K1, K2*(WER=="mix"), + nb_series_per_chunk,ncores_clust,to_file=TRUE) + }) ) + parallel::stopCluster(cl) - #Now series must be retrieved from synchrones_file, and have no ID - getSeries = function(indices, ids) getDataInFile(indices, synchrones_file) + getSeriesForSynchrones = getSeries + synchrones_file = paste(bin_dir,"synchrones",sep="") + if (WER=="mix") + { + indices = seq_len(ntasks*K2) + #Now series must be retrieved from synchrones_file + getSeries = function(inds) getDataInFile(inds, synchrones_file) + #Coefs must be re-computed + unlink(coefs_file) + index = 1 + repeat + { + series = getSeries((index-1)+seq_len(nb_series_per_chunk)) + if (is.null(series)) + break + coeffs_chunk = curvesToCoeffs(series, wf) + serialize(coeffs_chunk, coefs_file) + index = index + nb_series_per_chunk + } + } # Run step2 on resulting indices or series (from file) - computeClusters2(indices=if (WER=="end") indices else NULL, K2, to_file=FALSE) + clusteringTask(indices, getSeries, getSeriesForSynchrones, getCoefs, K1, K2, + nb_series_per_chunk, ncores_tasks*ncores_clust, to_file=FALSE) }