市场数据转码器
项目描述
Google Cloud Datacast Solution
将高性能交易所数据流导入Google Cloud
这不是官方的Google产品或服务
简介
数据流转换器(Datacast transcoder)是一个基于模式、面向消息的工具,用于简化将常见的高性能电子交易数据格式无损地导入到Google Cloud中。
电子交易场所具有特殊的数据表示和分发需求。特别是,由于交易场所处理的大量交易,高效的消息表示是一个高优先级。云原生API通常使用JSON作为消息负载,但使用高上下文编码表示消息所需的额外字节在计费计算环境中具有成本影响。
与JSON、YAML甚至CSV不同,二进制编码的数据是低上下文且非自我描述的——解释二进制消息的说明必须由生产者分别提前提供,并由解释器遵循。
转换器的架构依赖于以下几个主要抽象,如下所述
模式
模式(也称为数据字典)类似于API规范,但它描述的不是API端点合同,而是系统之间流动的二进制消息的代表性格式。最接近的比较可能类似于SQL数据定义语言支持的表定义,但这些模式既用于运动中的数据,也用于静止的数据。
转换器的当前输入模式支持包括简单二进制编码(SBE)XML以及QuickFIX风格的FIX协议模式表示(也在XML中)。
目标模式和数据元素将根据指定的output_type
进行渲染。如果没有指定输出类型,转换器默认将显示转换消息的YAML表示到控制台,并且不执行持久的模式转换。对于Avro和JSON,转换的模式和数据文件封装在本地的POSIX文件中。支持直接转换为BigQuery和Pub/Sub目标,转换的模式在消息导入或发布之前应用。也可以从指定的输入模式推导出BigQuery和Pub/Sub资源的Terraform配置。Terraform选项仅在本地上渲染配置,并不执行Terraform apply
。--create_schemas_only
选项单独转换其他输出类型的模式。
输出资源的名称将分别对应输入模式中定义的消息类型名称。例如,转换器将为在源数据中找到的FIX NewOrderSingle
消息创建并使用名为“NewOrderSingle”的Pub/Sub主题进行发布。同样,如果选择输出类型为bigquery
,转换器将在由--destination_dataset_id
指定的数据集中创建一个NewOrderSingle
表。默认情况下,Avro和JSON编码的输出将保存到使用--output_path
参数指定的目录中名为<message type>
的文件中,具有相应的扩展名。
消息
消息代表两个共享模式的系统之间的离散交互。每条消息都将符合模式中定义的单一消息类型。可以通过传递逗号分隔的消息类型名称字符串到--message_type_exclusions
和--message_type_inclusions
参数来包含或排除特定消息类型以进行处理。
编码
编码描述了消息负载的内容如何表示给系统。许多熟悉的编码,如JSON、YAML或CSV,是自我描述的,并且不严格要求应用程序使用单独的模式定义。然而,SBE、Avro和Protocol Buffers等二进制编码要求应用程序使用相关的模式来正确解释消息。
转换器支持的输入编码包括SBE二进制和ASCII编码(tag=value)FIX。Pub/Sub消息负载的输出编码可以是Avro二进制或Avro JSON。本地文件可以是Avro或JSON。
转换器支持使用--base64
和--base64_urlsafe
选项对消息进行base64解码。
传输
消息传输描述了系统之间传输消息的机制。这可能包括数据在运动中的传输,例如以太网网络,或者数据在静态中的传输,例如位于POSIX文件系统上的文件或位于云存储中的对象。原始消息字节必须从特定的传输中解帧,例如长度分隔的文件或数据包捕获文件。
转换器当前支持的入站消息源传输是PCAP文件、长度分隔的二进制文件和换行分隔的ASCII文件。多播UDP和Pub/Sub入站传输在路线图上。
出站传输选项包括本地存储的Avro和JSON POSIX文件、Pub/Sub主题或BigQuery表。如果没有指定output_type
,转换后的消息将输出到控制台,并以YAML编码,不会自动持久化。此外,可以将指定模式的Google Cloud资源定义封装在Terraform配置中。
消息工厂
消息工厂从输入源读取消息负载,确定从模式应用相关的消息类型,并在转换之前对消息数据进行任何调整。例如,消息生产者可能使用非标准的SBE标题或元数据,您可能希望删除或转换。对于标准的FIX标签/值输入源,可以使用包含的fix
消息工厂。
CLI用法
usage: txcode [-h] [--factory {cme,itch,memx,fix}]
[--schema_file SCHEMA_FILE] [--source_file SOURCE_FILE]
[--source_file_encoding SOURCE_FILE_ENCODING]
--source_file_format_type
{pcap,length_delimited,line_delimited,cme_binary_packet}
[--base64 | --base64_urlsafe]
[--fix_header_tags FIX_HEADER_TAGS]
[--fix_separator FIX_SEPARATOR]
[--message_handlers MESSAGE_HANDLERS]
[--message_skip_bytes MESSAGE_SKIP_BYTES]
[--prefix_length PREFIX_LENGTH]
[--message_type_exclusions MESSAGE_TYPE_EXCLUSIONS | --message_type_inclusions MESSAGE_TYPE_INCLUSIONS]
[--sampling_count SAMPLING_COUNT] [--skip_bytes SKIP_BYTES]
[--skip_lines SKIP_LINES] [--source_file_endian {big,little}]
[--output_path OUTPUT_PATH]
[--output_type {diag,avro,fastavro,bigquery,pubsub,bigquery_terraform,pubsub_terraform,jsonl,length_delimited}]
[--error_output_path ERROR_OUTPUT_PATH]
[--lazy_create_resources] [--frame_only] [--stats_only]
[--create_schemas_only]
[--destination_project_id DESTINATION_PROJECT_ID]
[--destination_dataset_id DESTINATION_DATASET_ID]
[--output_encoding {binary,json}]
[--create_schema_enforcing_topics | --no-create_schema_enforcing_topics]
[--continue_on_error]
[--log {notset,debug,info,warning,error,critical}] [-q] [-v]
Datacast Transcoder process input arguments
options:
-h, --help show this help message and exit
--continue_on_error Indicates if an exception file should be created, and
records continued to be processed upon message level
exceptions
--log {notset,debug,info,warning,error,critical}
The default logging level
-q, --quiet Suppress message output to console
-v, --version show program's version number and exit
Input source arguments:
--factory {cme,itch,memx,fix}
Message factory for decoding
--schema_file SCHEMA_FILE
Path to the schema file
--source_file SOURCE_FILE
Path to the source file
--source_file_encoding SOURCE_FILE_ENCODING
The source file character encoding
--source_file_format_type {pcap,length_delimited,line_delimited,cme_binary_packet}
The source file format
--base64 Indicates if each individual message extracted from
the source is base 64 encoded
--base64_urlsafe Indicates if each individual message extracted from
the source is base 64 url safe encoded
--fix_header_tags FIX_HEADER_TAGS
Comma delimited list of fix header tags
--fix_separator FIX_SEPARATOR
The unicode int representing the fix message separator
--message_handlers MESSAGE_HANDLERS
Comma delimited list of message handlers in priority
order
--message_skip_bytes MESSAGE_SKIP_BYTES
Number of bytes to skip before processing individual
messages within a repeated length delimited file
message source
--prefix_length PREFIX_LENGTH
How many bytes to use for the length prefix of length-
delimited binary sources
--message_type_exclusions MESSAGE_TYPE_EXCLUSIONS
Comma-delimited list of message types to exclude when
processing
--message_type_inclusions MESSAGE_TYPE_INCLUSIONS
Comma-delimited list of message types to include when
processing
--sampling_count SAMPLING_COUNT
Halt processing after reaching this number of
messages. Applied after all Handlers are executed per
message
--skip_bytes SKIP_BYTES
Number of bytes to skip before processing the file.
Useful for skipping file-level headers
--skip_lines SKIP_LINES
Number of lines to skip before processing the file
--source_file_endian {big,little}
Source file endianness
Output arguments:
--output_path OUTPUT_PATH
Output file path. Defaults to avroOut
--output_type {diag,avro,fastavro,bigquery,pubsub,bigquery_terraform,pubsub_terraform,jsonl,length_delimited}
Output format type
--error_output_path ERROR_OUTPUT_PATH
Error output file path if --continue_on_error flag
enabled. Defaults to errorOut
--lazy_create_resources
Flag indicating that output resources for message
types should be only created as messages of each type
are encountered in the source data. Default behavior
is to create resources for each message type before
messages are processed. Particularly useful when
working with FIX but only processing a limited set of
message types in the source data
--frame_only Flag indicating that transcoder should only frame
messages to an output source
--stats_only Flag indicating that transcoder should only report on
message type counts without parsing messages further
--create_schemas_only
Flag indicating that transcoder should only create
output resource schemas and not output message data
Google Cloud arguments:
--destination_project_id DESTINATION_PROJECT_ID
The Google Cloud project ID for the destination
resource
BigQuery arguments:
--destination_dataset_id DESTINATION_DATASET_ID
The BigQuery dataset for the destination. If it does
not exist, it will be created
Pub/Sub arguments:
--output_encoding {binary,json}
The encoding of the output
--create_schema_enforcing_topics, --no-create_schema_enforcing_topics
Indicates if Pub/Sub schemas should be created and
used to validate messages sent to a topic
消息处理器
txcode
支持执行消息处理器类,这些类可以用于有状态地更改正在进行的流和消息。例如,TimestampPullForwardHandler
将查找seconds
样式的ITCH消息(该消息通知流应用后续消息的当前纪元的秒数),并将该值的最新值追加到所有后续消息中(在seconds
消息出现的实例之间。这有助于使用绝对时间戳持久化单个消息,该时间戳需要更少的上下文来解释(即,出站消息包含的不仅仅是“午夜后的纳秒”时间戳。
另一个处理器是SequencerHandler
,它将序列号追加到所有出站消息中。这在处理长度分隔的存储格式中的大量消息时很有用,其中包含原始序列号的IP数据包头部已被剥离。
FilterHandler
允许您根据消息的特定属性过滤输出。此功能的常见用途是过滤仅涉及特定证券标识符或符号的消息。
以下是一组转换调用,可用于通过交易符号对消息宇宙进行分片。首先,必须使用助记交易符号标识符(stock
)从stock_directory
消息中查找其相关整数证券标识符(stock_locate
)。stock_locate
是包含在每条相关消息中的标识符(而不是stock
,它存在于某些消息类型中)
txcode --source_file 12302019.NASDAQ_ITCH50 --schema_file totalview-itch-50.xml --message_type_inclusions stock_directory --source_file_format_type length_delimited --factory itch --message_handlers FilterHandler:stock=SPY --sampling_count 1
authenticity: P
etp_flag: Y
etp_leverage_factor: null
financial_status_indicator: ' '
inverse_indicator: null
ipo_flag: ' '
issue_classification: Q
issue_subtype: E
luld_reference_price_tier: '1'
market_category: P
round_lot_size: 100
round_lots_only: N
short_sale_threshold_indicator: N
stock: SPY
stock_locate: 7451
timestamp: 11354508113636
tracking_number: 0
INFO:root:Sampled messages: 1
INFO:root:Message type inclusions: ['stock_directory']
INFO:root:Source message count: 7466
INFO:root:Processed message count: 7451
INFO:root:Transcoded message count: 1
INFO:root:Processed schema count: 1
INFO:root:Summary of message counts: {'stock_directory': 7451}
INFO:root:Summary of error message counts: {}
INFO:root:Message rate: 53260.474108 per second
INFO:root:Total runtime in seconds: 0.140179
INFO:root:Total runtime in minutes: 0.002336
从上述消息中提取stock_locate
字段的值允许我们过滤所有该字段/值组合的消息。此外,我们还可以将序列号追加到所有输出的转换消息中。以下组合返回我们用于查找stock_locate
代码的原始stock_directory
消息,以及流中具有相同stock_locate
值的下两条消息
txcode --source_file 12302019.NASDAQ_ITCH50 --schema_file totalview-itch-50.xml --source_file_format_type length_delimited --factory itch --message_handlers FilterHandler:stock_locate=7451,SequencerHandler --sampling_count 3
authenticity: P
etp_flag: Y
etp_leverage_factor: null
financial_status_indicator: ' '
inverse_indicator: null
ipo_flag: ' '
issue_classification: Q
issue_subtype: E
luld_reference_price_tier: '1'
market_category: P
round_lot_size: 100
round_lots_only: N
sequence_number: 1
short_sale_threshold_indicator: N
stock: SPY
stock_locate: 7451
timestamp: 11354508113636
tracking_number: 0
reason: ''
reserved: ' '
sequence_number: 2
stock: SPY
stock_locate: 7451
timestamp: 11355134575401
tracking_number: 0
trading_state: T
reg_sho_action: '0'
sequence_number: 3
stock: SPY
stock_locate: 7451
timestamp: 11355134599149
tracking_number: 0
INFO:root:Sampled messages: 3
INFO:root:Source message count: 23781
INFO:root:Processed message count: 23781
INFO:root:Transcoded message count: 3
INFO:root:Processed schema count: 21
INFO:root:Summary of message counts: {'system_event': 1, 'stock_directory': 8906, 'stock_trading_action': 7437, 'reg_sho_restriction': 7437, 'market_participant_position': 0, 'mwcb_decline_level': 0, 'ipo_quoting_period_update': 0, 'luld_auction_collar': 0, 'operational_halt': 0, 'add_order_no_attribution': 0, 'add_order_attribution': 0, 'order_executed': 0, 'order_executed_price': 0, 'order_cancelled': 0, 'order_deleted': 0, 'order_replaced': 0, 'trade': 0, 'cross_trade': 0, 'broken_trade': 0, 'net_order_imbalance': 0, 'retail_price_improvement_indicator': 0}
INFO:root:Summary of error message counts: {}
INFO:root:Message rate: 80950.257512 per second
INFO:root:Total runtime in seconds: 0.293773
INFO:root:Total runtime in minutes: 0.004896
处理器指定的语法是
<Handler1>:<Handler1Parameter>=<Handler1Parameter>,<Handler2>
消息处理器在transcoder/message/handler/
中部署。
安装
如果您是希望使用CLI或库而不进行更改的用户,您可以使用pip从PyPI安装Market Data Transcoder
pip install market-data-transcoder
在pip安装后,您可以使用以下命令验证转换器是否可用
txcode --help
开发者
如果您想扩展Market Data Transcoder的功能
cd market-data-transcoder
pip install -r requirements.txt
在安装所需的依赖项后,您可以使用以下方式运行转换器
export PYTHONPATH=`pwd`
python ./transcoder/main.py --help
项目详情
下载文件
下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。
源代码分发
构建分发
market-data-transcoder-1.0.3.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 561ddadb47ac34773f5d70d46255b8aca5d0911021a45add67546fd0fc3e72fd |
|
MD5 | 73cb05f69d6e0a825ebe5702c1885104 |
|
BLAKE2b-256 | ff208a4a7fede94018ef39875f14f8261b49e359e5d44c9f2fec30333a28a8e5 |
market_data_transcoder-1.0.3-py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | e16a326823c0f31894447daeabc2454aec421b150ec95f2ccccbb23aa7638a0f |
|
MD5 | b4caf146e69543192e696f5fe3702c82 |
|
BLAKE2b-256 | bd47472db0db67a18826983c75a2adb8666015b32b0d8b7084d209a3bcd70014 |