Source code for stbt.simulator

#!/usr/bin/env python3
"""Module with Strategy class for all
   backtest-related manipulations"""
import logging
import random
import pickle
import operator
import pandas as pd
from pandas.plotting import register_matplotlib_converters
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker
import matplotlib.dates as mdates
import seaborn as sns
import stbt.helpers as hf
register_matplotlib_converters()

# logging
###########################################################################
# Module-wide logger named 'simulator'. Records at INFO level and above are
# sent to a stream handler and rendered with a timestamp, the emitting file
# and the emitting function in front of the message.
LOG_FORMAT = '%(asctime)s: %(filename)s: %(funcName)s: %(message)s'

LOGGER = logging.getLogger('simulator')
LOGGER.setLevel(logging.INFO)

SYSLOG = logging.StreamHandler()
FORMATTER = logging.Formatter(LOG_FORMAT, datefmt='%b %d %H:%M:%S')
SYSLOG.setFormatter(FORMATTER)
LOGGER.addHandler(SYSLOG)
###########################################################################


class Strategy(object):
    """Class to do all backtesting, visualization and statistics calculation

    Note:
        data_df and weights_df should have the same columns (names and
        count) and the same index.

    Args:
        data_df (DataFrame): Close prices of instruments should be columns.
        weights_df (DataFrame): Money distribution for every day,
            same form with data_df
        pool_file (str): File to save pnl of created strategy
        cash (float64): Starting capital and returns multiplier

    Attributes:
        data (DataFrame): Close prices of instruments should be columns.
        weights (DataFrame): Money distribution for every day, same form
            with data; every row is scaled so that the sum of absolute
            values is one (all-zero rows stay zero)
        cash (float64): Starting capital and returns multiplier
        pool_file (str): File to save pnl of created strategy
        pnl (DataFrame): Accumulated profit and loss of strategy,
            index is time
        stats_dict (dict): All the math statistics of strategy backtest
        data_mistakes_dict (dict): All the data inconsistencies
        stats_figure (figure): Statistics visualized
        strategy_figure (figure): Strategy visualized
        tests_figure (figure): Tests visualized
    """

    def __init__(self, data_df, weights_df,
                 pool_file='strategy_pool.pickle', cash=1.0):
        """Constructor method"""
        self.data = data_df
        # scaling weights, sum of absolute values is one for every row;
        # rows that sum to zero produce NaN (0 / 0) and are reset to 0
        self.weights = weights_df.div(
            weights_df.abs().sum(axis=1), axis=0).fillna(0)
        self.instruments = []  # not in use
        self.cash = cash  # starting capital, used by backtest
        self.pool_file = pool_file
        self.pnl = None
        self.stats_dict = {}
        self.data_mistakes_dict = {
            'shape': 0,
            'index_type': 0,
            'duplicates': 0,
            'Nans': 0,
            'missed': 0,
            'dates_values': 0,
            'column_names': 0}
        self.stats_figure = None
        self.strategy_figure = None
        self.tests_figure = None

    def verify_data_integrity(self, frequency=None):
        """Method to check data passed in constructor for mistakes

        Every call starts from zeroed counters, so data_mistakes_dict
        always describes the latest verification only.

        Args:
            frequency (str): Expected timeframe of data 'D', 'W', 'M';
                when given, data is reported as having missed rows if a
                regular grid of that frequency between the first and the
                last timestamp has more points than data has rows.
                If None - do not check for gaps

        Raises:
            ValueError: If at least one inconsistency was found
        """
        mistakes = self.data_mistakes_dict
        # reset counters: repeated calls must not accumulate old mistakes
        for key in mistakes:
            mistakes[key] = 0

        # lens
        if len(self.data) != len(self.weights):
            mistakes['shape'] += 1
        if len(self.data.columns) != len(self.weights.columns):
            mistakes['shape'] += 1

        # index type
        for frame in (self.data, self.weights):
            if not isinstance(frame.index, pd.DatetimeIndex):
                mistakes['index_type'] += 1

        # duplicates
        for frame in (self.weights, self.data):
            if frame.index.duplicated().any():
                mistakes['duplicates'] += 1

        # NaNs: one mistake for every column holding at least one NaN
        mistakes['Nans'] += int(self.data.isnull().any(axis=0).sum())
        mistakes['Nans'] += int(self.weights.isnull().any(axis=0).sum())
        inds = np.flatnonzero(self.weights.isnull().any(axis=1).values)
        LOGGER.debug('wrong indexes: %s', inds)

        # missed
        if frequency and len(self.data):
            full_index = pd.date_range(start=self.data.index.min(),
                                       end=self.data.index.max(),
                                       freq=frequency)
            if len(full_index) - len(self.data) >= 1:
                mistakes['missed'] += 1

        # dates values (an empty frame is already reported as 'shape')
        if len(self.data) and len(self.weights):
            if self.data.index[0] != self.weights.index[0]:
                mistakes['dates_values'] += 1
            if self.data.index[-1] != self.weights.index[-1]:
                mistakes['dates_values'] += 1

        # same columns
        if list(self.data.columns) != list(self.weights.columns):
            mistakes['column_names'] += 1

        # mistakes assessment
        if any(count != 0 for count in mistakes.values()):
            LOGGER.error(
                'There are mistakes in data!, self.data_mistakes_dict: %s',
                mistakes)
            raise ValueError('Take a look at data passed to Strategy')
        LOGGER.debug('Data in Strategy is okay, good to go')

    def backtest(self, delay=1, instruments_drop=None, commissions_const=0.0,
                 capitalization=False, start_date=None, end_date=None):
        """Method to calculate returns and pnl

        Args:
            delay (int): Time delay in applying weights to data
            instruments_drop (list): Columns with such names will be
                dropped from data and weights
            commissions_const (float64): Fee paid for every transaction:
                0.01 is 1% fee for every trade. The fee is charged on the
                sum of absolute weight changes over all instruments
            capitalization (Boolean): If money should be reinvested
                every time
            start_date (datetime): Date to start trading (exclusive);
                applied even when end_date is None
            end_date (datetime): Date to end trading (exclusive);
                applied even when start_date is None

        Returns:
            dict with pnl, returns, commissions dataframes and the
            capitalization, delay and commissions_const used
        """
        # initialize local data for simulate
        simulate_data = self.data.copy()
        simulate_weights = self.weights.copy()

        # filter instruments
        if instruments_drop is not None:
            simulate_data = simulate_data.drop(columns=instruments_drop)
            simulate_weights = simulate_weights.drop(
                columns=instruments_drop)

        # filter time: every bound is optional and strict
        if start_date is not None:
            simulate_data = simulate_data[simulate_data.index > start_date]
            simulate_weights = simulate_weights[
                simulate_weights.index > start_date]
        if end_date is not None:
            simulate_data = simulate_data[simulate_data.index < end_date]
            simulate_weights = simulate_weights[
                simulate_weights.index < end_date]

        # delay
        if delay:
            simulate_weights = simulate_weights.shift(delay)

        # calculate commissions
        commissions = self._calculate_commissions(
            simulate_weights, commissions_const)

        # calculate returns
        returns_df = simulate_data.pct_change().fillna(0)

        # calculate returns for all instruments with respect to
        # weights and commissions
        column_name = '_'.join(str(col) for col in simulate_data.columns)
        daily_returns_dist = returns_df * simulate_weights
        inst_sum_returns = daily_returns_dist.sum(axis=1).to_frame(
            column_name)
        inst_sum_returns = inst_sum_returns.sub(commissions['coms'], axis=0)

        # calculate pnl with respect to capitalization
        pnl = self._calculate_pnl(
            inst_sum_returns, self.cash, capitalization)

        LOGGER.debug('Strategy was backtested')
        return {
            'pnl': pnl,
            'returns': inst_sum_returns,
            'coms': commissions,
            'capitalization': capitalization,
            'delay': delay,
            'commissions_const': commissions_const
        }

    @staticmethod
    def _calculate_commissions(weights_df, commissions_const):
        """Return a one-column ('coms') frame with the fee for every row.

        The traded amount of a row is the sum of absolute weight changes
        of all instruments, so moving weight from one instrument to
        another is charged on both legs instead of cancelling out.
        """
        if commissions_const > 0.0:
            traded = weights_df.diff().abs().sum(axis=1)
            fees = traded * commissions_const
        else:
            fees = pd.Series(0.0, index=weights_df.index)
        return fees.fillna(0).to_frame('coms')

    @staticmethod
    def _calculate_pnl(returns_df, cash, capitalization):
        """Turn per-period returns into a pnl curve that starts from cash.

        With capitalization returns are compounded, otherwise every
        period is traded with the initial cash and profits are summed.
        """
        if capitalization:
            return (returns_df + 1).cumprod() * cash
        return (returns_df * cash).cumsum() + cash

    def calculate_sim_stats(self, pnl, returns):
        """Method to calculate various statistics of simulation

        Args:
            pnl (DataFrame): Accumulated profit and loss of strategy,
                index is time
            returns (DataFrame): Strategy returns for every day,
                index is time

        Returns:
            dict sim_stats_dict with great deal of stats

        Note:
            Also creates stats_figure attribute
        """
        def first(series):
            # statistics frames have one column: take its value by position
            return series.iloc[0]

        def percent(value, digits):
            return str(round(value * 100, digits)) + '%'

        sim_stats_dict = {
            'start_date': str(returns.index[0]),
            'end_date': str(returns.index[-1]),
            'Sharpe': 0,
            'Sharpe_1d': 0,
            'Sharpe_30d': 0,
            'Sharpe_90d': 0
        }
        # sharpe calculation:
        sim_stats_dict['Sharpe'] = hf.get_sharpe(returns)
        # correlation of pnl with every instrument of data
        sim_stats_dict['Correlation'] = self.data.corrwith(pnl.iloc[:, 0])
        # 1day sharpe
        df_resampled_sharpe_1d = returns.resample('1d').apply(hf.get_sharpe)
        sim_stats_dict['Sharpe_1d'] = round(
            first(df_resampled_sharpe_1d.mean()), 1)
        # 30days sharpe
        df_resampled_sharpe_30d = returns.resample('30d').apply(
            hf.get_sharpe)
        sim_stats_dict['Sharpe_30d'] = round(
            first(df_resampled_sharpe_30d.mean()), 1)
        # 90days sharpe
        df_resampled_sharpe_90d = returns.resample('90d').apply(
            hf.get_sharpe)
        sim_stats_dict['Sharpe_90d'] = round(
            first(df_resampled_sharpe_90d.mean()), 1)
        # total returns
        sim_stats_dict['Total_returns'] = percent(first(returns.sum()), 2)
        # avg_returns per period
        sim_stats_dict['Avg_returns'] = percent(first(returns.mean()), 2)
        # avg_returns per day
        returns_resampled_1d = returns.resample('1d').sum()
        sim_stats_dict['Avg_returns_1d'] = percent(
            first(returns_resampled_1d.mean()), 2)
        # avg_returns per month
        returns_resampled_30d = returns.resample('30d').sum()
        sim_stats_dict['Avg_returns_30d'] = percent(
            first(returns_resampled_30d.mean()), 2)
        # Max Drawdown
        dd_tuple = hf.get_max_drawdown(returns.iloc[:, 0])
        sim_stats_dict['Max_Drawdown'] = percent(abs(dd_tuple[0]), 1)
        # Daily Turnover: absolute weight changes summed over instruments
        weights_diff = self.weights.diff().abs()
        weights_diff_resampled = weights_diff.resample('1d').sum()
        sim_stats_dict['Turnover_1d'] = percent(
            weights_diff_resampled.sum(axis=1).mean(), 1)
        turnover_resampled = weights_diff_resampled.resample(
            '30d').mean() * 100
        # MAX CORR
        sim_stats_dict['Max_corr'] = self.get_max_corr(pnl)

        # plot stats
        self.stats_figure = plt.figure(tight_layout=True)
        self._plot_with_zero_line(
            (0, 0), returns_resampled_30d, 'Avg_returns_30d')
        self._plot_with_zero_line(
            (3, 0), df_resampled_sharpe_30d, 'Sharpe_30d')
        self._plot_with_zero_line(
            (6, 0), df_resampled_sharpe_90d, 'Sharpe_90d')
        self._plot_with_zero_line(
            (9, 0), turnover_resampled, 'Turnover_daily_30d_mean')

        LOGGER.debug('Statistics for strategy were calculated:')
        return sim_stats_dict

    @staticmethod
    def _format_date_axis(axis):
        """Show at most five year-month-day ticks on the x axis."""
        axis.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
        axis.xaxis.set_major_locator(mticker.MaxNLocator(5))

    @staticmethod
    def _plot_with_zero_line(position, frame, title):
        """Plot frame with a dashed red zero line on a two-row subplot."""
        axis = plt.subplot2grid((12, 1), position, rowspan=2, colspan=1)
        axis.plot(frame.index.values, frame.values)
        axis.plot(frame.index.values, np.zeros(len(frame)), 'r--')
        Strategy._format_date_axis(axis)
        axis.set_title(title)
        return axis

    def plot_sim_results(self, pnl):
        """Method to visualize previous backtest

        Args:
            pnl (DataFrame): Accumulated profit and loss of strategy,
                index is time

        Note:
            Creates strategy_figure attribute
        """
        self.strategy_figure = plt.figure()

        # instruments prices, log scale, tick labels hidden
        ax1 = plt.subplot2grid((12, 1), (0, 0), rowspan=3, colspan=1)
        ax1.plot(self.data.index, self.data)
        self._format_date_axis(ax1)
        ax1.set_title('Instruments Price')
        for xlabel_i in ax1.get_xticklabels():
            xlabel_i.set_visible(False)
        ax1.legend(self.data.columns)
        ax1.set_yscale('log')

        # pnl, tick labels hidden
        ax2 = plt.subplot2grid((12, 1), (4, 0), rowspan=5, colspan=1)
        ax2.plot(pnl.index.values, pnl.values)
        self._format_date_axis(ax2)
        ax2.set_title('PnL')
        for xlabel_i in ax2.get_xticklabels():
            xlabel_i.set_visible(False)

        # weights, the only subplot with visible dates
        ax3 = plt.subplot2grid((12, 1), (10, 0), rowspan=2, colspan=1)
        ax3.plot(self.weights.index.values, self.weights.values)
        self._format_date_axis(ax3)
        ax3.set_title('Weights')
        LOGGER.debug('Graph with backtest results was created')

    def run_tests(self):
        """Method to check strategy robustness against time and commissions

        Returns:
            list with one backtest result dict for every test

        Note:
            Creates tests_figure attribute
        """
        tests = [
            {'delay': 1},
            {'delay': 2},
            {'delay': 3},
            {'delay': 2, 'commissions_const': 0.001},
        ]
        self.tests_figure = plt.figure()
        axis = plt.subplot2grid((12, 1), (0, 0), rowspan=12, colspan=1)
        list_of_res_dicts = []
        for test in tests:
            result = self.backtest(**test)
            list_of_res_dicts.append(result)
            axis.plot(self.weights.index, result['pnl'],
                      label='{}'.format(hf.get_label_from_dict(test)))
        self._format_date_axis(axis)
        axis.set_title('Tests_pnls')
        axis.legend()
        return list_of_res_dicts

    def run_all(self, delay=1, verify_data_integrity=True,
                instruments_drop=None, commissions_const=0,
                capitalization=False, start_date=None, end_date=None):
        """Method to get all info about strategy(run all methods)

        Args:
            delay (int): Time delay in applying weights to data
            verify_data_integrity (Boolean): If data should be checked
                for mistakes before the backtest
            instruments_drop (list): Columns with such names will be
                dropped from data and weights
            commissions_const (float64): Fee paid for every transaction:
                0.01 is 1% fee for every trade
            capitalization (Boolean): If money should be reinvested
                every time
            start_date (datetime): Date to start trading
            end_date (datetime): Date to end trading
        """
        if verify_data_integrity:
            self.verify_data_integrity()
        results_dict = self.backtest(instruments_drop=instruments_drop,
                                     commissions_const=commissions_const,
                                     capitalization=capitalization,
                                     delay=delay,
                                     start_date=start_date,
                                     end_date=end_date)
        self.plot_sim_results(results_dict['pnl'])
        self.pnl = results_dict['pnl']
        self.stats_dict = self.calculate_sim_stats(
            results_dict['pnl'], results_dict['returns'])
        LOGGER.debug(str(self.stats_dict))
        self.run_tests()

    def get_pnls_pool(self):
        """Method to read all pnls from self.pool_file

        Raises:
            FileNotFoundError: If self.pool_file does not exist yet
        """
        # NOTE(security): pickle.load executes arbitrary code from the
        # file, pool_file must only ever point to a trusted location
        with open(self.pool_file, 'rb') as pool:
            return pickle.load(pool)

    def _save_pnls_pool(self, pnls_df):
        """Overwrite self.pool_file with pnls_df."""
        with open(self.pool_file, 'wb') as pool:
            pickle.dump(pnls_df, pool)

    def add_to_pnls_pool(self, pnl_df=None, name=None):
        """Method to add pnls to self.pool_file

        Args:
            pnl_df (DataFrame): pnl to add, self.pnl is used when None
            name (str): Column name of pnl inside of pool, a fresh
                '<random number>_pnl' is generated for every call
                when None

        Returns:
            DataFrame with pool content after the call; the pool is
            returned unchanged when pnl has another length or its name
            already exists in pool

        Raises:
            ValueError: If there is no pnl to add at all

        Note:
            Passed pnl_df (and self.pnl) are not modified
        """
        if pnl_df is None:
            pnl_df = self.pnl
        if pnl_df is None:
            raise ValueError(
                'There is no pnl to add: pass pnl_df or run backtest first')
        if name is None:
            # generated per call: a default argument would be evaluated
            # only once and every pnl would get the very same name
            name = str(random.randint(1, 9999)) + '_pnl'
        pnl_df = pnl_df.rename(columns={pnl_df.columns[0]: name})

        try:
            pnls_df = self.get_pnls_pool()
        except FileNotFoundError:
            # first pnl ever: it becomes the pool
            self._save_pnls_pool(pnl_df)
            return pnl_df

        if len(pnl_df) != len(pnls_df):
            LOGGER.error(
                'Length of dfs is inconsistent: cant save such pnls!')
            return pnls_df
        try:
            joined_df = pnls_df.join(pnl_df)
        except ValueError:
            # join refuses overlapping column names
            LOGGER.error('You are trying to add pnl which already exists! '
                         '(change column name to add it)')
            return pnls_df
        self._save_pnls_pool(joined_df)
        return joined_df

    def get_pool_heatmap(self):
        """Method to visualize self.pool_file

        Returns:
            tuple of figure with heatmap and correlation matrix of pool
        """
        corr = self.get_pnls_pool().corr()
        figure = plt.figure()
        sns.heatmap(corr, annot=True)
        plt.title('Correlation heatmap')
        return figure, corr

    def get_max_corr(self, pnl):
        """Method to get highest correlation with pnl from self.pool_file

        Args:
            pnl (DataFrame): Accumulated profit and loss of strategy,
                index is time

        Returns:
            list [column name, correlation] of the most correlated pnl
            from pool; ['0', 0] when pool can not be read or no
            correlation could be calculated
        """
        try:
            pnls_df = self.get_pnls_pool()
            time_delta = pnl.index[1] - pnl.index[0]
            if time_delta != pd.Timedelta(1, 'h'):
                # presumably brings pnl to hourly timeframe, see helpers
                pnl = hf.resample(pnl, 'H')

            def on_pool_index(frame):
                # adding zeros indexed like pool aligns frame with pool
                zero_df = pd.DataFrame(
                    np.zeros(len(pnls_df)),
                    index=pnls_df.index,
                    columns=frame.columns)
                return zero_df + frame

            if len(pnl) > len(pnls_df):
                pnl = on_pool_index(pnl).dropna()
            if len(pnl) < len(pnls_df):
                pnl = on_pool_index(pnl).ffill()

            corr_dict = {}
            for column in pnls_df:
                corr = pnl.corrwith(pnls_df[column]).values[0]
                # NaN can not be ordered, max() over it is unreliable
                if not pd.isnull(corr):
                    corr_dict[column] = corr
            if not corr_dict:
                return ['0', 0]
            top_key = max(corr_dict.items(), key=operator.itemgetter(1))[0]
            return [top_key, corr_dict[top_key]]
        except Exception as err:  # best effort: statistics must not crash
            LOGGER.error(err)
            return ['0', 0]
# Special functions:
def get_correlation(list_of_pnls, plot=True):
    """Function to get correlation heatmap

    Every pnl is represented by its position in list_of_pnls: the first
    column of the i-th dataframe is named 'i' in the result. Passed
    dataframes are not modified.

    Args:
        list_of_pnls (list): List with dataframes of pnls
        plot (Boolean): If heatmap figure should be created

    Returns:
        corr (DataFrame): Matrix of correlations
        figure (figure): Seaborn heatmap, None when plot is False
    """
    pnl_df = pd.DataFrame()
    for counter, pnl in enumerate(list_of_pnls):
        # rename a copy: callers keep their own column names
        renamed = pnl.rename(
            columns={pnl.columns[0]: "{}".format(counter)})
        if pnl_df.empty:
            pnl_df = renamed
        else:
            pnl_df = pnl_df.join(renamed)
    corr = pnl_df.corr()
    figure = None
    if plot:
        figure = plt.figure()
        sns.heatmap(corr, annot=True)
        plt.title('Correlation heatmap')
    return corr, figure