跳转到主要内容

InvenioRDM数据迁移模块。

项目描述

https://github.com/inveniosoftware/invenio-rdm-migrator/workflows/CI/badge.svg https://img.shields.io/github/tag/inveniosoftware/invenio-rdm-migrator.svg https://img.shields.io/pypi/dm/invenio-rdm-migrator.svg https://img.shields.io/github/license/inveniosoftware/invenio-rdm-migrator.svg

DataCite-based数据模型用于Invenio。

开发

安装

请确保您系统已安装libpq-dev。有关更多信息,请参阅psycopg安装说明

确定是否需要使用中间数据库进行复制(更多详细信息请见下文)。如果是,您需要安装可选的[alchemy]扩展。注意!它安装了版本2的sqlalchemy,可能与某些InvenioRDM实例不兼容。

选择搜索和数据库的版本,然后运行

pip install -e .
pip install -e ."[tests,alchemy]"

测试

./run-tests.sh

如何运行它

要运行迁移,您需要

  • 一个正在运行的InvenioRDM实例。

  • 如果您的数据包含对其他记录的引用(例如,词汇表),则还需要运行设置步骤。

invenio-cli services setup --force --no-demo-data
  • 安装Invenio-RDM-Migrator,其他依赖关系必须由实例的Pipfile处理。

$ pip install invenio-rdm-migrator
  • 在您的实例上创建/编辑配置文件,例如streams.yaml

data_dir: /path/to/data
tmp_dir: /path/to/tmp
state_dir: /path/to/state
log_dir: /path/to/logs
db_uri: postgresql+psycopg2://inveniordm:inveniordm@localhost:5432/inveniordm
old_secret_key: CHANGE_ME
new_secret_key: CHANGE_ME
records:
    extract:
        filename: /path/to/records.json
  • 您需要创建一个小的Python脚本,将ETL的不同块组合在一起。您可以在以下位置找到示例:my-site/site/my_site/migrator/__main__.py

from invenio_rdm_migrator.streams import StreamDefinition
from invenio_rdm_migrator.streams.records.load import RDMRecordCopyLoad

if __name__ == "__main__":
    RecordStreamDefinition = StreamDefinition(
        name="records",
        extract_cls=JSONLExtract,
        transform_cls=ZenodoToRDMRecordTransform,
        load_cls=RDMRecordCopyLoad,
    )

    runner = Runner(
        stream_definitions=[
            RecordStreamDefinition,
        ],
        config_filepath="path/to/your/streams.yaml",
    )

    runner.run()
  • 最后,您可以执行上述代码。由于它位于Python包的__main__文件中,您可以将其作为模块运行

$ python -m my_site.migrator
  • 迁移完成后,在您的实例中您可以重新索引数据。以下列出的记录为例,它看起来像

$ invenio-cli pyshell

In [1]: from invenio_access.permissions import system_identity
In [2]: from invenio_rdm_records.proxies import current_rdm_records_service
In [3]: current_rdm_records_service.rebuild_index(identity=system_identity)

ETL(提取/转换/加载)架构

本模块 extract 包含四个包:transformloadstreams。前三个对应 ETL 流程的三个步骤。streams 包包含运行流程的逻辑和不同的 ETL 类的实现(例如 records)。

Extract

提取是数据处理流的第一个部分。它的功能很简单:返回一个迭代器(例如记录的迭代器),其中每个返回的值都是一个字典。请注意,这一步中的数据是在格式(例如 JSON、XML)上进行了转换,而不是在内容上。例如,XMLExtract 的实现可能如下所示

class XMLExtract(Extract):
...

    def run(self):
        with open("file.xml") as file:
            for entry in file:
                yield xml.loads(entry)

Transform

转换器负责修改内容以适应 InvenioRDM 数据模型(例如记录),以便可以导入数据库。它将遍历条目(即提取类返回的迭代器),转换并返回(例如记录)。以记录为例,进一步探讨

要将某些内容转换为 RDM 记录,您需要实现 streams/records/transform.py:RDMRecordTransform。对于每个记录,它将产生一个被认为是语义上“完整”的记录:记录本身、其父记录、如果存在则其草案以及与之相关的文件。

{
    "record": self._record(entry),
    "draft": self._draft(entry),
    "parent": self._parent(entry),
}

这意味着您需要为每个键实现函数。请注意,只有 _record_parent 应该返回内容,其他可以返回 None

其中一些函数本身可以使用 transform/base:Entry 转换器。_entry_ 转换器是一个额外的抽象层,提供与 Transform 类所需的生成有效数据的接口。在记录示例中,您可以实现 transform.base:RDMRecordEntry,它可以在上面代码片段中提到的 RDMRecordTransform._record 函数中使用。请注意,实现此接口将产生记录的有效数据。但是,没有关于 _metadata_ 的 abc。这是一个开放性问题,即我们应该定义多少接口以及如何避免重复 InvenioRDM 中已经存在的 Marshmallow 模式。

此时您可能会想,“为什么不使用 Marshmallow 呢”?答案是“责任分离、性能和简单性”。后者与这样一个事实有关,即大部分数据转换都是定制的,所以我们最终会得到一个充满 Method 字段的模式,这与我们拥有的模式没有太大区别,但会影响性能(Marshmallow 很慢...)。至于责任部分,验证(主要是引用,如词汇表)只能在(或之后)_load_ 时进行,此时有 RDM 实例知识/appctx。

请注意,在这一步中没有进行任何验证,甚至没有结构验证。

Load

将记录可用在 RDM 实例中的最终步骤是将它们加载。有两种类型的加载:_bulk_ 或 _transactions_。

Bulk

批量加载将通过 COPY 在数据库表中逐表插入数据。由于表的顺序没有保证,在加载之前需要删除外键。之后可以恢复它们。此外,删除索引会增加性能,因为它们将只计算一次,在加载后恢复时。

批量加载使用 load.postgresql.bulk:PostgreSQLCopyLoad 类进行,该类将执行 2 个步骤

  1. 准备数据,将每行数据库行写入一个 csv 文件

$ /path/to/data/tables1668697280.943311
    |
    | - pidstore_pid.csv
    | - rdm_parents_metadata.csv
    | - rdm_records_metadata.csv
    | - rdm_versions_state.csv
  1. 执行实际加载,使用 COPY。一次插入所有行比逐行执行 INSERT 更高效。

内部发生的情况是,prepare 函数使用 TableGenerator 实现并生成 csv 文件的文件名列表。因此,load 只遍历文件名,而不是实际的条目。

TableGenerator 将为迭代器中的每个值生成一行或多行(要写入 csv 文件的行)。例如,对于记录,它将生成:recid、DOI 和 OAI(持久标识符)、记录和父级元数据等,这些将被写入相应的 CSV 文件。

Transactions

另一种选择是迁移事务。例如,一旦您已批量完成其初始部分,您就可以迁移在批量迁移过程中持久化的更改。这可以通过迁移事务来实现。事务是一组操作,可以理解为SQL语句,因此有两个值:操作类型(创建、更新、删除)和其表示为数据库模型的数据。

事务加载使用的是load.postgresql.transactions:PostgreSQLExecuteLoad类,它将执行与上面类似的两个步骤

  1. 准备数据,在内存中存储一系列Operation

  2. 通过向会话中添加或删除,或更新相应的对象来执行实际的加载。每个操作都会刷新到数据库中,以避免外键违规。然而,每个事务都是原子性的,这意味着其中任何一个操作的错误都会导致整个事务作为一个组失败。

在内部,加载将使用一个load.postgresql.transactions.generators.group:TxGenerator实例来准备操作。此类包含表名与load.postgresql.transactions.generators.row:RowGenerators之间的映射,后者将在obj属性中返回包含数据库模型数据的操作列表。

请注意,TxGeneratortransform.transactions.Tx紧密耦合,因为它期望字典具有特定的结构

{
    "tx_id": the actual transaction id, useful for debug and error handling
    "action": this information refers to the semantic meaning of the group
                   for example: record metadata update or file upload
    "operations": [
        {
            "op": c (create), u (update), d (delete)
            "table": the name of the table in the source system (e.g. Zenodo)
            "data": the transformed data, this can use any `Transform` implementation
        }
    ]
}

状态

在迁移运行期间,需要在不同的流或同一流上的不同生成器之间共享信息。例如,记录流需要访问在社区流中填充的UUID到slug映射;或者草稿生成器需要知道在记录生成器上已创建哪些父记录,以保持版本状态一致。

所有这些信息都持久化到SQLite数据库中。这个状态数据库在每次流处理期间保持在内存中,如果在没有错误的情况下流完成,它将持久化到磁盘。状态将使用流名称(例如,records.db)保存,以避免覆盖以前的州。因此,迁移可以从任何流重新启动。

有两种方法可以向状态添加更多信息

  • 完整的实体,例如记录或用户,需要自己的数据库表。必须在state.py:State._initialize_db中定义它们。此外,为了抽象对表的访问,需要一个状态实体。它需要在Runner.py:Runner构造函数中初始化并将其添加到state_entities字典中。

  • 独立值,例如生成的主键的最大值。这些可以存储在global_state中。此状态有两个列:键和值;向其中添加信息将类似于{key: name_of_the_value, value: actual_value}

注释

使用python生成器

使用生成器而不是列表,我们可以只遍历一次数据,并对其执行E-T-L步骤。而不是为E循环,为T循环,为L循环。此外,这还允许我们在写入时打开csv文件,并在最后关闭它们(打开/关闭是一个昂贵的操作,当重复3M次时)。

更改

版本5.0.0(发布于2024-07-12)

  • 更改子模块的打包方式(破坏性更改)

  • 安装:可选安装sqlalchemy版本2(解决了与当前InvenioRDM安装的依赖项冲突)

版本4.4.1

  • 修复可空模型字段的默认值。

版本4.4.0

  • 添加GitHub流。

  • 为用户模型添加verified_atblocked_at

  • 处理记录的父DOI。

  • 将媒体文件添加到记录和草稿。

  • deletion_status 添加到记录模型中。

  • 切换到 orjson 以进行 JSON 序列化和反序列化。

  • 为转换添加多进程处理。

  • 重构状态,使其也使用 Python 字典进行缓存。

版本 4.3.0

  • 添加社区基本 CRUD 操作。

  • 添加数据库会话固件。

版本 4.2.0

  • FileUploadAction 重命名为 DraftFileUploadAction

版本 4.1.0

  • 添加文件上传操作。

  • 添加草稿编辑操作。

版本 4.0.0

  • 通过加载和转换对操作进行命名空间。

版本 3.1.0

  • DatetimeMixin 添加到转换时间戳为 ISO 格式日期字符串。

  • JSONLoadMixin 添加到从字符串中加载字典。

版本 3.0.0

  • Operation 实例已将模型和数据拆分为两个属性。

  • 添加用户操作。

  • PostgreSQLTxresolve_references 函数现在有一个默认行为(pass)。

  • 为草稿和用户相关模型添加可空配置。

  • 修复了少量错误。

版本 2.0.0

  • 使状态全局可用。

  • 将事务重构为操作。创建事务和加载数据类。

  • 移除了空的 Kafka 提取模块。

  • 改进了错误处理并创建了专用类。

  • dict_set 移至 utils。

  • 从测试矩阵中删除 Python 3.8。

版本 1.0.0

  • 首次公开发布。

项目详情


下载文件

下载适用于您的平台的文件。如果您不确定选择哪个,请了解有关 安装软件包 的更多信息。

源分布

invenio-rdm-migrator-5.0.0.tar.gz (96.1 kB 查看哈希值)

上传时间

构建分布

invenio_rdm_migrator-5.0.0-py2.py3-none-any.whl (109.0 kB 查看哈希值)

上传时间 Python 2 Python 3

由以下支持