未提供项目描述
项目描述
AirCan
使用Airflow作为运行器,将数据加载到CKAN数据存储。这是DataPusher和Xloader的替代品。
组件分离良好,您可以重用所需的部分(例如,您不使用Airflow,而是使用自己的运行器)。
开始使用
-
安装 Python >= 3.5 <= 3.7.x(并创建一个虚拟环境)。
-
克隆
aircan
以获取可用的示例git clone https://github.com/datopian/aircan
-
安装和设置 Airflow (https://airflow.org.cn/docs/stable/installation.html)
export AIRFLOW_HOME=~/airflow pip install apache-airflow airflow initdb
注意:在Python的最新版本(3.7+)中,在执行 airflow initdb
时可能会遇到以下错误
ModuleNotFoundError: No module named 'typing_extensions'
这可以通过 pip install typing_extensions
解决。
-
然后,启动服务器并访问您的Airflow管理员UI
airflow webserver -p 8080
默认情况下,服务器将在 http://localhost:8080/
上可用,如您运行之前命令的终端输出所示。
示例
示例1:CSV转JSON
在这个例子中,我们将运行一个AirCan示例来将CSV转换为JSON。
将DAG添加到Airflow默认目录,以便Airflow可以识别它
mkdir ~/airflow/dags/
cp examples/aircan-example-1.csv ~/airflow/
cp examples/csv_to_json.py ~/airflow/dags/
要使这个DAG在Airflow管理UI中显示,您可能需要重新启动服务器或启动调度器以更新DAG列表(这可能需要一分钟左右的时间来更新,然后刷新Airflow管理UI页面)。
airflow scheduler
运行此DAG
-
在管理UI中启用此DAG,使用此开关使其与调度器一起运行
-
使用此按钮“触发”DAG
-
片刻之后,检查输出。您应该看到此DAG成功的运行
-
在磁盘上找到输出位置:
~/airflow/aircan-example-1.json
在本地Airflow实例中使用Aircan DAGs
示例2:使用数据存储API将本地文件加载到CKAN数据存储
我们假设您有
- 一个在本地的CKAN设置,并运行在http://localhost:5000;
- 一个数据集(例如,
my-dataset
),它有一个资源(例如,my-resource
,其resource_id
为my-res-id-123
); - 我们还需要为Airflow设置两个环境变量。访问Airflow变量面板并设置
CKAN_SITE_URL
和您的CKAN_SYSADMIN_API_KEY
单节点DAG
api_ckan_load_single_node
是一个单节点DAG,它删除、创建并将资源加载到本地或远程CKAN实例。您可以通过以下步骤运行api_ckan_load_single_node
- 打开您的
airflow.cfg
文件(通常位于~/airflow/airflow.cfg
),并将您的DAG文件夹指向AirCan
dags_folder = /your/path/to/aircan
...other configs
dag_run_conf_overrides_params = True
注意:不要将dags_folder
指向/your/path/to/aircan/aircan/dags
。它必须指向外部的aircan
文件夹。
- 通过运行
airflow list_dags
验证Airflow是否找到了Aircan的DAGs。输出应列出
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
ckan_api_load_single_step
...other DAGs...
- 确保您已正确设置这些环境变量
export LC_ALL=en_US.UTF-8
export LANG=en_US.UTF-8
export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
-
运行Airflow web服务器(如果您跳过了前面的示例):
airflow webserver
-
运行Airflow调度器:
airflow scheduler
。确保已设置(3)中的环境变量。 -
访问Airflow UI(
http://localhost:8080/
)。您应该看到列出的DAGckan_api_load_single_step
-
通过点击界面上的关闭按钮激活DAG。
-
现在我们可以测试DAG。在您的终端中运行
airflow test \
-tp "{ \"resource_id\": \"my-res-id-123\", \
\"schema_fields_array\": \"[ 'field1', 'field2']\", \
\"csv_input\": \"/path/to/my.csv\", \
\"json_output\": \"/path/to/my.json\" }" \
ckan_api_load_single_step full_load_via_api now
确保相应地替换参数。
resource_id
是您在CKAN上的资源ID。schema_fields_array
是您的CSV文件的标题。现在所有内容都被视为纯文本。csv_input
是要上传的CSV文件的路径。- DAG将您的CSV文件转换为JSON文件,然后上传。
json_output
指定您要导出JSON文件的路径。
-
检查您的CKAN实例并验证数据是否已加载。
-
使用以下方式触发DAG
airflow trigger_dag ckan_api_load_single_step \
--conf='{ "resource_id": "my-res-id-123", "schema_fields_array": [ "field1", "field2" ], "csv_input": "/path/to.csv", "json_output": "/path/to.json" }'
不要忘记用您的数据正确替换参数,并正确转义特殊字符。或者,您也可以直接使用airflow run
命令运行DAG。
api_ckan_load_single_node
也适用于远程CKAN实例。只需相应设置Airflow的CKAN_SITE_URL
变量。
多节点DAG
ckan_api_load_multiple_steps
执行与api_ckan_load_single_node
相同的步骤,但它使用多个节点(任务)。您可以重复上一节的步骤并运行ckan_api_load_multiple_steps
。
[忽略]示例3:使用Postgres将本地文件加载到CKAN DataStore
我们将本地csv文件加载到CKAN DataStore实例。
预备知识:设置您的CKAN实例
我们假设您有
- 一个在本地的CKAN设置,并运行在http://localhost:5000
- 已启用数据存储。如果您使用Docker,您可能需要暴露您的Postgres实例端口。例如,在您的
docker-compose.yml
文件中添加以下内容
db:
ports:
- "5432:5432"
(有用信息:您可以在Docker容器中访问Postgres。运行docker ps
,您应该会看到一个名为docker-ckan_db
的容器,这对应于CKAN数据库。运行docker exec -it CONTAINER_ID bash
,然后运行psql -U ckan
以访问相应的Postgres实例。)
- 现在您需要在Airflow上设置一些信息。访问您的本地Airflow连接面板http://localhost:8080/admin/connection/。创建一个名为
ckan_postgres
的新连接,并使用您的数据存储信息。例如,假设您的CKAN_DATASTORE_WRITE_URL=postgresql://ckan:ckan@db/datastore
,使用以下架构
- 我们还需要为Airflow设置两个环境变量。访问Airflow变量面板,并设置
CKAN_SITE_URL
和您的CKAN_SYSADMIN_API_KEY
[待参数化变量] [待参数化路径]
然后,使用此脚本创建一个名为aircan-example
的数据集
cd aircan
pip install -r requirements-example.txt
python examples/setup-ckan.py --api-key
进行加载
我们假设您现在有一个名为my-first-dataset
的数据集。
创建加载的DAG
cp aircan/lib/api_ckan_load.py ~/airflow/dags/
使用airflow list_dags
检查Airflow是否识别您的DAG。您应该会看到一个名为ckan_load
的DAG。
现在您可以单独测试每个任务
- 要删除数据存储,运行
airflow test ckan_load delete_datastore_table now
- 要创建数据存储,运行
airflow test ckan_load create_datastore_table now
。您可以在日志中看到数据存储的相应resource_id
。[待JSON硬编码,使用kwargs或其他操作符结构进行参数化] - 要将CSV加载到Postgres中,运行
airflow test ckan_load load_csv_to_postgres_via_copy now
。[待JSON硬编码,在其上插入resource_id。文件路径也是硬编码的,更改它] - 最后,将数据存储设置为活动状态:
airflow test ckan_load restore_indexes_and_set_datastore_active now
。
要运行整个DAG
- 选择DAG[截图]
- 使用到../your/aircan/examples/example1.csv的路径配置它
- 运行它...[截图]
检查输出
- 访问http://localhost:5000/dataset/aircan-example/并查看名为XXX的资源。现在它的数据存储中将有数据!
示例2a:将远程文件加载到数据存储
与示例2相同,但使用此DAG
cp aircan/examples/ckan-datastore-from-remote.py ~/airflow/dags/
此外,设置加载的远程URL。
示例3:自动将上传到CKAN的文件加载到CKAN数据存储中
配置CKAN以自动加载。
- 设置CKAN - 参见前面的部分。
- 此外,在您的CKAN实例中安装此扩展:
ckanext-aircan-connector
。待办事项:添加说明。 - 使用您的airflow实例的位置和DAG ID(
aircan-load-csv
)配置CKAN。
运行它
运行此脚本,它将CSV文件上传到您的CKAN实例,并触发数据存储的加载。
cd aircan
pip install -r requirements-example.txt
python examples/ckan-upload-csv.py
使用Google Cloud Composer
- 在https://cloud.google.com/composer上注册一个账户。在Google Cloud Platform中创建或选择一个现有项目。对于此示例,我们使用一个名为
aircan-test-project
的项目。 - 在Google Cloud Composer中创建一个环境,无论是通过命令行还是通过UI。确保在创建项目时选择Python 3。在这里,我们创建了一个名为
aircan-airflow
的环境。
创建您的环境后,它应该出现在您的环境列表中:
-
覆盖
dag_run_conf_overrides_params
的配置: -
访问指定的DAG文件夹(它将是一个存储桶)。将
local/path/to/aircan/aircan
的内容上传到存储桶:
子文件夹 aircan
的内容必须是:
-
进入子目录
dags
并删除此目录下的__init__.py
文件。它与 Google Cloud Composer 配置冲突。 -
与我们在示例 2 中所做的方法类似,访问您由 Google Cloud Composer 创建的 Airflow 实例,并将
CKAN_SITE_URL
和CKAN_SYSADMIN_API_KEY
添加为变量。现在 DAG 应该出现在 UI 界面上。 -
假设您在
https://demo.ckan.org/
上有一个资源,其资源 ID 为my-res-id-123
。我们还假设您在 Google Cloud 平台 DAG 存储桶的根目录下有两个文件:一个是您要上传的资源 CSV 文件,命名为r3.csv
,包含两列,field1
和field2
。您必须在存储桶根目录下的另一个文件是r4.json
,一个空的 JSON 文件。
由于我们的 DAG 预期参数,您必须通过 CLI 触发它们:例如,要触发 api_ckan_load_single_node
,请在您的终端中运行
gcloud composer environments run aircan-airflow \
--location us-east1 \
trigger_dag -- ckan_api_load_single_step \
--conf='{ "resource_id": "my-res-id-123", "schema_fields_array": [ "field1", "field2" ], "csv_input": "/home/airflow/gcs/dags/r3.csv", "json_output": "/home/airflow/gcs/dags/r4.json" }'
检查日志(提示:按 DAG ID 过滤它们,例如,ckan_api_load_single_step
)。它应该成功将您的 .csv
文件的数据上传到 demo.ckan
。
项目详情
下载文件
下载适用于您的平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。
源分发
构建分发
aircan-0.0.4.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 3db5bd12bb1ac0b7ff6c9f3f3c0243470138f3b5e1fd1079e772bcad829108b3 |
|
MD5 | 729448b3c46d97eb1813d5f212f17583 |
|
BLAKE2b-256 | 172e98579f12bbffa4f660e3305cd433f57757ea5e54e97bdc64d6f1d8bbb81f |
aircan-0.0.4-py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 7b746e62aabd0cbd9788151a0de81df35df189248e0c580ac94e1c647bc69d90 |
|
MD5 | 3589af0c0f798443be05c9e0f7d74773 |
|
BLAKE2b-256 | 43b7fa4a7867ea85e9fa3a1a1950e1452b9986243119b8cabea52ba57c6b5394 |