跳转到主要内容

未提供项目描述

项目描述

Tests

ckanext-ingest

从任意来源导入数据的框架。

注意:此扩展没有将所有可能的数据源导入CKAN的目的。相反,它定义了使导入更可预测、可重用和灵活的结构和规则。

如果需要以下功能,可以使用此扩展

  • 使用多个文件中的数据创建数据集/资源等。但希望以类似的方式导入所有文件,并且不希望花费时间介绍和解释整个过程。
  • 在不同项目中重用导入逻辑
  • 在不同导入工作流程之间共享逻辑片段

如果您只想

  • 一次使用CLI导入单个文件,并且以后不再这样做,那么您可能不需要它。

结构

要求

与核心CKAN版本兼容性

CKAN版本 兼容吗?
2.9 不兼容
2.10 兼容
master 兼容

安装

安装ckanext-ingest

  1. 通过 pip 安装它
    pip install ckanext-ingest
    
    ## with basic XLSX strategy
    # pip install 'ckanext-ingest[xlsx]'
    
  2. ingest 添加到您的CKAN配置文件中的 ckan.plugins 设置中。

使用

数据可以通过 ingest_import_records API操作导入到CKAN中。它需要一个包含数据的 source,并且建议传递一个提取 strategy,以实现对过程的完全控制。

ckanapi action ingest_import_records source@path/to/data.zip strategy="myext:extract_archive"

但在进行任何导入之前,您必须注册一个生成 recordsstrategystrategy 定义了如何解析源并将其分割成数据块,而 record 将单个数据块包装起来并使用块中的信息执行操作。

strategy 通过 IIngest 接口进行注册。它必须是 ckanext.ingest.shared.ExtractionStrategy 的子类。对 strategy 的唯一要求是从其 extract 方法返回可迭代的 records

recordstrategy 创建,它必须是 ckanext.ingest.shared.Record 的子类。其 ingest 方法负责导入:根据记录的目的,它可以创建/更新/删除数据或执行任何有意义的任务。

示例

注册自定义策略

import ckan.plugins as p

from ckanext.ingest.interfaces import IIngest

class MyPlugin(p.SingletonPlugin):
    p.implements(IIngest)

    def get_ingest_strategies(self):
        return {
          "my:custom_strategy": CustomStrategy
        }

读取JSON文件并从中创建单个数据集的策略。

import ckan.plugins.toolkit as tk
from ckanext.ingest.shared import ExtractionStrategy, Storage, Record, IngestionResult

class SingleJsonStrategy(ExtractionStrategy):

    def extract(self, source: Storage, options):
        # source is a readable IO stream(werkzeug.datastructures.FileStorage)
        data = json.load(source)

        # `extract` returns iterable over records. When the strategy produces
        # a single record, this record can be either yielded or returned as
        # a list with a single element
        yield SimplePackageRecord(data, {})

class SimplePackageRecord(Record):
    def ingest(self, context: ckan.types.Context) -> IngestionResult:

        dataset = tk.get_action("package_create")(context, self.data)

        # `ingest` returns a brief overview of the ingestion result
        return {
            "success": True,
            "result": dataset,
            "details": {}
        }

读取CSV文件以获取必须从门户中删除的组织名称的策略。

import csv
import ckan.plugins.toolkit as tk
from ckanext.ingest.shared import ExtractionStrategy, Record

class DropOrganizationsUsingCsvStrategy(ExtractionStrategy):

    def extract(self, source, options):
        # `source` is an `IO[bytes]`, so we turn in into `IO[str]`
        str_stream = StringIO(source.read().decode())
        rows = csv.DictReader(st_stream)

        for row in rows:
            # record's constructor requires two arguments:
            # the raw data and the mapping with record options.
            yield DropOrganiationRecord(row, {})

class DropOrganizationRecord(Record):
    def ingest(self, context: ckan.types.Context):
        try:
            tk.get_action("organization_delete")(context, {"id": self.data["name"]})
        except tk.ObjectNotFound:
            success = False
        else:
            success = True

        return {
            "success": success,
            "result": None,
            "details": {}
        }

从指定的CKAN实例(如 ckanext-harvest)中拉取数据集,并删除在导入过程中未更新的数据集。

import json
from datetime import datetime
from ckanapi import RemoteCKAN
import ckan.plugins.toolkit as tk
from ckanext.ingest.shared import ExtractionStrategy, Record

class HarvestStrategy(ExtractionStrategy):

    def extract(self, source, options):
        details = json.load(source)
        client = RemoteCKAN(**details)

        now = datetime.utcnow()

        # produce a record that creates a package for every remote dataset
        for dataset in client.action.package_search()["results"]:
            yield SimpleDatasetRecord(row, {})

        # produce an additional record that removes stale datasets
        # (datasets that were modified before ingestion started and were
        # not updated during current ingestion)
        yield DeleteStaleDatasetsRecord({"before": now}, {})

class SimplePackageRecord(Record):
    def ingest(self, context: ckan.types.Context) -> IngestionResult:

        dataset = tk.get_action("package_create")(context, self.data)

        return {
            "success": True,
            "result": dataset,
            "details": {"remote_id": self.data["id"]}
        }


class DeleteStaleDatasetsRecord(Record):
    def ingest(self, context: ckan.types.Context) -> IngestionResult:
        before = self.data["before"].isoformat()
        result = tk.get_action("package_search")(
            context,
            {"fq": f"metadata_modified:[* TO {before}]", "fl": "id"}
        )

        deleted = []
        for dataset in result["results"]
            tk.get_action("package_delete")(context, {"id": dataset["id"]})
            deleted.append(id)

        return {
            "success": True,
            "result": deleted,
            "details": {"count": len(deleted), "before": before}
        }

高级

要充分利用导入工作流程,请尝试编写可重用的策略和记录,详情如下。

策略自动检测

操作的 strategy 参数是可选的。当它缺失时,插件将选择最适合导入源的最合适的策略。此功能依赖于提取策略的 can_handlemust_handle 方法。这两种方法都接收源的mimetype以及源本身,并返回 True/False

在所有从 can_handle 返回 True 的策略中,插件选择第一个同时从 must_handle 返回 True 的策略。如果没有这样的策略,则选择第一个 can_handle

ckanext.ingest.shared.ExtractionStrategy 定义了这两种方法。 must_handle 总是返回 Falsecan_handle 在源的类型mimetype列在处理程序的 mimetypes 属性中时返回 True

class ExtractionStrategy:
    mimetypes: ClassVar[set[str]] = set()

    @classmethod
    def can_handle(cls, mime: str | None, source) -> bool:
        return mime in cls.mimetypes

    @classmethod
    def must_handle(cls, mime, source) -> bool:
        return False

如果您想注册可以处理JSON源的策略,只需注册具有适当 mimetypes 的策略。

class JsonStrategy(ExtractionStrategy):
    mimetypes = {"application/json"}

如果有多个策略支持JSON mimetype,则选择第一个注册的策略。如果您想注册一个始终处理具有特定名称(如 DRINK_ME.json)的JSON源的策略,不考虑顺序,您可以使用 must_handle

注意,仅当 can_handle 返回 True 时才会检查 must_handle,所以我们仍然使用默认的 mimetypes 逻辑。

class DrinkMeJsonStrategy(ExtractionStrategy):
    mimetypes = {"application/json"}

    @classmethod
    def must_handle(cls, mime, source: Storage) -> bool:
        return source.filename == "DRINK_ME.json"

记录工厂

ExtractionStrategy 有一个默认的 extract 实现。此默认实现调用 chunks 方法来解析源并获取可导入的数据片段。然后,对于每个数据片段,调用 chunk_into_record 方法将任意数据转换为 Record。最后,extract 产生由 chunk_into_record 产生的任何内容。

chunks 的默认实现忽略了源并返回一个空列表。因此,默认情况下,任何源都会产生零个记录,什么也不会发生。

要产生数据,您可以首先覆盖 chunks

如果您正在处理CSV文件,chunks可以返回文件中的行

class CsvRowsStrategy(ExtractionStrategy):
    mimetypes = {"text/csv"}

    def chunks(self, source, options) -> Iterable[Any]:
        str_stream = StringIO(source.read().decode())
        rows = csv.reader(str_stream)

        yield from rows

这种策略将为源CSV的每一行生成ckanext.ingest.shared.Record。但是基本的Record类并不做什么,所以您需要用您自己的Record子类替换它。

如前所述,数据块通过chunk_into_record方法转换为记录。您可以重写它,或者使用默认实现,该实现创建存储在策略的record_factory属性中的类的实例。此属性的默认值为ckanext.ingest.shared.Record,如果您想使用不同的记录实现,请执行以下操作

class CsvRowsStrategy(ExtractionStrategy):
    record_factory = MyCustomRecord
    ...

策略委派

ExtractionStrategy.extract方法负责生成记录。但这并不意味着策略必须自己生成记录。相反,策略可以做一些准备工作,然后使用另一个策略来生成记录。

让我们想象一个UrlStrategy,它接受包含单个行(远程门户的URL)的文件,并从该门户拉取数据。由于我们不知道数据的类型,我们无法知道如何从它创建记录。因此,当数据被检索时,我们可以使用其mimetype来选择最合适的策略,并将记录生成委派给其extract方法

import requests
import magic
from io import BytesIO
from ckanext.ingest import shared

class UrlStrategy(ExtractionStrategy):

    def extract(self, source, options) -> Iterable[Any]:
        # read URL from file-like source
        url = source.read()
        resp = requests.get(url)

        # convert response bytes into `source`
        sub_source = shared.make_storage(BytesIO(resp.content))

        # identify mimetype
        mime = magic.from_buffer(sub_source.read(1024))
        sub_source.seek(0)

        # choose the appropriate strategy
        handler = shared.get_handler_for_mimetype(mime, sub_source)

        # delegate extraction
        if handler:
            yield from handler.extract(source, options)

策略和记录选项

ExtractionStrategy.extractRecord.ingest接受第二个参数options。在两种情况下,它都是一个可以用来修改相应方法内部逻辑的字典。策略选项由ckanext.ingest.shared.StrategyOptions描述,记录选项由ckanext.ingest.shared.RecordOptions描述。

顶级定义的键对每个策略/记录都有意义。例如,RecordOptions定义了update_existing标志。如果创建数据的记录检测到现有冲突的实体,则应考虑update_existing标志来决定在这种情况下应做什么。这只是一种建议,并且可以忽略此标志或使用不同的选项。但使用常见选项简化了工作流程的理解。

对于策略,有3个常用键

  • record_options:这些选项应传递给策略生成的每个记录(RecordOptions
  • nested_strategy:如果策略将记录创建委派给另一个策略,则应优先使用nested_strategy而不是自动检测的策略(mimetype检测)
  • locator:如果源是某种类型的集合,则locator是一个可调用的函数,它返回集合的特定成员。它可以在解析存档时使用,以便策略可以从一个文件中提取包的元数据,并将由locator返回的资源上传到它。或者,在解析XLSX时,locator可以返回标题为工作表以简化多个工作表的处理。

对于任何只能由特定策略使用的选项,在StrategyOptionsRecordOptions中都有一个extras选项。这个字典可以存储任何数据,对其结构没有期望。

extras中常用的键最终可能被添加为顶级建议选项。但,因为这些只是建议,您可以直接忽略它们并传递您需要的任何数据作为选项。

记录中的数据转换

ckanext.ingest.shared.Record类需要两个初始化参数:raw数据和记录的options。记录创建时,它会调用其transform方法,该方法将raw数据复制到data属性。这是数据映射的最佳位置,在调用记录的ingest方法之前。如果您想从记录的data中删除所有空成员,可以按以下方式执行

class DenseRecord(Record):
    def transform(self, raw: Any):
        self.data = {
            key: value
            for key, value in raw.items()
            if value is not None
        }

记录摄取和结果

记录通常在导入过程中调用CKAN API中的一个操作。为了正确执行,记录需要context操作,该操作作为单个参数传递给ingest方法。但这只是最常见的流程,所以如果您不使用任何操作,请忽略context。更重要的是ingest的输出。它必须是一个由ckanext.ingest.shared.IngestionResult描述的字典。它有三个成员:

  • success:指示导入是否成功的标志
  • result:导入产生的数据(包、资源、组织等)
  • details:可能有用的其他详细信息。例如,在导入期间修改了多少实体、使用了哪个API操作、如果导入失败,发生了什么错误。

这些详细信息不是导入所必需的,但它们可以用于构建导入报告。

使用ckanext-scheming配置记录转换

ckanext.ingest.record模块包含PackageRecordResourceRecord类,它们用于创建包/资源。但它们的transform方法更有趣。它使用由ckanext-scheming定义的元数据模式中的字段配置将raw映射到data

为了配置映射,将ingest_options属性添加到字段定义中

- field_name: title
  label: Title
  ingest_options: {}

在转换过程中,检查raw中的每个键是否与模式匹配。如果模式包含具有ingest_optionsfield_namelabelraw中的键匹配的字段,则此键将复制到data中并映射到相应的field_name。即,对于上面的字段定义,两种raw版本 - {"title": "hello"}{"Title": "hello"}都将转换为包含值{"title": "hello"}data

如果您在raw中具有完全不同的名称,请使用ingest_optionsaliases(list[str])属性

- field_name: title
  label: Title
  ingest_options:
      aliasses: [TITLE, "name of the dataset"]

在这种情况下,{"name of the dataset": "hello"}{"TITLE": "hello"}将转换为{"title": "hello"}

如果值在用作字段值之前需要额外的处理,请指定所有应用的验证器作为ingest_optionsconvert属性

- field_name: title
  label: Title
  ingest_options:
      convert: conver_to_lowercase json_list_or_string

convert使用与字段定义中validators属性相同的语法。您可以在该字段内部使用任何已注册的验证器。但是,与验证器不同,如果在转换期间抛出Invalid错误,则字段将被静默忽略,导入将继续。

来自raw的任何没有在模式中对应字段(通过field_name/labelingest_options.aliases检测到)的字段,都不会添加到data中,也不会用于包/资源创建。

通用策略

有一些策略是现成的。您可能不会直接使用它们,但创建这些策略的子类可以简化流程并解决一些常见问题。

ingest:scheming_csv

ckanext.ingest.strategy.csv.CsvStrategy定义。

ingest:recursive_zip

ckanext.ingest.strategy.zip.CsvStrategy定义。

ingest:xlsx

ckanext.ingest.strategy.xlsx.XlsxStrategy定义。

配置

# List of allowed ingestion strategies. If empty, all registered strategies
# are allowed
# (optional, default: )
ckanext.ingest.strategy.allowed = ingest:recursive_zip

# List of disabled ingestion strategies.
# (optional, default: )
ckanext.ingest.strategy.disabled = ingest:scheming_csv

# Base template for WebUI
# (optional, default: page.html)
ckanext.ingest.base_template = admin/index.html

# Allow moving existing resources between packages.
# (optional, default: false)
ckanext.ingest.allow_resource_transfer = true

# Rename strategies using `{"import.path.of:StrategyClass": "new_name"}` JSON
# object
# (optional, default: )
ckanext.ingest.strategy.name_mapping = {"ckanext.ingest.strategy.zip:ZipStrategy": "zip"}

接口

ckanext.ingest.interfaces.IIngest接口的实现可以通过get_ingest_strategies方法注册自定义提取策略:

def get_ingest_strategies() -> dict[str, type[ckanext.ingest.shared.ExtractionStrategy]]:
    """Return extraction strategies."""
    return {
        "my_plugin:xlsx_datasets": MyXlsxStrategy,
    }

API

ingest_extract_records

从源中提取记录。

此方法主要存在于调试中。它不创建任何内容,只是解析源,生成记录并返回记录数据作为列表。因为它将所有提取的记录聚合到一个单独的列表中,所以可能会消耗大量内存。如果您想迭代,请考虑使用生成记录的可迭代函数iter_records

参数

source: str|FileStorage - data source for records

strategy: str|None - record extraction strategy. If missing, strategy
is guessed depending on source's mimetype

options: SourceOptions - dictionary with configuration for strategy and
records. Consumed by strategies so heavily depends on the chosen
strategy.

ingest_import_records

从源中提取的导入记录。

解析源,将其转换为使用所选策略的记录,并调用Record.ingest,可能创建/更新数据。

参数

source: str|FileStorage - data source for records

strategy: str|None - record extraction strategy. If missing, strategy
is guessed depending on source's mimetype

options: SourceOptions - dictionary with configuration for strategy and
records. Consumed by strategies so heavily depends on the chosen
strategy.

defaults: dict[str, Any] - default data added to every record(if missing)

overrides: dict[str, Any] - data that unconditionally overrides record details

skip: int - number of records that are skipped without ingestion

take: int - max number of records that will be ingested

项目详细信息


下载文件

下载您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分发

ckanext_ingest-1.4.2.tar.gz (43.2 kB 查看哈希值)

上传时间

构建分发

ckanext_ingest-1.4.2-py3-none-any.whl (43.2 kB 查看哈希值)

上传时间 Python 3

由以下支持