跳转到主要内容

Python流处理。Faust的分支

项目描述

faust

Python 流处理分支

python versions version codecov slack Code style: black pre-commit license downloads

安装

pip install faust-streaming

文档

为什么要分叉

我们决定分叉原始的 Faust 项目,因为有一个发布新版本的关键过程,这导致了社区的不可确定性。每个人都可以贡献于这个 分叉,你也可以被添加为维护者。

我们想要

  • 确保持续发布
  • 代码质量
  • 使用最新版本的 kafka 驱动程序(目前仅为 aiokafka
  • 支持 kafka 事务
  • 更新文档

等等...

用法

# Python Streams
# Forever scalable event processing & in-memory durable K/V store;
# as a library w/ asyncio & static typing.
import faust

Faust 是一个流处理库,将 Kafka Streams 的理念移植到 Python。

它被用于 Robinhood,以构建高性能的分布式系统和实时数据处理管道,每天处理数十亿事件。

Faust 提供了 流处理事件处理,与 Kafka StreamsApache SparkStormSamzaFlink 等工具相似。

它不使用 DSL,它只是 Python!这意味着在流处理时,你可以使用你所有的喜欢的 Python 库:NumPy、PyTorch、Pandas、NLTK、Django、Flask、SQLAlchemy 等。

Faust 需要 Python 3.6 或更高版本,以支持新的 async/await 语法和变量类型注解。

以下是一个处理接收到的订单流量的示例

app = faust.App('myapp', broker='kafka://localhost')

# Models describe how messages are serialized:
# {"account_id": "3fae-...", amount": 3}
class Order(faust.Record):
    account_id: str
    amount: int

@app.agent(value_type=Order)
async def order(orders):
    async for order in orders:
        # process infinite stream of orders.
        print(f'Order for {order.account_id}: {order.amount}')

Agent 装饰器定义了一个“流处理器”,它本质上是从 Kafka 主题中消费并对其接收到的每个事件执行某些操作。

Agent 是一个 async def 函数,因此也可以执行其他异步操作,例如网络请求。

这个系统可以持久化状态,就像数据库一样。表是以名称命名的分布式键/值存储,你可以将其用作常规 Python 字典。

表在每台机器上使用用 C++ 编写的超快嵌入式数据库 RocksDB 本地存储。

表还可以存储聚合计数,这些计数可以选择性地“窗口化”,这样你可以跟踪“过去一天的点击次数”或“过去一小时的点击次数”等。例如,与 Kafka Streams 一样,我们支持滚动、跳跃和滑动窗口,并且可以过期旧窗口以防止数据填满。

为了可靠性,我们使用 Kafka 主题作为“预写日志”。每当更改键时,我们都向更改日志发布。备用节点从该更改日志中消费,以保持数据的精确副本,并在任何节点失败时启用即时恢复。

对于用户来说,表只是一个字典,但数据在重启之间持久化,并且在节点之间复制,以便在故障转移时其他节点可以自动接管。

你可以按 URL 计算页面浏览量

# data sent to 'clicks' topic sharded by URL key.
# e.g. key="http://example.com" value="1"
click_topic = app.topic('clicks', key_type=str, value_type=int)

# default value for missing URL will be 0 with `default=int`
counts = app.Table('click_counts', default=int)

@app.agent(click_topic)
async def count_click(clicks):
    async for url, count in clicks.items():
        counts[url] += count

发送到 Kafka 主题的数据是分区的,这意味着点击将按 URL 进行分片,以便相同的 URL 的每个计数都发送到相同的 Faust 工作实例。

Faust 支持任何类型的流数据:字节、Unicode 和序列化结构,但它还包含使用现代 Python 语法来描述流中键和值的序列化的“模型”。

# Order is a json serialized dictionary,
# having these fields:

class Order(faust.Record):
    account_id: str
    product_id: str
    price: float
    quantity: float = 1.0

orders_topic = app.topic('orders', key_type=str, value_type=Order)

@app.agent(orders_topic)
async def process_order(orders):
    async for order in orders:
        # process each order using regular Python
        total_price = order.price * order.quantity
        await send_order_received_email(order.account_id, order)

Faust 是静态类型的,使用 mypy 类型检查器,因此你可以在编写应用程序时利用静态类型。

Faust 的源代码很小,组织良好,是学习 Kafka Streams 实现的良好资源。

简介 简介页面 上了解更多关于 Faust 的信息,了解系统要求、安装说明、社区资源等。

或直接访问 快速入门 教程,通过编写流应用程序来查看Faust的实际应用。

然后浏览 用户指南,获取按主题组织的深入信息。

本地开发

  1. 克隆项目
  2. 创建虚拟环境: python3.7 -m venv venv && source venv/bin/activate
  3. 安装需求: ./scripts/install
  4. 运行代码检查: ./scripts/lint
  5. 运行测试: ./scripts/tests

Faust要点

简单

Faust极易使用。与其他流处理解决方案相比,您需要复杂的hello-world项目和基础设施需求才能入门。Faust只需Kafka,其余的都是Python,所以如果您知道Python,您已经可以使用Faust进行流处理,并且它可以与几乎所有东西集成。

以下是一个易于创建的应用程序示例:

import faust

class Greeting(faust.Record):
    from_name: str
    to_name: str

app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)

@app.agent(topic)
async def hello(greetings):
    async for greeting in greetings:
        print(f'Hello from {greeting.from_name} to {greeting.to_name}')

@app.timer(interval=1.0)
async def example_sender(app):
    await hello.send(
        value=Greeting(from_name='Faust', to_name='you'),
    )

if __name__ == '__main__':
    app.main()

您可能对asyncawait关键字感到有些害怕,但您不必了解asyncio的工作原理即可使用Faust:只需模仿示例,您就会没事的。

示例应用程序启动两个任务:一个是处理流,另一个是发送事件到该流的后台线程。在实际应用中,您的系统将向Kafka主题发布事件,您的处理器可以从这些主题中消费,而后台线程仅用于向我们的示例中注入数据。

高可用性

Faust具有高可用性,并且可以生存网络问题和服务器崩溃。在节点故障的情况下,它可以自动恢复,并且表具有备用节点将接管。

分布式

根据需要启动更多实例。

快速

单个核心的Faust工作实例可以每秒处理数万事件,我们合理地确信,一旦我们能够支持更优化的Kafka客户端,吞吐量将会增加。

灵活

Faust只是Python,流是一个无限异步迭代器。如果您知道如何使用Python,您就已经知道如何使用Faust,并且它可以使用您喜欢的Python库,如Django、Flask、SQLAlchemy、NLTK、NumPy、SciPy、TensorFlow等。

Faust还定义了一组可以用来安装Faust和特定功能依赖项的setuptools扩展。

您可以在需求中指定这些,或者在使用pip命令行时通过使用方括号来指定。用逗号分隔多个包

pip install "faust-streaming[rocksdb]"

pip install "faust-streaming[rocksdb,uvloop,fast,redis,aerospike]"

以下包可用

带有额外功能的Faust

存储

RocksDB

用于使用RocksDB存储Faust表状态。生产推荐。

pip install faust-streaming[rocksdb](使用RocksDB 6)

pip install faust-streaming[rocksdict](使用RocksDB 8,不与6向后兼容)

Aerospike

pip install faust-streaming[aerospike]用于使用Aerospike存储Faust表状态。如果支持则推荐

Aerospike配置

可以通过指定 store="aerospike://" 来启用Aerospike作为状态存储。

默认情况下,所有由Aerospike支持的表使用 use_partitioner=True 并生成类似于由RocksDB支持的状态存储的更改日志主题事件。以下配置选项应作为键传递给Tablenamespace 参数:aerospike命名空间。

ttl:表中所有KV的TTL。

username:连接到Aerospike集群的用户名。

password:连接到Aerospike集群的密码。

hosts:如aerospike客户端中指定,主机参数。

policies:读取/写入/扫描的不同策略策略

client:一个包含上述定义的 hostpolicies 的字典

缓存

使用 faust-streaming[redis] 以使用 Redis 作为简单的缓存后端(类似Memcached)。

编解码器

使用 faust-streaming[yaml] 在流中使用 YAML 和 PyYAML 库。

优化

使用 faust-streaming[fast] 安装所有可用的C速度提升扩展到Faust核心。

传感器

使用 faust-streaming[datadog] 使用 Faust 监视器。

使用 faust-streaming[statsd] 使用 Faust 监视器。

使用 faust-streaming[prometheus] 使用 Faust 监视器。

事件循环

使用 faust-streaming[uvloop]uvloop 一起使用Faust。

使用 faust-streaming[eventlet]eventlet 一起使用Faust。

调试

使用 faust-streaming[debug] 使用 aiomonitor 连接和调试正在运行的Faust工作进程。

当安装了 setproctitle 模块时,使用 faust-streaming[setproctitle],Faust工作进程将使用它来在 ps/top 列表中设置更美观的进程名称。也安装了 fastdebug 包。

从源代码下载和安装

https://pypi.ac.cn/project/faust-streaming/ 下载Faust的最新版本。

你可以通过执行以下命令来安装它:

$ tar xvfz faust-streaming-0.0.0.tar.gz
$ cd faust-streaming-0.0.0
$ python setup.py build
# python setup.py install

如果当前没有使用虚拟环境,则必须以特权用户身份执行最后一个命令。

使用开发版本

使用pip

你可以使用以下 pip 命令安装Faust的最新快照

pip install https://github.com/faust-streaming/faust/zipball/master#egg=faust

常见问题解答(FAQ)

我可以使用Faust与Django/Flask等一起使用吗?

是的!使用 eventlet 作为与 asyncio 集成的桥梁。

使用eventlet

此方法与任何可以与 eventlet 一起工作的阻塞Python库兼容。

使用 eventlet 需要你安装 faust-aioeventlet 模块,并且你可以将此模块与Faust一起安装为包。

pip install -U faust-streaming[eventlet]

然后,要实际使用eventlet作为事件循环,你必须使用 faust 程序的 -L <faust --loop> 参数

faust -L eventlet -A myproj worker -l info

或者在你的入口脚本顶部添加 import mode.loop.eventlet

#!/usr/bin/env python3
import mode.loop.eventlet  # noqa

这非常重要,它必须位于模块的顶部,并且在导入库之前执行。

我可以使用Faust与Tornado一起使用吗?

是的!使用 tornado.platform.asyncio 桥梁

我可以使用Faust与Twisted一起使用吗?

是的!使用 asyncio 反应器实现:https://twistedmatrix.com/documents/current/api/twisted.internet.asyncioreactor.html

你会支持Python 2.7或Python 3.5吗?

不会。Faust需要Python 3.8或更高版本,因为它大量使用在Python 3.6中引入的特性(asyncawait,变量类型注解)。

我在本地运行Faust应用时遇到RocksDB的最大打开文件数超出错误。我该如何修复它?

你可能需要增加最大打开文件数的限制。在macOS和Linux上,你可以使用

ulimit -n max_open_files 来将打开文件限制增加到max_open_files。

在 Docker 中,你可以使用 --ulimit 标志

docker run --ulimit nofile=50000:100000 <image-tag> 其中 50000 是软限制,100000 是硬限制 查看区别

Faust 支持的 Kafka 版本

Faust 支持 Kafka 版本 >= 0.10。

获取帮助

Slack

有关 Faust 的使用、开发和未来的讨论,请加入 fauststream Slack。

资源

错误跟踪器

如果您有任何建议、错误报告或烦恼,请通过我们的错误跟踪器 https://github.com/faust-streaming/faust/issues/ 报告。

许可

本软件受 New BSD 许可证 许可。请参阅顶级分发目录中的 LICENSE 文件以获取完整的许可文本。

贡献

Faust 的开发在 GitHub 进行。

我们强烈鼓励您参与 Faust 的开发。

行为准则

所有与项目代码库、错误跟踪器、聊天室和邮件列表互动的人都应遵守 Faust 的行为准则。

作为这些项目的贡献者和维护者,为了培养一个开放和欢迎的社区,我们承诺尊重通过报告问题、发布功能请求、更新文档、提交拉取请求或补丁等方式贡献的所有人。

我们致力于使这些项目的参与成为每个人的无骚扰体验,无论经验水平、性别、性别认同和表达、性取向、残疾、个人外观、体型、种族、民族、年龄、宗教或国籍。

参与者不可接受的行为包括

  • 使用性化语言或图像
  • 个人攻击
  • 骚扰或侮辱性评论
  • 公开或私下的骚扰
  • 在未明确许可的情况下发布他人的私人信息,例如物理或电子地址
  • 其他不道德或不专业的行为。

项目维护者有权和义务删除、编辑或拒绝不符合此行为准则的评论、提交、代码、维基编辑、问题和其他贡献。通过采用此行为准则,项目维护者承诺公平和一致地应用这些原则于管理此项目的各个方面。不遵守或执行行为准则的项目维护者可能被永久移除出项目团队。

此行为准则适用于项目空间内和个体代表项目或其社区时的公共空间。

可以通过打开问题或联系一个或多个项目维护者来报告滥用、骚扰或其他不可接受的行为。

项目详情


发布历史 发布通知 | RSS 源

下载文件

下载适用于您平台的文件。如果您不确定选择哪个,请了解有关 安装包 的更多信息。

源代码分发

faust-streaming-0.11.3.tar.gz (759.2 kB 查看哈希值)

上传时间 源代码

构建分发

faust_streaming-0.11.3-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.2 MB 查看哈希值)

上传时间 CPython 3.11 manylinux: glibc 2.17+ x86-64 manylinux: glibc 2.5+ x86-64

faust_streaming-0.11.3-cp311-cp311-macosx_11_0_arm64.whl (514.7 kB 查看哈希值)

上传时间 CPython 3.11 macOS 11.0+ ARM64

faust_streaming-0.11.3-cp311-cp311-macosx_10_9_x86_64.whl (518.1 kB 查看哈希值)

上传时间 CPython 3.11 macOS 10.9+ x86-64

faust_streaming-0.11.3-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.2 MB 查看哈希值)

上传时间 CPython 3.10 manylinux: glibc 2.17+ x86-64 manylinux: glibc 2.5+ x86-64

faust_streaming-0.11.3-cp310-cp310-macosx_11_0_arm64.whl (514.6 kB 查看哈希值)

上传时间 CPython 3.10 macOS 11.0+ ARM64

faust_streaming-0.11.3-cp310-cp310-macosx_10_9_x86_64.whl (517.8 kB 查看哈希值)

上传时间 CPython 3.10 macOS 10.9+ x86-64

faust_streaming-0.11.3-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.2 MB 查看哈希值)

上传于 CPython 3.9 manylinux: glibc 2.17+ x86-64 manylinux: glibc 2.5+ x86-64

faust_streaming-0.11.3-cp39-cp39-macosx_11_0_arm64.whl (515.4 kB 查看哈希值)

上传于 CPython 3.9 macOS 11.0+ ARM64

faust_streaming-0.11.3-cp39-cp39-macosx_10_9_x86_64.whl (518.5 kB 查看哈希值)

上传于 CPython 3.9 macOS 10.9+ x86-64

faust_streaming-0.11.3-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.2 MB 查看哈希值)

上传于 CPython 3.8 manylinux: glibc 2.17+ x86-64 manylinux: glibc 2.5+ x86-64

faust_streaming-0.11.3-cp38-cp38-macosx_11_0_arm64.whl (516.4 kB 查看哈希值)

上传于 CPython 3.8 macOS 11.0+ ARM64

faust_streaming-0.11.3-cp38-cp38-macosx_10_9_x86_64.whl (519.8 kB 查看哈希值)

上传于 CPython 3.8 macOS 10.9+ x86-64

由以下支持