import warnings
def fxn():
warnings.warn("deprecated", DeprecationWarning)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
fxn()
# importations de modules "local"
import sys
sys.path.append('functions/')
import base_notebook
import css_base_dark
import fonctions_perso
# import des librairies et des paramètres
from base_notebook import *
from css_regression import *
from css_base_dark import *
# creation dossier grahs
path_graph = "data/exports/graphs/"
os.makedirs(path_graph, exist_ok=True)
import matplotlib as mpl
mpl.rcParams['figure.dpi'] = 100
# repère de début de traitement pour calculer le temps total d'exécution du notebook
start_time_m4 = time.time()
# Import des données de production et de consommation d'électricité
df = pd.read_csv("data/inputs/eCO2mix_RTE_energie_M.csv", index_col=0, parse_dates=True, sep='\t', engine='python', encoding='ANSI')
# affichage aléatoires de 3 observations
df.sample(3)
# affichage de la taille du df
df.shape
# affichage des colonnes
df.columns
# affichage de l'index
df.index
Analyse du daraframe
Territoire
Francedf_fr = df.loc[df['Territoire'] == 'France', ['Consommation totale']]
df_fr.columns = ['conso']
df_fr=df_fr.rename_axis('date')
df_fr
# vérification de présence de valeurs nulles
df_fr.isna().sum()
# Visualisons la série temporelle de la consommation d'électricité en France
plt.figure(figsize=(12,4))
plt.plot(df_fr['conso'])
plt.title("Consommation totale d'électricité en France (en GWh)", y=1.01)
plt.ylabel("Consommation (en GWh)")
plt.tight_layout()
plt.savefig(path_graph+"01-serie_temporelle_conso_France_brute.png", dpi=200)
plt.show();
Ainsi, on constate que :
# calculons le minimum et le maximum de la consommation pour chaque année (en indiquant également le mois)
# années de la periode
# on limite les données aux années complètes (on ne compte pas 2021)
years_list = np.arange(2012, 2021, 1).tolist()
# création d'un df df_fr_year avec pour index les années
df_fr_year=pd.DataFrame(
index=years_list,
columns=["avg", "min", "min_m", "max", "max_m"])
# création d'un df groupby par année
df_fr2=df_fr.copy()
df_fr2=df_fr2[df_fr2.index < "2021-01.-01"]
df_fr2['year']=pd.DatetimeIndex(df_fr2.index).year
df_fr2["month"] = pd.DatetimeIndex(df_fr2.index).month
df_fr3=df_fr2.groupby(['year'])
# initialisation des listes avec valeurs et numéro du mois
avg_list_value=[]
sum_list_value=[]
min_list_value=[]
min_list_month=[]
max_list_value=[]
max_list_month=[]
# on boucle sur chaque année et on récupère les données dans les listes
for name, group in df_fr3:
avg_list_value.append(group['conso'].mean())
sum_list_value.append(group['conso'].sum())
min_list_value.append(group['conso'].min())
min_list_month.append(group.loc[group['conso'] == group['conso'].min(), "month"].values[0])
max_list_value.append(group['conso'].max())
max_list_month.append(group.loc[group['conso'] == group['conso'].max(), "month"].values[0])
# on remplit df_fr_year avec les listes précédentes
df_fr_year['avg'] = avg_list_value
df_fr_year['avg'] = df_fr_year['avg'].apply(lambda x: round(x))
df_fr_year['conso_annee'] = sum_list_value
df_fr_year['conso_annee'] = df_fr_year['conso_annee'].apply(lambda x: "{:,}".format(x).replace(',', ' '))
df_fr_year['min'] = min_list_value
df_fr_year['min_m'] = min_list_month
df_fr_year['max'] = max_list_value
df_fr_year['max_m'] = max_list_month
df_fr_year
# prepa des données
years = df_fr_year.index
df_fr['year']=pd.DatetimeIndex(df_fr.index).year
df_fr["month"] = pd.DatetimeIndex(df_fr.index).month
months_chiffre = np.arange(0,12,1).tolist()
months=["Janv", "Fev", "Mars", "Avril", "Mai", "Juin", "Juil", "Aout", "Sept", "Oct", "Nov", "Dec"]
df_fr["month"]=df_fr["month"].apply(lambda x: months[x-1])
df_fr.dtypes
# prepa des couleurs
np.random.seed(100)
mycolors = np.random.choice(list(mpl.colors.XKCD_COLORS.keys()), len(years), replace=False)
# Dessiner le graphique
plt.figure(figsize=(12,6), dpi= 100)
for i, y in enumerate(years) :
if i > 0 :
plt.plot('month', 'conso', data=df_fr.loc[df_fr.year==y, :], color=mycolors[i], label=y)
plt.text(df_fr.loc[df_fr.year==y, :].shape[0]-.9, df_fr.loc[df_fr.year==y, 'conso'][-1 :].values[0], y, fontsize=8, color=mycolors[i])
#Décoration
plt.gca().set(xlim=(-0.3, 12), ylim=(25000, 60000))
plt.ylabel("Consommation (en GWh)")
plt.yticks(fontsize=11, alpha=.7)
plt.title("Saisonnalité de la consommation brute d'électricité en France")
plt.tight_layout()
plt.savefig(path_graph+"02-saisonnalite_consommation_electricite_brute.png", dpi=200)
plt.show();
with plt.style.context("seaborn-ticks"):
fig, axes = plt.subplots(1, 2, figsize=(15,5))
config_moy={"marker":"o",
"markerfacecolor":"white",
"markeredgecolor":"black",
"markeredgewidth":"0.5",
"markersize":"6"}
config_moy_little={"marker":"o",
"markerfacecolor":"white",
"markeredgecolor":"black",
"markeredgewidth":"0.5",
"markersize":"4"}
q_colors = sns.color_palette("Set1", 12, .7)
_=sns.boxplot(x='year', y='conso', data=df_fr[df_fr['year'] != 2021], showmeans=True, meanprops=config_moy, palette=(q_colors[i] for i,j in enumerate(df_fr['year'][:-1])), ax=axes[0]);
qualitative_colors = sns.color_palette("Set3", 18);
_=sns.boxplot(x='month', y='conso', data=df_fr.loc[~df_fr.year.isin([2012, 2020]), :], palette=(qualitative_colors[i] for i,j in enumerate(df_fr['month'])), meanprops=config_moy_little, showmeans=True);
_=axes[0].set_title('BoxPlot par année\n(la Tendance)', color="darkred", fontsize=14);
_=axes[0].set_ylabel("Consommation brute (en GWh)");
_=axes[0].set_xlabel("");
_=axes[1].set_title('BoxPlot par mois\n(La saisonnalité)', color="darkred", fontsize=14);
_=axes[1].set_ylabel("Consommation brute (en GWh)");
_=axes[1].set_xlabel("");
plt.tight_layout(w_pad=3)
plt.savefig(path_graph+"03-boxplot_annee_mois_conso_brute.png", dpi=200)
plt.show();
Boxplot par année
Boxplot par mois
Intégration des données météo
Pour corriger les données de l'effet température, on va intégrer les Degrés Jour Unifié :
Remarques
# import des dju chauffe
dju = pd.read_csv("data/inputs/dju_paris_meteo_chauffe.csv", index_col=[0], sep=";", decimal=",")
dju
# nos données de consommation commencent le 1/1/2012 -> on ne garde que les dju depuis cette date
dju=dju[dju.index>2011]
# suppression de la colonne "total"
dju=dju.drop(columns="Total")
# on ordonnes les données par année (ordre croissant)
dju=dju.sort_index()
dju
On va intégrer les dju dans le df de la consommation
On va au préalable limiter les données de df_fr aux périodes pour lesquelles on dispose des valeurs de dju :
--> jusqu'en mai 2020 inclus
# limitation des données de df_fr
df_fr = df_fr[df_fr.index < '2020-06-01']
# initialisation liste qui contiendra les valeurs de dju
li=[]
# pour chaque index
for i in dju.index:
# on ajoute les valeurs des colonnes
li.extend([dju.loc[i,col] for col in dju.columns])
# on ne garde que les 5 premiers mois de l'années 2020 -> on ne garde donc pas les 7 dernières valeurs de la liste
li=li[:-7]
# on verifie que les tailles de df_fr et de li sont identiques
if len(li) == df_fr.shape[0]:
df_fr['dju']=li
df_fr
df_fr.dtypes
plt.rc('grid', alpha=0.3, linestyle='--')
plt.rc('figure', figsize=(10, 6), dpi=100, titlesize=14)
plt.rc('axes', titlecolor="darkred", titlesize=14, labelcolor="gray", labelsize=12)
plt.rc('xtick', labelsize=11)
plt.rc('ytick', labelsize=11)
fig=plt.figure(figsize=(12,4))
plt.rc('grid', alpha=0.3, linestyle='--')
ax = fig.add_subplot(211)
ax.plot(df_fr['conso'], label="conso", color="teal")
plt.ylabel("Consommation (en GWh)", color="gray", fontsize=10)
plt.legend(fontsize=9)
ax = fig.add_subplot(212)
ax.plot(df_fr['dju'], label="DJU", color="red")
plt.ylabel("DJU (en °C)", fontsize=10)
plt.suptitle('Comparaison graphique "consommation électrique" et graphique "DJU"', y=0.93, color="darkred")
plt.legend(fontsize=9)
plt.tight_layout()
plt.savefig(path_graph+"04-serie_temporelle_conso_France_brute_et_dju_2_graphs.png", dpi=200)
plt.show();
fig, ax1 = plt.subplots(figsize=(15,5))
color = 'teal'
ax1.plot(df_fr["conso"], label="Consommation (en GWh)", color=color)
plt.ylabel("Conso (en GWh)", color=color, labelpad=10)
plt.legend(loc="upper left")
ax2 = plt.gca().twinx()
color2='red'
ax2.plot(df_fr["dju"], label="DJU (en d °C)", color=color2)
plt.ylabel("DJU (en °C)", color=color2, labelpad=10)
plt.legend(loc="upper right")
plt.suptitle('Superposition graphique "consommation électrique" et graphique "DJU"', y=0.95)
plt.tight_layout()
plt.savefig(path_graph+"05-serie_temporelle_conso_France_brute_et_dju_1_graph.png", dpi=200)
plt.show();
#export des data
df_fr.to_csv("data/exports/df_fr.csv", header=True, index=True)
df_fr_year.to_csv("data/exports/df_fr_year.csv", header=True, index=True)
from fonction_regression_lineaire import *
# modèle de régression linéaire
reg = ols('conso~dju', data=df_fr).fit()
# verification des hypothèses d'application du modèle linéaire
res_hyp = hypotheses_regression_lineaire('conso', ['dju'], df_fr, 'conso_dju_chauffe')
# analyse du modèle
res_analyse = analyse_reg(reg, droite=True, dju="dju", df=df_fr, version_name="conso_dju_chauffe")
# on récupère les coefficients
coef_dju = res_analyse['coef']['dju']
# serie conso corrigée de effet de chauffe = serie conso brute - saisonnalité DJU de chauffe
df_fr['conso_corr_dju'] = df_fr['conso'] - df_fr['dju']*coef_dju
plt.figure(figsize=(12,4))
plt.plot(df_fr['conso'], color="teal", label="Consommation brute")
plt.plot(df_fr['conso_corr_dju'], color="red", label="Consommation corrigée")
plt.title("Consommation d'électricité brute et corrigée de l'effet des températures en France (en GWh)")
plt.ylabel("Consommation (en GWh)")
plt.legend(fontsize=8)
plt.tight_layout()
plt.savefig(path_graph+"06-serie_temporelle_conso_France_brute_et_corrigee_effet_temperature.png", dpi=200)
plt.show();
df_fr.sample(1)
plt.figure(figsize=(12,4))
plt.plot(df_fr['conso_corr_dju'], color="red", label="Consommation corrigée", zorder=3)
for i in range(9):
datte = df_fr.index[i*12]
plt.axvline(x=datte, color='lightgray', linestyle='--', zorder=2)
plt.ylabel("Consommation (en GWh)")
plt.title("Consommation d'électricité corrigée de l'effet des températures en France (en GWh)")
plt.legend(loc="best", fontsize=10)
plt.tight_layout()
plt.savefig(path_graph+"07-serie_temporelle_conso_France_corrigee_effet_temperature.png", dpi=200)
plt.show();
une tendance ($T_t$) :
une saisonnalité ($S_t$) :
seasonal_decompose
de 'statsmodels :En fonction de la nature de la tendance et de la saisonnalité, la série temporelle peut être modélisée :
Concernant notre série temporelle, le modèle additif semble le mieux convenir au vu des graphiques précédents
On va maintenant enlever la saisonnalité de la consommation corrigée de l'effet température
Pour rappel, la saisonnalité est ici de 12 mois
Méthode utilisée :
3 objectifs dans la recherche de la MM optimale :
-> on cherche la MM :
# calcul des moyennes mobiles
def moy_mob(serie, p):
nb_elemt = len(serie)
k = p // 2
mb = []
for i in range(k):
mb.append(np.nan)
if p%2 != 0:
for i in range(nb_elemt-p+1):
s=serie[i:i+p]
mb.append(sum(s)/p)
else:
for i in range(nb_elemt-p):
s=serie[i:i+p+1]
mb.append((0.5*s[0]+0.5*s[-1]+sum(s[1:-1]))/p)
for i in range(k):
mb.append(np.nan)
mbr = [round(x,2) for x in mb]
return mbr
On calcule M12 sur la série corrigée conso_corr_dju
-> on obtient ainsi la tendance de la série corrigée
On ajoute cette tendance dans notre df, dans la colonne conso_corr_lissee
x=df_fr['conso_corr_dju'].tolist()
y=moy_mob(x,12)
df_fr['conso_corr_lissee']=y
df_fr.head(10)
plt.figure(figsize=(12,4))
plt.plot(df_fr['conso_corr_dju'], color="red", label="Consommation corrigée", zorder=3)
plt.plot(df_fr['conso_corr_lissee'], color="green", label="Consommation corrigée lissée", zorder=4)
for i in range(9):
datte = df_fr.index[i*12]
plt.axvline(x=datte, color='lightgray', linestyle='--', zorder=2)
plt.ylabel("Consommation (en GWh)")
plt.title("Consommation d'électricité corrigée et corrigée lissée via M12")
plt.legend(loc="best", fontsize=9)
plt.tight_layout()
plt.savefig(path_graph+"08-serie_temporelle_conso_France_corrigee_et_corrigee_et_lissee_M12.png", dpi=200)
plt.show();
plt.figure(figsize=(12,4))
plt.plot(df_fr['conso'], color="lightseagreen", label="Consommation brute")
plt.plot(df_fr['conso_corr_dju'], color="red", label="Consommation corrigée")
plt.plot(df_fr['conso_corr_lissee'], color="green", label="Consommation corrigée lissée", zorder=4)
plt.title("Consommation d'électricité brute, corrigée, et corrigée lissée (en GWh)")
plt.ylabel("Consommation (en GWh)")
plt.legend(loc="best", fontsize=8)
plt.tight_layout()
plt.savefig(path_graph+"09-serie_temporelle_conso_France_brute_et_corrigee_et_lissee.png", dpi=200)
plt.show();
plt.figure(figsize=(12,4))
plt.plot(df_fr['conso_corr_lissee'], color="green", label="Consommation corrigée lissée", zorder=3)
plt.ylabel("Consommation (en GWh)")
plt.title("Consommation d'électricité corrigée et lissée via M12 (en GWh) : la tendance")
plt.legend(loc="best", fontsize=8)
plt.tight_layout()
plt.savefig(path_graph+"10-serie_temporelle_conso_France_corrigee_lissee_M12.png", dpi=200)
plt.show();
conso_corr_dju
) et la tendance (conso_corr_lissee
) est la double composante "saisonnalté et erreur"double_compo
df_fr['double_compo'] = df_fr['conso_corr_dju'] - df_fr['conso_corr_lissee']
df_fr.tail(10)
# copie du df
dft = df_fr.copy()
# limitations des données aux valeurs non NaN et années entières (multiple de 12)
dft = dft[(dft.index>"2012-06-01") & (dft.index<"2019.07.01")]
dft.shape
dft.head(2)
dft.tail(2)
On va appliquer le modèle linéaire suivant : $Y_t = a + bt + \sum_{i=1}^{12} (c_i1)$
Mais problème de colinéarité : la somme des indicatrices est égale à 1 ; or "1" est déjà une variable
=> Solution : pour chaque $Y_t$ on enlève la valeur ajustée moyenne des indicatrices $\hat{c_i}$
# on isole la série à traiter
y=dft[['double_compo']]
# base tendancielle
t=np.arange(1,85,1)
# base saisonnière
# 12 indicatrices : Si vaudra "1" pour le mois i, quelque soit l'année considérée
for i in range(12):
su = np.repeat(0, repeats=12)
su[i] = 1
s = np.tile(su, 84 // len(su) + 1)[:84]
vars()['s' + str(i+1)] = s
from sklearn import linear_model
# on applique la réfression linéaire
reg = linear_model.LinearRegression(fit_intercept=False)
t=reg.fit(np.array([t, s1, s2, s3, s4, s5, s6, s7, s8, s9, s10, s11, s12]).transpose(), y)
# modèle modifié pour pallier au problème de colinéarité
# valeur estiméee moyenne des 12 indicatrices
a = np.mean(reg.coef_[0][1:13])
# coef de t
b = reg.coef_[0][0]
# coef des 12 indicatrices corrigés
c = reg.coef_[0][1:13] - a
saisonnalite = (c[0]*s1+c[1]*s2+c[2]*s3+c[3]*s4+c[4]*s5+c[5]*s6+c[6]*s7+c[7]*s8+c[8]*s9+
c[9]*s10+c[10]*s11+c[11]*s12)
dft['saisonnalite'] = saisonnalite
dft['residus'] = dft['double_compo'] - dft['saisonnalite']
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 4))
dft['saisonnalite'].plot(ax=ax1, color="#ffa600")
dft['residus'].plot(ax=ax2, color="#ef5675")
y_lab=["Saisonnalité", "Résidus"]
for i,x in enumerate([ax1, ax2]):
x.set_xlim("2012-04-01", "2019-09-01")
x.set_ylabel(y_lab[i], color="gray", labelpad=15)
x.set_xlabel("", color="white", fontsize=0)
plt.suptitle("Décomposition de la double composante de la série corrigée de consommation d'électricité (en GWh)", y=0.95)
plt.tight_layout()
plt.savefig(path_graph+"11-decomposition_double_composante_saison_residus_conso_corrigee_dju.png", dpi=200)
plt.show();
pylab.rcParams['figure.figsize'] = (12, 6)
from statsmodels.tsa.seasonal import seasonal_decompose
def decompo_conso(s, model, num_graph):
result = seasonal_decompose(s, model=model, extrapolate_trend='freq')
fig, (ax1, ax2, ax3, ax4) = plt.subplots(4, 1, figsize=(15, 8))
zz = [ax1, ax2, ax3, ax4]
colors = ["#003f5c", "#7a5195", "#ef5675", "#ffa600"]
# Valeurs observées
result.observed.plot(ax=ax1, color=colors[0])
# Tendance
result.trend.plot(ax=ax2, color=colors[1])
# Saisonnalité
result.seasonal.plot(ax=ax3, color=colors[2])
for z in range(9):
year = 2012+z
ax3.axvline(x=str(year)+"-01-01", color='gray', linestyle='--', zorder=3)
# Résidus
result.resid.plot(ax=ax4, color=colors[3])
y_lab = ["Observations", "Tendance", "Saisonnalité", "Résidus"]
for i,z in enumerate(zz):
z.set_xlim('2011-11-01', '2020-07-01')
z.set_ylabel(y_lab[i], color="gray", labelpad=15)
z.set_xlabel("", color="white", fontsize=0)
plt.suptitle("Décomposition "+model+" de la consommation corrigée de l'effet température", y=0.97, color="darkred")
plt.tight_layout()
plt.savefig(path_graph+num_graph+"-decomposition_"+model+"_conso_corrigee_dju.png", dpi=200)
plt.show();
return result
result_add=decompo_conso(s=df_fr['conso_corr_dju'], model='additive', num_graph="12")
print(dir(result_add))
seasonal_decompose
df_verif = pd.DataFrame(list(zip(result_add.trend, result_add.seasonal, result_add.resid)),
columns = ['sd_trend','sd_seasonal', 'sd_resid'],
index = df_fr.index)
df_verif
seasonal_decompose
de statsmodelsfig, (ax1, ax2, ax3, ax4) = plt.subplots(4, 1, figsize=(15, 8))
zz = [ax1, ax2, ax3, ax4]
colors = ["#003f5c", "#7a5195", "#ef5675", "#ffa600"]
colors2 = ["#49c1f8", "#50fba3", "#302bda", "#085dbf"]
# Valeurs observées
result_add.observed.plot(ax=ax1, color=colors[0])
# Tendance
result_add.trend.plot(ax=ax2, color=colors[1], alpha=0.8)
dft['conso_corr_lissee'].plot(ax=ax2, color=colors2[1], linestyle="--", linewidth=1)
# Saisonnalité
result_add.seasonal.plot(ax=ax3, color=colors[2], alpha=0.8)
dft['saisonnalite'].plot(ax=ax3, color=colors2[2], linestyle="--", linewidth=1)
for z in range(9):
year = 2012+z
ax3.axvline(x=str(year)+"-01-01", color='gray', linestyle='--', zorder=3)
# Résidus
result_add.resid.plot(ax=ax4, color=colors[3], alpha=0.8, label="seasonal_decompose")
dft['residus'].plot(ax=ax4, color=colors2[3], linestyle="--", linewidth=1, label="composantes calculées")
y_lab = ["Observations", "Tendance", "Saisonnalité", "Résidus"]
for i,z in enumerate(zz):
z.set_xlim('2011-11-01', '2020-07-01')
z.set_ylabel(y_lab[i], color="gray", labelpad=15)
z.set_xlabel("", color="white", fontsize=0)
plt.legend(bbox_to_anchor=(1, 5), fontsize=8)
plt.suptitle("Comparaison de la décomposition calculée et de la décomposition de 'seasonal_decompose'", x=0.43, y=0.93, color="darkred")
plt.savefig(path_graph+"13-comparaison_decomposition_calculee_et_seasonal_decompose.png", dpi=200)
plt.show();
On peut quantifier ces différences en comparant les valeurs de ces 2 composantes
df_fr_data = dft[['conso_corr_lissee', 'saisonnalite', 'residus']]
df_fr_data.columns = ['calc_trend', 'calc_seasonal', 'calc_resid']
df_verif = df_verif.merge(df_fr_data, left_index=True, right_index=True, how='inner')
df_verif
comp = ['trend', 'seasonal', 'resid']
for c in comp:
df_verif['diff_'+c] = df_verif['sd_'+c] - df_verif['calc_'+c]
# affichage aléatoire de 10 valeurs
df_verif.sample(10)
df_fr['conso_corr_deseason'] = df_fr['conso_corr_dju'] - result_add.seasonal
df_fr.sample(2)
plt.figure(figsize=(12,4))
plt.plot(df_fr['conso_corr_dju'], color="#003f5c", linestyle='dashed', label="Consommation corrigée de l'effet température", zorder=2, alpha=0.5)
plt.plot(df_fr['conso_corr_deseason'], color="#ef5675", label="Consommation corrigée et désaisonnalisée", zorder=3)
plt.ylabel("Consommation (en GWh)")
plt.title("Consommation corrigée et consommation corrigée désaisonnalisée")
plt.legend(loc="best", fontsize=8)
plt.tight_layout()
plt.savefig(path_graph+"14-serie_temporelle_conso_corrigee_et_conso_corrigee_desaisonnalisee.png", dpi=200)
plt.show();
On constate graphiquement que la désaisonnalisation a permis de lisser la série :
-> la moyenne semble équivalente, avec une variance plus faible
df_fr[['conso_corr_dju', 'conso_corr_deseason']].describe().T.loc[:,['count', 'mean', 'std', 'min', 'max']].apply(lambda x: round(x,2))
# df avec seulement la consommation corrigée et la consommation desaisonnalisée
df=df_fr.copy()
df=df_fr.drop(columns=['conso_corr_lissee', 'double_compo'])
df=df.rename(columns={'conso_corr_dju': 'conso_corr'})
df=df.rename(columns={'conso_corr_deseason': 'conso_desaison'})
df.head(2)
df.tail(12)
Principe général :
La méthode de prévision de Holt Winters est une méthode empirique qui repose sur le lissage exponentiel :
-> Cette technique se base sur les moyennes pondérées d’observations antérieures, en accordant un poids d'autant plus important que l'observation est récente.
La méthode de Hots Winters utilise un "triple lissage" : elle permet de gérer des séries comportant des niveaux, une tendance mais aussi une saisonnalité.
Principe d'application :
# taille de x_train et x_test
df[df.index < '2019-01-01'].shape
df[df.index >= '2019-01-01'].shape
# preapration des données
df_x = df[['conso_corr']]
df_x_train = df[['conso_corr']][:84]
x_train=df_x_train['conso_corr']
from statsmodels.tsa.api import ExponentialSmoothing
import warnings
from statsmodels.tools.sm_exceptions import ConvergenceWarning
warnings.simplefilter('ignore', ConvergenceWarning)
warnings.simplefilter('ignore', FutureWarning)
# methode de Holt Winters
hw = ExponentialSmoothing(np.asarray(x_train), seasonal_periods=12, trend='add', seasonal='add').fit()
# prévisions
hw_pred = hw.forecast(17)
# ajout d'une colonne 'prevision' dans df
df['prevision_hw']=np.nan
df.loc[df.index > '2018-12-31', 'prevision_hw'] = hw_pred
df_prevision = df[['prevision_hw']]
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 8))
zz = [ax1, ax2]
# x_train + x_test
df_x[:85].plot(ax=ax1, color='teal')
df_prevision[84:].plot(ax=ax1, color='red')
# x + x_test
df_x.plot(ax=ax2, color='teal')
df_prevision[84:].plot(ax=ax2, color='red', linestyle='dashed', alpha=0.8)
for z in zz:
z.set_xlim('2011-11-01', '2020-07-01')
z.set_ylabel("Consommation (en GWh)", color="gray", labelpad=15)
z.set_xlabel("", color="white", fontsize=0)
z.legend(['Consommation réelles', 'Consommation prédites'], loc="best", fontsize=9, frameon=True, edgecolor='black')
plt.suptitle("Prévision de la consommation corrigée à partir de 2019 - Méthode Holt Winters", y= 0.92, fontsize=14, color="darkred")
plt.savefig(path_graph+"15-Prevision_consommation_2019_Holt_Winters.png", dpi=200)
plt.show();
On fait un focus graphique sur la période prédite
Remarque :
Du fait du contexte particulier de la crise sanitaire débutée début 2020, on constate un "décrochage" des prévisions par rapport aux données réelles.
On présente donc 2 graphiques différents des prévisions :
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))
zz = [ax1, ax2]
# x_train + x_test (19-mai20)
df_x[84:].plot(ax=ax1, color='teal')
df_prevision[84:].plot(ax=ax1, color='red', linestyle='dashed', alpha=0.8)
ax1.set_xlim('2018-12-01', '2020-06-01')
# x_train + x_test (2019)
df_x[84:-4].plot(ax=ax2, color='teal')
df_prevision[84:-4].plot(ax=ax2, color='red', linestyle='dashed', alpha=0.8)
ax2.set_xlim('2018-12-01', '2020-02-01')
for z in zz:
z.set_ylabel("Consommation (en GWh)", color="gray", labelpad=15)
z.set_xlabel("", color="white", fontsize=0)
z.legend(['Consommation réelles', 'Consommation prédites'], loc="best", fontsize=9, frameon=True, edgecolor='black', shadow=True)
plt.suptitle("Prévision de la consommation corrigée à partir de 2019 - Méthode Holt Winters", y= 0.96, fontsize=14, color="darkred")
plt.tight_layout()
plt.savefig(path_graph+"16-Prevision_consommation_2019_Holt_Winters_focus_periode_predite.png", dpi=200)
plt.show();
dfr = df.copy()
dfr['residus'] = dfr['conso_corr'] - dfr['prevision_hw']
dfr['erreur_pct'] = (dfr['residus'] / dfr['conso_corr']).apply(lambda x: "{:.2%}".format(x))
for col in ['conso_corr', 'prevision_hw', 'residus']:
dfr[col] = dfr[col].apply(lambda x: round(x,2))
analys_res_hw = dfr[84:][['conso_corr', 'prevision_hw', 'residus', 'erreur_pct']]
analys_res_hw.columns=['y_test', 'pred_hw', 'res_hw', 'res_hw_pct']
analys_res_hw=analys_res_hw[:-5]
print(dir(hw))
Calcul de :
On récupère également les valeurs des critères d'informations AIC et BIC
from sklearn.metrics import mean_squared_error
def RMSE(y, predictions):
RMSE = mean_squared_error(y, predictions, squared=False)
return RMSE
def MAPE(y, predictions):
MAPE = np.mean(np.abs((y - predictions) / y)) * 100
return MAPE
res_pred_hw={}
res_pred_hw['res_pred_hw'] = analys_res_hw
y1 = df_x[84:]['conso_corr']
predictions1 = df_prevision[84:]['prevision_hw']
RMSE1=RMSE(y1, predictions1)
MAPE1=MAPE(y1, predictions1)
y2 = df_x[84:-4]['conso_corr']
predictions2 = df_prevision[84:-4]['prevision_hw']
RMSE2=RMSE(y2, predictions2)
MAPE2=MAPE(y2, predictions2)
print("#################################################")
print("#### Performances du modèle de Holt Winters ####")
print("#################################################")
print("#")
print(f"# AIC : {round(hw.aic,3)}")
res_pred_hw['aic']=round(hw.aic,3)
print(f"# BIC : {round(hw.bic,3)}")
res_pred_hw['bic']=round(hw.bic,3)
print("#")
print("######################################")
print("####### Période janv19-mai20 #######")
print("######################################")
print(f"# RMSE : {round(RMSE1, 2)}")
print(f"# MAPE : {round(MAPE1, 2)}")
print("#")
print("######################################")
print("####### Période janv19-dec19 #######")
print("######################################")
print(f"# RMSE : {round(RMSE2, 2)}")
res_pred_hw['rmse']=round(RMSE2, 2)
print(f"# MAPE : {round(MAPE2, 2)}")
res_pred_hw['mape']=round(MAPE2, 2)
Un processus temporel $X_t$ est dit stationnaire au sens faible (ou "de second ordre") s'il remplit les 3 conditions suivantes :
Pour tester la stationnarité d'une série temporelle, on va utiliser différentes méthodes :
df = df[df.index < '2020-01-01']
df
y=df[['conso_corr']]
from fonctions_perso import test_stationnarite_adf
res=test_stationnarite_adf(y, "y", details=False)
Le test nous confirme que la série de consommation corrigée de l'effet température n'est pas stationnaire
Nous allons déstationnariser la série grâce à une ou plusieurs différenciations.
Dans un modèle SARIMA, 2 types de différenciation sont possibles :
une différenciation "en tendance", c'est-à-dire d'ordre 1
-> Concernant notre série temporelle, nous avons vu précédemment que la tendance est stable au cours du temps (une consommation corrigée mensuelle de l'ordre de 31500 GWh environ)
---> nous n'avons donc pas besoin de réaliser de différenciation en tendance
une différenciation "en saisonnalité", c'est-à-dire d'ordre équivalent au nombre de périodes d'une saison
-> Concernant notre série temporelle, nous avons déterminé une saisonnalité de 12 mois
---> nous allons donc a priori réaliser une ou plusieurs différenciations d'ordre 12
Analysons maintenant le graphique de la sortie ACF pour vérifier les autocorrélations
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
plot_acf(y, zero=True, lags=40)
plt.xticks(np.arange(0, 42, step=3))
plt.show();
Nous allons commencer par la partie saisonnière, puis voir ensuite s'il est nécessaire ou non de différencier en tendance.
La lecture de la sortie ACF indique des pics toutes les 6 périodes.
En regardant plus précisément la sortie ACF, on constate qu'il ne s'agit pas d'une saisonnalité de 6 mais bien d'une saisonnalité de 12 :
-> en effet, on constate le même schéma qui se répète :
--> on va donc effectuer une différenciation d'ordre 12: $\nabla^{12} = (I - B^{12})$
On regarde la sortie ACF correspondante
ydiff_12 = y - y.shift(12)
plot_acf(ydiff_12, zero=True, lags=40, missing='drop')
plt.xticks(np.arange(0, 42, step=3))
plt.show();
La sortie ACF semble désormais correspondre à un autocorrélogramme simple (on ne constate plus de pics liés à une fréquence saisonnière).
Regardons la sortie PACF correspondante
plot_pacf(ydiff_12.dropna(), zero=True, lags=30)
plt.xticks(np.arange(0, 32, step=3))
plt.show();
Ainsi, cette différenciation d'ordre 12 semble suffisante pour rendre le processus startionnaire.
Vérifions la stationnarité avec un test de racine unitaire
res_y12 = test_stationnarite_adf(ydiff_12, 'ydiff_12', details=False)
La série ainsi différenciée parait donc stationnaire au seuil de 5% d'erreur.
Le modèle SARIMA sera donc de la forme :
En effet :
Il faut maintenant déterminer les valeurs des paramètres p, q, P et Q :
-> on s'aide pour cela des graphiques des sorties ACF et PACF, à savoir :
Remarque : Pour les 2 graphiques ACF et PACF
-> seules les lignes verticales qui dépassent les lignes horizontales sont considérées comme significatives
# rappel des graphiques des sorties ACF et PACF
from fonctions_perso import graph_acf_pacf
graph_acf_pacf(ydiff_12, "Différenciation saisonnière d'ordre 12 effectuée 1 fois : ydiff_12", lags=30, zero=False, name_plt="18-ACF_PACF_ydiff12" )
ydiff_12_2 = ydiff_12 - ydiff_12.shift(12)
graph_acf_pacf(ydiff_12_2, "Différenciation saisonnière d'ordre 12 effectuée 1 fois : ydiff_12", lags=20, zero=False, name_plt="19-ACF_PACF_ydiff_12")
Essayons de déterminer les paramètres p,q, P et Q
Ainsi, un modèle possible serait le suivant :
La lecture des sorties ACF et PACF n'est pas évidente :
on va tester différentes valeurs pour les paramètres p,q, P et Q
---> on se limite, pour chaque paramètre, aux valeurs 0, 1 et 2 (des valeurs supérieures risquant d'entraîner un ajustement excessif des données et/ou des problèmes d'estimations)
pour l'ordre des différenciations, nous gardons les valeurs qui nous ont permis de stationnariser la série, à savoir d=0 et D=1
--> déterminons toutes les combinaisons possibles
from fonctions_perso import combi_possibles
combi_sarima=combi_possibles(d_start=0, d_end=0, sd_start=1, sd_end=1, m=12, details=True)
Nous allons procéder en 2 temps :
utilisation d'une fonction créée pour ce projet (select_model_sarima
, disponible dans le dossier "fonctions_perso") qui se base sur la méthode décrite précédemment ;
--> à noter que nous distinguerons 2 cas différents :
utilisation de la fonction auto-SARIMA
du package pyramid-arima
qui utilise une méthode similaire (documentation)
# import des fonctions
from fonctions_perso import select_model_sarima
from fonctions_perso import affichage_graph_diagnostics
y = df['conso_corr']
pdq = combi_sarima['pdq']
seasonal_pdq = combi_sarima['s_pdq']
select_best_mod = select_model_sarima(y, pdq, seasonal_pdq, version_name="test_1_modele_optimal", periode_saison=12, perf="aic", details=False)
=> Aucun modèle potentiel ou possible;
Testons les mêmes paramètres avec la fonction auto-SARIMA
du package pyramid-arima
import pmdarima as pm
y = df[['conso_corr']]
smodel = pm.auto_arima(y, start_p=0, start_q=0,
test='adf',
max_p=2, max_q=2, m=12,
start_P=0, seasonal=True,
d=0, D=1, trace=True,
error_action='trace',
suppress_warnings=True,
stepwise=True)
smodel.summary()
Le modèle retourné n'est pas valide
--> la p_value d'un coefficient est supérieur à 0,05 ==> ce coefficient n'est donc pas significatif
Faisons une différenciation supplémentaire, à savoir une différenciation de la tendance, soit d=1, et testons la stationnarité
ydiff_12_1 = ydiff_12 - ydiff_12.shift(1)
res_y12_1 = test_stationnarite_adf(ydiff_12_1, 'ydiff_12_1', details=False)
La série est stationnaire avec cette nouvelle différenciation
=> regardons les sorties ACF et PACF correspondant à cette double différenciation
graph_acf_pacf(ydiff_12_1, "Double différenciation d=1 et D=12 : ydiff_12_1", lags=26, zero=False, name_plt="20-ACF_PACF_ydiff_12_1" )
La lecture des sorties ACF et PACF n'est pas évidente
=> on va maintenant tester différentes valeurs pour les paramètres p,q, P et Q (de 0 à 2), en fixant les paramètres de différenciation d=1 et D=12
combi2_sarima=combi_possibles(d_start=1, d_end=1, sd_start=1, sd_end=1, m=12, details=True)
y = df['conso_corr']
pdq = combi2_sarima['pdq']
seasonal_pdq = combi2_sarima['s_pdq']
select_best_mod2 = select_model_sarima(y, pdq, seasonal_pdq, version_name="test_2_modele_optimal", periode_saison=12, perf="aic", details=False)
Ainsi, le modèle optimal, c'est-à-dire celui qui possède les critères d'information AIC le plus faible, est le suivant :
affichage_graph_diagnostics(select_best_mod2, 'mod_1', size_img='medium')
On va vérifier ce résultat en le comparant à celui obtenu avec la fonction auto-SARIMA
du package pyramid-arima
y = df[['conso_corr']]
smodel = pm.auto_arima(y, start_p=0, start_q=0,
test='adf',
max_p=2, max_q=2, m=12,
start_P=0, seasonal=True,
d=1, D=1, trace=True,
error_action='trace',
suppress_warnings=True,
stepwise=True)
smodel.summary()
Le modèle renvoyé par auto_arima
est différent;
Nous allons donc tester ces 2 modèles :
On répartit la série temporelle de la consommation corrigée en 2 :
On vérifie que les 2 modèles sont toujours valides
--> on vérifie les éléments suivants :
Si les modèles sont toujours valides, alors on peut réaliser des prévisions :
y_train
y_prev
pour une année supplémentaire (cf la période couverte par y_test)y_prev
et les valeurs réelles y_test
--> on réalise un test ADF sur la série tronquée
ydiff_12_1_t=ydiff_12_1.dropna()[:-12]
res_y12_1_t = test_stationnarite_adf(ydiff_12_1_t, 'ydiff_12_1', details=False)
Ainsi, la p_value du test de stationnarité est légèrement supérieure à 0.05, soit 0.057
--> même si la valeur est limite, on va considérer malgré tout la série tronquée comme stationnaire
from fonctions_perso import prediction_sarima
y=df['conso_corr']
# Tests de validité du modèle 1
res_model1 = prediction_sarima(y, order=(0,1,2), s_order=(1,1,2,12), model_name="model_1", m=12, nb_graph=21, details=False, graph=True, res_only=False)
=> le modèle 1 est bien valide pour la période couverte par y_train (2012-2018)
# Test de validité du modèle 2
res_model2 = prediction_sarima(y, order=(2,1,0), s_order=(0,1,2,12), model_name="model_2", m=12, nb_graph=22, details=True, graph=True, res_only=False)
Le modèle 2 n'est plus un modèle potentiel sur la période 2012-2018 :
de plus, le QQ Plot laisse penser que la distribution des résidus n'est pas forcément gaussienne
==> ainsi, on ne retient que le modèle 1, soit SARIMA(0,1,2)x(1,1,2,12)
Nous allons comparer les résultats des prévisions obtenues par les deux types de modèles :
print(*res_model1)
print(*res_pred_hw)
df_res = res_pred_hw['res_pred_hw']
df_res
df_res['pred_sarima'] = res_model1['previsions_xtest']
df_res['res_sarima'] = df_res['y_test'] - df_res['pred_sarima']
df_res['res_sarima_pct'] = (df_res['res_sarima'] / df_res['y_test']).apply(lambda x: "{:.2%}".format(x))
for col in ['y_test', 'pred_sarima', 'res_sarima']:
df_res[col] = df_res[col].apply(lambda x: round(x,2))
df_res
print("Modèle Holt Winters :")
print("#####################")
print(f"AIC = {res_pred_hw['aic']}")
print(f"BIC = {res_pred_hw['bic']}")
print(f"RMSE = {res_pred_hw['rmse']}")
print(f"MAPE = {res_pred_hw['mape']}")
print()
print("Modèle SARIMA :")
print("#####################")
print(f"AIC = {res_model1['aic']}")
print(f"BIC = {res_model1['bic']}")
print(f"RMSE = {round(res_model1['rmse'],2)}")
print(f"MAPE = {round(res_model1['mape'],2)}")
plt.figure(figsize=(10,4))
plt.plot(df_res['y_test'], color='blue', label='Consommation réelle', alpha=0.35)
plt.plot(df_res['pred_hw'], color='red', label='Consommation prédite Holt Winters')
plt.plot(df_res['pred_sarima'], color='green', label='Consommation prédite SARIMA')
plt.ylabel("Consommation en GWh")
plt.title("Prédictions de la consommation corrigée pour l'année 2019 par le modèle Holt Winters et SARIMA")
plt.legend(loc="best", fontsize=9, frameon=True, edgecolor='black')
plt.tight_layout()
plt.savefig(path_graph+"23-Prediction_Conso_Corrigee_2019_HW_et_SARIMA_1_graph.png", dpi=200)
plt.show();
df_prevision_final = df_prevision[:-5]
df_prevision_final['prevision_sarima'] = df_res['pred_sarima']
df_x_final = df_x[:-5]
df_x_prev = df_x.merge(df_prevision_final, left_index=True, right_index=True)
df_x_prev
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 8))
zz = [ax1, ax2]
# x_train + x_test
df_x_prev['conso_corr'][:85].plot(ax=ax1, color='teal', alpha=0.4)
df_x_prev['prevision_hw'][84:].plot(ax=ax1, color='red')
df_x_prev['prevision_sarima'][84:].plot(ax=ax1, color='green')
# x + x_test
df_x_prev['conso_corr'].plot(ax=ax2, color='teal', alpha=0.4)
df_x_prev['prevision_hw'][84:].plot(ax=ax2, color='red', linestyle='dashed', alpha=0.8)
df_x_prev['prevision_sarima'][84:].plot(ax=ax2, color='green', linestyle='dashed', alpha=0.8)
for z in zz:
z.set_xlim('2011-11-01', '2020-02-01')
z.set_ylabel("Consommation (en GWh)", color="gray", labelpad=15)
z.set_xlabel("", color="white", fontsize=0)
z.legend(['Consommation réelles', 'Consommation prédites hw', 'Consommation prédites SARIMA'], loc="best", fontsize=9, frameon=True, edgecolor='black')
plt.suptitle("Prévision de la consommation corrigée à partir de 2019 - Méthode Holt Winters et SARIMA", y= 0.92, fontsize=14, color="darkred")
plt.savefig(path_graph+"24-Prediction_Conso_Corrigee_2019_HW_et_SARIMA_2_graphs.png", dpi=200)
plt.show();