From: Benjamin Auder <benjamin.auder@somewhere>
Date: Wed, 5 Apr 2017 00:03:14 +0000 (+0200)
Subject: add realtime option, slightly refactor data acquisition
X-Git-Url: https://git.auder.net/variants/Cwda/doc/html/packages.html?a=commitdiff_plain;h=72b9c50162bcdcf6c99fbb8b2ec6ea9ba98379cb;p=talweg.git

add realtime option, slightly refactor data acquisition
---

diff --git a/TODO b/TODO
index 7de936b..a0abec2 100644
--- a/TODO
+++ b/TODO
@@ -1,11 +1,4 @@
 s'inspirer de vim-rmarkdown pour syntaxe .gj ?
 
-script bash qui passe variables "heure début" et "horizon" à report.gj > stocke fichier _H_h,
-puis run Jupyter ...
-
 version this (talweg) as a whole; private!
 subtree: talweg-R (shared with world, R package)
-
-==> les poids ne sont pas normalisés en sortie de predictSHape de Neighbors(2)
-[ce serait mieux, ça éviterait la normalisation systématique dans J_Neighbors, et corrigerait les
-histogrammes de poids................]
diff --git a/pkg/R/Data.R b/pkg/R/Data.R
index 697da05..551cfaf 100644
--- a/pkg/R/Data.R
+++ b/pkg/R/Data.R
@@ -18,24 +18,24 @@
 #'   Return number of series in dataset.}
 #' \item{\code{getStdHorizon()}}{
 #'   Return number of time steps from serie[1] until midnight}
-#' \item{\code{append(new_time, new_centered_serie, new_level, new_exo, new_exo_hat)}}{
-#'   Acquire a new vector of lists (time, centered_serie, level, exo, exo_hat).}
+#' \item{\code{appendHat(time, hat_serie, hat_exo)}}{
+#'   New estimated data + time.}
+#' \item{\code{append(serie, exo)}}{
+#'   New measured data; call *after* \code{appendHat()}}
 #' \item{\code{getTime(index)}}{
 #'   Get times at specified index.}
-#' \item{\code{getCenteredSerie(index)}}{
-#'   Get centered serie at specified index.}
-#' \item{\code{getCenteredSeries(indices)}}{
+#' \item{\code{getCenteredSerie(index, hat=FALSE)}}{
+#'   Get (measured or predicted) centered serie at specified index.}
+#' \item{\code{getCenteredSeries(indices, hat=FALSE)}}{
 #'   Get centered series at specified indices (in columns).}
-#' \item{\code{getLevel(index)}}{
+#' \item{\code{getLevel(index, hat=FALSE)}}{
 #'   Get level at specified index.}
-#' \item{\code{getSerie(index)}}{
+#' \item{\code{getSerie(index, hat=FALSE)}}{
 #'   Get serie (centered+level) at specified index.}
-#' \item{\code{getSeries(indices)}}{
+#' \item{\code{getSeries(indices, hat=FALSE)}}{
 #'   Get series at specified indices (in columns).}
-#' \item{\code{getExo(index)}}{
+#' \item{\code{getExo(index, hat=FALSE)}}{
 #'   Get exogenous variables at specified index.}
-#' \item{\code{getExoHat(index)}}{
-#'   Get estimated exogenous variables at specified index.}
 #' }
 #'
 Data = R6::R6Class("Data",
@@ -49,47 +49,64 @@ Data = R6::R6Class("Data",
 		getStdHorizon = function()
 			24 - as.POSIXlt( private$.data[[1]]$time[1] )$hour + 1
 		,
-		append = function(new_time, new_centered_serie, new_level, new_exo, new_exo_hat)
+		appendHat = function(time, hat_serie, hat_exo)
 		{
+			hat_level = mean(hat_serie, na.rm=TRUE)
+			hat_centered_serie = hat_serie - hat_level
 			private$.data[[length(private$.data)+1]] <- list(
-				"time"=new_time, "centered_serie"=new_centered_serie, "level"=new_level,
-				"exo"=new_exo, "exo_hat"=new_exo_hat)
+				"time"=time, "hat_centered_serie"=hat_centered_serie,
+				"hat_level"=hat_level, "hat_exo"=hat_exo )
+		},
+		append = function(serie, exo)
+		{
+			level = mean(serie, na.rm=TRUE)
+			centered_serie = serie - level
+			private$.data[[length(private$.data)]]$centered_serie <- centered_serie,
+			private$.data[[length(private$.data)]]$level <- level,
+			private$.data[[length(private$.data)]]$exo <- exo,
 		},
 		getTime = function(index)
 		{
 			index = dateIndexToInteger(index, self)
 			private$.data[[index]]$time
 		},
-		getCenteredSerie = function(index)
+		getCenteredSerie = function(index, hat=FALSE)
 		{
 			index = dateIndexToInteger(index, self)
-			private$.data[[index]]$centered_serie
+			if (hat)
+				private$.data[[index]]$hat_centered_serie
+			else
+				private$.data[[index]]$centered_serie
 		},
-		getCenteredSeries = function(indices)
-			sapply(indices, function(i) self$getCenteredSerie(i))
+		getCenteredSeries = function(indices, hat=FALSE)
+			sapply(indices, function(i) self$getCenteredSerie(i, hat))
 		,
-		getLevel = function(index)
+		getLevel = function(index, hat=FALSE)
 		{
 			index = dateIndexToInteger(index, self)
-			private$.data[[index]]$level
+			if (hat)
+				private$.data[[index]]$hat_level
+			else
+				private$.data[[index]]$level
 		},
-		getSerie = function(index)
+		getSerie = function(index, hat=FALSE)
 		{
 			index = dateIndexToInteger(index, self)
-			private$.data[[index]]$centered_serie + private$.data[[index]]$level
+			if (hat)
+				private$.data[[index]]$hat_centered_serie + private$.data[[index]]$hat_level
+			else
+				private$.data[[index]]$centered_serie + private$.data[[index]]$level
 		},
-		getSeries = function(indices)
-			sapply(indices, function(i) self$getSerie(i))
+		getSeries = function(indices, hat=FALSE)
+				sapply(indices, function(i) self$getSerie(i, hat))
 		,
-		getExo = function(index)
-		{
-			index = dateIndexToInteger(index, self)
-			private$.data[[index]]$exo
-		},
-		getExoHat = function(index)
+		getExo = function(index, hat=FALSE)
 		{
 			index = dateIndexToInteger(index, self)
-			private$.data[[index]]$exo_hat
+			if (hat)
+				private$.data[[index]]$hat_exo
+			else
+				private$.data[[index]]$exo
 		},
 		removeFirst = function()
 			private$.data <- private$.data[2:length(private$.data)]
diff --git a/pkg/R/F_Persistence.R b/pkg/R/F_Persistence.R
index 1d9fd19..79cfe3c 100644
--- a/pkg/R/F_Persistence.R
+++ b/pkg/R/F_Persistence.R
@@ -14,12 +14,14 @@ PersistenceForecaster = R6::R6Class("PersistenceForecaster",
 			# Return centered last (similar) day curve, avoiding NAs until memory is run
 			first_day = max(1, today-memory)
 			same_day = ifelse(hasArg("same_day"), list(...)$same_day, TRUE)
+			realtime = ifelse(hasArg("realtime"), list(...)$realtime, FALSE)
 			# If 'same_day', get the last known future of similar day: -7 + 1 == -6
 			index = today - ifelse(same_day,6,0)
 			repeat
 			{
 				{
-					last_serie = data$getCenteredSerie(index)[1:horizon]
+					last_serie =
+						data$getCenteredSerie(index,hat=(index==today && realtime))[1:horizon]
 					index = index - ifelse(same_day,7,1)
 				};
 				if (!any(is.na(last_serie)))
diff --git a/pkg/R/Forecast.R b/pkg/R/Forecast.R
index c64d24f..a0ddcda 100644
--- a/pkg/R/Forecast.R
+++ b/pkg/R/Forecast.R
@@ -17,12 +17,12 @@
 #'   Initialize a Forecast object with a series of date indices.}
 #' \item{\code{getSize()}}{
 #'   Return number of individual forecasts.}
-#' \item{\code{append(new_serie, new_params, new_index_in_data)}}{
-#'   Acquire a new individual forecast, with its (optimized) parameters and the corresponding
-#'   index in the dataset.}
+#' \item{\code{append(forecast, params, index_in_data)}}{
+#'   Acquire an individual forecast, with its (optimized) parameters and the
+#'   corresponding index in the dataset.}
 #' \item{\code{getDates()}}{
 #'   Get dates where forecast occurs.}
-#' \item{\code{getSerie(index)}}{
+#' \item{\code{getForecast(index)}}{
 #'   Get forecasted serie at specified index.}
 #' \item{\code{getParams(index)}}{
 #'   Get parameters at specified index (for 'Neighbors' method).}
@@ -44,19 +44,19 @@ Forecast = R6::R6Class("Forecast",
 		getSize = function()
 			length(private$.pred)
 		,
-		append = function(new_serie, new_params, new_index_in_data)
+		append = function(forecast, params, index_in_data)
 		{
 			private$.pred[[length(private$.pred)+1]] <-
-				list("serie"=new_serie, "params"=new_params, "index_in_data"=new_index_in_data)
+				list("forecast"=forecast, "params"=params, "index_in_data"=index_in_data)
 		},
 		getDates = function()
 			as.Date( private$.dates, origin="1970-01-01" )
 		,
-		getSerie = function(index)
+		getForecast = function(index)
 		{
 			if (is(index,"Date"))
 				index = match(index, private$.dates)
-			private$.pred[[index]]$serie
+			private$.pred[[index]]$forecast
 		},
 		getParams = function(index)
 		{
diff --git a/pkg/R/J_Neighbors.R b/pkg/R/J_Neighbors.R
index 7ca0003..351fbf9 100644
--- a/pkg/R/J_Neighbors.R
+++ b/pkg/R/J_Neighbors.R
@@ -9,12 +9,13 @@ getNeighborsJumpPredict = function(data, today, memory, horizon, params, ...)
 	filter = (params$indices >= first_day)
 	indices = params$indices[filter]
 	weights = params$weights[filter]
+	realtime = ifelse(hasArg("realtime"), list(...)$realtime, FALSE)
 
 	if (any(is.na(weights) | is.na(indices)))
 		return (NA)
 
 	gaps = sapply(indices, function(i) {
-		data$getSerie(i+1)[1] - tail(data$getSerie(i), 1)
+		data$getSerie(i+1,hat=(realtime && i+1==today))[1] - tail(data$getSerie(i), 1)
 	})
 	scal_product = weights * gaps
 	norm_fact = sum( weights[!is.na(scal_product)] )
diff --git a/pkg/R/J_Persistence.R b/pkg/R/J_Persistence.R
index 5d156cc..4d3abd2 100644
--- a/pkg/R/J_Persistence.R
+++ b/pkg/R/J_Persistence.R
@@ -8,12 +8,13 @@ getPersistenceJumpPredict = function(data, today, memory, horizon, params, ...)
 	#return gap between end of similar day curve and first day of tomorrow (in the past)
 	first_day = max(1, today-memory)
 	same_day = ifelse(hasArg("same_day"), list(...)$same_day, TRUE)
+	realtime = ifelse(hasArg("realtime"), list(...)$realtime, FALSE)
 	index = today - ifelse(same_day,7,1)
 	repeat
 	{
 		{
 			last_serie_end = tail( data$getSerie(index), 1)
-			last_tomorrow_begin = data$getSerie(index+1)[1]
+			last_tomorrow_begin = data$getSerie(index+1,hat=(realtime && index+1==today))[1]
 			index = index - ifelse(same_day,7,1)
 		};
 		if (!is.na(last_serie_end) && !is.na(last_tomorrow_begin))
diff --git a/pkg/R/computeError.R b/pkg/R/computeError.R
index b57e607..ce05fdb 100644
--- a/pkg/R/computeError.R
+++ b/pkg/R/computeError.R
@@ -3,13 +3,13 @@
 #' Obtain the errors between forecast and data
 #'
 #' @param data Dataset, object of class \code{Data} output of \code{getData}
-#' @param forecast Forecast object, class \code{Forecast} output of \code{computeForecast}
+#' @param pred Forecast object, class \code{Forecast} output of \code{computeForecast}
 #' @param horizon Horizon where to compute the error (<= horizon used in \code{computeForecast})
 #'
 #' @return A list (abs,MAPE) of lists (day,indices)
 #'
 #' @export
-computeError = function(data, forecast, horizon=data$getStdHorizon())
+computeError = function(data, pred, horizon=data$getStdHorizon())
 {
 	L = forecast$getSize()
 	mape_day = rep(0, horizon)
@@ -22,15 +22,15 @@ computeError = function(data, forecast, horizon=data$getStdHorizon())
 	{
 		index = forecast$getIndexInData(i)
 		serie = data$getSerie(index+1)[1:horizon]
-		pred = forecast$getSerie(i)[1:horizon]
-		if (!any(is.na(serie)) && !any(is.na(pred)))
+		forecast = pred$getForecast(i)[1:horizon]
+		if (!any(is.na(serie)) && !any(is.na(forecast)))
 		{
 			nb_no_NA_data = nb_no_NA_data + 1
-			mape_increment = abs(serie - pred) / serie
+			mape_increment = abs(serie - forecast) / serie
 			mape_increment[is.nan(mape_increment)] = 0. # 0 / 0
 			mape_increment[!is.finite(mape_increment)] = 1. # >0 / 0
 			mape_day = mape_day + mape_increment
-			abs_increment = abs(serie - pred)
+			abs_increment = abs(serie - forecast)
 			abs_day = abs_day + abs_increment
 			mape_indices[i] = mean(mape_increment)
 			abs_indices[i] = mean(abs_increment)
diff --git a/pkg/R/computeForecast.R b/pkg/R/computeForecast.R
index 24f114e..198f6ec 100644
--- a/pkg/R/computeForecast.R
+++ b/pkg/R/computeForecast.R
@@ -20,7 +20,8 @@
 #' @param memory Data depth (in days) to be used for prediction
 #' @param horizon Number of time steps to predict
 #' @param ncores Number of cores for parallel execution (1 to disable)
-#' @param ... Additional parameters for the forecasting models
+#' @param ... Additional parameters for the forecasting models;
+#'   In particular, realtime=TRUE to use predictions instead of measurements
 #'
 #' @return An object of class Forecast
 #'
@@ -43,7 +44,7 @@ computeForecast = function(data, indices, forecaster, pjump,
 {
 	# (basic) Arguments sanity checks
 	horizon = as.integer(horizon)[1]
-	if (horizon<=0 || horizon>length(data$getCenteredSerie(2)))
+	if (horizon<=0 || horizon>length(data$getCenteredSerie(1)))
 		stop("Horizon too short or too long")
 	integer_indices = sapply(indices, function(i) dateIndexToInteger(i,data))
 	if (any(integer_indices<=0 | integer_indices>data$getSize()))
@@ -52,7 +53,8 @@ computeForecast = function(data, indices, forecaster, pjump,
 		stop("forecaster (name) and pjump (function) should be of class character")
 
 	pred = Forecast$new( sapply(indices, function(i) integerIndexToDate(i,data)) )
-	forecaster_class_name = getFromNamespace(paste(forecaster,"Forecaster",sep=""), "talweg")
+	forecaster_class_name = getFromNamespace(
+		paste(forecaster,"Forecaster",sep=""), "talweg")
 	forecaster = forecaster_class_name$new( #.pjump =
 		getFromNamespace(paste("get",pjump,"JumpPredict",sep=""), "talweg"))
 
@@ -60,7 +62,8 @@ computeForecast = function(data, indices, forecaster, pjump,
 	{
 		p <- parallel::mclapply(seq_along(integer_indices), function(i) {
 			list(
-				"forecast" = forecaster$predictSerie(data, integer_indices[i], memory, horizon, ...),
+				"forecast" = forecaster$predictSerie(
+					data, integer_indices[i], memory, horizon, ...),
 				"params"= forecaster$getParameters(),
 				"index" = integer_indices[i] )
 			}, mc.cores=ncores)
@@ -69,7 +72,8 @@ computeForecast = function(data, indices, forecaster, pjump,
 	{
 		p <- lapply(seq_along(integer_indices), function(i) {
 			list(
-				"forecast" = forecaster$predictSerie(data, integer_indices[i], memory, horizon, ...),
+				"forecast" = forecaster$predictSerie(
+					data, integer_indices[i], memory, horizon, ...),
 				"params"= forecaster$getParameters(),
 				"index" = integer_indices[i] )
 			})
@@ -79,9 +83,9 @@ computeForecast = function(data, indices, forecaster, pjump,
 	for (i in seq_along(integer_indices))
 	{
 		pred$append(
-			new_serie = p[[i]]$forecast,
-			new_params = p[[i]]$params,
-			new_index_in_data = p[[i]]$index
+			forecast = p[[i]]$forecast,
+			params = p[[i]]$params,
+			index_in_data = p[[i]]$index
 		)
 	}
 	pred
diff --git a/pkg/R/getData.R b/pkg/R/getData.R
index e13cf86..b944dfb 100644
--- a/pkg/R/getData.R
+++ b/pkg/R/getData.R
@@ -1,13 +1,14 @@
 #' @title Acquire data in a clean format
 #'
-#' @description Take in input data frames and/or files containing raw data, and timezones, and
-#'   output a Data object, roughly corresponding to a list where each cell contains all value
-#'   for one day (see \code{?Data}).
+#' @description Take in input data frames and/or files containing raw data, and
+#'   timezones, and output a Data object, roughly corresponding to a list where each cell
+#'   contains all value for one day (see \code{?Data}).
 #'
 #' @param ts_data Time-series, as a data frame (DB style: 2 columns, first is date/time,
 #'   second is value) or a CSV file
-#' @param exo_data Exogenous variables, as a data frame or a CSV file; first comlumn is dates,
-#'   next block are measurements for the day, and final block are exogenous forecasts
+#' @param exo_data Exogenous variables, as a data frame or a CSV file; first comlumn is
+#'   dates, next block are measurements for the day, and final block are exogenous
+#'   forecasts
 #' @param input_tz Timezone in the input files ("GMT" or e.g. "Europe/Paris")
 #' @param date_format How date/time are stored (e.g. year/month/day hour:minutes;
 #'   see \code{strptime})
@@ -51,7 +52,8 @@ getData = function(ts_data, exo_data, input_tz="GMT", date_format="%d/%m/%Y %H:%
 			ts_data
 	# Convert to the desired timezone (usually "GMT" or "Europe/Paris")
 	formatted_dates_POSIXlt = strptime(as.character(ts_df[,1]), date_format, tz=input_tz)
-	ts_df[,1] = format(as.POSIXct(formatted_dates_POSIXlt, tz=input_tz), tz=working_tz, usetz=TRUE)
+	ts_df[,1] = format(
+		as.POSIXct(formatted_dates_POSIXlt, tz=input_tz), tz=working_tz, usetz=TRUE)
 
 	exo_df =
 		if (is.character(exo_data))
@@ -69,22 +71,26 @@ getData = function(ts_data, exo_data, input_tz="GMT", date_format="%d/%m/%Y %H:%
 	{
 		time = c()
 		serie = c()
+		hat_serie = c()
 		repeat
 		{
 			{
 				time = c(time, ts_df[line,1])
+				hat_serie = c(serie, ts_df[line,3])
 				serie = c(serie, ts_df[line,2])
 				line = line + 1
 			};
-			if (line >= nb_lines + 1 || as.POSIXlt(ts_df[line-1,1],tz=working_tz)$hour == predict_at)
+			if (line >= nb_lines + 1
+				|| as.POSIXlt(ts_df[line-1,1],tz=working_tz)$hour == predict_at)
+			{
 				break
+			}
 		}
 
+		hat_exo = as.data.frame( exo_df[i,(1+nb_exos+1):(1+2*nb_exos)] )
 		exo = as.data.frame( exo_df[i,2:(1+nb_exos)] )
-		exo_hat = as.data.frame( exo_df[i,(1+nb_exos+1):(1+2*nb_exos)] )
-		level = mean(serie, na.rm=TRUE)
-		centered_serie = serie - level
-		data$append(time, centered_serie, level, exo, exo_hat)
+		data$appendHat(time, hat_serie, hat_exo)
+		data$append(serie, exo) #in realtime, this call comes hours later
 		if (i >= limit)
 			break
 		i = i + 1
diff --git a/pkg/R/plot.R b/pkg/R/plot.R
index 52b077b..5ea4843 100644
--- a/pkg/R/plot.R
+++ b/pkg/R/plot.R
@@ -28,8 +28,8 @@ plotCurves <- function(data, indices=seq_len(data$getSize()))
 #' @param cols Colors for each error (default: 1,2,3,...)
 #'
 #' @seealso \code{\link{plotCurves}}, \code{\link{plotPredReal}},
-#'   \code{\link{plotSimils}}, \code{\link{plotFbox}},
-#'   \code{\link{computeFilaments}, }\code{\link{plotFilamentsBox}}, \code{\link{plotRelVar}}
+#'   \code{\link{plotSimils}}, \code{\link{plotFbox}}, \code{\link{computeFilaments}},
+#'   \code{\link{plotFilamentsBox}}, \code{\link{plotRelVar}}
 #'
 #' @export
 plotError <- function(err, cols=seq_along(err))
@@ -83,9 +83,9 @@ plotError <- function(err, cols=seq_along(err))
 #' @export
 plotPredReal <- function(data, pred, index)
 {
-	horizon = length(pred$getSerie(1))
+	horizon = length(pred$getForecast(1))
 	measure = data$getSerie( pred$getIndexInData(index)+1 )[1:horizon]
-	prediction = pred$getSerie(index)
+	prediction = pred$getForecast(index)
 	yrange = range(measure, prediction)
 	par(mar=c(4.7,5,1,1), cex.axis=1.5, cex.lab=1.5, lwd=3)
 	plot(measure, type="l", ylim=yrange, xlab="Time (hours)", ylab="PM10")
@@ -175,7 +175,8 @@ computeFilaments <- function(data, pred, index, limit=60, plot=TRUE)
 		centered_series = rbind(
 			data$getCenteredSeries( pred$getParams(index)$indices ),
 			data$getCenteredSeries( pred$getParams(index)$indices+1 ) )
-		yrange = range( ref_serie, quantile(centered_series, probs=c(0.025,0.975), na.rm=TRUE) )
+		yrange = range( ref_serie,
+			quantile(centered_series, probs=c(0.025,0.975), na.rm=TRUE) )
 		par(mar=c(4.7,5,1,1), cex.axis=1.5, cex.lab=1.5, lwd=2)
 		for (i in nn:1)
 		{
@@ -214,7 +215,7 @@ plotFilamentsBox = function(data, fil)
 	rainbow::fboxplot(series_fds, "functional", "hdr", xlab="Time (hours)", ylab="PM10",
 		plotlegend=FALSE, lwd=2)
 
-	# "Magic" found at http://stackoverflow.com/questions/13842560/get-xlim-from-a-plot-in-r
+	# "Magic": http://stackoverflow.com/questions/13842560/get-xlim-from-a-plot-in-r
 	usr <- par("usr")
 	yr <- (usr[4] - usr[3]) / 27
 	par(new=TRUE)
@@ -237,7 +238,9 @@ plotRelVar = function(data, fil)
 	ref_var = c( apply(data$getSeries(fil$neighb_indices),1,sd),
 		apply(data$getSeries(fil$neighb_indices+1),1,sd) )
 	fdays = getNoNA2(data, 1, fil$index-1)
-	global_var = c( apply(data$getSeries(fdays),1,sd), apply(data$getSeries(fdays+1),1,sd) )
+	global_var = c(
+		apply(data$getSeries(fdays),1,sd),
+		apply(data$getSeries(fdays+1),1,sd) )
 
 	yrange = range(ref_var, global_var)
 	par(mar=c(4.7,5,1,1), cex.axis=1.5, cex.lab=1.5)
diff --git a/pkg/R/utils.R b/pkg/R/utils.R
index 3649f58..bbf1387 100644
--- a/pkg/R/utils.R
+++ b/pkg/R/utils.R
@@ -16,7 +16,8 @@ dateIndexToInteger = function(index, data)
 
 	if (inherits(index, "Date") || is.character(index))
 	{
-		tryCatch(indexAsDate <- as.Date(index), error=function(e) stop("Unrecognized index format"))
+		tryCatch(indexAsDate <- as.Date(index),
+			error=function(e) stop("Unrecognized index format"))
 		#TODO: tz arg to difftime ?
 		integerIndex <- round( as.numeric(
 			difftime(indexAsDate, as.Date(data$getTime(1)[1])) ) ) + 1