第七章 信号处理与时间序列
(需要统计学知识)
1 statsmodels 子库
示例代码如下
import pkgutil as pu import pydoc import statsmodels as sm # statmodels版本号 print("statmodels version", sm.__version__) def clean(astr): s = astr # remove multiple spaces s = ' '.join(s.split()) s = s.replace('=', '') return s def print_desc(prefix, pkg_path): print("pkg_path", pkg_path) ''' :param prefix: 模块名称 :param pkg_path:模块包路径 :return: ''' for pkg in pu.iter_modules(path=pkg_path): '''Yields (module_loader,模块 name,子库名 ispkg)是否''' print("pkg", pkg) name = prefix + "." + pkg[1] # 输出子库名,帮助文档信息 if pkg[2] == True: try: docstr = pydoc.plain(pydoc.render_doc(name)) docstr = clean(docstr) start = docstr.find("DESCRIPTION") docstr = docstr[start: start + 140] print(name, docstr) except: continue print_desc("statsmodels", sm.__path__)运行结果如下:
statmodels version 0.8.0rc1
statsmodels.base
statsmodels.compat
statsmodels.datasets
statsmodels.discrete
statsmodels.distributions
statsmodels.duration
statsmodels.emplike
statsmodels.formula
statsmodels.genmod
statsmodels.graphics
statsmodels.imputation
statsmodels.interface
statsmodels.iolib
statsmodels.miscmodels
statsmodels.multivariate
statsmodels.nonparametric DESCRIPTION Foran overview of this module, see docs/source/nonparametric.rst PACKAGE CONTENTS_kernel_base _smoothers_lowess api bandwidths
statsmodels.regression
statsmodels.resampling
statsmodels.robust
statsmodels.sandbox
statsmodels.src
statsmodels.stats
statsmodels.tools
statsmodels.tsa
2 移动平均值
示例代码如下:
import matplotlib.pyplot as plt import statsmodels.api as sm from pandas.stats.moments import rolling_mean data_loader = sm.datasets.sunspots.load_pandas() df = data_loader.data year_range = df['YEAR'].values plt.plot(year_range, df["SUNACTIVITY"].values, label="Original") plt.plot(year_range, rolling_mean(df, 11)["SUNACTIVITY"].values, label="SMA 11") plt.plot(year_range, rolling_mean(df, 22)["SUNACTIVITY"].values, label="SMA 22") plt.legend() plt.show()
方法二:
import matplotlib.pyplot as plt import statsmodels.api as sm from pandas.stats.moments import rolling_mean import pandas.core.generic data_loader = sm.datasets.sunspots.load_pandas() df = data_loader.data df11 = data_loader.data.rolling(window=11, center=False).mean() df22 = data_loader.data.rolling(window=22, center=False).mean() year_range = df['YEAR'].values plt.plot(year_range, df["SUNACTIVITY"].values, label="Original") # plt.plot(year_range, rolling_mean(df, window=11, center=False)["SUNACTIVITY"].values, label="SMA 11") # plt.plot(year_range, rolling_mean(df, window=22, center=False)["SUNACTIVITY"].values, label="SMA 22") plt.plot(year_range, df11["SUNACTIVITY"].values, label="SMA 11") plt.plot(year_range, df22["SUNACTIVITY"].values, label="SMA 22") plt.legend() plt.show()
运行结果如下:
3 窗口函数
是定义在一个区间(窗口)上的函数,超出定义,函数值取零,可以用来分析频谱,设计滤波器
import matplotlib.pyplot as plt import statsmodels.api as sm from pandas.stats.moments import rolling_window import pandas as pd data_loader = sm.datasets.sunspots.load_pandas() # 从尾部取150行 df = data_loader.data.tail(150) df = pd.DataFrame({"SUNACTIVITY": df['SUNACTIVITY'].values}, index=df['YEAR']) ax = df.plot() def plot_window(win_type): # df2 = rolling_window(df, 22, win_type) df2 = df.rolling(window=22, win_type=win_type).mean() df2.columns = [win_type] # 显示原始数据 df2.plot(ax=ax) # 矩形窗口 plot_window("boxcar") # 三角形窗口 plot_window("triang") # 布莱克曼窗口 plot_window("blackman") # 汉宁窗 plot_window("hanning") # 巴特莱特窗 plot_window("bartlett") # 显示网格 plt.grid() plt.show()运行结果如下:
4 协整的定义
示例代码如下:
import statsmodels.api as sm from pandas.stats.moments import rolling_window import pandas as pd import statsmodels.tsa.stattools as ts import numpy as np # 用来计算ADF统计量 def calc_adf(x, y): result = sm.OLS(x, y).fit() return ts.adfuller(result.resid) # 太阳黑子数据载入到numpy数组 data_loader = sm.datasets.sunspots.load_pandas() data = data_loader.data.values N = len(data) # 计算正弦值,求出该值与自身的协整关系 t = np.linspace(-2 * np.pi, 2 * np.pi, N) sine = np.sin(np.sin(t)) print('Self ADF', calc_adf(sine, sine)) # 给正弦波添加噪音 noise = np.random.normal(0, .01, N) print('ADF sine with noise', calc_adf(sine, sine + noise)) # 生成一个幅值和偏移量更大的余弦波,并混入噪音 cosine = 100 * np.cos(t) + 10 print('ADF sine vs cosine with noise', calc_adf(sine, cosine + noise)) # 正弦与太阳黑子 print('Sine vs sunspots', calc_adf(sine, data))运行结果如下:
Self ADF (2.1752959320935576e-16,
0.95853208606005602,
0,
308,
{'10%': -2.5717944160060719,
'1%': -3.4517611601803702,
'5%': -2.8709700936076912},
-21598.896016765088)
ADF sine with noise (-11.80572756306368,
9.1062110841151392e-22,
2,
306,
{'10%': -2.5718274501260199,
'1%': -3.4519023023726696,
'5%': -2.8710320399170537},
-1857.1417094083959)
ADF sine vs cosine with noise(-6.9222457355201135,
1.1386106445203264e-09,
16,
292,
{'10%': -2.5720714378870331,
'1%': -3.4529449243622383,
'5%': -2.8714895534256861},
-10180.957513197414)
Sine vs sunspots (-6.7242691810700963,
3.4210811915549913e-09,
16,
292,
{'10%': -2.5720714378870331,
'1%': -3.4529449243622383,
'5%': -2.8714895534256861},
-1102.5867415291168)
5 自相关
数据集内部的相关性,可以用来指明趋势
示例代码如下:
import numpy as np import pandas as pd import statsmodels.api as sm import matplotlib.pyplot as plt from pandas.tools.plotting import autocorrelation_plot # 读入测试数据 data_loader = sm.datasets.sunspots.load_pandas() data = data_loader.data['SUNACTIVITY'].values # 计算自相关值 y = data - np.mean(data) norm = np.sum(y ** 2) correlated = np.correlate(y, y, mode='full') / norm res = correlated[len(correlated) / 2:] # 关联度最高值的索引 print(np.argsort(res)[-5:]) # 结果为[ 9 11 10 1 0] # 绘制图形 plt.plot(res) plt.grid(True) plt.xlabel('Lag') plt.ylabel('Autocorrelation') plt.show() # 使用pandas绘制 autocorrelation_plot(data) plt.show()
运行结果如下:
使用pandas绘制
6 自回归模型
可用于预测时间序列将来的值
一元线性回归的计算公式(因变量y与自变量x之间的线性关系)
β0和β1为模型的参数
ε误差项一个期望值为0的随机变量
回归方程
E(y)=β0+β1x
自加归模型的数学公式
示例代码如下:
from scipy.optimize import leastsq import statsmodels.api as sm import matplotlib.pyplot as plt import numpy as np # 搭建模型代码 def model(p, x1, x10): p1, p10 = p return p1 * x1 + p10 * x10 def error(p, data, x1, x10): return data - model(p, x1, x10) # 给参数表赋值 def fit(data): p0 = [.5, 0.5] params = leastsq(error, p0, args=(data[10:], data[9:-1], data[:-10]))[0] return params # 加载数据 data_loader = sm.datasets.sunspots.load_pandas() sunspots = data_loader.data['SUNACTIVITY'].values print(sunspots) # 训练模型 cutoff = .9 * len(sunspots) params = fit(sunspots[:cutoff]) print('Params', params) # 取得各个指标的值 pred = params[0] * sunspots[cutoff - 1:-1] + params[1] * sunspots[cutoff - 10:-10] actual = sunspots[cutoff:] print('Root mean square error', np.sqrt(np.mean(actual - pred) ** 2)) print('Mean absolute error ', np.mean(np.abs(actual - pred))) print('Mean absolute percentage error', 100 * np.mean(np.abs(actual - pred) / actual)) mid = (actual + pred) / 2 print('Symmetric Mean absolute percentage error', 100 * np.mean(np.abs(actual - pred) / mid)) print('Cofficient of determination', 1 - ((actual - pred) ** 2).sum() / ((actual - actual.mean()) ** 2).sum()) year_range = data_loader.data['YEAR'].values[cutoff:] # 绘制图像 # 太阳黑子活动数值 plt.plot(year_range, actual, 'o', label='Sunspots') # 预测值 plt.plot(year_range, pred, 'x', label='Prediction') plt.grid(True) plt.xlabel('YEAR') plt.ylabel('SUNACTIVITY') plt.legend() plt.show()
运行结果如下:
Params [ 0.67172672 0.33626295]
Root mean square error 1.02884848439
Mean absolute error 17.6515446503
Mean absolute percentage error60.7817800736
Symmetric Mean absolute percentage error34.9843386176
Cofficient of determination 0.799940292779
7 ARMA模型
由自回归模型和移动平均模型结合而成,用于时间序列的预测
示例代码如下:
import pandas as pd import matplotlib.pyplot as plt import statsmodels.api as sm import datetime # 加载数据 data_loader = sm.datasets.sunspots.load_pandas() df = data_loader.data # 拟合数据 years = df['YEAR'].values.astype(int) str(years[0]) df.index = pd.Index(sm.tsa.datetools.dates_from_range(str(years[0]), str(years[-1]))) del df['YEAR'] # 预测数据 model = sm.tsa.ARMA(df, (10, 1)).fit() prediction = model.predict('1975', str(years[-1]), dynamic=True) # 绘制数据 # 太阳黑子活动数据 df['1975':].plot() # 预测数据 prediction.plot(style='--', label='Prediction') plt.grid(True) plt.legend() plt.show()
运行结果如下:
8 生成周期信号
示例代码如下:
from scipy.optimize import leastsq import statsmodels.api as sm import matplotlib.pyplot as plt import numpy as np # 搭建模型 def model(p, t): C, p1, f1, phi1, p2, f2, phi2, p3, f3, phi3 = p return C + p1 * np.sin(f1 * t + phi1) + p2 * np.sin(f2 * t + phi2) + p3 * np.sin(f3 * t + phi3) def error(p, y, t): return y - model(p, t) # 给参数表赋值 def fit(y, t): p0 = [y.mean(), 0, 2 * np.pi / 11, 0, 0, 2 * np.pi / 22, 0, 0, 2 * np.pi / 100, 0] params = leastsq(error, p0, args=(y, t))[0] return params # 加载数据 data_loader = sm.datasets.sunspots.load_pandas() sunspots = data_loader.data["SUNACTIVITY"].values years = data_loader.data["YEAR"].values # 训练模型 cutoff = .9 * len(sunspots) params = fit(sunspots[:cutoff], years[:cutoff]) print("Params", params) # 取得各个指标的值 pred = model(params, years[cutoff:]) actual = sunspots[cutoff:] print("Root mean square error", np.sqrt(np.mean((actual - pred) ** 2))) print("Mean absolute error", np.mean(np.abs(actual - pred))) print("Mean absolute percentage error", 100 * np.mean(np.abs(actual - pred) / actual)) mid = (actual + pred) / 2 print("Symmetric Mean absolute percentage error", 100 * np.mean(np.abs(actual - pred) / mid)) print("Coefficient of determination", 1 - ((actual - pred) ** 2).sum() / ((actual - actual.mean()) ** 2).sum()) year_range = data_loader.data["YEAR"].values[cutoff:] # 绘制图形 plt.plot(year_range, actual, 'o', label="Sunspots") plt.plot(year_range, pred, 'x', label="Prediction") plt.grid(True) plt.xlabel("YEAR") plt.ylabel("SUNACTIVITY") plt.legend() plt.show()运行结果如下:
Params [ 47.18800156 28.89947466 0.56827281 6.51174621 4.55214731
0.29372076 -14.3092358 -18.16524066 0.06574835 -4.37789397]
Root mean square error 59.5619988827
Mean absolute error 44.581532168 #平均绝结误差
Mean absolute percentage error65.1643378479
Symmetric Mean absolute percentage error78.4479302694
Coefficient of determination-0.363528934815 #判定系数应尽量接近于1
9 傅量叶分析
FFT (fast fourier transform)快速傅里叶变换
示例代码如下:
import numpy as np import statsmodels.api as sm import matplotlib.pyplot as plt from scipy.fftpack import rfft from scipy.fftpack import fftshift # 加载数据 data_loader = sm.datasets.sunspots.load_pandas() sunspots = data_loader.data["SUNACTIVITY"].values # 创建一个正弦波 t = np.linspace(-2 * np.pi, 2 * np.pi, len(sunspots)) mid = np.ptp(sunspots) / 2 sine = mid + mid * np.sin(np.sin(t)) sine_fft = np.abs(fftshift(rfft(sine))) # 最大振幅的相应索引 print("Index of max sine FFT", np.argsort(sine_fft)[-5:]) # 对太阳黑子进行 fft transformed = np.abs(fftshift(rfft(sunspots))) print("Indices of max sunspots FFT", np.argsort(transformed)[-5:]) # 太阳黑子活动数据和sine函数 plt.subplot(311) plt.plot(sunspots, label="Sunspots") plt.plot(sine, lw=2, label="Sine") plt.grid(True) plt.legend() # 傅里叶变换后的太阳黑子活动数据 plt.subplot(312) plt.plot(transformed, label="Transformed Sunspots") plt.grid(True) plt.legend() # 傅里叶变换后的sine函数 plt.subplot(313) plt.plot(sine_fft, lw=2, label="Transformed Sine") plt.grid(True) plt.legend() plt.show()运行结果如下:
Index of max sine FFT [160 157 166 158 154]
Indices of max sunspots FFT [205 212 215209 154]
10 谱分析
功率频谱
示例代码如下:
import numpy as np import statsmodels.api as sm import matplotlib.pyplot as plt from scipy.fftpack import rfft from scipy.fftpack import fftshift # 加载数据 data_loader = sm.datasets.sunspots.load_pandas() sunspots = data_loader.data["SUNACTIVITY"].values # 创建一个正弦波 t = np.linspace(-2 * np.pi, 2 * np.pi, len(sunspots)) mid = np.ptp(sunspots) / 2 sine = mid + mid * np.sin(np.sin(t)) # 对正弦波进行FFT sine_fft = np.abs(fftshift(rfft(sine))) # 最大振幅的相应索引 print("Index of max sine FFT", np.argsort(sine_fft)[-5:]) # 对太阳黑子进行 fft transformed = fftshift(rfft(sunspots)) print("Indices of max sunspots FFT", np.argsort(transformed)[-5:]) # 太阳黑子活动数据和sine函数 plt.subplot(311) plt.plot(sunspots, label="Sunspots") plt.plot(sine, lw=2, label="Sine") plt.grid(True) plt.legend() # 绘制功率频谱 plt.subplot(312) plt.plot(transformed ** 2, label="Power Spectrum") plt.grid(True) plt.legend() # 相位谱,正弦函数的起始角 plt.subplot(313) plt.plot(np.angle(transformed), label="Phase Spectrum") plt.grid(True) plt.legend() plt.show()
运行结果如下:
11 滤波
是一种信号处理技术,可以对信号的某些部分进行删减或抑制,可以对高频或是低频进行滤波
中值滤波
Wiener滤波
Detrend滤波
示例代码如下:
import statsmodels.api as sm import matplotlib.pyplot as plt from scipy.signal import medfilt from scipy.signal import wiener from scipy.signal import detrend # 加载数据 data_loader = sm.datasets.sunspots.load_pandas() sunspots = data_loader.data['SUNACTIVITY'].values years = data_loader.data['YEAR'].values # 绘制图像 # 原始数据 plt.plot(years, sunspots, label='SUNATCIVITY') # 中值滤波 plt.plot(years, medfilt(sunspots, 11), lw=2, label='Meadian') # wiener滤波 plt.plot(years, wiener(sunspots, 11), '--', lw=2, label='Wiener') # detrend滤波 plt.plot(years, detrend(sunspots), lw=3, label='Detrend') plt.xlabel('YEAR') plt.grid(True) plt.legend() plt.show()
运行结果如下: