跳转到主要内容

gevent的TCP连接池

项目描述

geventconnpool

此软件包实现了一个通用的TCP连接池,用于基于gevent的应用程序。每当您的程序需要通过基于TCP的协议(包括所有HTTP协议如REST API)连接到外部服务,并且希望您的进程保持和管理到远程端点的一组连接时,都可以使用它。

一个典型的场景可能是一个基于gunicorn的web应用程序,具有gevent后端,通过HTTPS API访问远程服务。在这种情况下,使用池可以缩短请求并最小化延迟,因为池保持了到远程端点的开放连接,每次需要发出命令时无需进行完整的SSL握手。

快速入门

要安装此包,请使用pip

$ pip install geventconnpool

或easy_install

$ easy_install geventconnpool

您需要从ConnectionPool派生,并重新实现_new_connection,以指定如何打开到远程站点的连接。例如

from geventconnpool import ConnectionPool
from gevent import socket

class MyPool(ConnectionPool):
    def _new_connection(self):
        return socket.create_connection(('test.example.org', 2485))

在这种情况下,我们只是打开到指定对等方的TCP连接。

可以通过指定我们希望同时保持打开多少个连接来实例化池

pool = MyPool(20)  # always keep 20 connections open

要访问池中的连接

with pool.get() as c:
    c.send("PING\n")
    if c.recv(5) != "PONG\n":
        raise socket.error("something awful happened")

如果上下文通过socket.error异常退出,则连接将被丢弃,并在后台打开一个新连接,以保持池始终充满有效连接。任何其他异常都没有特殊含义,连接将被重新插入池中以便以后重用。

如果您的应用程序/库使用非socket.error派生的异常来表示连接错误(imaplib 是一个例子),您可以重写哪些异常被视为触发丢弃

pool = MyPool(20, exc_classes=(socket.error, imaplib.IMAP4.error))

自动重试

如果您想对临时网络错误具有容错性,可以使用 retry 装饰器,如果函数因 socket.error 异常而退出,它将重新执行该函数

from geventconnpool import retry

@retry
def senddata(data):
    with pool.get() as c:
        c.send(data)
        if c.recv(2) != "OK":
            raise socket.error("something awful happened")

由于池在生成 socket.error 异常时丢弃连接,因此 retry 的净效应是每次尝试都使用不同的连接。

retry 装饰器有一些默认禁用的额外功能。如果您传递一个日志记录器,它将记录每次尝试。您还可以指定最大尝试次数、连续尝试之间的间隔以及触发重试的具体异常类。日志消息和级别是可定制的。

import logging
logging.basicConfig()
log = logging.getLogger()

from geventconnpool import retry

@retry(exc_classes=(socket.error, imaplib.IMAP4.error), logger=log,
       max_failures=5, interval=2)
def senddata(data):
    with pool.get() as c:
        typ, data = c.select('INBOX')

如果您希望将一组 retry 选项编码到您的代码中,请考虑使用 functools.partial

高级连接示例

当实现连接池时,建议在 _new_connection 回调中执行应用程序协议的所有初始化阶段。例如,一种协议可能允许使用类似于 STARTTLS 的方式切换到 TLS,然后需要身份验证

from geventconnpool import ConnectionPool
from gevent import socket, ssl

class MyPool(ConnectionPool):
    def _new_connection(self):
        s = socket.create_connection(('test.example.org', 2485))
        s.send("STARTTLS\n")
        res = s.recv(3)
        if res == "OK\n":
            s = ssl.wrap_socket(s)
        elif res == "NO\n":
            pass
        else:
            raise socket.error("invalid response to STARTTLS")

        s.send("LOGIN: %s\n" % MY_LOGIN_NAME);
        s.send("PASS: %s\n" % MY_PASS);
        res = s.recv(2)
        if res != "OK":
            raise socket.error("authentication failed")
        return s

正如您所看到的,如果出现问题,可以简单地引发 socket.error。池对临时连接错误具有抵抗力,并将自动尝试稍后建立新的连接。

另一种常见的情况可能涉及使用第三方库,例如使用 boto 连接到 Amazon AWS

from geventconnpool import ConnectionPool
import boto
from boto.exception import NoAuthHanlder

class UsersPool(ConnectionPool):
    def _new_connection(self):
        try:
            c = boto.connect_dynamodb(MY_AWS_KEY_ID, MY_AWS_SECRET_KEY)
            return c.get_table("users")
        except:
            raise socket.error("error connecting to AWS")

在这种情况下,我们不仅连接到 AWS 并进行身份验证,还打开一个特定的表并返回对该表的引用。事实上,_new_connection() 的返回值不一定是套接字(或类似于套接字):ConnectionPool 将其视为黑盒并在调用 get 时返回它。

Keepalive

某些协议或网络可能需要保持连接开启的 keepalive 机制,如果连接空闲。例如,如果连接空闲时间过长,远程对等方、防火墙或负载均衡器可能会关闭连接。

有时,仅依赖于标准的 TCP 级别 keepalive 就足够了,这可以在任何 TCP 套接字上打开

from geventconnpool import ConnectionPool
from gevent import socket

class MyPool(ConnectionPool):
    def _new_connection(self):
        s = socket.create_connection(('test.example.org', 2485))
        s._sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        return s

TCP keepalive 使用 ACK 包与远程对等方持续通信。要调整 keepalive 参数(ACK 之间的延迟、认为连接已断开的不应答 ACK 的数量等),您需要调整 proc 文件系统(是的,这是一个全局每计算机配置)。

或者,您可以通过实现 _keepalive 方法并在构造函数中指定 keepalive 频率来实现应用级别的 keepalive

from geventconnpool import ConnectionPool
from gevent import socket

class MyPool(ConnectionPool):
    def _new_connection(self):
        return socket.create_connection(('test.example.org', 2485))

    def _keepalive(self, c):
        c.send("PING\n")
        if c.recv(5) != "PONG\n":
            raise socket.error

pool = MyPool(20, keepalive=30)

上面的代码使用基于应用级命令(PING)的 keepalive,并指定它应该每 30 秒(每个连接)执行一次。

_keepalive 应该引发 socket.error 来表示连接似乎已断开,应由池丢弃。

项目详细信息


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源代码分布

geventconnpool-0.2.1.tar.gz (6.9 kB 查看哈希值)

上传时间 源代码

支持者

AWSAWS云计算和安全赞助商DatadogDatadog监控FastlyFastlyCDNGoogleGoogle下载分析MicrosoftMicrosoftPSF赞助商PingdomPingdom监控SentrySentry错误日志StatusPageStatusPage状态页面