跳转到主要内容

市场数据转码器

项目描述

Google Cloud Datacast Solution

将高性能交易所数据流导入Google Cloud

这不是官方的Google产品或服务

简介

数据流转换器(Datacast transcoder)是一个基于模式、面向消息的工具,用于简化将常见的高性能电子交易数据格式无损地导入到Google Cloud中。

电子交易场所具有特殊的数据表示和分发需求。特别是,由于交易场所处理的大量交易,高效的消息表示是一个高优先级。云原生API通常使用JSON作为消息负载,但使用高上下文编码表示消息所需的额外字节在计费计算环境中具有成本影响。

与JSON、YAML甚至CSV不同,二进制编码的数据是低上下文且非自我描述的——解释二进制消息的说明必须由生产者分别提前提供,并由解释器遵循。

转换器的架构依赖于以下几个主要抽象,如下所述

模式

模式(也称为数据字典)类似于API规范,但它描述的不是API端点合同,而是系统之间流动的二进制消息的代表性格式。最接近的比较可能类似于SQL数据定义语言支持的表定义,但这些模式既用于运动中的数据,也用于静止的数据。

转换器的当前输入模式支持包括简单二进制编码(SBE)XML以及QuickFIX风格的FIX协议模式表示(也在XML中)。

目标模式和数据元素将根据指定的output_type进行渲染。如果没有指定输出类型,转换器默认将显示转换消息的YAML表示到控制台,并且不执行持久的模式转换。对于Avro和JSON,转换的模式和数据文件封装在本地的POSIX文件中。支持直接转换为BigQuery和Pub/Sub目标,转换的模式在消息导入或发布之前应用。也可以从指定的输入模式推导出BigQuery和Pub/Sub资源的Terraform配置。Terraform选项仅在本地上渲染配置,并不执行Terraform apply--create_schemas_only选项单独转换其他输出类型的模式。

输出资源的名称将分别对应输入模式中定义的消息类型名称。例如,转换器将为在源数据中找到的FIX NewOrderSingle消息创建并使用名为“NewOrderSingle”的Pub/Sub主题进行发布。同样,如果选择输出类型为bigquery,转换器将在由--destination_dataset_id指定的数据集中创建一个NewOrderSingle表。默认情况下,Avro和JSON编码的输出将保存到使用--output_path参数指定的目录中名为<message type>的文件中,具有相应的扩展名。

消息

消息代表两个共享模式的系统之间的离散交互。每条消息都将符合模式中定义的单一消息类型。可以通过传递逗号分隔的消息类型名称字符串到--message_type_exclusions--message_type_inclusions参数来包含或排除特定消息类型以进行处理。

编码

编码描述了消息负载的内容如何表示给系统。许多熟悉的编码,如JSON、YAML或CSV,是自我描述的,并且不严格要求应用程序使用单独的模式定义。然而,SBE、Avro和Protocol Buffers等二进制编码要求应用程序使用相关的模式来正确解释消息。

转换器支持的输入编码包括SBE二进制和ASCII编码(tag=value)FIX。Pub/Sub消息负载的输出编码可以是Avro二进制或Avro JSON。本地文件可以是Avro或JSON。

转换器支持使用--base64--base64_urlsafe选项对消息进行base64解码。

传输

消息传输描述了系统之间传输消息的机制。这可能包括数据在运动中的传输,例如以太网网络,或者数据在静态中的传输,例如位于POSIX文件系统上的文件或位于云存储中的对象。原始消息字节必须从特定的传输中解帧,例如长度分隔的文件或数据包捕获文件。

转换器当前支持的入站消息源传输是PCAP文件、长度分隔的二进制文件和换行分隔的ASCII文件。多播UDP和Pub/Sub入站传输在路线图上。

出站传输选项包括本地存储的Avro和JSON POSIX文件、Pub/Sub主题或BigQuery表。如果没有指定output_type,转换后的消息将输出到控制台,并以YAML编码,不会自动持久化。此外,可以将指定模式的Google Cloud资源定义封装在Terraform配置中。

消息工厂

消息工厂从输入源读取消息负载,确定从模式应用相关的消息类型,并在转换之前对消息数据进行任何调整。例如,消息生产者可能使用非标准的SBE标题或元数据,您可能希望删除或转换。对于标准的FIX标签/值输入源,可以使用包含的fix消息工厂。

CLI用法

usage: txcode  [-h] [--factory {cme,itch,memx,fix}]
               [--schema_file SCHEMA_FILE] [--source_file SOURCE_FILE]
               [--source_file_encoding SOURCE_FILE_ENCODING]
               --source_file_format_type
               {pcap,length_delimited,line_delimited,cme_binary_packet}
               [--base64 | --base64_urlsafe]
               [--fix_header_tags FIX_HEADER_TAGS]
               [--fix_separator FIX_SEPARATOR]
               [--message_handlers MESSAGE_HANDLERS]
               [--message_skip_bytes MESSAGE_SKIP_BYTES]
               [--prefix_length PREFIX_LENGTH]
               [--message_type_exclusions MESSAGE_TYPE_EXCLUSIONS | --message_type_inclusions MESSAGE_TYPE_INCLUSIONS]
               [--sampling_count SAMPLING_COUNT] [--skip_bytes SKIP_BYTES]
               [--skip_lines SKIP_LINES] [--source_file_endian {big,little}]
               [--output_path OUTPUT_PATH]
               [--output_type {diag,avro,fastavro,bigquery,pubsub,bigquery_terraform,pubsub_terraform,jsonl,length_delimited}]
               [--error_output_path ERROR_OUTPUT_PATH]
               [--lazy_create_resources] [--frame_only] [--stats_only]
               [--create_schemas_only]
               [--destination_project_id DESTINATION_PROJECT_ID]
               [--destination_dataset_id DESTINATION_DATASET_ID]
               [--output_encoding {binary,json}]
               [--create_schema_enforcing_topics | --no-create_schema_enforcing_topics]
               [--continue_on_error]
               [--log {notset,debug,info,warning,error,critical}] [-q] [-v]

Datacast Transcoder process input arguments

options:
  -h, --help            show this help message and exit
  --continue_on_error   Indicates if an exception file should be created, and
                        records continued to be processed upon message level
                        exceptions
  --log {notset,debug,info,warning,error,critical}
                        The default logging level
  -q, --quiet           Suppress message output to console
  -v, --version         show program's version number and exit

Input source arguments:
  --factory {cme,itch,memx,fix}
                        Message factory for decoding
  --schema_file SCHEMA_FILE
                        Path to the schema file
  --source_file SOURCE_FILE
                        Path to the source file
  --source_file_encoding SOURCE_FILE_ENCODING
                        The source file character encoding
  --source_file_format_type {pcap,length_delimited,line_delimited,cme_binary_packet}
                        The source file format
  --base64              Indicates if each individual message extracted from
                        the source is base 64 encoded
  --base64_urlsafe      Indicates if each individual message extracted from
                        the source is base 64 url safe encoded
  --fix_header_tags FIX_HEADER_TAGS
                        Comma delimited list of fix header tags
  --fix_separator FIX_SEPARATOR
                        The unicode int representing the fix message separator
  --message_handlers MESSAGE_HANDLERS
                        Comma delimited list of message handlers in priority
                        order
  --message_skip_bytes MESSAGE_SKIP_BYTES
                        Number of bytes to skip before processing individual
                        messages within a repeated length delimited file
                        message source
  --prefix_length PREFIX_LENGTH
                        How many bytes to use for the length prefix of length-
                        delimited binary sources
  --message_type_exclusions MESSAGE_TYPE_EXCLUSIONS
                        Comma-delimited list of message types to exclude when
                        processing
  --message_type_inclusions MESSAGE_TYPE_INCLUSIONS
                        Comma-delimited list of message types to include when
                        processing
  --sampling_count SAMPLING_COUNT
                        Halt processing after reaching this number of
                        messages. Applied after all Handlers are executed per
                        message
  --skip_bytes SKIP_BYTES
                        Number of bytes to skip before processing the file.
                        Useful for skipping file-level headers
  --skip_lines SKIP_LINES
                        Number of lines to skip before processing the file
  --source_file_endian {big,little}
                        Source file endianness

Output arguments:
  --output_path OUTPUT_PATH
                        Output file path. Defaults to avroOut
  --output_type {diag,avro,fastavro,bigquery,pubsub,bigquery_terraform,pubsub_terraform,jsonl,length_delimited}
                        Output format type
  --error_output_path ERROR_OUTPUT_PATH
                        Error output file path if --continue_on_error flag
                        enabled. Defaults to errorOut
  --lazy_create_resources
                        Flag indicating that output resources for message
                        types should be only created as messages of each type
                        are encountered in the source data. Default behavior
                        is to create resources for each message type before
                        messages are processed. Particularly useful when
                        working with FIX but only processing a limited set of
                        message types in the source data
  --frame_only          Flag indicating that transcoder should only frame
                        messages to an output source
  --stats_only          Flag indicating that transcoder should only report on
                        message type counts without parsing messages further
  --create_schemas_only
                        Flag indicating that transcoder should only create
                        output resource schemas and not output message data

Google Cloud arguments:
  --destination_project_id DESTINATION_PROJECT_ID
                        The Google Cloud project ID for the destination
                        resource

BigQuery arguments:
  --destination_dataset_id DESTINATION_DATASET_ID
                        The BigQuery dataset for the destination. If it does
                        not exist, it will be created

Pub/Sub arguments:
  --output_encoding {binary,json}
                        The encoding of the output
  --create_schema_enforcing_topics, --no-create_schema_enforcing_topics
                        Indicates if Pub/Sub schemas should be created and
                        used to validate messages sent to a topic

消息处理器

txcode支持执行消息处理器类,这些类可以用于有状态地更改正在进行的流和消息。例如,TimestampPullForwardHandler将查找seconds样式的ITCH消息(该消息通知流应用后续消息的当前纪元的秒数),并将该值的最新值追加到所有后续消息中(在seconds消息出现的实例之间。这有助于使用绝对时间戳持久化单个消息,该时间戳需要更少的上下文来解释(即,出站消息包含的不仅仅是“午夜后的纳秒”时间戳。

另一个处理器是SequencerHandler,它将序列号追加到所有出站消息中。这在处理长度分隔的存储格式中的大量消息时很有用,其中包含原始序列号的IP数据包头部已被剥离。

FilterHandler允许您根据消息的特定属性过滤输出。此功能的常见用途是过滤仅涉及特定证券标识符或符号的消息。

以下是一组转换调用,可用于通过交易符号对消息宇宙进行分片。首先,必须使用助记交易符号标识符(stock)从stock_directory消息中查找其相关整数证券标识符(stock_locate)。stock_locate是包含在每条相关消息中的标识符(而不是stock,它存在于某些消息类型中)


txcode --source_file 12302019.NASDAQ_ITCH50 --schema_file totalview-itch-50.xml --message_type_inclusions stock_directory --source_file_format_type length_delimited --factory itch --message_handlers FilterHandler:stock=SPY --sampling_count 1

authenticity: P
etp_flag: Y
etp_leverage_factor: null
financial_status_indicator: ' '
inverse_indicator: null
ipo_flag: ' '
issue_classification: Q
issue_subtype: E
luld_reference_price_tier: '1'
market_category: P
round_lot_size: 100
round_lots_only: N
short_sale_threshold_indicator: N
stock: SPY
stock_locate: 7451
timestamp: 11354508113636
tracking_number: 0

INFO:root:Sampled messages: 1
INFO:root:Message type inclusions: ['stock_directory']
INFO:root:Source message count: 7466
INFO:root:Processed message count: 7451
INFO:root:Transcoded message count: 1
INFO:root:Processed schema count: 1
INFO:root:Summary of message counts: {'stock_directory': 7451}
INFO:root:Summary of error message counts: {}
INFO:root:Message rate: 53260.474108 per second
INFO:root:Total runtime in seconds: 0.140179
INFO:root:Total runtime in minutes: 0.002336

从上述消息中提取stock_locate字段的值允许我们过滤所有该字段/值组合的消息。此外,我们还可以将序列号追加到所有输出的转换消息中。以下组合返回我们用于查找stock_locate代码的原始stock_directory消息,以及流中具有相同stock_locate值的下两条消息


txcode --source_file 12302019.NASDAQ_ITCH50 --schema_file totalview-itch-50.xml --source_file_format_type length_delimited --factory itch --message_handlers FilterHandler:stock_locate=7451,SequencerHandler --sampling_count 3 

authenticity: P
etp_flag: Y
etp_leverage_factor: null
financial_status_indicator: ' '
inverse_indicator: null
ipo_flag: ' '
issue_classification: Q
issue_subtype: E
luld_reference_price_tier: '1'
market_category: P
round_lot_size: 100
round_lots_only: N
sequence_number: 1
short_sale_threshold_indicator: N
stock: SPY
stock_locate: 7451
timestamp: 11354508113636
tracking_number: 0

reason: ''
reserved: ' '
sequence_number: 2
stock: SPY
stock_locate: 7451
timestamp: 11355134575401
tracking_number: 0
trading_state: T

reg_sho_action: '0'
sequence_number: 3
stock: SPY
stock_locate: 7451
timestamp: 11355134599149
tracking_number: 0

INFO:root:Sampled messages: 3
INFO:root:Source message count: 23781
INFO:root:Processed message count: 23781
INFO:root:Transcoded message count: 3
INFO:root:Processed schema count: 21
INFO:root:Summary of message counts: {'system_event': 1, 'stock_directory': 8906, 'stock_trading_action': 7437, 'reg_sho_restriction': 7437, 'market_participant_position': 0, 'mwcb_decline_level': 0, 'ipo_quoting_period_update': 0, 'luld_auction_collar': 0, 'operational_halt': 0, 'add_order_no_attribution': 0, 'add_order_attribution': 0, 'order_executed': 0, 'order_executed_price': 0, 'order_cancelled': 0, 'order_deleted': 0, 'order_replaced': 0, 'trade': 0, 'cross_trade': 0, 'broken_trade': 0, 'net_order_imbalance': 0, 'retail_price_improvement_indicator': 0}
INFO:root:Summary of error message counts: {}
INFO:root:Message rate: 80950.257512 per second
INFO:root:Total runtime in seconds: 0.293773
INFO:root:Total runtime in minutes: 0.004896


处理器指定的语法是

<Handler1>:<Handler1Parameter>=<Handler1Parameter>,<Handler2>

消息处理器在transcoder/message/handler/中部署。

安装

如果您是希望使用CLI或库而不进行更改的用户,您可以使用pip从PyPI安装Market Data Transcoder

pip install market-data-transcoder

在pip安装后,您可以使用以下命令验证转换器是否可用

txcode --help

开发者

如果您想扩展Market Data Transcoder的功能

cd market-data-transcoder
pip install -r requirements.txt

在安装所需的依赖项后,您可以使用以下方式运行转换器

export PYTHONPATH=`pwd`
python ./transcoder/main.py --help

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源代码分发

market-data-transcoder-1.0.3.tar.gz (67.3 kB 查看哈希值)

上传时间 源代码

构建分发

market_data_transcoder-1.0.3-py3-none-any.whl (129.5 kB 查看哈希值)

上传时间 Python 3

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误日志 StatusPage StatusPage 状态页面