summarize_ewma {sparklyr.flint} | R Documentation |
Exponential weighted moving average summarizer
Description
Compute the exponentially weighted moving average (EWMA) of 'column' and store the results in a new column named '<column>_ewma'. At time t[n], the i-th value x[i] with timestamp t[i] contributes a weighted value of weight(i, n) * x[i], where weight(i, n) is determined by both 'alpha' and 'smoothing_duration'.
Usage
summarize_ewma(
ts_rdd,
column,
alpha = 0.05,
smoothing_duration = "1d",
time_column = "time",
convention = c("core", "legacy"),
key_columns = list()
)
Arguments
ts_rdd |
Timeseries RDD being summarized |
column |
Column to be summarized |
alpha |
A smoothing factor between 0 and 1 (default: 0.05) – a higher alpha discounts older observations faster |
smoothing_duration |
A time duration specified in string form (e.g., "1d", "1h", "15m") or "constant". The weight applied at time t[n] to a past observation from time t[p] is jointly determined by 'alpha' and 'smoothing_duration'. If 'smoothing_duration' is a fixed time duration such as "1d", then weight(p, n) = (1 - alpha) ^ [(t[n] - t[p]) / smoothing_duration]. If 'smoothing_duration' is "constant", then weight(p, n) = (1 - alpha) ^ (n - p). The "constant" option assumes that the difference between consecutive timestamps equals some constant 'diff' and that 'smoothing_duration' is effectively also equal to 'diff', so that t[n] - t[p] = (n - p) * diff and weight(p, n) = (1 - alpha) ^ [(t[n] - t[p]) / smoothing_duration] = (1 - alpha) ^ [(n - p) * diff / diff] = (1 - alpha) ^ (n - p). |
time_column |
Name of the column containing timestamps (default: "time") |
convention |
One of "core" or "legacy" (default: "core"). If 'convention' is "core", then the output will be the weighted sum of all observations divided by the sum of all weight coefficients (see https://github.com/twosigma/flint/blob/master/doc/ema.md#core). If 'convention' is "legacy", then the output will simply be the weighted sum of all observations, without being normalized by the sum of all weight coefficients (see https://github.com/twosigma/flint/blob/master/doc/ema.md#legacy). |
key_columns |
Optional list of columns that will form an equivalence relation associating each record with the time series it belongs to (i.e., any 2 records having equal values in those columns will be associated with the same time series, and any 2 records having differing values in those columns are considered to be from 2 separate time series and will therefore be summarized separately). By default, 'key_columns' is empty and all records are considered to be part of a single time series. |
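The weight formulas and the two conventions described above can be illustrated with a minimal base R sketch. It does not call Spark or Flint: 'ewma_at_last' is a hypothetical helper written for this illustration only, timestamps are assumed to be plain numbers in days, and a numeric 'smoothing_duration' of 1 stands in for "1d". It evaluates the EWMA only at the last timestamp and is not guaranteed to reproduce every detail of Flint's implementation.

```r
# Hypothetical helper (not part of sparklyr.flint): EWMA evaluated at the
# last timestamp t[n], following the weight formulas documented above.
ewma_at_last <- function(x, t, alpha = 0.05, smoothing_duration = 1,
                         convention = c("core", "legacy")) {
  convention <- match.arg(convention)
  n <- length(x)
  # Exponent of (1 - alpha): the index difference n - p for "constant",
  # otherwise the elapsed time measured in units of smoothing_duration.
  k <- if (identical(smoothing_duration, "constant")) {
    n - seq_len(n)
  } else {
    (t[n] - t) / smoothing_duration
  }
  w <- (1 - alpha)^k
  weighted_sum <- sum(w * x)
  # "core" normalizes by the sum of weights; "legacy" does not.
  if (convention == "core") weighted_sum / sum(w) else weighted_sum
}

x <- c(1, 2, 3)
t <- c(1, 2, 4) # assumed to be days, so smoothing_duration = 1 means "1d"

# Weights are 0.5^3, 0.5^2, 0.5^0 = 0.125, 0.25, 1
legacy <- ewma_at_last(x, t, alpha = 0.5, convention = "legacy") # 3.625
core <- ewma_at_last(x, t, alpha = 0.5, convention = "core") # 3.625 / 1.375

# With "constant", timestamps are ignored and the weights are 0.25, 0.5, 1
constant_legacy <- ewma_at_last(
  x, t,
  alpha = 0.5, smoothing_duration = "constant", convention = "legacy"
) # 4.25
```

Note how the gap between t = 2 and t = 4 discounts the first two observations more heavily under a fixed duration than under "constant", which treats all consecutive observations as equally spaced.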
See Also
Other summarizers:
ols_regression(), summarize_avg(), summarize_corr2(), summarize_corr(), summarize_count(), summarize_covar(), summarize_dot_product(), summarize_ema_half_life(), summarize_geometric_mean(), summarize_kurtosis(), summarize_max(), summarize_min(), summarize_nth_central_moment(), summarize_nth_moment(), summarize_product(), summarize_quantile(), summarize_skewness(), summarize_stddev(), summarize_sum(), summarize_var(), summarize_weighted_avg(), summarize_weighted_corr(), summarize_weighted_covar(), summarize_z_score()
Examples
library(sparklyr)
library(sparklyr.flint)

sc <- try_spark_connect(master = "local")

if (!is.null(sc)) {
  price_sdf <- copy_to(
    sc,
    data.frame(
      time = ceiling(seq(12) / 2),
      price = seq(12) / 2,
      id = rep(c(3L, 7L), 6)
    )
  )
  ts <- fromSDF(price_sdf, is_sorted = TRUE, time_unit = "DAYS")
  ts_ewma <- summarize_ewma(
    ts,
    column = "price",
    smoothing_duration = "1d",
    key_columns = "id"
  )
} else {
  message("Unable to establish a Spark connection!")
}