From: Benjamin Auder Date: Wed, 5 Apr 2017 00:03:14 +0000 (+0200) Subject: add realtime option, slightly refactor data acquisition X-Git-Url: https://git.auder.net/variants/Dynamo/current/doc/html/up.jpg?a=commitdiff_plain;h=72b9c50162bcdcf6c99fbb8b2ec6ea9ba98379cb;p=talweg.git add realtime option, slightly refactor data acquisition --- diff --git a/TODO b/TODO index 7de936b..a0abec2 100644 --- a/TODO +++ b/TODO @@ -1,11 +1,4 @@ s'inspirer de vim-rmarkdown pour syntaxe .gj ? -script bash qui passe variables "heure début" et "horizon" à report.gj > stocke fichier _H_h, -puis run Jupyter ... - version this (talweg) as a whole; private! subtree: talweg-R (shared with world, R package) - -==> les poids ne sont pas normalisés en sortie de predictSHape de Neighbors(2) -[ce serait mieux, ça éviterait la normalisation systématique dans J_Neighbors, et corrigerait les -histogrammes de poids................] diff --git a/pkg/R/Data.R b/pkg/R/Data.R index 697da05..551cfaf 100644 --- a/pkg/R/Data.R +++ b/pkg/R/Data.R @@ -18,24 +18,24 @@ #' Return number of series in dataset.} #' \item{\code{getStdHorizon()}}{ #' Return number of time steps from serie[1] until midnight} -#' \item{\code{append(new_time, new_centered_serie, new_level, new_exo, new_exo_hat)}}{ -#' Acquire a new vector of lists (time, centered_serie, level, exo, exo_hat).} +#' \item{\code{appendHat(time, hat_serie, hat_exo)}}{ +#' New estimated data + time.} +#' \item{\code{append(serie, exo)}}{ +#' New measured data; call *after* \code{appendHat()}} #' \item{\code{getTime(index)}}{ #' Get times at specified index.} -#' \item{\code{getCenteredSerie(index)}}{ -#' Get centered serie at specified index.} -#' \item{\code{getCenteredSeries(indices)}}{ +#' \item{\code{getCenteredSerie(index, hat=FALSE)}}{ +#' Get (measured or predicted) centered serie at specified index.} +#' \item{\code{getCenteredSeries(indices, hat=FALSE)}}{ #' Get centered series at specified indices (in columns).} -#' \item{\code{getLevel(index)}}{ +#' \item{\code{getLevel(index, hat=FALSE)}}{ #' Get level at specified index.} -#' \item{\code{getSerie(index)}}{ +#' \item{\code{getSerie(index, hat=FALSE)}}{ #' Get serie (centered+level) at specified index.} -#' \item{\code{getSeries(indices)}}{ +#' \item{\code{getSeries(indices, hat=FALSE)}}{ #' Get series at specified indices (in columns).} -#' \item{\code{getExo(index)}}{ +#' \item{\code{getExo(index, hat=FALSE)}}{ #' Get exogenous variables at specified index.} -#' \item{\code{getExoHat(index)}}{ -#' Get estimated exogenous variables at specified index.} #' } #' Data = R6::R6Class("Data", @@ -49,47 +49,64 @@ Data = R6::R6Class("Data", getStdHorizon = function() 24 - as.POSIXlt( private$.data[[1]]$time[1] )$hour + 1 , - append = function(new_time, new_centered_serie, new_level, new_exo, new_exo_hat) + appendHat = function(time, hat_serie, hat_exo) { + hat_level = mean(hat_serie, na.rm=TRUE) + hat_centered_serie = hat_serie - hat_level private$.data[[length(private$.data)+1]] <- list( - "time"=new_time, "centered_serie"=new_centered_serie, "level"=new_level, - "exo"=new_exo, "exo_hat"=new_exo_hat) + "time"=time, "hat_centered_serie"=hat_centered_serie, + "hat_level"=hat_level, "hat_exo"=hat_exo ) + }, + append = function(serie, exo) + { + level = mean(serie, na.rm=TRUE) + centered_serie = serie - level + private$.data[[length(private$.data)]]$centered_serie <- centered_serie, + private$.data[[length(private$.data)]]$level <- level, + private$.data[[length(private$.data)]]$exo <- exo, }, getTime = function(index) { index = dateIndexToInteger(index, self) private$.data[[index]]$time }, - getCenteredSerie = function(index) + getCenteredSerie = function(index, hat=FALSE) { index = dateIndexToInteger(index, self) - private$.data[[index]]$centered_serie + if (hat) + private$.data[[index]]$hat_centered_serie + else + private$.data[[index]]$centered_serie }, - getCenteredSeries = function(indices) - sapply(indices, function(i) self$getCenteredSerie(i)) + getCenteredSeries = function(indices, hat=FALSE) + sapply(indices, function(i) self$getCenteredSerie(i, hat)) , - getLevel = function(index) + getLevel = function(index, hat=FALSE) { index = dateIndexToInteger(index, self) - private$.data[[index]]$level + if (hat) + private$.data[[index]]$hat_level + else + private$.data[[index]]$level }, - getSerie = function(index) + getSerie = function(index, hat=FALSE) { index = dateIndexToInteger(index, self) - private$.data[[index]]$centered_serie + private$.data[[index]]$level + if (hat) + private$.data[[index]]$hat_centered_serie + private$.data[[index]]$hat_level + else + private$.data[[index]]$centered_serie + private$.data[[index]]$level }, - getSeries = function(indices) - sapply(indices, function(i) self$getSerie(i)) + getSeries = function(indices, hat=FALSE) + sapply(indices, function(i) self$getSerie(i, hat)) , - getExo = function(index) - { - index = dateIndexToInteger(index, self) - private$.data[[index]]$exo - }, - getExoHat = function(index) + getExo = function(index, hat=FALSE) { index = dateIndexToInteger(index, self) - private$.data[[index]]$exo_hat + if (hat) + private$.data[[index]]$hat_exo + else + private$.data[[index]]$exo }, removeFirst = function() private$.data <- private$.data[2:length(private$.data)] diff --git a/pkg/R/F_Persistence.R b/pkg/R/F_Persistence.R index 1d9fd19..79cfe3c 100644 --- a/pkg/R/F_Persistence.R +++ b/pkg/R/F_Persistence.R @@ -14,12 +14,14 @@ PersistenceForecaster = R6::R6Class("PersistenceForecaster", # Return centered last (similar) day curve, avoiding NAs until memory is run first_day = max(1, today-memory) same_day = ifelse(hasArg("same_day"), list(...)$same_day, TRUE) + realtime = ifelse(hasArg("realtime"), list(...)$realtime, FALSE) # If 'same_day', get the last known future of similar day: -7 + 1 == -6 index = today - ifelse(same_day,6,0) repeat { { - last_serie = data$getCenteredSerie(index)[1:horizon] + last_serie = + data$getCenteredSerie(index,hat=(index==today && realtime))[1:horizon] index = index - ifelse(same_day,7,1) }; if (!any(is.na(last_serie))) diff --git a/pkg/R/Forecast.R b/pkg/R/Forecast.R index c64d24f..a0ddcda 100644 --- a/pkg/R/Forecast.R +++ b/pkg/R/Forecast.R @@ -17,12 +17,12 @@ #' Initialize a Forecast object with a series of date indices.} #' \item{\code{getSize()}}{ #' Return number of individual forecasts.} -#' \item{\code{append(new_serie, new_params, new_index_in_data)}}{ -#' Acquire a new individual forecast, with its (optimized) parameters and the corresponding -#' index in the dataset.} +#' \item{\code{append(forecast, params, index_in_data)}}{ +#' Acquire an individual forecast, with its (optimized) parameters and the +#' corresponding index in the dataset.} #' \item{\code{getDates()}}{ #' Get dates where forecast occurs.} -#' \item{\code{getSerie(index)}}{ +#' \item{\code{getForecast(index)}}{ #' Get forecasted serie at specified index.} #' \item{\code{getParams(index)}}{ #' Get parameters at specified index (for 'Neighbors' method).} @@ -44,19 +44,19 @@ Forecast = R6::R6Class("Forecast", getSize = function() length(private$.pred) , - append = function(new_serie, new_params, new_index_in_data) + append = function(forecast, params, index_in_data) { private$.pred[[length(private$.pred)+1]] <- - list("serie"=new_serie, "params"=new_params, "index_in_data"=new_index_in_data) + list("forecast"=forecast, "params"=params, "index_in_data"=index_in_data) }, getDates = function() as.Date( private$.dates, origin="1970-01-01" ) , - getSerie = function(index) + getForecast = function(index) { if (is(index,"Date")) index = match(index, private$.dates) - private$.pred[[index]]$serie + private$.pred[[index]]$forecast }, getParams = function(index) { diff --git a/pkg/R/J_Neighbors.R b/pkg/R/J_Neighbors.R index 7ca0003..351fbf9 100644 --- a/pkg/R/J_Neighbors.R +++ b/pkg/R/J_Neighbors.R @@ -9,12 +9,13 @@ getNeighborsJumpPredict = function(data, today, memory, horizon, params, ...) filter = (params$indices >= first_day) indices = params$indices[filter] weights = params$weights[filter] + realtime = ifelse(hasArg("realtime"), list(...)$realtime, FALSE) if (any(is.na(weights) | is.na(indices))) return (NA) gaps = sapply(indices, function(i) { - data$getSerie(i+1)[1] - tail(data$getSerie(i), 1) + data$getSerie(i+1,hat=(realtime && i+1==today))[1] - tail(data$getSerie(i), 1) }) scal_product = weights * gaps norm_fact = sum( weights[!is.na(scal_product)] ) diff --git a/pkg/R/J_Persistence.R b/pkg/R/J_Persistence.R index 5d156cc..4d3abd2 100644 --- a/pkg/R/J_Persistence.R +++ b/pkg/R/J_Persistence.R @@ -8,12 +8,13 @@ getPersistenceJumpPredict = function(data, today, memory, horizon, params, ...) #return gap between end of similar day curve and first day of tomorrow (in the past) first_day = max(1, today-memory) same_day = ifelse(hasArg("same_day"), list(...)$same_day, TRUE) + realtime = ifelse(hasArg("realtime"), list(...)$realtime, FALSE) index = today - ifelse(same_day,7,1) repeat { { last_serie_end = tail( data$getSerie(index), 1) - last_tomorrow_begin = data$getSerie(index+1)[1] + last_tomorrow_begin = data$getSerie(index+1,hat=(realtime && index+1==today))[1] index = index - ifelse(same_day,7,1) }; if (!is.na(last_serie_end) && !is.na(last_tomorrow_begin)) diff --git a/pkg/R/computeError.R b/pkg/R/computeError.R index b57e607..ce05fdb 100644 --- a/pkg/R/computeError.R +++ b/pkg/R/computeError.R @@ -3,13 +3,13 @@ #' Obtain the errors between forecast and data #' #' @param data Dataset, object of class \code{Data} output of \code{getData} -#' @param forecast Forecast object, class \code{Forecast} output of \code{computeForecast} +#' @param pred Forecast object, class \code{Forecast} output of \code{computeForecast} #' @param horizon Horizon where to compute the error (<= horizon used in \code{computeForecast}) #' #' @return A list (abs,MAPE) of lists (day,indices) #' #' @export -computeError = function(data, forecast, horizon=data$getStdHorizon()) +computeError = function(data, pred, horizon=data$getStdHorizon()) { L = forecast$getSize() mape_day = rep(0, horizon) @@ -22,15 +22,15 @@ computeError = function(data, forecast, horizon=data$getStdHorizon()) { index = forecast$getIndexInData(i) serie = data$getSerie(index+1)[1:horizon] - pred = forecast$getSerie(i)[1:horizon] - if (!any(is.na(serie)) && !any(is.na(pred))) + forecast = pred$getForecast(i)[1:horizon] + if (!any(is.na(serie)) && !any(is.na(forecast))) { nb_no_NA_data = nb_no_NA_data + 1 - mape_increment = abs(serie - pred) / serie + mape_increment = abs(serie - forecast) / serie mape_increment[is.nan(mape_increment)] = 0. # 0 / 0 mape_increment[!is.finite(mape_increment)] = 1. # >0 / 0 mape_day = mape_day + mape_increment - abs_increment = abs(serie - pred) + abs_increment = abs(serie - forecast) abs_day = abs_day + abs_increment mape_indices[i] = mean(mape_increment) abs_indices[i] = mean(abs_increment) diff --git a/pkg/R/computeForecast.R b/pkg/R/computeForecast.R index 24f114e..198f6ec 100644 --- a/pkg/R/computeForecast.R +++ b/pkg/R/computeForecast.R @@ -20,7 +20,8 @@ #' @param memory Data depth (in days) to be used for prediction #' @param horizon Number of time steps to predict #' @param ncores Number of cores for parallel execution (1 to disable) -#' @param ... Additional parameters for the forecasting models +#' @param ... Additional parameters for the forecasting models; +#' In particular, realtime=TRUE to use predictions instead of measurements #' #' @return An object of class Forecast #' @@ -43,7 +44,7 @@ computeForecast = function(data, indices, forecaster, pjump, { # (basic) Arguments sanity checks horizon = as.integer(horizon)[1] - if (horizon<=0 || horizon>length(data$getCenteredSerie(2))) + if (horizon<=0 || horizon>length(data$getCenteredSerie(1))) stop("Horizon too short or too long") integer_indices = sapply(indices, function(i) dateIndexToInteger(i,data)) if (any(integer_indices<=0 | integer_indices>data$getSize())) @@ -52,7 +53,8 @@ computeForecast = function(data, indices, forecaster, pjump, stop("forecaster (name) and pjump (function) should be of class character") pred = Forecast$new( sapply(indices, function(i) integerIndexToDate(i,data)) ) - forecaster_class_name = getFromNamespace(paste(forecaster,"Forecaster",sep=""), "talweg") + forecaster_class_name = getFromNamespace( + paste(forecaster,"Forecaster",sep=""), "talweg") forecaster = forecaster_class_name$new( #.pjump = getFromNamespace(paste("get",pjump,"JumpPredict",sep=""), "talweg")) @@ -60,7 +62,8 @@ computeForecast = function(data, indices, forecaster, pjump, { p <- parallel::mclapply(seq_along(integer_indices), function(i) { list( - "forecast" = forecaster$predictSerie(data, integer_indices[i], memory, horizon, ...), + "forecast" = forecaster$predictSerie( + data, integer_indices[i], memory, horizon, ...), "params"= forecaster$getParameters(), "index" = integer_indices[i] ) }, mc.cores=ncores) @@ -69,7 +72,8 @@ computeForecast = function(data, indices, forecaster, pjump, { p <- lapply(seq_along(integer_indices), function(i) { list( - "forecast" = forecaster$predictSerie(data, integer_indices[i], memory, horizon, ...), + "forecast" = forecaster$predictSerie( + data, integer_indices[i], memory, horizon, ...), "params"= forecaster$getParameters(), "index" = integer_indices[i] ) }) @@ -79,9 +83,9 @@ computeForecast = function(data, indices, forecaster, pjump, for (i in seq_along(integer_indices)) { pred$append( - new_serie = p[[i]]$forecast, - new_params = p[[i]]$params, - new_index_in_data = p[[i]]$index + forecast = p[[i]]$forecast, + params = p[[i]]$params, + index_in_data = p[[i]]$index ) } pred diff --git a/pkg/R/getData.R b/pkg/R/getData.R index e13cf86..b944dfb 100644 --- a/pkg/R/getData.R +++ b/pkg/R/getData.R @@ -1,13 +1,14 @@ #' @title Acquire data in a clean format #' -#' @description Take in input data frames and/or files containing raw data, and timezones, and -#' output a Data object, roughly corresponding to a list where each cell contains all value -#' for one day (see \code{?Data}). +#' @description Take in input data frames and/or files containing raw data, and +#' timezones, and output a Data object, roughly corresponding to a list where each cell +#' contains all value for one day (see \code{?Data}). #' #' @param ts_data Time-series, as a data frame (DB style: 2 columns, first is date/time, #' second is value) or a CSV file -#' @param exo_data Exogenous variables, as a data frame or a CSV file; first comlumn is dates, -#' next block are measurements for the day, and final block are exogenous forecasts +#' @param exo_data Exogenous variables, as a data frame or a CSV file; first comlumn is +#' dates, next block are measurements for the day, and final block are exogenous +#' forecasts #' @param input_tz Timezone in the input files ("GMT" or e.g. "Europe/Paris") #' @param date_format How date/time are stored (e.g. year/month/day hour:minutes; #' see \code{strptime}) @@ -51,7 +52,8 @@ getData = function(ts_data, exo_data, input_tz="GMT", date_format="%d/%m/%Y %H:% ts_data # Convert to the desired timezone (usually "GMT" or "Europe/Paris") formatted_dates_POSIXlt = strptime(as.character(ts_df[,1]), date_format, tz=input_tz) - ts_df[,1] = format(as.POSIXct(formatted_dates_POSIXlt, tz=input_tz), tz=working_tz, usetz=TRUE) + ts_df[,1] = format( + as.POSIXct(formatted_dates_POSIXlt, tz=input_tz), tz=working_tz, usetz=TRUE) exo_df = if (is.character(exo_data)) @@ -69,22 +71,26 @@ getData = function(ts_data, exo_data, input_tz="GMT", date_format="%d/%m/%Y %H:% { time = c() serie = c() + hat_serie = c() repeat { { time = c(time, ts_df[line,1]) + hat_serie = c(serie, ts_df[line,3]) serie = c(serie, ts_df[line,2]) line = line + 1 }; - if (line >= nb_lines + 1 || as.POSIXlt(ts_df[line-1,1],tz=working_tz)$hour == predict_at) + if (line >= nb_lines + 1 + || as.POSIXlt(ts_df[line-1,1],tz=working_tz)$hour == predict_at) + { break + } } + hat_exo = as.data.frame( exo_df[i,(1+nb_exos+1):(1+2*nb_exos)] ) exo = as.data.frame( exo_df[i,2:(1+nb_exos)] ) - exo_hat = as.data.frame( exo_df[i,(1+nb_exos+1):(1+2*nb_exos)] ) - level = mean(serie, na.rm=TRUE) - centered_serie = serie - level - data$append(time, centered_serie, level, exo, exo_hat) + data$appendHat(time, hat_serie, hat_exo) + data$append(serie, exo) #in realtime, this call comes hours later if (i >= limit) break i = i + 1 diff --git a/pkg/R/plot.R b/pkg/R/plot.R index 52b077b..5ea4843 100644 --- a/pkg/R/plot.R +++ b/pkg/R/plot.R @@ -28,8 +28,8 @@ plotCurves <- function(data, indices=seq_len(data$getSize())) #' @param cols Colors for each error (default: 1,2,3,...) #' #' @seealso \code{\link{plotCurves}}, \code{\link{plotPredReal}}, -#' \code{\link{plotSimils}}, \code{\link{plotFbox}}, -#' \code{\link{computeFilaments}, }\code{\link{plotFilamentsBox}}, \code{\link{plotRelVar}} +#' \code{\link{plotSimils}}, \code{\link{plotFbox}}, \code{\link{computeFilaments}}, +#' \code{\link{plotFilamentsBox}}, \code{\link{plotRelVar}} #' #' @export plotError <- function(err, cols=seq_along(err)) @@ -83,9 +83,9 @@ plotError <- function(err, cols=seq_along(err)) #' @export plotPredReal <- function(data, pred, index) { - horizon = length(pred$getSerie(1)) + horizon = length(pred$getForecast(1)) measure = data$getSerie( pred$getIndexInData(index)+1 )[1:horizon] - prediction = pred$getSerie(index) + prediction = pred$getForecast(index) yrange = range(measure, prediction) par(mar=c(4.7,5,1,1), cex.axis=1.5, cex.lab=1.5, lwd=3) plot(measure, type="l", ylim=yrange, xlab="Time (hours)", ylab="PM10") @@ -175,7 +175,8 @@ computeFilaments <- function(data, pred, index, limit=60, plot=TRUE) centered_series = rbind( data$getCenteredSeries( pred$getParams(index)$indices ), data$getCenteredSeries( pred$getParams(index)$indices+1 ) ) - yrange = range( ref_serie, quantile(centered_series, probs=c(0.025,0.975), na.rm=TRUE) ) + yrange = range( ref_serie, + quantile(centered_series, probs=c(0.025,0.975), na.rm=TRUE) ) par(mar=c(4.7,5,1,1), cex.axis=1.5, cex.lab=1.5, lwd=2) for (i in nn:1) { @@ -214,7 +215,7 @@ plotFilamentsBox = function(data, fil) rainbow::fboxplot(series_fds, "functional", "hdr", xlab="Time (hours)", ylab="PM10", plotlegend=FALSE, lwd=2) - # "Magic" found at http://stackoverflow.com/questions/13842560/get-xlim-from-a-plot-in-r + # "Magic": http://stackoverflow.com/questions/13842560/get-xlim-from-a-plot-in-r usr <- par("usr") yr <- (usr[4] - usr[3]) / 27 par(new=TRUE) @@ -237,7 +238,9 @@ plotRelVar = function(data, fil) ref_var = c( apply(data$getSeries(fil$neighb_indices),1,sd), apply(data$getSeries(fil$neighb_indices+1),1,sd) ) fdays = getNoNA2(data, 1, fil$index-1) - global_var = c( apply(data$getSeries(fdays),1,sd), apply(data$getSeries(fdays+1),1,sd) ) + global_var = c( + apply(data$getSeries(fdays),1,sd), + apply(data$getSeries(fdays+1),1,sd) ) yrange = range(ref_var, global_var) par(mar=c(4.7,5,1,1), cex.axis=1.5, cex.lab=1.5) diff --git a/pkg/R/utils.R b/pkg/R/utils.R index 3649f58..bbf1387 100644 --- a/pkg/R/utils.R +++ b/pkg/R/utils.R @@ -16,7 +16,8 @@ dateIndexToInteger = function(index, data) if (inherits(index, "Date") || is.character(index)) { - tryCatch(indexAsDate <- as.Date(index), error=function(e) stop("Unrecognized index format")) + tryCatch(indexAsDate <- as.Date(index), + error=function(e) stop("Unrecognized index format")) #TODO: tz arg to difftime ? integerIndex <- round( as.numeric( difftime(indexAsDate, as.Date(data$getTime(1)[1])) ) ) + 1