From: Benjamin Auder Date: Fri, 10 Mar 2017 15:43:00 +0000 (+0100) Subject: save state: wrong idea for indices repartition X-Git-Url: https://git.auder.net/js/img/%7B%7B%20asset%28%27mixstore/images/pieces/DESCRIPTION?a=commitdiff_plain;h=37c82bbafbffc19e8b47a521952bac58f189e9ea;p=epclust.git save state: wrong idea for indices repartition --- diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R index 70d263e..36b4769 100644 --- a/epclust/R/clustering.R +++ b/epclust/R/clustering.R @@ -30,32 +30,20 @@ NULL #' @rdname clustering #' @export -clusteringTask1 = function( - indices, getContribs, K1, nb_per_chunk, nb_items_clust, ncores_clust=1, - verbose=FALSE, parll=TRUE) +clusteringTask1 = function(indices, getContribs, K1, nb_items_clust1, + ncores_clust=1, verbose=FALSE, parll=TRUE) { if (verbose) cat(paste("*** Clustering task 1 on ",length(indices)," lines\n", sep="")) - - - - - -##TODO: reviser le spreadIndices, tenant compte de nb_items_clust - - ##TODO: reviser / harmoniser avec getContribs qui en récupère pt'et + pt'et - !! - - - if (parll) { - cl = parallel::makeCluster(ncores_clust) + cl = parallel::makeCluster(ncores_clust, outfile = "") parallel::clusterExport(cl, varlist=c("getContribs","K1","verbose"), envir=environment()) } while (length(indices) > K1) { - indices_workers = .spreadIndices(indices, nb_series_per_chunk) + indices_workers = .spreadIndices(indices, nb_items_clust1, K1+1) indices <- if (parll) { @@ -329,20 +317,31 @@ computeWerDists = function(synchrones, nbytes,endian,ncores_clust=1,verbose=FALS } # Helper function to divide indices into balanced sets -.spreadIndices = function(indices, nb_per_chunk) +.spreadIndices = function(indices, max_per_set, min_nb_per_set = 1) { L = length(indices) - nb_workers = floor( L / nb_per_chunk ) - if (nb_workers == 0) + min_nb_workers = floor( L / max_per_set ) + rem = L %% max_per_set + if (nb_workers == 0 || (nb_workers==1 && rem==0)) { - # L < nb_series_per_chunk, simple case + # L <= max_nb_per_set, simple case indices_workers = list(indices) } else { indices_workers = lapply( seq_len(nb_workers), function(i) - indices[(nb_per_chunk*(i-1)+1):(nb_per_chunk*i)] ) - # Spread the remaining load among the workers + indices[(max_nb_per_set*(i-1)+1):(max_per_set*i)] ) + # Two cases: remainder is >= min_per_set (easy)... + if (rem >= min_nb_per_set) + indices_workers = c( indices_workers, list(tail(indices,rem)) ) + #...or < min_per_set: harder, need to remove indices from current sets to feed + # the too-small remainder. It may fail: then fallback to "slightly bigger sets" + else + { + save_indices_workers = indices_workers + small_set = tail(indices,rem) + # Try feeding small_set until it reaches min_per_set, whle keeping the others big enough + # Spread the remaining load among the workers rem = L %% nb_per_chunk while (rem > 0) { diff --git a/epclust/R/main.R b/epclust/R/main.R index 31ce390..a039d1c 100644 --- a/epclust/R/main.R +++ b/epclust/R/main.R @@ -39,16 +39,16 @@ #' } #' @param K1 Number of clusters to be found after stage 1 (K1 << N [number of series]) #' @param K2 Number of clusters to be found after stage 2 (K2 << K1) -#' @param nb_per_chunk (Maximum) number of items to retrieve in one batch, for both types of -#' retrieval: resp. series and contribution; in a vector of size 2 +#' @param nb_series_per_chunk (Maximum) number of series to retrieve in one batch #' @param algo_clust1 Clustering algorithm for stage 1. A function which takes (data, K) #' as argument where data is a matrix in columns and K the desired number of clusters, #' and outputs K medoids ranks. Default: PAM #' @param algo_clust2 Clustering algorithm for stage 2. A function which takes (dists, K) #' as argument where dists is a matrix of distances and K the desired number of clusters, #' and outputs K clusters representatives (curves). Default: k-means -#' @param nb_items_clust1 (Maximum) number of items in input of the clustering algorithm -#' for stage 1 +#' @param nb_items_clust1 (~Maximum) number of items in input of the clustering algorithm +#' for stage 1. At worst, a clustering algorithm might be called with ~2*nb_items_clust1 +#' items; but this could only happen at the last few iterations. #' @param wav_filt Wavelet transform filter; see ?wavelets::wt.filter #' @param contrib_type Type of contribution: "relative", "logit" or "absolute" (any prefix) #' @param WER "end" to apply stage 2 after stage 1 has fully iterated, or "mix" to apply @@ -84,12 +84,12 @@ #' series = do.call( cbind, lapply( 1:6, function(i) #' do.call(cbind, wmtsa::wavBootstrap(ref_series[i,], n.realization=400)) ) ) #' #dim(series) #c(2400,10001) -#' medoids_ascii = claws(series, K1=60, K2=6, nb_per_chunk=c(200,500), verbose=TRUE) +#' medoids_ascii = claws(series, K1=60, K2=6, 200, verbose=TRUE) #' #' # Same example, from CSV file #' csv_file = "/tmp/epclust_series.csv" #' write.table(series, csv_file, sep=",", row.names=FALSE, col.names=FALSE) -#' medoids_csv = claws(csv_file, K1=60, K2=6, nb_per_chunk=c(200,500)) +#' medoids_csv = claws(csv_file, K1=60, K2=6, 200) #' #' # Same example, from binary file #' bin_file <- "/tmp/epclust_series.bin" @@ -97,7 +97,7 @@ #' endian <- "little" #' binarize(csv_file, bin_file, 500, nbytes, endian) #' getSeries <- function(indices) getDataInFile(indices, bin_file, nbytes, endian) -#' medoids_bin <- claws(getSeries, K1=60, K2=6, nb_per_chunk=c(200,500)) +#' medoids_bin <- claws(getSeries, K1=60, K2=6, 200) #' unlink(csv_file) #' unlink(bin_file) #' @@ -124,7 +124,7 @@ #' df_series <- dbGetQuery(series_db, request) #' as.matrix(df_series[,"value"], nrow=serie_length) #' } -#' medoids_db = claws(getSeries, K1=60, K2=6, nb_per_chunk=c(200,500)) +#' medoids_db = claws(getSeries, K1=60, K2=6, 200)) #' dbDisconnect(series_db) #' #' # All computed medoids should be the same: @@ -134,11 +134,11 @@ #' digest::sha1(medoids_db) #' } #' @export -claws <- function(getSeries, K1, K2, nb_per_chunk, +claws <- function(getSeries, K1, K2, nb_series_per_chunk, nb_items_clust1=7*K1, algo_clust1=function(data,K) cluster::pam(data,K,diss=FALSE), algo_clust2=function(dists,K) stats::kmeans(dists,K,iter.max=50,nstart=3), - wav_filt="d8",contrib_type="absolute", + wav_filt="d8", contrib_type="absolute", WER="end", random=TRUE, ntasks=1, ncores_tasks=1, ncores_clust=4, @@ -155,12 +155,11 @@ claws <- function(getSeries, K1, K2, nb_per_chunk, } K1 <- .toInteger(K1, function(x) x>=2) K2 <- .toInteger(K2, function(x) x>=2) - if (!is.numeric(nb_per_chunk) || length(nb_per_chunk)!=2) - stop("'nb_per_chunk': numeric, size 2") - nb_per_chunk[1] <- .toInteger(nb_per_chunk[1], function(x) x>=1) - # A batch of contributions should have at least as many elements as a batch of series, - # because it always contains much less values - nb_per_chunk[2] <- max(.toInteger(nb_per_chunk[2],function(x) x>=1), nb_per_chunk[1]) + nb_series_per_chunk <- .toInteger(nb_series_per_chunk, function(x) x>=1) + # K1 (number of clusters at step 1) cannot exceed nb_series_per_chunk, because we will need + # to load K1 series in memory for clustering stage 2. + if (K1 > nb_series_per_chunk) + stop("'K1' cannot exceed 'nb_series_per_chunk'") nb_items_clust1 <- .toInteger(nb_items_clust1, function(x) x>K1) random <- .toLogical(random) tryCatch @@ -236,8 +235,8 @@ claws <- function(getSeries, K1, K2, nb_per_chunk, # under Linux. All necessary variables are passed to the workers. cl = parallel::makeCluster(ncores_tasks, outfile="") varlist = c("getSeries","getContribs","K1","K2","algo_clust1","algo_clust2", - "nb_per_chunk","nb_items_clust","ncores_clust","sep","nbytes","endian", - "verbose","parll") + "nb_series_per_chunk","nb_items_clust","ncores_clust","sep", + "nbytes","endian","verbose","parll") if (WER=="mix") varlist = c(varlist, "medoids_file") parallel::clusterExport(cl, varlist, envir = environment()) diff --git a/epclust/tests/testthat/test.clustering.R b/epclust/tests/testthat/test.clustering.R index 6c94f92..7116e73 100644 --- a/epclust/tests/testthat/test.clustering.R +++ b/epclust/tests/testthat/test.clustering.R @@ -19,15 +19,14 @@ test_that("computeClusters1&2 behave as expected", n = ndK[1] ; d = ndK[2] ; K = ndK[3] cs = n/K #cluster size Id = diag(d) - coefs = do.call(rbind, - lapply(1:K, function(i) MASS::mvrnorm(cs, c(rep(0,(i-1)),5,rep(0,d-i)), Id))) + coefs = sapply(1:K, function(i) MASS::mvrnorm(cs, c(rep(0,(i-1)),5,rep(0,d-i)), Id)) indices_medoids1 = computeClusters1(coefs, K, verbose=TRUE) indices_medoids2 = computeClusters2(dist(coefs), K, verbose=TRUE) # Get coefs assignments (to medoids) assignment1 = sapply(seq_len(n), function(i) - which.min( rowSums( sweep(coefs[indices_medoids1,],2,coefs[i,],'-')^2 ) ) ) + which.min( colSums( sweep(coefs[,indices_medoids1],1,coefs[,i],'-')^2 ) ) ) assignment2 = sapply(seq_len(n), function(i) - which.min( rowSums( sweep(coefs[indices_medoids2,],2,coefs[i,],'-')^2 ) ) ) + which.min( colSums( sweep(coefs[,indices_medoids2],1,coefs[,i],'-')^2 ) ) ) for (i in 1:K) { expect_equal(sum(assignment1==i), cs, tolerance=5) @@ -70,19 +69,19 @@ test_that("computeSynchrones behave as expected", #sum((s1-s3)^2) == 58 #sum((s2-s3)^2) == 38 s = list(s1, s2, s3) - series = matrix(nrow=n, ncol=L) + series = matrix(nrow=L, ncol=n) for (i in seq_len(n)) - series[i,] = s[[I(i,K)]] + rnorm(L,sd=0.01) + series[,i] = s[[I(i,K)]] + rnorm(L,sd=0.01) getRefSeries = function(indices) { indices = indices[indices <= n] - if (length(indices)>0) series[indices,] else NULL + if (length(indices)>0) series[,indices] else NULL } - synchrones = computeSynchrones(bigmemory::as.big.matrix(rbind(s1,s2,s3)), getRefSeries, + synchrones = computeSynchrones(bigmemory::as.big.matrix(cbind(s1,s2,s3)), getRefSeries, n, 100, verbose=TRUE, parll=FALSE) - expect_equal(dim(synchrones), c(K,L)) + expect_equal(dim(synchrones), c(L,K)) for (i in 1:K) - expect_equal(synchrones[i,], s[[i]], tolerance=0.01) + expect_equal(synchrones[,i], s[[i]], tolerance=0.01) }) # NOTE: medoids can be a big.matrix @@ -93,7 +92,7 @@ computeDistortion = function(series, medoids) if (bigmemory::is.big.matrix(medoids)) medoids = medoids[,] for (i in seq_len(n)) - distortion = distortion + min( rowSums( sweep(medoids,2,series[i,],'-')^2 ) / L ) + distortion = distortion + min( colSums( sweep(medoids,1,series[,i],'-')^2 ) / L ) distortion / n } @@ -106,23 +105,23 @@ test_that("clusteringTask1 behave as expected", s = lapply( seq_len(K1), function(i) x^(1+i/30)*cos(x+i) ) series = matrix(nrow=n, ncol=L) for (i in seq_len(n)) - series[i,] = s[[I(i,K1)]] + rnorm(L,sd=0.01) + series[,i] = s[[I(i,K1)]] + rnorm(L,sd=0.01) getSeries = function(indices) { indices = indices[indices <= n] - if (length(indices)>0) series[indices,] else NULL + if (length(indices)>0) series[,indices] else NULL } wf = "haar" ctype = "absolute" - getContribs = function(indices) curvesToContribs(series[indices,],wf,ctype) + getContribs = function(indices) curvesToContribs(series[,indices],wf,ctype) indices1 = clusteringTask1(1:n, getContribs, K1, 75, verbose=TRUE, parll=FALSE) medoids_K1 = getSeries(indices1) - expect_equal(dim(medoids_K1), c(K1,L)) + expect_equal(dim(medoids_K1), c(L,K1)) # Not easy to evaluate result: at least we expect it to be better than random selection of # medoids within initial series distorGood = computeDistortion(series, medoids_K1) for (i in 1:3) - expect_lte( distorGood, computeDistortion(series,series[sample(1:n, K1),]) ) + expect_lte( distorGood, computeDistortion(series,series[,sample(1:n, K1)]) ) }) test_that("clusteringTask2 behave as expected", @@ -139,19 +138,18 @@ test_that("clusteringTask2 behave as expected", series[i,] = s[[I(i,K1)]] + rnorm(L,sd=0.01) getRefSeries = function(indices) { indices = indices[indices <= n] - if (length(indices)>0) series[indices,] else NULL + if (length(indices)>0) series[,indices] else NULL } # Artificially simulate 60 medoids - perfect situation, all equal to one of the refs - medoids_K1 = bigmemory::as.big.matrix( - do.call(rbind, lapply( 1:K1, function(i) s[[I(i,K1)]] ) ) ) + medoids_K1 = bigmemory::as.big.matrix( sapply( 1:K1, function(i) s[[I(i,K1)]] ) ) medoids_K2 = clusteringTask2(medoids_K1, K2, getRefSeries, n, 75, verbose=TRUE, parll=FALSE) - expect_equal(dim(medoids_K2), c(K2,L)) + expect_equal(dim(medoids_K2), c(L,K2)) # Not easy to evaluate result: at least we expect it to be better than random selection of # medoids within 1...K1 (among references) distorGood = computeDistortion(series, medoids_K2) for (i in 1:3) - expect_lte( distorGood, computeDistortion(series,medoids_K1[sample(1:K1, K2),]) ) + expect_lte( distorGood, computeDistortion(series,medoids_K1[,sample(1:K1, K2)]) ) }) #NOTE: rather redundant test diff --git a/epclust/tests/testthat/test.de_serialize.R b/epclust/tests/testthat/test.de_serialize.R index 8403e6d..25f2c2b 100644 --- a/epclust/tests/testthat/test.de_serialize.R +++ b/epclust/tests/testthat/test.de_serialize.R @@ -2,11 +2,11 @@ context("de_serialize") test_that("serialization + getDataInFile retrieve original data / from matrix", { - data_bin_file = "/tmp/epclust_test_m.bin" + data_bin_file = ".epclust_test_m.bin" unlink(data_bin_file) - #dataset 200 lignes / 30 columns - data_ascii = matrix(runif(200*30,-10,10),ncol=30) + #dataset 200 cols / 30 rows + data_ascii = matrix(runif(200*30,-10,10),nrow=30) nbytes = 4 #lead to a precision of 1e-7 / 1e-8 endian = "little" @@ -16,7 +16,7 @@ test_that("serialization + getDataInFile retrieve original data / from matrix", for (indices in list(c(1,3,5), 3:13, c(5,20,50), c(75,130:135), 196:200)) { data_lines = getDataInFile(indices, data_bin_file, nbytes, endian) - expect_equal(data_lines, data_ascii[indices,], tolerance=1e-6) + expect_equal(data_lines, data_ascii[,indices], tolerance=1e-6) } unlink(data_bin_file) @@ -27,44 +27,44 @@ test_that("serialization + getDataInFile retrieve original data / from matrix", for (indices in list(c(1,3,5), 3:13, c(5,20,50), c(75,130:135), 196:200)) { data_lines = getDataInFile(indices, data_bin_file, nbytes, endian) - expect_equal(data_lines, data_ascii[indices,], tolerance=1e-6) + expect_equal(data_lines, data_ascii[,indices], tolerance=1e-6) } unlink(data_bin_file) }) test_that("serialization + transform + getDataInFile retrieve original transforms", { - data_bin_file = "/tmp/epclust_test_t.bin" + data_bin_file = ".epclust_test_t.bin" unlink(data_bin_file) - #dataset 200 lignes / 30 columns - data_ascii = matrix(runif(200*30,-10,10),ncol=30) + #dataset 200 cols / 30 rows + data_ascii = matrix(runif(200*30,-10,10),nrow=30) nbytes = 8 endian = "little" binarize(data_ascii, data_bin_file, 500, ",", nbytes, endian) # Serialize transformation (just compute range) into a new binary file - trans_bin_file = "/tmp/epclust_test_t_trans.bin" + trans_bin_file = ".epclust_test_t_trans.bin" unlink(trans_bin_file) getSeries = function(inds) getDataInFile(inds, data_bin_file, nbytes, endian) - binarizeTransform(getSeries, function(series) t(apply(series, 1, range)), + binarizeTransform(getSeries, function(series) apply(series, 2, range), trans_bin_file, 250, nbytes, endian) unlink(data_bin_file) expect_equal(file.info(trans_bin_file)$size, 2*nrow(data_ascii)*nbytes+8) for (indices in list(c(1,3,5), 3:13, c(5,20,50), c(75,130:135), 196:200)) { - trans_lines = getDataInFile(indices, trans_bin_file, nbytes, endian) - expect_equal(trans_lines, t(apply(data_ascii[indices,],1,range)), tolerance=1e-6) + trans_cols = getDataInFile(indices, trans_bin_file, nbytes, endian) + expect_equal(trans_cols, apply(data_ascii[indices,],2,range), tolerance=1e-6) } unlink(trans_bin_file) }) test_that("serialization + getDataInFile retrieve original data / from connection", { - data_bin_file = "/tmp/epclust_test_c.bin" + data_bin_file = ".epclust_test_c.bin" unlink(data_bin_file) - #dataset 300 lignes / 50 columns + #dataset 300 cols / 50 rows data_csv = system.file("testdata","de_serialize.csv",package="epclust") nbytes = 8 endian = "big" @@ -76,8 +76,8 @@ test_that("serialization + getDataInFile retrieve original data / from connectio for (indices in list(c(1,3,5), 3:13, c(5,20,50), c(75,130:135), 196:200)) { #HACK: as.matrix(as.data.frame( )) required to match (ref) data structure - data_lines = as.matrix(as.data.frame( getDataInFile(indices,data_bin_file,nbytes,endian) )) - expect_equal(data_lines, data_ascii[indices,]) + data_cols = as.matrix(as.data.frame( getDataInFile(indices,data_bin_file,nbytes,endian) )) + expect_equal(data_cols, data_ascii[,indices]) } unlink(data_bin_file) @@ -87,8 +87,8 @@ test_that("serialization + getDataInFile retrieve original data / from connectio expect_equal(file.info(data_bin_file)$size, 300*50*8+8) for (indices in list(c(1,3,5), 3:13, c(5,20,50), c(75,130:135), 196:200)) { - data_lines = as.matrix(as.data.frame( getDataInFile(indices,data_bin_file,nbytes,endian) )) - expect_equal(data_lines, data_ascii[indices,]) + data_cols = as.matrix(as.data.frame( getDataInFile(indices,data_bin_file,nbytes,endian) )) + expect_equal(data_cols, data_ascii[,indices]) } unlink(data_bin_file) #close(data_con) --> done in binarize() diff --git a/epclust/tests/testthat/test.Filter.R b/epclust/tests/testthat/test.filter.R similarity index 100% rename from epclust/tests/testthat/test.Filter.R rename to epclust/tests/testthat/test.filter.R