- #2) process coeffs (by nb_series_per_chunk) and cluster them in parallel
- library(parallel)
- ncores = ifelse(is.integer(ncores), ncores, parallel::detectCores()%/%4)
- cl = parallel::makeCluster(ncores)
- parallel::clusterExport(cl=cl, varlist=c("TODO:", "what", "to", "export?"), envir=environment())
- #TODO: be careful of writing to a new temp file, then flush initial one, then re-use it...
- repeat
- {
- #while there is jobs to do (i.e. size of tmp "file" is greater than nb_series_per_chunk)
- nb_workers = nb_curves %/% nb_series_per_chunk
- indices = list()
- #indices[[i]] == (start_index,number_of_elements)
- for (i in 1:nb_workers)
- indices[[i]] = c(nb_series_per_chunk*(i-1)+1, nb_series_per_chunk)
- remainder = nb_curves %% nb_series_per_chunk
- if (remainder >= min_series_per_chunk)
- {
- nb_workers = nb_workers + 1
- indices[[nb_workers]] = c(nb_curves-remainder+1, nb_curves)
- } else if (remainder > 0)
- {
- #spread the load among other workers
- #...
- }
- li = parallel::parLapply(cl, indices, processChunk, K, WER=="mix")
- #C) flush tmp file (current parallel processes will write in it)
- }
- parallel::stopCluster(cl)
+ # Cluster coefficients in parallel (by nb_series_per_chunk)
+ indices = if (random) sample(nb_curves) else seq_len(nb_curves)
+ indices_tasks = lapply(seq_len(ntasks), function(i) {
+ upper_bound = ifelse( i<ntasks, min(nb_series_per_task*i,nb_curves), nb_curves )
+ indices[((i-1)*nb_series_per_task+1):upper_bound]
+ })
+ cl_tasks = parallel::makeCluster(ncores_tasks)
+ parallel::clusterExport(cl_tasks,
+ varlist=c("getSeries","getCoefs","K1","K2","WER","nb_series_per_chunk","ncores_clust"),
+ envir=environment())
+ # Each task returns 1000*K1 (or K2) indices (or NOTHING --> series kept on file)
+ indices = parallel::parLapply(cl_tasks, indices_tasks, clusteringTask)
+ parallel::stopCluster(cl_tasks)