跳转到主要内容

中间件将项目日志关联到单个请求

项目描述

pypi test codecov

ASGI关联ID中间件

用于读取或生成每个传入请求的关联ID的中间件。然后可以将关联ID添加到您的日志中,从而可以轻松检索单个HTTP请求生成的所有日志。

当中间件检测到入站请求中的关联ID HTTP头时,该ID将被存储。如果没有找到头,则将为请求生成一个关联ID。

默认情况下,中间件会检查X-Request-ID头,但可以设置为任何键。X-Correlation-ID也相当常用。

示例

一旦配置了日志记录,您的输出将从这个

INFO    ... project.views  This is an info log
WARNING ... project.models This is a warning log
INFO    ... project.views  This is an info log
INFO    ... project.views  This is an info log
WARNING ... project.models This is a warning log
WARNING ... project.models This is a warning log

变为这个

INFO    ... [773fa6885] project.views  This is an info log
WARNING ... [773fa6885] project.models This is a warning log
INFO    ... [0d1c3919e] project.views  This is an info log
INFO    ... [99d44111e] project.views  This is an info log
WARNING ... [0d1c3919e] project.models This is a warning log
WARNING ... [99d44111e] project.models This is a warning log

现在我们可以真正看到哪些日志是相关的。

安装

pip install asgi-correlation-id

设置

为了设置包,您需要添加中间件并配置日志。

添加中间件

中间件可以添加如下

from fastapi import FastAPI

from asgi_correlation_id import CorrelationIdMiddleware

app = FastAPI()
app.add_middleware(CorrelationIdMiddleware)

或者您框架允许的任何其他方式。

对于Starlette应用程序,只需在所有示例中将FastAPI替换为Starlette

配置日志

本节假设您已经在项目中开始配置日志。如果不是这种情况,请查看从头开始设置日志的部分。

要设置关联ID的日志记录,您只需添加包提供的日志过滤器即可。

如果您的当前日志配置看起来像这样

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'web': {
            'class': 'logging.Formatter',
            'datefmt': '%H:%M:%S',
            'format': '%(levelname)s ... %(name)s %(message)s',
        },
    },
    'handlers': {
        'web': {
            'class': 'logging.StreamHandler',
            'formatter': 'web',
        },
    },
    'loggers': {
        'my_project': {
            'handlers': ['web'],
            'level': 'DEBUG',
            'propagate': True,
        },
    },
}

您只需添加过滤器,如下所示

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
+   'filters': {
+       'correlation_id': {
+           '()': 'asgi_correlation_id.CorrelationIdFilter',
+           'uuid_length': 32,
+           'default_value': '-',
+       },
+   },
    'formatters': {
        'web': {
            'class': 'logging.Formatter',
            'datefmt': '%H:%M:%S',
+           'format': '%(levelname)s ... [%(correlation_id)s] %(name)s %(message)s',
        },
    },
    'handlers': {
        'web': {
            'class': 'logging.StreamHandler',
+           'filters': ['correlation_id'],
            'formatter': 'web',
        },
    },
    'loggers': {
        'my_project': {
            'handlers': ['web'],
            'level': 'DEBUG',
            'propagate': True,
        },
    },
}

如果您使用json日志格式化程序,只需将correlation-id: %(correlation_id)s添加到您的属性列表中。

中间件配置

中间件可以通过几种方式配置,但没有必需的参数。

app.add_middleware(
    CorrelationIdMiddleware,
    header_name='X-Request-ID',
    update_request_header=True,
    generator=lambda: uuid4().hex,
    validator=is_valid_uuid4,
    transformer=lambda a: a,
)

可配置的中间件参数包括

header_name

  • 类型: str
  • 默认值: X-Request-ID
  • 描述: 头名称决定从哪个HTTP头读取关联ID的值。X-Request-IDX-Correlation-ID是常见的选择。

update_request_header

  • 类型: bool
  • 默认值: True
  • 描述: 是否使用生成的关联ID更新入站请求的头值。这是为了支持依赖于请求头存在的情况(如各种跟踪中间件)。

generator

  • 类型: Callable[[], str]
  • 默认值: lambda: uuid4().hex
  • 描述: 生成器函数负责在未从入站请求的头部接收ID时生成新的关联ID。我们默认使用UUIDs,但您可以选择使用如nanoid或您自己的自定义函数。

validator

  • 类型: Callable[[str], bool]
  • 默认值: is_valid_uuid4 ( found here)
  • 描述: 验证器函数用于读取入站HTTP头值。默认情况下,我们丢弃非UUID格式的头值,以强制关联ID的唯一性。如果您希望允许任何头值,可以将此设置设置为None,或传递自己的验证器。

transformer

  • 类型: Callable[[str], str]
  • 默认值: lambda a: a
  • 描述: 大多数用户不需要转换器,默认情况下我们不执行任何操作。该参数添加是为了满足用户可能希望以某种方式更改入站或生成的ID值的情况。它提供了一种按您认为合适的方式转换入站ID的机制。有关更多上下文,请参阅中间件代码。

CORS

如果您正在使用跨源资源共享(CORS),例如,您从不同的源提供的前端JavaScript代码中对API进行请求,您必须确保以下两点

  • 允许在入站请求的HTTP头中包含关联ID头,以便中间件可以重复使用该值,
  • 在响应的HTTP头中将关联ID头添加到允许列表中,以便浏览器可以访问。

最好通过为所选框架使用专用中间件来实现这一点。以下是一些示例。

Starlette

文档:https://www.starlette.io/middleware/#corsmiddleware

from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware


middleware = [
    Middleware(
        CORSMiddleware,
        allow_origins=['*'],
        allow_methods=['*'],
        allow_headers=['X-Requested-With', 'X-Request-ID'],
        expose_headers=['X-Request-ID']
    )
]

app = Starlette(..., middleware=middleware)

FastAPI

文档:https://fastapi.org.cn/tutorial/cors/

from app.main import app
from fastapi.middleware.cors import CORSMiddleware


app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],
    allow_methods=['*'],
    allow_headers=['X-Requested-With', 'X-Request-ID'],
    expose_headers=['X-Request-ID']
)

有关此主题的更多详细信息,请参阅CORS协议

异常处理

默认情况下,X-Request-ID响应头将包含在所有来自服务器的响应中,除了未处理的服务器错误的情况。如果您希望在500错误的情况下包含请求ID,您可以添加一个自定义异常处理程序。

以下是一些简单的示例,以帮助您开始。有关更多信息,请参阅每个框架的文档。

Starlette

文档:https://www.starlette.io/exceptions/

from starlette.requests import Request
from starlette.responses import PlainTextResponse
from starlette.applications import Starlette

from asgi_correlation_id import correlation_id


async def custom_exception_handler(request: Request, exc: Exception) -> PlainTextResponse:
    return PlainTextResponse(
        "Internal Server Error",
        status_code=500,
        headers={'X-Request-ID': correlation_id.get() or ""}
    )


app = Starlette(
    ...,
    exception_handlers={500: custom_exception_handler}
)

FastAPI

文档:https://fastapi.org.cn/tutorial/handling-errors/

from app.main import app
from fastapi import HTTPException, Request
from fastapi.exception_handlers import http_exception_handler
from fastapi.responses import JSONResponse

from asgi_correlation_id import correlation_id


@app.exception_handler(Exception)
async def unhandled_exception_handler(request: Request, exc: Exception) -> JSONResponse:
    return await http_exception_handler(
        request,
        HTTPException(
            500,
            'Internal server error',
            headers={'X-Request-ID': correlation_id.get() or ""}
        ))

如果您使用CORS,您还必须在错误响应中包含Access-Control-Allow-OriginAccess-Control-Expose-Headers头。有关更多详细信息,请参阅上面的CORS部分

从头开始设置日志记录

如果您的项目尚未配置日志记录,本节将解释如何开始。如果您想要更多细节,请查看这篇博客文章

Python文档说明了您可以使用一些配置函数进行简单的设置。对于此示例,我们将使用dictConfig,因为例如,Django用户应该会发现它最熟悉,但是不同的配置方法是可互换的,所以如果您想使用另一种方法,只需浏览Python文档,并根据您的需要更改配置方法。

dictConfig的好处是它允许您在单个数据结构中指定整个日志配置,并允许您向其中添加条件逻辑。以下示例显示了如何设置控制台和JSON日志记录。

from logging.config import dictConfig

from app.core.config import settings


def configure_logging() -> None:
    dictConfig(
        {
            'version': 1,
            'disable_existing_loggers': False,
            'filters': {  # correlation ID filter must be added here to make the %(correlation_id)s formatter work
                'correlation_id': {
                    '()': 'asgi_correlation_id.CorrelationIdFilter',
                    'uuid_length': 8 if not settings.ENVIRONMENT == 'local' else 32,
                    'default_value': '-',
                },
            },
            'formatters': {
                'console': {
                    'class': 'logging.Formatter',
                    'datefmt': '%H:%M:%S',
                    # formatter decides how our console logs look, and what info is included.
                    # adding %(correlation_id)s to this format is what make correlation IDs appear in our logs
                    'format': '%(levelname)s:\t\b%(asctime)s %(name)s:%(lineno)d [%(correlation_id)s] %(message)s',
                },
            },
            'handlers': {
                'console': {
                    'class': 'logging.StreamHandler',
                    # Filter must be declared in the handler, otherwise it won't be included
                    'filters': ['correlation_id'],
                    'formatter': 'console',
                },
            },
            # Loggers can be specified to set the log-level to log, and which handlers to use
            'loggers': {
                # project logger
                'app': {'handlers': ['console'], 'level': 'DEBUG', 'propagate': True},
                # third-party package loggers
                'databases': {'handlers': ['console'], 'level': 'WARNING'},
                'httpx': {'handlers': ['console'], 'level': 'INFO'},
                'asgi_correlation_id': {'handlers': ['console'], 'level': 'WARNING'},
            },
        }
    )

如果将日志配置定义在像这样的函数中,您只需确保以某种方式在启动时运行该函数即可,日志记录应该会对您起作用。您可以选择任何方式来做这件事,但是将它们传递给FastAPI.on_startup可调用对象列表是一个很好的起点。

与structlog集成

structlog是一个Python库,它使结构化日志记录成为可能。

它可以使用asgi_correlation_id轻松配置。

import logging
from typing import Any

import structlog
from asgi_correlation_id import correlation_id


def add_correlation(
    logger: logging.Logger, method_name: str, event_dict: dict[str, Any]
) -> dict[str, Any]:
    """Add request id to log message."""
    if request_id := correlation_id.get():
        event_dict["request_id"] = request_id
    return event_dict


structlog.configure(
    processors=[
        add_correlation,
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M.%S"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),
    ],
    wrapper_class=structlog.stdlib.BoundLogger,
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True,
)

SAQ集成

如果您使用saq,您可以使用库提供的事件钩子轻松地将请求ID从Web服务器传输到您的工作者。

from uuid import uuid4

from asgi_correlation_id import correlation_id
from saq import Job, Queue


CID_TRANSFER_KEY = 'correlation_id'


async def before_enqueue(job: Job) -> None:
    """
    Transfer the correlation ID from the current context to the worker.

    This might be called from a web server or a worker process.
    """
    job.meta[CID_TRANSFER_KEY] = correlation_id.get() or uuid4()


async def before_process(ctx: dict) -> None:
    """
    Load correlation ID from the enqueueing process to this one.
    """
    correlation_id.set(ctx['job'].meta.get(CID_TRANSFER_KEY, uuid4()))


async def after_process(ctx: dict) -> None:
    """
    Reset correlation ID for this process.
    """
    correlation_id.set(None)

queue = Queue(...)
queue.register_before_enqueue(before_enqueue)

priority_settings = {
    ...,
    'queue': queue,
    'before_process': before_process,
    'after_process': after_process,
}

hypercorn集成

要将关联ID添加到hypercorn日志中,您需要添加一个日志过滤器并更改日志格式。以下是一个如何配置hypercorn的示例,如果您正在运行FastAPI应用。

import logging
import os

from fastapi import APIRouter, FastAPI
from hypercorn.config import Config
from hypercorn.asyncio import serve
import asgi_correlation_id
import asyncio
import hypercorn


def configure_logging():
    console_handler = logging.StreamHandler()
    console_handler.addFilter(asgi_correlation_id.CorrelationIdFilter())
    logging.basicConfig(
        handlers=[console_handler],
        level="INFO",
	format="%(levelname)s log [%(correlation_id)s] %(name)s %(message)s")


app = FastAPI(on_startup=[configure_logging])
app.add_middleware(asgi_correlation_id.CorrelationIdMiddleware)
router = APIRouter()


@router.get("/test")
async def test_get():
    print("toto")
    logger = logging.getLogger()
    logger.info("test_get")


app.include_router(router)


if __name__ == "__main__":
    logConfig = {
        "handlers": {
            "hypercorn.access": {
                "formatter": "hypercorn.access",
                "level": "INFO",
                "class": "logging.StreamHandler",
                "stream": "ext://sys.stdout",
                "filters": [
                    asgi_correlation_id.CorrelationIdFilter()
                ],
        }},
        "formatters": {
            "hypercorn.access": {
                "format": "%(message)s %(correlation_id)s",
            }
        },
        "loggers": {
            "hypercorn.access": {
                "handlers": [
                    "hypercorn.access"
                ],
                "level": "INFO",
            },
        },
        "version": 1
    }

    config = Config()
    # write access log to stdout
    config.accesslog = "-"

    config.logconfig_dict = logConfig
    asyncio.run(serve(app, config))
# run it
$ python3 test.py

# test it:
$ curl https://127.0.0.1:8080/test

# log on stdout:
INFO log [7e7ccfff352a428991920d1da2502674] root test_get
127.0.0.1:34754 - - [14/Dec/2023:10:34:08 +0100] "GET /test 1.1" 200 4 "-" "curl/7.76.1" 7e7ccfff352a428991920d1da2502674

Uvicorn集成

要将关联ID添加到uvicorn日志中,您需要添加一个日志过滤器并更改日志格式。以下是一个如何配置uvicorn的示例,如果您正在运行FastAPI应用。

import logging
import os

import asgi_correlation_id
import uvicorn
from fastapi import APIRouter, FastAPI
from uvicorn.config import LOGGING_CONFIG


def configure_logging():
    console_handler = logging.StreamHandler()
    console_handler.addFilter(asgi_correlation_id.CorrelationIdFilter())
    logging.basicConfig(
        handlers=[console_handler],
        level="INFO",
        format="%(levelname)s log [%(correlation_id)s] %(name)s %(message)s")


app = FastAPI(on_startup=[configure_logging])
app.add_middleware(asgi_correlation_id.CorrelationIdMiddleware)
router = APIRouter()


@router.get("/test")
async def test_get():
    logger = logging.getLogger()
    logger.info("test_get")


app.include_router(router)


if __name__ == "__main__":
    LOGGING_CONFIG["handlers"]["access"]["filters"] = [asgi_correlation_id.CorrelationIdFilter()]
    LOGGING_CONFIG["formatters"]["access"]["fmt"] = "%(levelname)s access [%(correlation_id)s] %(name)s %(message)s"
    uvicorn.run("test:app", port=8080, log_level=os.environ.get("LOGLEVEL", "DEBUG").lower())
# run it
python test.py

# test it
curl https://127.0.0.1:8080/test

# log on stdout
INFO log [16b61d57f9ff4a85ac80f5cd406e0aa2] root test_get
INFO access [16b61d57f9ff4a85ac80f5cd406e0aa2] uvicorn.access 127.0.0.1:24810 - "GET /test HTTP/1.1" 200

扩展

除了中间件外,我们还为第三方包添加了一些扩展。

Sentry

如果您的项目已安装sentry-sdk,关联ID将自动作为transaction_id添加到Sentry事件中。

请参阅这篇博客文章以获取更多细节。事务ID在Sentry的事件详细视图显示,这只是一个将日志连接到Sentry事件的简单方法。

Celery

注意:如果您使用Celery集成,请使用以下命令安装该包:pip install asgi-correlation-id[celery]

对于Celery用户来说,有一个主要问题:工作进程作为完全独立的进程运行,因此当从请求中启动后台任务时,关联ID会丢失。

然而,通过一些Celery信号魔法,我们实际上可以将关联ID传输到工作进程,如下所示

@before_task_publish.connect()
def transfer_correlation_id(headers) -> None:
    # This is called before task.delay() finishes
    # Here we're able to transfer the correlation ID via the headers kept in our backend
    headers[header_key] = correlation_id.get()


@task_prerun.connect()
def load_correlation_id(task) -> None:
    # This is called when the worker picks up the task
    # Here we're able to load the correlation ID from the headers
    id_value = task.request.get(header_key)
    correlation_id.set(id_value)

要配置关联ID传输,只需导入并运行该包提供的设置函数

from asgi_correlation_id.extensions.celery import load_correlation_ids

load_correlation_ids()

进一步——添加Celery跟踪ID

除了将请求ID传输到Celery工作进程外,我们还增加了一个日志过滤器,以提高Celery进程中的跟踪。这完全独立于关联ID功能,但我们自己也在使用它,所以将它保留在包中与其他信号一起。

该日志过滤器为每个工作进程添加一个ID,celery_current_id,以及为启动它的进程添加一个ID,celery_parent_id

以下是不同场景输出的快速总结

场景 关联ID Celery当前ID Celery父ID
请求
请求 -> 工作进程
请求 -> 工作进程 -> 另一个工作进程
定时器 -> 工作进程 ✅*
定时器 -> 工作进程 -> 工作进程 ✅*

*当我们从HTTP请求独立启动一个进程时,对于链中的第一个进程,仍然会生成一个关联ID并传递下去。您可以认为关联ID是一个源ID,而当前ID和父ID的组合是链接链的一种方式。

要添加当前ID和父ID,只需将您的celery.py更改为以下内容

+ from asgi_correlation_id.extensions.celery import load_correlation_ids, load_celery_current_and_parent_ids

load_correlation_ids()
+ load_celery_current_and_parent_ids()

如果您希望通过在您的代理中找到的ID(即Celery的task_id)关联Celery任务ID,请使用load_celery_current_and_parent_ids上的use_internal_celery_task_id参数

from asgi_correlation_id.extensions.celery import load_correlation_ids, load_celery_current_and_parent_ids

load_correlation_ids()
+ load_celery_current_and_parent_ids(use_internal_celery_task_id=True)

注意:当use_internal_celery_task_id设置为True时,load_celery_current_and_parent_ids将忽略generator参数

要设置额外的日志过滤器,请更新您的日志配置如下

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'filters': {
        'correlation_id': {
+           '()': 'asgi_correlation_id.CorrelationIdFilter',
+           'uuid_length': 32,
+           'default_value': '-',
+       },
+       'celery_tracing': {
+            '()': 'asgi_correlation_id.CeleryTracingIdsFilter',
+            'uuid_length': 32,
+            'default_value': '-',
+       },
    },
    'formatters': {
        'web': {
            'class': 'logging.Formatter',
            'datefmt': '%H:%M:%S',
            'format': '%(levelname)s ... [%(correlation_id)s] %(name)s %(message)s',
        },
+       'celery': {
+           'class': 'logging.Formatter',
+           'datefmt': '%H:%M:%S',
+           'format': '%(levelname)s ... [%(correlation_id)s] [%(celery_parent_id)s-%(celery_current_id)s] %(name)s %(message)s',
+       },
    },
    'handlers': {
        'web': {
            'class': 'logging.StreamHandler',
            'filters': ['correlation_id'],
            'formatter': 'web',
        },
+       'celery': {
+           'class': 'logging.StreamHandler',
+           'filters': ['correlation_id', 'celery_tracing'],
+           'formatter': 'celery',
+       },
    },
    'loggers': {
        'my_project': {
+           'handlers': ['celery' if any('celery' in i for i in sys.argv) else 'web'],
            'level': 'DEBUG',
            'propagate': True,
        },
    },
}

配置这些ID后,您应该能够

  1. 关联来自单个源的所有日志,
  2. 拼凑每条日志执行的顺序以及哪个进程启动了哪个进程。

示例

一切配置好后,假设您有一组这样的任务

@celery.task()
def debug_task() -> None:
    logger.info('Debug task 1')
    second_debug_task.delay()
    second_debug_task.delay()


@celery.task()
def second_debug_task() -> None:
    logger.info('Debug task 2')
    third_debug_task.delay()
    fourth_debug_task.delay()


@celery.task()
def third_debug_task() -> None:
    logger.info('Debug task 3')
    fourth_debug_task.delay()
    fourth_debug_task.delay()


@celery.task()
def fourth_debug_task() -> None:
    logger.info('Debug task 4')

那么您的日志可能看起来像这样

   correlation-id               current-id
          |        parent-id        |
          |            |            |
INFO [3b162382e1] [    -     ] [93ddf3639c] project.tasks - Debug task 1
INFO [3b162382e1] [93ddf3639c] [24046ab022] project.tasks - Debug task 2
INFO [3b162382e1] [93ddf3639c] [cb5595a417] project.tasks - Debug task 2
INFO [3b162382e1] [24046ab022] [08f5428a66] project.tasks - Debug task 3
INFO [3b162382e1] [24046ab022] [32f40041c6] project.tasks - Debug task 4
INFO [3b162382e1] [cb5595a417] [1c75a4ed2c] project.tasks - Debug task 3
INFO [3b162382e1] [08f5428a66] [578ad2d141] project.tasks - Debug task 4
INFO [3b162382e1] [cb5595a417] [21b2ef77ae] project.tasks - Debug task 4
INFO [3b162382e1] [08f5428a66] [8cad7fc4d7] project.tasks - Debug task 4
INFO [3b162382e1] [1c75a4ed2c] [72a43319f0] project.tasks - Debug task 4
INFO [3b162382e1] [1c75a4ed2c] [ec3cf4113e] project.tasks - Debug task 4

项目详情


下载文件

下载适用于您的平台文件。如果您不确定选择哪个,请了解有关安装包的更多信息。

源分发

asgi_correlation_id-4.3.3.tar.gz (20.1 kB 查看哈希值)

上传时间 源代码

构建分发

asgi_correlation_id-4.3.3-py3-none-any.whl (15.3 kB 查看哈希值)

上传时间 Python 3

支持