未提供项目描述
项目描述
ckanext-ingest
从任意来源导入数据的框架。
注意:此扩展没有将所有可能的数据源导入CKAN的目的。相反,它定义了使导入更可预测、可重用和灵活的结构和规则。
如果需要以下功能,可以使用此扩展
- 使用多个文件中的数据创建数据集/资源等。但希望以类似的方式导入所有文件,并且不希望花费时间介绍和解释整个过程。
- 在不同项目中重用导入逻辑
- 在不同导入工作流程之间共享逻辑片段
如果您只想
- 一次使用CLI导入单个文件,并且以后不再这样做,那么您可能不需要它。
结构
要求
与核心CKAN版本兼容性
CKAN版本 | 兼容吗? |
---|---|
2.9 | 不兼容 |
2.10 | 兼容 |
master | 兼容 |
安装
安装ckanext-ingest
- 通过 pip 安装它
pip install ckanext-ingest ## with basic XLSX strategy # pip install 'ckanext-ingest[xlsx]'
- 将
ingest
添加到您的CKAN配置文件中的ckan.plugins
设置中。
使用
数据可以通过 ingest_import_records
API操作导入到CKAN中。它需要一个包含数据的 source
,并且建议传递一个提取 strategy
,以实现对过程的完全控制。
ckanapi action ingest_import_records source@path/to/data.zip strategy="myext:extract_archive"
但在进行任何导入之前,您必须注册一个生成 records
的 strategy
。 strategy
定义了如何解析源并将其分割成数据块,而 record
将单个数据块包装起来并使用块中的信息执行操作。
strategy
通过 IIngest
接口进行注册。它必须是 ckanext.ingest.shared.ExtractionStrategy
的子类。对 strategy
的唯一要求是从其 extract
方法返回可迭代的 records
。
record
由 strategy
创建,它必须是 ckanext.ingest.shared.Record
的子类。其 ingest
方法负责导入:根据记录的目的,它可以创建/更新/删除数据或执行任何有意义的任务。
示例
注册自定义策略
import ckan.plugins as p
from ckanext.ingest.interfaces import IIngest
class MyPlugin(p.SingletonPlugin):
p.implements(IIngest)
def get_ingest_strategies(self):
return {
"my:custom_strategy": CustomStrategy
}
读取JSON文件并从中创建单个数据集的策略。
import ckan.plugins.toolkit as tk
from ckanext.ingest.shared import ExtractionStrategy, Storage, Record, IngestionResult
class SingleJsonStrategy(ExtractionStrategy):
def extract(self, source: Storage, options):
# source is a readable IO stream(werkzeug.datastructures.FileStorage)
data = json.load(source)
# `extract` returns iterable over records. When the strategy produces
# a single record, this record can be either yielded or returned as
# a list with a single element
yield SimplePackageRecord(data, {})
class SimplePackageRecord(Record):
def ingest(self, context: ckan.types.Context) -> IngestionResult:
dataset = tk.get_action("package_create")(context, self.data)
# `ingest` returns a brief overview of the ingestion result
return {
"success": True,
"result": dataset,
"details": {}
}
读取CSV文件以获取必须从门户中删除的组织名称的策略。
import csv
import ckan.plugins.toolkit as tk
from ckanext.ingest.shared import ExtractionStrategy, Record
class DropOrganizationsUsingCsvStrategy(ExtractionStrategy):
def extract(self, source, options):
# `source` is an `IO[bytes]`, so we turn in into `IO[str]`
str_stream = StringIO(source.read().decode())
rows = csv.DictReader(st_stream)
for row in rows:
# record's constructor requires two arguments:
# the raw data and the mapping with record options.
yield DropOrganiationRecord(row, {})
class DropOrganizationRecord(Record):
def ingest(self, context: ckan.types.Context):
try:
tk.get_action("organization_delete")(context, {"id": self.data["name"]})
except tk.ObjectNotFound:
success = False
else:
success = True
return {
"success": success,
"result": None,
"details": {}
}
从指定的CKAN实例(如 ckanext-harvest)中拉取数据集,并删除在导入过程中未更新的数据集。
import json
from datetime import datetime
from ckanapi import RemoteCKAN
import ckan.plugins.toolkit as tk
from ckanext.ingest.shared import ExtractionStrategy, Record
class HarvestStrategy(ExtractionStrategy):
def extract(self, source, options):
details = json.load(source)
client = RemoteCKAN(**details)
now = datetime.utcnow()
# produce a record that creates a package for every remote dataset
for dataset in client.action.package_search()["results"]:
yield SimpleDatasetRecord(row, {})
# produce an additional record that removes stale datasets
# (datasets that were modified before ingestion started and were
# not updated during current ingestion)
yield DeleteStaleDatasetsRecord({"before": now}, {})
class SimplePackageRecord(Record):
def ingest(self, context: ckan.types.Context) -> IngestionResult:
dataset = tk.get_action("package_create")(context, self.data)
return {
"success": True,
"result": dataset,
"details": {"remote_id": self.data["id"]}
}
class DeleteStaleDatasetsRecord(Record):
def ingest(self, context: ckan.types.Context) -> IngestionResult:
before = self.data["before"].isoformat()
result = tk.get_action("package_search")(
context,
{"fq": f"metadata_modified:[* TO {before}]", "fl": "id"}
)
deleted = []
for dataset in result["results"]
tk.get_action("package_delete")(context, {"id": dataset["id"]})
deleted.append(id)
return {
"success": True,
"result": deleted,
"details": {"count": len(deleted), "before": before}
}
高级
要充分利用导入工作流程,请尝试编写可重用的策略和记录,详情如下。
策略自动检测
操作的 strategy
参数是可选的。当它缺失时,插件将选择最适合导入源的最合适的策略。此功能依赖于提取策略的 can_handle
和 must_handle
方法。这两种方法都接收源的mimetype以及源本身,并返回 True
/False
。
在所有从 can_handle
返回 True
的策略中,插件选择第一个同时从 must_handle
返回 True
的策略。如果没有这样的策略,则选择第一个 can_handle
。
ckanext.ingest.shared.ExtractionStrategy
定义了这两种方法。 must_handle
总是返回 False
。 can_handle
在源的类型mimetype列在处理程序的 mimetypes
属性中时返回 True
。
class ExtractionStrategy:
mimetypes: ClassVar[set[str]] = set()
@classmethod
def can_handle(cls, mime: str | None, source) -> bool:
return mime in cls.mimetypes
@classmethod
def must_handle(cls, mime, source) -> bool:
return False
如果您想注册可以处理JSON源的策略,只需注册具有适当 mimetypes
的策略。
class JsonStrategy(ExtractionStrategy):
mimetypes = {"application/json"}
如果有多个策略支持JSON mimetype,则选择第一个注册的策略。如果您想注册一个始终处理具有特定名称(如 DRINK_ME.json
)的JSON源的策略,不考虑顺序,您可以使用 must_handle
。
注意,仅当 can_handle
返回 True
时才会检查 must_handle
,所以我们仍然使用默认的 mimetypes
逻辑。
class DrinkMeJsonStrategy(ExtractionStrategy):
mimetypes = {"application/json"}
@classmethod
def must_handle(cls, mime, source: Storage) -> bool:
return source.filename == "DRINK_ME.json"
记录工厂
ExtractionStrategy
有一个默认的 extract
实现。此默认实现调用 chunks
方法来解析源并获取可导入的数据片段。然后,对于每个数据片段,调用 chunk_into_record
方法将任意数据转换为 Record
。最后,extract
产生由 chunk_into_record
产生的任何内容。
chunks
的默认实现忽略了源并返回一个空列表。因此,默认情况下,任何源都会产生零个记录,什么也不会发生。
要产生数据,您可以首先覆盖 chunks
。
如果您正在处理CSV文件,chunks
可以返回文件中的行
class CsvRowsStrategy(ExtractionStrategy):
mimetypes = {"text/csv"}
def chunks(self, source, options) -> Iterable[Any]:
str_stream = StringIO(source.read().decode())
rows = csv.reader(str_stream)
yield from rows
这种策略将为源CSV的每一行生成ckanext.ingest.shared.Record
。但是基本的Record
类并不做什么,所以您需要用您自己的Record
子类替换它。
如前所述,数据块通过chunk_into_record
方法转换为记录。您可以重写它,或者使用默认实现,该实现创建存储在策略的record_factory
属性中的类的实例。此属性的默认值为ckanext.ingest.shared.Record
,如果您想使用不同的记录实现,请执行以下操作
class CsvRowsStrategy(ExtractionStrategy):
record_factory = MyCustomRecord
...
策略委派
ExtractionStrategy.extract
方法负责生成记录。但这并不意味着策略必须自己生成记录。相反,策略可以做一些准备工作,然后使用另一个策略来生成记录。
让我们想象一个UrlStrategy
,它接受包含单个行(远程门户的URL)的文件,并从该门户拉取数据。由于我们不知道数据的类型,我们无法知道如何从它创建记录。因此,当数据被检索时,我们可以使用其mimetype来选择最合适的策略,并将记录生成委派给其extract
方法
import requests
import magic
from io import BytesIO
from ckanext.ingest import shared
class UrlStrategy(ExtractionStrategy):
def extract(self, source, options) -> Iterable[Any]:
# read URL from file-like source
url = source.read()
resp = requests.get(url)
# convert response bytes into `source`
sub_source = shared.make_storage(BytesIO(resp.content))
# identify mimetype
mime = magic.from_buffer(sub_source.read(1024))
sub_source.seek(0)
# choose the appropriate strategy
handler = shared.get_handler_for_mimetype(mime, sub_source)
# delegate extraction
if handler:
yield from handler.extract(source, options)
策略和记录选项
ExtractionStrategy.extract
和Record.ingest
接受第二个参数options
。在两种情况下,它都是一个可以用来修改相应方法内部逻辑的字典。策略选项由ckanext.ingest.shared.StrategyOptions
描述,记录选项由ckanext.ingest.shared.RecordOptions
描述。
顶级定义的键对每个策略/记录都有意义。例如,RecordOptions
定义了update_existing
标志。如果创建数据的记录检测到现有冲突的实体,则应考虑update_existing
标志来决定在这种情况下应做什么。这只是一种建议,并且可以忽略此标志或使用不同的选项。但使用常见选项简化了工作流程的理解。
对于策略,有3个常用键
record_options
:这些选项应传递给策略生成的每个记录(RecordOptions
)nested_strategy
:如果策略将记录创建委派给另一个策略,则应优先使用nested_strategy
而不是自动检测的策略(mimetype检测)locator
:如果源是某种类型的集合,则locator
是一个可调用的函数,它返回集合的特定成员。它可以在解析存档时使用,以便策略可以从一个文件中提取包的元数据,并将由locator
返回的资源上传到它。或者,在解析XLSX时,locator
可以返回标题为工作表以简化多个工作表的处理。
对于任何只能由特定策略使用的选项,在StrategyOptions
和RecordOptions
中都有一个extras
选项。这个字典可以存储任何数据,对其结构没有期望。
在extras
中常用的键最终可能被添加为顶级建议选项。但,因为这些只是建议,您可以直接忽略它们并传递您需要的任何数据作为选项。
记录中的数据转换
ckanext.ingest.shared.Record
类需要两个初始化参数:raw
数据和记录的options
。记录创建时,它会调用其transform
方法,该方法将raw
数据复制到data
属性。这是数据映射的最佳位置,在调用记录的ingest
方法之前。如果您想从记录的data
中删除所有空成员,可以按以下方式执行
class DenseRecord(Record):
def transform(self, raw: Any):
self.data = {
key: value
for key, value in raw.items()
if value is not None
}
记录摄取和结果
记录通常在导入过程中调用CKAN API中的一个操作。为了正确执行,记录需要context
操作,该操作作为单个参数传递给ingest
方法。但这只是最常见的流程,所以如果您不使用任何操作,请忽略context
。更重要的是ingest
的输出。它必须是一个由ckanext.ingest.shared.IngestionResult
描述的字典。它有三个成员:
success
:指示导入是否成功的标志result
:导入产生的数据(包、资源、组织等)details
:可能有用的其他详细信息。例如,在导入期间修改了多少实体、使用了哪个API操作、如果导入失败,发生了什么错误。
这些详细信息不是导入所必需的,但它们可以用于构建导入报告。
使用ckanext-scheming配置记录转换
ckanext.ingest.record
模块包含PackageRecord
和ResourceRecord
类,它们用于创建包/资源。但它们的transform
方法更有趣。它使用由ckanext-scheming定义的元数据模式中的字段配置将raw
映射到data
。
为了配置映射,将ingest_options
属性添加到字段定义中
- field_name: title
label: Title
ingest_options: {}
在转换过程中,检查raw
中的每个键是否与模式匹配。如果模式包含具有ingest_options
的field_name
或label
与raw
中的键匹配的字段,则此键将复制到data
中并映射到相应的field_name
。即,对于上面的字段定义,两种raw
版本 - {"title": "hello"}
和{"Title": "hello"}
都将转换为包含值{"title": "hello"}
的data
。
如果您在raw
中具有完全不同的名称,请使用ingest_options
的aliases
(list[str]
)属性
- field_name: title
label: Title
ingest_options:
aliasses: [TITLE, "name of the dataset"]
在这种情况下,{"name of the dataset": "hello"}
和{"TITLE": "hello"}
将转换为{"title": "hello"}
。
如果值在用作字段值之前需要额外的处理,请指定所有应用的验证器作为ingest_options
的convert
属性
- field_name: title
label: Title
ingest_options:
convert: conver_to_lowercase json_list_or_string
convert
使用与字段定义中validators
属性相同的语法。您可以在该字段内部使用任何已注册的验证器。但是,与验证器不同,如果在转换期间抛出Invalid
错误,则字段将被静默忽略,导入将继续。
来自raw
的任何没有在模式中对应字段(通过field_name
/label
或ingest_options.aliases
检测到)的字段,都不会添加到data
中,也不会用于包/资源创建。
通用策略
有一些策略是现成的。您可能不会直接使用它们,但创建这些策略的子类可以简化流程并解决一些常见问题。
ingest:scheming_csv
由ckanext.ingest.strategy.csv.CsvStrategy
定义。
ingest:recursive_zip
由ckanext.ingest.strategy.zip.CsvStrategy
定义。
ingest:xlsx
由ckanext.ingest.strategy.xlsx.XlsxStrategy
定义。
配置
# List of allowed ingestion strategies. If empty, all registered strategies
# are allowed
# (optional, default: )
ckanext.ingest.strategy.allowed = ingest:recursive_zip
# List of disabled ingestion strategies.
# (optional, default: )
ckanext.ingest.strategy.disabled = ingest:scheming_csv
# Base template for WebUI
# (optional, default: page.html)
ckanext.ingest.base_template = admin/index.html
# Allow moving existing resources between packages.
# (optional, default: false)
ckanext.ingest.allow_resource_transfer = true
# Rename strategies using `{"import.path.of:StrategyClass": "new_name"}` JSON
# object
# (optional, default: )
ckanext.ingest.strategy.name_mapping = {"ckanext.ingest.strategy.zip:ZipStrategy": "zip"}
接口
ckanext.ingest.interfaces.IIngest
接口的实现可以通过get_ingest_strategies
方法注册自定义提取策略:
def get_ingest_strategies() -> dict[str, type[ckanext.ingest.shared.ExtractionStrategy]]:
"""Return extraction strategies."""
return {
"my_plugin:xlsx_datasets": MyXlsxStrategy,
}
API
ingest_extract_records
从源中提取记录。
此方法主要存在于调试中。它不创建任何内容,只是解析源,生成记录并返回记录数据作为列表。因为它将所有提取的记录聚合到一个单独的列表中,所以可能会消耗大量内存。如果您想迭代,请考虑使用生成记录的可迭代函数iter_records
。
参数
source: str|FileStorage - data source for records
strategy: str|None - record extraction strategy. If missing, strategy
is guessed depending on source's mimetype
options: SourceOptions - dictionary with configuration for strategy and
records. Consumed by strategies so heavily depends on the chosen
strategy.
ingest_import_records
从源中提取的导入记录。
解析源,将其转换为使用所选策略的记录,并调用Record.ingest
,可能创建/更新数据。
参数
source: str|FileStorage - data source for records
strategy: str|None - record extraction strategy. If missing, strategy
is guessed depending on source's mimetype
options: SourceOptions - dictionary with configuration for strategy and
records. Consumed by strategies so heavily depends on the chosen
strategy.
defaults: dict[str, Any] - default data added to every record(if missing)
overrides: dict[str, Any] - data that unconditionally overrides record details
skip: int - number of records that are skipped without ingestion
take: int - max number of records that will be ingested
项目详细信息
下载文件
下载您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。
源分发
构建分发
ckanext_ingest-1.4.2.tar.gz的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 69d8f35987e9bab274e6dd5fb67dd257612f58511ce26c4c94bd9e1f6a516eb1 |
|
MD5 | 27154da038cabea9c9f76dc14d3f3552 |
|
BLAKE2b-256 | 028f52d7a817207c804421a23e240d122d18a9cf54cb30f31f47688f8a8c3d4c |
ckanext_ingest-1.4.2-py3-none-any.whl的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 3bbd6899db77703872bf4e688bc939caa0deae190342fd7877a8b949e218fbcd |
|
MD5 | 6dc663bde6b9c0adfd9ba1b98fc8acc4 |
|
BLAKE2b-256 | d853d5470f5c33fc095a9a894b62c07e1945f6f3eeea3b5084fa15252da47fb3 |