算法交易库
项目描述
AAT
AsyncAlgoTrading
aat
是一个用于在Python中编写算法交易策略的框架。它设计为模块化和可扩展,是驱动 AlgoCoin 的核心引擎。
它提供了跨(和之间)多个交易所的实时交易支持,完全集成的回测支持,滑点和交易成本建模,以及通过手动和程序化算法控制实现的强大报告和风险缓解。
类似于本系统的灵感来源 Zipline,aat
暴露了一个单一的策略类,该类用于实时交易和回测。策略类足够简单,可以快速编写和测试算法,但足够灵活,可以允许进行复杂的滑点和交易成本建模,以及中后期交易分析。
概述
aat
由4个主要部分组成。
- 交易引擎
- 风险管理引擎
- 执行引擎
- 回测引擎
交易引擎
交易引擎初始化所有交易所和策略,然后在整个系统上跟踪高级统计信息,同时将数据、交易请求和交易响应在策略、风险、执行和交易所对象之间进行协调。
风险管理引擎
风险管理引擎执行交易限制,确保策略局限于特定的风险配置文件。它可以根据用户偏好和未结头寸和订单修改或删除交易请求。
执行引擎
执行引擎是将底层交易所的简单透传。它提供了一个统一的接口来创建各种类型的订单。
回测引擎
回测引擎提供了在历史数据上离线运行相同策略的能力。
交易策略
aat
的核心元素是交易策略接口。它是 Strategy
接口的联合,该接口提供买卖方法,以及 Callback
接口,该接口提供对数据的回调。用户通过继承这个类来实现他们的策略。
回调
class Callback(metaclass=ABCMeta):
@abstractmethod
def onTrade(self, data: MarketData):
'''onTrade'''
@abstractmethod
def onOpen(self, data: MarketData):
'''onOpen'''
@abstractmethod
def onFill(self, resp: TradeResponse):
'''onFill'''
@abstractmethod
def onCancel(self, data: MarketData):
'''onCancel'''
@abstractmethod
def onChange(self, data: MarketData):
'''onChange'''
@abstractmethod
def onError(self, data: MarketData):
'''onError'''
策略
class Strategy(metaclass=ABCMeta):
@abstractmethod
def requestBuy(self,
callback: Callback,
data: MarketData):
'''requestBuy'''
@abstractmethod
def requestSell(self,
callback: Callback,
data: MarketData):
'''requestSell'''
示例策略
这是一个简单的交易策略,它只买一次并持有。
from aat.strategy import TradingStrategy
from aat.structs import MarketData, TradeRequest, TradeResponse
from aat.enums import Side, OrderType
from aat.logging import STRAT as slog, ERROR as elog
class BuyAndHoldStrategy(TradingStrategy):
def __init__(self) -> None:
super(BuyAndHoldStrategy, self).__init__()
self.bought = None
def onFill(self, res: TradeResponse) -> None:
self.bought = res
log.info('d->g:bought %.2f @ %.2f' % (res.volume, res.price))
def onTrade(self, data: MarketData) -> bool:
if self.bought is None:
req = TradeRequest(side=Side.BUY,
volume=1,
instrument=data.instrument,
order_type=OrderType.MARKET,
exchange=data.exchange,
price=data.price,
time=data.time)
log.info("requesting buy : %s", req)
self.requestBuy(req)
self.bought = 'pending'
def onError(self, e) -> None:
elog.critical(e)
def onChange(self, data: MarketData) -> None:
pass
def onCancel(self, data: MarketData) -> None:
pass
def onOpen(self, data: MarketData) -> None:
pass
交易策略有许多处理消息的必需方法。
- onTrade:当发生交易时调用
- onChange:当订单被修改时调用
- onFill:当策略的交易执行时调用
- onCancel:当订单被取消时调用
- onError:当发生错误时调用
- onOpen:当出现新订单时调用
还有其他更精细处理的可选回调
- onStart:当程序开始时调用
- onHalt:当交易暂停时调用
- onContinue:当交易继续时调用
- onExit:当程序关闭时调用
还有几个用于回测的可选回调
- slippage
- transactionCost
- onAnalyze
- 在交易引擎处理完所有数据后调用,用于可视化算法性能
设置和运行
一个 TradingStrategy
类的实例可以运行实时或针对一组历史交易/报价数据进行。当使用 TradingEngineConfig
对象实例化 TradingEngine
对象时,TradingEngineConfig
有一个 type
属性,可以设置为
live
- 对交易所进行实时交易simulation
- 对交易所进行实时交易,但禁用订单输入sandbox
- 对交易所的沙箱实例进行实时交易backtest
- 对历史 OHLCV 数据进行离线交易
为了在任何模式下测试我们的策略,我们需要设置交易所密钥以获取历史数据,流式传输市场数据,并创建新的订单。
API 密钥
你应该为希望交易的交易所创建 API 密钥。对于这个例子,我们将假设有一个启用了交易的 Coinbase Pro 账户。我通常将我的密钥放在一组 git 忽略的 shell 脚本中,这样就不会意外发布任何内容。我的脚本看起来像这样
export COINBASE_API_KEY=...
export COINBASE_API_SECRET=...
export COINBASE_API_PASS=...
在运行之前,我需要源代码我需要的密钥。
沙箱
目前仅支持 Gemini 沙箱,其他交易所已停止其沙箱。要在沙箱中运行,将 TradingEngineConfig.type
设置为沙箱。
实时交易
当您想运行实时交易时,将 TradingEngineConfig.type
设置为实时。您将需要熟悉风险和执行引擎,因为它们控制最大回撤、最大风险累积、执行积极性等。
模拟交易
当您想运行一个算法但还没有确信它能赚钱时,将 TradingEngineConfig.type
设置为模拟。这将允许它使用真实资金运行,但禁用订单输入。然后您可以像在回测中那样设置滑点交易成本。
测试
让我们通过在 Coinbase Pro 交易所上运行一个示例策略(不进行交易!)来确保一切正常
python3 -m algocoin --simulation --exchanges=coinbase
您应该看到以下输出
python3 -m algocoin --simulation --exchanges=coinbase
2019-06-01 17:54:17,468 CRITICAL -- MainProcess parser.py:151 --
2019-06-01 17:54:17,469 CRITICAL -- MainProcess parser.py:152 -- Simulation trading
2019-06-01 17:54:17,469 CRITICAL -- MainProcess parser.py:153 --
2019-06-01 17:54:34,570 CRITICAL -- MainProcess trading.py:194 --
2019-06-01 17:54:34,570 CRITICAL -- MainProcess trading.py:195 -- Server listening on port: 8081
2019-06-01 17:54:34,571 CRITICAL -- MainProcess trading.py:196 --
2019-06-01 17:54:34,998 CRITICAL -- MainProcess market_data.py:68 -- Starting algo trading: ExchangeType.COINBASE
配置
由于有许多选项,通常配置文件更易于使用。以下是上面在 CoinbasePro 上回测 Buy-and-hold 策略的示例配置
> cat backtest.cfg
[general]
verbose=1
print=0
TradingType=backtest
[exchange]
exchanges=coinbase
currency_pairs=BTC/USD
[strategy]
strategies =
aat.strategies.buy_and_hold.BuyAndHoldStrategy
[risk]
max_drawdown = 100.0
max_risk = 100.0
total_funds = 10.0
分析算法
我们可以通过以下命令运行上述配置
python3 -m algocoin --config=./backtest.cfg
我们应该看到以下输出
2019-06-01 17:58:40,173 INFO -- MainProcess utils.py:247 -- running in verbose mode!
2019-06-01 17:58:40,174 CRITICAL -- MainProcess parser.py:165 --
2019-06-01 17:58:40,174 CRITICAL -- MainProcess parser.py:166 -- Backtesting
2019-06-01 17:58:40,174 CRITICAL -- MainProcess parser.py:167 --
2019-06-01 17:58:40,176 CRITICAL -- MainProcess trading.py:106 -- Registering strategy: <class 'aat.strategies.buy_and_hold.BuyAndHoldStrategy'>
2019-06-01 17:58:40,177 INFO -- MainProcess backtest.py:25 -- Starting....
2019-06-01 17:58:41,338 INFO -- MainProcess buy_and_hold.py:28 -- requesting buy : <BTC/USD-Side.BUY:1.0@8567.06-OrderType.MARKET-ExchangeType.COINBASE>
2019-06-01 17:58:41,339 INFO -- MainProcess risk.py:59 -- Requesting 1.000000 @ 8567.060000
2019-06-01 17:58:41,339 INFO -- MainProcess risk.py:80 -- Risk check passed for partial order: <BTC/USD-Side.BUY:1.0@8567.06-OrderType.MARKET-ExchangeType.COINBASE>
2019-06-01 17:58:41,339 INFO -- MainProcess trading.py:244 -- Risk check passed
2019-06-01 17:58:41,339 INFO -- MainProcess trading.py:292 -- Slippage BT- <BTC/USD-Side.BUY:1.0@8567.916706-TradeResult.FILLED-ExchangeType.COINBASE>
2019-06-01 17:58:41,340 INFO -- MainProcess trading.py:295 -- TXN cost BT- <BTC/USD-Side.BUY:1.0@8589.336497765-TradeResult.FILLED-ExchangeType.COINBASE>
2019-06-01 17:58:41,340 INFO -- MainProcess buy_and_hold.py:14 -- d->g:bought 1.00 @ 8589.34
2019-06-01 17:58:41,340 INFO -- MainProcess backtest.py:42 -- <BTC/USD-1.29050038@8567.06-TickType.TRADE-ExchangeType.COINBASE>
...
2019-06-01 17:58:41,474 INFO -- MainProcess backtest.py:42 -- <BTC/USD-2.35773043@8595.0-TickType.TRADE-ExchangeType.COINBASE>
2019-06-01 17:58:41,474 INFO -- MainProcess backtest.py:33 -- Backtest done, running analysis.
这将调用我们的onAnalyze
函数,在这种情况下,该函数实现为使用matplotlib
绘制一些性能特性。
import pandas
import numpy as np
import matplotlib, matplotlib.pyplot as plt
import seaborn as sns
matplotlib.rc('font', **{'size': 5})
# extract data from trading engine
portfolio_value = engine.portfolio_value()
requests = engine.query().query_tradereqs()
responses = engine.query().query_traderesps()
trades = pandas.DataFrame([{'time': x.time, 'price': x.price} for x in engine.query().query_trades(instrument=requests[0].instrument, page=None)])
trades.set_index(['time'], inplace=True)
# format into pandas
pd = pandas.DataFrame(portfolio_value, columns=['time', 'value', 'pnl'])
pd.set_index(['time'], inplace=True)
# setup charting
sns.set_style('darkgrid')
fig = plt.figure()
ax1 = fig.add_subplot(311)
ax2 = fig.add_subplot(312)
ax3 = fig.add_subplot(313)
# plot algo performance
pd.plot(ax=ax1, y=['value'], legend=False, fontsize=5, rot=0)
# plot up/down chart
pd['pos'] = pd['pnl']
pd['neg'] = pd['pnl']
pd['pos'][pd['pos'] <= 0] = np.nan
pd['neg'][pd['neg'] > 0] = np.nan
pd.plot(ax=ax2, y=['pos', 'neg'], kind='area', stacked=False, color=['green', 'red'], legend=False, linewidth=0, fontsize=5, rot=0)
# annotate with key data
ax1.set_title('Performance')
ax1.set_ylabel('Portfolio value($)')
for xy in [portfolio_value[0][:2]] + [portfolio_value[-1][:2]]:
ax1.annotate('$%s' % xy[1], xy=xy, textcoords='data')
ax3.annotate('$%s' % xy[1], xy=xy, textcoords='data')
# plot trade intent/trade action
ax3.set_ylabel('Intent/Action')
ax3.set_xlabel('Date')
ax3.plot(trades)
ax3.plot([x.time for x in requests if x.side == Side.BUY],
[x.price for x in requests if x.side == Side.BUY],
'2', color='y')
ax3.plot([x.time for x in requests if x.side == Side.SELL],
[x.price for x in requests if x.side == Side.SELL],
'1', color='y')
ax3.plot([x.time for x in responses if x.side == Side.BUY], # FIXME
[x.price for x in responses if x.side == Side.BUY],
'^', color='g')
ax3.plot([x.time for x in responses if x.side == Side.SELL], # FIXME
[x.price for x in responses if x.side == Side.SELL],
'v', color='r')
# set same limits
y_bot, y_top = ax1.get_ylim()
x_bot, x_top = ax1.get_xlim()
ax3.set_ylim(y_bot, y_top)
ax1.set_xlim(x_bot, x_top)
ax2.set_xlim(x_bot, x_top)
ax3.set_xlim(x_bot, x_top)
dif = (x_top-x_bot)*.01
ax1.set_xlim(x_bot-dif, x_top+dif)
ax2.set_xlim(x_bot-dif, x_top+dif)
ax3.set_xlim(x_bot-dif, x_top+dif)
plt.show()
我们可以看到,我们的算法还实现了slippage
和transactionCost
,导致执行价格更差。
def slippage(self, resp: TradeResponse) -> TradeResponse:
slippage = resp.price * .0001 # .01% price impact
if resp.side == Side.BUY:
# price moves against (up)
resp.slippage = slippage
resp.price += slippage
else:
# price moves against (down)
resp.slippage = -slippage
resp.price -= slippage
return resp
def transactionCost(self, resp: TradeResponse) -> TradeResponse:
txncost = resp.price * resp.volume * .0025 # gdax is 0.0025 max fee
if resp.side == Side.BUY:
# price moves against (up)
resp.transaction_cost = txncost
resp.price += txncost
else:
# price moves against (down)
resp.transaction_cost = -txncost
resp.price -= txncost
return resp
扩展
除了编写新策略外,这个库还可以通过添加新的交易所进行扩展。这很简单。对于加密货币交易所,我非常依赖ccxt
、asyncio
和websocket库。
示例
以下是coinbase交易所。大部分代码是用来管理不同的websocket订阅选项,以及在不同格式之间进行转换,例如符号、订单类型等。
class CoinbaseExchange(Exchange):
@lru_cache(None)
def subscription(self):
return [json.dumps({"type": "subscribe", "product_id": x.value[0].value + '-' + x.value[1].value}) for x in self.options().currency_pairs]
@lru_cache(None)
def heartbeat(self):
return json.dumps({"type": "heartbeat", "on": True})
def tickToData(self, jsn: dict) -> MarketData:
'''convert a jsn tick off the websocket to a MarketData struct'''
if jsn.get('type') == 'received':
return
s = jsn.get('type').upper()
reason = jsn.get('reason', '').upper()
if s == 'MATCH' or (s == 'DONE' and reason == 'FILLED'):
typ = TickType.TRADE
elif s in ('OPEN', 'DONE', 'CHANGE', 'HEARTBEAT'):
if reason == 'CANCELED':
typ = TickType.CANCEL
elif s == 'DONE':
typ = TickType.FILL
else:
typ = TickType_from_string(s.upper())
else:
typ = TickType.ERROR
order_id = jsn.get('order_id', jsn.get('maker_order_id', ''))
time = parse_date(jsn.get('time')) if jsn.get('time') else datetime.now()
if typ in (TickType.CANCEL, TickType.OPEN):
volume = float(jsn.get('remaining_size', 'nan'))
else:
volume = float(jsn.get('size', 'nan'))
price = float(jsn.get('price', 'nan'))
currency_pair = str_to_currency_pair_type(jsn.get('product_id')) if typ != TickType.ERROR else PairType.NONE
instrument = Instrument(underlying=currency_pair)
order_type = str_to_order_type(jsn.get('order_type', ''))
side = str_to_side(jsn.get('side', ''))
remaining_volume = float(jsn.get('remaining_size', 0.0))
sequence = int(jsn.get('sequence', -1))
ret = MarketData(order_id=order_id,
time=time,
volume=volume,
price=price,
type=typ,
instrument=instrument,
remaining=remaining_volume,
side=side,
exchange=self.exchange(),
order_type=order_type,
sequence=sequence)
return ret
项目详情
下载文件
下载适用于您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。
源分布
构建分布
aat-0.0.3.tar.gz的散列
算法 | 散列摘要 | |
---|---|---|
SHA256 | 96371c3b92aed460e178e650400e79473cc942113a152b4bbf78b09490625bc2 |
|
MD5 | 9b341dc80c26ac7da4d6108db37c6332 |
|
BLAKE2b-256 | e7394bb711f7bcdd5db182da4fc7d968bb4e7ac909d80ff0384c6dd3ecc3c144 |
aat-0.0.3-cp37-cp37m-macosx_10_13_x86_64.whl的散列
算法 | 散列摘要 | |
---|---|---|
SHA256 | bbbb833752ee63a54551fe0c87919fe3ab514f0123c7b3f5880d69f49ab76934 |
|
MD5 | 4ff4039ad4247f6b022608295781ec45 |
|
BLAKE2b-256 | 85d4b49830e6e38cc61fe1136431b4989be57a38732bacc8d76b0a533551f436 |