跳转到主要内容

Trio的Redis异步客户端

项目描述

RedIO - Redis for Trio

RedIO是一个Python 3.7+模块,用于在基于Trio库的异步程序中使用Redis数据库。

pip install redio

此模块尚未准备用于生产使用,所有API都可能发生变化。它与我的应用程序兼容,并且性能大致与其他Python Redis模块相同。

常规模式(高级API)

import redio, trio

# Initialise a connection pool
redis = redio.Redis("redis://localhost/")

async def main():
    some, another = await redis().get("somekey").get("anotherkey")
    print("Got values:", some, another)

trio.run(main)

大多数常规Redis命令都可用,并且可以按如下顺序调用,或者如果更方便,可以使用变量

db = redis()  # Get a DB object
db.get("bar")
db.set("bar", "value").expire("bar", 0.5)  # Automatically deleted after 500 ms
db.get("bar")
old_bar, expire, bar = await db

所有命令都排队,并在下一个await时发送到服务器,这提高了性能,尤其是在Redis服务器不在本地主机上时,因为消除了不必要的服务器往返,并且通常所有内容都适合单个数据包。

响应以与命令相同的顺序返回为列表,请注意,例如set这样的命令不会产生任何输出。

哈希键

Redis键可以包含包含字段名称和值的字典。RedIO hset 允许通过 关键字参数 指定字段

await redis().hset(
  "hashkey",
  field1=bytes([255, 0, 255]),
  field2="text",
  field3=1.23,
)

除了关键字参数外,还可以使用 dict。同样,hgetall 返回的值也以字典的形式返回

>>> await redis().hgetall("hashkey").autodecode
{
  'field1': b'\xff\x00\xff',
  'field2': 'text',
  'field3': 1.23,
}

事务

多/执行事务允许在没有其他客户端运行任何命令的情况下执行原子操作。以下示例中,该事务原子性地增加键foo和bar的值并返回它们的新值

>>> await redis().multi().incr("foo").incr("bar").exec()
[1, 1]

注意:一旦Redis事务开始,就无法中止和撤销正在进行的操作。服务器将尝试执行所有命令,即使在出现错误之后。

在事务之前可以使用一个或多个WATCH命令,通过使用检查和设置实现乐观锁,如果在事务中监视的任何键被修改,则丢弃事务。通常,会尝试再次执行操作,直到成功为止。

db = redis()

# Inverts the capitalization of foo (sets "DEFAULT value" if foo does not exist)
while True:
    db.watch("foo")
    foo = await db.get("foo") or b"default VALUE"
    db.multi()
    db.set("foo", foo.swapcase())
    if await db.exec():
        break

如果事务被丢弃,则exec返回False。否则,返回命令响应列表或True。在这个例子中,因为事务中只有一个set命令,它不会产生任何输出,所以总是返回布尔值。

发布/订阅频道

发送消息

消息通过正常的publish命令进行发布。

await redis().publish("channel", "message")

接收消息

可以通过在连接池上调用pubsub来创建接收连接。

async for message in redis.pubsub("channel"):
    print(message)

可以通过在PubSub对象上使用subscribepsubscribe命令订阅更多频道,并在创建对象时通过调用redis.pubsub()指定零个或多个初始频道。

默认情况下,只会接收消息。当在同一个PubSub接收器上订阅多个频道时,接收频道名称可能也很有用,这可以通过使用.with_channel修改器来实现。与标准接口一样,所有命令和修改器都可以链式调用或单独调用,因为它们都返回self

pubsub = redis.pubsub().strdecode.with_channel
pubsub.subscribe("foo", "bar")
pubsub.psubscribe("chan*")

async for channel, message in pubsub:
    print(message, "from", channel)

您可以使用await pubsub而不是async for来接收单个消息。

字节编码和解码

命令编码

Redis命令只接受字节,没有其他数据类型。任何非字节参数都会自动编码(字符串、数字、json)。

db = redis()
db.set("binary", bytes([128, 0, 255]))
db.set("number", 10)
db.set("jsonkey", dict(foo=123, bar=[1, 2, 3]))
await db

响应解码

键(如字段名称和频道名称)始终解码为str,解码模式只影响值(内容)的处理。许多Redis协议命令也响应带有类型整数、字符串或列表的响应,它们不受此影响。

提供了三种用于原始字节值的解码模式。默认情况下,值不进行解码。其他模式由修改器.strdecode.autodecode启用,它们仅影响下一个await。Pub/Sub模式不会重置其解码设置,因此一旦初始设置,它们就会持续存在。

默认(不解码)

>>> await db.get("binary").get("number").get("jsonkey")
[
  b"\x80\x00\xFF",
  b"10",
  b'{"foo": 123, "bar": [1, 2, 3]}'
]

.strdecode

>>> await db.get("binary").get("number").get("jsonkey").strdecode
[
  '\udc80\x00\udcff',
  '10',
  '{"foo": 123, "bar": [1, 2, 3]}',
]

所有值都解码为str,无效的UTF-8序列由Unicode代理值替换。

.autodecode

>>> await db.get("binary").get("number").get("jsonkey").autodecode
[
  b"\x80\x00\xFF",
  10,
  {'foo': 123, 'bar': [1, 2, 3]},
]

自动解码模式尝试根据内容猜测正确的格式。这在您知道数据仅是JSON或数字时非常有用。任意二进制或字符串数据可能被意外地进一步解码,超出其应有的范围。

异步安全性

请注意,redis对象可以由多个异步工作器共享,但每个工作器都必须通过调用它来获取单独的连接,就像示例中那样。

可以将连接存储在变量中并用于多个依赖于彼此的命令,例如事务。此模块尝试跟踪连接是否可重用,因此可以将其返回到连接池。

可以使用.prevent_pooling修改器在DB对象上使用,以防止其连接在使用后被池化。

连接URL

没有单独的参数用于主机名、端口号等。相反,所有设置都编码在传递给Redis()的URL中。使用类似于其他Redis模块的格式。一些示例

  • redis://localhost/ - 默认设置(localhost:6379,数据库0,无认证)
  • redis://:password@localhost/2 - 密码认证,使用数据库2
  • rediss://secure.cloud/redis+tls://secure.cloud/ - 这两者相同:安全连接
  • redis+unix:///var/run/redis.sock?database=2 - UNIX套接字连接必须使用三个斜杠
  • redis+unix+tls://hostname.on.certificate/tmp/redis.sock - 为什么要在Unix套接字上使用TLS?

项目详情


下载文件

下载适用于您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源代码分发

redio-1.0.0.tar.gz (18.8 kB 查看哈希值)

上传时间 源代码

由以下支持