From 3eef8d3df59ded9a281cff51f79fe824198a7427 Mon Sep 17 00:00:00 2001 From: Benjamin Auder Date: Sun, 5 Mar 2017 15:36:26 +0100 Subject: [PATCH] complete first draft of package --- epclust/R/clustering.R | 34 ++++++++++++++----- epclust/R/main.R | 14 ++++---- epclust/R/utils.R | 74 +++++++++++++++++++++++++++++------------- 3 files changed, 84 insertions(+), 38 deletions(-) diff --git a/epclust/R/clustering.R b/epclust/R/clustering.R index c8bad66..87a5f91 100644 --- a/epclust/R/clustering.R +++ b/epclust/R/clustering.R @@ -1,6 +1,6 @@ # Cluster one full task (nb_curves / ntasks series) clusteringTask = function(indices,getSeries,getSeriesForSynchrones,synchrones_file, - getCoefs,K1,K2,nb_series_per_chunk,ncores,to_file) + getCoefs,K1,K2,nb_series_per_chunk,ncores,to_file,ftype) { cl = parallel::makeCluster(ncores) repeat @@ -19,7 +19,8 @@ clusteringTask = function(indices,getSeries,getSeriesForSynchrones,synchrones_fi parallel::stopCluster(cl) if (K2 == 0) return (indices) - computeClusters2(indices, K2, getSeries, getSeriesForSynchrones, to_file) + computeClusters2(indices, K2, getSeries, getSeriesForSynchrones, to_file, + nb_series_per_chunk,ftype) vector("integer",0) } @@ -31,27 +32,42 @@ computeClusters1 = function(indices, getCoefs, K1) } # Cluster a chunk of series inside one task (~max nb_series_per_chunk) -computeClusters2 = function(indices, K2, getSeries, getSeriesForSynchrones, to_file) +computeClusters2 = function(indices, K2, getSeries, getSeriesForSynchrones, to_file, + nb_series_per_chunk, ftype) { - curves = computeSynchrones(indices, getSeries, getSeriesForSynchrones) + curves = computeSynchrones(indices, getSeries, getSeriesForSynchrones, nb_series_per_chunk) dists = computeWerDists(curves) medoids = cluster::pam(dists, K2, diss=TRUE)$medoids if (to_file) { - serialize(medoids, synchrones_file) + serialize(medoids, synchrones_file, ftype, nb_series_per_chunk) return (NULL) } medoids } # Compute the synchrones curves (sum of clusters elements) from a clustering result -computeSynchrones = function(indices, getSeries, getSeriesForSynchrones) +computeSynchrones = function(indices, getSeries, getSeriesForSynchrones, nb_series_per_chunk) { #les getSeries(indices) sont les medoides --> init vect nul pour chacun, puis incr avec les #courbes (getSeriesForSynchrones) les plus proches... --> au sens de la norme L2 ? - series = getSeries(indices) - #........... - #sapply(seq_along(inds), colMeans(getSeries(inds[[i]]$indices,inds[[i]]$ids))) + medoids = getSeries(indices) + K = nrow(medoids) + synchrones = matrix(0, nrow=K, ncol=ncol(medoids)) + counts = rep(0,K) + index = 1 + repeat + { + series = getSeriesForSynchrones((index-1)+seq_len(nb_series_per_chunk)) + if (is.null(series)) + break + #get medoids indices for this chunk of series + index = which.min( rowSums( sweep(medoids, 2, series[i,], '-')^2 ) ) + synchrones[index,] = synchrones[index,] + series[i,] + counts[index] = counts[index] + 1 + } + #NOTE: odds for some clusters to be empty? (when series already come from stage 2) + synchrones = sweep(synchrones, 1, counts, '/') } # Compute the WER distance between the synchrones curves (in rows) diff --git a/epclust/R/main.R b/epclust/R/main.R index 0b59832..27fbb74 100644 --- a/epclust/R/main.R +++ b/epclust/R/main.R @@ -40,7 +40,7 @@ #' cl = epclust(getData, K1=200, K2=15, ntasks=1000, nb_series_per_chunk=5000, WER="mix") #' @export epclust = function(series,K1,K2,ntasks=1,nb_series_per_chunk=50*K1,min_series_per_chunk=5*K1, - wf="haar",WER="end",ncores_tasks=1,ncores_clust=4,random=TRUE,...) + wf="haar",WER="end",ncores_tasks=1,ncores_clust=4,random=TRUE,ftype="float",...) { # Check/transform arguments bin_dir = "epclust.bin/" @@ -51,7 +51,7 @@ epclust = function(series,K1,K2,ntasks=1,nb_series_per_chunk=50*K1,min_series_pe unlink(series_file) } if (is.matrix(series)) - serialize(series, series_file) + serialize(series, series_file, ftype, nb_series_per_chunk) else if (!is.function(series)) { tryCatch( @@ -63,7 +63,7 @@ epclust = function(series,K1,K2,ntasks=1,nb_series_per_chunk=50*K1,min_series_pe open(series) series_con = series } - serialize(series_con, series_file) + serialize(series_con, series_file, ftype, nb_series_per_chunk) close(series_con) }, error=function(e) "series should be a data.frame, a function or a valid connection" @@ -94,7 +94,7 @@ epclust = function(series,K1,K2,ntasks=1,nb_series_per_chunk=50*K1,min_series_pe if (is.null(series)) break coeffs_chunk = curvesToCoeffs(series, wf) - serialize(coeffs_chunk, coefs_file) + serialize(coeffs_chunk, coefs_file, ftype, nb_series_per_chunk) index = index + nb_series_per_chunk nb_curves = nb_curves + nrow(coeffs_chunk) } @@ -116,7 +116,7 @@ epclust = function(series,K1,K2,ntasks=1,nb_series_per_chunk=50*K1,min_series_pe #1000*K1 (or K2) indices (or NOTHING--> series on file) indices = unlist( parallel::parLapply(cl, indices_tasks, function(inds) { clusteringTask(inds, getSeries, getSeries, getCoefs, K1, K2*(WER=="mix"), - nb_series_per_chunk,ncores_clust,to_file=TRUE) + nb_series_per_chunk,ncores_clust,to_file=TRUE, ftype) }) ) parallel::stopCluster(cl) @@ -136,12 +136,12 @@ epclust = function(series,K1,K2,ntasks=1,nb_series_per_chunk=50*K1,min_series_pe if (is.null(series)) break coeffs_chunk = curvesToCoeffs(series, wf) - serialize(coeffs_chunk, coefs_file) + serialize(coeffs_chunk, coefs_file, ftype, nb_series_per_chunk) index = index + nb_series_per_chunk } } # Run step2 on resulting indices or series (from file) clusteringTask(indices, getSeries, getSeriesForSynchrones, getCoefs, K1, K2, - nb_series_per_chunk, ncores_tasks*ncores_clust, to_file=FALSE) + nb_series_per_chunk, ncores_tasks*ncores_clust, to_file=FALSE, ftype) } diff --git a/epclust/R/utils.R b/epclust/R/utils.R index 7083674..40b0a18 100644 --- a/epclust/R/utils.R +++ b/epclust/R/utils.R @@ -10,28 +10,6 @@ toInteger <- function(x, condition) x } -writeCoeffs = function(coeffs) -{ - file = ".coeffs" - #......... - #C function (from data.frame, type of IDs ??! force integers ? [yes]) - #return raw vector - #take raw vector, append it (binary mode) to a file -#TODO: appendCoeffs() en C --> serialize et append to file -} - -readCoeffs = function(indices) -{ - #...... - file = ".coeffs" - #C function (from file name) -} - -getSeries(data, rank=NULL, id=NULL) -{ - #TODO: -} - curvesToCoeffs = function(series, wf) { L = length(series[1,]) @@ -43,3 +21,55 @@ curvesToCoeffs = function(series, wf) rev( sapply( W, function(v) ( sqrt( sum(v^2) ) ) ) ) }) } + +#data: matrix of double or connection +serialize = function(data, file, type, nb_per_chunk) +{ + bin_data = file(file, "ab") + #write data length on first call + nbytes = ifelse(type=="double",8,4) + first_write = FALSE + if (file.info(file)$size == 0) + { + #number of items always on 8 bytes + writeBin(0L, bin_data, size=8) #,endian="little") + first_write = TRUE + } + if (is.matrix(data)) + { + writeBin(t(data), bin_data, size=nbytes) + data_length = ncol(data) + } + else #if (is(data, "connection")) + { + if (first_write) + { + data_line = scan(data, double(), sep=",", nlines=1) + writeBin(data_line, bin_data, size=nbytes) + data_length = length(data_line) + } + repeat + { + data_chunk = scan(data, double(), sep=",", nlines=nb_per_chunk) + if (length(data_chunk)==0) + break + writeBin(data_chunk, bin_data, size=nbytes) + } + } + if (first_write) + { + #ecrire file_size-1 / (nbytes*nbWritten) en 0 dans bin_data ! ignored == file_size + ignored = seek(bin_data, 0) + writeBin(data_length, bin_data, size=8) + } + close(bin_data) +} + +#TODO: read in binary file, always same structure +getDataFromFile(indices, file, type) +{ + bin_data = file(file, "rb") + nbytes = ifelse(type=="double",8,4) + data_length = readBin(bin_data,"double",1,nbytes) #,endian="little") + t(sapply(indices, function(i) readBin(bin_data,"double",n=data_length,size=nbytes))) +} -- 2.44.0