- cl_clust = parallel::makeCluster(ncores_clust)
- parallel::clusterExport(cl_clust, .............., envir=........)
- indices_clust = indices_task[[i]]
- repeat
- {
- nb_workers = max( 1, round( length(indices_clust) / nb_series_per_chunk ) )
- indices_workers = list()
- #indices[[i]] == (start_index,number_of_elements)
- for (i in 1:nb_workers)
- {
- upper_bound = ifelse( i<nb_workers,
- min(nb_series_per_chunk*i,length(indices_clust)), length(indices_clust) )
- indices_workers[[i]] = indices_clust[(nb_series_per_chunk*(i-1)+1):upper_bound]
- }
- indices_clust = parallel::parSapply(cl, indices_workers, processChunk, K1, K2*(WER=="mix"))
- if ( (WER=="end" && length(indices_clust) == K1) ||
- (WER=="mix" && length(indices_clust) == K2) )
- {
- break
- }
- }
- parallel::stopCluster(cl_clust)
- res_clust
+ cl = parallel::makeCluster(ncores)
+ parallel::clusterExport(cl,
+ varlist=c("K1","getCoefs"),
+ envir=environment())
+ repeat
+ {
+ nb_workers = max( 1, round( length(indices_clust) / nb_series_per_chunk ) )
+ indices_workers = lapply(seq_len(nb_workers), function(i) {
+ upper_bound = ifelse( i<nb_workers,
+ min(nb_series_per_chunk*i,length(indices_clust)), length(indices_clust) )
+ indices_clust[(nb_series_per_chunk*(i-1)+1):upper_bound]
+ })
+ indices_clust = unlist( parallel::parLapply(cl, indices_workers, function(indices)
+ computeClusters1(indices, getCoefs, K1)) )
+ if (length(indices_clust) == K1)
+ break
+ }
+ parallel::stopCluster(cl_clust)
+ if (WER == "end")
+ return (indices_clust)
+ #WER=="mix"
+ computeClusters2(indices_clust, K2, getSeries, to_file=TRUE)