跳转到主要内容

data.gouv.fr的数据异步爬虫和解析服务

项目描述

udata-hydra 🦀

udata-hydra 是一个针对 data.gouv.fr 的异步元数据爬虫。

通过 aiohttp 爬取URL,将目录和爬取的元数据存储在 PostgreSQL 数据库中。

由于其名为 hydra,因此它还内置了神话般的力量

  • 分析远程资源元数据随时间的变化,以尽可能智能的方式检测变化
  • 如果远程资源是CSV,则将其转换为PostgreSQL表,以便API化
  • 将爬取和分析信息发送到udata实例

架构图

完整工作流程的架构如下

Full workflow architecture

hydra爬虫是架构的一部分。它将检查资源是否可用,分析文件类型(如果资源已修改),以及分析CSV内容。它还将CSV资源转换为数据库表,并将数据发送到udata实例。

Crawler architecture

依赖关系

此项目使用 libmagic,需要在您的系统上安装,例如

brew install libmagic 在MacOS上,或在linux上 sudo apt-get install libmagic-dev

命令行界面

创建数据库结构

安装udata-hydra依赖和CLI。 poetry install

poetry run udata-hydra migrate

从data.gouv.fr加载(UPSERT)最新目录版本

poetry run udata-hydra load-catalog

爬虫

poetry run udata-hydra-crawl

程序将根据在config.toml中设置的配置无限期地爬取目录,默认配置在udata_hydra/config_default.toml中。

在每次循环运行时,BATCH_SIZE个URL将被排队。

爬虫将从从未检查过的URL开始,然后继续检查SINCE间隔之前的URL。然后它将等待直到某些事情发生变化(目录或时间)。

存在一种按域名退避的机制。当在给定的批次的给定域名中,BACKOFF_NB_REQBACKOFF_PERIOD秒的周期内超过时,爬虫将等待。它将重试,直到退避被解除。

如果URL匹配EXCLUDED_PATTERNS之一,则永远不会进行检查。

工作进程

使用作业排队系统来处理长时间运行的任务。使用以下命令启动工作进程:

poetry run rq worker -c udata_hydra.worker

监控工作进程状态

poetry run rq info -c udata_hydra.worker --interval 1

CSV转换到数据库

转换后的CSV表将存储在通过config.DATABASE_URL_CSV指定的数据库中。对于测试,它与目录使用的数据库相同。在本地,docker compose将启动两个不同的数据库容器。

测试

要运行测试,需要启动数据库、测试数据库和Redis代理,命令为docker compose -f docker-compose.yml -f docker-compose.test.yml -f docker-compose.broker.yml up -d

然后可以使用poetry run pytest运行测试。

要运行特定的测试文件,可以将文件的路径传递给pytest,例如:poetry run pytest tests/test_app.py

要运行特定的测试函数,可以将文件的路径和函数的名称传递给pytest,例如:poetry run pytest tests/test_app.py::test_get_latest_check

如果您想看到执行时的打印语句,可以将-s标志传递给pytest(poetry run pytest -s)。然而,请注意,这有时可能难以解析。

测试覆盖率

pytest自动使用coverage包生成覆盖率报告,该报告在测试运行结束时在终端中显示。覆盖率在pypoject.toml文件中的[tool.pytest.ini_options]部分进行配置。您也可以在运行测试时通过传递一些标志(如--cov-report)来覆盖覆盖率报告配置。有关更多信息,请参阅pytest-cov文档

API

API需要在受保护端点(任何不是GET的端点)上的每个请求都需要一个Bearer令牌。该令牌在config.toml文件中配置为API_KEY,并在udata_hydra/config_default.toml文件中设置了默认值。

如果您将hydra用作外部服务以从udata接收资源事件,那么udata还需要在其udata.cfg文件中配置此API密钥。

# Wether udata should publish the resource events
PUBLISH_ON_RESOURCE_EVENTS = True
# Where to publish the events
RESOURCES_ANALYSER_URI = "http://localhost:8000"
# The API key that hydra needs
RESOURCES_ANALYSER_API_KEY = "api_key_to_change"

运行

poetry install
poetry run adev runserver udata_hydra/app.py

路由/端点

API提供以下端点

与检查相关

  • /api/checks/latest?url={url}&resource_id={resource_id}上使用GET获取给定URL和/或resource_id的最新检查
  • /api/checks/all?url={url}&resource_id={resource_id}上使用GET获取给定URL和/或resource_id的所有检查
  • /api/checks/aggregate?group_by={column}&created_at={date}上使用GET获取特定datecolumn分组的检查发生情况

与资源相关

  • /api/resource/{resource_id}上使用GET获取数据库中“目录”表的资源,其resource_id
  • 通过在 /api/resources 上执行 POST 请求,从源接收资源创建事件。它将在数据库 "catalog" 表中创建一个新的资源,并将其标记为下一次爬取的优先级。
  • /api/resources/{resource_id} 上执行 PUT 请求,以更新数据库 "catalog" 表中的资源。
  • /api/resources/{resource_id} 上执行 DELETE 请求,以删除数据库 "catalog" 表中的资源。

:warning: 警告:以下路由已过时,未来需要删除:

  • POST 请求在 /api/resource/created 上 -> 应使用 POST 请求在 /api/resources/ 上。
  • POST 请求在 /api/resource/updated 上 -> 应使用 PUT 请求在 /api/resources/ 上。
  • POST 请求在 /api/resource/deleted 上 -> 应使用 DELETE 请求在 /api/resources/ 上。

与资源异常相关

  • /api/resources-exceptions 上执行 GET 请求,以获取所有资源异常列表。
  • /api/resources-exceptions 上执行 POST 请求,以在数据库中创建新的资源异常。
  • /api/resources-exceptions/{resource_id} 上执行 PUT 请求,以更新数据库中的资源异常。
  • /api/resources-exceptions/{resource_id} 上执行 DELETE 请求,以从数据库中删除资源异常。

与某些状态和健康检查相关

  • /api/status/crawler 上执行 GET 请求,以获取爬取状态。
  • /api/status/worker 上执行 GET 请求,以获取工作状态。
  • /api/stats 上执行 GET 请求,以获取爬取统计信息。
  • /api/health 上执行 GET 请求,以获取 API 版本号和环境。

以下提供了关于一些端点的更多详细信息以及示例,但并非所有端点。

获取最新的检查

?url={url}?resource_id={resource_id} 一起使用。

$ curl -s "http://localhost:8000/api/checks/latest?url=http://opendata-sig.saintdenis.re/datasets/661e19974bcc48849bbff7c9637c5c28_1.csv" | json_pp
{
   "status" : 200,
   "catalog_id" : 64148,
   "deleted" : false,
   "error" : null,
   "created_at" : "2021-02-06T12:19:08.203055",
   "response_time" : 0.830198049545288,
   "url" : "http://opendata-sig.saintdenis.re/datasets/661e19974bcc48849bbff7c9637c5c28_1.csv",
   "domain" : "opendata-sig.saintdenis.re",
   "timeout" : false,
   "id" : 114750,
   "dataset_id" : "5c34944606e3e73d4a551889",
   "resource_id" : "b3678c59-5b35-43ad-9379-fce29e5b56fe",
   "headers" : {
      "content-disposition" : "attachment; filename=\"xn--Dlimitation_des_cantons-bcc.csv\"",
      "server" : "openresty",
      "x-amz-meta-cachetime" : "191",
      "last-modified" : "Wed, 29 Apr 2020 02:19:04 GMT",
      "content-encoding" : "gzip",
      "content-type" : "text/csv",
      "cache-control" : "must-revalidate",
      "etag" : "\"20415964703d9ccc4815d7126aa3a6d8\"",
      "content-length" : "207",
      "date" : "Sat, 06 Feb 2021 12:19:08 GMT",
      "x-amz-meta-contentlastmodified" : "2018-11-19T09:38:28.490Z",
      "connection" : "keep-alive",
      "vary" : "Accept-Encoding"
   }
}

获取特定 URL 或资源的所有检查

?url={url}?resource_id={resource_id} 一起使用。

$ curl -s "http://localhost:8000/api/checks/all?url=http://www.drees.sante.gouv.fr/IMG/xls/er864.xls" | json_pp
[
   {
      "domain" : "www.drees.sante.gouv.fr",
      "dataset_id" : "53d6eadba3a72954d9dd62f5",
      "timeout" : false,
      "deleted" : false,
      "response_time" : null,
      "error" : "Cannot connect to host www.drees.sante.gouv.fr:443 ssl:True [SSLCertVerificationError: (1, \"[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: Hostname mismatch, certificate is not valid for 'www.drees.sante.gouv.fr'. (_ssl.c:1122)\")]",
      "catalog_id" : 232112,
      "url" : "http://www.drees.sante.gouv.fr/IMG/xls/er864.xls",
      "headers" : {},
      "id" : 165107,
      "created_at" : "2021-02-06T14:32:47.675854",
      "resource_id" : "93dfd449-9d26-4bb0-a6a9-ee49b1b8a4d7",
      "status" : null
   },
   {
      "timeout" : false,
      "deleted" : false,
      "response_time" : null,
      "error" : "Cannot connect to host www.drees.sante.gouv.fr:443 ssl:True [SSLCertVerificationError: (1, \"[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: Hostname mismatch, certificate is not valid for 'www.drees.sante.gouv.fr'. (_ssl.c:1122)\")]",
      "domain" : "www.drees.sante.gouv.fr",
      "dataset_id" : "53d6eadba3a72954d9dd62f5",
      "created_at" : "2020-12-24T17:06:58.158125",
      "resource_id" : "93dfd449-9d26-4bb0-a6a9-ee49b1b8a4d7",
      "status" : null,
      "catalog_id" : 232112,
      "url" : "http://www.drees.sante.gouv.fr/IMG/xls/er864.xls",
      "headers" : {},
      "id" : 65092
   }
]

按列对特定日期的检查进行分组

?group_by={column}?created_at={date} 一起使用。 date 应为 YYYY-MM-DD 格式的日期或默认关键字 today

$ curl -s "http://localhost:8000/api/checks/aggregate?group_by=domain&created_at=today" | json_pp
[
  {
    "value": "www.geo2france.fr",
    "count": 4
  },
  {
    "value": "static.data.gouv.fr",
    "count": 4
  },
  {
    "value": "grandestprod.data4citizen.com",
    "count": 3
  },
  {
    "value": "www.datasud.fr",
    "count": 2
  },
  {
    "value": "koumoul.com",
    "count": 2
  },
  {
    "value": "opendata.aude.fr",
    "count": 2
  },
  {
    "value": "departement-ain.opendata.arcgis.com",
    "count": 2
  },
  {
    "value": "opendata.agglo-larochelle.fr",
    "count": 1
  }
]

添加资源异常

$ curl  -X POST http://localhost:8000/api/resources-exceptions
        -H "Authorization: Bearer <myAPIkey>"
        -d "{'resource_id': 'f868cca6-8da1-4369-a78d-47463f19a9a3', 'table_indexes': {'SIRET': "index", "immatriculation": "index"}}"

...或者,如果您不想添加表索引

$ curl  -X POST http://localhost:8000/api/resources-exceptions
        -H "Authorization: Bearer <myAPIkey>"
        -d "{'resource_id': 'f868cca6-8da1-4369-a78d-47463f19a9a3'}"

更新资源异常

$ curl  -X PUT http://localhost:8000/api/resources-exceptions/f868cca6-8da1-4369-a78d-47463f19a9a3
        -H "Authorization: Bearer <myAPIkey>"
         -d "{'table_indexes': {'SIRET': "index", "immatriculation": "index"}}"

删除资源异常

$ curl  -X DELETE http://localhost:8000/api/resources-exceptions/f868cca6-8da1-4369-a78d-47463f19a9a3
        -H "Authorization: Bearer <myAPIkey>"

获取爬取状态

$ curl -s "http://localhost:8000/api/status/crawler" | json_pp
{
   "fresh_checks_percentage" : 0.4,
   "pending_checks" : 142153,
   "total" : 142687,
   "fresh_checks" : 534,
   "checks_percentage" : 0.4
}

获取工作状态

$ curl -s "http://localhost:8000/api/status/worker" | json_pp
{
   "queued" : {
      "default" : 0,
      "high" : 825,
      "low" : 655
   }
}

获取爬取统计信息

$ curl -s "http://localhost:8000/api/stats" | json_pp
{
   "status" : [
      {
         "count" : 525,
         "percentage" : 98.3,
         "label" : "ok"
      },
      {
         "label" : "error",
         "percentage" : 1.3,
         "count" : 7
      },
      {
         "label" : "timeout",
         "percentage" : 0.4,
         "count" : 2
      }
   ],
   "status_codes" : [
      {
         "code" : 200,
         "count" : 413,
         "percentage" : 78.7
      },
      {
         "code" : 501,
         "percentage" : 12.4,
         "count" : 65
      },
      {
         "percentage" : 6.1,
         "count" : 32,
         "code" : 404
      },
      {
         "code" : 500,
         "percentage" : 2.7,
         "count" : 14
      },
      {
         "code" : 502,
         "count" : 1,
         "percentage" : 0.2
      }
   ]
}

使用 Webhook 集成

** 设置配置值**

创建一个 config.toml 文件,其中包含您启动的服务和命令,或通过 HYDRA_SETTINGS 环境变量指定 TOML 文件的路径。 config.toml 或等效文件将覆盖 udata_hydra/config_default.toml 中的值,可在那里查找需要或可以定义的值。

UDATA_URI = "https://dev.local:7000/api/2"
UDATA_URI_API_KEY = "example.api.key"
SENTRY_DSN = "https://{my-sentry-dsn}"

Webhook 集成在资源被分析或检查以填充资源额外信息时向 udata 发送 HTTP 消息。

关于分析,有一个名为 "更改检测" 的阶段。它将尝试根据不同的标准猜测资源是否已被修改。

  • 在目录中获取修改日期
  • 内容长度和最后修改头信息
  • 随着时间的推移进行校验和比较

负载应类似于以下内容

{
   "analysis:content-length": 91661,
   "analysis:mime-type": "application/zip",
   "analysis:checksum": "bef1de04601dedaf2d127418759b16915ba083be",
   "analysis:last-modified-at": "2022-11-27T23:00:54.762000",
   "analysis:last-modified-detection": "harvest-resource-metadata",
}

开发

docker compose

提供了多个 docker-compose 文件

  • 一个最小的 docker-compose.yml,包含两个 PostgreSQL 容器(一个用于目录和元数据,另一个用于将转换后的 CSV 数据库化)
  • docker-compose.broker.yml 添加了一个 Redis 中间件
  • docker-compose.test.yml 启动了一个测试数据库,用于运行测试

注意:您可以通过这种方式从多个文件启动 compose: docker compose -f docker-compose.yml -f docker-compose.test.yml up

日志记录和调试

可以通过环境变量 LOG_LEVEL 调整日志级别。例如,在初始化数据库时将日志级别设置为 DEBUG,使用 LOG_LEVEL="DEBUG" udata-hydra init_db

编写迁移

  1. 添加一个名为 migrations/{YYYYMMDD}_{description}.sql 的文件,并编写您需要执行的 SQL 迁移。
  2. udata-hydra migrate 将根据需要迁移数据库。

部署

需要部署 3 个服务才能运行完整堆栈

  • worker
  • api / app
  • crawler

参考每个部分以了解如何启动它们。开发和生产环境之间的唯一区别是

  • 使用 HYDRA_SETTINGS 环境变量来指向您的自定义 config.toml
  • 使用 HYDRA_APP_SOCKET_PATH 来配置 aiohttp 应该监听的 反向代理连接(例如 nginx),并使用 udata-hydra-app 来启动应用程序服务器

贡献

在向仓库贡献并提交任何 PR 之前,初始化预提交钩子是必要的

pre-commit install

完成此操作后,代码格式化、代码检查以及导入排序将在每次提交之前自动检查。

如果您无法使用 pre-commit,则必须在提交之前使用 Ruff 格式化、检查和排序导入

ruff check --fix .
ruff format .

项目详情


发布历史 发布通知 | RSS 源

下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解有关 安装包 的更多信息。

源分布

udata_hydra-2.0.1.tar.gz (46.5 kB 查看哈希值)

上传

构建分布

udata_hydra-2.0.1-py3-none-any.whl (61.1 kB 查看哈希值)

上传 Python 3

支持者

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面