跳转到主要内容

{{ 描述 }}

项目描述

Datapackage Pipelines

Travis Coveralls PyPI - Python Version

基础知识

这是什么?

datapackage-pipelines 是一个用于声明式流处理的表格数据框架。它基于 Frictionless Data 项目的概念和工具构建。

管道

本框架的基本概念是管道。

管道包含一系列处理步骤,并生成单个数据包作为其输出。每个步骤在处理器中执行,并包括以下阶段

  • 修改数据包描述符 - 例如:添加元数据、添加或删除资源、更改资源的数据模式等。
  • 处理资源 - 每个资源的每一行都按顺序处理。处理器可以删除行、添加新行或修改其内容。
  • 返回统计信息 - 如果需要,处理器可以报告一个字典,该字典将在管道执行结束时返回给用户。这可以用于计算处理数据的质量指标。

并非每个处理器都需要执行所有这些。事实上,您通常会发现在每个处理步骤中只执行这些操作之一。

pipeline-spec.yaml 文件

管道以声明式方式定义,而不是以代码方式定义。可以在pipeline-spec.yaml文件中定义一个或多个管道。该文件指定了处理器的列表(通过名称引用)以及每个处理器的执行参数。

下面是一个pipeline-spec.yaml文件的示例

worldbank-co2-emissions:
  title: CO2 emission data from the World Bank
  description: Data per year, provided in metric tons per capita.
  environment:
    DEBUG: true
  pipeline:
    -
      run: update_package
      parameters:
        name: 'co2-emissions'
        title: 'CO2 emissions (metric tons per capita)'
        homepage: 'http://worldbank.org/'
    -
      run: load
      parameters:
        from: "http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel"
        name: 'global-data'
        format: xls
        headers: 4
    -
      run: set_types
      parameters:
         resources: global-data
         types:
           "[12][0-9]{3}":
              type: number
    -
      run: dump_to_zip
      parameters:
          out-file: co2-emissions-wb.zip

在这个例子中,我们看到一个名为worldbank-co2-emissions的管道。其管道由4个步骤组成

  • update_package:这是一个库处理器(见下文),它修改数据包的描述符(在我们的案例中:初始的、空的描述符) - 向数据包中添加nametitle和其他属性。
  • load:这是另一个库处理器,它将数据加载到数据包中。此资源有一个name和一个from属性,指向数据的远程位置。
  • set_types:此处理器将数据类型分配给数据中的字段。在这个例子中,看起来像年份的字段标题将被分配number类型。
  • dump_to_zip:创建一个带有提供文件名的压缩和验证过的数据包。

此外,我们还提供了一些元数据

完整的pipeline-spec.yaml文件的JSONSchema可以在这里找到

机制

管道运行的一个重要方面是数据以流的形式从一个处理器传递到另一个处理器。如果我们“变得技术性”,那么每个处理器都在其自己的专用进程中运行,其中数据包从其stdin读取并输出到其stdout。这里需要注意的重要事情是,在任何时候,没有处理器都持有整个数据集。

这种限制是设计上的 - 以保持每个处理器的内存和磁盘需求有限且与数据集大小无关。

快速入门

首先,在您的当前目录中创建一个pipeline-spec.yaml文件。如果您只是想尝试它,可以使用上面的文件。

然后,您可以在本地安装datapackage-pipelines - 请注意,由于使用了类型提示和高级asyncio使用,因此需要Python 3.6或更高版本。

$ pip install datapackage-pipelines

现在,您应该可以使用dpp命令了

$ dpp
Available Pipelines:
- ./worldbank-co2-emissions (*)

$ $ dpp run --verbose ./worldbank-co2-emissions
RUNNING ./worldbank-co2-emissions
Collecting dependencies
Running async task
Waiting for completion
Async task starting
Searching for existing caches
Building process chain:
- update_package
- load
- set_types
- dump_to_zip
- (sink)
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/update_package.py
load: DEBUG   :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1" 200 308736
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1" 200 308736
load: DEBUG   :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1" 200 308736
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1" 200 308736
set_types: INFO    :(<dataflows.processors.set_type.set_type object at 0x10a5c79b0>,)
load: INFO    :Processed 264 rows
set_types: INFO    :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/load.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/set_types.py
dump_to_zip: INFO    :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/manager/../lib/internal/sink.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/dump_to_zip.py
DONE V ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
INFO    :RESULTS:
INFO    :SUCCESS: ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}

或者,您可以使用我们的Docker镜像

$ docker run -it -v `pwd`:/pipelines:rw \
        frictionlessdata/datapackage-pipelines
<available-pipelines>

$ docker run -it -v `pwd`:/pipelines:rw \
       frictionlessdata/datapackage-pipelines run ./worldbank-co2-emissions
<execution-logs>

命令行界面 - dpp

在命令行中运行管道使用的是 dpp 工具。

不带任何参数运行 dpp,将显示可用的管道列表。这是通过扫描当前目录及其子目录,搜索 pipeline-spec.yaml 文件并提取其中描述的管道规范列表来完成的。

每个管道都有一个标识符,由 pipeline-spec.yaml 文件的路径和该描述文件中定义的管道名称组成。

要运行管道,请使用 dpp run <pipeline-id>

您还可以使用 dpp run all 来运行所有管道,以及 dpp run dirty 来仅运行 管道(稍后会有更多介绍)。

深入查看管道

处理器解析

如之前所见,处理器是通过名称引用的。

这个名称实际上是包含处理代码的 Python 脚本名称(不含 .py 扩展名)。当尝试找到实际需要执行代码的位置时,处理器解析器会在以下预定义位置中搜索

  • 首先,它将尝试在 pipeline-spec.yaml 文件所在的目录中找到具有该名称的自定义处理器。处理器名称支持点表示法,因此您可以写入 mycode.custom_processor,它将在与管道规范文件相同的路径下的 mycode 目录中尝试找到名为 custom_processor.py 的处理器。对于这个特定的解析阶段,如果您写入 ..custom_processor,它将尝试在管道规范文件的父目录中找到该处理器。(有关如何编写自定义处理器的说明请继续阅读)
  • 如果处理器名称看起来像 myplugin.somename,它将尝试在 myplugin 插件中找到名为 somename 的处理器。也就是说,它会检查是否有名为 myplugin 的已安装插件,并且如果有的话,该插件是否发布了名为 somename 的处理器(有关插件请参阅以下内容)。
  • 如果在这一点之前没有找到处理器,它将尝试在处理器搜索路径中搜索该处理器。处理器搜索路径来自环境变量 DPP_PROCESSOR_PATH。路径中的每个 : 分隔的路径都被视为解析处理器的一个可能的起点。
  • 最后,它将尝试在包含在这个软件包中的标准处理器库中找到该处理器。

排除目录以进行管道规范扫描

默认情况下,会排除 .* 目录的扫描,您可以通过在项目根目录创建一个 .dpp_spec_ignore 文件来添加额外的目录模式排除项。此文件具有与 .gitignore 类似的语法,将基于通配符模式匹配排除目录。

例如,以下文件将忽略子目录中的 test* 目录,以及项目根目录中的 /docs 目录。

test*
/docs

缓存

通过将特定管道步骤的 cached 属性设置为 True,此步骤的输出将存储在磁盘上(在 .cache 目录中,与 pipeline-spec.yaml 文件相同的路径)。

重新运行管道将使用该缓存,从而避免执行缓存的步骤及其前驱步骤。

内部,为管道中的每个步骤计算一个哈希值——这个哈希值基于处理器的代码、它的参数以及其前驱的哈希值。如果存在具有与特定步骤完全相同的哈希值的缓存文件,则可以删除它(及其前驱),并使用该缓存文件作为管道的输入。

这样,如果代码或执行参数发生变化(无论是缓存的处理器还是任何前驱处理器),缓存就变得无效。

脏任务和保持状态

缓存哈希还用于判断流水线是否“脏”。当流水线成功执行完成后,dpp会存储缓存哈希和流水线ID。如果存储的哈希与当前计算的哈希不同,则表示代码或执行参数已修改,需要重新运行流水线。

dpp与两个存储后端一起工作。在本地运行时,它使用Python sqlite DB存储每个正在运行的任务的当前状态,包括最后的结果和缓存哈希。状态DB文件存储在dpp运行目录下的名为.dpp.db的文件中。

对于其他安装,尤其是使用任务调度器的安装,建议使用Redis后端。为了启用Redis连接,只需将环境变量DPP_REDIS_HOST设置为指向一个正在运行的Redis实例。

流水线依赖关系

您可以将一个流水线声明为依赖于另一个流水线或数据包。在计算流水线的缓存哈希时,将考虑这种依赖关系,这反过来又影响缓存文件的有效性以及“脏”状态。

  • 对于流水线依赖关系,使用该流水线的哈希进行计算。
  • 对于数据包依赖关系,使用数据包中的hash属性进行计算。

如果缺少依赖关系,则将流水线标记为“无法执行”。

pipeline-spec.yaml文件中,通过将dependencies属性添加到流水线定义来声明依赖关系。该属性应包含一个依赖关系列表,每个依赖关系是一个具有以下格式的对象

  • 一个名为pipeline的单个键,其值为依赖的流水线ID
  • 一个名为datapackage的单个键,其值为依赖的数据包标识符(或URL)

示例

cat-vs-dog-populations:
  dependencies:
    -
      pipeline: ./geo/region-areal
    -
      datapackage: http://pets.net/data/dogs-per-region/datapackage.json
    -
      datapackage: http://pets.net/data/dogs-per-region
  ...

验证

每个处理器的输入都会自动进行正确性验证

  • 在传递给处理器之前,始终会验证数据包,因此处理器无法以使其无效的方式修改数据包。

  • 除非在步骤的info中显式将validate标志设置为True,否则不会将数据与其各自的JSON表模式进行验证。这主要有两个原因

    • 从性能角度来看,在每一步验证数据都非常消耗CPU资源
    • 在某些情况下,您可能在一步中修改模式,在另一步中修改数据,因此您只想在所有更改完成后验证一次数据

    在任何情况下,当使用set_types标准处理器时,它将使用新的类型验证和转换输入数据。

数据流集成

Dataflows是datapackage-pipelines的后继产品,为运行流水线提供了更Pythonic的接口。您可以使用flow属性而不是run将dataflows集成到流水线规范中。例如,给定以下flow文件,保存在my-flow.py

from dataflows import Flow, dump_to_path, load, update_package

def flow(parameters, datapackage, resources, stats):
    stats['multiplied_fields'] = 0

    def multiply(field, n):
        def step(row):
            row[field] = row[field] * n
            stats['multiplied_fields'] += 1
        return step

    return Flow(update_package(name='my-datapackage'),
                multiply('my-field', 2))

以及同一目录下的pipeline-spec.yaml

my-flow:
  pipeline:
  - run: load_resource
    parameters:
      url: http://example.com/my-datapackage/datapackage.json
      resource: my-resource
  - flow: my-flow
  - run: dump_to_path

您可以使用dpp run my-flow运行流水线。

标准处理器库

库中提供了一些内置处理器。

update_package

向数据包添加元数据。

参数:

可以提供任何允许的属性(根据规范)。

示例:

- run: update_package
  parameters:
    name: routes-to-mordor
    license: CC-BY-SA-4
    author: Frodo Baggins <frodo@shire.me>
    contributors:
      - samwise gamgee <samwise1992@yahoo.com>

update_resource

向资源添加元数据。

参数:

  • resources
    • 要操作的资源的名称
    • 匹配资源名称的正则表达式
    • 资源名称列表
    • 包中资源的索引
    • 省略表示应在所有资源上执行操作
  • metadata - 一个字典,包含任何允许的属性(根据规范)。

示例:

- run: update_resource
  parameters:
    resources: ['resource1']
    metadata:
      path: 'new-path.csv'

load

将数据加载到包中,推断模式并可选地转换值。

参数:

  • from - 要加载的数据的位置。可以是以下之一
    • 本地路径(例如:/path/to/the/data.csv)
    • 远程URL(例如:https://path.to/the/data.csv
    • 其他支持链接,基于当前在tabulator中支持的方案和格式
    • 本地路径或远程URL到datapackage.json文件(例如:https://path.to/data_package/datapackage.json
    • 环境变量的引用,包含源位置,形式为env://ENV_VAR
    • 包含(datapackage_descriptor,resources_iterator)的元组
  • resources - 可选的,仅在源指向datapackage.json文件或datapackage/resource元组时相关。值应该是以下之一
    • 要加载的单个资源名称
    • 匹配要加载的资源名称的正则表达式
    • 要加载的资源名称列表
    • 'None'表示加载所有资源
    • 包中资源的索引
  • validate - 是否将数据转换为推断的数据类型。仅在不是从datapackage加载数据时相关
  • 其他选项 - 根据加载的文件,额外选项(例如,对于Excel文件等,请参阅上面的tabulator链接)

printer

仅打印看到的内容。适用于调试。

参数:

  • num_rows - 修改要预览的行数,printer将从流的不同位置打印该数量的多个样本
  • last_rows - 打印流中最后多少行。可选的,默认值为num_rows的值
  • fields - 可选的,要预览的字段名列表
  • resources - 可选的,允许限制打印的资源,与load处理器资源参数的语义相同

set_types

将数据类型和类型选项设置为流式资源中的字段,并确保数据仍然与新类型有效。

这允许对现有表模式进行修改,通常是对stream_remote_resources的默认模式。

参数:

  • resources - 要修改的资源。可以是

    • 字符串列表,解释为要流式传输的资源名称
    • 字符串,解释为用于匹配资源名称的正则表达式

    如果省略,则流式传输datapackage中的所有资源。

  • regex - 如果设置为False,字段名称将被解释为字符串而不是正则表达式(默认为True

  • types - 字段名称和字段定义之间的映射。

    • 字段名称是字段的名称,或者是一个匹配多个字段的正则表达式。
    • 字段定义是一个遵守JSON Table Schema规范的对象。您可以使用null代替对象来从模式中删除字段。

示例:

- run: add_resources
  parameters:
    name: example-resource
    url: http://example.com/my-csv-file.csv
    encoding: "iso-8859-2"
- run: stream_remote_resources
- run: set_types
  parameters:
    resources: example-resource
    types:
      age:
        type: integer
      "yearly_score_[0-9]{4}":
        type: number
      "date of birth":
        type: date
        format: "%d/%m/%Y"
      "social security number": null

load_metadata

从现有数据包加载元数据。

参数:

从位于url的数据包中加载元数据。

将复制所有已加载的数据包属性(除了resources

示例:

- run: load_metadata
  parameters:
    url: http://example.com/my-datapackage/datapackage.json

load_resource

从现有数据包加载表格资源。

参数:

从位于url的数据包中加载resource参数指定的资源。将复制加载的资源的所有属性 - 包括pathschema

  • url - 指向包含所需资源的datapackage的URL

  • resource - 可以是

    • 字符串列表,解释为要加载的资源名称
    • 字符串,解释为用于匹配资源名称的正则表达式
    • 一个整数,表示资源在数据包中的索引(基于0)
  • limit-rows - 如果提供,将限制从源中获取的行数。取一个整数值,指定要流式传输的源行数

  • log-progress-rows - 如果提供,将记录加载进度。取一个整数值,指定记录进度的行间隔数

  • stream - 如果提供且设置为false,则资源将被添加到数据包中,但不会流式传输

  • resources - 可以用 resource 属性替换,以支持加载资源并修改输出资源元数据

    • 值是一个字典,包含源资源名称到要加载的映射,以及包含要应用于加载资源的描述符更新的字典
  • required - 如果提供并设置为false,则数据包不可用或资源缺失时不会失败

示例:

- run: load_resource
  parameters:
    url: http://example.com/my-datapackage/datapackage.json
    resource: my-resource
- run: load_resource
  parameters:
    url: http://example.com/my-other-datapackage/datapackage.json
    resource: 1
- run: load_resource
  parameters:
    url: http://example.com/my-datapackage/datapackage.json
    resources:
      my-resource:
        name: my-renamed-resource
        path: my-renamed-resource.csv

concatenate

连接多个流式资源并将它们转换为单个资源。

参数:

  • sources - 要连接的资源。与 stream_remote_resources 中的 resources 具有相同的语义。

    如果省略,则将连接数据包中的所有资源。

    要连接的资源必须在数据包中按顺序出现。

  • target - 保存连接数据的目标资源。至少应定义以下属性

    • name - 资源名称
    • path - 此文件在数据包中的路径。

    如果省略,目标资源将接收 concat 名称,并将保存到数据包中的 data/concat.csv

  • fields - 源和目标之间字段的映射,因此键是目标字段名称,值是源字段名称的列表。

    此映射用于创建目标资源模式。

    请注意,目标字段名称始终假定映射为其自身。

示例:

- run: concatenate
  parameters:
    target:
      name: multi-year-report
      path: data/multi-year-report.csv
    sources: 'report-year-20[0-9]{2}'
    fields:
      activity: []
      amount: ['2009_amount', 'Amount', 'AMOUNT [USD]', '$$$']

在此示例中,我们连接所有看起来像 report-year-<year> 的资源,并将它们输出到 multi-year-report 资源。

输出包含两个字段

  • activity,在所有源中称为 activity
  • amount,在不同资源中有不同的名称(例如 Amount2009_amountamount 等)

join

连接两个流式资源。

在我们的情况下,“连接”意味着获取 目标 资源,并通过对 资源中的数据查找来向其每一行添加字段。

连接操作的特例是当没有目标流时,使用源中的所有唯一行来创建它。此模式称为 去重 模式 - 将创建目标资源,并将从源中添加去重行。

参数:

  • source - 关于 资源的信息

    • name - 资源名称
    • key - 其中之一
      • 字段名称列表,应用作查找键
      • 字符串,被解释为Python格式字符串,用于形成键(例如 {<field_name_1>}:{field_name_2}
    • delete - 连接后从数据包中删除(默认为 False
  • target - 保存连接数据的目标资源。至少应定义以下属性

    • name - 与 source 相同
    • key - 与 source 相同,或 null 以创建目标资源并执行 去重
  • fields - 从源资源到目标资源的字段映射。键应该是目标资源中的字段名称。值可以定义两个属性

    • name - 源中的字段名称(默认情况下与目标字段名称相同)

    • aggregate - 聚合策略(如何处理具有相同键的多个 行)。可以取以下选项

      • sum - 对聚合值进行汇总。对于数值,它是算术和;对于字符串,它是字符串的连接;对于其他类型将产生错误。

      • avg - 计算聚合值的平均值。

        对于数值,它是算术平均,对于其他类型将产生错误。

      • max - 计算聚合值的最大值。

        对于数值,它是算术最大值,对于字符串是字典最大值;对于其他类型将产生错误。

      • min - 计算聚合值的最小值。

        对于数值,它是算术最小值,对于字符串是字典最小值;对于其他类型将产生错误。

      • first - 取遇到的首个值

      • last - 取遇到的最后一个值

      • count - 计算特定键出现的次数。对于此方法,指定 name 不是必需的。如果指定,则 count 将计算该源字段非空值的数量。

      • counters - 计算不同值的出现次数。将返回一个形如 [value, count-of-value] 的二维数组。

      • set - 收集聚合字段的全部不同值,无序

      • array - 收集聚合字段的所有值,按出现顺序

      • any - 选取任意值。

      默认情况下,aggregateany 值。

    如果既不需要指定 name 也不需要指定 aggregate,映射可以映射到空对象 {}null

  • full - 布尔值,

    • 如果为 True(默认值),则在源中失败的查找将导致源中出现 "null" 值。
    • 如果为 False,则在源中失败的查找将导致从目标中删除行。

重要提示:"source" 资源 必须 在数据包中出现在 "target" 资源之前。

示例:

- run: join
  parameters:
    source:
      name: world_population
      key: ["country_code"]
      delete: yes
    target:
      name: country_gdp_2015
      key: ["CC"]
    fields:
      population:
        name: "census_2015"
    full: true

上述示例旨在创建一个包含世界上每个国家 GDP 和人口的包。

我们有一个资源 (world_population),其数据如下所示

country_code country_name census_2000 census_2015
UK 联合王国 58857004 64715810
...

还有另一个资源 (country_gdp_2015),其数据如下所示

CC GDP (£m) 净债务 (£m)
UK 1832318 1606600
...

使用 join 命令将根据 country_code / CC 字段将两个数据集的行匹配起来,然后将 census_2015 字段中的值复制到新的 population 字段中。

结果数据包将删除 world_population 资源,并且 country_gdp_2015 资源看起来如下

CC GDP (£m) 净债务 (£m) population
UK 1832318 1606600 64715810
...

更复杂的示例

- run: join
  parameters:
    source:
      name: screen_actor_salaries
      key: "{production} ({year})"
    target:
      name: mgm_movies
      key: "{title}"
    fields:
      num_actors:
        aggregate: 'count'
      average_salary:
        name: salary
        aggregate: 'avg'
      total_salaries:
        name: salary
        aggregate: 'sum'
    full: false

此示例旨在分析 MGM 电影院中银幕演员的薪资。

再次,我们有一个资源 (screen_actor_salaries),其数据如下所示

year production actor salary
2016 Vertigo 2 先生·T 15000000
2016 Vertigo 2 小罗伯特·唐尼 7000000
2015 堕落 - 复活 詹妮弗·劳伦斯 18000000
2015 阿尔夫 - 返回梅尔马克 摇滚 12000000
...

还有另一个资源 (mgm_movies),其数据如下所示

title director producer
Vertigo 2 (2016) 林赛·罗韩 李嘉诚
机器人 - 电影 (2018) 先生·T 先生·T
...

使用 join 命令将根据电影名称和生产年份将两个数据集的行匹配起来。注意我们如何通过使用不同的键模式来克服不兼容的字段。

结果数据集可能看起来如下

title director producer num_actors average_salary total_salaries
Vertigo 2 (2016) 林赛·罗韩 李嘉诚 2 11000000 22000000
...

filter

筛选流媒体资源。

过滤器(filter)接受相等和不等条件,并对所选资源中的每一行进行测试。如果没有条件验证,则该行将被丢弃。

参数:

  • resources - 在哪些资源上应用过滤器。与 stream_remote_resources 中的 resources 具有相同的语义。
  • in - 键到值的映射,这些值对应于 row[key] == value 条件
  • out - 键到值的映射,这些值对应于 row[key] != value 条件

Both in and out 应该是对象的列表。但是,out 应仅包含一个元素。

示例:

仅筛选美国和欧洲国家,排除主要语言为英语的国家

- run: filter
  parameters:
    resources: world_population
    in:
      - continent: america
      - continent: europe
- run: filter
  parameters:
    resources: world_population
    out:
      - language: english

要按多个值过滤 out,需要多个过滤器处理器,而不是多个 out 元素。否则,某些条件总会验证,并且不会丢弃任何行

- run: filter
  parameters:
    resources: world_population
    out:
      - language: english
- run: filter
  parameters:
    resources: world_population
    out:
      - language: swedish

排序

按键对流式资源进行排序。

sort 接受资源列表和键(作为 Python 格式字符串在行字段上)。它将输出每个资源的行,根据键(默认情况下为升序)进行排序。

参数:

  • resources - 要排序的资源。与 stream_remote_resources 中的 resources 具有相同的语义。
  • sort-by - 字符串,它将被解释为用于形成键的 Python 格式字符串(例如,{<field_name_1>}:{field_name_2}
  • reverse - 可选布尔值,如果设置为 true - 以相反的顺序排序

示例:

仅筛选美国和欧洲国家,排除主要语言为英语的国家

- run: sort
  parameters:
    resources: world_population
    sort-by: "{country_name}"

去重

根据资源的主要键在资源中去重行。

deduplicate 接受资源指定器 - 对于每个资源,它将仅输出基于主键字段值的唯一行。具有重复主键的行将被忽略。

参数:

  • resources - 要排序的资源。与 stream_remote_resources 中的 resources 具有相同的语义。

示例:

world-population 资源中去重行。

- run: deduplicate
  parameters:
    resources: world_population

重复

复制资源。

duplicate 接受数据包中的单个资源名称。然后,它将在输出数据包中复制它,具有不同的名称和路径。复制的资源将紧接其原始资源之后出现。

参数:

  • source - 要复制的资源。资源的名称。
  • target-name - 新复制的资源的名称。
  • target-path - 新复制的资源的路径。
  • duplicate_to_end - 使复制的资源出现在末尾,而不是紧接其原始资源之后。

示例:

仅筛选美国和欧洲国家,排除主要语言为英语的国家

- run: duplicate
  parameters:
    source: original-resource
    target-name: copy-of-resource
    target-path: data/duplicate.csv

删除字段

从流式资源中删除字段(列)

delete_fields 接受资源列表和要删除的字段列表

注意:如果提供了多个资源,则所有资源都应包含要删除的所有字段

参数:

  • resources - 要从其中删除列的资源。与 stream_remote_resources 中的 resources 具有相同的语义。
  • fields - 要删除的字段(列)名称列表(确切名称或匹配字段名称的正则表达式)
  • regex - 如果设置为False,字段名称将被解释为字符串而不是正则表达式(默认为True

示例:

world_population 资源中删除 country_namecensus_2000

- run: delete_fields
  parameters:
    resources: world_population
    fields:
      - country_name
      - census_2000

添加计算字段

向流式资源添加字段(s)

add_computed_field 接受资源列表和要添加到现有资源中的字段。它将为每个资源输出具有新字段(列)的行。 add_computed_field 允许在将值插入目标字段之前执行各种操作。

参数:

  • resources - 要添加字段的资源。与 stream_remote_resources 中的 resources 具有相同的语义。
  • fields - 要在目标字段上执行的操作列表。
    • operation: 在同一行的预定义列的值上执行的操作。可用操作
      • constant - 添加一个常数值
      • sum - 行中给定列的总和。
      • avg - 行中给定列的平均值。
      • min - 行中给定列的最小值。
      • max - 行中给定列中的最大值。
      • multiply - 行中给定列的乘积。
      • join - 将行中的两个或多个列值连接起来。
      • format - Python 格式化字符串,用于形成值,例如:my name is {first_name}
    • target - 新字段的名称。
    • source - 要执行操作的列的列表(在formatconstant的情况下不要求)。
    • with - 传递给constantformatjoin操作的字符串。
      • constant中 - 用作常量值。
      • format中 - 用作具有现有列值的 Python 格式化字符串,例如:{first_name} {last_name}
      • join中 - 用作分隔符。

示例:

以下示例向salaries资源添加了4个新字段。

run: add_computed_field
parameters:
  resources: salaries
  fields:
    -
      operation: sum
      target: total
      source:
        - jan
        - feb
        - may
    -
      operation: avg
      target: average
      source:
        - jan
        - feb
        - may
    -
      operation: format
      target: full_name
      with: '{first_name} {last_name}'
    -
      operation: constant
      target: status
      with: single

我们有一个资源(salaries),其数据看起来像:

first_name last_name jan feb mar
John Doe 100 200 300
...

结果数据集可能看起来如下

first_name last_name last_name jan feb mar average total status
John Doe John Doe 100 200 300 200 600 single
...

find_replace

从字段值中查找和替换字符串或模式。

参数:

  • resources - 清理字段值的资源。与stream_remote_resources中的resources具有相同的语义。

  • fields - 要替换值的字段列表。

    • name - 要替换值的字段名称。
    • patterns - 要从字段中查找和替换的模式列表。
      • find - 字符串,解释为正则表达式以匹配字段值。
      • replace - 字符串,解释为正则表达式以替换匹配的模式。

示例:

以下示例使用正则表达式和精确的字符串模式替换字段值。

run: find_replace
parameters:
  resources: dates
  fields:
    -
      name: year
      patterns:
        -
          find: ([0-9]{4})( \(\w+\))
          replace: \1
    -
      name: quarter
      patterns:
        -
          find: Q1
          replace: '03-31'
        -
          find: Q2
          replace: '06-31'
        -
          find: Q3
          replace: '09-30'
        -
          find: Q4
          replace: '12-31'

我们有一个资源(dates),其数据看起来像:

year quarter
2000 (1) 2000-Q1
...

结果数据集可能看起来如下

year quarter
2000 2000-03-31
...

unpivot

将表格数据转换为一行一条记录。

参数:

  • resources - 要unpivot的资源。与stream_remote_resources中的resources具有相同的语义。
  • extraKeyFields - 目标字段定义的列表,每个定义是一个包含至少以下属性的对象(未旋转的列值将放在这里)
    • name - 目标字段的名称。
    • type - 目标字段的类型。
  • extraValueField - 目标字段定义 - 一个包含至少以下属性的对象(未旋转的单元格值将放在这里)
    • name - 目标字段的名称。
    • type - 目标字段的类型。
  • unpivot - 源字段定义的列表,每个定义是一个包含至少以下属性的对象
    • name - 可以是简单的名称,也可以是匹配原始字段名称的正则表达式。
    • keys - 目标字段名称与原始字段值的映射
      • 键应该是extraKeyFields中的目标字段名称
      • 值可以是简单地要插入的常量值,也可以是匹配name的正则表达式。

示例:

以下示例将数据旋转到3个新字段:yeardirectionamount

parameters:
  resources: balance
  extraKeyFields:
    -
      name: year
      type: integer
    -
      name: direction
      type: string
      constraints:
        enum:
          - In
          - Out
  extraValueField:
      name: amount
      type: number
  unpivot:
    -
      name: 2015 incomes
      keys:
        year: 2015
        direction: In
    -
      name: 2015 expenses
      keys:
        year: 2015
        direction: Out
    -
      name: 2016 incomes
      keys:
        year: 2016
        direction: In
    -
      name: 2016 expenses
      keys:
        year: 2016
        direction: Out

我们有一个资源(balance),其数据看起来像:

company 2015 incomes 2015 expenses 2016 incomes 2016 expenses
Inc 1000 900 2000 1700
Org 2000 800 3000 2000
...

结果数据集可能看起来如下

company year direction amount
Inc 2015 In 1000
Inc 2015 Out 900
Inc 2016 In 2000
Inc 2016 Out 1700
Org 2015 In 2000
Org 2015 Out 800
Org 2016 In 3000
Org 2016 Out 2000
...

可以通过定义正则表达式而不是常量值来实现类似的结果。

parameters:
  resources: balance
  extraKeyFields:
    -
      name: year
      type: integer
    -
      name: direction
      type: string
      constraints:
        enum:
          - In
          - Out
  extraValueField:
      name: amount
      type: number
  unpivot:
    -
      name: ([0-9]{4}) (\\w+)  # regex for original column
      keys:
        year: \\1  # First member of group from above
        direction: \\2  # Second member of group from above

dump_to_sql

将数据包保存到 SQL 数据库。

参数:

  • engine - 连接到 SQL 数据库的连接字符串(URL 语法)。还支持env://<environment-variable>,表示连接字符串应从指示的环境变量中获取。如果没有指定,则假定默认值为env://DPP_DB_ENGINE
  • tables - 资源与数据库表之间的映射。键是表名,值是具有以下属性的对象
    • resource-name - 应该导出到表中的资源名称
    • mode - 数据应如何写入数据库。可能的值
      • rewrite(默认)- 重写表,所有之前的数据(如果有)将被删除。
      • append - 在不更改已存在数据的情况下写入新行。
      • update - 根据一组“更新键”更新表。对于每一行新数据,检查数据库中是否存在可以更新的现有行(即,在所有更新键中具有相同值的现有行)。如果存在,则更新现有行中的其余列。否则,将新行插入到数据库中。
    • update_keys - 仅适用于update模式。应用于检查行存在的字段名列表。如果未指定,则默认使用模式的primaryKey
    • indexes - 待定
  • updated_column - 可选:将在数据中添加具有布尔值的列的名称
    • true - 行被更新
    • false - 行被插入
  • updated_id_column - 可选:将在数据中添加并包含更新行ID的列的名称

dump_to_path

将数据包保存到文件系统路径。

参数:

  • out-path - 输出路径的名称,其中将存储datapackage.json

    如果不存在,则将创建此路径以及内部数据包路径。

    如果省略,则假定.(当前目录)。

  • force-format - 指定是否强制所有输出文件使用相同的格式

    • 如果为True(默认),所有资源将使用相同的格式
    • 如果为False,格式将根据文件扩展名推断。未知扩展名的资源将被丢弃。
  • format - 指定要生成的输出文件类型(如果force-format为true):csv(默认)或json

  • add-filehash-to-path:指定是否将文件MD5散列包含到资源路径中。默认为False。如果为True,则将散列嵌入路径中,如下所示

    • 如果原始路径为path/to/the/file.ext
    • 修改后的路径将是path/to/the/HASH/file.ext
  • counters - 指定是否计算行数、字节数或数据的MD5散列及其存储位置。一个具有以下属性的对象

    • datapackage-rowcount:数据包的总行数应存储在哪里(默认:count_of_rows
    • datapackage-bytes:数据包的总字节数应存储在哪里(默认:bytes
    • datapackage-hash:数据包的MD5散列应存储在哪里(默认:hash
    • resource-rowcount:每个资源总行数应存储在哪里(默认:count_of_rows
    • resource-bytes:每个资源总字节数应存储在哪里(默认:bytes
    • resource-hash:每个资源的MD5散列应存储在哪里(默认:hash)每个属性都可以设置为null以防止计数。每个属性都可以是点分隔的字符串,用于在嵌套对象中存储数据(例如stats.rowcount
  • pretty-descriptor:指定数据包描述符(datapackage.json)文件的外观

    • False(默认)- 描述符将写在一行中。
    • True - 描述符将有缩进和每个键的新行,使其更易于阅读。

dump_to_zip

将数据包保存到压缩归档。

参数:

  • out-file - 存储压缩数据的输出文件名
  • force-formatformat - 与 dump_to_path 中相同
  • add-filehash-to-path - 与 dump_to_path 中相同
  • counters - 与 dump_to_path 中相同
  • pretty-descriptor - 与 dump_to_path 中相同

已弃用的处理器

这些处理器将在下一个主要版本中移除。

add_metadata

update_package 的别名,保留是为了向后兼容的原因。

add_resource

向数据包中添加新的外部表格资源。

参数:

您应提供 nameurl 属性,以及其他在 规范 中定义的可选属性。

url 表示该资源的所在位置。稍后当 stream_remote_resources 运行时,它将使用 url(存储在资源中的 dpp:streamedFrom 属性中)来读取数据行并将它们推入管道。

请注意,url 也支持 env://<环境变量>,这表示应从指示的环境变量中获取资源 URL。这在您提供包含敏感信息(如从数据库表流式传输的 SQL 连接字符串)的字符串时很有用。

参数基本上是传递给 tabulator.Stream 实例的参数(请参阅 API)。除此之外,您还可以传递一个 constants 参数,它应该是标题到字符串值的映射。当与 stream_remote_resources 结合使用时,这些常量值将添加到每个生成的行(以及默认模式)中。

您也可以在这里提供模式,或使用 stream_remote_resources 处理器生成的默认模式。如果指定了 path,则将使用它。如果没有指定,stream_remote_resources 处理器将为您分配一个具有 csv 扩展名的 path

示例:

- run: add_resource
  parameters:
    url: http://example.com/my-excel-file.xlsx
    sheet: 1
    headers: 2
- run: add_resource
  parameters:
    url: http://example.com/my-csv-file.csv
    encoding: "iso-8859-2"

stream_remote_resources

将外部资源转换为流式资源。

外部资源是链接到远程数据源(URL 或文件路径)的资源,但不经过管道处理,保持原样。

流式资源是可以由管道处理的资源,其输出将作为结果数据包的一部分保存。

如果一个资源没有模式,则会自动在这里创建一个默认模式,通过从数据源中的每一列创建一个 string 字段。

参数:

  • resources - 要流式传输的资源。可以是

    • 字符串列表,解释为要流式传输的资源名称
    • 字符串,解释为用于匹配资源名称的正则表达式

    如果省略,则流式传输datapackage中的所有资源。

  • ignore-missing - 如果为 true,则缺失的资源不会引发错误,但将被视为 '空'(即零行)。具有空 URL 的资源将被视为相同(即生成一个 '空' 资源)。

  • limit-rows - 如果提供,将限制从源中获取的行数。取一个整数值,指定要流式传输的源行数

示例:

- run: stream_remote_resources
  parameters:
    resources: ['2014-data', '2015-data']
- run: stream_remote_resources
  parameters:
    resources: '201[67]-data'

此处理器还支持加载纯文本资源(例如 HTML 页面),并将它们作为表格数据处理 - 分隔为具有单个 "data" 列的行。要启用此行为,请向资源添加以下属性:"format": "txt"

dump.to_sql

dump_to_sql 的别名,保留是为了向后兼容的原因。

dump.to_path

将数据包保存到文件系统路径。

参数:

  • out-path - 输出路径的名称,其中将存储datapackage.json

    如果不存在,则将创建此路径以及内部数据包路径。

    如果省略,则假定.(当前目录)。

  • force-format - 指定是否强制所有输出文件使用相同的格式

    • 如果为True(默认),所有资源将使用相同的格式
    • 如果为False,格式将根据文件扩展名推断。未知扩展名的资源将被丢弃。
  • format - 指定要生成的输出文件类型(如果force-format为true):csv(默认)或json

  • handle-non-tabular - 指定是否应将非表格资源(即没有 schema 的资源)也一起输出到结果数据包中。(有关更多详细信息,请参阅以下说明)

  • add-filehash-to-path:指定是否将文件MD5散列包含到资源路径中。默认为False。如果为True,则将散列嵌入路径中,如下所示

    • 如果原始路径为path/to/the/file.ext
    • 修改后的路径将是path/to/the/HASH/file.ext
  • counters - 指定是否计算行数、字节数或数据的MD5散列及其存储位置。一个具有以下属性的对象

    • datapackage-rowcount:数据包的总行数应存储在哪里(默认:count_of_rows
    • datapackage-bytes:数据包的总字节数应存储在哪里(默认:bytes
    • datapackage-hash:数据包的MD5散列应存储在哪里(默认:hash
    • resource-rowcount:每个资源总行数应存储在哪里(默认:count_of_rows
    • resource-bytes:每个资源总字节数应存储在哪里(默认:bytes
    • resource-hash:每个资源的MD5散列应存储在哪里(默认:hash)每个属性都可以设置为null以防止计数。每个属性都可以是点分隔的字符串,用于在嵌套对象中存储数据(例如stats.rowcount
  • pretty-descriptor:指定数据包描述符(datapackage.json)文件的外观

    • False(默认)- 描述符将写在一行中。
    • True - 描述符将有缩进和每个键的新行,使其更易于阅读。
  • file-formatters:指定自定义文件格式处理器。一个将格式名称映射到 Python 模块和类名的对象。

    • 允许覆盖现有的 csvjson 格式处理器或添加对新格式的支持。
    • 请注意,此类更改可能会导致结果数据包与 frictionlessdata 规范不兼容,并可能导致互操作性问题。
    • 示例用法: pipeline-spec.yaml(在 custom-formatters 管道下),XLSXFormat 类

dump.to_zip

将数据包保存到压缩归档。

参数:

  • out-file - 存储压缩数据的输出文件名
  • force-formatformat - 与 dump_to_path 中相同
  • handle-non-tabular - 与 dump_to_path 中相同
  • add-filehash-to-path - 与 dump_to_path 中相同
  • counters - 与 dump_to_path 中相同
  • pretty-descriptor - 与 dump_to_path 中相同
  • file-formatters - 与 dump_to_path 中相同

说明

dump.to_pathdump.to_zip 处理器也将处理非表格资源。这些资源必须同时具有 urlpath 属性,并且 不能 包含 schema 属性。在这种情况下,文件将从 url 下载并放置在提供的 path 中。

自定义处理器

对于任何非平凡的处理任务,您可能会遇到无法使用标准库处理器解决的问题,这是完全合理的。

为此,您可能需要编写自己的处理器 - 这里是如何操作的。

编写处理器的API有两个 - 高级API和低级API。

高级处理器API

高级API对于大多数处理器类型非常有用。

from datapackage_pipelines.wrapper import process

def modify_datapackage(datapackage, parameters, stats):
    # Do something with datapackage
    return datapackage

def process_row(row, row_index,
                resource_descriptor, resource_index,
                parameters, stats):
    # Do something with row
    return row

process(modify_datapackage=modify_datapackage,
        process_row=process_row)

高级API由一个方法组成,即 process 方法,它接受两个函数

  • modify_datapackage - 该方法可以对数据包描述符进行更改(如有必要),例如添加元数据、添加资源、修改资源模式等。

    也可以在需要时用于初始化代码。

    它具有以下参数

    • datapackage 是需要修改的当前数据包描述符。需要返回修改后的数据包描述符。
    • parameters 是一个字典,包含处理器的参数,如 pipeline-spec.yaml 文件中提供的那样。
    • stats 是一个字典,应在过程中修改以收集指标和测量值(例如,验证检查、行计数等)
  • process_row - 该方法修改流中的单个行。它接收以下参数

    • row 是一个包含要处理的行的字典
    • row_index 是该行在资源中的索引
    • resource_descriptor 是当前正在处理的资源的描述符对象
    • resource_index 是资源在数据包中的索引
    • parameters 是一个字典,包含处理器的参数,如 pipeline-spec.yaml 文件中提供的那样。
    • stats 是一个字典,应在过程中修改以收集指标和测量值(例如,验证检查、行计数等)

    并产生零个或多个处理后的行。

一些示例

# Add license information
from datapackage_pipelines.wrapper import process

def modify_datapackage(datapackage, parameters, stats):
    datapackage['license'] = 'CC-BY-SA'
    return datapackage

process(modify_datapackage=modify_datapackage)
# Add new column with constant value to first resource
# Column name and value are taken from the processor's parameters
from datapackage_pipelines.wrapper import process

def modify_datapackage(datapackage, parameters, stats):
    datapackage['resources'][0]['schema']['fields'].append({
      'name': parameters['column-name'],
      'type': 'string'
    })
    return datapackage

def process_row(row, row_index, resource_descriptor, resource_index, parameters, stats):
    if resource_index == 0:
        row[parameters['column-name']] = parameters['value']
    return row

process(modify_datapackage=modify_datapackage,
        process_row=process_row)
# Row counter
from datapackage_pipelines.wrapper import process

def modify_datapackage(datapackage, parameters, stats):
    stats['row-count'] = 0
    return datapackage

def process_row(row, row_index, resource_descriptor, resource_index, parameters, stats):
    stats['row-count'] += 1
    return row

process(modify_datapackage=modify_datapackage,
        process_row=process_row)

低级处理器API

在某些情况下,高级API可能过于限制。在这种情况下,您应考虑使用低级API。

from datapackage_pipelines.wrapper import ingest, spew

if __name__ == '__main__':
  with ingest() as ctx:

    # Initialisation code, if needed

    # Do stuff with datapackage
    # ...

    stats = {}

    # and resources:
    def new_resource_iterator(resource_iterator_):
        def resource_processor(resource_):
            # resource_.spec is the resource descriptor
            for row in resource_:
                # Do something with row
                # Perhaps collect some stats here as well
                yield row
        for resource in resource_iterator_:
            yield resource_processor(resource)

    spew(ctx.datapackage,
         new_resource_iterator(ctx.resource_iterator),
         ctx.stats)

上面的代码片段显示了大多数低级处理器的结构。

我们总是从调用 ingest() 开始 - 这个方法为我们提供了上下文,包含执行参数、数据包描述符(从前一个步骤输出)以及所有流式资源的行的迭代器。

我们通过调用 spew() 完成处理,该调用将处理后的数据发送到管道中的下一个处理器。 spew 接收

  • 修改后的数据包描述符;
  • (可能新的)资源迭代器;
  • 一个统计对象,它将添加到之前步骤的统计信息中,并在管道完成时返回给用户;
  • 可选地,一个在迭代资源完成后但在向其他处理器发出完成信号之前被调用的 finalizer 函数。您可以用它来关闭任何打开的文件,例如。

更深入的解释

spew 按以下顺序写入它接收到的数据

  • 首先,写入 datapackage 参数到流中。这意味着必须在调用 spew 之前完成对数据包描述符的所有修改。一个常见的陷阱是在资源迭代器内部修改数据包描述符 - 尽量避免这样做,因为下一个处理器将接收到的描述符将是错误的。
  • 然后它开始迭代资源。对于每个资源,它遍历其行并将每行写入流中。这个迭代过程最终会导致原始资源迭代器(从ingest返回的那个)的迭代。反过来,这会导致进程的输入流被读取。由于操作系统中的缓冲方式,"慢"处理器会缓慢地读取它们的输入,导致在其之前的处理器在I/O上等待,而它们的CPU密集型对应处理器完成处理。而"快"处理器不会无目的地工作,而是会睡眠等待传入数据或等待它们的输出缓冲区清空。这里实现的是所有数据行都大致同时处理,并且没有处理器在后续处理步骤中处理可能失败的行上"走得过远"。
  • 然后统计信息被写入流中。这意味着统计信息可以在迭代过程中修改,并且只有迭代完成后才会使用其值。
  • 最后,如果收到了(如果有的话),会调用finalizer方法。

一些示例

我们将从上面提到的相同的处理器开始,现在使用低级API实现。

# Add license information
from datapackage_pipelines.wrapper import ingest, spew

if __name__ == '__main__':
  with ingest() as ctx:
    ctx.datapackage['license'] = 'MIT'
    spew(ctx.datapackage, ctx.resource_iterator)
# Add new column with constant value to first resource
# Column name and value are taken from the processor's parameters
from datapackage_pipelines.wrapper import ingest, spew

parameters, datapackage, resource_iterator = ingest()

datapackage['resources'][0]['schema']['fields'].append({
   'name': parameters['column-name'],
   'type': 'string'
})

def new_resource_iterator(resource_iterator_):
    def resource_processor(resource_):
        for row in resource_:
            row[parameters['column-name']] = parameters['value']
            yield row

    first_resource = next(resource_iterator_)
    yield(resource_processor(first_resource))

    for resource in resource_iterator_:
        yield resource

spew(datapackage, new_resource_iterator(resource_iterator))
# Row counter
from datapackage_pipelines.wrapper import ingest, spew

_, datapackage, resource_iterator = ingest()

stats = {'row-count': 0}

def new_resource_iterator(resource_iterator_):
    def resource_processor(resource_):
        for row in resource_:
            stats['row-count'] += 1
            yield row

    for resource in resource_iterator_:
        yield resource_processor(resource)

spew(datapackage, new_resource_iterator(resource_iterator), stats)

接下来的示例展示了如何实现一个简单的网络爬虫。虽然不是严格必需的,但网络爬虫通常是管道中的第一个处理器。因此,它们可以忽略传入的数据包和资源迭代器,因为没有之前的处理器生成数据。

# Web Scraper
import requests
from datapackage_pipelines.wrapper import ingest, spew
from datapackage_pipelines.utilities.resources import PROP_STREAMING

parameters, _, _ = ingest()

host = parameters['ckan-instance']
package_list_api = 'https://{host}/api/3/action/package_list'
package_show_api = 'https://{host}/api/3/action/package_show'

def scrape_ckan(host_):
    all_packages = requests.get(package_list_api.format(host=host_))\
                           .json()\
                           .get('result', [])
    for package_id in all_packages:
      params = dict(id=package_id)
      package_info = requests.get(package_show_api.format(host=host_),
                                  params=params)\
                             .json()\
                             .get('result')
      if result is not None:
        yield dict(
            package_id=package_id,
            author=package_info.get('author'),
            title=package_info.get('title'),
        )

datapackage = {
  'resources': [
    {
      PROP_STREAMING: True,   # You must set this property for resources being streamed in the pipeline!
      'name': 'package-list',
      'schema': {
        'fields': [
          {'name': 'package_id', 'type': 'string'},
          {'name': 'author',     'type': 'string'},
          {'name': 'title',      'type': 'string'},
        ]
      }
    }
  ]
}

spew(datapackage, [scrape_ckan(host)])

在这个示例中,我们可以看到初始数据包是从头开始生成的,而资源迭代器实际上是一个爬虫,它们从CKAN实例API接收行时产生这些行。

插件和源描述符

在特定问题域中编写管道时,可能会发现开发出来的处理管道遵循某种模式。抓取或获取源数据通常彼此相似。处理、数据清洗、验证通常是相同的。

为了简化维护并避免样板代码,可以编写一个datapackage-pipelines 插件

插件是名为datapackage_pipelines_<plugin-name>的Python模块。插件可以提供两种功能:

  • 处理器包 - 你可以在插件中将围绕某个主题或特定目的的处理器打包在一起。任何位于datapackage_pipelines_<plugin-name>.processors模块下的处理器foo都可以在管道中使用,作为<plugin-name>.foo
  • 管道模板 - 如果datapackage_pipelines_<plugin-name>模块中存在Generator类,它将被用于根据模板生成管道 - 我们称之为"源描述符"。

源描述符

源描述符是一个包含创建完整管道所需信息的yaml文件。

dpp将寻找名为<plugin-name>.source-spec.yaml的文件,并将它们作为管道生成代码的输入,该代码应在datapackage_pipelines_<plugin-name>模块中的名为Generator的类中实现。

这个类应该继承自GeneratorBase并实现两个方法:

  • generate_pipeline - 接收源描述并返回一个形式为(id, details)的元组的迭代器。如果id是管道ID,则详情将是一个包含管道定义的对象。如果id的形式为:module:,则详情被视为指定模块的源规范。这样,生成器可以生成其他源规范。
  • get_schema - 应返回一个JSON Schema来验证源描述的结构。

示例

让我们假设我们编写一个用于从CKAN实例中提取数据的datapackage_pipelines_ckan插件。

以下是这样一种假设生成器的样子

import os
import json

from datapackage_pipelines.generators import \
    GeneratorBase, slugify, steps, SCHEDULE_MONTHLY

SCHEMA_FILE = os.path.join(os.path.dirname(__file__), 'schema.json')


class Generator(GeneratorBase):

    @classmethod
    def get_schema(cls):
        return json.load(open(SCHEMA_FILE))

    @classmethod
    def generate_pipeline(cls, source):
        pipeline_id = dataset_name = slugify(source['name'])
        host = source['ckan-instance']
        action = source['data-kind']

        if action == 'package-list':
            schedule = SCHEDULE_MONTHLY
            pipeline_steps = steps(*[
                ('ckan.scraper', {
                   'ckan-instance': host
                }),
                ('metadata', {
                  'name': dataset_name
                }),
                ('dump_to_zip', {
                   'out-file': 'ckan-datapackage.zip'
                })])
            pipeline_details = {
                'pipeline': pipeline_steps,
                'schedule': {'crontab': schedule}
            }
            yield pipeline_id, pipeline_details

在这种情况下,如果我们存储一个看起来像这样的ckan.source-spec.yaml文件

ckan-instance: example.com
name: example-com-list-of-packages
data-kind: package-list

然后在运行 dpp 时,我们将看到一个名为 ./example-com-list-of-packages 的可用管道。

此管道内部将包含 3 个步骤:ckan.scrapermetadatadump_to_zip

验证源描述符

源描述符可以具有与输出管道参数域最佳匹配的任何结构。然而,它必须有一个一致的格式,由 JSON Schema 文件支持。在我们的案例中,模式可能如下所示

{
  "$schema": "https://json-schema.fullstack.org.cn/draft-04/schema#",
  "type": "object",
  "properties": {
    "name":          { "type": "string" },
    "ckan-instance": { "type": "string" },
    "data-kind":     { "type": "string" }
  },
  "required": [ "name", "ckan-instance", "data-kind" ]
}

dpp 将确保源描述符文件符合该模式,然后再尝试使用 Generator 类将它们转换为管道。

提供处理代码

在某些情况下,生成器可能希望同时提供处理器的代码(以及管道定义)。为了做到这一点,生成器可以将 code 属性添加到包含处理器代码的任何步骤中。当执行时,此步骤不会像通常那样尝试解析处理器,而是使用提供的代码。

按计划运行

datapackage-pipelines 包含 celery 集成,允许通过类似 crontab 的语法在特定时间运行管道。

为了启用此功能,您只需将 schedule 部分添加到您的 pipeline-spec.yaml 文件中(或从生成器类返回一个计划,如上所述),如下所示

co2-information-cdiac:
  pipeline:
    -
        ...
  schedule:
    # minute hour day_of_week day_of_month month_of_year
    crontab: '0 * * * *'

在此示例中,此管道被设置为每小时整点运行。

要运行 celery 守护程序,请使用 celery 的命令行界面运行 datapackage_pipelines.app。这里有一种方法

$ python -m celery worker -B -A datapackage_pipelines.app

运行此服务器将首先执行所有“脏”任务,然后根据它们的计划执行任务。

作为一个快捷方式来启动调度器和仪表板(见下文),您可以使用预先构建的 Docker 镜像

$ docker run -v `pwd`:/pipelines:rw -p 5000:5000 \
        frictionlessdata/datapackage-pipelines server

然后浏览到 http://<docker machine's IP address>:5000/ 来查看当前执行状态仪表板。

管道仪表板 & 状态徽章

当安装在服务器上或使用任务调度器运行时,很难确切知道正在运行什么以及每个管道的状态。

为了使事情更容易,您可以启动网络仪表板以提供每个管道状态的概述、其基本信息和其最新执行的结果。

要从命令行启动网络服务器,请运行 dpp serve,然后浏览到 http://localhost:5000

环境变量 DPP_BASE_PATH 将确定仪表板是否从根目录或从另一个基本路径(例如值:/pipelines/)提供服务。

可以通过添加用户名和密码并使用环境变量 DPP_BASIC_AUTH_USERNAMEDPP_BASIC_AUTH_PASSWORD 来使仪表板端点需要身份验证。

对于单个管道和管道集合,都可以提供更简单的管道状态,包括状态徽章。对于单个管道,将完整管道 ID 添加到徽章端点

http://localhost:5000/badge/path_to/pipelines/my-pipeline-id

或对于管道集合

http://localhost:5000/badge/collection/path_to/pipelines/

请注意,这些徽章端点将始终暴露,无论 DPP_BASIC_AUTH_PASSWORDDPP_BASIC_AUTH_USERNAME 设置如何。

与其他服务集成

Datapackage-pipelines 可以在任何管道事件上调用预定义的 webhook。这可能允许与其他应用程序集成。

为了在特定管道中添加 webhook,请在管道定义中添加一个 hooks 属性,它应该是一个 URL 列表。每当该管道被排队、开始运行或完成运行时,所有 URL 都会与该有效载荷一起 POST。

{
  "pipeline": "<pipeline-id>",
  "event": "queue/start/progress/finish",
  "success": true/false (when applicable),
  "errors": [list-of-errors, when applicable]
}

已知问题

  • 加载具有大量数据的单个单元格的资源时引发异常(#112

项目详情


发布历史 发布通知 | RSS 源

下载文件

下载适用于您平台的应用程序文件。如果您不确定该选择哪个,请了解有关安装包的更多信息。

源分发

datapackage-pipelines-2.2.11.tar.gz (121.5 kB 查看哈希值)

上传时间

构建分发

datapackage_pipelines-2.2.11-py2.py3-none-any.whl (102.2 kB 查看哈希值)

上传时间 Python 2 Python 3

由以下机构支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面