跳转到主要内容

Python实现电路断路器模式

项目描述

PyBreaker是Michael T. Nygard的书中描述的电路断路器模式的Python实现,《Release It!》。

根据Nygard的话,“电路断路器存在是为了允许一个子系统失败而不会破坏整个系统。这是通过包装危险操作(通常是集成点)并使用一个可以在系统不健康时绕过调用的组件来实现的。”

功能

  • 可配置的排除异常列表(例如业务异常)

  • 可配置的失败阈值和重置超时

  • 支持每个断路器的事件监听器

  • 可以保护生成器函数

  • 易于监控和管理的函数和属性

  • 线程安全

  • 可选的Redis支持

  • 可选的异步Tornado调用支持

需求

安装

运行以下命令行,从PyPI下载PyBreaker的最新稳定版本

$ pip install pybreaker

如果您是Git用户,您可能希望以可编辑模式安装当前的开发版本

$ git clone git://github.com/danielfm/pybreaker.git
$ cd pybreaker
$ # run tests (on windows omit ./)
$ ./pw test
$ pip install -e .

用法

第一步是为您想要保护免受其影响的每个集成点创建一个CircuitBreaker实例

import pybreaker

# Used in database integration points
db_breaker = pybreaker.CircuitBreaker(fail_max=5, reset_timeout=60)

CircuitBreaker实例应在应用程序作用域内全局生存,例如,跨越请求。

如果您想使用Redis后端,请使用CircuitBreakerCircuitRedisStorage初始化。

import pybreaker
import redis

redis = redis.StrictRedis()
db_breaker = pybreaker.CircuitBreaker(
    fail_max=5,
    reset_timeout=60,
    state_storage=pybreaker.CircuitRedisStorage(pybreaker.STATE_CLOSED, redis))

不要将Redis连接的decode_responses设置为True,这将强制从Redis返回ASCII对象,并在Python3+中引发

AttributeError: ‘str’对象没有属性‘decode’

import pybreaker
from django_redis import get_redis_connection

db_breaker = pybreaker.CircuitBreaker(
    fail_max=5,
    reset_timeout=60,
    state_storage=pybreaker.CircuitRedisStorage(pybreaker.STATE_CLOSED, get_redis_connection('default')))
import pybreaker
from django_redis import get_redis_connection

db_breaker = pybreaker.CircuitBreaker(
    fail_max=5,
    reset_timeout=60,
    state_storage=pybreaker.CircuitRedisStorage(pybreaker.STATE_CLOSED, get_redis_connection('default'),namespace='unique_namespace'))

事件监听

如果您只想在发生某些事件时采取行动,则无需子类化CircuitBreaker。在这种情况下,最好是子类化CircuitBreakerListener

class DBListener(pybreaker.CircuitBreakerListener):
    "Listener used by circuit breakers that execute database operations."

    def before_call(self, cb, func, *args, **kwargs):
        "Called before the circuit breaker `cb` calls `func`."
        pass

    def state_change(self, cb, old_state, new_state):
        "Called when the circuit breaker `cb` state changes."
        pass

    def failure(self, cb, exc):
        "Called when a function invocation raises a system error."
        pass

    def success(self, cb):
        "Called when a function invocation succeeds."
        pass

class LogListener(pybreaker.CircuitBreakerListener):
    "Listener used to log circuit breaker events."

    def state_change(self, cb, old_state, new_state):
        msg = "State Change: CB: {0}, New State: {1}".format(cb.name, new_state)
        logging.info(msg)

要将监听器添加到断路器

# At creation time...
db_breaker = pybreaker.CircuitBreaker(listeners=[DBListener(), LogListener()])

# ...or later
db_breaker.add_listeners(OneListener(), AnotherListener())

断路器做什么?

假设您想在更新customer数据库表中的行的函数上使用断路器

@db_breaker
def update_customer(cust):
    # Do stuff here...
    pass

# Will trigger the circuit breaker
updated_customer = update_customer(my_customer)

或者如果您不想使用装饰器语法

def update_customer(cust):
    # Do stuff here...
    pass

# Will trigger the circuit breaker
updated_customer = db_breaker.call(update_customer, my_customer)

或者将其用作上下文管理器以及一个with语句

# Will trigger the circuit breaker
with db_breaker.calling():
    # Do stuff here...
    pass

根据默认参数,断路器db_breaker将在update_customer中连续5次失败后自动打开电路。

当电路打开时,所有对update_customer的调用将立即失败(引发CircuitBreakerError),而不会尝试执行实际操作。如果您想在电路跳闸时抛出原始错误,请将throw_new_error_on_trip选项设置为False

pybreaker.CircuitBreaker(..., throw_new_error_on_trip=False)

60秒后,断路器将允许对update_customer的下一个调用通过。如果该调用成功,则电路关闭;如果失败,则电路再次打开,直到另一个超时到期。

可选的Tornado支持

断路器可以(可选地)用于调用异步Tornado函数

from tornado import gen

@db_breaker(__pybreaker_call_async=True)
@gen.coroutine
def async_update(cust):
    # Do async stuff here...
    pass

或者如果您不想使用装饰器语法

@gen.coroutine
def async_update(cust):
    # Do async stuff here...
    pass

updated_customer = db_breaker.call_async(async_update, my_customer)

排除异常

默认情况下,失败的调用是引发异常的任何调用。然而,通常还会抛出异常来指示业务异常,并且这些异常不应由断路器忽略,因为它们不表示系统错误

# At creation time...
db_breaker = CircuitBreaker(exclude=[CustomerValidationError])

# ...or later
db_breaker.add_excluded_exception(CustomerValidationError)

在这种情况下,当任何由该断路器保护的函数引发CustomerValidationError(或从CustomerValidationError派生的任何异常)时,该调用不会被视为系统故障。

为了覆盖仅使用异常类不足以确定它是否表示系统错误的情况,您也可以传递一个可调用对象而不是类型

db_breaker = CircuitBreaker(exclude=[lambda e: type(e) == HTTPError and e.status_code < 500])

您可以自由混合类型和过滤可调用对象。

监控和管理

断路器提供了一些属性和函数,您可以用来监控和更改其当前状态

# Get the current number of consecutive failures
print(db_breaker.fail_counter)

# Get/set the maximum number of consecutive failures
print(db_breaker.fail_max)
db_breaker.fail_max = 10

# Get/set the current reset timeout period (in seconds)
print db_breaker.reset_timeout
db_breaker.reset_timeout = 60

# Get the current state, i.e., 'open', 'half-open', 'closed'
print(db_breaker.current_state)

# Closes the circuit
db_breaker.close()

# Half-opens the circuit
db_breaker.half_open()

# Opens the circuit
db_breaker.open()

这些属性和函数可能并且应该以某种方式暴露给运维人员,因为它们有助于他们检测系统中的问题。

贡献

运行测试

$ ./pw test

代码格式化(black和isort)和linting(mypy)

$ ./pw format
$ ./pw lint

上述命令将在.pyprojectx内部自动安装必要的工具,并安装预提交钩子。

列出可用命令

$ ./pw -i

项目详情


下载文件

下载适用于您平台文件的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源代码分布

pybreaker-1.2.0.tar.gz (14.7 kB 查看哈希值)

上传时间 源代码

构建分布

pybreaker-1.2.0-py3-none-any.whl (12.3 kB 查看哈希值)

上传时间 Python 3

由以下支持