跳转到主要内容

未提供项目描述

项目描述

AirCan

使用Airflow作为运行器,将数据加载到CKAN数据存储。这是DataPusher和Xloader的替代品。

组件分离良好,您可以重用所需的部分(例如,您不使用Airflow,而是使用自己的运行器)。

开始使用

  • 安装 Python >= 3.5 <= 3.7.x(并创建一个虚拟环境)。

  • 克隆 aircan 以获取可用的示例

    git clone https://github.com/datopian/aircan
    
  • 安装和设置 Airflow (https://airflow.org.cn/docs/stable/installation.html)

    export AIRFLOW_HOME=~/airflow
    pip install apache-airflow
    airflow initdb
    

注意:在Python的最新版本(3.7+)中,在执行 airflow initdb 时可能会遇到以下错误

ModuleNotFoundError: No module named 'typing_extensions'

这可以通过 pip install typing_extensions 解决。

  • 然后,启动服务器并访问您的Airflow管理员UI

    airflow webserver -p 8080
    

默认情况下,服务器将在 http://localhost:8080/ 上可用,如您运行之前命令的终端输出所示。

示例

示例1:CSV转JSON

在这个例子中,我们将运行一个AirCan示例来将CSV转换为JSON。

将DAG添加到Airflow默认目录,以便Airflow可以识别它

mkdir ~/airflow/dags/
cp examples/aircan-example-1.csv ~/airflow/
cp examples/csv_to_json.py ~/airflow/dags/

要使这个DAG在Airflow管理UI中显示,您可能需要重新启动服务器或启动调度器以更新DAG列表(这可能需要一分钟左右的时间来更新,然后刷新Airflow管理UI页面)。

airflow scheduler

运行此DAG

  • 管理UI中启用此DAG,使用此开关使其与调度器一起运行

    aircan_enable_example_dag_in_scheduler

  • 使用此按钮“触发”DAG

    aircan_trigger_example_dag_to_run

  • 片刻之后,检查输出。您应该看到此DAG成功的运行

    aircan_output_example_dag

  • 在磁盘上找到输出位置:~/airflow/aircan-example-1.json

在本地Airflow实例中使用Aircan DAGs

示例2:使用数据存储API将本地文件加载到CKAN数据存储

我们假设您有

  • 一个在本地的CKAN设置,并运行在http://localhost:5000
  • 一个数据集(例如,my-dataset),它有一个资源(例如,my-resource,其resource_idmy-res-id-123);
  • 我们还需要为Airflow设置两个环境变量。访问Airflow变量面板并设置CKAN_SITE_URL和您的CKAN_SYSADMIN_API_KEY

Variables configuration

单节点DAG

api_ckan_load_single_node是一个单节点DAG,它删除、创建并将资源加载到本地或远程CKAN实例。您可以通过以下步骤运行api_ckan_load_single_node

  1. 打开您的airflow.cfg文件(通常位于~/airflow/airflow.cfg),并将您的DAG文件夹指向AirCan
dags_folder = /your/path/to/aircan

...other configs

dag_run_conf_overrides_params = True

注意:不要将dags_folder指向/your/path/to/aircan/aircan/dags。它必须指向外部的aircan文件夹。

  1. 通过运行airflow list_dags验证Airflow是否找到了Aircan的DAGs。输出应列出
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
ckan_api_load_single_step
...other DAGs...
  1. 确保您已正确设置这些环境变量
export LC_ALL=en_US.UTF-8
export LANG=en_US.UTF-8
export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
  1. 运行Airflow web服务器(如果您跳过了前面的示例):airflow webserver

  2. 运行Airflow调度器:airflow scheduler。确保已设置(3)中的环境变量。

  3. 访问Airflow UI(http://localhost:8080/)。您应该看到列出的DAG ckan_api_load_single_step

  4. 通过点击界面上的关闭按钮激活DAG。

  5. 现在我们可以测试DAG。在您的终端中运行

airflow test \
-tp "{ \"resource_id\": \"my-res-id-123\", \
\"schema_fields_array\": \"[ 'field1', 'field2']\", \
\"csv_input\": \"/path/to/my.csv\", \
\"json_output\": \"/path/to/my.json\" }" \
ckan_api_load_single_step full_load_via_api now

确保相应地替换参数。

  • resource_id是您在CKAN上的资源ID。
  • schema_fields_array是您的CSV文件的标题。现在所有内容都被视为纯文本。
  • csv_input是要上传的CSV文件的路径。
  • DAG将您的CSV文件转换为JSON文件,然后上传。json_output指定您要导出JSON文件的路径。
  1. 检查您的CKAN实例并验证数据是否已加载。

  2. 使用以下方式触发DAG

airflow trigger_dag ckan_api_load_single_step \
 --conf='{ "resource_id": "my-res-id-123", "schema_fields_array": [ "field1", "field2" ], "csv_input": "/path/to.csv", "json_output": "/path/to.json" }'

不要忘记用您的数据正确替换参数,并正确转义特殊字符。或者,您也可以直接使用airflow run命令运行DAG。

api_ckan_load_single_node也适用于远程CKAN实例。只需相应设置Airflow的CKAN_SITE_URL变量。

多节点DAG

ckan_api_load_multiple_steps执行与api_ckan_load_single_node相同的步骤,但它使用多个节点(任务)。您可以重复上一节的步骤并运行ckan_api_load_multiple_steps

[忽略]示例3:使用Postgres将本地文件加载到CKAN DataStore

我们将本地csv文件加载到CKAN DataStore实例。

预备知识:设置您的CKAN实例

我们假设您有

  • 一个在本地的CKAN设置,并运行在http://localhost:5000
  • 已启用数据存储。如果您使用Docker,您可能需要暴露您的Postgres实例端口。例如,在您的docker-compose.yml文件中添加以下内容
db:
  ports:
      - "5432:5432"

(有用信息:您可以在Docker容器中访问Postgres。运行docker ps,您应该会看到一个名为docker-ckan_db的容器,这对应于CKAN数据库。运行docker exec -it CONTAINER_ID bash,然后运行psql -U ckan以访问相应的Postgres实例。)

  • 现在您需要在Airflow上设置一些信息。访问您的本地Airflow连接面板http://localhost:8080/admin/connection/。创建一个名为ckan_postgres的新连接,并使用您的数据存储信息。例如,假设您的CKAN_DATASTORE_WRITE_URL=postgresql://ckan:ckan@db/datastore,使用以下架构

Connection configuration

  • 我们还需要为Airflow设置两个环境变量。访问Airflow变量面板,并设置CKAN_SITE_URL和您的CKAN_SYSADMIN_API_KEY

Variables configuration

[待参数化变量] [待参数化路径]

然后,使用此脚本创建一个名为aircan-example的数据集

cd aircan
pip install -r requirements-example.txt
python examples/setup-ckan.py --api-key

进行加载

我们假设您现在有一个名为my-first-dataset的数据集。

创建加载的DAG

cp aircan/lib/api_ckan_load.py ~/airflow/dags/

使用airflow list_dags检查Airflow是否识别您的DAG。您应该会看到一个名为ckan_load的DAG。

现在您可以单独测试每个任务

  • 要删除数据存储,运行airflow test ckan_load delete_datastore_table now
  • 要创建数据存储,运行airflow test ckan_load create_datastore_table now。您可以在日志中看到数据存储的相应resource_id。[待JSON硬编码,使用kwargs或其他操作符结构进行参数化]
  • 要将CSV加载到Postgres中,运行airflow test ckan_load load_csv_to_postgres_via_copy now。[待JSON硬编码,在其上插入resource_id。文件路径也是硬编码的,更改它]
  • 最后,将数据存储设置为活动状态:airflow test ckan_load restore_indexes_and_set_datastore_active now

要运行整个DAG

  • 选择DAG[截图]
  • 使用到../your/aircan/examples/example1.csv的路径配置它
  • 运行它...[截图]

检查输出

示例2a:将远程文件加载到数据存储

与示例2相同,但使用此DAG

cp aircan/examples/ckan-datastore-from-remote.py ~/airflow/dags/

此外,设置加载的远程URL。

示例3:自动将上传到CKAN的文件加载到CKAN数据存储中

配置CKAN以自动加载。

  • 设置CKAN - 参见前面的部分。
  • 此外,在您的CKAN实例中安装此扩展:ckanext-aircan-connector待办事项:添加说明。
  • 使用您的airflow实例的位置和DAG ID(aircan-load-csv)配置CKAN。

运行它

运行此脚本,它将CSV文件上传到您的CKAN实例,并触发数据存储的加载。

cd aircan
pip install -r requirements-example.txt
python examples/ckan-upload-csv.py

使用Google Cloud Composer

  1. https://cloud.google.com/composer上注册一个账户。在Google Cloud Platform中创建或选择一个现有项目。对于此示例,我们使用一个名为aircan-test-project的项目。
  2. 在Google Cloud Composer中创建一个环境,无论是通过命令行还是通过UI。确保在创建项目时选择Python 3。在这里,我们创建了一个名为aircan-airflow的环境。Google Cloud Composer environment configuration

创建您的环境后,它应该出现在您的环境列表中:Google Cloud Composer environment configuration

  1. 覆盖dag_run_conf_overrides_params的配置:Google Cloud Composer environment configuration

  2. 访问指定的DAG文件夹(它将是一个存储桶)。将local/path/to/aircan/aircan的内容上传到存储桶:Google Cloud Composer DAGs folder configuration

子文件夹 aircan 的内容必须是: Google Cloud Composer DAGs 文件夹配置

  1. 进入子目录 dags 并删除此目录下的 __init__.py 文件。它与 Google Cloud Composer 配置冲突。

  2. 与我们在示例 2 中所做的方法类似,访问您由 Google Cloud Composer 创建的 Airflow 实例,并将 CKAN_SITE_URLCKAN_SYSADMIN_API_KEY 添加为变量。现在 DAG 应该出现在 UI 界面上。

  3. 假设您在 https://demo.ckan.org/ 上有一个资源,其资源 ID 为 my-res-id-123。我们还假设您在 Google Cloud 平台 DAG 存储桶的根目录下有两个文件:一个是您要上传的资源 CSV 文件,命名为 r3.csv,包含两列,field1field2。您必须在存储桶根目录下的另一个文件是 r4.json,一个空的 JSON 文件。 Google Cloud Composer DAGs 文件夹配置

由于我们的 DAG 预期参数,您必须通过 CLI 触发它们:例如,要触发 api_ckan_load_single_node,请在您的终端中运行

gcloud composer environments run aircan-airflow \
     --location us-east1 \
     trigger_dag -- ckan_api_load_single_step \
      --conf='{ "resource_id": "my-res-id-123", "schema_fields_array": [ "field1", "field2" ], "csv_input": "/home/airflow/gcs/dags/r3.csv", "json_output": "/home/airflow/gcs/dags/r4.json" }'

检查日志(提示:按 DAG ID 过滤它们,例如,ckan_api_load_single_step)。它应该成功将您的 .csv 文件的数据上传到 demo.ckan

项目详情


下载文件

下载适用于您的平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源分发

aircan-0.0.4.tar.gz (15.8 kB 查看哈希值)

上传时间

构建分发

aircan-0.0.4-py3-none-any.whl (13.8 kB 查看哈希值)

上传时间 Python 3

由以下机构支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误日志 StatusPage StatusPage 状态页面