add realtime option, slightly refactor data acquisition
[talweg.git] / pkg / R / computeForecast.R
index bd19b8d..198f6ec 100644 (file)
 #' }
 #' @param memory Data depth (in days) to be used for prediction
 #' @param horizon Number of time steps to predict
-#' @param ... Additional parameters for the forecasting models
+#' @param ncores Number of cores for parallel execution (1 to disable)
+#' @param ... Additional parameters for the forecasting models;
+#'   In particular, realtime=TRUE to use predictions instead of measurements
 #'
 #' @return An object of class Forecast
 #'
 #' @examples
 #' ts_data = system.file("extdata","pm10_mesures_H_loc.csv",package="talweg")
 #' exo_data = system.file("extdata","meteo_extra_noNAs.csv",package="talweg")
-#' data = getData(ts_data, exo_data, input_tz = "Europe/Paris",
-#'   working_tz="Europe/Paris", predict_at=7)
+#' data = getData(ts_data, exo_data, input_tz="GMT", working_tz="GMT", predict_at=7)
 #' pred = computeForecast(data, 2200:2230, "Persistence", "Persistence", 500, 12)
 #' \dontrun{#Sketch for real-time mode:
 #' data = new("Data", ...)
 #' }}
 #' @export
 computeForecast = function(data, indices, forecaster, pjump,
-       memory=Inf, horizon=data$getStdHorizon(), ...)
+       memory=Inf, horizon=data$getStdHorizon(), ncores=3, ...)
 {
        # (basic) Arguments sanity checks
        horizon = as.integer(horizon)[1]
-       if (horizon<=0 || horizon>length(data$getCenteredSerie(2)))
+       if (horizon<=0 || horizon>length(data$getCenteredSerie(1)))
                stop("Horizon too short or too long")
        integer_indices = sapply(indices, function(i) dateIndexToInteger(i,data))
        if (any(integer_indices<=0 | integer_indices>data$getSize()))
@@ -52,43 +53,40 @@ computeForecast = function(data, indices, forecaster, pjump,
                stop("forecaster (name) and pjump (function) should be of class character")
 
        pred = Forecast$new( sapply(indices, function(i) integerIndexToDate(i,data)) )
-       forecaster_class_name = getFromNamespace(paste(forecaster,"Forecaster",sep=""), "talweg")
+       forecaster_class_name = getFromNamespace(
+               paste(forecaster,"Forecaster",sep=""), "talweg")
        forecaster = forecaster_class_name$new( #.pjump =
                getFromNamespace(paste("get",pjump,"JumpPredict",sep=""), "talweg"))
 
-#oo = forecaster$predictSerie(data, integer_indices[1], memory, horizon, ...)
-#browser()
-
-       parll=TRUE #FALSE
-       if (parll)
+       if (ncores > 1 && requireNamespace("parallel",quietly=TRUE))
        {
-               library(parallel)
-               ppp <- parallel::mclapply(seq_along(integer_indices), function(i) {
+               p <- parallel::mclapply(seq_along(integer_indices), function(i) {
                        list(
-                               "forecast" = forecaster$predictSerie(data, integer_indices[i], memory, horizon, ...),
+                               "forecast" = forecaster$predictSerie(
+                                       data, integer_indices[i], memory, horizon, ...),
                                "params"= forecaster$getParameters(),
                                "index" = integer_indices[i] )
-                       }, mc.cores=3)
+                       }, mc.cores=ncores)
        }
        else
        {
-               ppp <- lapply(seq_along(integer_indices), function(i) {
+               p <- lapply(seq_along(integer_indices), function(i) {
                        list(
-                               "forecast" = forecaster$predictSerie(data, integer_indices[i], memory, horizon, ...),
+                               "forecast" = forecaster$predictSerie(
+                                       data, integer_indices[i], memory, horizon, ...),
                                "params"= forecaster$getParameters(),
                                "index" = integer_indices[i] )
                        })
        }
-#browser()
-
-for (i in seq_along(integer_indices))
-{
-       pred$append(
-                       new_serie = ppp[[i]]$forecast,
-                       new_params = ppp[[i]]$params,
-                       new_index_in_data = ppp[[i]]$index
-       )
-}
 
+       # TODO: find a way to fill pred in //...
+       for (i in seq_along(integer_indices))
+       {
+               pred$append(
+                       forecast = p[[i]]$forecast,
+                       params = p[[i]]$params,
+                       index_in_data = p[[i]]$index
+               )
+       }
        pred
 }