# 2) Process coefficients (by nb_series_per_chunk) and cluster them in parallel.
library(parallel)
# Honor an explicitly supplied integer core count; otherwise fall back to a
# conservative quarter of the machine's cores. Plain if/else instead of
# ifelse(): the condition is scalar, and if/else avoids calling detectCores()
# when the user already provided ncores.
# NOTE(review): this errors if `ncores` is unbound — assumes the enclosing
# scope defines it (possibly non-integer). TODO confirm against caller.
ncores <- if (is.integer(ncores)) ncores else parallel::detectCores() %/% 4
cl <- parallel::makeCluster(ncores)
# TODO: replace this placeholder varlist with the variables workers really need.
parallel::clusterExport(cl = cl, varlist = c("TODO:", "what", "to", "export?"),
                        envir = environment())
# TODO: be careful of writing to a new temp file, then flush initial one, then re-use it...
repeat
{
  # While there are jobs to do (i.e. size of tmp "file" > nb_series_per_chunk).
  # NOTE(review): no break / termination condition yet — as written this loops
  # forever; the exit test on the tmp file still has to be implemented.
  nb_workers <- nb_curves %/% nb_series_per_chunk
  # Invariant: indices[[i]] == c(start_index, number_of_elements).
  # Preallocate instead of growing the list one element at a time.
  indices <- vector("list", nb_workers)
  # seq_len() is safe when nb_workers == 0 (1:nb_workers would yield c(1, 0)).
  for (i in seq_len(nb_workers))
    indices[[i]] <- c(nb_series_per_chunk * (i - 1) + 1, nb_series_per_chunk)
  remainder <- nb_curves %% nb_series_per_chunk
  if (remainder >= min_series_per_chunk)
  {
    # The leftover series form one extra (smaller) chunk of their own.
    nb_workers <- nb_workers + 1
    # Bug fix: store the chunk *size* (remainder), not the absolute end index
    # nb_curves, to respect the (start_index, number_of_elements) invariant
    # used by every other entry of `indices`.
    indices[[nb_workers]] <- c(nb_curves - remainder + 1, remainder)
  } else if (remainder > 0)
  {
    # TODO: spread the load among the other workers
  }
  # Each worker clusters its chunk; WER == "mix" toggles the mixed-WER mode —
  # presumably forwarded as a flag to processChunk; verify its signature.
  li <- parallel::parLapply(cl, indices, processChunk, K, WER == "mix")
  # C) flush tmp file (current parallel processes will write in it)
}
parallel::stopCluster(cl)