add comments, fix some things. TODO: comment tests, finish computeWerDists, test it
authorBenjamin Auder <benjamin.auder@somewhere>
Sat, 11 Mar 2017 15:57:48 +0000 (16:57 +0100)
committerBenjamin Auder <benjamin.auder@somewhere>
Sat, 11 Mar 2017 15:57:48 +0000 (16:57 +0100)
epclust/R/clustering.R
epclust/R/de_serialize.R
epclust/R/main.R
epclust/src/computeMedoidsIndices.cpp
epclust/src/filter.cpp
epclust/tests/testthat/helper.clustering.R
epclust/tests/testthat/helper.computeMedoidsIndices.R
epclust/tests/testthat/test.clustering.R
epclust/tests/testthat/test.wavelets.R

index a431ba8..2ce4267 100644 (file)
@@ -33,10 +33,12 @@ clusteringTask1 = function(indices, getContribs, K1, algoClust1, nb_items_clust1
        if (parll)
        {
                cl = parallel::makeCluster(ncores_clust, outfile = "")
-               parallel::clusterExport(cl, varlist=c("getContribs","K1","verbose"), envir=environment())
+               parallel::clusterExport(cl, c("getContribs","K1","verbose"), envir=environment())
        }
+       # Iterate clustering algorithm 1 until K1 medoids are found
        while (length(indices) > K1)
        {
+               # Balance tasks by splitting the indices set - as evenly as possible
                indices_workers = .spreadIndices(indices, nb_items_clust1)
                if (verbose)
                        cat(paste("*** [iterated] Clustering task 1 on ",length(indices)," series\n", sep=""))
@@ -64,16 +66,23 @@ clusteringTask1 = function(indices, getContribs, K1, algoClust1, nb_items_clust1
 #' @rdname clustering
 #' @export
 clusteringTask2 = function(medoids, K2, algoClust2, getRefSeries, nb_ref_curves,
-       nb_series_per_chunk, sync_mean, nbytes,endian,ncores_clust=1,verbose=FALSE,parll=TRUE)
+       nb_series_per_chunk, nbytes,endian,ncores_clust=1,verbose=FALSE,parll=TRUE)
 {
        if (verbose)
                cat(paste("*** Clustering task 2 on ",ncol(medoids)," synchrones\n", sep=""))
 
        if (ncol(medoids) <= K2)
                return (medoids)
+
+       # A) Obtain synchrones, that is to say the cumulated power consumptions
+       #    for each of the K1 initial groups
        synchrones = computeSynchrones(medoids, getRefSeries, nb_ref_curves,
-               nb_series_per_chunk, sync_mean, ncores_clust, verbose, parll)
+               nb_series_per_chunk, ncores_clust, verbose, parll)
+
+       # B) Compute the WER distances (Wavelets Extended coefficient of deteRmination)
        distances = computeWerDists(synchrones, nbytes, endian, ncores_clust, verbose, parll)
+
+       # C) Apply clustering algorithm 2 on the WER distances matrix
        if (verbose)
                cat(paste("   algoClust2() on ",nrow(distances)," items\n", sep=""))
        medoids[ ,algoClust2(distances,K2) ]
@@ -82,7 +91,7 @@ clusteringTask2 = function(medoids, K2, algoClust2, getRefSeries, nb_ref_curves,
 #' computeSynchrones
 #'
 #' Compute the synchrones curves (sum of clusters elements) from a matrix of medoids,
-#' using L2 distances.
+#' using euclidian distance.
 #'
 #' @param medoids big.matrix of medoids (curves of same length as initial series)
 #' @param getRefSeries Function to retrieve initial series (e.g. in stage 2 after series
@@ -94,8 +103,9 @@ clusteringTask2 = function(medoids, K2, algoClust2, getRefSeries, nb_ref_curves,
 #'
 #' @export
 computeSynchrones = function(medoids, getRefSeries, nb_ref_curves,
-       nb_series_per_chunk, sync_mean, ncores_clust=1,verbose=FALSE,parll=TRUE)
+       nb_series_per_chunk, ncores_clust=1,verbose=FALSE,parll=TRUE)
 {
+       # Synchrones computation is embarassingly parallel: compute it by chunks of series
        computeSynchronesChunk = function(indices)
        {
                if (parll)
@@ -103,26 +113,25 @@ computeSynchrones = function(medoids, getRefSeries, nb_ref_curves,
                        require("bigmemory", quietly=TRUE)
                        requireNamespace("synchronicity", quietly=TRUE)
                        require("epclust", quietly=TRUE)
+                       # The big.matrix objects need to be attached to be usable on the workers
                        synchrones <- bigmemory::attach.big.matrix(synchrones_desc)
-                       if (sync_mean)
-                               counts <- bigmemory::attach.big.matrix(counts_desc)
                        medoids <- bigmemory::attach.big.matrix(medoids_desc)
                        m <- synchronicity::attach.mutex(m_desc)
                }
 
+               # Obtain a chunk of reference series
                ref_series = getRefSeries(indices)
                nb_series = ncol(ref_series)
 
                # Get medoids indices for this chunk of series
                mi = computeMedoidsIndices(medoids@address, ref_series)
 
+               # Update synchrones using mi above
                for (i in seq_len(nb_series))
                {
                        if (parll)
-                               synchronicity::lock(m)
+                               synchronicity::lock(m) #locking required because several writes at the same time
                        synchrones[, mi[i] ] = synchrones[, mi[i] ] + ref_series[,i]
-                       if (sync_mean)
-                               counts[ mi[i] ] = counts[ mi[i] ] + 1
                        if (parll)
                                synchronicity::unlock(m)
                }
@@ -130,34 +139,29 @@ computeSynchrones = function(medoids, getRefSeries, nb_ref_curves,
 
        K = ncol(medoids) ; L = nrow(medoids)
        # Use bigmemory (shared==TRUE by default) + synchronicity to fill synchrones in //
-       # TODO: if size > RAM (not our case), use file-backed big.matrix
        synchrones = bigmemory::big.matrix(nrow=L, ncol=K, type="double", init=0.)
-       if (sync_mean)
-               counts = bigmemory::big.matrix(nrow=K, ncol=1, type="double", init=0)
-       # synchronicity is only for Linux & MacOS; on Windows: run sequentially
+       # NOTE: synchronicity is only for Linux & MacOS; on Windows: run sequentially
        parll = (requireNamespace("synchronicity",quietly=TRUE)
                && parll && Sys.info()['sysname'] != "Windows")
        if (parll)
        {
-               m <- synchronicity::boost.mutex()
+               m <- synchronicity::boost.mutex() #for lock/unlock, see computeSynchronesChunk
+               # mutex and big.matrix objects cannot be passed directly:
+               # they will be accessed from their description
                m_desc <- synchronicity::describe(m)
                synchrones_desc = bigmemory::describe(synchrones)
-               if (sync_mean)
-                       counts_desc = bigmemory::describe(counts)
                medoids_desc = bigmemory::describe(medoids)
                cl = parallel::makeCluster(ncores_clust)
-               varlist=c("synchrones_desc","sync_mean","m_desc","medoids_desc","getRefSeries")
-               if (sync_mean)
-                       varlist = c(varlist, "counts_desc")
-               parallel::clusterExport(cl, varlist, envir=environment())
+               parallel::clusterExport(cl, envir=environment(),
+                       varlist=c("synchrones_desc","m_desc","medoids_desc","getRefSeries"))
        }
 
        if (verbose)
-       {
-               if (verbose)
-                       cat(paste("--- Compute ",K," synchrones with ",nb_ref_curves," series\n", sep=""))
-       }
-       indices_workers = .spreadIndices(seq_len(nb_ref_curves), nb_series_per_chunk)
+               cat(paste("--- Compute ",K," synchrones with ",nb_ref_curves," series\n", sep=""))
+
+       # Balance tasks by splitting the indices set - maybe not so evenly, but
+       # max==TRUE in next call ensures that no set has more than nb_series_per_chunk items.
+       indices_workers = .spreadIndices(seq_len(nb_ref_curves), nb_series_per_chunk, max=TRUE)
        ignored <-
                if (parll)
                        parallel::parLapply(cl, indices_workers, computeSynchronesChunk)
@@ -167,19 +171,7 @@ computeSynchrones = function(medoids, getRefSeries, nb_ref_curves,
        if (parll)
                parallel::stopCluster(cl)
 
-       if (!sync_mean)
-               return (synchrones)
-
-       #TODO: can we avoid this loop? ( synchrones = sweep(synchrones, 2, counts, '/') )
-       for (i in seq_len(K))
-               synchrones[,i] = synchrones[,i] / counts[i]
-       #NOTE: odds for some clusters to be empty? (when series already come from stage 2)
-       #      ...maybe; but let's hope resulting K1' be still quite bigger than K2
-       noNA_rows = sapply(seq_len(K), function(i) all(!is.nan(synchrones[,i])))
-       if (all(noNA_rows))
-               return (synchrones)
-       # Else: some clusters are empty, need to slice synchrones
-       bigmemory::as.big.matrix(synchrones[,noNA_rows])
+       return (synchrones)
 }
 
 #' computeWerDists
@@ -196,21 +188,13 @@ computeSynchrones = function(medoids, getRefSeries, nb_ref_curves,
 #' @export
 computeWerDists = function(synchrones, nbytes,endian,ncores_clust=1,verbose=FALSE,parll=TRUE)
 {
-       n <- nrow(synchrones)
-       delta <- ncol(synchrones)
+       n <- ncol(synchrones)
+       L <- nrow(synchrones)
        #TODO: automatic tune of all these parameters ? (for other users)
+       # 4 here represent 2^5 = 32 half-hours ~ 1 day
        nvoice   <- 4
        # noctave = 2^13 = 8192 half hours ~ 180 days ; ~log2(ncol(synchrones))
        noctave = 13
-       # 4 here represent 2^5 = 32 half-hours ~ 1 day
-       #NOTE: default scalevector == 2^(0:(noctave * nvoice) / nvoice) * s0 (?)
-       scalevector  <- 2^(4:(noctave * nvoice) / nvoice + 1)
-       #condition: ( log2(s0*w0/(2*pi)) - 1 ) * nvoice + 1.5 >= 1
-       s0 = 2
-       w0 = 2*pi
-       scaled=FALSE
-       s0log = as.integer( (log2( s0*w0/(2*pi) ) - 1) * nvoice + 1.5 )
-       totnoct = noctave + as.integer(s0log/nvoice) + 1
 
        Xwer_dist <- bigmemory::big.matrix(nrow=n, ncol=n, type="double")
 
@@ -228,15 +212,15 @@ computeWerDists = function(synchrones, nbytes,endian,ncores_clust=1,verbose=FALS
 
        computeSaveCWT = function(index)
        {
-               ts <- scale(ts(synchrones[index,]), center=TRUE, scale=scaled)
-               totts.cwt = Rwave::cwt(ts, totnoct, nvoice, w0, plot=FALSE)
+               ts <- scale(ts(synchrones[,index]), center=TRUE, scale=FALSE)
+               totts.cwt = Rwave::cwt(ts, totnoct, nvoice, w0=2*pi, twoD=TRUE, plot=FALSE)
                ts.cwt = totts.cwt[,s0log:(s0log+noctave*nvoice)]
                #Normalization
                sqs <- sqrt(2^(0:(noctave*nvoice)/nvoice)*s0)
                sqres <- sweep(ts.cwt,2,sqs,'*')
                res <- sqres / max(Mod(sqres))
                #TODO: serializer les CWT, les récupérer via getDataInFile ;
-               #--> OK, faut juste stocker comme séries simples de taille delta*ncol (53*17519)
+               #--> OK, faut juste stocker comme séries simples de taille L*n' (53*17519)
                binarize(c(as.double(Re(res)),as.double(Im(res))), cwt_file, ncol(res), ",", nbytes, endian)
        }
 
@@ -245,8 +229,9 @@ computeWerDists = function(synchrones, nbytes,endian,ncores_clust=1,verbose=FALS
                cl = parallel::makeCluster(ncores_clust)
                synchrones_desc <- bigmemory::describe(synchrones)
                Xwer_dist_desc <- bigmemory::describe(Xwer_dist)
-               parallel::clusterExport(cl, varlist=c("synchrones_desc","Xwer_dist_desc","totnoct",
-                       "nvoice","w0","s0log","noctave","s0","verbose","getCWT"), envir=environment())
+               parallel::clusterExport(cl, envir=environment(),
+                       varlist=c("synchrones_desc","Xwer_dist_desc","totnoct","nvoice","w0","s0log",
+                               "noctave","s0","verbose","getCWT"))
        }
        
        if (verbose)
@@ -289,7 +274,7 @@ computeWerDists = function(synchrones, nbytes,endian,ncores_clust=1,verbose=FALS
                WX  <- epclustFilter(Mod(cwt_i * Conj(cwt_i)))
                WY <- epclustFilter(Mod(cwt_j * Conj(cwt_j)))
                wer2 <- sum(colSums(num)^2) / sum(colSums(WX) * colSums(WY))
-               Xwer_dist[i,j] <- sqrt(delta * ncol(cwt_i) * max(1 - wer2, 0.)) #FIXME: wer2 should be < 1
+               Xwer_dist[i,j] <- sqrt(L * ncol(cwt_i) * max(1 - wer2, 0.))
                Xwer_dist[j,i] <- Xwer_dist[i,j]
                Xwer_dist[i,i] = 0.
        }
@@ -314,7 +299,8 @@ computeWerDists = function(synchrones, nbytes,endian,ncores_clust=1,verbose=FALS
 }
 
 # Helper function to divide indices into balanced sets
-.spreadIndices = function(indices, nb_per_set)
+# If max == TRUE, sets sizes cannot exceed nb_per_set
+.spreadIndices = function(indices, nb_per_set, max=FALSE)
 {
        L = length(indices)
        nb_workers = floor( L / nb_per_set )
@@ -328,6 +314,13 @@ computeWerDists = function(synchrones, nbytes,endian,ncores_clust=1,verbose=FALS
        {
                indices_workers = lapply( seq_len(nb_workers), function(i)
                        indices[(nb_per_set*(i-1)+1):(nb_per_set*i)] )
+
+               if (max)
+               {
+                       # Sets are not so well balanced, but size is supposed to be critical
+                       return ( c( indices_workers, (L-rem+1):L ) )
+               }
+
                # Spread the remaining load among the workers
                rem = L %% nb_per_set
                while (rem > 0)
index fd3a5db..5a9dd1f 100644 (file)
@@ -14,8 +14,8 @@
 #' @param data_ascii Either a matrix or CSV file, with items in rows
 #' @param indices Indices of the lines to retrieve
 #' @param data_bin_file Name of binary file on output (\code{binarize})
-#'   or intput (\code{getDataInFile})
-#' @param nb_per_chunk Number of lines to process in one batch
+#'   or input (\code{getDataInFile})
+#' @param nb_per_chunk Number of lines to process in one batch (big.matrix or connection)
 #' @inheritParams claws
 #' @param getData Function to retrieve data chunks
 #' @param transform Transformation function to apply on data chunks
@@ -30,24 +30,29 @@ NULL
 binarize = function(data_ascii, data_bin_file, nb_per_chunk,
        sep=",", nbytes=4, endian=.Platform$endian)
 {
+       # data_ascii can be of two types: [big.]matrix, or connection
        if (is.character(data_ascii))
                data_ascii = file(data_ascii, open="r")
        else if (methods::is(data_ascii,"connection") && !isOpen(data_ascii))
                open(data_ascii)
        is_matrix = !methods::is(data_ascii,"connection")
 
+       # At first call, the length of a stored row is written. So it's important to determine
+       # if the serialization process already started.
        first_write = (!file.exists(data_bin_file) || file.info(data_bin_file)$size == 0)
+
+       # Open the binary file for writing (or 'append' if already exists)
        data_bin = file(data_bin_file, open=ifelse(first_write,"wb","ab"))
 
-       #write data length on first call
        if (first_write)
        {
-               #number of items always on 8 bytes
+               # Write data length on first call: number of items always on 8 bytes
                writeBin(0L, data_bin, size=8, endian=endian)
                if (is_matrix)
                        data_length = nrow(data_ascii)
                else #connection
                {
+                       # Read the first line to know data length, and write it then
                        data_line = scan(data_ascii, double(), sep=sep, nlines=1, quiet=TRUE)
                        writeBin(data_line, data_bin, size=nbytes, endian=endian)
                        data_length = length(data_line)
@@ -55,7 +60,11 @@ binarize = function(data_ascii, data_bin_file, nb_per_chunk,
        }
 
        if (is_matrix)
+       {
+               # Data is processed by chunks; although this may not be so useful for (normal) matrix
+               # input, it could for a file-backed big.matrix. It's easier to follow a unified pattern.
                index = 1
+       }
        repeat
        {
                if (is_matrix)
@@ -67,10 +76,14 @@ binarize = function(data_ascii, data_bin_file, nb_per_chunk,
                                        double(0)
                        index = index + nb_per_chunk
                }
-               else
+               else #connection
                        data_chunk = scan(data_ascii, double(), sep=sep, nlines=nb_per_chunk, quiet=TRUE)
+
+               # Data size is unknown in the case of a connection
                if (length(data_chunk)==0)
                        break
+
+               # Write this chunk of data to the binary file
                writeBin(data_chunk, data_bin, size=nbytes, endian=endian)
        }
 
@@ -91,35 +104,47 @@ binarize = function(data_ascii, data_bin_file, nb_per_chunk,
 binarizeTransform = function(getData, transform, data_bin_file, nb_per_chunk,
        nbytes=4, endian=.Platform$endian)
 {
-       nb_items = 0
+       nb_items = 0 #side-effect: store the number of transformed items
        index = 1
        repeat
        {
+               # Retrieve a chunk of data in a binary file (generally obtained by binarize())
                data_chunk = getData((index-1)+seq_len(nb_per_chunk))
                if (is.null(data_chunk))
                        break
+
+               # Apply transformation on the current chunk (by columns)
                transformed_chunk = transform(data_chunk)
+
+               # Save the result in binary format
                binarize(transformed_chunk, data_bin_file, nb_per_chunk, ",", nbytes, endian)
+
                index = index + nb_per_chunk
                nb_items = nb_items + nrow(data_chunk)
        }
-       nb_items
+       nb_items #number of transformed items
 }
 
 #' @rdname de_serialize
 #' @export
 getDataInFile = function(indices, data_bin_file, nbytes=4, endian=.Platform$endian)
 {
-       data_bin = file(data_bin_file, "rb")
-       data_size = file.info(data_bin_file)$size
+       data_bin = file(data_bin_file, "rb") #source binary file
+
+       data_size = file.info(data_bin_file)$size #number of bytes in the file
+       # data_length: length of a vector in the binary file (first element, 8 bytes)
        data_length = readBin(data_bin, "integer", n=1, size=8, endian=endian)
+
+       # Seek all 'indices' columns in the binary file, using data_length and nbytes
+       # to compute the offset ( index i at 8 + i*data_length*nbytes )
        data_ascii = do.call( cbind, lapply( indices, function(i) {
                offset = 8+(i-1)*data_length*nbytes
                if (offset > data_size)
                        return (NULL)
-               ignored = seek(data_bin, offset)
+               ignored = seek(data_bin, offset) #position cursor at computed offset
                readBin(data_bin, "double", n=data_length, size=nbytes, endian=endian)
        } ) )
        close(data_bin)
-       data_ascii
+
+       data_ascii #retrieved data, in columns
 }
index 2af6f90..bcc650a 100644 (file)
@@ -4,29 +4,35 @@
 #' two stage procedure in parallel (see details).
 #' Input series must be sampled on the same time grid, no missing values.
 #'
-#' @details Summary of the function execution flow:
+#' Summary of the function execution flow:
+#' \enumerate{
+#'   \item Compute and serialize all contributions, obtained through discrete wavelet
+#'     decomposition (see Antoniadis & al. [2013])
+#'   \item Divide series into \code{ntasks} groups to process in parallel. In each task:
 #'   \enumerate{
-#'     \item Compute and serialize all contributions, obtained through discrete wavelet
-#'       decomposition (see Antoniadis & al. [2013])
-#'     \item Divide series into \code{ntasks} groups to process in parallel. In each task:
-#'     \enumerate{
-#'       \item iterate the first clustering algorithm on its aggregated outputs,
-#'         on inputs of size \code{nb_items_clust1}
-#'       \item optionally, if WER=="mix":
-#'         a) compute the K1 synchrones curves,
-#'         b) compute WER distances (K1xK1 matrix) between synchrones and
-#'         c) apply the second clustering algorithm
-#'     }
-#'     \item Launch a final task on the aggregated outputs of all previous tasks:
-#'       in the case WER=="end" this task takes indices in input, otherwise
-#'       (medoid) curves
+#'     \item iterate the first clustering algorithm on its aggregated outputs,
+#'       on inputs of size \code{nb_items_clust1}
+#'     \item optionally, if WER=="mix":
+#'       a) compute the K1 synchrones curves,
+#'       b) compute WER distances (K1xK1 matrix) between synchrones and
+#'       c) apply the second clustering algorithm
 #'   }
-#'   The main argument -- \code{getSeries} -- has a quite misleading name, since it can be
-#'   either a [big.]matrix, a CSV file, a connection or a user function to retrieve
-#'   series; the name was chosen because all types of arguments are converted to a function.
-#'   When \code{getSeries} is given as a function, it must take a single argument,
-#'   'indices', integer vector equal to the indices of the curves to retrieve;
-#'   see SQLite example. The nature and role of other arguments should be clear
+#'   \item Launch a final task on the aggregated outputs of all previous tasks:
+#'     in the case WER=="end" this task takes indices in input, otherwise
+#'     (medoid) curves
+#' }
+#' \cr
+#' The main argument -- \code{getSeries} -- has a quite misleading name, since it can be
+#' either a [big.]matrix, a CSV file, a connection or a user function to retrieve
+#' series; the name was chosen because all types of arguments are converted to a function.
+#' When \code{getSeries} is given as a function, it must take a single argument,
+#' 'indices', integer vector equal to the indices of the curves to retrieve;
+#' see SQLite example. The nature and role of other arguments should be clear
+#' \cr
+#' Note: Since we don't make assumptions on initial data, there is a possibility that
+#' even when serialized, contributions or synchrones do not fit in RAM. For example,
+#' 30e6 series of length 100,000 would lead to a +4Go contribution matrix. Therefore,
+#' it's safer to place these in (binary) files; that's what we do.
 #'
 #' @param getSeries Access to the (time-)series, which can be of one of the three
 #'   following types:
@@ -55,7 +61,6 @@
 #' @param contrib_type Type of contribution: "relative", "logit" or "absolute" (any prefix)
 #' @param WER "end" to apply stage 2 after stage 1 has fully iterated, or "mix" to apply
 #'   stage 2 at the end of each task
-#' @param sync_mean TRUE to compute a synchrone as a mean curve, FALSE for a sum
 #' @param random TRUE (default) for random chunks repartition
 #' @param ntasks Number of tasks (parallel iterations to obtain K1 [if WER=="end"]
 #'   or K2 [if WER=="mix"] medoids); default: 1.
 #' digest::sha1(medoids_db)
 #' }
 #' @export
-claws <- function(getSeries, K1, K2, nb_series_per_chunk,
-       nb_items_clust1=7*K1,
+claws <- function(getSeries, K1, K2, nb_series_per_chunk, nb_items_clust1=7*K1,
        algoClust1=function(data,K) cluster::pam(t(data),K,diss=FALSE)$id.med,
        algoClust2=function(dists,K) cluster::pam(dists,K,diss=TRUE)$id.med,
-       wav_filt="d8", contrib_type="absolute",
-       WER="end",sync_mean=TRUE,
-       random=TRUE,
-       ntasks=1, ncores_tasks=1, ncores_clust=4,
-       sep=",",
-       nbytes=4, endian=.Platform$endian,
-       verbose=FALSE, parll=TRUE)
+       wav_filt="d8", contrib_type="absolute", WER="end", random=TRUE,
+       ntasks=1, ncores_tasks=1, ncores_clust=4, sep=",", nbytes=4,
+       endian=.Platform$endian, verbose=FALSE, parll=TRUE)
 {
        # Check/transform arguments
        if (!is.matrix(getSeries) && !bigmemory::is.big.matrix(getSeries)
@@ -173,7 +173,6 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk,
                stop("'contrib_type' in {'relative','absolute','logit'}")
        if (WER!="end" && WER!="mix")
                stop("'WER': in {'end','mix'}")
-       sync_mean <- .toLogical(sync_mean)
        random <- .toLogical(random)
        ntasks <- .toInteger(ntasks, function(x) x>=1)
        ncores_tasks <- .toInteger(ncores_tasks, function(x) x>=1)
@@ -184,27 +183,20 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk,
        verbose <- .toLogical(verbose)
        parll <- .toLogical(parll)
 
-       # Since we don't make assumptions on initial data, there is a possibility that even
-       # when serialized, contributions or synchrones do not fit in RAM. For example,
-       # 30e6 series of length 100,000 would lead to a +4Go contribution matrix. Therefore,
-       # it's safer to place these in (binary) files, located in the following folder.
-       bin_dir <- ".epclust_bin/"
-       dir.create(bin_dir, showWarnings=FALSE, mode="0755")
-
        # Binarize series if getSeries is not a function; the aim is to always use a function,
        # to uniformize treatments. An equally good alternative would be to use a file-backed
-       # bigmemory::big.matrix, but it would break the uniformity.
+       # bigmemory::big.matrix, but it would break the "all-is-function" pattern.
        if (!is.function(getSeries))
        {
                if (verbose)
                        cat("...Serialize time-series\n")
-               series_file = paste(bin_dir,"data",sep="") ; unlink(series_file)
+               series_file = ".series.bin" ; unlink(series_file)
                binarize(getSeries, series_file, nb_series_per_chunk, sep, nbytes, endian)
                getSeries = function(inds) getDataInFile(inds, series_file, nbytes, endian)
        }
 
        # Serialize all computed wavelets contributions into a file
-       contribs_file = paste(bin_dir,"contribs",sep="") ; unlink(contribs_file)
+       contribs_file = ".contribs.bin" ; unlink(contribs_file)
        index = 1
        nb_curves = 0
        if (verbose)
@@ -221,8 +213,8 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk,
        if (nb_series_per_task < K2)
                stop("Too many tasks: less series in one task than final number of clusters")
 
-       # Generate a random permutation of 1:N (if random==TRUE); otherwise just use arrival
-       # (storage) order.
+       # Generate a random permutation of 1:N (if random==TRUE);
+       # otherwise just use arrival (storage) order.
        indices_all = if (random) sample(nb_curves) else seq_len(nb_curves)
        # Split (all) indices into ntasks groups of ~same size
        indices_tasks = lapply(seq_len(ntasks), function(i) {
@@ -236,9 +228,9 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk,
                # under Linux. All necessary variables are passed to the workers.
                cl = parallel::makeCluster(ncores_tasks, outfile="")
                varlist = c("getSeries","getContribs","K1","K2","algoClust1","algoClust2",
-                       "nb_series_per_chunk","nb_items_clust1","ncores_clust","sep",
-                       "nbytes","endian","verbose","parll")
-               if (WER=="mix")
+                       "nb_series_per_chunk","nb_items_clust1","ncores_clust",
+                       "sep","nbytes","endian","verbose","parll")
+               if (WER=="mix" && ntasks>1)
                        varlist = c(varlist, "medoids_file")
                parallel::clusterExport(cl, varlist, envir = environment())
        }
@@ -249,19 +241,19 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk,
        # where n = N / ntasks, N being the total number of curves.
        runTwoStepClustering = function(inds)
        {
-               # When running in parallel, the environment is blank: we need to load required
+               # When running in parallel, the environment is blank: we need to load the required
                # packages, and pass useful variables.
                if (parll && ntasks>1)
                        require("epclust", quietly=TRUE)
                indices_medoids = clusteringTask1(
                        inds, getContribs, K1, algoClust1, nb_series_per_chunk, ncores_clust, verbose, parll)
-               if (WER=="mix")
+               if (WER=="mix" && ntasks>1)
                {
-                       if (parll && ntasks>1)
+                       if (parll)
                                require("bigmemory", quietly=TRUE)
                        medoids1 = bigmemory::as.big.matrix( getSeries(indices_medoids) )
                        medoids2 = clusteringTask2(medoids1, K2, algoClust2, getSeries, nb_curves,
-                               nb_series_per_chunk, sync_mean, nbytes, endian, ncores_clust, verbose, parll)
+                               nb_series_per_chunk, nbytes, endian, ncores_clust, verbose, parll)
                        binarize(medoids2, medoids_file, nb_series_per_chunk, sep, nbytes, endian)
                        return (vector("integer",0))
                }
@@ -271,8 +263,8 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk,
        # Synchrones (medoids) need to be stored only if WER=="mix"; indeed in this case, every
        # task output is a set of new (medoids) curves. If WER=="end" however, output is just a
        # set of indices, representing some initial series.
-       if (WER=="mix")
-               {medoids_file = paste(bin_dir,"medoids",sep="") ; unlink(medoids_file)}
+       if (WER=="mix" && ntasks>1)
+               {medoids_file = ".medoids.bin" ; unlink(medoids_file)}
 
        if (verbose)
        {
@@ -283,8 +275,7 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk,
        }
 
        # As explained above, indices will be assigned to ntasks*K1 medoids indices [if WER=="end"],
-       # or nothing (empty vector) if WER=="mix"; in this case, medoids (synchrones) are stored
-       # in a file.
+       # or nothing (empty vector) if WER=="mix"; in this case, synchrones are stored in a file.
        indices <-
                if (parll && ntasks>1)
                        unlist( parallel::parLapply(cl, indices_tasks, runTwoStepClustering) )
@@ -294,14 +285,14 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk,
                parallel::stopCluster(cl)
 
        # Right before the final stage, two situations are possible:
-       #  a. data to be processed now sit in binary format in medoids_file (if WER=="mix")
+       #  a. data to be processed now sit in binary format in medoids_file (if WER=="mix")
        #  b. data still is the initial set of curves, referenced by the ntasks*K1 indices
        # So, the function getSeries() will potentially change. However, computeSynchrones()
        # requires a function retrieving the initial series. Thus, the next line saves future
        # conditional instructions.
        getRefSeries = getSeries
 
-       if (WER=="mix")
+       if (WER=="mix" && ntasks>1)
        {
                indices = seq_len(ntasks*K2)
                # Now series (synchrones) must be retrieved from medoids_file
@@ -316,9 +307,6 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk,
                        contribs_file, nb_series_per_chunk, nbytes, endian)
        }
 
-#TODO: check THAT
-
-
        # Run step2 on resulting indices or series (from file)
        if (verbose)
                cat("...Run final // stage 1 + stage 2\n")
@@ -326,10 +314,12 @@ claws <- function(getSeries, K1, K2, nb_series_per_chunk,
                nb_series_per_chunk, ncores_tasks*ncores_clust, verbose, parll)
        medoids1 = bigmemory::as.big.matrix( getSeries(indices_medoids) )
        medoids2 = clusteringTask2(medoids1, K2, algoClust2, getRefSeries, nb_curves,
-               nb_series_per_chunk, sync_mean, nbytes, endian, ncores_tasks*ncores_clust, verbose, parll)
+               nb_series_per_chunk, nbytes, endian, ncores_tasks*ncores_clust, verbose, parll)
 
-       # Cleanup: remove temporary binary files and their folder
-       unlink(bin_dir, recursive=TRUE)
+       # Cleanup: remove temporary binary files
+       tryCatch(
+               {unlink(series_file); unlink(contribs_file); unlink(medoids_file)},
+               error = function(e) {})
 
        # Return medoids as a standard matrix, since K2 series have to fit in RAM
        # (clustering algorithm 1 takes K1 > K2 of them as input)
@@ -352,10 +342,12 @@ curvesToContribs = function(series, wav_filt, contrib_type, coin=FALSE)
        series = as.matrix(series) #1D serie could occur
        L = nrow(series)
        D = ceiling( log2(L) )
+       # Series are interpolated to all have length 2^D
        nb_sample_points = 2^D
        apply(series, 2, function(x) {
                interpolated_curve = spline(1:L, x, n=nb_sample_points)$y
                W = wavelets::dwt(interpolated_curve, filter=wav_filt, D)@W
+               # Compute the sum of squared discrete wavelet coefficients, for each scale
                nrj = rev( sapply( W, function(v) ( sqrt( sum(v^2) ) ) ) )
                if (contrib_type!="absolute")
                        nrj = nrj / sum(nrj)
index f247584..895031a 100644 (file)
@@ -10,33 +10,39 @@ using namespace Rcpp;
 
 //' computeMedoidsIndices
 //'
-//' Compute medoids indices
+//' For each column of the 'series' matrix input, search for the closest medoid
+//' (euclidian distance) and store its index
 //'
-//' @param pMedoids External pointer
-//' @param ref_series reference series
+//' @param pMedoids External pointer (a big.matrix 'address' slot in R)
+//' @param series (reference) series, a matrix of size Lxn
 //'
-//' @return A map serie number -> medoid index
+//' @return An integer vector of the closest medoids indices, for each (column) serie
 // [[Rcpp::export]]
-IntegerVector computeMedoidsIndices(SEXP pMedoids, NumericMatrix ref_series)
+IntegerVector computeMedoidsIndices(SEXP pMedoids, NumericMatrix series)
 {
+       // Turn SEXP external pointer into BigMatrix (description) object
        XPtr<BigMatrix> pMed(pMedoids);
+       // medoids: access to the content of the BigMatrix object
        MatrixAccessor<double> medoids = MatrixAccessor<double>(*pMed);
-       int nb_series = ref_series.ncol(),
+
+       int nb_series = series.ncol(),
                K = pMed->ncol(),
                L = pMed->nrow();
        IntegerVector mi(nb_series);
 
-       for (int i=0; i<nb_series ; i++)
+       for (int i=0; i<nb_series ; i++) //column index in series
        {
-               // mi[i] <- which.min( rowSums( sweep(medoids, 2, ref_series[i,], '-')^2 ) )
+               // In R: mi[i] <- which.min( rowSums( sweep(medoids, 2, series[i,], '-')^2 ) )
+               // In C(++), computations must be unrolled
                mi[i] = 0;
                double best_dist = DBL_MAX;
-               for (int j=0; j<K; j++)
+               for (int j=0; j<K; j++) //column index in medoids
                {
                        double dist_ij = 0.;
-                       for (int k=0; k<L; k++)
+                       for (int k=0; k<L; k++) //common row index (medoids, series)
                        {
-                               double delta = ref_series(k,i) - *(medoids[j]+k);
+                               // Accessing values for a big matrix is a bit weird; see Rcpp gallery/bigmemory
+                               double delta = series(k,i) - *(medoids[j]+k);
                                dist_ij += delta * delta;
                        }
                        if (dist_ij < best_dist)
index cb5a8a0..5268a5f 100644 (file)
@@ -1,51 +1,50 @@
 #include <Rcpp.h>
 
-#include <R.h>           //  Rprintf()
-//#include <R_ext/Utils.h> //  user interrupts
-#include <Rdefines.h>
-#include <Rinternals.h>
-
-#include <stdio.h>
-
 using namespace Rcpp;
 
 //' filter
 //'
-//' Filter time-series
+//' Filter [time-]series by replacing all values by the moving average of previous, current and
+//' next value. Possible extensions: larger window, gaussian kernel... (but would be costly!).
+//' Note: border values are unchanged.
 //'
-//' @param cwt Continuous wavelets transform
+//' @param cwt Continuous wavelets transform (in columns): a matrix of size LxD
 //'
-//' @return The filtered CWT
+//' @return The filtered CWT, in a matrix of same size (LxD)
 // [[Rcpp::export]]
 RcppExport SEXP epclustFilter(SEXP cwt_)
 {
+       // NOTE: C-style for better efficiency (this must be as fast as possible)
        int L = INTEGER(Rf_getAttrib(cwt_, R_DimSymbol))[0],
                D = INTEGER(Rf_getAttrib(cwt_, R_DimSymbol))[1];
        double *cwt = REAL(cwt_);
-       SEXP fcwt_;
+
+       SEXP fcwt_; //the filtered result
        PROTECT(fcwt_ = Rf_allocMatrix(REALSXP, L, D));
-               double* fcwt = REAL(fcwt_); //(double*)malloc(L*D*sizeof(double));
+       double* fcwt = REAL(fcwt_); //pointer to the encapsulated vector
 
-       //TODO: coding style is terrible... no time for now.
-       for (int col=0; col<D; col++)
+       // NOTE: unused loop parameter: shifting at the end of the loop is more efficient
+       for (int col=D-1; col>=0; col++)
        {
-               double v1 = cwt[0];
-               double ma = v1 + cwt[1] + cwt[2];
+               double v1 = cwt[0]; //first value
+               double ms = v1 + cwt[1] + cwt[2]; //moving sum at second value
                for (int i=1; i<L-2; i++)
                {
-                       fcwt[i] = ma / 3.;
-                       ma = ma - v1 + cwt[i+2];
-                       v1 = cwt[i];
+                       fcwt[i] = ms / 3.; //ms / 3: moving average at current index i
+                       ms = ms - v1 + cwt[i+2]; //update ms: remove oldest items, add next
+                       v1 = cwt[i]; //update first value for next iteration
                }
+
+               // Fill a few border values not computed in the loop
                fcwt[0] = cwt[0];
-               fcwt[L-2] = ma / 3.;
+               fcwt[L-2] = ms / 3.;
                fcwt[L-1] = cwt[L-1];
+
+               // Shift by L == increment column index by 1 (storage per column)
                cwt = cwt + L;
                fcwt = fcwt + L;
        }
 
-//     REAL(fcwt_) = fcwt;
        UNPROTECT(1);
-
        return fcwt_;
 }
index 21a791a..273a0b7 100644 (file)
@@ -1,15 +1,18 @@
-#shorthand: map 1->1, 2->2, 3->3, 4->1, ..., 149->2, 150->3, ... (is base==3)
+# Shorthand: map 1->1, 2->2, 3->3, 4->1, ..., 149->2, 150->3, ... (is base==3)
 I = function(i, base)
        (i-1) %% base + 1
 
-# NOTE: medoids can be a big.matrix
+# Compute the sum of (normalized) sum of squares of closest distances to a medoid.
+# Note: medoids can be a big.matrix
 computeDistortion = function(series, medoids)
 {
+       if (bigmemory::is.big.matrix(medoids))
+               medoids = medoids[,] #extract standard matrix
+
        n = nrow(series) ; L = ncol(series)
        distortion = 0.
-       if (bigmemory::is.big.matrix(medoids))
-               medoids = medoids[,]
        for (i in seq_len(n))
                distortion = distortion + min( colSums( sweep(medoids,1,series[,i],'-')^2 ) / L )
+
        distortion / n
 }
index 9342feb..cd6a30e 100644 (file)
@@ -1,10 +1,12 @@
-#R-equivalent of computeMedoidsIndices, requiring a matrix
-#(thus potentially breaking "fit-in-memory" hope)
+# R-equivalent of computeMedoidsIndices, requiring a matrix
+# (thus potentially breaking "fit-in-memory" hope)
 R_computeMedoidsIndices <- function(medoids, series)
 {
-       nb_series = ncol(series)
+       nb_series = ncol(series) #series in columns
+
        mi = rep(NA,nb_series)
        for (i in 1:nb_series)
                mi[i] <- which.min( colSums( sweep(medoids, 1, series[,i], '-')^2 ) )
+
        mi
 }
index e22835a..c10f820 100644 (file)
@@ -21,11 +21,11 @@ test_that("computeSynchrones behave as expected",
                if (length(indices)>0) series[,indices] else NULL
        }
        synchrones = computeSynchrones(bigmemory::as.big.matrix(cbind(s1,s2,s3)), getRefSeries,
-               n, 100, sync_mean=TRUE, verbose=TRUE, parll=FALSE)
+               n, 100, verbose=TRUE, parll=FALSE)
 
        expect_equal(dim(synchrones), c(L,K))
        for (i in 1:K)
-               expect_equal(synchrones[,i], s[[i]], tolerance=0.01)
+               expect_equal(synchrones[,i]/100, s[[i]], tolerance=0.01)
 })
 
 # Helper function to divide indices into balanced sets
@@ -105,7 +105,7 @@ test_that("clusteringTask2 behave as expected",
        medoids_K1 = bigmemory::as.big.matrix( sapply( 1:K1, function(i) s[[I(i,K1)]] ) )
        algoClust2 = function(dists,K) cluster::pam(dists,K,diss=TRUE)$id.med
        medoids_K2 = clusteringTask2(medoids_K1, K2, algoClust2, getRefSeries,
-               n, 75, sync_mean=TRUE, verbose=TRUE, parll=FALSE)
+               n, 75, verbose=TRUE, parll=FALSE)
 
        expect_equal(dim(medoids_K2), c(L,K2))
        # Not easy to evaluate result: at least we expect it to be better than random selection of
index 5eb19c5..96f9db3 100644 (file)
@@ -1,3 +1,21 @@
-#TODO: test computeWerDists (reference result? Jairo?)
+context("wavelets discrete & continuous")
 
-#TODO: test sur curvesToCoefs! ref results ?
+test_that("curvesToContribs behave as expected",
+{
+       curvesToContribs(...)
+})
+
+test_that("computeWerDists output correct results",
+{
+       nbytes = 8
+       endian = "little"
+
+       # On two identical series
+       serie = rnorm(212, sd=5)
+       synchrones = cbind(serie, serie)
+       dists = computeWerDists(synchrones, nbytes,endian,verbose=TRUE,parll=FALSE)
+       expect_equal(dists, matrix(0.,nrow=2,ncol=2))
+
+       # On two constant series
+       # TODO:...
+})