# Cluster one full task (nb_curves / ntasks series)
clusteringTask = function(indices,getSeries,getSeriesForSynchrones,synchrones_file,
- getCoefs,K1,K2,nb_series_per_chunk,ncores,to_file)
+ getCoefs,K1,K2,nb_series_per_chunk,ncores,to_file,ftype)
{
cl = parallel::makeCluster(ncores)
repeat
parallel::stopCluster(cl)
if (K2 == 0)
return (indices)
- computeClusters2(indices, K2, getSeries, getSeriesForSynchrones, to_file)
+ computeClusters2(indices, K2, getSeries, getSeriesForSynchrones, to_file,
+ nb_series_per_chunk,ftype)
vector("integer",0)
}
}
# Cluster a chunk of series inside one task (~max nb_series_per_chunk)
-computeClusters2 = function(indices, K2, getSeries, getSeriesForSynchrones, to_file)
+computeClusters2 = function(indices, K2, getSeries, getSeriesForSynchrones, to_file,
+ nb_series_per_chunk, ftype)
{
- curves = computeSynchrones(indices, getSeries, getSeriesForSynchrones)
+ curves = computeSynchrones(indices, getSeries, getSeriesForSynchrones, nb_series_per_chunk)
dists = computeWerDists(curves)
medoids = cluster::pam(dists, K2, diss=TRUE)$medoids
if (to_file)
{
- serialize(medoids, synchrones_file)
+ serialize(medoids, synchrones_file, ftype, nb_series_per_chunk)
return (NULL)
}
medoids
}
# Compute the synchrones curves (sum of clusters elements) from a clustering result
-computeSynchrones = function(indices, getSeries, getSeriesForSynchrones)
+computeSynchrones = function(indices, getSeries, getSeriesForSynchrones, nb_series_per_chunk)
{
#les getSeries(indices) sont les medoides --> init vect nul pour chacun, puis incr avec les
#courbes (getSeriesForSynchrones) les plus proches... --> au sens de la norme L2 ?
- series = getSeries(indices)
- #...........
- #sapply(seq_along(inds), colMeans(getSeries(inds[[i]]$indices,inds[[i]]$ids)))
+ medoids = getSeries(indices)
+ K = nrow(medoids)
+ synchrones = matrix(0, nrow=K, ncol=ncol(medoids))
+ counts = rep(0,K)
+ index = 1
+ repeat
+ {
+ series = getSeriesForSynchrones((index-1)+seq_len(nb_series_per_chunk))
+ if (is.null(series))
+ break
+ #get medoids indices for this chunk of series
+ index = which.min( rowSums( sweep(medoids, 2, series[i,], '-')^2 ) )
+ synchrones[index,] = synchrones[index,] + series[i,]
+ counts[index] = counts[index] + 1
+ }
+ #NOTE: odds for some clusters to be empty? (when series already come from stage 2)
+ synchrones = sweep(synchrones, 1, counts, '/')
}
# Compute the WER distance between the synchrones curves (in rows)
#' cl = epclust(getData, K1=200, K2=15, ntasks=1000, nb_series_per_chunk=5000, WER="mix")
#' @export
epclust = function(series,K1,K2,ntasks=1,nb_series_per_chunk=50*K1,min_series_per_chunk=5*K1,
- wf="haar",WER="end",ncores_tasks=1,ncores_clust=4,random=TRUE,...)
+ wf="haar",WER="end",ncores_tasks=1,ncores_clust=4,random=TRUE,ftype="float",...)
{
# Check/transform arguments
bin_dir = "epclust.bin/"
unlink(series_file)
}
if (is.matrix(series))
- serialize(series, series_file)
+ serialize(series, series_file, ftype, nb_series_per_chunk)
else if (!is.function(series))
{
tryCatch(
open(series)
series_con = series
}
- serialize(series_con, series_file)
+ serialize(series_con, series_file, ftype, nb_series_per_chunk)
close(series_con)
},
error=function(e) "series should be a data.frame, a function or a valid connection"
if (is.null(series))
break
coeffs_chunk = curvesToCoeffs(series, wf)
- serialize(coeffs_chunk, coefs_file)
+ serialize(coeffs_chunk, coefs_file, ftype, nb_series_per_chunk)
index = index + nb_series_per_chunk
nb_curves = nb_curves + nrow(coeffs_chunk)
}
#1000*K1 (or K2) indices (or NOTHING--> series on file)
indices = unlist( parallel::parLapply(cl, indices_tasks, function(inds) {
clusteringTask(inds, getSeries, getSeries, getCoefs, K1, K2*(WER=="mix"),
- nb_series_per_chunk,ncores_clust,to_file=TRUE)
+ nb_series_per_chunk,ncores_clust,to_file=TRUE, ftype)
}) )
parallel::stopCluster(cl)
if (is.null(series))
break
coeffs_chunk = curvesToCoeffs(series, wf)
- serialize(coeffs_chunk, coefs_file)
+ serialize(coeffs_chunk, coefs_file, ftype, nb_series_per_chunk)
index = index + nb_series_per_chunk
}
}
# Run step2 on resulting indices or series (from file)
clusteringTask(indices, getSeries, getSeriesForSynchrones, getCoefs, K1, K2,
- nb_series_per_chunk, ncores_tasks*ncores_clust, to_file=FALSE)
+ nb_series_per_chunk, ncores_tasks*ncores_clust, to_file=FALSE, ftype)
}
x
}
-writeCoeffs = function(coeffs)
-{
- file = ".coeffs"
- #.........
- #C function (from data.frame, type of IDs ??! force integers ? [yes])
- #return raw vector
- #take raw vector, append it (binary mode) to a file
-#TODO: appendCoeffs() en C --> serialize et append to file
-}
-
-readCoeffs = function(indices)
-{
- #......
- file = ".coeffs"
- #C function (from file name)
-}
-
-getSeries(data, rank=NULL, id=NULL)
-{
- #TODO:
-}
-
curvesToCoeffs = function(series, wf)
{
L = length(series[1,])
rev( sapply( W, function(v) ( sqrt( sum(v^2) ) ) ) )
})
}
+
+#data: matrix of double or connection
+serialize = function(data, file, type, nb_per_chunk)
+{
+ bin_data = file(file, "ab")
+ #write data length on first call
+ nbytes = ifelse(type=="double",8,4)
+ first_write = FALSE
+ if (file.info(file)$size == 0)
+ {
+ #number of items always on 8 bytes
+ writeBin(0L, bin_data, size=8) #,endian="little")
+ first_write = TRUE
+ }
+ if (is.matrix(data))
+ {
+ writeBin(t(data), bin_data, size=nbytes)
+ data_length = ncol(data)
+ }
+ else #if (is(data, "connection"))
+ {
+ if (first_write)
+ {
+ data_line = scan(data, double(), sep=",", nlines=1)
+ writeBin(data_line, bin_data, size=nbytes)
+ data_length = length(data_line)
+ }
+ repeat
+ {
+ data_chunk = scan(data, double(), sep=",", nlines=nb_per_chunk)
+ if (length(data_chunk)==0)
+ break
+ writeBin(data_chunk, bin_data, size=nbytes)
+ }
+ }
+ if (first_write)
+ {
+ #ecrire file_size-1 / (nbytes*nbWritten) en 0 dans bin_data ! ignored == file_size
+ ignored = seek(bin_data, 0)
+ writeBin(data_length, bin_data, size=8)
+ }
+ close(bin_data)
+}
+
+#TODO: read in binary file, always same structure
+getDataFromFile(indices, file, type)
+{
+ bin_data = file(file, "rb")
+ nbytes = ifelse(type=="double",8,4)
+ data_length = readBin(bin_data,"double",1,nbytes) #,endian="little")
+ t(sapply(indices, function(i) readBin(bin_data,"double",n=data_length,size=nbytes)))
+}