gevent的TCP连接池
项目描述
geventconnpool
此软件包实现了一个通用的TCP连接池,用于基于gevent的应用程序。每当您的程序需要通过基于TCP的协议(包括所有HTTP协议如REST API)连接到外部服务,并且希望您的进程保持和管理到远程端点的一组连接时,都可以使用它。
一个典型的场景可能是一个基于gunicorn的web应用程序,具有gevent后端,通过HTTPS API访问远程服务。在这种情况下,使用池可以缩短请求并最小化延迟,因为池保持了到远程端点的开放连接,每次需要发出命令时无需进行完整的SSL握手。
快速入门
要安装此包,请使用pip
$ pip install geventconnpool
或easy_install
$ easy_install geventconnpool
您需要从ConnectionPool派生,并重新实现_new_connection,以指定如何打开到远程站点的连接。例如
from geventconnpool import ConnectionPool
from gevent import socket
class MyPool(ConnectionPool):
def _new_connection(self):
return socket.create_connection(('test.example.org', 2485))
在这种情况下,我们只是打开到指定对等方的TCP连接。
可以通过指定我们希望同时保持打开多少个连接来实例化池
pool = MyPool(20) # always keep 20 connections open
要访问池中的连接
with pool.get() as c:
c.send("PING\n")
if c.recv(5) != "PONG\n":
raise socket.error("something awful happened")
如果上下文通过socket.error异常退出,则连接将被丢弃,并在后台打开一个新连接,以保持池始终充满有效连接。任何其他异常都没有特殊含义,连接将被重新插入池中以便以后重用。
如果您的应用程序/库使用非socket.error派生的异常来表示连接错误(imaplib 是一个例子),您可以重写哪些异常被视为触发丢弃
pool = MyPool(20, exc_classes=(socket.error, imaplib.IMAP4.error))
自动重试
如果您想对临时网络错误具有容错性,可以使用 retry 装饰器,如果函数因 socket.error 异常而退出,它将重新执行该函数
from geventconnpool import retry
@retry
def senddata(data):
with pool.get() as c:
c.send(data)
if c.recv(2) != "OK":
raise socket.error("something awful happened")
由于池在生成 socket.error 异常时丢弃连接,因此 retry 的净效应是每次尝试都使用不同的连接。
retry 装饰器有一些默认禁用的额外功能。如果您传递一个日志记录器,它将记录每次尝试。您还可以指定最大尝试次数、连续尝试之间的间隔以及触发重试的具体异常类。日志消息和级别是可定制的。
import logging
logging.basicConfig()
log = logging.getLogger()
from geventconnpool import retry
@retry(exc_classes=(socket.error, imaplib.IMAP4.error), logger=log,
max_failures=5, interval=2)
def senddata(data):
with pool.get() as c:
typ, data = c.select('INBOX')
如果您希望将一组 retry 选项编码到您的代码中,请考虑使用 functools.partial。
高级连接示例
当实现连接池时,建议在 _new_connection 回调中执行应用程序协议的所有初始化阶段。例如,一种协议可能允许使用类似于 STARTTLS 的方式切换到 TLS,然后需要身份验证
from geventconnpool import ConnectionPool
from gevent import socket, ssl
class MyPool(ConnectionPool):
def _new_connection(self):
s = socket.create_connection(('test.example.org', 2485))
s.send("STARTTLS\n")
res = s.recv(3)
if res == "OK\n":
s = ssl.wrap_socket(s)
elif res == "NO\n":
pass
else:
raise socket.error("invalid response to STARTTLS")
s.send("LOGIN: %s\n" % MY_LOGIN_NAME);
s.send("PASS: %s\n" % MY_PASS);
res = s.recv(2)
if res != "OK":
raise socket.error("authentication failed")
return s
正如您所看到的,如果出现问题,可以简单地引发 socket.error。池对临时连接错误具有抵抗力,并将自动尝试稍后建立新的连接。
另一种常见的情况可能涉及使用第三方库,例如使用 boto 连接到 Amazon AWS
from geventconnpool import ConnectionPool
import boto
from boto.exception import NoAuthHanlder
class UsersPool(ConnectionPool):
def _new_connection(self):
try:
c = boto.connect_dynamodb(MY_AWS_KEY_ID, MY_AWS_SECRET_KEY)
return c.get_table("users")
except:
raise socket.error("error connecting to AWS")
在这种情况下,我们不仅连接到 AWS 并进行身份验证,还打开一个特定的表并返回对该表的引用。事实上,_new_connection() 的返回值不一定是套接字(或类似于套接字):ConnectionPool 将其视为黑盒并在调用 get 时返回它。
Keepalive
某些协议或网络可能需要保持连接开启的 keepalive 机制,如果连接空闲。例如,如果连接空闲时间过长,远程对等方、防火墙或负载均衡器可能会关闭连接。
有时,仅依赖于标准的 TCP 级别 keepalive 就足够了,这可以在任何 TCP 套接字上打开
from geventconnpool import ConnectionPool
from gevent import socket
class MyPool(ConnectionPool):
def _new_connection(self):
s = socket.create_connection(('test.example.org', 2485))
s._sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
return s
TCP keepalive 使用 ACK 包与远程对等方持续通信。要调整 keepalive 参数(ACK 之间的延迟、认为连接已断开的不应答 ACK 的数量等),您需要调整 proc 文件系统(是的,这是一个全局每计算机配置)。
或者,您可以通过实现 _keepalive 方法并在构造函数中指定 keepalive 频率来实现应用级别的 keepalive
from geventconnpool import ConnectionPool
from gevent import socket
class MyPool(ConnectionPool):
def _new_connection(self):
return socket.create_connection(('test.example.org', 2485))
def _keepalive(self, c):
c.send("PING\n")
if c.recv(5) != "PONG\n":
raise socket.error
pool = MyPool(20, keepalive=30)
上面的代码使用基于应用级命令(PING)的 keepalive,并指定它应该每 30 秒(每个连接)执行一次。
_keepalive 应该引发 socket.error 来表示连接似乎已断开,应由池丢弃。
项目详细信息
geventconnpool-0.2.1.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | d80ba45260c4ea6fad8569397f5f5f4ada4a7d2a18fcb8ea1dca82f32fbcddc9 |
|
MD5 | e6d3acc866d9bc79ce215f593fd43e91 |
|
BLAKE2b-256 | 129e20c1203fb5b23332179a438997249ae89107c1a162b2c9f32f49ad030883 |