跳转到主要内容

Python的非阻塞redis客户端

项目描述

txredisapi

Build Status

有关最新源代码,请参阅 http://github.com/IlyaSkriblovsky/txredisapi

txredisapi 是一个用Python编写的redis数据库非阻塞客户端驱动程序。它使用 Twisted 与redis进行异步通信。

它最初是从原始的 redis协议twisted 分支出来的,并演变成一个更健壮、更可靠、更完整的解决方案,适用于像Web服务器这样的应用程序。这些类型的应用程序通常需要一个具有多个redis服务器的容错连接池,这使得轻松开发和维护分布式系统成为可能。

支持大多数 redis命令,以及其他功能,如静默重新连接、连接池和自动分片。

此驱动程序作为 cyclone Web框架的一部分进行分发。

变更日志

请参阅 CHANGELOG.md

功能

  • 连接池
  • 懒连接
  • 自动分片
  • 自动重连
  • 使用Redis Sentinel进行连接
  • 发布/订阅(Pub/Sub)
  • 事务
  • Unix 套接字连接

安装

请注意,txredisapi.py 是纯 Python 编写的,仅在一个文件中。因此,您绝对不需要安装它。只需将其复制到您的项目目录中,即可开始使用。

最新源代码位于 https://github.com/IlyaSkriblovsky/txredisapi

如果您有 cyclone,您可能已经有了它。尝试以下操作

$ python
>>> import cyclone.redis
>>> cyclone.redis.version
'1.0'

但是,如果您真的坚持要安装,可以从 PyPI 获取

pip install txredisapi

单元测试

Twisted Trial 单元测试可用。只需启动 Redis,然后运行 trial ./tests。如果 Redis 中禁用了 Unix 套接字,它将静默跳过这些测试。

请确保在测试后运行 redis-cli flushall 以清理 Redis。

用法

首先要做的是选择您想要的连接类型。该驱动程序支持单个连接、连接池、分片连接(基于内置的一致性哈希算法自动分布)、分片连接池,而且所有这些不同类型都可以是 懒加载 的,这将在稍后解释(因为我现在很懒)。

基本上,您想要普通的连接,用于简单的批处理客户端,这些客户端连接到 Redis,执行一些命令然后断开连接——如爬虫等。

示例

#!/usr/bin/env python
# coding: utf-8

import txredisapi as redis

from twisted.internet import defer
from twisted.internet import reactor


@defer.inlineCallbacks
def main():
    rc = yield redis.Connection()
    print rc

    yield rc.set("foo", "bar")
    v = yield rc.get("foo")
    print "foo:", repr(v)

    yield rc.disconnect()


if __name__ == "__main__":
    main().addCallback(lambda ign: reactor.stop())
    reactor.run()

可以轻松地在 redis.Connection()redis.ConnectionPool() 之间切换,而无需修改程序的逻辑。

以下是连接到 Redis 所支持的所有方法:

Connection(host, port, dbid, reconnect, charset)
lazyConnection(host, port, dbid, reconnect, charset)

ConnectionPool(host, port, dbid, poolsize, reconnect, charset)
lazyConnectionPool(host, port, dbid, poolsize, reconnect, charset)

ShardedConnection(hosts, dbid, reconnect, charset)
lazyShardedConnection(hosts, dbid, reconnect, charset)

ShardedConnectionPool(hosts, dbid, poolsize, reconnect, charset)
lazyShardedConnectionPool(hosts, dbid, poolsize, reconnect, charset)

UnixConnection(path, dbid, reconnect, charset)
lazyUnixConnection(path, dbid, reconnect, charset)

UnixConnectionPool(unix, dbid, poolsize, reconnect, charset)
lazyUnixConnectionPool(unix, dbid, poolsize, reconnect, charset)

ShardedUnixConnection(paths, dbid, reconnect, charset)
lazyShardedUnixConnection(paths, dbid, reconnect, charset)

ShardedUnixConnectionPool(paths, dbid, poolsize, reconnect, charset)
lazyShardedUnixConnectionPool(paths, dbid, poolsize, reconnect, charset)

参数如下

  • host:Redis 服务器的 IP 地址或主机名。[默认:localhost]
  • port:Redis 服务器的端口号。[默认:6379]
  • path:Redis 服务器套接字的路径。[默认:/tmp/redis.sock]
  • dbid:Redis 服务器的数据库 ID。[默认:0]
  • poolsize:要建立的连接数量。[默认:10]
  • reconnect:如果连接丢失,自动重连。[默认:True]
  • charset:字符串编码。如果为 None,则不解码/编码字符串。[默认:utf-8]
  • hosts(用于分片):一组 host:port 对。[默认:None]
  • paths(用于分片):一组 pathnames。[默认:None]
  • password:Redis 服务的密码。[默认:None]
  • ssl_context_factory:表示是否使用 SSL/TLS 的布尔值或特定的 ClientContextFactory。[默认:False]

连接处理程序

所有连接方法在某个时候都返回一个连接处理程序对象。

普通连接(非懒加载)返回一个延迟对象,在连接建立后触发连接处理程序。

对于连接池,它将在所有连接设置并准备就绪后触发回调。

连接处理程序是 Redis 的客户端接口。它接受 Redis 支持的所有命令,例如 getset 等。它是在下面的示例中的 rc 对象。

连接处理程序将自动选择连接池中可用的连接,并在必要时自动重新连接到 Redis。

如果与 Redis 的连接丢失,所有命令都将引发 ConnectionError 异常,以指示没有活动连接。但是,如果在初始化期间设置了 reconnect 参数为 True,它将在后台不断尝试重新连接。

示例

#!/usr/bin/env python
# coding: utf-8

import txredisapi as redis

from twisted.internet import defer
from twisted.internet import reactor


def sleep(n):
    d = defer.Deferred()
    reactor.callLater(5, lambda *ign: d.callback(None))
    return d


@defer.inlineCallbacks
def main():
    rc = yield redis.ConnectionPool()
    print rc

    # set
    yield rc.set("foo", "bar")

    # sleep, so you can kill redis
    print "sleeping for 5s, kill redis now..."
    yield sleep(5)

    try:
      v = yield rc.get("foo")
      print "foo:", v

      yield rc.disconnect()
    except redis.ConnectionError, e:
      print str(e)


if __name__ == "__main__":
    main().addCallback(lambda ign: reactor.stop())
    reactor.run()

懒连接

此类连接将立即返回连接处理程序对象,甚至在连接建立之前。

它将在后台启动连接(或连接池的情况下的连接),并在必要时自动重新连接。

当您编写服务器(如 web 服务器或其他在程序初始化期间不应等待 Redis 连接的服务器)时,您需要懒加载连接。

下面的示例是一个 web 应用程序,它将通过 HTTP 暴露 Redis 的 set、get 和 delete 命令。

如果数据库连接断开(无论是redis未运行还是其他原因),Web应用程序将正常启动。如果在操作过程中连接丢失,则不会发生任何变化。

当没有连接时,所有命令都会失败,因此Web应用程序将返回HTTP 503(服务不可用)。一旦与redis的连接重新建立,它将恢复正常。

在应用程序运行后尝试杀死redis服务器,然后发出几个请求。然后,再次启动redis并再次尝试。

示例

#!/usr/bin/env python
# coding: utf-8

import sys

import cyclone.web
import cyclone.redis
from twisted.internet import defer
from twisted.internet import reactor
from twisted.python import log


class Application(cyclone.web.Application):
    def __init__(self):
      handlers = [ (r"/text/(.+)", TextHandler) ]

      RedisMixin.setup()
      cyclone.web.Application.__init__(self, handlers, debug=True)


class RedisMixin(object):
    redis_conn = None

    @classmethod
    def setup(self):
        RedisMixin.redis_conn = cyclone.redis.lazyConnectionPool()


# Provide GET, SET and DELETE redis operations via HTTP
class TextHandler(cyclone.web.RequestHandler, RedisMixin):
    @defer.inlineCallbacks
    def get(self, key):
      try:
          value = yield self.redis_conn.get(key)
      except Exception, e:
          log.msg("Redis failed to get('%s'): %s" % (key, str(e)))
          raise cyclone.web.HTTPError(503)

      self.set_header("Content-Type", "text/plain")
      self.write("%s=%s\r\n" % (key, value))

    @defer.inlineCallbacks
    def post(self, key):
        value = self.get_argument("value")
        try:
            yield self.redis_conn.set(key, value)
        except Exception, e:
            log.msg("Redis failed to set('%s', '%s'): %s" % (key, value, str(e)))
            raise cyclone.web.HTTPError(503)

        self.set_header("Content-Type", "text/plain")
        self.write("%s=%s\r\n" % (key, value))

    @defer.inlineCallbacks
    def delete(self, key):
        try:
            n = yield self.redis_conn.delete(key)
        except Exception, e:
            log.msg("Redis failed to del('%s'): %s" % (key, str(e)))
            raise cyclone.web.HTTPError(503)

        self.set_header("Content-Type", "text/plain")
        self.write("DEL %s=%d\r\n" % (key, n))


def main():
    log.startLogging(sys.stdout)
    reactor.listenTCP(8888, Application(), interface="127.0.0.1")
    reactor.run()


if __name__ == "__main__":
    main()

这是在一个终端中运行的服务器:

$ ./helloworld.py
2012-02-17 15:40:25-0500 [-] Log opened.
2012-02-17 15:40:25-0500 [-] Starting factory <redis.Factory instance at 0x1012f0560>
2012-02-17 15:40:25-0500 [-] __main__.Application starting on 8888
2012-02-17 15:40:25-0500 [-] Starting factory <__main__.Application instance at 0x100f42290>
2012-02-17 15:40:53-0500 [RedisProtocol,client] 200 POST /text/foo (127.0.0.1) 1.20ms
2012-02-17 15:41:01-0500 [RedisProtocol,client] 200 GET /text/foo (127.0.0.1) 0.97ms
2012-02-17 15:41:09-0500 [RedisProtocol,client] 200 DELETE /text/foo (127.0.0.1) 0.65ms
(killed redis-server)
2012-02-17 15:48:48-0500 [HTTPConnection,0,127.0.0.1] Redis failed to get('foo'): Not connected
2012-02-17 15:48:48-0500 [HTTPConnection,0,127.0.0.1] 503 GET /text/foo (127.0.0.1) 2.99ms

这些是来自另一个终端的curl请求。

设置

$ curl -D - -d "value=bar" http://localhost:8888/text/foo
HTTP/1.1 200 OK
Content-Length: 9
Content-Type: text/plain

foo=bar

获取

$ curl -D - http://localhost:8888/text/foo
HTTP/1.1 200 OK
Content-Length: 9
Etag: "b63729aa7fa0e438eed735880951dcc21d733676"
Content-Type: text/plain

foo=bar

删除

$ curl -D - -X DELETE http://localhost:8888/text/foo
HTTP/1.1 200 OK
Content-Length: 11
Content-Type: text/plain

DEL foo=1

当redis未运行时

$ curl -D - http://localhost:8888/text/foo
HTTP/1.1 503 Service Unavailable
Content-Length: 89
Content-Type: text/html; charset=UTF-8

<html><title>503: Service Unavailable</title>
<body>503: Service Unavailable</body></html>

分片连接

它们可以是正常连接或懒连接。它们可以是分片连接池。并非所有命令都支持在分片连接上执行。

如果您尝试运行的命令在分片连接上不受支持,连接处理器将引发NotImplementedError异常。

简单示例:在两个redis服务器之间自动分片键

#!/usr/bin/env python
# coding: utf-8

import txredisapi as redis

from twisted.internet import defer
from twisted.internet import reactor


@defer.inlineCallbacks
def main():
    rc = yield redis.ShardedConnection(["localhost:6379", "localhost:6380"])
    print rc
    print "Supported methods on sharded connections:", rc.ShardedMethods

    keys = []
    for x in xrange(100):
        key = "foo%02d" % x
        yield rc.set(key, "bar%02d" % x)
        keys.append(key)

    # yey! mget is supported!
    response = yield rc.mget(keys)
    for val in response:
        print val

    yield rc.disconnect()


if __name__ == "__main__":
    main().addCallback(lambda ign: reactor.stop())
    reactor.run()

事务

出于明显的原因,分片连接不支持事务。但在正常或懒连接以及连接池中,它们表现相当好。

注意:redis使用以下方法进行事务

  • WATCH:同步
  • MULTI:开始事务
  • EXEC:提交事务
  • DISCARD:明白了。

因为exec是Python中的保留字,所以提交命令为commit

示例

#!/usr/bin/env python
# coding: utf-8

import txredisapi as redis

from twisted.internet import defer
from twisted.internet import reactor


@defer.inlineCallbacks
def main():
    rc = yield redis.ConnectionPool()

    # Remove the keys
    yield rc.delete(["a1", "a2", "a3"])

    # Start transaction
    t = yield rc.multi()

    # These will return "QUEUED" - even t.get(key)
    yield t.set("a1", "1")
    yield t.set("a2", "2")
    yield t.set("a3", "3")
    yield t.get("a1")

    # Try to call get() while in a transaction.
    # It will fail if it's not a connection pool, or if all connections
    # in the pool are in a transaction.
    # Note that it's rc.get(), not the transaction object t.get().
    try:
        v = yield rc.get("foo")
    print "foo=", v
        except Exception, e:
        print "can't get foo:", e

    # Commit, and get all responses from transaction.
    r = yield t.commit()
    print "commit=", repr(r)

    yield rc.disconnect()


if __name__ == "__main__":
    main().addCallback(lambda ign: reactor.stop())
    reactor.run()

一个“COUNTER”示例,使用WATCH/MULTI

 #!/usr/bin/env python
 # coding: utf-8

 import txredisapi as redis

 from twisted.internet import defer
 from twisted.internet import reactor


 @defer.inlineCallbacks
 def main():
     rc = yield redis.ConnectionPool()

     # Reset keys
     yield rc.set("a1", 0)

     # Synchronize and start transaction
     t = yield rc.watch("a1")

     # Load previous value
     a1 = yield t.get("a1")

     # start the transactional pipeline
     yield t.multi()

     # modify and retrieve the new a1 value
     yield t.set("a1", a1 + 1)
     yield t.get("a1")

     print "simulating concurrency, this will abort the transaction"
     yield rc.set("a1", 2)

     try:
         r = yield t.commit()
         print "commit=", repr(r)
     except redis.WatchError, e:
         a1 = yield rc.get("a1")
         print "transaction has failed."
         print "current a1 value: ", a1

     yield rc.disconnect()


 if __name__ == "__main__":
     main().addCallback(lambda ign: reactor.stop())
     reactor.run()

调用commit将返回一个列表,其中包含事务中执行的所有命令的返回值。discard另一方面,通常只返回一个OK

管道

txredisapi自动通过在发送下一个命令而不等待前一个命令从服务器收到回复来管道化所有命令。这在单个连接上也能工作,通过减少往返延迟次数来提高性能。但有两个例外

  • blpopbrpopbrpoplpush阻塞后不会发送任何命令,直到收到响应;
  • 通过multi/commit的事务也是阻塞的,使所有其他命令都等待事务执行。

当您需要将大量数据加载到Redis时,可能更有效的是将命令分批发送,将它们离线分组在一起以节省TCP数据包和网络栈开销。您可以使用pipeline方法显式累积命令并将它们作为一个批次发送到服务器。请小心不要累积太多命令:不合理的批次大小可能会在客户端和服务器端消耗意外数量的内存。例如,将命令分成10k个批次的批次,而不是一次发送所有数据。速度几乎相同,但额外使用的内存最多仅用于排队这10k个命令所需的内存量

要批量发送命令

#!/usr/bin/env python
# coding: utf-8

import txredisapi as redis

from twisted.internet import defer
from twisted.internet import reactor

@defer.inlineCallbacks
def main():
    rc = yield redis.ConnectionPool()

    # Start grouping commands
    pipeline = yield rc.pipeline()

    pipeline.set("foo", 123)
    pipeline.set("bar", 987)
    pipeline.get("foo")
    pipeline.get("bar")

    # Write those 2 sets and 2 gets to redis all at once, and wait
    # for all replies before continuing.
    results = yield pipeline.execute_pipeline()

    print "foo:", results[2] # should be 123
    print "bar:", results[3] # should be 987

    yield rc.disconnect()

if __name__ == "__main__":
    main().addCallback(lambda ign: reactor.stop())
    reactor.run()

身份验证

以下是身份验证的方法:

#!/usr/bin/env python

import txredisapi
from twisted.internet import defer
from twisted.internet import reactor


@defer.inlineCallbacks
def main():
    redis = yield txredisapi.Connection(password="foobared")
    yield redis.set("foo", "bar")
    print (yield redis.get("foo"))
    reactor.stop()


if __name__ == "__main__":
    main()
    reactor.run()

使用Redis Sentinel进行连接

txredisapi可以使用Redis Sentinel发现Redis主从地址并在服务器故障时自动故障转移。

#!/usr/bin/env python

from twisted.internet.task import react
import txredisapi

@defer.inlineCallbacks
def main(reactor):
    sentinel = txredisapi.Sentinel([("sentinel-a", 26379), ("sentinel-b", 26379), ("sentinel-c", 26379)])
    redis = sentinel.master_for("service_name")
    yield redis.set("foo", "bar")
    print (yield redis.get("foo"))
    yield redis.disconnect()
    yield sentinel.disconnect()
    
react(main)

可以在master_for()调用中指定类似dbid=Npoolsize=N的常规连接参数。使用sentinel.slave_for()连接到从服务器而不是主服务器。

min_other_sentinels=N添加到Sentinel构造函数调用中,使其仅遵守连接到指定数量其他哨兵的哨兵的信息,以最小化网络分区情况下的脑裂风险。

致谢

感谢(不分先后):

  • Alexandre Fiori

    • txredisapi的作者
  • Gleicon Moraes

    • 错误修复、测试和RestMQ
    • 用于分片的一致性哈希算法的编写。
  • Dorian Raymer和Ludovico Magnocavallo

    • 原始redis协议for twisted的作者。
  • Vanderson Mota

    • 初始pypi设置和补丁。
  • Jeethu Rao

    • 贡献了测试用例和其他想法,如对travis-ci的支持
  • Jeremy Archer

    • 少量错误修复。
  • Christoph Tavan (@ctavan)

    • 嵌套多批量回复的想法和测试用例,命令的轻微增强。
  • dgvncsz0f

    • WATCH/UNWATCH命令
  • Ilia Glazkov

    • 池的免费连接选择算法。
    • 非Unicode字符集修复。
    • SCAN命令
  • Matt Pizzimenti (mjpizz)

    • 支持管道化
  • Nickolai Novik (jettify)

    • SET命令的更新
  • Evgeny Tataurov (etataurov)

    • 使用hiredis协议解析器的功能
  • Ilya Skriblovsky (IlyaSkriblovsky)

    • 哨兵支持

项目详情


下载文件

下载适合您平台文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分布

txredisapi-1.4.10.tar.gz (31.2 kB 查看哈希值)

上传时间

构建分布

txredisapi-1.4.10-py3-none-any.whl (31.0 kB 查看哈希值)

上传时间 Python 3

由以下机构支持

AWSAWS云计算和安全赞助商DatadogDatadog监控FastlyFastlyCDNGoogleGoogle下载分析MicrosoftMicrosoftPSF赞助商PingdomPingdom监控SentrySentry错误日志StatusPageStatusPage状态页面