set final draft for package
[epclust.git] / epclust / R / clustering.R
similarity index 53%
rename from epclust/R/stage2.R
rename to epclust/R/clustering.R
index 3ccbbad..e27ea35 100644 (file)
@@ -1,8 +1,66 @@
-library("Rwave")
+oneIteration = function(..........)
+{
+               cl_clust = parallel::makeCluster(ncores_clust)
+               parallel::clusterExport(cl_clust, .............., envir=........)
+               indices_clust = indices_task[[i]]
+               repeat
+               {
+                       nb_workers = max( 1, round( length(indices_clust) / nb_series_per_chunk ) )
+                       indices_workers = list()
+                       #indices[[i]] == (start_index,number_of_elements)
+                       for (i in 1:nb_workers)
+                       {
+                               upper_bound = ifelse( i<nb_workers,
+                                       min(nb_series_per_chunk*i,length(indices_clust)), length(indices_clust) )
+                               indices_workers[[i]] = indices_clust[(nb_series_per_chunk*(i-1)+1):upper_bound]
+                       }
+                       indices_clust = parallel::parSapply(cl, indices_workers, processChunk, K1, K2*(WER=="mix"))
+                       if ( (WER=="end" && length(indices_clust) == K1) ||
+                               (WER=="mix" && length(indices_clust) == K2) )
+                       {
+                               break
+                       }
+               }
+               parallel::stopCluster(cl_clust)
+               res_clust
+}
+
+processChunk = function(indices, K1, K2)
+{
+       #1) retrieve data (coeffs)
+       coeffs = getCoeffs(indices)
+       #2) cluster
+       cl = computeClusters(as.matrix(coeffs[,2:ncol(coeffs)]), K1)
+       #3) WER (optional)
+       if (K2 > 0)
+       {
+               curves = computeSynchrones(cl)
+               dists = computeWerDists(curves)
+               cl = computeClusters(dists, K2)
+       }
+       cl
+}
+
+computeClusters = function(data, K)
+{
+       library(cluster)
+       pam_output = cluster::pam(data, K)
+       return ( list( clusts=pam_output$clustering, medoids=pam_output$medoids,
+               ranks=pam_output$id.med ) )
+}
+
+#TODO: appendCoeffs() en C --> serialize et append to file
+
+computeSynchrones = function(...)
+{
+
+}
 
 #Entrée : courbes synchrones, soit après étape 1 itérée, soit après chaqure étape 1
-step2 = function(conso)
+computeWerDist = function(conso)
 {
+       if (!require("Rwave", quietly=TRUE))
+               stop("Unable to load Rwave library")
        n <- nrow(conso)
        delta <- ncol(conso)
        #TODO: automatic tune of all these parameters ? (for other users)