Python实现电路断路器模式
项目描述
PyBreaker是Michael T. Nygard的书中描述的电路断路器模式的Python实现,《Release It!》。
根据Nygard的话,“电路断路器存在是为了允许一个子系统失败而不会破坏整个系统。这是通过包装危险操作(通常是集成点)并使用一个可以在系统不健康时绕过调用的组件来实现的。”
功能
可配置的排除异常列表(例如业务异常)
可配置的失败阈值和重置超时
支持每个断路器的事件监听器
可以保护生成器函数
易于监控和管理的函数和属性
线程安全
可选的Redis支持
可选的异步Tornado调用支持
需求
Python 3.8+
安装
运行以下命令行,从PyPI下载PyBreaker的最新稳定版本
$ pip install pybreaker
如果您是Git用户,您可能希望以可编辑模式安装当前的开发版本
$ git clone git://github.com/danielfm/pybreaker.git $ cd pybreaker $ # run tests (on windows omit ./) $ ./pw test $ pip install -e .
用法
第一步是为您想要保护免受其影响的每个集成点创建一个CircuitBreaker实例
import pybreaker
# Used in database integration points
db_breaker = pybreaker.CircuitBreaker(fail_max=5, reset_timeout=60)
CircuitBreaker实例应在应用程序作用域内全局生存,例如,跨越请求。
如果您想使用Redis后端,请使用CircuitBreaker与CircuitRedisStorage初始化。
import pybreaker
import redis
redis = redis.StrictRedis()
db_breaker = pybreaker.CircuitBreaker(
fail_max=5,
reset_timeout=60,
state_storage=pybreaker.CircuitRedisStorage(pybreaker.STATE_CLOSED, redis))
不要将Redis连接的decode_responses设置为True,这将强制从Redis返回ASCII对象,并在Python3+中引发
AttributeError: ‘str’对象没有属性‘decode’
import pybreaker
from django_redis import get_redis_connection
db_breaker = pybreaker.CircuitBreaker(
fail_max=5,
reset_timeout=60,
state_storage=pybreaker.CircuitRedisStorage(pybreaker.STATE_CLOSED, get_redis_connection('default')))
import pybreaker
from django_redis import get_redis_connection
db_breaker = pybreaker.CircuitBreaker(
fail_max=5,
reset_timeout=60,
state_storage=pybreaker.CircuitRedisStorage(pybreaker.STATE_CLOSED, get_redis_connection('default'),namespace='unique_namespace'))
事件监听
如果您只想在发生某些事件时采取行动,则无需子类化CircuitBreaker。在这种情况下,最好是子类化CircuitBreakerListener
class DBListener(pybreaker.CircuitBreakerListener):
"Listener used by circuit breakers that execute database operations."
def before_call(self, cb, func, *args, **kwargs):
"Called before the circuit breaker `cb` calls `func`."
pass
def state_change(self, cb, old_state, new_state):
"Called when the circuit breaker `cb` state changes."
pass
def failure(self, cb, exc):
"Called when a function invocation raises a system error."
pass
def success(self, cb):
"Called when a function invocation succeeds."
pass
class LogListener(pybreaker.CircuitBreakerListener):
"Listener used to log circuit breaker events."
def state_change(self, cb, old_state, new_state):
msg = "State Change: CB: {0}, New State: {1}".format(cb.name, new_state)
logging.info(msg)
要将监听器添加到断路器
# At creation time...
db_breaker = pybreaker.CircuitBreaker(listeners=[DBListener(), LogListener()])
# ...or later
db_breaker.add_listeners(OneListener(), AnotherListener())
断路器做什么?
假设您想在更新customer数据库表中的行的函数上使用断路器
@db_breaker
def update_customer(cust):
# Do stuff here...
pass
# Will trigger the circuit breaker
updated_customer = update_customer(my_customer)
或者如果您不想使用装饰器语法
def update_customer(cust):
# Do stuff here...
pass
# Will trigger the circuit breaker
updated_customer = db_breaker.call(update_customer, my_customer)
或者将其用作上下文管理器以及一个with语句
# Will trigger the circuit breaker
with db_breaker.calling():
# Do stuff here...
pass
根据默认参数,断路器db_breaker将在update_customer中连续5次失败后自动打开电路。
当电路打开时,所有对update_customer的调用将立即失败(引发CircuitBreakerError),而不会尝试执行实际操作。如果您想在电路跳闸时抛出原始错误,请将throw_new_error_on_trip选项设置为False
pybreaker.CircuitBreaker(..., throw_new_error_on_trip=False)
60秒后,断路器将允许对update_customer的下一个调用通过。如果该调用成功,则电路关闭;如果失败,则电路再次打开,直到另一个超时到期。
可选的Tornado支持
断路器可以(可选地)用于调用异步Tornado函数
from tornado import gen
@db_breaker(__pybreaker_call_async=True)
@gen.coroutine
def async_update(cust):
# Do async stuff here...
pass
或者如果您不想使用装饰器语法
@gen.coroutine
def async_update(cust):
# Do async stuff here...
pass
updated_customer = db_breaker.call_async(async_update, my_customer)
排除异常
默认情况下,失败的调用是引发异常的任何调用。然而,通常还会抛出异常来指示业务异常,并且这些异常不应由断路器忽略,因为它们不表示系统错误
# At creation time...
db_breaker = CircuitBreaker(exclude=[CustomerValidationError])
# ...or later
db_breaker.add_excluded_exception(CustomerValidationError)
在这种情况下,当任何由该断路器保护的函数引发CustomerValidationError(或从CustomerValidationError派生的任何异常)时,该调用不会被视为系统故障。
为了覆盖仅使用异常类不足以确定它是否表示系统错误的情况,您也可以传递一个可调用对象而不是类型
db_breaker = CircuitBreaker(exclude=[lambda e: type(e) == HTTPError and e.status_code < 500])
您可以自由混合类型和过滤可调用对象。
监控和管理
断路器提供了一些属性和函数,您可以用来监控和更改其当前状态
# Get the current number of consecutive failures
print(db_breaker.fail_counter)
# Get/set the maximum number of consecutive failures
print(db_breaker.fail_max)
db_breaker.fail_max = 10
# Get/set the current reset timeout period (in seconds)
print db_breaker.reset_timeout
db_breaker.reset_timeout = 60
# Get the current state, i.e., 'open', 'half-open', 'closed'
print(db_breaker.current_state)
# Closes the circuit
db_breaker.close()
# Half-opens the circuit
db_breaker.half_open()
# Opens the circuit
db_breaker.open()
这些属性和函数可能并且应该以某种方式暴露给运维人员,因为它们有助于他们检测系统中的问题。
贡献
运行测试
$ ./pw test
代码格式化(black和isort)和linting(mypy)
$ ./pw format $ ./pw lint
上述命令将在.pyprojectx内部自动安装必要的工具,并安装预提交钩子。
列出可用命令
$ ./pw -i
项目详情
下载文件
下载适用于您平台文件的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。