-# Cluster one full task (nb_curves / ntasks series); only step 1
-clusteringTask = function(indices, getCoefs, K1, nb_series_per_chunk, ncores)
+#' @name clustering
+#' @rdname clustering
+#' @aliases clusteringTask computeClusters1 computeClusters2
+#'
+#' @title Two-stage clustering, within one task (see \code{claws()})
+#'
+#' @description \code{clusteringTask()} runs one full task, which consists of iterated
+#' stage-1 clustering on nb_curves / ntasks energy contributions, computed from the
+#' discrete wavelet coefficients. \code{computeClusters1()} and \code{computeClusters2()}
+#' are the atomic clustering procedures for stage 1 and stage 2 respectively.
+#' The former applies the clustering algorithm (PAM) to a contributions matrix, while
+#' the latter re-clusters the stage-1 medoids using WER distances computed on their
+#' synchrones (series being processed by chunks of ~nb_series_per_chunk)
+#'
+#' @param indices Range of series indices to cluster in parallel (initial data)
+#' @param getContribs Function to retrieve contributions from initial series indices:
+#' \code{getContribs(indices)} outputs a contributions matrix
+#' @param contribs matrix of contributions (e.g. output of \code{curvesToContribs()})
+#' @inheritParams computeSynchrones
+#' @inheritParams claws
+#'
+#' @return For \code{clusteringTask()} and \code{computeClusters1()}, the indices of the
+#' computed (K1) medoids. Indices are irrelevant for stage 2 clustering, thus
+#' \code{computeClusters2()} outputs a matrix of medoids
+#' (of size limited by nb_series_per_chunk)
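+#'
+#' @examples
+#' \dontrun{
+#' # Illustrative sketch only (toy data, hypothetical sizes), showing the stage-1
+#' # atomic step: computeClusters1() returns the indices of the K1 PAM medoid rows
+#' # of a contributions matrix.
+#' contribs_toy = matrix(runif(60*5), nrow=60) # 60 series, 5 wavelet scales
+#' med_indices = computeClusters1(contribs_toy, K1=3)
+#' contribs_toy[med_indices,] # contributions of the 3 medoid series
+#' }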
+NULL
+
+#' @rdname clustering
+#' @export
+clusteringTask = function(indices, getContribs, K1, nb_series_per_chunk, ncores_clust)
{
- cl = parallel::makeCluster(ncores)
- parallel::clusterExport(cl, varlist=c("getCoefs","K1"), envir=environment())
+
+#NOTE: parallel sections commented out for debugging
+#TODO: propagate the verbose argument?
+
+# cl = parallel::makeCluster(ncores_clust)
+# parallel::clusterExport(cl, varlist=c("getContribs","K1"), envir=environment())
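+	# Each iteration clusters chunks of ~nb_series_per_chunk series and keeps the K1
+	# medoid indices of each chunk; the loop stops when only K1 indices remain overall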
repeat
{
+
+print(length(indices))
+
nb_workers = max( 1, floor( length(indices) / nb_series_per_chunk ) )
indices_workers = lapply( seq_len(nb_workers), function(i)
indices[(nb_series_per_chunk*(i-1)+1):(nb_series_per_chunk*i)] )
indices_workers[[index]] = c(indices_workers[[index]], tail(indices,rem))
rem = rem - 1
}
- indices = unlist( parallel::parLapply( cl, indices_workers, function(inds) {
- require("epclust", quietly=TRUE)
- inds[ computeClusters1(getCoefs(inds), K1) ]
+# indices = unlist( parallel::parLapply( cl, indices_workers, function(inds) {
+ indices = unlist( lapply( indices_workers, function(inds) {
+# require("epclust", quietly=TRUE)
+
+print(paste(" ",length(inds))) ## PROBLEM HERE: 21104 ??!
+
+ inds[ computeClusters1(getContribs(inds), K1) ]
} ) )
if (length(indices) == K1)
break
}
- parallel::stopCluster(cl)
+# parallel::stopCluster(cl)
indices #medoids
}
-# Apply the clustering algorithm (PAM) on a coeffs or distances matrix
-computeClusters1 = function(coefs, K1)
- cluster::pam(coefs, K1, diss=FALSE)$id.med
+#' @rdname clustering
+#' @export
+computeClusters1 = function(contribs, K1)
+ cluster::pam(contribs, K1, diss=FALSE)$id.med
-# Cluster a chunk of series inside one task (~max nb_series_per_chunk)
+#' @rdname clustering
+#' @export
computeClusters2 = function(medoids, K2, getRefSeries, nb_series_per_chunk)
{
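+	# Build the synchrones (sums of series around each medoid), then run PAM (K2 groups)
+	# on their WER distance matrix and return the corresponding medoid curves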
synchrones = computeSynchrones(medoids, getRefSeries, nb_series_per_chunk)
medoids[ cluster::pam(computeWerDists(synchrones), K2, diss=TRUE)$medoids , ]
}
-# Compute the synchrones curves (sum of clusters elements) from a clustering result
+#' computeSynchrones
+#'
+#' Compute the synchrone curves (sums of cluster elements) from a matrix of medoids,
+#' using L2 distances.
+#'
+#' @param medoids Matrix of medoids (curves of same length as initial series)
+#' @param getRefSeries Function to retrieve initial series (e.g. in stage 2 after series
+#' have been replaced by stage-1 medoids)
+#' @inheritParams claws
+#'
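+#' @examples
+#' \dontrun{
+#' # Conceptual sketch only (toy data; not the package internals): a synchrone is the
+#' # sum of all series assigned to a medoid by L2 distance. With 2 toy medoids:
+#' series = matrix(rnorm(10*50), nrow=10)
+#' medoids = series[c(2,7),]
+#' assignment = apply(series, 1, function(s) which.min(colSums((t(medoids) - s)^2)))
+#' synchrones = t(sapply(1:2, function(k) colSums(series[assignment==k,,drop=FALSE])))
+#' }
+#'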
+#' @export
computeSynchrones = function(medoids, getRefSeries, nb_series_per_chunk)
{
K = nrow(medoids)
synchrones[ sapply(seq_len(K), function(i) all(!is.nan(synchrones[i,]))) , ]
}
-# Compute the WER distance between the synchrones curves (in rows)
-computeWerDists = function(curves)
+#' computeWerDists
+#'
+#' Compute the WER distances between the synchrone curves (in rows), as returned
+#' (for example) by \code{computeSynchrones()}
+#'
+#' @param synchrones A matrix of synchrones, in rows. The series have the same length
+#' as the series in the initial dataset
+#'
+#' @export
+computeWerDists = function(synchrones)
{
- if (!require("Rwave", quietly=TRUE))
- stop("Unable to load Rwave library")
- n <- nrow(curves)
- delta <- ncol(curves)
+ n <- nrow(synchrones)
+ delta <- ncol(synchrones)
#TODO: automatic tune of all these parameters ? (for other users)
nvoice <- 4
- # noctave = 2^13 = 8192 half hours ~ 180 days ; ~log2(ncol(curves))
+ # noctave = 2^13 = 8192 half hours ~ 180 days ; ~log2(ncol(synchrones))
noctave = 13
# 4 here represent 2^5 = 32 half-hours ~ 1 day
#NOTE: default scalevector == 2^(0:(noctave * nvoice) / nvoice) * s0 (?)
# (normalized) observations node with CWT
Xcwt4 <- lapply(seq_len(n), function(i) {
- ts <- scale(ts(curves[i,]), center=TRUE, scale=scaled)
+ ts <- scale(ts(synchrones[i,]), center=TRUE, scale=scaled)
totts.cwt = Rwave::cwt(ts,totnoct,nvoice,w0,plot=0)
ts.cwt = totts.cwt[,s0log:(s0log+noctave*nvoice)]
#Normalization
-#data: matrix of double or connection
-serialize = function(data_ascii, data_bin_file, nb_per_chunk,
+#' @name de_serialize
+#' @rdname de_serialize
+#' @aliases binarize getDataInFile
+#'
+#' @title (De)Serialization of a matrix
+#'
+#' @description \code{binarize()} serializes a matrix or CSV file into a binary file,
+#' with minimal overhead. \code{getDataInFile()} performs the inverse task: it retrieves
+#' (ASCII) data rows from their indices in the binary file
+#'
+#' @param data_ascii Either a matrix or CSV file, with items in rows
+#' @param indices Indices of the lines to retrieve
+#' @param data_bin_file Name of binary file on output (\code{binarize})
+#' or input (\code{getDataInFile})
+#' @param nb_per_chunk Number of lines to process in one batch
+#' @inheritParams claws
+#'
+#' @return For \code{getDataInFile()}, the matrix with rows corresponding to the
+#' requested indices
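+#'
+#' @examples
+#' \dontrun{
+#' # Round-trip sketch (hypothetical temporary file): serialize a small matrix, then
+#' # read a few rows back. The binary file holds an 8-byte header (row length) followed
+#' # by the values, each one written on 'nbytes' bytes.
+#' mat = matrix(runif(5*10), nrow=5)
+#' bin_file = "/tmp/epclust_sketch.bin" ; unlink(bin_file)
+#' binarize(mat, bin_file, nb_per_chunk=10, nbytes=8, endian="little")
+#' getDataInFile(c(1,3,5), bin_file, nbytes=8, endian="little")
+#' unlink(bin_file)
+#' }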
+NULL
+
+#' @rdname de_serialize
+#' @export
+binarize = function(data_ascii, data_bin_file, nb_per_chunk,
sep=",", nbytes=4, endian=.Platform$endian)
{
if (is.character(data_ascii))
data_ascii = file(data_ascii, open="r")
- else if (is(data_ascii,"connection") && !isOpen(data_ascii))
+ else if (methods::is(data_ascii,"connection") && !isOpen(data_ascii))
open(data_ascii)
first_write = (!file.exists(data_bin_file) || file.info(data_bin_file)$size == 0)
if (first_write)
{
#number of items always on 8 bytes
- writeBin(0L, data_bin, size=8) #,endian="little")
+ writeBin(0L, data_bin, size=8, endian=endian)
if (is.matrix(data_ascii))
data_length = ncol(data_ascii)
else #if (is(data, "connection"))
{
data_line = scan(data_ascii, double(), sep=sep, nlines=1, quiet=TRUE)
- writeBin(data_line, data_bin, size=nbytes)
+ writeBin(data_line, data_bin, size=nbytes, endian=endian)
data_length = length(data_line)
}
}
data_chunk = scan(data_ascii, double(), sep=sep, nlines=nb_per_chunk, quiet=TRUE)
if (length(data_chunk)==0)
break
- writeBin(data_chunk, data_bin, size=nbytes)
+ writeBin(data_chunk, data_bin, size=nbytes, endian=endian)
}
if (first_write)
{
		#write file_size-1 / (nbytes*nbWritten) at position 0 in bin_data ! ignored == file_size
ignored = seek(data_bin, 0)
- writeBin(data_length, data_bin, size=8)
+ writeBin(data_length, data_bin, size=8, endian=endian)
}
close(data_bin)
- if (is(data_ascii,"connection"))
+ if (methods::is(data_ascii,"connection"))
close(data_ascii)
}
-#read in binary file, always same structure
+#' @rdname de_serialize
+#' @export
getDataInFile = function(indices, data_bin_file, nbytes=4, endian=.Platform$endian)
{
data_bin = file(data_bin_file, "rb")
- data_size = file.info(data_bin)$size
- data_length = readBin(data_bin, "integer", 1, 8, endian)
+ data_size = file.info(data_bin_file)$size
+ data_length = readBin(data_bin, "integer", n=1, size=8, endian=endian)
	#Or t(sapply(...)) (faster?)
data_ascii = do.call( rbind, lapply( indices, function(i) {
offset = 8+(i-1)*data_length*nbytes
if (offset > data_size)
return (vector("double",0))
ignored = seek(data_bin, offset)
- readBin(data_bin, "double", n=data_length, size=nbytes)
+ readBin(data_bin, "double", n=data_length, size=nbytes, endian=endian)
} ) )
close(data_bin)
if (ncol(data_ascii)>0) data_ascii else NULL
-#' @include de_serialize.R
-#' @include clustering.R
-NULL
-
#' CLAWS: CLustering with wAvelets and Wer distanceS
#'
#' Groups electricity power curves (or any series of similar nature) by applying PAM
#' \item function: a custom way to retrieve the curves; it has only one argument:
#' the indices of the series to be retrieved. See examples
#' }
+#' @inheritParams clustering
#' @param K1 Number of super-consumers to be found after stage 1 (K1 << N)
#' @param K2 Number of clusters to be found after stage 2 (K2 << K1)
-#' @param random TRUE (default) for random chunks repartition
-#' @param wf Wavelet transform filter; see ?wavelets::wt.filter. Default: haar
+#' @param wf Wavelet transform filter; see ?wavelets::wt.filter
+#' @param ctype Type of contribution: "relative" or "absolute" (or any prefix)
#' @param WER "end" to apply stage 2 after stage 1 has fully iterated, or "mix" to apply stage 2
#' at the end of each task
+#' @param random TRUE (default) for random chunks repartition
#' @param ntasks Number of tasks (parallel iterations to obtain K1 medoids); default: 1.
#'   Note: ntasks << N (number of series), so that N is "roughly divisible" by ntasks
#' @param ncores_tasks "MPI" number of parallel tasks (1 to disable: sequential tasks)
#' @param ncores_clust "OpenMP" number of parallel clusterings in one task
#' @param nb_series_per_chunk (~Maximum) number of series in each group, inside a task
#' @param min_series_per_chunk Minimum number of series in each group
-#' @param sep Separator in CSV input file (relevant only if getSeries is a file name)
+#' @param sep Separator in CSV input file (if any provided)
#' @param nbytes Number of bytes to serialize a floating-point number; 4 or 8
#' @param endian Endianness to use for (de)serialization. Use "little" or "big" for portability
+#' @param verbose Level of verbosity (0/FALSE for nothing or 1/TRUE for all; devel stage)
#'
#' @return A matrix of the final medoids curves (K2) in rows
#'
#' x = seq(0,500,0.05)
#' L = length(x) #10001
#' ref_series = matrix( c(cos(x), cos(2*x), cos(3*x), sin(x), sin(2*x), sin(3*x)),
-#' byrows=TRUE, ncol=L )
+#' byrow=TRUE, ncol=L )
#' library(wmtsa)
#' series = do.call( rbind, lapply( 1:6, function(i)
#' do.call(rbind, wmtsa::wavBootstrap(ref_series[i,], n.realization=400)) ) )
#' #dim(series) #c(2400,10001)
-#' medoids_ascii = claws(series_RData, K1=60, K2=6, wf="d8", nb_series_per_chunk=500)
+#' medoids_ascii = claws(series, K1=60, K2=6, "d8", "rel", nb_series_per_chunk=500)
#'
#' # Same example, from CSV file
#' csv_file = "/tmp/epclust_series.csv"
#' write.table(series, csv_file, sep=",", row.names=FALSE, col.names=FALSE)
-#' medoids_csv = claws(csv_file, K1=60, K2=6, wf="d8", nb_series_per_chunk=500)
+#' medoids_csv = claws(csv_file, K1=60, K2=6, "d8", "rel", nb_series_per_chunk=500)
#'
#' # Same example, from binary file
#' bin_file = "/tmp/epclust_series.bin"
#' nbytes = 8
#' endian = "little"
-#' epclust::serialize(csv_file, bin_file, 500, nbytes, endian)
+#' epclust::binarize(csv_file, bin_file, 500, nbytes, endian)
#' getSeries = function(indices) getDataInFile(indices, bin_file, nbytes, endian)
-#' medoids_bin = claws(getSeries, K1=60, K2=6, wf="d8", nb_series_per_chunk=500)
+#' medoids_bin = claws(getSeries, K1=60, K2=6, "d8", "rel", nb_series_per_chunk=500)
#' unlink(csv_file)
#' unlink(bin_file)
#'
#' series_db <- dbConnect(RSQLite::SQLite(), "file::memory:")
#' # Prepare data.frame in DB-format
#' n = nrow(series)
-#' formatted_series = data.frame(
-#' ID = rep(1:n,each=L),
-#' time = as.POSIXct(1800*(0:n),"GMT",origin="2001-01-01"),
-#' value
-
-
-
-
-#' TODO
-
-
-#' times_values = as.data.frame(series)
+#' times_values = data.frame(
+#'   id = rep(1:n,each=L),
+#'   time = rep( as.POSIXct(1800*(0:(L-1)),"GMT",origin="2001-01-01"), n ),
+#'   value = as.double(t(series)) )
#' dbWriteTable(series_db, "times_values", times_values)
-#' # NOTE: assume that DB internal data is not reorganized when computing coefficients
-#' indexToID_inDB <<- list()
+#' # Fill associative array, map index to identifier
+#' indexToID_inDB <- as.character(
+#'   dbGetQuery(series_db, 'SELECT DISTINCT id FROM times_values')[,"id"] )
#' getSeries = function(indices) {
-#' con = dbConnect(drv = RSQLite::SQLite(), dbname = db_file)
-#' if (indices %in% indexToID_inDB)
-#' {
-#' df = dbGetQuery(con, paste(
-#' "SELECT value FROM times_values GROUP BY id OFFSET ",start,
-#' "LIMIT ", n, " ORDER BY date", sep=""))
-#' return (df)
-#' }
-#' else
-#' {
-#' ...
-#' }
+#'   request = "SELECT id,value FROM times_values WHERE id in ("
+#'   for (i in indices)
+#'     request = paste(request, indexToID_inDB[i], ",", sep="")
+#'   request = paste(substr(request, 1, nchar(request)-1), ")", sep="")
+#' df_series = dbGetQuery(series_db, request)
+#' # Assume that all series share same length at this stage
+#' ts_length = sum(df_series[,"id"] == df_series[1,"id"])
+#'   t( matrix(df_series[,"value"], nrow=ts_length) )
#' }
-#' dbDisconnect(mydb)
+#' medoids_db = claws(getSeries, K1=60, K2=6, "d8", "rel", nb_series_per_chunk=500)
+#' dbDisconnect(series_db)
+#'
+#' # All computed medoids should be the same:
+#' digest::sha1(medoids_ascii)
+#' digest::sha1(medoids_csv)
+#' digest::sha1(medoids_bin)
+#' digest::sha1(medoids_db)
#' }
#' @export
claws = function(getSeries, K1, K2,
- random=TRUE, #randomize series order?
- wf="haar", #stage 1
+ wf,ctype, #stage 1
WER="end", #stage 2
+ random=TRUE, #randomize series order?
ntasks=1, ncores_tasks=1, ncores_clust=4, #control parallelism
nb_series_per_chunk=50*K1, min_series_per_chunk=5*K1, #chunk size
sep=",", #ASCII input separator
- nbytes=4, endian=.Platform$endian) #serialization (write,read)
+ nbytes=4, endian=.Platform$endian, #serialization (write,read)
+ verbose=FALSE)
{
# Check/transform arguments
if (!is.matrix(getSeries) && !is.function(getSeries) &&
- !is(getSeries, "connection" && !is.character(getSeries)))
+		!methods::is(getSeries, "connection") && !is.character(getSeries))
{
stop("'getSeries': matrix, function, file or valid connection (no NA)")
}
if (!is.logical(random))
stop("'random': logical")
tryCatch(
- {ignored <- wt.filter(wf)},
+ {ignored <- wavelets::wt.filter(wf)},
error = function(e) stop("Invalid wavelet filter; see ?wavelets::wt.filter"))
if (WER!="end" && WER!="mix")
stop("WER takes values in {'end','mix'}")
nbytes = .toInteger(nbytes, function(x) x==4 || x==8)
# Serialize series if required, to always use a function
- bin_dir = "epclust.bin/"
+ bin_dir = ".epclust.bin/"
dir.create(bin_dir, showWarnings=FALSE, mode="0755")
if (!is.function(getSeries))
{
+ if (verbose)
+ cat("...Serialize time-series\n")
series_file = paste(bin_dir,"data",sep="") ; unlink(series_file)
- serialize(getSeries, series_file, nb_series_per_chunk, sep, nbytes, endian)
- getSeries = function(indices) getDataInFile(indices, series_file, nbytes, endian)
+ binarize(getSeries, series_file, nb_series_per_chunk, sep, nbytes, endian)
+ getSeries = function(inds) getDataInFile(inds, series_file, nbytes, endian)
}
- # Serialize all wavelets coefficients (+ IDs) onto a file
- coefs_file = paste(bin_dir,"coefs",sep="") ; unlink(coefs_file)
+ # Serialize all computed wavelets contributions onto a file
+ contribs_file = paste(bin_dir,"contribs",sep="") ; unlink(contribs_file)
index = 1
nb_curves = 0
+ if (verbose)
+ cat("...Compute contributions and serialize them\n")
repeat
{
series = getSeries((index-1)+seq_len(nb_series_per_chunk))
if (is.null(series))
break
- coefs_chunk = curvesToCoefs(series, wf)
- serialize(coefs_chunk, coefs_file, nb_series_per_chunk, sep, nbytes, endian)
+ contribs_chunk = curvesToContribs(series, wf, ctype)
+ binarize(contribs_chunk, contribs_file, nb_series_per_chunk, sep, nbytes, endian)
index = index + nb_series_per_chunk
- nb_curves = nb_curves + nrow(coefs_chunk)
+ nb_curves = nb_curves + nrow(contribs_chunk)
}
- getCoefs = function(indices) getDataInFile(indices, coefs_file, nbytes, endian)
+ getContribs = function(indices) getDataInFile(indices, contribs_file, nbytes, endian)
if (nb_curves < min_series_per_chunk)
stop("Not enough data: less rows than min_series_per_chunk!")
if (nb_series_per_task < min_series_per_chunk)
stop("Too many tasks: less series in one task than min_series_per_chunk!")
- # Cluster coefficients in parallel (by nb_series_per_chunk)
+ # Cluster contributions in parallel (by nb_series_per_chunk)
indices_all = if (random) sample(nb_curves) else seq_len(nb_curves)
indices_tasks = lapply(seq_len(ntasks), function(i) {
upper_bound = ifelse( i<ntasks, min(nb_series_per_task*i,nb_curves), nb_curves )
indices_all[((i-1)*nb_series_per_task+1):upper_bound]
})
- cl = parallel::makeCluster(ncores_tasks)
+ if (verbose)
+ cat(paste("...Run ",ntasks," x stage 1 in parallel\n",sep=""))
+# cl = parallel::makeCluster(ncores_tasks)
+# parallel::clusterExport(cl, varlist=c("getSeries","getContribs","K1","K2",
+# "nb_series_per_chunk","ncores_clust","synchrones_file","sep","nbytes","endian"),
+# envir = environment())
	# ntasks*K1 indices [if WER=="end"], or empty vector [if WER=="mix"] --> series on file
- indices = unlist( parallel::parLapply(cl, indices_tasks, function(inds) {
- require("epclust", quietly=TRUE)
- indices_medoids = clusteringTask(inds,getCoefs,K1,nb_series_per_chunk,ncores_clust)
+# indices = unlist( parallel::parLapply(cl, indices_tasks, function(inds) {
+ indices = unlist( lapply(indices_tasks, function(inds) {
+# require("epclust", quietly=TRUE)
+
+ browser() #TODO: CONTINUE DEBUG HERE
+
+ indices_medoids = clusteringTask(inds,getContribs,K1,nb_series_per_chunk,ncores_clust)
if (WER=="mix")
{
medoids2 = computeClusters2(
getSeries(indices_medoids), K2, getSeries, nb_series_per_chunk)
- serialize(medoids2, synchrones_file, nb_series_per_chunk, sep, nbytes, endian)
+ binarize(medoids2, synchrones_file, nb_series_per_chunk, sep, nbytes, endian)
return (vector("integer",0))
}
indices_medoids
}) )
- parallel::stopCluster(cl)
+# parallel::stopCluster(cl)
getRefSeries = getSeries
synchrones_file = paste(bin_dir,"synchrones",sep="") ; unlink(synchrones_file)
indices = seq_len(ntasks*K2)
#Now series must be retrieved from synchrones_file
getSeries = function(inds) getDataInFile(inds, synchrones_file, nbytes, endian)
- #Coefs must be re-computed
- unlink(coefs_file)
+ #Contributions must be re-computed
+ unlink(contribs_file)
index = 1
+ if (verbose)
+ cat("...Serialize contributions computed on synchrones\n")
repeat
{
series = getSeries((index-1)+seq_len(nb_series_per_chunk))
if (is.null(series))
break
- coefs_chunk = curvesToCoefs(series, wf)
- serialize(coefs_chunk, coefs_file, nb_series_per_chunk, sep, nbytes, endian)
+ contribs_chunk = curvesToContribs(series, wf, ctype)
+ binarize(contribs_chunk, contribs_file, nb_series_per_chunk, sep, nbytes, endian)
index = index + nb_series_per_chunk
}
}
# Run step2 on resulting indices or series (from file)
+ if (verbose)
+ cat("...Run final // stage 1 + stage 2\n")
indices_medoids = clusteringTask(
- indices, getCoefs, K1, nb_series_per_chunk, ncores_tasks*ncores_clust)
- computeClusters2(getSeries(indices_medoids),K2,getRefSeries,nb_series_per_chunk)
+ indices, getContribs, K1, nb_series_per_chunk, ncores_tasks*ncores_clust)
+ medoids = computeClusters2(getSeries(indices_medoids),K2,getRefSeries,nb_series_per_chunk)
+
+ # Cleanup
+ unlink(bin_dir, recursive=TRUE)
+
+ medoids
}
-# helper
-curvesToCoefs = function(series, wf)
+#' curvesToContribs
+#'
+#' Compute the discrete wavelet coefficients for each series, and aggregate them in
+#' energy contribution across scales as described in https://arxiv.org/abs/1101.4744v2
+#'
+#' @param series Matrix of series (in rows), of size n x L
+#' @inheritParams claws
+#'
+#' @return A matrix of size n x log(L) containing contributions in rows
+#'
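+#' @examples
+#' \dontrun{
+#' # Toy sketch (assumed "haar" filter; any filter known to wavelets::wt.filter works):
+#' # contributions of two sinusoids sampled on 256 points.
+#' x = seq(0, 50, length.out=256)
+#' series = rbind(cos(x), cos(8*x))
+#' curvesToContribs(series, wf="haar", ctype="absolute")
+#' }
+#'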
+#' @export
+curvesToContribs = function(series, wf, ctype)
{
L = length(series[1,])
D = ceiling( log2(L) )
nb_sample_points = 2^D
+ cont_types = c("relative","absolute")
+ ctype = cont_types[ pmatch(ctype,cont_types) ]
t( apply(series, 1, function(x) {
interpolated_curve = spline(1:L, x, n=nb_sample_points)$y
W = wavelets::dwt(interpolated_curve, filter=wf, D)@W
- rev( sapply( W, function(v) ( sqrt( sum(v^2) ) ) ) )
+ nrj = rev( sapply( W, function(v) ( sqrt( sum(v^2) ) ) ) )
+ if (ctype=="relative") nrj / sum(nrj) else nrj
}) )
}
-# helper
+# Helper for main function: check integer arguments with functional conditions
.toInteger <- function(x, condition)
{
if (!is.integer(x))
context("de_serialize")
-data_bin_file <<- "/tmp/epclust_test.bin"
-unlink(data_bin_file)
-
test_that("serialization + getDataInFile retrieve original data / from matrix",
{
+ data_bin_file = "/tmp/epclust_test_m.bin"
+ unlink(data_bin_file)
+
	#dataset: 200 lines / 30 columns
data_ascii = matrix(runif(200*30,-10,10),ncol=30)
	nbytes = 4 #leads to a precision of 1e-7 / 1e-8
endian = "little"
#Simulate serialization in one single call
- serialize(data_ascii, data_bin_file, 500, ",", nbytes, endian)
+ binarize(data_ascii, data_bin_file, 500, ",", nbytes, endian)
expect_equal(file.info(data_bin_file)$size, length(data_ascii)*nbytes+8)
for (indices in list(c(1,3,5), 3:13, c(5,20,50), c(75,130:135), 196:200))
{
#...in several calls (last call complete, next call NULL)
for (i in 1:20)
- serialize(data_ascii[((i-1)*10+1):(i*10),], data_bin_file, 20, ",", nbytes, endian)
+ binarize(data_ascii[((i-1)*10+1):(i*10),], data_bin_file, 20, ",", nbytes, endian)
expect_equal(file.info(data_bin_file)$size, length(data_ascii)*nbytes+8)
for (indices in list(c(1,3,5), 3:13, c(5,20,50), c(75,130:135), 196:200))
{
test_that("serialization + getDataInFile retrieve original data / from connection",
{
+ data_bin_file = "/tmp/epclust_test_c.bin"
+ unlink(data_bin_file)
+
	#dataset: 300 lines / 50 columns
data_csv = system.file("testdata","de_serialize.csv",package="epclust")
nbytes = 8
endian = "big"
- data_ascii = as.matrix(read.csv(test_series, sep=";", header=FALSE)) #for ref
+ data_ascii = as.matrix(read.csv(data_csv, sep=";", header=FALSE)) #for ref
#Simulate serialization in one single call
- serialize(data_csv, data_bin_file, 350, ";", nbytes, endian)
+ binarize(data_csv, data_bin_file, 350, ";", nbytes, endian)
expect_equal(file.info(data_bin_file)$size, 300*50*8+8)
for (indices in list(c(1,3,5), 3:13, c(5,20,50), c(75,130:135), 196:200))
{
#...in several calls / chunks of 29 --> 29*10 + 10, incomplete last
data_con = file(data_csv, "r")
- serialize(data_con, data_bin_file, 29, ";", nbytes, endian)
+ binarize(data_con, data_bin_file, 29, ";", nbytes, endian)
expect_equal(file.info(data_bin_file)$size, 300*50*8+8)
for (indices in list(c(1,3,5), 3:13, c(5,20,50), c(75,130:135), 196:200))
{
expect_equal(data_lines, data_ascii[indices,])
}
unlink(data_bin_file)
- #close(data_con) --> done in serialize()
+ #close(data_con) --> done in binarize()
})