From e205f2187f0ccdff00bffc47642392ec5e33214d Mon Sep 17 00:00:00 2001 From: Benjamin Auder <benjamin.auder@somewhere> Date: Sun, 5 Mar 2017 00:03:37 +0100 Subject: [PATCH] before computeSynchrones --- epclust/R/clustering.R | 65 ++++++++++++++++++++++-------------------- epclust/R/main.R | 39 ++++++++++++++++++------- 2 files changed, 63 insertions(+), 41 deletions(-) diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R index 6090517..c8bad66 100644 --- a/epclust/R/clustering.R +++ b/epclust/R/clustering.R @@ -1,57 +1,60 @@ # Cluster one full task (nb_curves / ntasks series) -clusteringTask = function(indices, ncores) +clusteringTask = function(indices,getSeries,getSeriesForSynchrones,synchrones_file, + getCoefs,K1,K2,nb_series_per_chunk,ncores,to_file) { cl = parallel::makeCluster(ncores) - parallel::clusterExport(cl, - varlist=c("K1","getCoefs"), - envir=environment()) repeat { - nb_workers = max( 1, round( length(indices_clust) / nb_series_per_chunk ) ) + nb_workers = max( 1, round( length(indices) / nb_series_per_chunk ) ) indices_workers = lapply(seq_len(nb_workers), function(i) { upper_bound = ifelse( i<nb_workers, - min(nb_series_per_chunk*i,length(indices_clust)), length(indices_clust) ) - indices_clust[(nb_series_per_chunk*(i-1)+1):upper_bound] + min(nb_series_per_chunk*i,length(indices)), length(indices) ) + indices[(nb_series_per_chunk*(i-1)+1):upper_bound] }) - indices_clust = unlist( parallel::parLapply(cl, indices_workers, function(indices) - computeClusters1(indices, getCoefs, K1)) ) + indices = unlist( parallel::parLapply(cl, indices_workers, function(inds) + computeClusters1(inds, getCoefs, K1)) ) if (length(indices_clust) == K1) break } - parallel::stopCluster(cl_clust) - if (WER == "end") - return (indices_clust) - #WER=="mix" - computeClusters2(indices_clust, K2, getSeries, to_file=TRUE) + parallel::stopCluster(cl) + if (K2 == 0) + return (indices) + computeClusters2(indices, K2, getSeries, getSeriesForSynchrones, to_file) + vector("integer",0) } # Apply the clustering algorithm (PAM) on a coeffs or distances matrix computeClusters1 = function(indices, getCoefs, K1) - indices[ cluster::pam(getCoefs(indices), K1, diss=FALSE)$id.med ] +{ + coefs = getCoefs(indices) + indices[ cluster::pam(coefs, K1, diss=FALSE)$id.med ] +} # Cluster a chunk of series inside one task (~max nb_series_per_chunk) -computeClusters2 = function(indices, K2, getSeries, to_file) +computeClusters2 = function(indices, K2, getSeries, getSeriesForSynchrones, to_file) { - if (is.null(indices)) - { - #get series from file - } -#Puis K-means après WER... - if (WER=="mix" > 0) + curves = computeSynchrones(indices, getSeries, getSeriesForSynchrones) + dists = computeWerDists(curves) + medoids = cluster::pam(dists, K2, diss=TRUE)$medoids + if (to_file) { - curves = computeSynchrones(indices) - dists = computeWerDists(curves) - indices = computeClusters(dists, K2, diss=TRUE) + serialize(medoids, synchrones_file) + return (NULL) } - if (to_file) - #write results to file (JUST series ; no possible ID here) + medoids } # Compute the synchrones curves (sum of clusters elements) from a clustering result -computeSynchrones = function(inds) - sapply(seq_along(inds), colMeans(getSeries(inds[[i]]$indices,inds[[i]]$ids))) +computeSynchrones = function(indices, getSeries, getSeriesForSynchrones) +{ + #les getSeries(indices) sont les medoides --> init vect nul pour chacun, puis incr avec les + #courbes (getSeriesForSynchrones) les plus proches... --> au sens de la norme L2 ? + series = getSeries(indices) + #........... + #sapply(seq_along(inds), colMeans(getSeries(inds[[i]]$indices,inds[[i]]$ids))) +} -# Compute the WER distance between the synchrones curves (in columns) +# Compute the WER distance between the synchrones curves (in rows) computeWerDist = function(curves) { if (!require("Rwave", quietly=TRUE)) @@ -74,7 +77,7 @@ computeWerDist = function(curves) # (normalized) observations node with CWT Xcwt4 <- lapply(seq_len(n), function(i) { - ts <- scale(ts(curves[,i]), center=TRUE, scale=scaled) + ts <- scale(ts(curves[i,]), center=TRUE, scale=scaled) totts.cwt = Rwave::cwt(ts,totnoct,nvoice,w0,plot=0) ts.cwt = totts.cwt[,s0log:(s0log+noctave*nvoice)] #Normalization diff --git a/epclust/R/main.R b/epclust/R/main.R index ac4ea8d..0b59832 100644 --- a/epclust/R/main.R +++ b/epclust/R/main.R @@ -34,6 +34,7 @@ #' "LIMIT ", n, " ORDER BY date", sep="")) #' return (df) #' } +#' #####TODO: if DB, array rank --> ID at first retrieval, when computing coeffs; so:: NO use of IDs ! #' #TODO: 3 examples, data.frame / binary file / DB sqLite #' + sampleCurves : wavBootstrap de package wmtsa #' cl = epclust(getData, K1=200, K2=15, ntasks=1000, nb_series_per_chunk=5000, WER="mix") @@ -98,7 +99,6 @@ epclust = function(series,K1,K2,ntasks=1,nb_series_per_chunk=50*K1,min_series_pe nb_curves = nb_curves + nrow(coeffs_chunk) } getCoefs = function(indices) getDataInFile(indices, coefs_file) -######TODO: if DB, array rank --> ID at first retrieval, when computing coeffs; so:: NO use of IDs ! if (nb_curves < min_series_per_chunk) stop("Not enough data: less rows than min_series_per_chunk!") @@ -112,17 +112,36 @@ epclust = function(series,K1,K2,ntasks=1,nb_series_per_chunk=50*K1,min_series_pe upper_bound = ifelse( i<ntasks, min(nb_series_per_task*i,nb_curves), nb_curves ) indices[((i-1)*nb_series_per_task+1):upper_bound] }) - cl_tasks = parallel::makeCluster(ncores_tasks) - parallel::clusterExport(cl_tasks, - varlist=c("getSeries","getCoefs","K1","K2","WER","nb_series_per_chunk","ncores_clust"), - envir=environment()) + cl = parallel::makeCluster(ncores_tasks) #1000*K1 (or K2) indices (or NOTHING--> series on file) - indices = parallel::parLapply(cl_tasks, indices_tasks, clusteringTask) - parallel::stopCluster(cl_tasks) + indices = unlist( parallel::parLapply(cl, indices_tasks, function(inds) { + clusteringTask(inds, getSeries, getSeries, getCoefs, K1, K2*(WER=="mix"), + nb_series_per_chunk,ncores_clust,to_file=TRUE) + }) ) + parallel::stopCluster(cl) - #Now series must be retrieved from synchrones_file, and have no ID - getSeries = function(indices, ids) getDataInFile(indices, synchrones_file) + getSeriesForSynchrones = getSeries + synchrones_file = paste(bin_dir,"synchrones",sep="") + if (WER=="mix") + { + indices = seq_len(ntasks*K2) + #Now series must be retrieved from synchrones_file + getSeries = function(inds) getDataInFile(inds, synchrones_file) + #Coefs must be re-computed + unlink(coefs_file) + index = 1 + repeat + { + series = getSeries((index-1)+seq_len(nb_series_per_chunk)) + if (is.null(series)) + break + coeffs_chunk = curvesToCoeffs(series, wf) + serialize(coeffs_chunk, coefs_file) + index = index + nb_series_per_chunk + } + } # Run step2 on resulting indices or series (from file) - computeClusters2(indices=if (WER=="end") indices else NULL, K2, to_file=FALSE) + clusteringTask(indices, getSeries, getSeriesForSynchrones, getCoefs, K1, K2, + nb_series_per_chunk, ncores_tasks*ncores_clust, to_file=FALSE) } -- 2.44.0