#' cl = epclust(getData, K1=200, K2=15, ntasks=1000, nb_series_per_chunk=5000, WER="mix")
#' @export
epclust = function(series,K1,K2,ntasks=1,nb_series_per_chunk=50*K1,min_series_per_chunk=5*K1,
- wf="haar",WER="end",ncores_tasks=1,ncores_clust=4,random=TRUE,...)
+ wf="haar",WER="end",ncores_tasks=1,ncores_clust=4,random=TRUE,ftype="float",...)
{
# Check/transform arguments
bin_dir = "epclust.bin/"
unlink(series_file)
}
if (is.matrix(series))
- serialize(series, series_file)
+ serialize(series, series_file, ftype, nb_series_per_chunk)
else if (!is.function(series))
{
tryCatch(
open(series)
series_con = series
}
- serialize(series_con, series_file)
+ serialize(series_con, series_file, ftype, nb_series_per_chunk)
close(series_con)
},
error=function(e) "series should be a data.frame, a function or a valid connection"
if (is.null(series))
break
coeffs_chunk = curvesToCoeffs(series, wf)
- serialize(coeffs_chunk, coefs_file)
+ serialize(coeffs_chunk, coefs_file, ftype, nb_series_per_chunk)
index = index + nb_series_per_chunk
nb_curves = nb_curves + nrow(coeffs_chunk)
}
#1000*K1 (or K2) indices (or NOTHING--> series on file)
indices = unlist( parallel::parLapply(cl, indices_tasks, function(inds) {
clusteringTask(inds, getSeries, getSeries, getCoefs, K1, K2*(WER=="mix"),
- nb_series_per_chunk,ncores_clust,to_file=TRUE)
+ nb_series_per_chunk,ncores_clust,to_file=TRUE, ftype)
}) )
parallel::stopCluster(cl)
if (is.null(series))
break
coeffs_chunk = curvesToCoeffs(series, wf)
- serialize(coeffs_chunk, coefs_file)
+ serialize(coeffs_chunk, coefs_file, ftype, nb_series_per_chunk)
index = index + nb_series_per_chunk
}
}
# Run step2 on resulting indices or series (from file)
clusteringTask(indices, getSeries, getSeriesForSynchrones, getCoefs, K1, K2,
- nb_series_per_chunk, ncores_tasks*ncores_clust, to_file=FALSE)
+ nb_series_per_chunk, ncores_tasks*ncores_clust, to_file=FALSE, ftype)
}