#!/usr/bin/env python # coding: utf-8 # In[1]: import os wd = os.getcwd() print(wd) import sys import warnings warnings.filterwarnings('ignore') import numpy as np import pandas as pd import scipy import statsmodels.api as sm import sklearn.decomposition as sck_dec import matplotlib import matplotlib.pyplot as plt import matplotlib.patches as mpatches matplotlib.style.use('ggplot') get_ipython().magic('matplotlib inline') from IPython.display import display, HTML, Image # In[2]: ecb_yc = pd.read_csv('data.csv') ecb_yc.head()[0:2] # In[3]: ecb_yc_sub = ecb_yc[['DATA_TYPE_FM', 'TIME_PERIOD', 'OBS_VALUE']].copy() ecb_yc_sub['TIME_PERIOD'] = pd.to_datetime(ecb_yc_sub['TIME_PERIOD'], format = '%Y-%m-%d') ecb_yc_sub = ecb_yc_sub.set_index('TIME_PERIOD') # In[4]: ecb_yc_sub['2019-07'][0:3] # In[5]: ecb_yc_sub['DATA_TYPE_FM'].unique() # In[6]: def extract_yield_type(data_type_fm): return(data_type_fm[0:3]) ecb_yc_sub['YIELD_TYPE'] = ecb_yc_sub['DATA_TYPE_FM'].apply(extract_yield_type) # In[7]: ecb_yc_sr = ecb_yc_sub[ecb_yc_sub['YIELD_TYPE'] == 'SR_'].copy() ecb_yc_sr.head() # In[8]: yield_3m = ecb_yc_sr[ecb_yc_sr['DATA_TYPE_FM'] == 'SR_3M'] yield_3m['OBS_VALUE'].plot(figsize = (10, 6), color = 'blue') plt.legend((['Yield 3m'])) # In[9]: maturities = [1, 2, 3, 5, 7, 10, 20, 30] # In[10]: time = []; time.append('3M'); time.append('6M') time_float = []; time_float.append(3/12); time_float.append(6/12) yield_df_column_names = []; yield_df_column_names.append('Yield 3M'); yield_df_column_names.append('Yield 6M'); # In[11]: for i in maturities: time.append(str(i) + 'Y') time_float.append(i) yield_df_column_names.append('Yield ' + str(i) + 'Y') # In[12]: yields_df = pd.DataFrame(index = yield_3m.index, columns = yield_df_column_names) # In[13]: for i in range(0, len(time)): maturity = time[i] yield_name = 'Yield ' + maturity yields_df[yield_name] = ecb_yc_sr[ecb_yc_sr['DATA_TYPE_FM'] == 'SR_' + maturity]['OBS_VALUE'] yields_df.tail()[-3:] # In[14]: fig = plt.gcf(); fig.set_size_inches(16, 10.5) plt.plot(time_float, yields_df.iloc[-1], linestyle = '-', marker = 'o', color = 'blue', lw = 3) plt.plot(time_float, yields_df.iloc[0], linestyle = '--', marker = 'x', color = 'orange', lw = 3) plt.legend(loc = 'lower right', frameon = True) plt.xlabel('Years'); plt.ylabel('Yield') # In[15]: yields_df.plot(figsize = (16, 10)) # In[ ]: # C.1 Run PCA # In[16]: X = np.matrix(yields_df) X_dm = X - np.mean(X, axis = 0) Cov_X = np.cov(X_dm, rowvar = False) eigen = np.linalg.eig(Cov_X) eig_values_X = np.matrix(eigen[0]) eig_vectors_X = np.matrix(eigen[1]) Y_dm = X_dm * eig_vectors_X # In[63]: yields_trans = Y_dm.copy() np.savetxt('x_dm.csv',X_dm,delimiter=',') np.savetxt('eig_vectors_x.csv',eig_vectors_X,delimiter=',') np.savetxt('y_dm.csv',Y_dm,delimiter=',') # In[72]: yields_trans[:, 0:3] # In[18]: plt.figure(figsize = (14, 8)) plt.plot(yields_trans[:, 0:3]) plt.legend(['PC1', 'PC2', 'PC3']) # In[62]: Y_dm # In[73]: #Compare PC1 with yields pc1_yields = yields_df.copy() pc1_yields['Yields_PC1'] = yields_trans[:, 0]*(-1) pc1_yields['Yields_PC2'] = yields_trans[:, 1] pc1_yields['Yields_PC3'] = yields_trans[:, 2] # In[74]: pc1_yields[['Yield 1Y', 'Yield 5Y', 'Yield 10Y', 'Yields_PC1']].iloc[:, 0:11].plot(figsize=(14,12)) # In[75]: pc1_yields.corr() # In[ ]: #C.2 Impirtance of PCs Level, Slope, Curveture # In[80]: var_explained = np.zeros(eig_values_X.shape[1]) var_explained_agg = np.zeros(eig_values_X.shape[1]) # sum of eigenvalues eig_values_X_mat = np.diagflat(np.array(eig_values_X)) eigen_values = eig_values_X_mat.diagonal() eig_val_sum_all = np.sum(eigen_values) for i in range(len(eigen_values)): var_explained[i] = eigen_values[i] / eig_val_sum_all eig_val_sum = np.sum(eigen_values[0:i+1]) var_explained_agg[i] = eig_val_sum / eig_val_sum_all print('') print('\t \t \t PC1 PC2 PC3 PC4 PC5') print('') print('Variance Explained: ', np.round(var_explained[0:5], 2)) print('Agg Variance Explained: ', np.round(var_explained_agg[0:5], 2)) # In[81]: plt.figure(figsize = (8,6)) plt.bar(np.arange(len(var_explained_agg))[0:3], var_explained[0:3], align = 'center', color = 'blue') plt.xlim(-1, 3) plt.ylabel('Explained Variance Ratio') # In[82]: # Original 30y yield yield_30y = yields_df.iloc[-1]['Yield 30Y'] print(np.round(yield_30y, 2)) # In[84]: # Use only first three PCs yield_30y_pc_123 = yields_trans[-1, 0:3] * eig_vectors_X[-1, 0:3].getI() yield_30y_pc_123 = yield_30y_pc_123 + yields_df['Yield 30Y'].mean() print(np.round(yield_30y_pc_123.item(), 2)) # In[85]: yields_trans[-1, 0:3] # In[87]: eig_vectors_X[-1, 0:3].getI() # In[88]: yields_trans[-1, 0:3] * eig_vectors_X[-1, 0:3].getI() # In[90]: yields_df['Yield 30Y'].mean() # In[91]: ##D. Yield curve forcast # In[ ]: #D.1 Forcasting the Principal Components # In[92]: # number of principal components n = 10 # In[93]: def calc_ols_beta(Y, X): nobs = int(X.shape[0]) nvar = int(X.shape[1]) betas = (X.getT() * X).getI() * X.getT() * Y epsilon = Y - X * betas sigma2_eps = ((epsilon.getT() * epsilon) / (nobs - nvar)).item() var = sigma2_eps * (X.getT() * X).getI() se = np.sqrt(var.diagonal()).getT() tstats = (betas - 0) / se return betas, tstats # In[110]: # Run AR(1) estimation constant = np.empty(3); constant_tstat = np.empty(3) ar1_coef = np.empty(3); ar1_coef_tstat = np.empty(3) # for PC i for i in range(0, 3): X = np.matrix(yields_trans[0: len(yields_trans)-1, i]) #Lagged value X = np.insert(X, obj = 0, values = 1, axis = 1) Y = np.matrix(yields_trans[1:len(yields_trans), i]).reshape(163,1) betas, tstats = calc_ols_beta(Y, X) constant[i] = betas[0][0]; constant_tstat[i] = tstats[0][0] ar1_coef[i] = betas[1][0]; ar1_coef_tstat[i] = tstats[1][0] #print results print('') print('constant: ', np.round(constant, 4)) print('t-stats: ', np.round(constant_tstat, 4)) print('') print('AR(1) coefficients:', np.round(ar1_coef, 4)) print('t-stats: ', np.round(ar1_coef_tstat, 3)) print('') # In[97]: # Second, forcast PC1, PC2, PC3 pc1_fc = ar1_coef[0] * yields_trans[-1, 0] pc2_fc = ar1_coef[1] * yields_trans[-1, 1] pc3_fc = ar1_coef[2] * yields_trans[-1, 2] pc_fc = np.array([pc1_fc, pc2_fc, pc3_fc]).reshape(1, 3) # In[98]: pc_fc # In[99]: #D.2 ECB Yield Curve Forcast #third, obtain yields from PC1, 2, 3 forcast # In[101]: #Recover yields X_dm_fc = pc_fc * eig_vectors_X[:, 0:3].getI() #Add mean yields_fc = np.zeros(n) for i in range(n): yields_fc[i] = X_dm_fc[0, i] + np.mean(yields_df.iloc[:, i]) yields_fc # In[102]: #E.g. 6m forcast print('Original 6m Yield: ', np.round(yields_df.iloc[-1, 1], 4)) print('Forecast 6m Yield: ', np.round(yields_fc[1], 4)) print('') print('Difference: ', np.round(yields_df.iloc[-1, 1] - yields_fc[1], 4)) # In[103]: #E.g. 30y forcast print('Original 30y Yield: ', np.round(yields_df.iloc[-1, 9], 4)) print('Forecast 30y Yield: ', np.round(yields_fc[9], 4)) print('') print('Difference: ', np.round(yields_df.iloc[-1, 9] - yields_fc[9], 4)) # In[104]: # fourth, plot expected yield curve # In[105]: fig = plt.gcf() fig.set_size_inches(18, 14) plt.plot(time_float, yields_df.iloc[-1], label = yields_df.index[-1].date(), linestyle='-', marker='o') plt.plot(time_float, np.array(yields_fc), label = '2019-08-22 (forcast)', linestyle='--', marker='o') plt.legend(loc = 'lower right', frameon = True) plt.xlabel('Years') plt.ylabel('(Expected)Yield') # In[106]: # short end of the yield curve fig = plt.gcf() fig.set_size_inches(18, 14) plt.plot(time_float[0:5], yields_df.iloc[-1][0:5], label = yields_df.index[-1].date(), linestyle='-', marker='o') plt.plot(time_float[0:5], np.array(yields_fc[0:5]), label = '2019-08-22 (forcast)', linestyle='--', marker='o') plt.legend(loc = 'lower right', frameon = True) plt.xlabel('Years') plt.ylabel('(Expected)Yield') # In[ ]: