即使拼命回望过去无法预知将来,从过去中总结出的规律就是没有没有规律,我们未曾知道未来会有怎样的惊喜。
未来难以捉摸又何妨,仍不妨碍我们把握当下,勇敢的选择与判断。随着模型复杂,量变到质变,我们的确可以有所改善。我们从一个简单的策略开始:双均线策略。
这节博客我们将使用tushare来设计一个简单的策略框架,并在其中实现一种简单的策略:双均线策略,并与benchmark做对比。
[python]项目在github开源:项目地址(demo文件夹下)。
1 双均线策略
首先来介绍双均线策略。双均线策略简单说来就是金叉买入,死叉卖出。
顾名思义,双均线就是使用两条均线,比如我们使用一条5天的移动平均线和一条十天的移动平均线
- 金叉定义为 MA5>MA10开始的位置
- 死叉定义为MA10>MA5开始的位置
2 双均线策略的特点
双均线策略是一种极为简单的策略,对于长期处于涨势时可以取得较好收益,但通常我们不能预测大体走势,常做分析的都知道,单看价格走势曲线是找不到什么规律的,唯一的规律就是没有规律。
并且,双均线策略在两条均线纠缠交错时会产生大量无效交易,所以实际上并不会直接使用该策略。这节博客主要是入门学习,所以找一个简单的策略上上手,下面就直接看代码了。
3 数据获取类
代码都在title摘要给出的链接中开源了。
获取数据部分,就是通过tushare获取对应代码的数据,然后通过rolling方法计算MA5和MA10。因为在计算MA5和MA10过程中,开头的数字会出现NAN,所以采用df.dropna来去除无效值。
本来是想设计一个单例模式,由于python太烂,秀不出来,就算了。
#coding=utf-8
import tushare as ts
import pandas as pd
import numpy as np
import math
def singleton(cls):
def wrapper(code_list, starttime, endtime):
if cls._instance is None:
cls._instance = cls(code_list, starttime, endtime)
return cls._instance
return wrapper
@singleton
#DataRepository = singleton(DataRepository)
class DataRepository(object):
_instance = None
def __init__(self, code_list, starttime, endtime):
self.all_data = {}
for code in code_list:
df = ts.get_k_data(code, starttime, endtime)
df['ma5'] = df['close'].rolling(5).mean()
df['ma10'] = df['close'].rolling(10).mean()
df = df.dropna(how='any')
self.all_data[code] = df
def get_onecode_df(self, code):
return self.all_data[code]
def get_instance(self, code_list, starttime, endtime):
pass
4 策略类
下面这段代码是核心代码。
在初始化函数中,设置benchmark、初始现金、股票代码、开始时间和结束时间等,初始化各种变量。
双均线策略是一个趋势策略,基本思路是金叉买入,死叉卖出,也就是当ma5向上穿过ma10时,则买入,向下穿过ma10时,则卖出。
主要实现在Strategy类中,输入的变量格式如下:code_list = [‘002415’, ‘002416’, ‘000333’],init_cash = 100000,starttime = ‘2014-01-01’,endtime = ‘2017-11-30’,其他几个重要成员变量如下:cash为还剩余的现金;capital_market_value为持仓的市值,按每天的收盘价计算;limit_cash为每只股票分得的仓位,双均线策略中,codelist中的股票平分持仓;position_list为持仓的情况,类型为dict,股票的code为key,持仓的数量为value;data_range为一个回测的日期list,pd.period_range的freq参数选择为B,即工作日;benchmark赋值为sh,即上证指数,以便比对策略结果。
class Strategy(object):
def __init__(self, code_list, init_cash, start_time, end_time):
self.start_time = start_time
self.end_time = end_time
self.data_repository = DataRepository(code_list , self.start_time, self.end_time)
#self.data_repository = DataRepository.get_instance(code_list, \
# self.start_time, \
# self.end_time)
self.code_list = code_list
self.benchmark_code = 'sh'
self.cash = init_cash
self.limited_cash = init_cash/len(code_list)
self.position_list = {}
for code in self.code_list:
self.position_list[code] = 0
self.trade = 0
d = list(pd.period_range(start=start_time, end = end_time, freq='B'))
self.date_range = list(map(str,d))
self.res_df = pd.DataFrame()
self.res_df['date'] = self.date_range
self.capital_market_value = []
def run_simulation(self):
for date in self.date_range:
for code in self.code_list:
sell_signal, sell_open_price = self.get_sell_signal(code, date)
direction = -1
if sell_signal == 1:
amount = self.get_sell_amount(code)
if amount > 0:
commission = self.cal_cost_function(sell_open_price, amount)
#update cash
self.cash += sell_open_price*amount
self.cash -= commission
#update hold
self.position_list[code] -= amount
#add trade log
self.trade = 0
for code in self.code_list:
buy_signal, buy_open_price = self.get_buy_signal(code,date)
direction = 1
if buy_signal == 1:
amount = self.get_buy_amount(code, buy_open_price)
if amount > 0:
commission = self.cal_cost_function(buy_open_price, amount)
#update cash
self.cash -= buy_open_price*amount
self.cash -= commission
#update hold
self.position_list[code] += amount
#add trade
self.trade = 0
self.capital_market_value.append(self.get_market_value(date))
# dates goes by
self.res_df['capital_market_value'] = pd.Series(self.capital_market_value)
self.res_df['profolio_daily_return'] = round((self.res_df['capital_market_value']/\
self.res_df['capital_market_value'].shift(1)-1),4)
self.res_df['benchmark'] = self.get_benchmark_index()
self.res_df['benchmark'].fillna(method='bfill', inplace=True)
self.res_df['benchmark'].fillna(method='ffill', inplace=True)
self.res_df.to_csv('./datares.csv')
所有天数回测结束后,将结果记录到res_df中,该变量为pandas类型,以便分析结果。res_df中数据有每天capital_market_value,每天的涨跌幅,每天对应的benchmark的值(因为回测的date_range已经剔除了周末,节假日等情况会存在停市,故capital_market_value与benchmark的值均使用之前第一个值填充),benchmark采用先bfill,然后还可能存在最开头的值是空的情况,在ffill填充,而capital_market_value则是使用df[df[‘date’] <= date].tail(1)[‘close’]获得。
看懂了上面的代码,下面的这一大段瞟一眼就可以了,就是实现信号判断啊,调仓啊,算手续费啊等。
def get_benchmark_index(self):
df = ts.get_k_data(self.benchmark_code, start=self.start_time, end=self.end_time)
benchmark_list = []
for date in self.date_range:
if df[df['date'] == date].empty:
benchmark_list.append(np.nan)
else:
benchmark_list.append(float(df[df['date']==date]['close']))
return benchmark_list
def get_market_value(self, date):
market_value = 0
for code in self.position_list:
df = self.data_repository.get_onecode_df(code)
if self.position_list[code] != 0:
close_price = df[df['date'] <= date].tail(1)['close']
market_value += self.position_list[code]*float(close_price)
return round(market_value+self.cash, 2)
def get_sell_signal(self, code, date):
df = self.data_repository.get_onecode_df(code)
sell_signal = 0
sell_open_price = 0
if df[df['date'] == date].empty:
return sell_signal, sell_open_price
df = df[df['date'] <= date].tail(3)
if len(df) == 3 and df.iloc[0]['ma5'] > df.iloc[0]['ma10'] and df.iloc[1]['ma5'] < df.iloc[1]['ma10']:
sell_signal = 1
sell_open_price = df.iloc[1]['open']
return sell_signal, sell_open_price
#以后还要加入判断止盈的方法
def get_buy_signal(self, code, date):
df = self.data_repository.get_onecode_df(code)
buy_signal = 0
buy_open_price = 0
if df[df['date'] == date].empty:
return buy_signal, buy_open_price
df = df[df['date'] <= date].tail(3)
if len(df) == 3 and df.iloc[0]['ma5'] < df.iloc[0]['ma10'] and df.iloc[1]['ma5'] > df.iloc[1]['ma10']:
buy_signal = 1
buy_open_price = df.iloc[1]['open']
return buy_signal, buy_open_price
def get_sell_amount(self, code):
return self.position_list[code]
def get_buy_amount(self, code, price):
if self.position_list[code] == 0:
amount = math.floor(self.limited_cash/(price*100))*100
return amount
else:
return 0
def cal_cost_function(self, price, amount):
commission = price*amount*0.0003
#最低5元手续费
if commission > 5:
return commission
else:
return 5
5 运行分析与结果绘制
在一个新文件中import原来两个文件demo.dm_*,然后运行策略,读取策略保存的csv表格文件,然后使用pymatplot进行绘图,对比benchmark和策略的收益。
#coding=utf-8
import demo.dm_strategy
import matplotlib.pyplot as plt
import demo.dm_data
import pandas as pd
import tushare as ts
import math
if __name__ == '__main__':
obj = demo.dm_strategy.Strategy(code_list=['002415', '002416', '000333'], init_cash=100000, \
start_time='2014-01-01', end_time='2018-06-01')
obj.run_simulation()
df = pd.read_csv("./datares.csv")
df['benchmark'] = df['benchmark'] / df['benchmark'][0]
df['benchmark'].plot(legend='True')
df['capital_market_value'] = df['capital_market_value'] / df['capital_market_value'][0]
df['capital_market_value'].plot(legend='True')
plt.show()
这篇博客偷个懒,做测评的时候使用的是三只牛,做对比的时候 Benchmark 选的是上证。这里知道这样做是不严谨的就可以了,毕竟这不是我们要get到的点。

好的,这样我们就实现了一个简单的策略。
OK,See You Next Chapter!
能要源码学习下吗?非常感谢