#2) Process coefficients (by nb_series_per_chunk) and cluster them in parallel.
# Two-level parallelism: one outer worker per task, each spawning an inner
# cluster of ncores_clust workers to process chunks of series.
library(parallel)
cl_tasks = parallel::makeCluster(ncores_tasks)
#Nothing to export because each worker retrieves and puts data from/on files (or DB)
#parallel::clusterExport(cl=cl, varlist=c("nothing","to","export"), envir=environment())
#TODO: be careful of writing to a new temp file, then flush initial one, then re-use it...
#NOTE: parSapply passes each element of seq_len(ntasks) to FUN, so FUN must
#accept (at least) one argument -- the original 'function()' errored on call.
res_tasks = parallel::parSapply(cl_tasks, seq_len(ntasks), function(task_index) {
	cl_clust = parallel::makeCluster(ncores_clust)
	#FIXME(review): 'repeat' has no exit condition yet; it should break once the
	#tmp "file" holds at most ntasks * nb_series_per_chunk entries -- TODO confirm
	repeat
	{
		#while there are jobs to do
		#(i.e. size of tmp "file" is greater than ntasks * nb_series_per_chunk)
		nb_workers = nb_curves %/% nb_series_per_chunk
		indices = list()
		#indices[[i]] == (start_index,number_of_elements)
		for (i in seq_len(nb_workers))
			indices[[i]] = c(nb_series_per_chunk*(i-1)+1, nb_series_per_chunk)
		remainder = nb_curves %% nb_series_per_chunk
		if (remainder >= min_series_per_chunk)
		{
			nb_workers = nb_workers + 1
			#second element is a *count* per the convention above, not an end index
			indices[[nb_workers]] = c(nb_curves-remainder+1, remainder)
		} else if (remainder > 0)
		{
			#spread the load among other workers
			#...
		}
		#dispatch chunks on the inner cluster (the original referenced the
		#undefined name 'cl' instead of 'cl_clust')
		res_clust = parallel::parSapply(cl_clust, indices, processChunk, K, WER=="mix")
		#C) flush tmp file (current parallel processes will write in it)
	}
	#'::' (namespace operator), not ':' (sequence operator) as in the original typo
	parallel::stopCluster(cl_clust)
})
parallel::stopCluster(cl_tasks)
-
#3) Read last results from tmp storage, apply PAM on them, and build the
#   return value: medoids (as curves) + series identifiers.
final_coeffs = readTmp(1, nb_series_per_chunk)
#Early exit: exactly K series remain ==> they are the medoids themselves.
#Pass 'wf' here too, for consistency with the coeffsToCurves() call below
#(the original omitted it in this branch only).
if (nrow(final_coeffs) == K)
{
	return ( list( medoids=coeffsToCurves(final_coeffs[,2:ncol(final_coeffs)], wf),
		ids=final_coeffs[,1] ) )
}
#More than K series left: run PAM on the coefficients (first column = identifiers)
pam_output = getClusters(as.matrix(final_coeffs[,2:ncol(final_coeffs)]), K)
medoids = coeffsToCurves(pam_output$medoids, wf)
#map medoid ranks back to the original series identifiers
ids = final_coeffs[,1][pam_output$ranks]
-
- #4) apply stage 2 (in parallel ? inside task 2) ?)
- if (WER == "end")