跳转到主要内容

算法交易库

项目描述

AAT

AsyncAlgoTrading

Build Status Coverage BCH compliance License PyPI Docs

aat 是一个用于在Python中编写算法交易策略的框架。它设计为模块化和可扩展,是驱动 AlgoCoin 的核心引擎。

它提供了跨(和之间)多个交易所的实时交易支持,完全集成的回测支持,滑点和交易成本建模,以及通过手动和程序化算法控制实现的强大报告和风险缓解。

类似于本系统的灵感来源 Zipline,aat 暴露了一个单一的策略类,该类用于实时交易和回测。策略类足够简单,可以快速编写和测试算法,但足够灵活,可以允许进行复杂的滑点和交易成本建模,以及中后期交易分析。

概述

aat 由4个主要部分组成。

  • 交易引擎
  • 风险管理引擎
  • 执行引擎
  • 回测引擎

交易引擎

交易引擎初始化所有交易所和策略,然后在整个系统上跟踪高级统计信息,同时将数据、交易请求和交易响应在策略、风险、执行和交易所对象之间进行协调。

风险管理引擎

风险管理引擎执行交易限制,确保策略局限于特定的风险配置文件。它可以根据用户偏好和未结头寸和订单修改或删除交易请求。

执行引擎

执行引擎是将底层交易所的简单透传。它提供了一个统一的接口来创建各种类型的订单。

回测引擎

回测引擎提供了在历史数据上离线运行相同策略的能力。

交易策略

aat 的核心元素是交易策略接口。它是 Strategy 接口的联合,该接口提供买卖方法,以及 Callback 接口,该接口提供对数据的回调。用户通过继承这个类来实现他们的策略。

回调

class Callback(metaclass=ABCMeta):
    @abstractmethod
    def onTrade(self, data: MarketData):
        '''onTrade'''

    @abstractmethod
    def onOpen(self, data: MarketData):
        '''onOpen'''

    @abstractmethod
    def onFill(self, resp: TradeResponse):
        '''onFill'''

    @abstractmethod
    def onCancel(self, data: MarketData):
        '''onCancel'''

    @abstractmethod
    def onChange(self, data: MarketData):
        '''onChange'''

    @abstractmethod
    def onError(self, data: MarketData):
        '''onError'''

策略

class Strategy(metaclass=ABCMeta):
    @abstractmethod
    def requestBuy(self,
                   callback: Callback,
                   data: MarketData):
        '''requestBuy'''

    @abstractmethod
    def requestSell(self,
                    callback: Callback,
                    data: MarketData):
        '''requestSell'''

示例策略

这是一个简单的交易策略,它只买一次并持有。

from aat.strategy import TradingStrategy
from aat.structs import MarketData, TradeRequest, TradeResponse
from aat.enums import Side, OrderType
from aat.logging import STRAT as slog, ERROR as elog

class BuyAndHoldStrategy(TradingStrategy):
    def __init__(self) -> None:
        super(BuyAndHoldStrategy, self).__init__()
        self.bought = None

    def onFill(self, res: TradeResponse) -> None:
        self.bought = res
        log.info('d->g:bought %.2f @ %.2f' % (res.volume, res.price))

    def onTrade(self, data: MarketData) -> bool:
        if self.bought is None:
            req = TradeRequest(side=Side.BUY,
                               volume=1,
                               instrument=data.instrument,
                               order_type=OrderType.MARKET,
                               exchange=data.exchange,
                               price=data.price,
                               time=data.time)
            log.info("requesting buy : %s", req)
            self.requestBuy(req)
            self.bought = 'pending'
    def onError(self, e) -> None:
        elog.critical(e)

    def onChange(self, data: MarketData) -> None:
        pass

    def onCancel(self, data: MarketData) -> None:
        pass

    def onOpen(self, data: MarketData) -> None:
        pass

交易策略有许多处理消息的必需方法。

  • onTrade:当发生交易时调用
  • onChange:当订单被修改时调用
  • onFill:当策略的交易执行时调用
  • onCancel:当订单被取消时调用
  • onError:当发生错误时调用
  • onOpen:当出现新订单时调用

还有其他更精细处理的可选回调

  • onStart:当程序开始时调用
  • onHalt:当交易暂停时调用
  • onContinue:当交易继续时调用
  • onExit:当程序关闭时调用

还有几个用于回测的可选回调

  • slippage
  • transactionCost
  • onAnalyze
    • 在交易引擎处理完所有数据后调用,用于可视化算法性能

设置和运行

一个 TradingStrategy 类的实例可以运行实时或针对一组历史交易/报价数据进行。当使用 TradingEngineConfig 对象实例化 TradingEngine 对象时,TradingEngineConfig 有一个 type 属性,可以设置为

  • live - 对交易所进行实时交易
  • simulation - 对交易所进行实时交易,但禁用订单输入
  • sandbox - 对交易所的沙箱实例进行实时交易
  • backtest - 对历史 OHLCV 数据进行离线交易

为了在任何模式下测试我们的策略,我们需要设置交易所密钥以获取历史数据,流式传输市场数据,并创建新的订单。

API 密钥

你应该为希望交易的交易所创建 API 密钥。对于这个例子,我们将假设有一个启用了交易的 Coinbase Pro 账户。我通常将我的密钥放在一组 git 忽略的 shell 脚本中,这样就不会意外发布任何内容。我的脚本看起来像这样

export COINBASE_API_KEY=...
export COINBASE_API_SECRET=...
export COINBASE_API_PASS=...

在运行之前,我需要源代码我需要的密钥。

沙箱

目前仅支持 Gemini 沙箱,其他交易所已停止其沙箱。要在沙箱中运行,将 TradingEngineConfig.type 设置为沙箱。

实时交易

当您想运行实时交易时,将 TradingEngineConfig.type 设置为实时。您将需要熟悉风险和执行引擎,因为它们控制最大回撤、最大风险累积、执行积极性等。

模拟交易

当您想运行一个算法但还没有确信它能赚钱时,将 TradingEngineConfig.type 设置为模拟。这将允许它使用真实资金运行,但禁用订单输入。然后您可以像在回测中那样设置滑点交易成本。

测试

让我们通过在 Coinbase Pro 交易所上运行一个示例策略(不进行交易!)来确保一切正常

python3 -m algocoin --simulation --exchanges=coinbase

您应该看到以下输出

python3 -m algocoin --simulation --exchanges=coinbase
2019-06-01 17:54:17,468 CRITICAL -- MainProcess parser.py:151 --
2019-06-01 17:54:17,469 CRITICAL -- MainProcess parser.py:152 -- Simulation trading
2019-06-01 17:54:17,469 CRITICAL -- MainProcess parser.py:153 --
2019-06-01 17:54:34,570 CRITICAL -- MainProcess trading.py:194 --
2019-06-01 17:54:34,570 CRITICAL -- MainProcess trading.py:195 -- Server listening on port: 8081
2019-06-01 17:54:34,571 CRITICAL -- MainProcess trading.py:196 --
2019-06-01 17:54:34,998 CRITICAL -- MainProcess market_data.py:68 -- Starting algo trading: ExchangeType.COINBASE

配置

由于有许多选项,通常配置文件更易于使用。以下是上面在 CoinbasePro 上回测 Buy-and-hold 策略的示例配置

> cat backtest.cfg
[general]
verbose=1
print=0
TradingType=backtest

[exchange]
exchanges=coinbase
currency_pairs=BTC/USD

[strategy]
strategies =
    aat.strategies.buy_and_hold.BuyAndHoldStrategy

[risk]
max_drawdown = 100.0
max_risk = 100.0
total_funds = 10.0

分析算法

我们可以通过以下命令运行上述配置

python3 -m algocoin --config=./backtest.cfg

我们应该看到以下输出

2019-06-01 17:58:40,173 INFO -- MainProcess utils.py:247 -- running in verbose mode!
2019-06-01 17:58:40,174 CRITICAL -- MainProcess parser.py:165 --
2019-06-01 17:58:40,174 CRITICAL -- MainProcess parser.py:166 -- Backtesting
2019-06-01 17:58:40,174 CRITICAL -- MainProcess parser.py:167 --
2019-06-01 17:58:40,176 CRITICAL -- MainProcess trading.py:106 -- Registering strategy: <class 'aat.strategies.buy_and_hold.BuyAndHoldStrategy'>
2019-06-01 17:58:40,177 INFO -- MainProcess backtest.py:25 -- Starting....
2019-06-01 17:58:41,338 INFO -- MainProcess buy_and_hold.py:28 -- requesting buy : <BTC/USD-Side.BUY:1.0@8567.06-OrderType.MARKET-ExchangeType.COINBASE>
2019-06-01 17:58:41,339 INFO -- MainProcess risk.py:59 -- Requesting 1.000000 @ 8567.060000
2019-06-01 17:58:41,339 INFO -- MainProcess risk.py:80 -- Risk check passed for partial order: <BTC/USD-Side.BUY:1.0@8567.06-OrderType.MARKET-ExchangeType.COINBASE>
2019-06-01 17:58:41,339 INFO -- MainProcess trading.py:244 -- Risk check passed
2019-06-01 17:58:41,339 INFO -- MainProcess trading.py:292 -- Slippage BT- <BTC/USD-Side.BUY:1.0@8567.916706-TradeResult.FILLED-ExchangeType.COINBASE>
2019-06-01 17:58:41,340 INFO -- MainProcess trading.py:295 -- TXN cost BT- <BTC/USD-Side.BUY:1.0@8589.336497765-TradeResult.FILLED-ExchangeType.COINBASE>
2019-06-01 17:58:41,340 INFO -- MainProcess buy_and_hold.py:14 -- d->g:bought 1.00 @ 8589.34
2019-06-01 17:58:41,340 INFO -- MainProcess backtest.py:42 -- <BTC/USD-1.29050038@8567.06-TickType.TRADE-ExchangeType.COINBASE>
...
2019-06-01 17:58:41,474 INFO -- MainProcess backtest.py:42 -- <BTC/USD-2.35773043@8595.0-TickType.TRADE-ExchangeType.COINBASE>
2019-06-01 17:58:41,474 INFO -- MainProcess backtest.py:33 -- Backtest done, running analysis.

这将调用我们的onAnalyze函数,在这种情况下,该函数实现为使用matplotlib绘制一些性能特性。

        import pandas
        import numpy as np
        import matplotlib, matplotlib.pyplot as plt
        import seaborn as sns
        matplotlib.rc('font', **{'size': 5})

        # extract data from trading engine
        portfolio_value = engine.portfolio_value()
        requests = engine.query().query_tradereqs()
        responses = engine.query().query_traderesps()
        trades = pandas.DataFrame([{'time': x.time, 'price': x.price} for x in engine.query().query_trades(instrument=requests[0].instrument, page=None)])
        trades.set_index(['time'], inplace=True)

        # format into pandas
        pd = pandas.DataFrame(portfolio_value, columns=['time', 'value', 'pnl'])
        pd.set_index(['time'], inplace=True)

        # setup charting
        sns.set_style('darkgrid')
        fig = plt.figure()
        ax1 = fig.add_subplot(311)
        ax2 = fig.add_subplot(312)
        ax3 = fig.add_subplot(313)

        # plot algo performance
        pd.plot(ax=ax1, y=['value'], legend=False, fontsize=5, rot=0)

        # plot up/down chart
        pd['pos'] = pd['pnl']
        pd['neg'] = pd['pnl']
        pd['pos'][pd['pos'] <= 0] = np.nan
        pd['neg'][pd['neg'] > 0] = np.nan
        pd.plot(ax=ax2, y=['pos', 'neg'], kind='area', stacked=False, color=['green', 'red'], legend=False, linewidth=0, fontsize=5, rot=0)

        # annotate with key data
        ax1.set_title('Performance')
        ax1.set_ylabel('Portfolio value($)')
        for xy in [portfolio_value[0][:2]] + [portfolio_value[-1][:2]]:
            ax1.annotate('$%s' % xy[1], xy=xy, textcoords='data')
            ax3.annotate('$%s' % xy[1], xy=xy, textcoords='data')

        # plot trade intent/trade action
        ax3.set_ylabel('Intent/Action')
        ax3.set_xlabel('Date')

        ax3.plot(trades)
        ax3.plot([x.time for x in requests if x.side == Side.BUY],
                 [x.price for x in requests if x.side == Side.BUY],
                 '2', color='y')
        ax3.plot([x.time for x in requests if x.side == Side.SELL],
                 [x.price for x in requests if x.side == Side.SELL],
                 '1', color='y')
        ax3.plot([x.time for x in responses if x.side == Side.BUY],  # FIXME
                 [x.price for x in responses if x.side == Side.BUY],
                 '^', color='g')
        ax3.plot([x.time for x in responses if x.side == Side.SELL],  # FIXME
                 [x.price for x in responses if x.side == Side.SELL],
                 'v', color='r')

        # set same limits
        y_bot, y_top = ax1.get_ylim()
        x_bot, x_top = ax1.get_xlim()
        ax3.set_ylim(y_bot, y_top)
        ax1.set_xlim(x_bot, x_top)
        ax2.set_xlim(x_bot, x_top)
        ax3.set_xlim(x_bot, x_top)
        dif = (x_top-x_bot)*.01
        ax1.set_xlim(x_bot-dif, x_top+dif)
        ax2.set_xlim(x_bot-dif, x_top+dif)
        ax3.set_xlim(x_bot-dif, x_top+dif)
        plt.show()

我们可以看到,我们的算法还实现了slippagetransactionCost,导致执行价格更差。

    def slippage(self, resp: TradeResponse) -> TradeResponse:
        slippage = resp.price * .0001  # .01% price impact
        if resp.side == Side.BUY:
            # price moves against (up)
            resp.slippage = slippage
            resp.price += slippage
        else:
            # price moves against (down)
            resp.slippage = -slippage
            resp.price -= slippage
        return resp

    def transactionCost(self, resp: TradeResponse) -> TradeResponse:
        txncost = resp.price * resp.volume * .0025  # gdax is 0.0025 max fee
        if resp.side == Side.BUY:
            # price moves against (up)
            resp.transaction_cost = txncost
            resp.price += txncost
        else:
            # price moves against (down)
            resp.transaction_cost = -txncost
            resp.price -= txncost
        return resp

扩展

除了编写新策略外,这个库还可以通过添加新的交易所进行扩展。这很简单。对于加密货币交易所,我非常依赖ccxtasyncio和websocket库。

示例

以下是coinbase交易所。大部分代码是用来管理不同的websocket订阅选项,以及在不同格式之间进行转换,例如符号、订单类型等。

class CoinbaseExchange(Exchange):
    @lru_cache(None)
    def subscription(self):
        return [json.dumps({"type": "subscribe", "product_id": x.value[0].value + '-' + x.value[1].value}) for x in self.options().currency_pairs]

    @lru_cache(None)
    def heartbeat(self):
        return json.dumps({"type": "heartbeat", "on": True})

    def tickToData(self, jsn: dict) -> MarketData:
        '''convert a jsn tick off the websocket to a MarketData struct'''
        if jsn.get('type') == 'received':
            return

        s = jsn.get('type').upper()
        reason = jsn.get('reason', '').upper()
        if s == 'MATCH' or (s == 'DONE' and reason == 'FILLED'):
            typ = TickType.TRADE
        elif s in ('OPEN', 'DONE', 'CHANGE', 'HEARTBEAT'):
            if reason == 'CANCELED':
                typ = TickType.CANCEL
            elif s == 'DONE':
                typ = TickType.FILL
            else:
                typ = TickType_from_string(s.upper())
        else:
            typ = TickType.ERROR

        order_id = jsn.get('order_id', jsn.get('maker_order_id', ''))
        time = parse_date(jsn.get('time')) if jsn.get('time') else datetime.now()

        if typ in (TickType.CANCEL, TickType.OPEN):
            volume = float(jsn.get('remaining_size', 'nan'))
        else:
            volume = float(jsn.get('size', 'nan'))
        price = float(jsn.get('price', 'nan'))

        currency_pair = str_to_currency_pair_type(jsn.get('product_id')) if typ != TickType.ERROR else PairType.NONE

        instrument = Instrument(underlying=currency_pair)

        order_type = str_to_order_type(jsn.get('order_type', ''))
        side = str_to_side(jsn.get('side', ''))
        remaining_volume = float(jsn.get('remaining_size', 0.0))

        sequence = int(jsn.get('sequence', -1))
        ret = MarketData(order_id=order_id,
                         time=time,
                         volume=volume,
                         price=price,
                         type=typ,
                         instrument=instrument,
                         remaining=remaining_volume,
                         side=side,
                         exchange=self.exchange(),
                         order_type=order_type,
                         sequence=sequence)
        return ret

项目详情


下载文件

下载适用于您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分布

aat-0.0.3.tar.gz (633.4 kB 查看散列)

上传时间

构建分布

aat-0.0.3-cp37-cp37m-macosx_10_13_x86_64.whl (644.5 kB 查看散列)

上传时间 CPython 3.7m macOS 10.13+ x86-64

由以下机构支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误日志 StatusPage StatusPage 状态页面