这篇文章,我们首先使用代码生成时间序列的数据,然后使用统计学(移动平均)方法来构建模型。
首先,导入库,并定义画时间序列的函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 import numpy as np import matplotlib.pyplot as plt import tensorflow as tf from tensorflow import keras # 画时间序列的函数,从 start 到 end def plot_series(time, series, format="-", start=0, end=None, label=None): plt.plot(time[start:end], series[start:end], format, label=label) plt.xlabel("Time") plt.ylabel("Value") if label: plt.legend(fontsize=14) plt.grid(True)
定义趋势函数,其实就是计算 y 值的函数:
1 2 3 # slope 表示斜率 def trend(time, slope=0): return slope * time
创建时间序列,并进行可视化:
1 2 3 4 5 6 7 time = np.arange(4 * 365 + 1) baseline = 10 series = trend(time, 0.1) plt.figure(figsize=(10, 6)) plot_series(time, series) plt.show()
定义周期的函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 # season_time 的范围是 [0,1],表示当前处于这个周期的哪个点 def seasonal_pattern(season_time): """Just an arbitrary pattern, you can change it if you wish""" return np.where(season_time < 0.4, np.cos(season_time * 2 * np.pi), 1 / np.exp(3 * season_time)) # period 表示周期长度 def seasonality(time, period, amplitude=1, phase=0): """Repeats the same pattern at each period""" # 计算这个点处于周期的哪个点 season_time = ((time + phase) % period) / period return amplitude * seasonal_pattern(season_time)
将这个周期可视化
1 2 3 4 5 6 7 baseline = 10 amplitude = 40 series = seasonality(time, period=365, amplitude=amplitude) plt.figure(figsize=(10, 6)) plot_series(time, series) plt.show()
现在,我们可以创建一个带有周期和趋势的时间序列:
1 2 3 4 5 6 slope = 0.05 series = baseline + trend(time, slope) + seasonality(time, period=365, amplitude=amplitude) plt.figure(figsize=(10, 6)) plot_series(time, series) plt.show()
显示如下:
定义创建噪声的函数,会返回和 time
一样长的噪声。
1 2 3 4 5 def white_noise(time, noise_level=1, seed=None): # 设置种子 rnd = np.random.RandomState(seed) # return rnd.randn(len(time)) * noise_level
生成噪声,并进行可视化
1 2 3 4 5 6 noise_level = 5 noise = white_noise(time, noise_level, seed=42) plt.figure(figsize=(10, 6)) plot_series(time, noise) plt.show()
将噪声添加到数据中,进行可视化:
1 2 3 4 5 series += noise plt.figure(figsize=(10, 6)) plot_series(time, series) plt.show()
拆分训练集和验证集:
1 2 3 4 5 6 7 8 9 10 11 12 split_time = 1000 time_train = time[:split_time] x_train = series[:split_time] time_valid = time[split_time:] x_valid = series[split_time:] plt.figure(figsize=(10, 6)) plot_series(time_train, x_train) plt.show() plt.figure(figsize=(10, 6)) plot_series(time_valid, x_valid) plt.show()
对于验证集,每个时刻的预测值直接取前一个时刻的值。
1 2 3 4 5 6 7 8 9 10 11 # 每个时刻的预测值直接取前一个时刻的值 naive_forecast = series[split_time - 1:-1] plt.figure(figsize=(10, 6)) plot_series(time_valid, x_valid) plot_series(time_valid, naive_forecast) plt.figure(figsize=(10, 6)) # 输入数据 plot_series(time_valid, x_valid, start=0, end=150) # 标签是下一个时刻的值,因此加 1 plot_series(time_valid, naive_forecast, start=1, end=151)
如下图所示,一种绿色是真实值,黄色是预测值。
下面计算 MSE 和MAE。
1 2 print(keras.metrics.mean_squared_error(x_valid, naive_forecast).numpy()) print(keras.metrics.mean_absolute_error(x_valid, naive_forecast).numpy())
输出如下:
下面使用移动平均方法来计算每个时刻的预测值。
得到的 forecast
会比传进来的 series
少 window_size
个元素。
1 2 3 4 5 6 7 def moving_average_forecast(series, window_size): """Forecasts the mean of the last few values. If window_size=1, then this is equivalent to naive forecast""" forecast = [] for time in range(len(series) - window_size): forecast.append(series[time:time + window_size].mean()) return np.array(forecast)
这里进行计算后,取 [split_time - 30:]
,这是因为计算得到结果比原有的时间序列少 30 个元素。因此,取验证集对应的预测值,需要 [split_time - 30:]
。
1 2 3 4 5 moving_avg = moving_average_forecast(series, 30)[split_time - 30:] plt.figure(figsize=(10, 6)) plot_series(time_valid, x_valid) plot_series(time_valid, moving_avg)
计算 MSE 和 MAE 分别如下:
1 2 print(keras.metrics.mean_squared_error(x_valid, moving_avg).numpy()) print(keras.metrics.mean_absolute_error(x_valid, moving_avg).numpy())
比之前的预测更差了,这是因为移动平均方法没有利用周期性和趋势性。现在,让我们去除周期性的影响。由于我们上面构造数据时,使用的周期是 365,因此下面去掉周期性,剩下随机噪声。
注意,diff_series
比 series
少 365 个元素。
1 2 3 4 5 6 7 8 diff_series = (series[365:] - series[:-365]) diff_time = time[365:] # 也可以使用 diff_time = time[:-365] plt.figure(figsize=(10, 6)) plot_series(diff_time, diff_series) plt.show()
下面计算这个噪声的移动平均,由于 diff_moving_avg
比 diff_series
少 50 个元素,而 diff_series
比 series
少 50 个元素,因此 diff_moving_avg
比 series
少了 365 + 50 个元素。
1 2 3 4 5 6 7 8 diff_moving_avg 比 diff_series 少 50 个元素,因此比 series 少了 365 + 50 个元素 diff_moving_avg = moving_average_forecast(diff_series, 50)[split_time - 365 - 50:] plt.figure(figsize=(10, 6)) # 每个时间点,都减去 365 个点之前的数,因此这里减 365 plot_series(time_valid, diff_series[split_time - 365:]) plot_series(time_valid, diff_moving_avg) plt.show()
现在,把计算好的噪声预测值添加到原来的预测值中。
1 2 3 4 5 6 7 # 由于计算的是每个时间点与之前 365 个点的差距,因此这里使用 series[split_time - 365:-365] diff_moving_avg_plus_past = series[split_time - 365:-365] + diff_moving_avg plt.figure(figsize=(10, 6)) plot_series(time_valid, x_valid) plot_series(time_valid, diff_moving_avg_plus_past) plt.show()
计算 MSE 和 MAE 如下:
1 2 print(keras.metrics.mean_squared_error(x_valid, diff_moving_avg_plus_past).numpy()) print(keras.metrics.mean_absolute_error(x_valid, diff_moving_avg_plus_past).numpy())
MAE 还是有点大,我们尝试把窗口改为 10,减少以前数据的影响,并使用 365 天前的数据,加上噪声的移动平均。
注意,diff_moving_avg_plus_smooth_past = moving_average_forecast(series, 10)[split_time - 370:-360] + diff_moving_avg
与 diff_moving_avg_plus_smooth_past = moving_average_forecast(series[split_time - 370:-360], 10) + diff_moving_avg
的作用是一样的。
1 2 3 4 5 6 7 8 # 这句代码与下面等价:diff_moving_avg_plus_smooth_past = moving_average_forecast(series, 10)[split_time - 370:-360] + diff_moving_avg diff_moving_avg_plus_smooth_past = moving_average_forecast(series[split_time - 370:-360], 10) + diff_moving_avg plt.figure(figsize=(10, 6)) plot_series(time_valid, x_valid) plot_series(time_valid, diff_moving_avg_plus_smooth_past) plt.show()
计算 MSE 和 MAE 如下:
1 2 print(keras.metrics.mean_squared_error(x_valid, diff_moving_avg_plus_smooth_past).numpy()) print(keras.metrics.mean_absolute_error(x_valid, diff_moving_avg_plus_smooth_past).numpy())
可以看到,这时 MAE 已经很小了。