progress in main.R
[epclust.git] / code / draft_R_pkg / R / main.R
index 19729ed..6dca708 100644 (file)
@@ -60,19 +60,38 @@ epclust = function(data, K, nbSeriesPerChunk, writeTmp=ref_writeTmp, readTmp=ref
                if (is.data.frame(data))
                {
                        #full data matrix
-                       error = writeTmp( getCoeffs( data[index:(min(index+nbSeriesPerChunk-1,nrow(data))),] ) )
+                       if (index < nrow(data))
+                       {
+                               writeTmp( getCoeffs( data[index:(min(index+nbSeriesPerChunk-1,nrow(data))),] ) )
+                       } else
+                       {
+                               break
+                       }
                } else if (is.function(data))
                {
                        #custom user function to retrieve next n curves, probably to read from DB
-                       error = writeTmp( getCoeffs( data(index, nbSeriesPerChunk) ) )
+                       coeffs_chunk = getCoeffs( data(index, nbSeriesPerChunk) )
+                       if (!is.null(coeffs_chunk))
+                       {
+                               writeTmp(coeffs_chunk)
+                       } else
+                       {
+                               break
+                       }
                } else
                {
                        #incremental connection
                        #TODO: find a better way to parse than using a temp file
                        ascii_lines = readLines(dataCon, nbSeriesPerChunk)
-                       seriesChunkFile = ".tmp/seriesChunk"
-                       writeLines(ascii_lines, seriesChunkFile)
-                       error = writeTmp( getCoeffs( read.csv(seriesChunkFile) ) )
+                       if (length(ascii_lines > 0))
+                       {
+                               seriesChunkFile = ".tmp/seriesChunk"
+                               writeLines(ascii_lines, seriesChunkFile)
+                               writeTmp( getCoeffs( read.csv(seriesChunkFile) ) )
+                       } else
+                       {
+                               break
+                       }
                }
                index = index + nbSeriesPerChunk
        }
@@ -84,7 +103,7 @@ epclust = function(data, K, nbSeriesPerChunk, writeTmp=ref_writeTmp, readTmp=ref
        cl = parallel::makeCluster(ncores)
        parallel::clusterExport(cl=cl, varlist=c("X", "Y", "K", "p"), envir=environment())
        library(cluster)
-       li = parallel::parLapply(cl, 1:B, getParamsAtIndex)
+       li = parallel::parLapply(cl, 1:B, )
 
        #2) process coeffs (by nbSeriesPerChunk) and cluster them in parallel
        #TODO: be careful of writing to a new temp file, then flush initial one, then re-use it...
@@ -97,7 +116,7 @@ epclust = function(data, K, nbSeriesPerChunk, writeTmp=ref_writeTmp, readTmp=ref
                #C) flush tmp file (current parallel processes will write in it)
                #always check "complete" flag (array, as I did in MPI) to know if "slaves" finished
        }
-pam(x, k
+pam(x, k)
        parallel::stopCluster(cl)
 
        #3) readTmp last results, apply PAM on it, and return medoids + identifiers
@@ -108,8 +127,3 @@ pam(x, k
                #from center curves, apply stage 2...
        }
 }
-
-getCoeffs = function(series)
-{
-       #... return wavelets coeffs : compute in parallel !
-}