X-Git-Url: https://git.auder.net/?a=blobdiff_plain;f=epclust%2FR%2Fmain.R;h=27fbb7488394bcc39009c86b74251b3c458cce4b;hb=3eef8d3df59ded9a281cff51f79fe824198a7427;hp=ac4ea8ddc40567b72d84c240743fbc38d4e57971;hpb=0e2dce80a3fddaca50c96c6c27a8b32468095d6c;p=epclust.git

diff --git a/epclust/R/main.R b/epclust/R/main.R
index ac4ea8d..27fbb74 100644
--- a/epclust/R/main.R
+++ b/epclust/R/main.R
@@ -34,12 +34,13 @@
 #' "LIMIT ", n, " ORDER BY date", sep=""))
 #' return (df)
 #' }
+#' #####TODO: if DB, array rank --> ID at first retrieval, when computing coeffs; so:: NO use of IDs !
 #' #TODO: 3 examples, data.frame / binary file / DB sqLite
 #' + sampleCurves : wavBootstrap de package wmtsa
 #' cl = epclust(getData, K1=200, K2=15, ntasks=1000, nb_series_per_chunk=5000, WER="mix")
 #' @export
 epclust = function(series,K1,K2,ntasks=1,nb_series_per_chunk=50*K1,min_series_per_chunk=5*K1,
-	wf="haar",WER="end",ncores_tasks=1,ncores_clust=4,random=TRUE,...)
+	wf="haar",WER="end",ncores_tasks=1,ncores_clust=4,random=TRUE,ftype="float",...)
 {
 	# Check/transform arguments
 	bin_dir = "epclust.bin/"
@@ -50,7 +51,7 @@ epclust = function(series,K1,K2,ntasks=1,nb_series_per_chunk=50*K1,min_series_pe
 		unlink(series_file)
 	}
 	if (is.matrix(series))
-		serialize(series, series_file)
+		serialize(series, series_file, ftype, nb_series_per_chunk)
 	else if (!is.function(series))
 	{
 		tryCatch(
@@ -62,7 +63,7 @@ epclust = function(series,K1,K2,ntasks=1,nb_series_per_chunk=50*K1,min_series_pe
 				open(series)
 				series_con = series
 			}
-			serialize(series_con, series_file)
+			serialize(series_con, series_file, ftype, nb_series_per_chunk)
 			close(series_con)
 		},
 		error=function(e) "series should be a data.frame, a function or a valid connection"
@@ -93,12 +94,11 @@ epclust = function(series,K1,K2,ntasks=1,nb_series_per_chunk=50*K1,min_series_pe
 		if (is.null(series))
 			break
 		coeffs_chunk = curvesToCoeffs(series, wf)
-		serialize(coeffs_chunk, coefs_file)
+		serialize(coeffs_chunk, coefs_file, ftype, nb_series_per_chunk)
 		index = index + nb_series_per_chunk
 		nb_curves = nb_curves + nrow(coeffs_chunk)
 	}
 	getCoefs = function(indices) getDataInFile(indices, coefs_file)
-######TODO: if DB, array rank --> ID at first retrieval, when computing coeffs; so:: NO use of IDs !
 
 	if (nb_curves < min_series_per_chunk)
 		stop("Not enough data: less rows than min_series_per_chunk!")
@@ -112,17 +112,36 @@ epclust = function(series,K1,K2,ntasks=1,nb_series_per_chunk=50*K1,min_series_pe
 		upper_bound = ifelse( i series on file)
-	indices = parallel::parLapply(cl_tasks, indices_tasks, clusteringTask)
-	parallel::stopCluster(cl_tasks)
+	indices = unlist( parallel::parLapply(cl, indices_tasks, function(inds) {
+		clusteringTask(inds, getSeries, getSeries, getCoefs, K1, K2*(WER=="mix"),
+			nb_series_per_chunk,ncores_clust,to_file=TRUE, ftype)
+	}) )
+	parallel::stopCluster(cl)
 
-	#Now series must be retrieved from synchrones_file, and have no ID
-	getSeries = function(indices, ids) getDataInFile(indices, synchrones_file)
+	getSeriesForSynchrones = getSeries
+	synchrones_file = paste(bin_dir,"synchrones",sep="")
+	if (WER=="mix")
+	{
+		indices = seq_len(ntasks*K2)
+		#Now series must be retrieved from synchrones_file
+		getSeries = function(inds) getDataInFile(inds, synchrones_file)
+		#Coefs must be re-computed
+		unlink(coefs_file)
+		index = 1
+		repeat
+		{
+			series = getSeries((index-1)+seq_len(nb_series_per_chunk))
+			if (is.null(series))
+				break
+			coeffs_chunk = curvesToCoeffs(series, wf)
+			serialize(coeffs_chunk, coefs_file, ftype, nb_series_per_chunk)
+			index = index + nb_series_per_chunk
+		}
+	}
 
 	# Run step2 on resulting indices or series (from file)
-	computeClusters2(indices=if (WER=="end") indices else NULL, K2, to_file=FALSE)
+	clusteringTask(indices, getSeries, getSeriesForSynchrones, getCoefs, K1, K2,
+		nb_series_per_chunk, ncores_tasks*ncores_clust, to_file=FALSE, ftype)
 }
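
In the new version every data access goes through small closures (getSeries, getCoefs) backed by binary files, and serialize() now receives the storage type (ftype, defaulting to "float") plus nb_series_per_chunk, so at most one chunk of rows is held in memory at a time. Below is a minimal sketch of such a chunked fixed-width binary round-trip, written only under assumed conventions: write_rows_bin/read_rows_bin are made-up names, rows have a fixed known length, and the row length is passed explicitly. It is not the package's actual serialize()/getDataInFile() implementation, whose file layout may differ.

# Hypothetical helpers, only illustrating the access pattern assumed above;
# the real serialize()/getDataInFile() in epclust may be laid out differently.
write_rows_bin = function(mat, fname, ftype="float", nb_per_chunk=nrow(mat))
{
	size = if (ftype=="float") 4 else 8           # bytes per stored value
	con = file(fname, open="ab")                  # append successive chunks
	for (i in seq(1, nrow(mat), by=nb_per_chunk))
	{
		rows = i:min(i+nb_per_chunk-1, nrow(mat))
		# transpose so that each row (one series) is contiguous on disk
		writeBin(as.double(t(mat[rows,,drop=FALSE])), con, size=size)
	}
	close(con)
}

read_rows_bin = function(indices, fname, ncol, ftype="float")
{
	size = if (ftype=="float") 4 else 8
	con = file(fname, open="rb")
	res = t( sapply(indices, function(i) {
		seek(con, where=(i-1)*ncol*size, origin="start")  # jump straight to row i
		readBin(con, "double", n=ncol, size=size)
	}) )
	close(con)
	res
}

# Round-trip: 10 series of length 5, written by chunks of 4 rows, rows 3 and 7 read back
fname = tempfile()
m = matrix(rnorm(50), nrow=10)
write_rows_bin(m, fname, ftype="float", nb_per_chunk=4)
read_rows_bin(c(3,7), fname, ncol=5, ftype="float")

Keeping every row at a fixed offset is what makes index-based retrieval cheap: any subset of curves or coefficients can be re-read without loading the whole file, which is why the WER=="mix" branch above can simply re-point getSeries at synchrones_file and rebuild coefs_file chunk by chunk.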