'update'
[epclust.git] / epclust / R / main.R
CommitLineData
8702eb86 1#' @include de_serialize.R
56857861
BA
2#' @include clustering.R
3NULL
4
8702eb86 5#' CLAWS: CLustering with wAvelets and Wer distanceS
7f0781b7 6#'
56857861 7#' Groups electricity power curves (or any series of similar nature) by applying PAM
8702eb86
BA
8#' algorithm in parallel to chunks of size \code{nb_series_per_chunk}. Input series
9#' must be sampled on the same time grid, no missing values.
7f0781b7 10#'
8702eb86
BA
11#' @param getSeries Access to the (time-)series, which can be of one of the three
12#' following types:
13#' \itemize{
14#' \item matrix: each line contains all the values for one time-serie, ordered by time
15#' \item connection: any R connection object (e.g. a file) providing lines as described above
16#' \item function: a custom way to retrieve the curves; it has only one argument:
17#' the indices of the series to be retrieved. See examples
18#' }
1c6f223e
BA
19#' @param K1 Number of super-consumers to be found after stage 1 (K1 << N)
20#' @param K2 Number of clusters to be found after stage 2 (K2 << K1)
8702eb86
BA
21#' @param random TRUE (default) for random chunks repartition
22#' @param wf Wavelet transform filter; see ?wavelets::wt.filter. Default: haar
23#' @param WER "end" to apply stage 2 after stage 1 has fully iterated, or "mix" to apply stage 2
24#' at the end of each task
1c6f223e
BA
25#' @param ntasks Number of tasks (parallel iterations to obtain K1 medoids); default: 1.
26#' Note: ntasks << N, so that N is "roughly divisible" by N (number of series)
5c652979
BA
27#' @param ncores_tasks "MPI" number of parallel tasks (1 to disable: sequential tasks)
28#' @param ncores_clust "OpenMP" number of parallel clusterings in one task
8702eb86
BA
29#' @param nb_series_per_chunk (~Maximum) number of series in each group, inside a task
30#' @param min_series_per_chunk Minimum number of series in each group
31#' @param sep Separator in CSV input file (relevant only if getSeries is a file name)
32#' @param nbytes Number of bytes to serialize a floating-point number; 4 or 8
33#' @param endian Endianness to use for (de)serialization. Use "little" or "big" for portability
7f0781b7 34#'
4efef8cc 35#' @return A matrix of the final medoids curves (K2) in rows
1c6f223e
BA
36#'
37#' @examples
4efef8cc
BA
38#' \dontrun{
39#' # WER distances computations are a bit too long for CRAN (for now)
40#'
41#' # Random series around cos(x,2x,3x)/sin(x,2x,3x)
42#' x = seq(0,500,0.05)
43#' L = length(x) #10001
44#' ref_series = matrix( c(cos(x), cos(2*x), cos(3*x), sin(x), sin(2*x), sin(3*x)),
45#' byrows=TRUE, ncol=L )
46#' library(wmtsa)
47#' series = do.call( rbind, lapply( 1:6, function(i)
48#' do.call(rbind, wmtsa::wavBootstrap(ref_series[i,], n.realization=400)) ) )
49#' #dim(series) #c(2400,10001)
50#' medoids_ascii = claws(series_RData, K1=60, K2=6, wf="d8", nb_series_per_chunk=500)
51#'
52#' # Same example, from CSV file
53#' csv_file = "/tmp/epclust_series.csv"
54#' write.table(series, csv_file, sep=",", row.names=FALSE, col.names=FALSE)
55#' medoids_csv = claws(csv_file, K1=60, K2=6, wf="d8", nb_series_per_chunk=500)
56#'
57#' # Same example, from binary file
58#' bin_file = "/tmp/epclust_series.bin"
59#' nbytes = 8
60#' endian = "little"
61#' epclust::serialize(csv_file, bin_file, 500, nbytes, endian)
62#' getSeries = function(indices) getDataInFile(indices, bin_file, nbytes, endian)
63#' medoids_bin = claws(getSeries, K1=60, K2=6, wf="d8", nb_series_per_chunk=500)
64#' unlink(csv_file)
65#' unlink(bin_file)
66#'
67#' # Same example, from SQLite database
68#' library(DBI)
69#' series_db <- dbConnect(RSQLite::SQLite(), "file::memory:")
70#' # Prepare data.frame in DB-format
71#' n = nrow(series)
72#' formatted_series = data.frame(
73#' ID = rep(1:n,each=L),
74#' time = as.POSIXct(1800*(0:n),"GMT",origin="2001-01-01"),
75#' value
76
77
78
79
80#' TODO
81
82
83#' times_values = as.data.frame(series)
84#' dbWriteTable(series_db, "times_values", times_values)
85#' # NOTE: assume that DB internal data is not reorganized when computing coefficients
86#' indexToID_inDB <<- list()
87#' getSeries = function(indices) {
88#' con = dbConnect(drv = RSQLite::SQLite(), dbname = db_file)
89#' if (indices %in% indexToID_inDB)
90#' {
91#' df = dbGetQuery(con, paste(
92#' "SELECT value FROM times_values GROUP BY id OFFSET ",start,
1c6f223e 93#' "LIMIT ", n, " ORDER BY date", sep=""))
4efef8cc
BA
94#' return (df)
95#' }
96#' else
97#' {
98#' ...
99#' }
100#' }
101#' dbDisconnect(mydb)
1c6f223e 102#' }
1c6f223e 103#' @export
56857861
BA
104claws = function(getSeries, K1, K2,
105 random=TRUE, #randomize series order?
106 wf="haar", #stage 1
107 WER="end", #stage 2
108 ntasks=1, ncores_tasks=1, ncores_clust=4, #control parallelism
109 nb_series_per_chunk=50*K1, min_series_per_chunk=5*K1, #chunk size
110 sep=",", #ASCII input separator
111 nbytes=4, endian=.Platform$endian) #serialization (write,read)
ac1d4231 112{
0e2dce80 113 # Check/transform arguments
56857861
BA
114 if (!is.matrix(getSeries) && !is.function(getSeries) &&
115 !is(getSeries, "connection" && !is.character(getSeries)))
0e2dce80 116 {
56857861 117 stop("'getSeries': matrix, function, file or valid connection (no NA)")
5c652979 118 }
56857861
BA
119 K1 = .toInteger(K1, function(x) x>=2)
120 K2 = .toInteger(K2, function(x) x>=2)
121 if (!is.logical(random))
122 stop("'random': logical")
123 tryCatch(
124 {ignored <- wt.filter(wf)},
125 error = function(e) stop("Invalid wavelet filter; see ?wavelets::wt.filter"))
7f0781b7
BA
126 if (WER!="end" && WER!="mix")
127 stop("WER takes values in {'end','mix'}")
56857861
BA
128 ntasks = .toInteger(ntasks, function(x) x>=1)
129 ncores_tasks = .toInteger(ncores_tasks, function(x) x>=1)
130 ncores_clust = .toInteger(ncores_clust, function(x) x>=1)
131 nb_series_per_chunk = .toInteger(nb_series_per_chunk, function(x) x>=K1)
132 min_series_per_chunk = .toInteger(K1, function(x) x>=K1 && x<=nb_series_per_chunk)
133 if (!is.character(sep))
134 stop("'sep': character")
135 nbytes = .toInteger(nbytes, function(x) x==4 || x==8)
136
137 # Serialize series if required, to always use a function
138 bin_dir = "epclust.bin/"
139 dir.create(bin_dir, showWarnings=FALSE, mode="0755")
140 if (!is.function(getSeries))
141 {
142 series_file = paste(bin_dir,"data",sep="") ; unlink(series_file)
143 serialize(getSeries, series_file, nb_series_per_chunk, sep, nbytes, endian)
144 getSeries = function(indices) getDataInFile(indices, series_file, nbytes, endian)
145 }
ac1d4231 146
7b13d0c2 147 # Serialize all wavelets coefficients (+ IDs) onto a file
56857861 148 coefs_file = paste(bin_dir,"coefs",sep="") ; unlink(coefs_file)
7f0781b7 149 index = 1
cea14f3a 150 nb_curves = 0
6ecf5c2d 151 repeat
ac1d4231 152 {
0e2dce80
BA
153 series = getSeries((index-1)+seq_len(nb_series_per_chunk))
154 if (is.null(series))
cea14f3a 155 break
8702eb86
BA
156 coefs_chunk = curvesToCoefs(series, wf)
157 serialize(coefs_chunk, coefs_file, nb_series_per_chunk, sep, nbytes, endian)
cea14f3a 158 index = index + nb_series_per_chunk
8702eb86 159 nb_curves = nb_curves + nrow(coefs_chunk)
8e6accca 160 }
56857861 161 getCoefs = function(indices) getDataInFile(indices, coefs_file, nbytes, endian)
8e6accca 162
5c652979
BA
163 if (nb_curves < min_series_per_chunk)
164 stop("Not enough data: less rows than min_series_per_chunk!")
165 nb_series_per_task = round(nb_curves / ntasks)
166 if (nb_series_per_task < min_series_per_chunk)
167 stop("Too many tasks: less series in one task than min_series_per_chunk!")
ac1d4231 168
7b13d0c2 169 # Cluster coefficients in parallel (by nb_series_per_chunk)
56857861 170 indices_all = if (random) sample(nb_curves) else seq_len(nb_curves)
48108c39 171 indices_tasks = lapply(seq_len(ntasks), function(i) {
5c652979 172 upper_bound = ifelse( i<ntasks, min(nb_series_per_task*i,nb_curves), nb_curves )
56857861 173 indices_all[((i-1)*nb_series_per_task+1):upper_bound]
48108c39 174 })
e205f218 175 cl = parallel::makeCluster(ncores_tasks)
56857861 176 # 1000*K1 indices [if WER=="end"], or empty vector [if WER=="mix"] --> series on file
e205f218 177 indices = unlist( parallel::parLapply(cl, indices_tasks, function(inds) {
4efef8cc 178 require("epclust", quietly=TRUE)
56857861
BA
179 indices_medoids = clusteringTask(inds,getCoefs,K1,nb_series_per_chunk,ncores_clust)
180 if (WER=="mix")
181 {
182 medoids2 = computeClusters2(
183 getSeries(indices_medoids), K2, getSeries, nb_series_per_chunk)
184 serialize(medoids2, synchrones_file, nb_series_per_chunk, sep, nbytes, endian)
185 return (vector("integer",0))
186 }
187 indices_medoids
e205f218
BA
188 }) )
189 parallel::stopCluster(cl)
3465b246 190
8702eb86 191 getRefSeries = getSeries
56857861 192 synchrones_file = paste(bin_dir,"synchrones",sep="") ; unlink(synchrones_file)
e205f218
BA
193 if (WER=="mix")
194 {
195 indices = seq_len(ntasks*K2)
196 #Now series must be retrieved from synchrones_file
56857861 197 getSeries = function(inds) getDataInFile(inds, synchrones_file, nbytes, endian)
e205f218
BA
198 #Coefs must be re-computed
199 unlink(coefs_file)
200 index = 1
201 repeat
202 {
203 series = getSeries((index-1)+seq_len(nb_series_per_chunk))
204 if (is.null(series))
205 break
8702eb86
BA
206 coefs_chunk = curvesToCoefs(series, wf)
207 serialize(coefs_chunk, coefs_file, nb_series_per_chunk, sep, nbytes, endian)
e205f218
BA
208 index = index + nb_series_per_chunk
209 }
210 }
0e2dce80
BA
211
212 # Run step2 on resulting indices or series (from file)
56857861
BA
213 indices_medoids = clusteringTask(
214 indices, getCoefs, K1, nb_series_per_chunk, ncores_tasks*ncores_clust)
8702eb86 215 computeClusters2(getSeries(indices_medoids),K2,getRefSeries,nb_series_per_chunk)
56857861
BA
216}
217
218# helper
8702eb86 219curvesToCoefs = function(series, wf)
56857861
BA
220{
221 L = length(series[1,])
222 D = ceiling( log2(L) )
223 nb_sample_points = 2^D
8702eb86 224 t( apply(series, 1, function(x) {
56857861
BA
225 interpolated_curve = spline(1:L, x, n=nb_sample_points)$y
226 W = wavelets::dwt(interpolated_curve, filter=wf, D)@W
227 rev( sapply( W, function(v) ( sqrt( sum(v^2) ) ) ) )
8702eb86 228 }) )
56857861
BA
229}
230
231# helper
232.toInteger <- function(x, condition)
233{
234 if (!is.integer(x))
235 tryCatch(
236 {x = as.integer(x)[1]},
237 error = function(e) paste("Cannot convert argument",substitute(x),"to integer")
238 )
239 if (!condition(x))
240 stop(paste("Argument",substitute(x),"does not verify condition",body(condition)))
241 x
cea14f3a 242}