add realtime option, slightly refactor data acquisition
authorBenjamin Auder <benjamin.auder@somewhere>
Wed, 5 Apr 2017 00:03:14 +0000 (02:03 +0200)
committerBenjamin Auder <benjamin.auder@somewhere>
Wed, 5 Apr 2017 00:03:14 +0000 (02:03 +0200)
TODO
pkg/R/Data.R
pkg/R/F_Persistence.R
pkg/R/Forecast.R
pkg/R/J_Neighbors.R
pkg/R/J_Persistence.R
pkg/R/computeError.R
pkg/R/computeForecast.R
pkg/R/getData.R
pkg/R/plot.R
pkg/R/utils.R

diff --git a/TODO b/TODO
index 7de936b..a0abec2 100644 (file)
--- a/TODO
+++ b/TODO
@@ -1,11 +1,4 @@
 s'inspirer de vim-rmarkdown pour syntaxe .gj ?
 
-script bash qui passe variables "heure début" et "horizon" à report.gj > stocke fichier _H_h,
-puis run Jupyter ...
-
 version this (talweg) as a whole; private!
 subtree: talweg-R (shared with world, R package)
-
-==> les poids ne sont pas normalisés en sortie de predictSHape de Neighbors(2)
-[ce serait mieux, ça éviterait la normalisation systématique dans J_Neighbors, et corrigerait les
-histogrammes de poids................]
index 697da05..551cfaf 100644 (file)
 #'   Return number of series in dataset.}
 #' \item{\code{getStdHorizon()}}{
 #'   Return number of time steps from serie[1] until midnight}
-#' \item{\code{append(new_time, new_centered_serie, new_level, new_exo, new_exo_hat)}}{
-#'   Acquire a new vector of lists (time, centered_serie, level, exo, exo_hat).}
+#' \item{\code{appendHat(time, hat_serie, hat_exo)}}{
+#'   New estimated data + time.}
+#' \item{\code{append(serie, exo)}}{
+#'   New measured data; call *after* \code{appendHat()}}
 #' \item{\code{getTime(index)}}{
 #'   Get times at specified index.}
-#' \item{\code{getCenteredSerie(index)}}{
-#'   Get centered serie at specified index.}
-#' \item{\code{getCenteredSeries(indices)}}{
+#' \item{\code{getCenteredSerie(index, hat=FALSE)}}{
+#'   Get (measured or predicted) centered serie at specified index.}
+#' \item{\code{getCenteredSeries(indices, hat=FALSE)}}{
 #'   Get centered series at specified indices (in columns).}
-#' \item{\code{getLevel(index)}}{
+#' \item{\code{getLevel(index, hat=FALSE)}}{
 #'   Get level at specified index.}
-#' \item{\code{getSerie(index)}}{
+#' \item{\code{getSerie(index, hat=FALSE)}}{
 #'   Get serie (centered+level) at specified index.}
-#' \item{\code{getSeries(indices)}}{
+#' \item{\code{getSeries(indices, hat=FALSE)}}{
 #'   Get series at specified indices (in columns).}
-#' \item{\code{getExo(index)}}{
+#' \item{\code{getExo(index, hat=FALSE)}}{
 #'   Get exogenous variables at specified index.}
-#' \item{\code{getExoHat(index)}}{
-#'   Get estimated exogenous variables at specified index.}
 #' }
 #'
 Data = R6::R6Class("Data",
@@ -49,47 +49,64 @@ Data = R6::R6Class("Data",
                getStdHorizon = function()
                        24 - as.POSIXlt( private$.data[[1]]$time[1] )$hour + 1
                ,
-               append = function(new_time, new_centered_serie, new_level, new_exo, new_exo_hat)
+               appendHat = function(time, hat_serie, hat_exo)
                {
+                       hat_level = mean(hat_serie, na.rm=TRUE)
+                       hat_centered_serie = hat_serie - hat_level
                        private$.data[[length(private$.data)+1]] <- list(
-                               "time"=new_time, "centered_serie"=new_centered_serie, "level"=new_level,
-                               "exo"=new_exo, "exo_hat"=new_exo_hat)
+                               "time"=time, "hat_centered_serie"=hat_centered_serie,
+                               "hat_level"=hat_level, "hat_exo"=hat_exo )
+               },
+               append = function(serie, exo)
+               {
+                       level = mean(serie, na.rm=TRUE)
+                       centered_serie = serie - level
+                       private$.data[[length(private$.data)]]$centered_serie <- centered_serie,
+                       private$.data[[length(private$.data)]]$level <- level,
+                       private$.data[[length(private$.data)]]$exo <- exo,
                },
                getTime = function(index)
                {
                        index = dateIndexToInteger(index, self)
                        private$.data[[index]]$time
                },
-               getCenteredSerie = function(index)
+               getCenteredSerie = function(index, hat=FALSE)
                {
                        index = dateIndexToInteger(index, self)
-                       private$.data[[index]]$centered_serie
+                       if (hat)
+                               private$.data[[index]]$hat_centered_serie
+                       else
+                               private$.data[[index]]$centered_serie
                },
-               getCenteredSeries = function(indices)
-                       sapply(indices, function(i) self$getCenteredSerie(i))
+               getCenteredSeries = function(indices, hat=FALSE)
+                       sapply(indices, function(i) self$getCenteredSerie(i, hat))
                ,
-               getLevel = function(index)
+               getLevel = function(index, hat=FALSE)
                {
                        index = dateIndexToInteger(index, self)
-                       private$.data[[index]]$level
+                       if (hat)
+                               private$.data[[index]]$hat_level
+                       else
+                               private$.data[[index]]$level
                },
-               getSerie = function(index)
+               getSerie = function(index, hat=FALSE)
                {
                        index = dateIndexToInteger(index, self)
-                       private$.data[[index]]$centered_serie + private$.data[[index]]$level
+                       if (hat)
+                               private$.data[[index]]$hat_centered_serie + private$.data[[index]]$hat_level
+                       else
+                               private$.data[[index]]$centered_serie + private$.data[[index]]$level
                },
-               getSeries = function(indices)
-                       sapply(indices, function(i) self$getSerie(i))
+               getSeries = function(indices, hat=FALSE)
+                               sapply(indices, function(i) self$getSerie(i, hat))
                ,
-               getExo = function(index)
-               {
-                       index = dateIndexToInteger(index, self)
-                       private$.data[[index]]$exo
-               },
-               getExoHat = function(index)
+               getExo = function(index, hat=FALSE)
                {
                        index = dateIndexToInteger(index, self)
-                       private$.data[[index]]$exo_hat
+                       if (hat)
+                               private$.data[[index]]$hat_exo
+                       else
+                               private$.data[[index]]$exo
                },
                removeFirst = function()
                        private$.data <- private$.data[2:length(private$.data)]
index 1d9fd19..79cfe3c 100644 (file)
@@ -14,12 +14,14 @@ PersistenceForecaster = R6::R6Class("PersistenceForecaster",
                        # Return centered last (similar) day curve, avoiding NAs until memory is run
                        first_day = max(1, today-memory)
                        same_day = ifelse(hasArg("same_day"), list(...)$same_day, TRUE)
+                       realtime = ifelse(hasArg("realtime"), list(...)$realtime, FALSE)
                        # If 'same_day', get the last known future of similar day: -7 + 1 == -6
                        index = today - ifelse(same_day,6,0)
                        repeat
                        {
                                {
-                                       last_serie = data$getCenteredSerie(index)[1:horizon]
+                                       last_serie =
+                                               data$getCenteredSerie(index,hat=(index==today && realtime))[1:horizon]
                                        index = index - ifelse(same_day,7,1)
                                };
                                if (!any(is.na(last_serie)))
index c64d24f..a0ddcda 100644 (file)
 #'   Initialize a Forecast object with a series of date indices.}
 #' \item{\code{getSize()}}{
 #'   Return number of individual forecasts.}
-#' \item{\code{append(new_serie, new_params, new_index_in_data)}}{
-#'   Acquire a new individual forecast, with its (optimized) parameters and the corresponding
-#'   index in the dataset.}
+#' \item{\code{append(forecast, params, index_in_data)}}{
+#'   Acquire an individual forecast, with its (optimized) parameters and the
+#'   corresponding index in the dataset.}
 #' \item{\code{getDates()}}{
 #'   Get dates where forecast occurs.}
-#' \item{\code{getSerie(index)}}{
+#' \item{\code{getForecast(index)}}{
 #'   Get forecasted serie at specified index.}
 #' \item{\code{getParams(index)}}{
 #'   Get parameters at specified index (for 'Neighbors' method).}
@@ -44,19 +44,19 @@ Forecast = R6::R6Class("Forecast",
                getSize = function()
                        length(private$.pred)
                ,
-               append = function(new_serie, new_params, new_index_in_data)
+               append = function(forecast, params, index_in_data)
                {
                        private$.pred[[length(private$.pred)+1]] <-
-                               list("serie"=new_serie, "params"=new_params, "index_in_data"=new_index_in_data)
+                               list("forecast"=forecast, "params"=params, "index_in_data"=index_in_data)
                },
                getDates = function()
                        as.Date( private$.dates, origin="1970-01-01" )
                ,
-               getSerie = function(index)
+               getForecast = function(index)
                {
                        if (is(index,"Date"))
                                index = match(index, private$.dates)
-                       private$.pred[[index]]$serie
+                       private$.pred[[index]]$forecast
                },
                getParams = function(index)
                {
index 7ca0003..351fbf9 100644 (file)
@@ -9,12 +9,13 @@ getNeighborsJumpPredict = function(data, today, memory, horizon, params, ...)
        filter = (params$indices >= first_day)
        indices = params$indices[filter]
        weights = params$weights[filter]
+       realtime = ifelse(hasArg("realtime"), list(...)$realtime, FALSE)
 
        if (any(is.na(weights) | is.na(indices)))
                return (NA)
 
        gaps = sapply(indices, function(i) {
-               data$getSerie(i+1)[1] - tail(data$getSerie(i), 1)
+               data$getSerie(i+1,hat=(realtime && i+1==today))[1] - tail(data$getSerie(i), 1)
        })
        scal_product = weights * gaps
        norm_fact = sum( weights[!is.na(scal_product)] )
index 5d156cc..4d3abd2 100644 (file)
@@ -8,12 +8,13 @@ getPersistenceJumpPredict = function(data, today, memory, horizon, params, ...)
        #return gap between end of similar day curve and first day of tomorrow (in the past)
        first_day = max(1, today-memory)
        same_day = ifelse(hasArg("same_day"), list(...)$same_day, TRUE)
+       realtime = ifelse(hasArg("realtime"), list(...)$realtime, FALSE)
        index = today - ifelse(same_day,7,1)
        repeat
        {
                {
                        last_serie_end = tail( data$getSerie(index), 1)
-                       last_tomorrow_begin = data$getSerie(index+1)[1]
+                       last_tomorrow_begin = data$getSerie(index+1,hat=(realtime && index+1==today))[1]
                        index = index - ifelse(same_day,7,1)
                };
                if (!is.na(last_serie_end) && !is.na(last_tomorrow_begin))
index b57e607..ce05fdb 100644 (file)
@@ -3,13 +3,13 @@
 #' Obtain the errors between forecast and data
 #'
 #' @param data Dataset, object of class \code{Data} output of \code{getData}
-#' @param forecast Forecast object, class \code{Forecast} output of \code{computeForecast}
+#' @param pred Forecast object, class \code{Forecast} output of \code{computeForecast}
 #' @param horizon Horizon where to compute the error (<= horizon used in \code{computeForecast})
 #'
 #' @return A list (abs,MAPE) of lists (day,indices)
 #'
 #' @export
-computeError = function(data, forecast, horizon=data$getStdHorizon())
+computeError = function(data, pred, horizon=data$getStdHorizon())
 {
        L = forecast$getSize()
        mape_day = rep(0, horizon)
@@ -22,15 +22,15 @@ computeError = function(data, forecast, horizon=data$getStdHorizon())
        {
                index = forecast$getIndexInData(i)
                serie = data$getSerie(index+1)[1:horizon]
-               pred = forecast$getSerie(i)[1:horizon]
-               if (!any(is.na(serie)) && !any(is.na(pred)))
+               forecast = pred$getForecast(i)[1:horizon]
+               if (!any(is.na(serie)) && !any(is.na(forecast)))
                {
                        nb_no_NA_data = nb_no_NA_data + 1
-                       mape_increment = abs(serie - pred) / serie
+                       mape_increment = abs(serie - forecast) / serie
                        mape_increment[is.nan(mape_increment)] = 0. # 0 / 0
                        mape_increment[!is.finite(mape_increment)] = 1. # >0 / 0
                        mape_day = mape_day + mape_increment
-                       abs_increment = abs(serie - pred)
+                       abs_increment = abs(serie - forecast)
                        abs_day = abs_day + abs_increment
                        mape_indices[i] = mean(mape_increment)
                        abs_indices[i] = mean(abs_increment)
index 24f114e..198f6ec 100644 (file)
@@ -20,7 +20,8 @@
 #' @param memory Data depth (in days) to be used for prediction
 #' @param horizon Number of time steps to predict
 #' @param ncores Number of cores for parallel execution (1 to disable)
-#' @param ... Additional parameters for the forecasting models
+#' @param ... Additional parameters for the forecasting models;
+#'   In particular, realtime=TRUE to use predictions instead of measurements
 #'
 #' @return An object of class Forecast
 #'
@@ -43,7 +44,7 @@ computeForecast = function(data, indices, forecaster, pjump,
 {
        # (basic) Arguments sanity checks
        horizon = as.integer(horizon)[1]
-       if (horizon<=0 || horizon>length(data$getCenteredSerie(2)))
+       if (horizon<=0 || horizon>length(data$getCenteredSerie(1)))
                stop("Horizon too short or too long")
        integer_indices = sapply(indices, function(i) dateIndexToInteger(i,data))
        if (any(integer_indices<=0 | integer_indices>data$getSize()))
@@ -52,7 +53,8 @@ computeForecast = function(data, indices, forecaster, pjump,
                stop("forecaster (name) and pjump (function) should be of class character")
 
        pred = Forecast$new( sapply(indices, function(i) integerIndexToDate(i,data)) )
-       forecaster_class_name = getFromNamespace(paste(forecaster,"Forecaster",sep=""), "talweg")
+       forecaster_class_name = getFromNamespace(
+               paste(forecaster,"Forecaster",sep=""), "talweg")
        forecaster = forecaster_class_name$new( #.pjump =
                getFromNamespace(paste("get",pjump,"JumpPredict",sep=""), "talweg"))
 
@@ -60,7 +62,8 @@ computeForecast = function(data, indices, forecaster, pjump,
        {
                p <- parallel::mclapply(seq_along(integer_indices), function(i) {
                        list(
-                               "forecast" = forecaster$predictSerie(data, integer_indices[i], memory, horizon, ...),
+                               "forecast" = forecaster$predictSerie(
+                                       data, integer_indices[i], memory, horizon, ...),
                                "params"= forecaster$getParameters(),
                                "index" = integer_indices[i] )
                        }, mc.cores=ncores)
@@ -69,7 +72,8 @@ computeForecast = function(data, indices, forecaster, pjump,
        {
                p <- lapply(seq_along(integer_indices), function(i) {
                        list(
-                               "forecast" = forecaster$predictSerie(data, integer_indices[i], memory, horizon, ...),
+                               "forecast" = forecaster$predictSerie(
+                                       data, integer_indices[i], memory, horizon, ...),
                                "params"= forecaster$getParameters(),
                                "index" = integer_indices[i] )
                        })
@@ -79,9 +83,9 @@ computeForecast = function(data, indices, forecaster, pjump,
        for (i in seq_along(integer_indices))
        {
                pred$append(
-                       new_serie = p[[i]]$forecast,
-                       new_params = p[[i]]$params,
-                       new_index_in_data = p[[i]]$index
+                       forecast = p[[i]]$forecast,
+                       params = p[[i]]$params,
+                       index_in_data = p[[i]]$index
                )
        }
        pred
index e13cf86..b944dfb 100644 (file)
@@ -1,13 +1,14 @@
 #' @title Acquire data in a clean format
 #'
-#' @description Take in input data frames and/or files containing raw data, and timezones, and
-#'   output a Data object, roughly corresponding to a list where each cell contains all value
-#'   for one day (see \code{?Data}).
+#' @description Take in input data frames and/or files containing raw data, and
+#'   timezones, and output a Data object, roughly corresponding to a list where each cell
+#'   contains all value for one day (see \code{?Data}).
 #'
 #' @param ts_data Time-series, as a data frame (DB style: 2 columns, first is date/time,
 #'   second is value) or a CSV file
-#' @param exo_data Exogenous variables, as a data frame or a CSV file; first comlumn is dates,
-#'   next block are measurements for the day, and final block are exogenous forecasts
+#' @param exo_data Exogenous variables, as a data frame or a CSV file; first comlumn is
+#'   dates, next block are measurements for the day, and final block are exogenous
+#'   forecasts
 #' @param input_tz Timezone in the input files ("GMT" or e.g. "Europe/Paris")
 #' @param date_format How date/time are stored (e.g. year/month/day hour:minutes;
 #'   see \code{strptime})
@@ -51,7 +52,8 @@ getData = function(ts_data, exo_data, input_tz="GMT", date_format="%d/%m/%Y %H:%
                        ts_data
        # Convert to the desired timezone (usually "GMT" or "Europe/Paris")
        formatted_dates_POSIXlt = strptime(as.character(ts_df[,1]), date_format, tz=input_tz)
-       ts_df[,1] = format(as.POSIXct(formatted_dates_POSIXlt, tz=input_tz), tz=working_tz, usetz=TRUE)
+       ts_df[,1] = format(
+               as.POSIXct(formatted_dates_POSIXlt, tz=input_tz), tz=working_tz, usetz=TRUE)
 
        exo_df =
                if (is.character(exo_data))
@@ -69,22 +71,26 @@ getData = function(ts_data, exo_data, input_tz="GMT", date_format="%d/%m/%Y %H:%
        {
                time = c()
                serie = c()
+               hat_serie = c()
                repeat
                {
                        {
                                time = c(time, ts_df[line,1])
+                               hat_serie = c(serie, ts_df[line,3])
                                serie = c(serie, ts_df[line,2])
                                line = line + 1
                        };
-                       if (line >= nb_lines + 1 || as.POSIXlt(ts_df[line-1,1],tz=working_tz)$hour == predict_at)
+                       if (line >= nb_lines + 1
+                               || as.POSIXlt(ts_df[line-1,1],tz=working_tz)$hour == predict_at)
+                       {
                                break
+                       }
                }
 
+               hat_exo = as.data.frame( exo_df[i,(1+nb_exos+1):(1+2*nb_exos)] )
                exo = as.data.frame( exo_df[i,2:(1+nb_exos)] )
-               exo_hat = as.data.frame( exo_df[i,(1+nb_exos+1):(1+2*nb_exos)] )
-               level = mean(serie, na.rm=TRUE)
-               centered_serie = serie - level
-               data$append(time, centered_serie, level, exo, exo_hat)
+               data$appendHat(time, hat_serie, hat_exo)
+               data$append(serie, exo) #in realtime, this call comes hours later
                if (i >= limit)
                        break
                i = i + 1
index 52b077b..5ea4843 100644 (file)
@@ -28,8 +28,8 @@ plotCurves <- function(data, indices=seq_len(data$getSize()))
 #' @param cols Colors for each error (default: 1,2,3,...)
 #'
 #' @seealso \code{\link{plotCurves}}, \code{\link{plotPredReal}},
-#'   \code{\link{plotSimils}}, \code{\link{plotFbox}},
-#'   \code{\link{computeFilaments}, }\code{\link{plotFilamentsBox}}, \code{\link{plotRelVar}}
+#'   \code{\link{plotSimils}}, \code{\link{plotFbox}}, \code{\link{computeFilaments}},
+#'   \code{\link{plotFilamentsBox}}, \code{\link{plotRelVar}}
 #'
 #' @export
 plotError <- function(err, cols=seq_along(err))
@@ -83,9 +83,9 @@ plotError <- function(err, cols=seq_along(err))
 #' @export
 plotPredReal <- function(data, pred, index)
 {
-       horizon = length(pred$getSerie(1))
+       horizon = length(pred$getForecast(1))
        measure = data$getSerie( pred$getIndexInData(index)+1 )[1:horizon]
-       prediction = pred$getSerie(index)
+       prediction = pred$getForecast(index)
        yrange = range(measure, prediction)
        par(mar=c(4.7,5,1,1), cex.axis=1.5, cex.lab=1.5, lwd=3)
        plot(measure, type="l", ylim=yrange, xlab="Time (hours)", ylab="PM10")
@@ -175,7 +175,8 @@ computeFilaments <- function(data, pred, index, limit=60, plot=TRUE)
                centered_series = rbind(
                        data$getCenteredSeries( pred$getParams(index)$indices ),
                        data$getCenteredSeries( pred$getParams(index)$indices+1 ) )
-               yrange = range( ref_serie, quantile(centered_series, probs=c(0.025,0.975), na.rm=TRUE) )
+               yrange = range( ref_serie,
+                       quantile(centered_series, probs=c(0.025,0.975), na.rm=TRUE) )
                par(mar=c(4.7,5,1,1), cex.axis=1.5, cex.lab=1.5, lwd=2)
                for (i in nn:1)
                {
@@ -214,7 +215,7 @@ plotFilamentsBox = function(data, fil)
        rainbow::fboxplot(series_fds, "functional", "hdr", xlab="Time (hours)", ylab="PM10",
                plotlegend=FALSE, lwd=2)
 
-       # "Magic" found at http://stackoverflow.com/questions/13842560/get-xlim-from-a-plot-in-r
+       # "Magic": http://stackoverflow.com/questions/13842560/get-xlim-from-a-plot-in-r
        usr <- par("usr")
        yr <- (usr[4] - usr[3]) / 27
        par(new=TRUE)
@@ -237,7 +238,9 @@ plotRelVar = function(data, fil)
        ref_var = c( apply(data$getSeries(fil$neighb_indices),1,sd),
                apply(data$getSeries(fil$neighb_indices+1),1,sd) )
        fdays = getNoNA2(data, 1, fil$index-1)
-       global_var = c( apply(data$getSeries(fdays),1,sd), apply(data$getSeries(fdays+1),1,sd) )
+       global_var = c(
+               apply(data$getSeries(fdays),1,sd),
+               apply(data$getSeries(fdays+1),1,sd) )
 
        yrange = range(ref_var, global_var)
        par(mar=c(4.7,5,1,1), cex.axis=1.5, cex.lab=1.5)
index 3649f58..bbf1387 100644 (file)
@@ -16,7 +16,8 @@ dateIndexToInteger = function(index, data)
 
        if (inherits(index, "Date") || is.character(index))
        {
-               tryCatch(indexAsDate <- as.Date(index), error=function(e) stop("Unrecognized index format"))
+               tryCatch(indexAsDate <- as.Date(index),
+                       error=function(e) stop("Unrecognized index format"))
                #TODO: tz arg to difftime ?
                integerIndex <- round( as.numeric(
                        difftime(indexAsDate, as.Date(data$getTime(1)[1])) ) ) + 1