Python流处理。
项目描述
Python流处理
- 版本::
1.10.4
- 网站::
- 下载::
- 源码::
- 关键词::
分布式, 流, 异步, 处理, 数据, 队列, 状态管理
# Python Streams
# Forever scalable event processing & in-memory durable K/V store;
# as a library w/ asyncio & static typing.
import faust
Faust 是一个流处理库,将来自 Kafka Streams 的想法移植到 Python。
在Robinhood中,它被用于构建高性能分布式系统以及实时数据处理管道,每天处理数十亿事件。
Faust提供了流处理和事件处理功能,与Kafka Streams、Apache Spark/Storm/Samza/Flink等工具有相似之处。
Faust不使用DSL,它仅使用Python!这意味着您可以在流处理中使用所有喜欢的Python库:NumPy、PyTorch、Pandas、NLTK、Django、Flask、SQLAlchemy等。
Faust需要Python 3.6或更高版本的新async/await语法和变量类型注解。
以下是一个处理传入订单流的示例
app = faust.App('myapp', broker='kafka://localhost')
# Models describe how messages are serialized:
# {"account_id": "3fae-...", "amount": 3}
class Order(faust.Record):
account_id: str
amount: int
@app.agent(value_type=Order)
async def order(orders):
async for order in orders:
# process infinite stream of orders.
print(f'Order for {order.account_id}: {order.amount}')
Agent装饰器定义了一个“流处理器”,它本质上从Kafka主题中消费数据,并为每个接收到的事件执行某些操作。
该代理是一个async def函数,因此还可以异步执行其他操作,如网络请求。
该系统可以持久化状态,类似于数据库。表是命名分布式键/值存储,可以像常规Python字典一样使用。
表存储在每个机器上本地使用C++编写的超快速嵌入式数据库,称为RocksDB。
表还可以存储聚合计数,这些计数可以是可选的“窗口化”,因此您可以跟踪“过去一天的点击次数”或“过去一小时的点击次数”等。例如。像Kafka Streams一样,我们支持滑动、跳跃和时间窗口,并且可以过期旧窗口以防止数据溢出。
为了可靠性,我们使用Kafka主题作为“预写日志”。每当键更改时,都会发布到变更日志。备用节点从该变更日志中消费以保持数据的精确副本,并在任何节点失败时实现即时恢复。
对用户来说,表只是一个字典,但数据在重启之间持久化,并在节点之间复制,因此故障转移时其他节点可以自动接管。
您可以根据URL计算页面浏览量
# data sent to 'clicks' topic sharded by URL key.
# e.g. key="http://example.com" value="1"
click_topic = app.topic('clicks', key_type=str, value_type=int)
# default value for missing URL will be 0 with `default=int`
counts = app.Table('click_counts', default=int)
@app.agent(click_topic)
async def count_click(clicks):
async for url, count in clicks.items():
counts[url] += count
发送到Kafka主题的数据是分区的,这意味着点击将通过URL进行分片,以便每个相同URL的计数都将发送到同一Faust工作实例。
Faust支持任何类型的流数据:字节、Unicode和序列化结构,同时还提供“模型”,使用现代Python语法描述流中的键和值的序列化方式。
# Order is a json serialized dictionary,
# having these fields:
class Order(faust.Record):
account_id: str
product_id: str
price: float
quantity: float = 1.0
orders_topic = app.topic('orders', key_type=str, value_type=Order)
@app.agent(orders_topic)
async def process_order(orders):
async for order in orders:
# process each order using regular Python
total_price = order.price * order.quantity
await send_order_received_email(order.account_id, order)
Faust是静态类型化的,使用mypy类型检查器,因此您可以在编写应用程序时利用静态类型。
Faust的源代码体积小,组织良好,是学习Kafka Streams实现的好资源。
- 在 介绍 页面 了解更多关于Faust的信息
阅读更多关于Faust、系统要求、安装说明、社区资源等内容。
- 或直接前往 快速入门 教程
通过编程流应用程序来查看Faust的实际操作。
- 然后探索 用户指南
以主题组织深入信息。
Faust是…
- 简单
Faust极易于使用。要开始使用其他流处理解决方案,您需要复杂的hello-world项目和基础设施要求。Faust只需要Kafka,其余的都是Python,因此如果您知道Python,就可以使用Faust进行流处理,并且它可以与几乎所有东西集成。
这是一个比较容易制作的应用之一
import faust class Greeting(faust.Record): from_name: str to_name: str app = faust.App('hello-app', broker='kafka://localhost') topic = app.topic('hello-topic', value_type=Greeting) @app.agent(topic) async def hello(greetings): async for greeting in greetings: print(f'Hello from {greeting.from_name} to {greeting.to_name}') @app.timer(interval=1.0) async def example_sender(app): await hello.send( value=Greeting(from_name='Faust', to_name='you'), ) if __name__ == '__main__': app.main()
你可能对< cite>async cite>和< cite>await cite>关键字感到有些害怕,但你不必了解< span class="docutils literal">asyncio span>是如何工作的才能使用Faust:只需模仿示例,一切都会顺利。
示例应用启动了两个任务:一个处理流,另一个是发送事件到该流的后台线程。在实际应用中,您的系统将发布事件到Kafka主题,您的处理器可以从中消费,后台线程只需要向我们的示例中填充数据。
- 高可用性
Faust具有高可用性,并且可以应对网络问题和服务器崩溃。在节点失败的情况下,它可以自动恢复,并且表有备用节点来接管。
- 分布式
根据需要启动更多实例的应用程序。
- 快速
单个核心的Faust工作实例已经可以每秒处理数万条事件,我们合理地相信,一旦我们能够支持更优化的Kafka客户端,吞吐量将进一步提高。
- 灵活
Faust只是Python,流是一个无限异步迭代器。如果您知道如何使用Python,那么您已经知道如何使用Faust,并且它与您最喜欢的Python库(如Django、Flask、SQLAlchemy、NTLK、NumPy、SciPy、TensorFlow等)兼容。
安装
您可以通过Python包索引(PyPI)或从源代码安装Faust。
使用< cite>pip cite>安装
$ pip install -U faust
捆绑
Faust还定义了一系列< span class="docutils literal">setuptools span>扩展,可用于安装Faust和特定功能的依赖项。
您可以在要求或< span class="docutils literal">pip span>命令行中通过使用方括号来指定这些扩展。使用逗号分隔多个捆绑包。
$ pip install "faust[rocksdb]"
$ pip install "faust[rocksdb,uvloop,fast,redis]"
以下捆绑包可用
存储
- < span class="docutils literal">faust[rocksdb] span>< span class="colon">: span>
用于使用< a href="http://rocksdb.org" rel="nofollow">RocksDB a>存储Faust表状态。
在生产中推荐使用。
缓存
- < span class="docutils literal">faust[redis] span>< span class="colon">: span>
用于使用< cite>Redis cite>作为简单的缓存后端(Memcached样式)。
编解码器
- < span class="docutils literal">faust[yaml] span>< span class="colon">: span>
用于在流中使用YAML和< span class="docutils literal">PyYAML span>库。
优化
- < span class="docutils literal">faust[fast] span>< span class="colon">: span>
用于安装所有可用的C速度提升扩展到Faust核心。
传感器
- < span class="docutils literal">faust[datadog] span>< span class="colon">: span>
用于使用Datadog Faust监视器。
- < span class="docutils literal">faust[statsd] span>< span class="colon">: span>
用于使用Statsd Faust监视器。
事件循环
- < span class="docutils literal">faust[uvloop] span>< span class="colon">: span>
用于使用Faust与< span class="docutils literal">uvloop span>。
- < span class="docutils literal">faust[eventlet] span>< span class="colon">: span>
用于使用Faust与< span class="docutils literal">eventlet span>
调试
- < span class="docutils literal">faust[debug] span>< span class="colon">: span>
用于使用< span class="docutils literal">aiomonitor span>连接和调试正在运行的Faust工作进程。
- < span class="docutils literal">faust[setproctitle] span>< span class="colon">: span>
当安装了< span class="docutils literal">setproctitle span>模块时,Faust工作进程将使用它来在< span class="docutils literal">ps span>/< span class="docutils literal">top span>列表中设置更美观的进程名称。也包含在< span class="docutils literal">fast span>和< span class="docutils literal">debug span>捆绑包中。
从源代码下载和安装
从https://pypi.ac.cn/project/faust下载Faust的最新版本
您可以通过以下操作安装它
$ tar xvfz faust-0.0.0.tar.gz
$ cd faust-0.0.0
$ python setup.py build
# python setup.py install
如果您当前没有使用virtualenv,则必须以特权用户身份执行最后一个命令。
使用开发版本
使用pip
您可以使用以下pip命令安装Faust的最新快照:
$ pip install https://github.com/robinhood/faust/zipball/master#egg=faust
常见问题解答
我可以使用Faust与Django/Flask等一起使用吗?
是的!使用eventlet作为与asyncio集成的桥梁。
使用eventlet
此方法适用于任何可以与eventlet一起工作的阻塞Python库。
使用eventlet需要您安装aioeventlet模块,并且您可以将其作为Faust的包一起安装
$ pip install -U faust[eventlet]
然后,要实际使用eventlet作为事件循环,您必须使用-faust --loop
$ faust -L eventlet -A myproj worker -l info
或在其入口脚本顶部添加import mode.loop.eventlet
#!/usr/bin/env python3
import mode.loop.eventlet # noqa
我可以使用Faust与Tornado一起使用吗?
是的!使用tornado.platform.asyncio桥梁:[https://tornado.pythonlang.cn/en/stable/asyncio.html](https://tornado.pythonlang.cn/en/stable/asyncio.html)
我可以使用Faust与Twisted一起使用吗?
是的!使用asyncio反应器实现:[https://twistedmatrix.com/documents/17.1.0/api/twisted.internet.asyncioreactor.html](https://twistedmatrix.com/documents/17.1.0/api/twisted.internet.asyncioreactor.html)
您会支持Python 2.7或Python 3.5吗?
不会。Faust需要Python 3.6或更高版本,因为它大量使用了Python 3.6中引入的功能(async,await,变量类型注解)。
当我在本地运行Faust应用程序时,RocksDB报错“超过最大打开文件数”。我该如何修复?
您可能需要增加最大打开文件数的限制。以下文章解释了如何在OS X上这样做:[https://blog.dekstroza.io/ulimit-shenanigans-on-osx-el-capitan/](https://blog.dekstroza.io/ulimit-shenanigans-on-osx-el-capitan/)
Faust支持哪些kafka版本?
Faust支持版本大于等于0.10的kafka。
获取帮助
Slack
有关Faust的使用、开发和未来的讨论,请加入fauststream Slack。
[https://fauststream.slack.com](https://fauststream.slack.com)
注册:[https://join.slack.com/t/fauststream/shared_invite/enQtNDEzMTIyMTUyNzU2LTIyMjNjY2M2YzA2OWFhMDlmMzVkODk3YTBlYThlYmZiNTUwZDJlYWZiZTdkN2Q4ZGU4NWM4YWMyNTM5MGQ5OTg](https://join.slack.com/t/fauststream/shared_invite/enQtNDEzMTIyMTUyNzU2LTIyMjNjY2M2YzA2OWFhMDlmMzVkODk3YTBlYThlYmZiNTUwZDJlYWZiZTdkN2Q4ZGU4NWM4YWMyNTM5MGQ5OTg)
资源
错误跟踪器
如果您有任何建议、错误报告或烦恼,请向我们的错误跟踪器报告:[https://github.com/robinhood/faust/issues/](https://github.com/robinhood/faust/issues/)
许可证
本软件根据新BSD许可证授权。有关完整的许可证文本,请参阅顶层分发目录中的LICENSE文件。
贡献
Faust的开发在GitHub上发生:[https://github.com/robinhood/faust](https://github.com/robinhood/faust)
我们强烈鼓励您参与Faust的开发。
请务必阅读文档中的为Faust做出贡献部分。
行为准则
所有与项目代码库、错误跟踪器、聊天室和邮件列表互动的人都需要遵守Faust行为准则。
作为这些项目的贡献者和维护者,为了培养一个开放和欢迎的社区,我们承诺尊重所有通过报告问题、发布功能请求、更新文档、提交拉取请求或补丁以及进行其他活动来做出贡献的人。
我们致力于使所有参与者(无论经验水平、性别、性别认同与表达、性取向、残疾、个人外貌、体型、种族、民族、年龄、宗教或国籍)都能在这些项目中享受到无骚扰的参与体验。
参与者不可接受的行为示例包括
使用性化的语言或图像
个人攻击
骚扰或侮辱性评论
公开或私下骚扰
未经明确许可发布他人的私人信息,如物理或电子地址
其他不道德或不专业的行为。
项目负责人有权利和责任删除、编辑或拒绝不符合此行为准则的评论、提交、代码、维基编辑、问题和其他贡献。通过采用此行为准则,项目负责人承诺将公平且一致地应用这些原则于管理此项目的各个方面。不遵守或执行行为准则的项目负责人可能会被永久地从项目团队中移除。
此行为准则适用于项目空间内以及个人代表项目或其社区时的公共空间。
滥用、骚扰或其他不可接受的行为可以通过提交问题或联系项目负责人之一或多个来报告。
此行为准则改编自贡献者誓言,版本1.2.0,可在http://contributor-covenant.org/version/1/2/0/找到。
项目详情
下载文件
下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。
源代码发行版
构建发行版
faust-1.10.4.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | ffcd350ea29d528f6814fc9a42b5e50e130310da054a93e9d8216ef89a254611 |
|
MD5 | 0331755f10904deaaa35a97d8513b8ab |
|
BLAKE2b-256 | 17376f3babe6a3cfa9cdccaf3e7f65bbba19b132486113372fe46627bf303c2c |
faust-1.10.4-py2.py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | abaade164bde21cd5f41dff24a203ff91f2c935c849c8e6a807f854bf84ea77d |
|
MD5 | 88be65d73630d0bede9c2a189e1e1bc5 |
|
BLAKE2b-256 | 79f83fec4f5c3e5bf1ce8bb557ae507525253fa30a5cfc5984f342b931143f75 |
faust-1.10.4-cp38-cp38-macosx_10_14_x86_64.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | cfd47e756825eb8c6d197c1a2f25199aef2895cd31b522c74ec2dbb95dfa6fcc |
|
MD5 | 625631f1cfa569738ea194689ef9da27 |
|
BLAKE2b-256 | 4f3b9956f716834aa4591e83d8abe592fc819b16bc32174f2995bcb04b95c321 |
faust-1.10.4-cp37-cp37m-macosx_10_9_x86_64.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 4ae94762a16c3ef70aa1e77772ff94395c2068ecbb99e1acce252d0f1156bd2c |
|
MD5 | 18d7b09ac0f1bde944c7c9186896b384 |
|
BLAKE2b-256 | 42e3d2b09f142efbb2e29f7b98cc5375f5a8891b3675b81ba35357ed464d7876 |
faust-1.10.4-cp36-cp36m-macosx_10_13_x86_64.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 128dc0b9483aa4009edcc8b23f5c132757f2329c5da1fcc144d4c1d1dd63f156 |
|
MD5 | f32f569ff9d259b13b72a1463215f305 |
|
BLAKE2b-256 | 706e2752702b6a0e98a12fb293697d7656eb985b44a8ac0faea44ffde175e485 |