X-Git-Url: https://git.auder.net/?p=epclust.git;a=blobdiff_plain;f=epclust%2FR%2Fclustering.R;h=36b476987452e84e2978e2093957d2f9d25c25e0;hp=70d263e951f68b1cdc742caf15cf46f4aaf0fe83;hb=37c82bbafbffc19e8b47a521952bac58f189e9ea;hpb=bccecb19b4faa20808dab761d30f2d2dd4dcf587 diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R index 70d263e..36b4769 100644 --- a/epclust/R/clustering.R +++ b/epclust/R/clustering.R @@ -30,32 +30,20 @@ NULL #' @rdname clustering #' @export -clusteringTask1 = function( - indices, getContribs, K1, nb_per_chunk, nb_items_clust, ncores_clust=1, - verbose=FALSE, parll=TRUE) +clusteringTask1 = function(indices, getContribs, K1, nb_items_clust1, + ncores_clust=1, verbose=FALSE, parll=TRUE) { if (verbose) cat(paste("*** Clustering task 1 on ",length(indices)," lines\n", sep="")) - - - - - -##TODO: reviser le spreadIndices, tenant compte de nb_items_clust - - ##TODO: reviser / harmoniser avec getContribs qui en récupère pt'et + pt'et - !! - - - if (parll) { - cl = parallel::makeCluster(ncores_clust) + cl = parallel::makeCluster(ncores_clust, outfile = "") parallel::clusterExport(cl, varlist=c("getContribs","K1","verbose"), envir=environment()) } while (length(indices) > K1) { - indices_workers = .spreadIndices(indices, nb_series_per_chunk) + indices_workers = .spreadIndices(indices, nb_items_clust1, K1+1) indices <- if (parll) { @@ -329,20 +317,31 @@ computeWerDists = function(synchrones, nbytes,endian,ncores_clust=1,verbose=FALS } # Helper function to divide indices into balanced sets -.spreadIndices = function(indices, nb_per_chunk) +.spreadIndices = function(indices, max_per_set, min_nb_per_set = 1) { L = length(indices) - nb_workers = floor( L / nb_per_chunk ) - if (nb_workers == 0) + min_nb_workers = floor( L / max_per_set ) + rem = L %% max_per_set + if (nb_workers == 0 || (nb_workers==1 && rem==0)) { - # L < nb_series_per_chunk, simple case + # L <= max_nb_per_set, simple case indices_workers = list(indices) } else { indices_workers = lapply( seq_len(nb_workers), function(i) - indices[(nb_per_chunk*(i-1)+1):(nb_per_chunk*i)] ) - # Spread the remaining load among the workers + indices[(max_nb_per_set*(i-1)+1):(max_per_set*i)] ) + # Two cases: remainder is >= min_per_set (easy)... + if (rem >= min_nb_per_set) + indices_workers = c( indices_workers, list(tail(indices,rem)) ) + #...or < min_per_set: harder, need to remove indices from current sets to feed + # the too-small remainder. It may fail: then fallback to "slightly bigger sets" + else + { + save_indices_workers = indices_workers + small_set = tail(indices,rem) + # Try feeding small_set until it reaches min_per_set, whle keeping the others big enough + # Spread the remaining load among the workers rem = L %% nb_per_chunk while (rem > 0) {