跳转到主要内容

数据输入/输出立方,导入和导出

项目描述

数据输入/输出立方,导入和导出

大规模存储

大规模存储是一个CW存储,用于通过纯SQL逻辑推送大量数据,从而避免CW检查。它比其他CW存储更快(它不检查每个步骤的eid,使用COPY FROM方法),但安全性较低(没有数据完整性安全),在使用create_entity函数时不会返回eid。

警告:目前,由于依赖于COPY FROM方法和特定的PostgreSQL表来获取所有索引,此存储可能只能与PostgreSQL一起使用。

大规模存储的工作流程

大规模存储的工作流程如下

  • 从元数据表(实体、is_instance_of等)中删除索引和约束;

  • 插入数据

    • 对于实体,使用create_entity函数;

    • 对于关系,使用relate函数;

    • 对于基于外部标识符的关系,使用related_by_iid函数;

    • 对于尚未见过的rtype的每次插入,将触发为该rtype创建一个临时表以存储结果。

    • 对于尚未见过的etype的每次插入,将删除实体表上的所有索引/约束。

  • 在某个时刻,应调用flush方法

    • 它将基于COPY_FROM将实体数据刷新到数据库中。

    • 它将基于COPY_FROM将关系数据刷新到数据库中。

    • 它将基于COPY_FROM将关系-iid数据刷新到数据库中。

    • 它将为插入的实体创建元数据(实体等)。

    • 它将提交。

  • 如果某些关系基于外部标识符(relate_by_iid)创建,则应手动使用convert_relations方法进行转换。

  • 在插入结束时,应调用cleanup方法

    • 它将为实体/关系表重新创建索引/约束/主键。

    • 它将在元数据表上重新创建索引/约束。

    • 它将删除临时表和内部存储表。

Massive Store 中的实体/关系

由于数据库插入的技术限制,以下是一些需要注意的具体点:

  • create_entity 调用将返回一个具有特定 eid 的实体。Eid 由 Massive Store 自动处理(它将获取给定范围的 eid 以供内部使用),但您可以在 create_entity 调用的 kwargs 中传递一个特定的 eid 以跳过自动分配 eid。

  • relate 方法不支持内联关系。

将为 PostgreSQL COPY_FROM 子句的调用创建一个缓冲区。如果用于创建此表文件的分隔符出现在实体(或关系)的数据中,它将被存储的 replace_sep 替换(默认为空字符串)。

Massive Store 的基本使用

使用 Massive Store 的简单脚本

# Initialize the store
store = MassiveObjectStore(session)
# Initialize the Relation table
store.init_rtype_table('Person', 'lives', 'Location')

# Import logic
...
entity = store.create_entity('Person', ...)
entity = store.create_entity('Location', ...)

# Flush the data in memory to sql database
store.flush()

# Import logic
...
entity = store.create_entity('Person', ...)
entity = store.create_entity('Location', ...)
# Person_iid and location_iid are unique iid that are data dependant (e.g URI)
store.relate_by_iid(person_iid, 'lives', location_iid)
...

# Flush the data in memory to sql database
store.flush()

# Convert the relation
store.convert_relations('Person', 'lives', 'Location')

# Clean the store / rebuild indexes
store.cleanup()

在这种情况下,iid_subj 和 iid_obj 代表一个唯一的 id(例如 uri 或导入数据库中的 id),它可以在导入实体后用于创建关系。

Massive Store 的高级使用

Massive Store 的简单和默认使用是保守的,以避免元数据管理中的问题。但是,可以增加插入速度

  • 如果多次刷新元数据,则刷新元数据可能会很昂贵。一个好的做法是在导入结束时只执行一次。为此,您应该在创建存储时将 autoflush_metadata 设置为 False,并在导入结束时调用 flush_meta_data(但在调用 `cleanup` 之前)。

  • 您可以通过在创建存储时将 commit_at_flush 设置为 False 来避免每次刷新时都提交。因此,您应该明确地调用 commit 方法至少一次 在刷新元数据和清理存储之前

  • 您可以在创建存储期间使用 drop_index 属性来避免删除不同的索引和约束。

  • 您可以在创建存储期间使用 eids_seq_start 属性来设置 eids 序列的不同起始点。

  • 可以提供额外的回调来处理提交和回滚(on_commit_callbackon_rollback_callback)。

Massive Store 高级使用示例

store = MassiveObjectStore(session,
                           autoflush_metadata=False,
                           commit_at_flush=False)
store.init_rtype_table('Location', 'names', 'LocationName')
for ind, infos in enumerate(ucsvreader(open(dumpname))):
    entity = {'name': infos[1], ...}
    entity['my_inlined_relation'] =  my_dict.get(infos[2])
    entity = store.create_entity('Location', **entity)
    store.relate_by_iid(entity.cwuri, 'my_external_relation', infos[3])
    if ind and ind % 200000 == 0:
        store.flush()
        store.commit()
store.flush()
store.commit()
store.flush_meta_data()
store.convert_relations('Location', 'my_external_relation', 'Location',
                        'cwuri', 'cwuri')
store.cleanup()

在 Massive Store 失败后恢复数据库

Massive Store 删除一些约束和索引,这些约束和索引在 cleanup 调用期间自动重建。如果在导入过程中发生错误,您仍然可以调用 cleanup 方法,甚至在失败后重新创建另一个存储并调用该存储的 cleanup 方法。

Massive Store 为其内部使用创建了以下表

  • dataio_initialized:有关已初始化的 etype/rtype 表的信息。

  • dataio_constraints:可能用于恢复不同 etype/rtype 表的约束/索引的查询。

  • dataio_metadata:已经推送其元数据的 etypes。

从模式

对于并行使用 Massive Store,提供了从模式

  • 应创建一个 Massive Store()。

  • 对于导入过程中可能遇到的所有可能的 etype/rtype,应调用 存储的 init_etype_table/init_relation_table 方法。

  • 可以在创建存储期间使用 slave_mode 属性创建不同的 存储。应将 autoflush_metadata 属性设置为 False。

  • 每个 存储可以在不同的线程中使用,用于创建实体和关系,并且应只调用其 flushcommit 方法。

  • 主存储应该在导入结束时调用其 flush_meta_datacleanup 方法。

RDF 存储

RDF 存储用于根据 Yams 与 RDF 模式转换将 RDF 数据导入 CubicWeb 数据。转换规则存储在 XY 结构中。

构建 XY 结构

您需要在您的立方体中创建一个文件(通常称为 xy.py),并导入 xy 的数据io版本

from cubes.dataio import xy

您需要注册不同的前缀(如 skos 或 foaf 这样的常用前缀已注册)

xy.register_prefix('diseasome', 'http://www4.wiwiss.fu-berlin.de/diseasome/resource/diseasome/')

默认情况下,实体类型基于 rdf 属性“rdf:type”,但您可以使用以下方式更改它

xy.register_rdf_etype_property('skos:inScheme')

也可以给一个特定的回调来从 rdf 属性确定实体类型

def _rameau_etype_callback(rdf_properties):
    if 'skos:inScheme' in rdf_properties and 'skos:prefLabel' in rdf_properties:
       return 'Rameau'

xy.register_etype_callback(_rameau_etype_callback)

URI 从“rdf:about”属性中获取,可以使用特定的回调进行规范化

def normalize_uri(uri):
    if uri.endswith('.rdf'):
       return uri[:-4]
    return uri

xy.register_uri_conversion_callback(normalize_uri)

定义转换规则

然后,您可以编写转换规则

  • xy.add_equivalence 允许您添加实体类型/属性/关系与 RDF 属性之间的基本等价性。在 Yams 部分中,您可以使用“*”作为通配符。例如,对于实体类型

    xy.add_equivalence('Gene', 'diseasome:genes')
    xy.add_equivalence('Disease', 'diseasome:diseases')

    例如,对于属性

    xy.add_equivalence('* name', 'diseasome:name')
    xy.add_equivalence('* label', 'rdfs:label')
    xy.add_equivalence('* label', 'diseasome:label')
    xy.add_equivalence('* class_degree', 'diseasome:classDegree')
    xy.add_equivalence('* size', 'diseasome:size')

    例如,对于关系

    xy.add_equivalence('Disease close_match ExternalUri', 'diseasome:classes')
    xy.add_equivalence('Disease subtype_of Disease', 'diseasome:diseaseSubtypeOf')
    xy.add_equivalence('Disease associated_genes Gene', 'diseasome:associatedGene')
    xy.add_equivalence('Disease chromosomal_location ExternalUri', 'diseasome:chromosomalLocation')
    xy.add_equivalence('* sameas ExternalUri', 'owl:sameAs')
    xy.add_equivalence('Gene gene_id ExternalUri', 'diseasome:geneId')
    xy.add_equivalence('Gene bio2rdf_symbol ExternalUri', 'diseasome:bio2rdfSymbol')
  • 可以提供一个基本 URI 来自动确定一个资源是否应被视为外部 URI 或内部关系

    xy.register_base_uri('http://www4.wiwiss.fu-berlin.de/diseasome/resource/')

    可以通过提供一个特定的回调来使用更复杂的逻辑

    def externaluri_callback(uri):
        if uri.startswith('http://www4.wiwiss.fu-berlin.de/diseasome/resource/'):
           if uri.endswith('disease') or uri.endswith('gene'):
              return False
           return True
        return True
    
    xy.register_externaluri_callback(externaluri_callback)

属性值基于 Yams 类型构建。但您可以使用特定的回调从 rdf 属性计算正确的值

def _convert_date(_object, datetime_format='%Y-%m-%d'):
    """ Convert an rdf value to a date """
    try:
       return datetime.strptime(_object.format(), datetime_format)
    except:
       return None

xy.register_attribute_callback('Date', _convert_date)

def format_isbn(rdf_properties):
    if 'bnf-onto:isbn' in rdf_properties:
       isbn = rdf_properties['bnf-onto:isbn'][0]
       isbn = [i for i in isbn if i in '0123456789']
       return int(''.join(isbn)) if isbn else None

xy.register_attribute_callback('Manifestation formatted_isbn', format_isbn)

导入数据

因此,可以使用 cubicweb-ctl 的“import-rdf”命令导入数据

cubicweb-ctl import-rdf <my-instance> <filer-or-folder>

用于读取数据的默认库是“rdflib”,但可以使用“–lib”选项使用“librdf”。

也可以使用“–rdf-format”选项强制 rdf 格式(它将自动确定,但这有时可能导致错误)。

导出数据

可以调用“rdf”视图,并将结果集创建为 RDF 文件。这是一个修改后的 CubicWeb RDFView 版本,它考虑了 dataio 立方体中更复杂的转换规则。格式也可以强制(默认为 XML),使用 url 中的“–format”选项(xml、n3 或 nt)。

示例

dataio rdf 导入的使用示例可以在 nytimes 和 diseasome 立方体中找到。

项目详情


下载文件

下载您平台上的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源分布

cubicweb-dataio-0.7.0.tar.gz (41.1 kB 查看哈希值)

上传时间

由以下支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面