data.gouv.fr的数据异步爬虫和解析服务
项目描述
udata-hydra 🦀
udata-hydra
是一个针对 data.gouv.fr 的异步元数据爬虫。
通过 aiohttp 爬取URL,将目录和爬取的元数据存储在 PostgreSQL 数据库中。
由于其名为 hydra,因此它还内置了神话般的力量
- 分析远程资源元数据随时间的变化,以尽可能智能的方式检测变化
- 如果远程资源是CSV,则将其转换为PostgreSQL表,以便API化
- 将爬取和分析信息发送到udata实例
架构图
完整工作流程的架构如下
hydra爬虫是架构的一部分。它将检查资源是否可用,分析文件类型(如果资源已修改),以及分析CSV内容。它还将CSV资源转换为数据库表,并将数据发送到udata实例。
依赖关系
此项目使用 libmagic
,需要在您的系统上安装,例如
brew install libmagic
在MacOS上,或在linux上 sudo apt-get install libmagic-dev
命令行界面
创建数据库结构
安装udata-hydra依赖和CLI。 poetry install
poetry run udata-hydra migrate
从data.gouv.fr加载(UPSERT)最新目录版本
poetry run udata-hydra load-catalog
爬虫
poetry run udata-hydra-crawl
程序将根据在config.toml
中设置的配置无限期地爬取目录,默认配置在udata_hydra/config_default.toml
中。
在每次循环运行时,BATCH_SIZE
个URL将被排队。
爬虫将从从未检查过的URL开始,然后继续检查SINCE
间隔之前的URL。然后它将等待直到某些事情发生变化(目录或时间)。
存在一种按域名退避的机制。当在给定的批次的给定域名中,BACKOFF_NB_REQ
在BACKOFF_PERIOD
秒的周期内超过时,爬虫将等待。它将重试,直到退避被解除。
如果URL匹配EXCLUDED_PATTERNS
之一,则永远不会进行检查。
工作进程
使用作业排队系统来处理长时间运行的任务。使用以下命令启动工作进程:
poetry run rq worker -c udata_hydra.worker
监控工作进程状态
poetry run rq info -c udata_hydra.worker --interval 1
CSV转换到数据库
转换后的CSV表将存储在通过config.DATABASE_URL_CSV
指定的数据库中。对于测试,它与目录使用的数据库相同。在本地,docker compose
将启动两个不同的数据库容器。
测试
要运行测试,需要启动数据库、测试数据库和Redis代理,命令为docker compose -f docker-compose.yml -f docker-compose.test.yml -f docker-compose.broker.yml up -d
。
然后可以使用poetry run pytest
运行测试。
要运行特定的测试文件,可以将文件的路径传递给pytest,例如:poetry run pytest tests/test_app.py
。
要运行特定的测试函数,可以将文件的路径和函数的名称传递给pytest,例如:poetry run pytest tests/test_app.py::test_get_latest_check
。
如果您想看到执行时的打印语句,可以将-s标志传递给pytest(poetry run pytest -s
)。然而,请注意,这有时可能难以解析。
测试覆盖率
pytest自动使用coverage
包生成覆盖率报告,该报告在测试运行结束时在终端中显示。覆盖率在pypoject.toml
文件中的[tool.pytest.ini_options]
部分进行配置。您也可以在运行测试时通过传递一些标志(如--cov-report
)来覆盖覆盖率报告配置。有关更多信息,请参阅pytest-cov文档。
API
API需要在受保护端点(任何不是GET
的端点)上的每个请求都需要一个Bearer令牌。该令牌在config.toml
文件中配置为API_KEY
,并在udata_hydra/config_default.toml
文件中设置了默认值。
如果您将hydra用作外部服务以从udata接收资源事件,那么udata还需要在其udata.cfg
文件中配置此API密钥。
# Wether udata should publish the resource events
PUBLISH_ON_RESOURCE_EVENTS = True
# Where to publish the events
RESOURCES_ANALYSER_URI = "http://localhost:8000"
# The API key that hydra needs
RESOURCES_ANALYSER_API_KEY = "api_key_to_change"
运行
poetry install
poetry run adev runserver udata_hydra/app.py
路由/端点
API提供以下端点
与检查相关
- 在
/api/checks/latest?url={url}&resource_id={resource_id}
上使用GET
获取给定URL和/或resource_id
的最新检查 - 在
/api/checks/all?url={url}&resource_id={resource_id}
上使用GET
获取给定URL和/或resource_id
的所有检查 - 在
/api/checks/aggregate?group_by={column}&created_at={date}
上使用GET
获取特定date
按column
分组的检查发生情况
与资源相关
- 在
/api/resource/{resource_id}
上使用GET
获取数据库中“目录”表的资源,其resource_id
- 通过在
/api/resources
上执行POST
请求,从源接收资源创建事件。它将在数据库 "catalog" 表中创建一个新的资源,并将其标记为下一次爬取的优先级。 - 在
/api/resources/{resource_id}
上执行PUT
请求,以更新数据库 "catalog" 表中的资源。 - 在
/api/resources/{resource_id}
上执行DELETE
请求,以删除数据库 "catalog" 表中的资源。
:warning: 警告:以下路由已过时,未来需要删除:
POST
请求在/api/resource/created
上 -> 应使用POST
请求在/api/resources/
上。POST
请求在/api/resource/updated
上 -> 应使用PUT
请求在/api/resources/
上。POST
请求在/api/resource/deleted
上 -> 应使用DELETE
请求在/api/resources/
上。
与资源异常相关
- 在
/api/resources-exceptions
上执行GET
请求,以获取所有资源异常列表。 - 在
/api/resources-exceptions
上执行POST
请求,以在数据库中创建新的资源异常。 - 在
/api/resources-exceptions/{resource_id}
上执行PUT
请求,以更新数据库中的资源异常。 - 在
/api/resources-exceptions/{resource_id}
上执行DELETE
请求,以从数据库中删除资源异常。
与某些状态和健康检查相关
- 在
/api/status/crawler
上执行GET
请求,以获取爬取状态。 - 在
/api/status/worker
上执行GET
请求,以获取工作状态。 - 在
/api/stats
上执行GET
请求,以获取爬取统计信息。 - 在
/api/health
上执行GET
请求,以获取 API 版本号和环境。
以下提供了关于一些端点的更多详细信息以及示例,但并非所有端点。
获取最新的检查
与 ?url={url}
和 ?resource_id={resource_id}
一起使用。
$ curl -s "http://localhost:8000/api/checks/latest?url=http://opendata-sig.saintdenis.re/datasets/661e19974bcc48849bbff7c9637c5c28_1.csv" | json_pp
{
"status" : 200,
"catalog_id" : 64148,
"deleted" : false,
"error" : null,
"created_at" : "2021-02-06T12:19:08.203055",
"response_time" : 0.830198049545288,
"url" : "http://opendata-sig.saintdenis.re/datasets/661e19974bcc48849bbff7c9637c5c28_1.csv",
"domain" : "opendata-sig.saintdenis.re",
"timeout" : false,
"id" : 114750,
"dataset_id" : "5c34944606e3e73d4a551889",
"resource_id" : "b3678c59-5b35-43ad-9379-fce29e5b56fe",
"headers" : {
"content-disposition" : "attachment; filename=\"xn--Dlimitation_des_cantons-bcc.csv\"",
"server" : "openresty",
"x-amz-meta-cachetime" : "191",
"last-modified" : "Wed, 29 Apr 2020 02:19:04 GMT",
"content-encoding" : "gzip",
"content-type" : "text/csv",
"cache-control" : "must-revalidate",
"etag" : "\"20415964703d9ccc4815d7126aa3a6d8\"",
"content-length" : "207",
"date" : "Sat, 06 Feb 2021 12:19:08 GMT",
"x-amz-meta-contentlastmodified" : "2018-11-19T09:38:28.490Z",
"connection" : "keep-alive",
"vary" : "Accept-Encoding"
}
}
获取特定 URL 或资源的所有检查
与 ?url={url}
和 ?resource_id={resource_id}
一起使用。
$ curl -s "http://localhost:8000/api/checks/all?url=http://www.drees.sante.gouv.fr/IMG/xls/er864.xls" | json_pp
[
{
"domain" : "www.drees.sante.gouv.fr",
"dataset_id" : "53d6eadba3a72954d9dd62f5",
"timeout" : false,
"deleted" : false,
"response_time" : null,
"error" : "Cannot connect to host www.drees.sante.gouv.fr:443 ssl:True [SSLCertVerificationError: (1, \"[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: Hostname mismatch, certificate is not valid for 'www.drees.sante.gouv.fr'. (_ssl.c:1122)\")]",
"catalog_id" : 232112,
"url" : "http://www.drees.sante.gouv.fr/IMG/xls/er864.xls",
"headers" : {},
"id" : 165107,
"created_at" : "2021-02-06T14:32:47.675854",
"resource_id" : "93dfd449-9d26-4bb0-a6a9-ee49b1b8a4d7",
"status" : null
},
{
"timeout" : false,
"deleted" : false,
"response_time" : null,
"error" : "Cannot connect to host www.drees.sante.gouv.fr:443 ssl:True [SSLCertVerificationError: (1, \"[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: Hostname mismatch, certificate is not valid for 'www.drees.sante.gouv.fr'. (_ssl.c:1122)\")]",
"domain" : "www.drees.sante.gouv.fr",
"dataset_id" : "53d6eadba3a72954d9dd62f5",
"created_at" : "2020-12-24T17:06:58.158125",
"resource_id" : "93dfd449-9d26-4bb0-a6a9-ee49b1b8a4d7",
"status" : null,
"catalog_id" : 232112,
"url" : "http://www.drees.sante.gouv.fr/IMG/xls/er864.xls",
"headers" : {},
"id" : 65092
}
]
按列对特定日期的检查进行分组
与 ?group_by={column}
和 ?created_at={date}
一起使用。 date
应为 YYYY-MM-DD
格式的日期或默认关键字 today
。
$ curl -s "http://localhost:8000/api/checks/aggregate?group_by=domain&created_at=today" | json_pp
[
{
"value": "www.geo2france.fr",
"count": 4
},
{
"value": "static.data.gouv.fr",
"count": 4
},
{
"value": "grandestprod.data4citizen.com",
"count": 3
},
{
"value": "www.datasud.fr",
"count": 2
},
{
"value": "koumoul.com",
"count": 2
},
{
"value": "opendata.aude.fr",
"count": 2
},
{
"value": "departement-ain.opendata.arcgis.com",
"count": 2
},
{
"value": "opendata.agglo-larochelle.fr",
"count": 1
}
]
添加资源异常
$ curl -X POST http://localhost:8000/api/resources-exceptions
-H "Authorization: Bearer <myAPIkey>"
-d "{'resource_id': 'f868cca6-8da1-4369-a78d-47463f19a9a3', 'table_indexes': {'SIRET': "index", "immatriculation": "index"}}"
...或者,如果您不想添加表索引
$ curl -X POST http://localhost:8000/api/resources-exceptions
-H "Authorization: Bearer <myAPIkey>"
-d "{'resource_id': 'f868cca6-8da1-4369-a78d-47463f19a9a3'}"
更新资源异常
$ curl -X PUT http://localhost:8000/api/resources-exceptions/f868cca6-8da1-4369-a78d-47463f19a9a3
-H "Authorization: Bearer <myAPIkey>"
-d "{'table_indexes': {'SIRET': "index", "immatriculation": "index"}}"
删除资源异常
$ curl -X DELETE http://localhost:8000/api/resources-exceptions/f868cca6-8da1-4369-a78d-47463f19a9a3
-H "Authorization: Bearer <myAPIkey>"
获取爬取状态
$ curl -s "http://localhost:8000/api/status/crawler" | json_pp
{
"fresh_checks_percentage" : 0.4,
"pending_checks" : 142153,
"total" : 142687,
"fresh_checks" : 534,
"checks_percentage" : 0.4
}
获取工作状态
$ curl -s "http://localhost:8000/api/status/worker" | json_pp
{
"queued" : {
"default" : 0,
"high" : 825,
"low" : 655
}
}
获取爬取统计信息
$ curl -s "http://localhost:8000/api/stats" | json_pp
{
"status" : [
{
"count" : 525,
"percentage" : 98.3,
"label" : "ok"
},
{
"label" : "error",
"percentage" : 1.3,
"count" : 7
},
{
"label" : "timeout",
"percentage" : 0.4,
"count" : 2
}
],
"status_codes" : [
{
"code" : 200,
"count" : 413,
"percentage" : 78.7
},
{
"code" : 501,
"percentage" : 12.4,
"count" : 65
},
{
"percentage" : 6.1,
"count" : 32,
"code" : 404
},
{
"code" : 500,
"percentage" : 2.7,
"count" : 14
},
{
"code" : 502,
"count" : 1,
"percentage" : 0.2
}
]
}
使用 Webhook 集成
** 设置配置值**
创建一个 config.toml
文件,其中包含您启动的服务和命令,或通过 HYDRA_SETTINGS
环境变量指定 TOML 文件的路径。 config.toml
或等效文件将覆盖 udata_hydra/config_default.toml
中的值,可在那里查找需要或可以定义的值。
UDATA_URI = "https://dev.local:7000/api/2"
UDATA_URI_API_KEY = "example.api.key"
SENTRY_DSN = "https://{my-sentry-dsn}"
Webhook 集成在资源被分析或检查以填充资源额外信息时向 udata
发送 HTTP 消息。
关于分析,有一个名为 "更改检测" 的阶段。它将尝试根据不同的标准猜测资源是否已被修改。
- 在目录中获取修改日期
- 内容长度和最后修改头信息
- 随着时间的推移进行校验和比较
负载应类似于以下内容
{
"analysis:content-length": 91661,
"analysis:mime-type": "application/zip",
"analysis:checksum": "bef1de04601dedaf2d127418759b16915ba083be",
"analysis:last-modified-at": "2022-11-27T23:00:54.762000",
"analysis:last-modified-detection": "harvest-resource-metadata",
}
开发
docker compose
提供了多个 docker-compose 文件
- 一个最小的
docker-compose.yml
,包含两个 PostgreSQL 容器(一个用于目录和元数据,另一个用于将转换后的 CSV 数据库化) docker-compose.broker.yml
添加了一个 Redis 中间件docker-compose.test.yml
启动了一个测试数据库,用于运行测试
注意:您可以通过这种方式从多个文件启动 compose: docker compose -f docker-compose.yml -f docker-compose.test.yml up
日志记录和调试
可以通过环境变量 LOG_LEVEL 调整日志级别。例如,在初始化数据库时将日志级别设置为 DEBUG
,使用 LOG_LEVEL="DEBUG" udata-hydra init_db
。
编写迁移
- 添加一个名为
migrations/{YYYYMMDD}_{description}.sql
的文件,并编写您需要执行的 SQL 迁移。 udata-hydra migrate
将根据需要迁移数据库。
部署
需要部署 3 个服务才能运行完整堆栈
- worker
- api / app
- crawler
参考每个部分以了解如何启动它们。开发和生产环境之间的唯一区别是
- 使用
HYDRA_SETTINGS
环境变量来指向您的自定义config.toml
- 使用
HYDRA_APP_SOCKET_PATH
来配置 aiohttp 应该监听的 反向代理连接(例如 nginx),并使用udata-hydra-app
来启动应用程序服务器
贡献
在向仓库贡献并提交任何 PR 之前,初始化预提交钩子是必要的
pre-commit install
完成此操作后,代码格式化、代码检查以及导入排序将在每次提交之前自动检查。
如果您无法使用 pre-commit,则必须在提交之前使用 Ruff 格式化、检查和排序导入
ruff check --fix .
ruff format .
项目详情
下载文件
下载适合您平台的文件。如果您不确定选择哪个,请了解有关 安装包 的更多信息。
源分布
构建分布
udata_hydra-2.0.1.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 19b3b594db36ce0bc0846416bca01a010b785f571303d23039be707616d53343 |
|
MD5 | e8f4206a0a2c401b2ba065741c281c29 |
|
BLAKE2b-256 | c199181ceb8751b98f2f2b4a5471de6ac03585340ed2afff37bf519474ae6104 |
udata_hydra-2.0.1-py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 0932283d397389d91d4ff26336e9b4aaea90be05e216b89837cd54fca7db9eca |
|
MD5 | 13ceb7bc71dae02eb898e8b27e974023 |
|
BLAKE2b-256 | 1b90593bf8177e8b1541754146c78f6b18546d1468e8d9e6bad231ceeddcb344 |