X-Git-Url: https://git.auder.net/?a=blobdiff_plain;f=epclust%2FR%2Fclustering.R;h=36b476987452e84e2978e2093957d2f9d25c25e0;hb=37c82bbafbffc19e8b47a521952bac58f189e9ea;hp=14915abf861bace1b6d4bd4f9f68283c004bfff9;hpb=eef6f6c97277ea3ce760981e5244cbde7fc904a0;p=epclust.git

diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R
index 14915ab..36b4769 100644
--- a/epclust/R/clustering.R
+++ b/epclust/R/clustering.R
@@ -11,8 +11,8 @@
 #' and then WER distances computations, before applying the clustering algorithm.
 #' \code{computeClusters1()} and \code{computeClusters2()} correspond to the atomic
 #' clustering procedures respectively for stage 1 and 2. The former applies the
-#' clustering algorithm (PAM) on a contributions matrix, while the latter clusters
-#' a chunk of series inside one task (~max nb_series_per_chunk)
+#' first clustering algorithm on a contributions matrix, while the latter clusters
+#' a set of series inside one task (~nb_items_clust)
 #'
 #' @param indices Range of series indices to cluster in parallel (initial data)
 #' @param getContribs Function to retrieve contributions from initial series indices:
@@ -30,20 +30,20 @@ NULL
 #' @rdname clustering
 #' @export
-clusteringTask1 = function(
-	indices, getContribs, K1, nb_items_per_chunk, ncores_clust=1, verbose=FALSE, parll=TRUE)
+clusteringTask1 = function(indices, getContribs, K1, nb_items_clust1,
+	ncores_clust=1, verbose=FALSE, parll=TRUE)
 {
 	if (verbose)
 		cat(paste("*** Clustering task 1 on ",length(indices)," lines\n", sep=""))
 	if (parll)
 	{
-		cl = parallel::makeCluster(ncores_clust)
+		cl = parallel::makeCluster(ncores_clust, outfile = "")
 		parallel::clusterExport(cl, varlist=c("getContribs","K1","verbose"), envir=environment())
 	}
 	while (length(indices) > K1)
 	{
-		indices_workers = .spreadIndices(indices, nb_series_per_chunk)
+		indices_workers = .spreadIndices(indices, nb_items_clust1, K1+1)
 		indices <-
 		if (parll)
 		{
@@ -317,20 +317,31 @@ computeWerDists = function(synchrones, nbytes,endian,ncores_clust=1,verbose=FALS
 }
 
 # Helper function to divide indices into balanced sets
-.spreadIndices = function(indices, nb_per_chunk)
+.spreadIndices = function(indices, max_per_set, min_nb_per_set = 1)
 {
 	L = length(indices)
-	nb_workers = floor( L / nb_per_chunk )
-	if (nb_workers == 0)
+	nb_workers = floor( L / max_per_set )
+	rem = L %% max_per_set
+	if (nb_workers == 0 || (nb_workers==1 && rem==0))
 	{
-		# L < nb_series_per_chunk, simple case
+		# L <= max_per_set, simple case
 		indices_workers = list(indices)
 	}
 	else
 	{
 		indices_workers = lapply( seq_len(nb_workers), function(i)
-			indices[(nb_per_chunk*(i-1)+1):(nb_per_chunk*i)] )
-		# Spread the remaining load among the workers
-		rem = L %% nb_per_chunk
-		while (rem > 0)
-		{
+			indices[(max_per_set*(i-1)+1):(max_per_set*i)] )
+		# Two cases: remainder is >= min_nb_per_set (easy)...
+		if (rem >= min_nb_per_set)
+			indices_workers = c( indices_workers, list(tail(indices,rem)) )
+		#...or < min_nb_per_set: harder, need to remove indices from current sets to feed
+		# the too-small remainder. It may fail: then fallback to "slightly bigger sets"
+		else
+		{
+			save_indices_workers = indices_workers
+			small_set = tail(indices,rem)
+			# Try feeding small_set until it reaches min_nb_per_set, while keeping the others big enough
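
The `.spreadIndices` hunk above is cut off before the small-remainder branch completes. The following standalone R sketch illustrates the splitting policy that the new comments describe: full sets of `max_per_set` indices, a remainder set when it is large enough, otherwise indices taken from the full sets to feed it, with a fallback to "slightly bigger sets". The function name `spreadIndicesSketch` and the whole body of the small-remainder branch are illustrative assumptions reconstructed from the comments in the `+` lines, not the committed implementation.

# Sketch only: everything after the "hard case" comment is an assumption,
# reconstructed from the intent stated in the diff's '+' comments.
spreadIndicesSketch = function(indices, max_per_set, min_nb_per_set = 1)
{
	L = length(indices)
	nb_workers = floor( L / max_per_set )
	rem = L %% max_per_set
	# Everything fits in a single set
	if (nb_workers == 0 || (nb_workers == 1 && rem == 0))
		return (list(indices))
	# Full sets of exactly max_per_set indices
	indices_workers = lapply( seq_len(nb_workers), function(i)
		indices[(max_per_set*(i-1)+1):(max_per_set*i)] )
	if (rem == 0)
		return (indices_workers)
	# Easy case: the remainder is big enough to stand as its own set
	if (rem >= min_nb_per_set)
		return ( c(indices_workers, list(tail(indices, rem))) )
	# Hard case (assumed logic): feed the too-small remainder by stealing
	# trailing indices from the full sets, as long as each donor set keeps
	# at least min_nb_per_set items
	small_set = tail(indices, rem)
	i = 1
	while (length(small_set) < min_nb_per_set && i <= nb_workers)
	{
		n = length(indices_workers[[i]])
		if (n > min_nb_per_set)
		{
			small_set = c(small_set, indices_workers[[i]][n])
			indices_workers[[i]] = indices_workers[[i]][-n]
		}
		else
			i = i + 1
	}
	if (length(small_set) >= min_nb_per_set)
		return ( c(indices_workers, list(small_set)) )
	# Fallback, "slightly bigger sets": merge the remainder into the last set
	indices_workers[[nb_workers]] = c(indices_workers[[nb_workers]], small_set)
	indices_workers
}

# Example: 23 indices, at most 10 and at least 5 per set. The remainder
# {21,22,23} is fed two indices stolen from the first full set, giving
# three sets of sizes 8, 10 and 5.
str( spreadIndicesSketch(1:23, 10, 5) )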