Python流处理。Faust的分支
项目描述
Python 流处理分支
安装
pip install faust-streaming
文档
简介
: https://faust-streaming.github.io/faust/introduction.html快速入门
: https://faust-streaming.github.io/faust/playbooks/quickstart.html用户指南
: https://faust-streaming.github.io/faust/userguide/index.html
为什么要分叉
我们决定分叉原始的 Faust
项目,因为有一个发布新版本的关键过程,这导致了社区的不可确定性。每个人都可以贡献于这个 分叉
,你也可以被添加为维护者。
我们想要
- 确保持续发布
- 代码质量
- 使用最新版本的 kafka 驱动程序(目前仅为 aiokafka)
- 支持 kafka 事务
- 更新文档
等等...
用法
# Python Streams
# Forever scalable event processing & in-memory durable K/V store;
# as a library w/ asyncio & static typing.
import faust
Faust 是一个流处理库,将 Kafka Streams
的理念移植到 Python。
它被用于 Robinhood
,以构建高性能的分布式系统和实时数据处理管道,每天处理数十亿事件。
Faust 提供了 流处理 和 事件处理,与 Kafka Streams
、Apache Spark
、Storm
、Samza
、Flink
等工具相似。
它不使用 DSL,它只是 Python!这意味着在流处理时,你可以使用你所有的喜欢的 Python 库:NumPy、PyTorch、Pandas、NLTK、Django、Flask、SQLAlchemy 等。
Faust 需要 Python 3.6 或更高版本,以支持新的 async/await
语法和变量类型注解。
以下是一个处理接收到的订单流量的示例
app = faust.App('myapp', broker='kafka://localhost')
# Models describe how messages are serialized:
# {"account_id": "3fae-...", amount": 3}
class Order(faust.Record):
account_id: str
amount: int
@app.agent(value_type=Order)
async def order(orders):
async for order in orders:
# process infinite stream of orders.
print(f'Order for {order.account_id}: {order.amount}')
Agent 装饰器定义了一个“流处理器”,它本质上是从 Kafka 主题中消费并对其接收到的每个事件执行某些操作。
Agent 是一个 async def
函数,因此也可以执行其他异步操作,例如网络请求。
这个系统可以持久化状态,就像数据库一样。表是以名称命名的分布式键/值存储,你可以将其用作常规 Python 字典。
表在每台机器上使用用 C++ 编写的超快嵌入式数据库 RocksDB
本地存储。
表还可以存储聚合计数,这些计数可以选择性地“窗口化”,这样你可以跟踪“过去一天的点击次数”或“过去一小时的点击次数”等。例如,与 Kafka Streams
一样,我们支持滚动、跳跃和滑动窗口,并且可以过期旧窗口以防止数据填满。
为了可靠性,我们使用 Kafka 主题作为“预写日志”。每当更改键时,我们都向更改日志发布。备用节点从该更改日志中消费,以保持数据的精确副本,并在任何节点失败时启用即时恢复。
对于用户来说,表只是一个字典,但数据在重启之间持久化,并且在节点之间复制,以便在故障转移时其他节点可以自动接管。
你可以按 URL 计算页面浏览量
# data sent to 'clicks' topic sharded by URL key.
# e.g. key="http://example.com" value="1"
click_topic = app.topic('clicks', key_type=str, value_type=int)
# default value for missing URL will be 0 with `default=int`
counts = app.Table('click_counts', default=int)
@app.agent(click_topic)
async def count_click(clicks):
async for url, count in clicks.items():
counts[url] += count
发送到 Kafka 主题的数据是分区的,这意味着点击将按 URL 进行分片,以便相同的 URL 的每个计数都发送到相同的 Faust 工作实例。
Faust 支持任何类型的流数据:字节、Unicode 和序列化结构,但它还包含使用现代 Python 语法来描述流中键和值的序列化的“模型”。
# Order is a json serialized dictionary,
# having these fields:
class Order(faust.Record):
account_id: str
product_id: str
price: float
quantity: float = 1.0
orders_topic = app.topic('orders', key_type=str, value_type=Order)
@app.agent(orders_topic)
async def process_order(orders):
async for order in orders:
# process each order using regular Python
total_price = order.price * order.quantity
await send_order_received_email(order.account_id, order)
Faust 是静态类型的,使用 mypy
类型检查器,因此你可以在编写应用程序时利用静态类型。
Faust 的源代码很小,组织良好,是学习 Kafka Streams
实现的良好资源。
在 简介
简介页面 上了解更多关于 Faust 的信息,了解系统要求、安装说明、社区资源等。
或直接访问 快速入门
教程,通过编写流应用程序来查看Faust的实际应用。
然后浏览 用户指南
,获取按主题组织的深入信息。
Robinhood
: http://robinhood.comasync/await
:https://medium.freecodecamp.org/a-guide-to-asynchronous-programming-in-python-with-asyncio-232e2afa44f6Celery
: http://celeryproject.orgKafka Streams
: https://kafka.apache.org/documentation/streamsApache Spark
: https://spark.apache.ac.cnStorm
: http://storm.apache.orgSamza
: http://samza.apache.orgFlink
: http://flink.apache.orgRocksDB
: http://rocksdb.orgAerospike
: https://www.aerospike.com/Apache Kafka
: https://kafka.apache.org
本地开发
- 克隆项目
- 创建虚拟环境:
python3.7 -m venv venv && source venv/bin/activate
- 安装需求:
./scripts/install
- 运行代码检查:
./scripts/lint
- 运行测试:
./scripts/tests
Faust要点
简单
Faust极易使用。与其他流处理解决方案相比,您需要复杂的hello-world项目和基础设施需求才能入门。Faust只需Kafka,其余的都是Python,所以如果您知道Python,您已经可以使用Faust进行流处理,并且它可以与几乎所有东西集成。
以下是一个易于创建的应用程序示例:
import faust
class Greeting(faust.Record):
from_name: str
to_name: str
app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)
@app.agent(topic)
async def hello(greetings):
async for greeting in greetings:
print(f'Hello from {greeting.from_name} to {greeting.to_name}')
@app.timer(interval=1.0)
async def example_sender(app):
await hello.send(
value=Greeting(from_name='Faust', to_name='you'),
)
if __name__ == '__main__':
app.main()
您可能对async
和await
关键字感到有些害怕,但您不必了解asyncio
的工作原理即可使用Faust:只需模仿示例,您就会没事的。
示例应用程序启动两个任务:一个是处理流,另一个是发送事件到该流的后台线程。在实际应用中,您的系统将向Kafka主题发布事件,您的处理器可以从这些主题中消费,而后台线程仅用于向我们的示例中注入数据。
高可用性
Faust具有高可用性,并且可以生存网络问题和服务器崩溃。在节点故障的情况下,它可以自动恢复,并且表具有备用节点将接管。
分布式
根据需要启动更多实例。
快速
单个核心的Faust工作实例可以每秒处理数万事件,我们合理地确信,一旦我们能够支持更优化的Kafka客户端,吞吐量将会增加。
灵活
Faust只是Python,流是一个无限异步迭代器。如果您知道如何使用Python,您就已经知道如何使用Faust,并且它可以使用您喜欢的Python库,如Django、Flask、SQLAlchemy、NLTK、NumPy、SciPy、TensorFlow等。
包
Faust还定义了一组可以用来安装Faust和特定功能依赖项的setuptools
扩展。
您可以在需求中指定这些,或者在使用pip
命令行时通过使用方括号来指定。用逗号分隔多个包
pip install "faust-streaming[rocksdb]"
pip install "faust-streaming[rocksdb,uvloop,fast,redis,aerospike]"
以下包可用
带有额外功能的Faust
存储
RocksDB
用于使用RocksDB
存储Faust表状态。生产推荐。
pip install faust-streaming[rocksdb]
(使用RocksDB 6)
pip install faust-streaming[rocksdict]
(使用RocksDB 8,不与6向后兼容)
Aerospike
pip install faust-streaming[aerospike]
用于使用Aerospike
存储Faust表状态。如果支持则推荐
Aerospike配置
可以通过指定 store="aerospike://"
来启用Aerospike作为状态存储。
默认情况下,所有由Aerospike支持的表使用 use_partitioner=True
并生成类似于由RocksDB支持的状态存储的更改日志主题事件。以下配置选项应作为键传递给Table 的 namespace
参数:aerospike命名空间。
ttl
:表中所有KV的TTL。
username
:连接到Aerospike集群的用户名。
password
:连接到Aerospike集群的密码。
hosts
:如aerospike客户端中指定,主机参数。
policies
:读取/写入/扫描的不同策略策略
client
:一个包含上述定义的 host
和 policies
的字典
缓存
使用 faust-streaming[redis]
以使用 Redis
作为简单的缓存后端(类似Memcached)。
编解码器
使用 faust-streaming[yaml]
在流中使用 YAML 和 PyYAML
库。
优化
使用 faust-streaming[fast]
安装所有可用的C速度提升扩展到Faust核心。
传感器
使用 faust-streaming[datadog]
使用 Faust 监视器。
使用 faust-streaming[statsd]
使用 Faust 监视器。
使用 faust-streaming[prometheus]
使用 Faust 监视器。
事件循环
使用 faust-streaming[uvloop]
与 uvloop
一起使用Faust。
使用 faust-streaming[eventlet]
与 eventlet
一起使用Faust。
调试
使用 faust-streaming[debug]
使用 aiomonitor
连接和调试正在运行的Faust工作进程。
当安装了 setproctitle
模块时,使用 faust-streaming[setproctitle]
,Faust工作进程将使用它来在 ps
/top
列表中设置更美观的进程名称。也安装了 fast
和 debug
包。
从源代码下载和安装
从https://pypi.ac.cn/project/faust-streaming/ 下载Faust的最新版本。
你可以通过执行以下命令来安装它:
$ tar xvfz faust-streaming-0.0.0.tar.gz
$ cd faust-streaming-0.0.0
$ python setup.py build
# python setup.py install
如果当前没有使用虚拟环境,则必须以特权用户身份执行最后一个命令。
使用开发版本
使用pip
你可以使用以下 pip
命令安装Faust的最新快照
pip install https://github.com/faust-streaming/faust/zipball/master#egg=faust
常见问题解答(FAQ)
我可以使用Faust与Django/Flask等一起使用吗?
是的!使用 eventlet
作为与 asyncio
集成的桥梁。
使用eventlet
此方法与任何可以与 eventlet
一起工作的阻塞Python库兼容。
使用 eventlet
需要你安装 faust-aioeventlet
模块,并且你可以将此模块与Faust一起安装为包。
pip install -U faust-streaming[eventlet]
然后,要实际使用eventlet作为事件循环,你必须使用 faust
程序的 -L <faust --loop>
参数
faust -L eventlet -A myproj worker -l info
或者在你的入口脚本顶部添加 import mode.loop.eventlet
#!/usr/bin/env python3
import mode.loop.eventlet # noqa
这非常重要,它必须位于模块的顶部,并且在导入库之前执行。
我可以使用Faust与Tornado一起使用吗?
是的!使用 tornado.platform.asyncio
桥梁
我可以使用Faust与Twisted一起使用吗?
是的!使用 asyncio
反应器实现:https://twistedmatrix.com/documents/current/api/twisted.internet.asyncioreactor.html
你会支持Python 2.7或Python 3.5吗?
不会。Faust需要Python 3.8或更高版本,因为它大量使用在Python 3.6中引入的特性(async
,await
,变量类型注解)。
我在本地运行Faust应用时遇到RocksDB的最大打开文件数超出错误。我该如何修复它?
你可能需要增加最大打开文件数的限制。在macOS和Linux上,你可以使用
ulimit -n max_open_files
来将打开文件限制增加到max_open_files。
在 Docker 中,你可以使用 --ulimit 标志
docker run --ulimit nofile=50000:100000 <image-tag>
其中 50000 是软限制,100000 是硬限制 查看区别。
Faust 支持的 Kafka 版本
Faust 支持 Kafka 版本 >= 0.10。
获取帮助
Slack
有关 Faust 的使用、开发和未来的讨论,请加入 fauststream
Slack。
- https://fauststream.slack.com
- 注册: https://join.slack.com/t/fauststreaming/shared_invite/zt-1q1jhq4kh-Q1t~rJgpyuMQ6N38cByE9g
资源
错误跟踪器
如果您有任何建议、错误报告或烦恼,请通过我们的错误跟踪器 https://github.com/faust-streaming/faust/issues/ 报告。
许可
本软件受 New BSD 许可证
许可。请参阅顶级分发目录中的 LICENSE
文件以获取完整的许可文本。
贡献
Faust
的开发在 GitHub 进行。
我们强烈鼓励您参与 Faust
的开发。
行为准则
所有与项目代码库、错误跟踪器、聊天室和邮件列表互动的人都应遵守 Faust 的行为准则。
作为这些项目的贡献者和维护者,为了培养一个开放和欢迎的社区,我们承诺尊重通过报告问题、发布功能请求、更新文档、提交拉取请求或补丁等方式贡献的所有人。
我们致力于使这些项目的参与成为每个人的无骚扰体验,无论经验水平、性别、性别认同和表达、性取向、残疾、个人外观、体型、种族、民族、年龄、宗教或国籍。
参与者不可接受的行为包括
- 使用性化语言或图像
- 个人攻击
- 骚扰或侮辱性评论
- 公开或私下的骚扰
- 在未明确许可的情况下发布他人的私人信息,例如物理或电子地址
- 其他不道德或不专业的行为。
项目维护者有权和义务删除、编辑或拒绝不符合此行为准则的评论、提交、代码、维基编辑、问题和其他贡献。通过采用此行为准则,项目维护者承诺公平和一致地应用这些原则于管理此项目的各个方面。不遵守或执行行为准则的项目维护者可能被永久移除出项目团队。
此行为准则适用于项目空间内和个体代表项目或其社区时的公共空间。
可以通过打开问题或联系一个或多个项目维护者来报告滥用、骚扰或其他不可接受的行为。
项目详情
下载文件
下载适用于您平台的文件。如果您不确定选择哪个,请了解有关 安装包 的更多信息。
源代码分发
构建分发
faust-streaming-0.11.3.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 3488be5dec17dcd4613dfa2600bb2bf83ebadb05de4620dca791ed4a0b7bd650 |
|
MD5 | 25a582f8756808877a763413fb64f454 |
|
BLAKE2b-256 | 84f662d1c034a246178fafaa4aafb51d823a5e3103149120bcec731f0ea0bfdb |
faust_streaming-0.11.3-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | c0eb3438bc05e77446ce5e8c0ed0929d983a3f29e4db12a58c084416041c2aea |
|
MD5 | 326a1941fd3b945c816c23062cf0d79c |
|
BLAKE2b-256 | 86d03cef9ab2cbda3223fa920909e1ec135f334283343a0ef42a58cf56080735 |
faust_streaming-0.11.3-cp311-cp311-macosx_11_0_arm64.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | c3b26283253f94b8d6e73f7184e2b91dd33c3661a35ef0111efed6e403f54f54 |
|
MD5 | dc73469ff30785494d0d266ee7de9d83 |
|
BLAKE2b-256 | 1e5bb2251f781db100676ed5efe79714ccf31acab8c83a19eb91eb431f1d7b27 |
faust_streaming-0.11.3-cp311-cp311-macosx_10_9_x86_64.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 115d274ec3366a2b03ee85fd9b25aeeaa8389ae77dd8d029559a99e8e7a401cb |
|
MD5 | 18c21e87b60b7657ef023246f9296941 |
|
BLAKE2b-256 | 7145ac219344d5b48ab8dae05a8179dfcf9ca1d62c71d3c658f7b30e4f60d5bf |
faust_streaming-0.11.3-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | c3b112b1642c2ae952600eb1b5d873d1cb66ee568019ddd9ede6bd1e1afb15e3 |
|
MD5 | 8da89c019c3f9b4b71299b8870630fe6 |
|
BLAKE2b-256 | ad61edb75b1771b36855b955e696dbae2b7929e48ed7156d495a937b1d3130d3 |
faust_streaming-0.11.3-cp310-cp310-macosx_11_0_arm64.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | bd06c9f97727032a90597f92c0f06514286011c47038c1b606005449b0526033 |
|
MD5 | eb15a013ccb0d1c4c830942aa78357cb |
|
BLAKE2b-256 | d675cfab604fb8853e56df1d78c1925770a9145420ba902c8b1528f9337d9873 |
哈希值 for faust_streaming-0.11.3-cp310-cp310-macosx_10_9_x86_64.whl
算法 | 哈希摘要 | |
---|---|---|
SHA256 | ce60311f1130351c86ffe42139462f182ac308b744a37f879e336c31e9661daa |
|
MD5 | cd6bfb7ab8d38df28c1ac815dc3b0429 |
|
BLAKE2b-256 | 04c403dea4f5b06a4a13a0d138991605693ac89c562f388f5e8eba834c2ec1bf |
哈希值 for faust_streaming-0.11.3-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 15f025b695981162cd4c199881bb3b0b9f0fb0e8f42575b87ada61349013055a |
|
MD5 | 8e0e9b9db4c409154d63da47c3bec027 |
|
BLAKE2b-256 | 961d372eb9391e26da44d3837bcb335f5b66507b0b4b447d9778b3da43e7f32f |
哈希值 for faust_streaming-0.11.3-cp39-cp39-macosx_11_0_arm64.whl
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 7eace309ba7f164ef6c06bcdcda2e96b87bb658cee652f0732f9f6dc1dafc05f |
|
MD5 | 864fa26ae22e0c1d53f534eb408144cc |
|
BLAKE2b-256 | bf3f102376799aadbe07b0cdfd8aeadee760627d038eb796741ccc3729a16e31 |
哈希值 for faust_streaming-0.11.3-cp39-cp39-macosx_10_9_x86_64.whl
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 53a05931c04a0d6db9e49faeb54afbd5bd25c4d6332eda711aaa9df1acc8446f |
|
MD5 | c1b3c56bd94ccf04b3add55ecb33c990 |
|
BLAKE2b-256 | 4ee035b5b5b40e2ef54790222548ff28159c54a2c9839a8e8368560816b7fbf3 |
哈希值 for faust_streaming-0.11.3-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 989e88c8e7a267728dd20c671b5834e3bfbd67867dd32dfd7008819a3c3c011e |
|
MD5 | fad66f230d59e68c373d6d26823f5350 |
|
BLAKE2b-256 | 9e1e391094c27be4bbc9bcc8a6bca54d8b241d2a9677d4be7fa385b8ee528197 |
哈希值 for faust_streaming-0.11.3-cp38-cp38-macosx_11_0_arm64.whl
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 1e1bb745e461ea1ff5125ff57ca3d83618b7f4fbb108dea4308ef081294ee04d |
|
MD5 | f22dc049bc50e9567410593928c8236a |
|
BLAKE2b-256 | 5c5c1d389cf5657388da94193d9e9ea992c444228bd1a9ae892c52bc9963ceb0 |
哈希值 for faust_streaming-0.11.3-cp38-cp38-macosx_10_9_x86_64.whl
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 5b28eaa52f2b4ee131e8a36f451159d2d255d7583439b642e932084061b22e96 |
|
MD5 | 46a3433df3436f79e6a2cbfafe853d27 |
|
BLAKE2b-256 | ea80483ba332fdbb5ae2f3200f1449035c41fbbb07e6eeb3443af4627643c63f |