中间件将项目日志关联到单个请求
项目描述
ASGI关联ID中间件
用于读取或生成每个传入请求的关联ID的中间件。然后可以将关联ID添加到您的日志中,从而可以轻松检索单个HTTP请求生成的所有日志。
当中间件检测到入站请求中的关联ID HTTP头时,该ID将被存储。如果没有找到头,则将为请求生成一个关联ID。
默认情况下,中间件会检查X-Request-ID
头,但可以设置为任何键。X-Correlation-ID
也相当常用。
示例
一旦配置了日志记录,您的输出将从这个
INFO ... project.views This is an info log
WARNING ... project.models This is a warning log
INFO ... project.views This is an info log
INFO ... project.views This is an info log
WARNING ... project.models This is a warning log
WARNING ... project.models This is a warning log
变为这个
INFO ... [773fa6885] project.views This is an info log
WARNING ... [773fa6885] project.models This is a warning log
INFO ... [0d1c3919e] project.views This is an info log
INFO ... [99d44111e] project.views This is an info log
WARNING ... [0d1c3919e] project.models This is a warning log
WARNING ... [99d44111e] project.models This is a warning log
现在我们可以真正看到哪些日志是相关的。
安装
pip install asgi-correlation-id
设置
为了设置包,您需要添加中间件并配置日志。
添加中间件
中间件可以添加如下
from fastapi import FastAPI
from asgi_correlation_id import CorrelationIdMiddleware
app = FastAPI()
app.add_middleware(CorrelationIdMiddleware)
或者您框架允许的任何其他方式。
对于Starlette应用程序,只需在所有示例中将FastAPI
替换为Starlette
。
配置日志
本节假设您已经在项目中开始配置日志。如果不是这种情况,请查看从头开始设置日志的部分。
要设置关联ID的日志记录,您只需添加包提供的日志过滤器即可。
如果您的当前日志配置看起来像这样
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'web': {
'class': 'logging.Formatter',
'datefmt': '%H:%M:%S',
'format': '%(levelname)s ... %(name)s %(message)s',
},
},
'handlers': {
'web': {
'class': 'logging.StreamHandler',
'formatter': 'web',
},
},
'loggers': {
'my_project': {
'handlers': ['web'],
'level': 'DEBUG',
'propagate': True,
},
},
}
您只需添加过滤器,如下所示
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
+ 'filters': {
+ 'correlation_id': {
+ '()': 'asgi_correlation_id.CorrelationIdFilter',
+ 'uuid_length': 32,
+ 'default_value': '-',
+ },
+ },
'formatters': {
'web': {
'class': 'logging.Formatter',
'datefmt': '%H:%M:%S',
+ 'format': '%(levelname)s ... [%(correlation_id)s] %(name)s %(message)s',
},
},
'handlers': {
'web': {
'class': 'logging.StreamHandler',
+ 'filters': ['correlation_id'],
'formatter': 'web',
},
},
'loggers': {
'my_project': {
'handlers': ['web'],
'level': 'DEBUG',
'propagate': True,
},
},
}
如果您使用json日志格式化程序,只需将correlation-id: %(correlation_id)s
添加到您的属性列表中。
中间件配置
中间件可以通过几种方式配置,但没有必需的参数。
app.add_middleware(
CorrelationIdMiddleware,
header_name='X-Request-ID',
update_request_header=True,
generator=lambda: uuid4().hex,
validator=is_valid_uuid4,
transformer=lambda a: a,
)
可配置的中间件参数包括
header_name
- 类型:
str
- 默认值:
X-Request-ID
- 描述: 头名称决定从哪个HTTP头读取关联ID的值。
X-Request-ID
和X-Correlation-ID
是常见的选择。
update_request_header
- 类型:
bool
- 默认值:
True
- 描述: 是否使用生成的关联ID更新入站请求的头值。这是为了支持依赖于请求头存在的情况(如各种跟踪中间件)。
generator
- 类型:
Callable[[], str]
- 默认值:
lambda: uuid4().hex
- 描述: 生成器函数负责在未从入站请求的头部接收ID时生成新的关联ID。我们默认使用UUIDs,但您可以选择使用如nanoid或您自己的自定义函数。
validator
- 类型:
Callable[[str], bool]
- 默认值:
is_valid_uuid4
( found here) - 描述: 验证器函数用于读取入站HTTP头值。默认情况下,我们丢弃非UUID格式的头值,以强制关联ID的唯一性。如果您希望允许任何头值,可以将此设置设置为
None
,或传递自己的验证器。
transformer
- 类型:
Callable[[str], str]
- 默认值:
lambda a: a
- 描述: 大多数用户不需要转换器,默认情况下我们不执行任何操作。该参数添加是为了满足用户可能希望以某种方式更改入站或生成的ID值的情况。它提供了一种按您认为合适的方式转换入站ID的机制。有关更多上下文,请参阅中间件代码。
CORS
如果您正在使用跨源资源共享(CORS),例如,您从不同的源提供的前端JavaScript代码中对API进行请求,您必须确保以下两点
- 允许在入站请求的HTTP头中包含关联ID头,以便中间件可以重复使用该值,
- 在响应的HTTP头中将关联ID头添加到允许列表中,以便浏览器可以访问。
最好通过为所选框架使用专用中间件来实现这一点。以下是一些示例。
Starlette
文档:https://www.starlette.io/middleware/#corsmiddleware
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
middleware = [
Middleware(
CORSMiddleware,
allow_origins=['*'],
allow_methods=['*'],
allow_headers=['X-Requested-With', 'X-Request-ID'],
expose_headers=['X-Request-ID']
)
]
app = Starlette(..., middleware=middleware)
FastAPI
文档:https://fastapi.org.cn/tutorial/cors/
from app.main import app
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=['*'],
allow_methods=['*'],
allow_headers=['X-Requested-With', 'X-Request-ID'],
expose_headers=['X-Request-ID']
)
有关此主题的更多详细信息,请参阅CORS协议。
异常处理
默认情况下,X-Request-ID
响应头将包含在所有来自服务器的响应中,除了未处理的服务器错误的情况。如果您希望在500
错误的情况下包含请求ID,您可以添加一个自定义异常处理程序。
以下是一些简单的示例,以帮助您开始。有关更多信息,请参阅每个框架的文档。
Starlette
文档:https://www.starlette.io/exceptions/
from starlette.requests import Request
from starlette.responses import PlainTextResponse
from starlette.applications import Starlette
from asgi_correlation_id import correlation_id
async def custom_exception_handler(request: Request, exc: Exception) -> PlainTextResponse:
return PlainTextResponse(
"Internal Server Error",
status_code=500,
headers={'X-Request-ID': correlation_id.get() or ""}
)
app = Starlette(
...,
exception_handlers={500: custom_exception_handler}
)
FastAPI
文档:https://fastapi.org.cn/tutorial/handling-errors/
from app.main import app
from fastapi import HTTPException, Request
from fastapi.exception_handlers import http_exception_handler
from fastapi.responses import JSONResponse
from asgi_correlation_id import correlation_id
@app.exception_handler(Exception)
async def unhandled_exception_handler(request: Request, exc: Exception) -> JSONResponse:
return await http_exception_handler(
request,
HTTPException(
500,
'Internal server error',
headers={'X-Request-ID': correlation_id.get() or ""}
))
如果您使用CORS,您还必须在错误响应中包含Access-Control-Allow-Origin
和Access-Control-Expose-Headers
头。有关更多详细信息,请参阅上面的CORS部分。
从头开始设置日志记录
如果您的项目尚未配置日志记录,本节将解释如何开始。如果您想要更多细节,请查看这篇博客文章。
Python文档说明了您可以使用一些配置函数进行简单的设置。对于此示例,我们将使用dictConfig
,因为例如,Django用户应该会发现它最熟悉,但是不同的配置方法是可互换的,所以如果您想使用另一种方法,只需浏览Python文档,并根据您的需要更改配置方法。
dictConfig
的好处是它允许您在单个数据结构中指定整个日志配置,并允许您向其中添加条件逻辑。以下示例显示了如何设置控制台和JSON日志记录。
from logging.config import dictConfig
from app.core.config import settings
def configure_logging() -> None:
dictConfig(
{
'version': 1,
'disable_existing_loggers': False,
'filters': { # correlation ID filter must be added here to make the %(correlation_id)s formatter work
'correlation_id': {
'()': 'asgi_correlation_id.CorrelationIdFilter',
'uuid_length': 8 if not settings.ENVIRONMENT == 'local' else 32,
'default_value': '-',
},
},
'formatters': {
'console': {
'class': 'logging.Formatter',
'datefmt': '%H:%M:%S',
# formatter decides how our console logs look, and what info is included.
# adding %(correlation_id)s to this format is what make correlation IDs appear in our logs
'format': '%(levelname)s:\t\b%(asctime)s %(name)s:%(lineno)d [%(correlation_id)s] %(message)s',
},
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
# Filter must be declared in the handler, otherwise it won't be included
'filters': ['correlation_id'],
'formatter': 'console',
},
},
# Loggers can be specified to set the log-level to log, and which handlers to use
'loggers': {
# project logger
'app': {'handlers': ['console'], 'level': 'DEBUG', 'propagate': True},
# third-party package loggers
'databases': {'handlers': ['console'], 'level': 'WARNING'},
'httpx': {'handlers': ['console'], 'level': 'INFO'},
'asgi_correlation_id': {'handlers': ['console'], 'level': 'WARNING'},
},
}
)
如果将日志配置定义在像这样的函数中,您只需确保以某种方式在启动时运行该函数即可,日志记录应该会对您起作用。您可以选择任何方式来做这件事,但是将它们传递给FastAPI.on_startup
可调用对象列表是一个很好的起点。
与structlog集成
structlog是一个Python库,它使结构化日志记录成为可能。
它可以使用asgi_correlation_id
轻松配置。
import logging
from typing import Any
import structlog
from asgi_correlation_id import correlation_id
def add_correlation(
logger: logging.Logger, method_name: str, event_dict: dict[str, Any]
) -> dict[str, Any]:
"""Add request id to log message."""
if request_id := correlation_id.get():
event_dict["request_id"] = request_id
return event_dict
structlog.configure(
processors=[
add_correlation,
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M.%S"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer(),
],
wrapper_class=structlog.stdlib.BoundLogger,
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)
与SAQ集成
如果您使用saq,您可以使用库提供的事件钩子轻松地将请求ID从Web服务器传输到您的工作者。
from uuid import uuid4
from asgi_correlation_id import correlation_id
from saq import Job, Queue
CID_TRANSFER_KEY = 'correlation_id'
async def before_enqueue(job: Job) -> None:
"""
Transfer the correlation ID from the current context to the worker.
This might be called from a web server or a worker process.
"""
job.meta[CID_TRANSFER_KEY] = correlation_id.get() or uuid4()
async def before_process(ctx: dict) -> None:
"""
Load correlation ID from the enqueueing process to this one.
"""
correlation_id.set(ctx['job'].meta.get(CID_TRANSFER_KEY, uuid4()))
async def after_process(ctx: dict) -> None:
"""
Reset correlation ID for this process.
"""
correlation_id.set(None)
queue = Queue(...)
queue.register_before_enqueue(before_enqueue)
priority_settings = {
...,
'queue': queue,
'before_process': before_process,
'after_process': after_process,
}
与hypercorn集成
要将关联ID添加到hypercorn日志中,您需要添加一个日志过滤器并更改日志格式。以下是一个如何配置hypercorn的示例,如果您正在运行FastAPI应用。
import logging
import os
from fastapi import APIRouter, FastAPI
from hypercorn.config import Config
from hypercorn.asyncio import serve
import asgi_correlation_id
import asyncio
import hypercorn
def configure_logging():
console_handler = logging.StreamHandler()
console_handler.addFilter(asgi_correlation_id.CorrelationIdFilter())
logging.basicConfig(
handlers=[console_handler],
level="INFO",
format="%(levelname)s log [%(correlation_id)s] %(name)s %(message)s")
app = FastAPI(on_startup=[configure_logging])
app.add_middleware(asgi_correlation_id.CorrelationIdMiddleware)
router = APIRouter()
@router.get("/test")
async def test_get():
print("toto")
logger = logging.getLogger()
logger.info("test_get")
app.include_router(router)
if __name__ == "__main__":
logConfig = {
"handlers": {
"hypercorn.access": {
"formatter": "hypercorn.access",
"level": "INFO",
"class": "logging.StreamHandler",
"stream": "ext://sys.stdout",
"filters": [
asgi_correlation_id.CorrelationIdFilter()
],
}},
"formatters": {
"hypercorn.access": {
"format": "%(message)s %(correlation_id)s",
}
},
"loggers": {
"hypercorn.access": {
"handlers": [
"hypercorn.access"
],
"level": "INFO",
},
},
"version": 1
}
config = Config()
# write access log to stdout
config.accesslog = "-"
config.logconfig_dict = logConfig
asyncio.run(serve(app, config))
# run it
$ python3 test.py
# test it:
$ curl https://127.0.0.1:8080/test
# log on stdout:
INFO log [7e7ccfff352a428991920d1da2502674] root test_get
127.0.0.1:34754 - - [14/Dec/2023:10:34:08 +0100] "GET /test 1.1" 200 4 "-" "curl/7.76.1" 7e7ccfff352a428991920d1da2502674
与Uvicorn集成
要将关联ID添加到uvicorn日志中,您需要添加一个日志过滤器并更改日志格式。以下是一个如何配置uvicorn的示例,如果您正在运行FastAPI应用。
import logging
import os
import asgi_correlation_id
import uvicorn
from fastapi import APIRouter, FastAPI
from uvicorn.config import LOGGING_CONFIG
def configure_logging():
console_handler = logging.StreamHandler()
console_handler.addFilter(asgi_correlation_id.CorrelationIdFilter())
logging.basicConfig(
handlers=[console_handler],
level="INFO",
format="%(levelname)s log [%(correlation_id)s] %(name)s %(message)s")
app = FastAPI(on_startup=[configure_logging])
app.add_middleware(asgi_correlation_id.CorrelationIdMiddleware)
router = APIRouter()
@router.get("/test")
async def test_get():
logger = logging.getLogger()
logger.info("test_get")
app.include_router(router)
if __name__ == "__main__":
LOGGING_CONFIG["handlers"]["access"]["filters"] = [asgi_correlation_id.CorrelationIdFilter()]
LOGGING_CONFIG["formatters"]["access"]["fmt"] = "%(levelname)s access [%(correlation_id)s] %(name)s %(message)s"
uvicorn.run("test:app", port=8080, log_level=os.environ.get("LOGLEVEL", "DEBUG").lower())
# run it
python test.py
# test it
curl https://127.0.0.1:8080/test
# log on stdout
INFO log [16b61d57f9ff4a85ac80f5cd406e0aa2] root test_get
INFO access [16b61d57f9ff4a85ac80f5cd406e0aa2] uvicorn.access 127.0.0.1:24810 - "GET /test HTTP/1.1" 200
扩展
除了中间件外,我们还为第三方包添加了一些扩展。
Sentry
如果您的项目已安装sentry-sdk,关联ID将自动作为transaction_id
添加到Sentry事件中。
请参阅这篇博客文章以获取更多细节。事务ID在Sentry的事件详细视图显示,这只是一个将日志连接到Sentry事件的简单方法。
Celery
注意:如果您使用Celery集成,请使用以下命令安装该包:
pip install asgi-correlation-id[celery]
对于Celery用户来说,有一个主要问题:工作进程作为完全独立的进程运行,因此当从请求中启动后台任务时,关联ID会丢失。
然而,通过一些Celery信号魔法,我们实际上可以将关联ID传输到工作进程,如下所示
@before_task_publish.connect()
def transfer_correlation_id(headers) -> None:
# This is called before task.delay() finishes
# Here we're able to transfer the correlation ID via the headers kept in our backend
headers[header_key] = correlation_id.get()
@task_prerun.connect()
def load_correlation_id(task) -> None:
# This is called when the worker picks up the task
# Here we're able to load the correlation ID from the headers
id_value = task.request.get(header_key)
correlation_id.set(id_value)
要配置关联ID传输,只需导入并运行该包提供的设置函数
from asgi_correlation_id.extensions.celery import load_correlation_ids
load_correlation_ids()
进一步——添加Celery跟踪ID
除了将请求ID传输到Celery工作进程外,我们还增加了一个日志过滤器,以提高Celery进程中的跟踪。这完全独立于关联ID功能,但我们自己也在使用它,所以将它保留在包中与其他信号一起。
该日志过滤器为每个工作进程添加一个ID,celery_current_id
,以及为启动它的进程添加一个ID,celery_parent_id
。
以下是不同场景输出的快速总结
场景 | 关联ID | Celery当前ID | Celery父ID |
---|---|---|---|
请求 | ✅ | ||
请求 -> 工作进程 | ✅ | ✅ | |
请求 -> 工作进程 -> 另一个工作进程 | ✅ | ✅ | ✅ |
定时器 -> 工作进程 | ✅* | ✅ | |
定时器 -> 工作进程 -> 工作进程 | ✅* | ✅ | ✅ |
*当我们从HTTP请求独立启动一个进程时,对于链中的第一个进程,仍然会生成一个关联ID并传递下去。您可以认为关联ID是一个源ID,而当前ID和父ID的组合是链接链的一种方式。
要添加当前ID和父ID,只需将您的celery.py
更改为以下内容
+ from asgi_correlation_id.extensions.celery import load_correlation_ids, load_celery_current_and_parent_ids
load_correlation_ids()
+ load_celery_current_and_parent_ids()
如果您希望通过在您的代理中找到的ID(即Celery的task_id
)关联Celery任务ID,请使用load_celery_current_and_parent_ids
上的use_internal_celery_task_id
参数
from asgi_correlation_id.extensions.celery import load_correlation_ids, load_celery_current_and_parent_ids
load_correlation_ids()
+ load_celery_current_and_parent_ids(use_internal_celery_task_id=True)
注意:当use_internal_celery_task_id
设置为True
时,load_celery_current_and_parent_ids
将忽略generator
参数
要设置额外的日志过滤器,请更新您的日志配置如下
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'filters': {
'correlation_id': {
+ '()': 'asgi_correlation_id.CorrelationIdFilter',
+ 'uuid_length': 32,
+ 'default_value': '-',
+ },
+ 'celery_tracing': {
+ '()': 'asgi_correlation_id.CeleryTracingIdsFilter',
+ 'uuid_length': 32,
+ 'default_value': '-',
+ },
},
'formatters': {
'web': {
'class': 'logging.Formatter',
'datefmt': '%H:%M:%S',
'format': '%(levelname)s ... [%(correlation_id)s] %(name)s %(message)s',
},
+ 'celery': {
+ 'class': 'logging.Formatter',
+ 'datefmt': '%H:%M:%S',
+ 'format': '%(levelname)s ... [%(correlation_id)s] [%(celery_parent_id)s-%(celery_current_id)s] %(name)s %(message)s',
+ },
},
'handlers': {
'web': {
'class': 'logging.StreamHandler',
'filters': ['correlation_id'],
'formatter': 'web',
},
+ 'celery': {
+ 'class': 'logging.StreamHandler',
+ 'filters': ['correlation_id', 'celery_tracing'],
+ 'formatter': 'celery',
+ },
},
'loggers': {
'my_project': {
+ 'handlers': ['celery' if any('celery' in i for i in sys.argv) else 'web'],
'level': 'DEBUG',
'propagate': True,
},
},
}
配置这些ID后,您应该能够
- 关联来自单个源的所有日志,
- 拼凑每条日志执行的顺序以及哪个进程启动了哪个进程。
示例
一切配置好后,假设您有一组这样的任务
@celery.task()
def debug_task() -> None:
logger.info('Debug task 1')
second_debug_task.delay()
second_debug_task.delay()
@celery.task()
def second_debug_task() -> None:
logger.info('Debug task 2')
third_debug_task.delay()
fourth_debug_task.delay()
@celery.task()
def third_debug_task() -> None:
logger.info('Debug task 3')
fourth_debug_task.delay()
fourth_debug_task.delay()
@celery.task()
def fourth_debug_task() -> None:
logger.info('Debug task 4')
那么您的日志可能看起来像这样
correlation-id current-id
| parent-id |
| | |
INFO [3b162382e1] [ - ] [93ddf3639c] project.tasks - Debug task 1
INFO [3b162382e1] [93ddf3639c] [24046ab022] project.tasks - Debug task 2
INFO [3b162382e1] [93ddf3639c] [cb5595a417] project.tasks - Debug task 2
INFO [3b162382e1] [24046ab022] [08f5428a66] project.tasks - Debug task 3
INFO [3b162382e1] [24046ab022] [32f40041c6] project.tasks - Debug task 4
INFO [3b162382e1] [cb5595a417] [1c75a4ed2c] project.tasks - Debug task 3
INFO [3b162382e1] [08f5428a66] [578ad2d141] project.tasks - Debug task 4
INFO [3b162382e1] [cb5595a417] [21b2ef77ae] project.tasks - Debug task 4
INFO [3b162382e1] [08f5428a66] [8cad7fc4d7] project.tasks - Debug task 4
INFO [3b162382e1] [1c75a4ed2c] [72a43319f0] project.tasks - Debug task 4
INFO [3b162382e1] [1c75a4ed2c] [ec3cf4113e] project.tasks - Debug task 4
项目详情
下载文件
下载适用于您的平台文件。如果您不确定选择哪个,请了解有关安装包的更多信息。
源分发
构建分发
asgi_correlation_id-4.3.3.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 25d89b52f3d32c0f3b4915a9fc38d9cffc7395960d05910bdce5c13023dc237b |
|
MD5 | 9bf61fd3cf0a3a0accf228364b0a270a |
|
BLAKE2b-256 | be8115a920a166d38ce0685de33f3c5e64b86a3c112c7d646ed46d8ec1cb4047 |
asgi_correlation_id-4.3.3-py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 62ba38c359aa004c1c3e2b8e0cdf0e8ad4aa5a93864eaadc46e77d5c142a206a |
|
MD5 | 28ec47b0da940264cd5f3493a6046a06 |
|
BLAKE2b-256 | fee1aa53fea4e367c1ec0e69cb78c234dec4ec50d80ce02279e1ec647dea1a47 |