使用服务器无服务器概念使用DataFlows库。
项目描述
# dataflows-serverless
DataFlows-serverless允许使用[DataFlows](https://github.com/datahq/dataflows)库,采用无服务器概念。
* 在Kubernetes上运行,根据需要启动和停止集群以节省成本。
* 支持适合的步骤的并行处理(例如,独立处理每行的步骤)。
* 使用标准的DataFlows库在本地开发和测试处理流程。
## 使用方法
### 创建流程
使用[dataflows](https://github.com/datahq/dataflows)库在本地开发流程。
将`Flow`类替换为`ServerlessFlow`,并将与无服务器处理相关的步骤用`serverless_step`包装
一个简单的示例,从URL列表中下载
```
from dataflows_serverless.flow import ServerlessFlow, serverless_step
from dataflows import dump_to_path
import requests
URLS = ['http://httpbin.org/get?page={}'.format(page) for page in range(100)]
def download(row)
print('下载{}'.format(url))
row['content'] = str(requests.get(row['url']).json())
ServerlessFlow(({'url': url, 'content': ''} for url in URLS),
serverless_step(download),
dump_to_path('url_contents')).serverless().process()
```
将其保存为`flow.py`,安装dataflows-serverless包,并在本地运行流程,不使用无服务器
```
pip3 install dataflows-serverless
python3 flow.py
```
输出数据位于`./url_contents/res_1.csv`
### 设置Kubernetes集群
任何最近的Kubernetes集群都应该可以工作,以下是一些创建集群的推荐方法
###### 使用Google Kubernetes Engine
这是测试/开发和运行生产工作负载的推荐方法
* 如果在本地运行时出现问题,请尝试使用[Google Cloud Shell](https://cloud.google.com/shell/),它可以使该过程更加简单。
* [安装Google Cloud SDK](https://cloud.google.com/sdk/docs/downloads-interactive)
* 安装kubectl
* `gcloud components install kubectl`
* 使用应用程序默认凭据(ADC)登录Google云
* `gcloud auth application-default login`
* 连接到现有的dataflows集群或如果不存在则创建一个新集群
* `$(dataflows_serverless_bin)/gke_connect_or_create.sh <GOOGLE_PROJECT_ID>`
* 您可以配置一些关于创建的集群的额外参数,运行 `gke_connect_or_create` 脚本,不带任何参数来查看可用的参数。
###### 使用 Minikube
Minikube 是一个在虚拟机上本地运行的 Kubernetes 集群
* 安装 [minikube](https://kubernetes.ac.cn/docs/tasks/tools/install-minikube/)
* 启动 minikube 集群
* `minikube start`
* 切换到 minikube 上下文
* `kubectl config use-context minikube`
* 创建 dataflows 命名空间
* `kubectl create ns datafows`
* 将当前上下文设置为默认使用此命名空间
* `kubectl config set-context minikube --namespace=dataflows`
### 在集群上运行流程
以下命令启动 1 个主作业,运行主流程,以及 10 个辅助作业,运行无服务器步骤。
```
python3 flow.py --serverless --secondaries=10 --output-datadir=url_contents
```
输出数据可在本地以非无服务器流程的输出路径相同的路径中找到:`./url_contents/res_1.csv`
首次运行可能需要较长时间,因为需要拉取 Docker 镜像,后续流程将显著更快。
如果您使用 Google Kubernetes Engine,请记住在完成后删除集群
```
$(dataflows_serverless_bin)/gke_cleanup.sh <GOOGLE_PROJECT_ID>
```
## 高级用法
### 安装要求/系统依赖项
为了安装流程的附加要求,您需要从 dataflows 镜像创建一个 Docker 镜像
通过运行 `dataflows_serverless_image` 获取相关的 Docker 镜像
以下示例 Dockerfile 添加了 Python 图像库
```
FROM orihoch/dataflows-serverless:9
RUN apk add --update --no-cache zlib-dev jpeg-dev && pip3 install pillow
```
构建并推送修改后的镜像。Kubernetes 如果镜像已存在则不会拉取镜像,因此请确保修改每个构建的标签。
使用您修改的镜像运行流程
```
python3 flow.py --serverless --secondaries=10 --output-datadir=url_contents --image=your-username/dataflows-serverless-pillow:0.0.1
```
### 提供输入数据
您可以在作业启动之前提供输入数据,所有输入数据应相对于当前工作目录的相对路径
```
python3 flow.py --serverless --secondaries=10 --output-datadir=url_contents --input-datadir=data/input_dir_1 --input-datadir=data/input_dir_2
```
### 高级数据初始化
为了提供大量数据和更高级的数据初始化,您可以提供一个数据初始化容器。
以下示例展示了如何使用 Google Cloud Storage。
创建一个数据初始化 Docker 镜像,将数据复制到 `/exports/data` 目录
```
FROM google/cloud-sdk
ENTRYPOINT ["bash", "-c", "\
mkdir -p /exports/data/ &&\
gcloud --project=GOOGLE_PROJECT_ID auth activate-service-account --key-file=/secrets/service-account.json &&\
gsutil -m cp -r gs://BUCKET_NAME/data/ /exports/
"]
```
该镜像需要凭证以访问 Google Storage,以下脚本创建一个服务账户以认证 Google Cloud,
相关的服务账户密钥和 Kubernetes 机密
```
GOOGLE_PROJECT_ID=
SERVICE_ACCOUNT_NAME=
SECRET_NAME=
gcloud --project=$GOOGLE_PROJECT_ID iam service-accounts create $SERVICE_ACCOUNT_NAME &&\
gcloud projects add-iam-policy-binding $GOOGLE_PROJECT_ID \
--member "serviceAccount:${SERVICE_ACCOUNT_NAME}@${GOOGLE_PROJECT_ID}.iam.gserviceaccount.com" \
--role "roles/storage.objectAdmin" &&\
gcloud iam service-accounts keys create secret-service-account.json --iam-account ${SERVICE_ACCOUNT_NAME}@${GOOGLE_PROJECT_ID}.iam.gserviceaccount.com &&\
kubectl create secret generic ${SECRET_NAME} --from-file=service-account.json=secret-service-account.json
```
您可能需要修改您的流程以从正确的目录获取输入数据。
您可以通过检查 DATAFLOWS_WORKDIR 环境变量来有条件地在服务器和本地使用正确的路径
```
data_dir = os.path.join(os.environ.get('DATAFLOWS_WORKDIR', '.'), 'data')
```
运行无服务器流程
```
python3 flow.py --serverless --secondaries=10 --output-datadir=url_contents \
--data-init-image=your-username/data-init-image --data-init-secret=${SECRET_NAME}
```
为了防止数据的重新加载,您可以保持数据服务器运行,为 --nfs-uuid= 参数提供一个唯一的值
```
python3 flow.py --serverless --secondaries=10 --output-datadir=url_contents \
--data-init-image=your-username/data-init --data-init-secret=${SECRET_NAME}
--nfs-uuid=my-nfs
```
请注意,在这种情况下,您必须确保每个数据服务器上运行一个主作业。
### 防止清理
防止清理已创建的资源 - 允许调试
```
python3 flow.py --serverless --secondaries=10 --output-datadir=url_contents --no-cleanup
```
由于NFS的动态性和使用特性,清理可能会出现问题,但是以下命令可以完成清理
```
kubectl delete jobs --all && kubectl delete all --all
```
### 远程调试流
以调试模式启动无服务器流
```
python3 flow.py --serverless --secondaries=2 --output-datadir=url_contents --debug
```
现在您需要手动运行作业,在相关的Pod上执行,例如
```
kubectl exec -it PRIMARY_POD_NAME /entrypoint.sh
kubectl exec -it SECONDARY_1_POD_NAME /entrypoint.sh
kubectl exec -it SECONDARY_2_POD_NAME /entrypoint.sh
```
## 更新和发布 dataflows-serverless 版本
更新 `VERSION.txt` 中的版本
更新 `dataflows_serverless/constants.py` 中的 `DEFAULT_IMAGE` 常量到新的Docker镜像标签。
构建并发布Docker镜像:`docker build -t orihoch/dataflows-serverless:v$(cat VERSION.txt) . && docker push orihoch/dataflows-serverless:v$(cat VERSION.txt)`
构建并发布PyPI上的包:`python setup.py sdist && twine upload dist/dataflows_serverless-$(cat VERSION.txt).tar.gz`
DataFlows-serverless允许使用[DataFlows](https://github.com/datahq/dataflows)库,采用无服务器概念。
* 在Kubernetes上运行,根据需要启动和停止集群以节省成本。
* 支持适合的步骤的并行处理(例如,独立处理每行的步骤)。
* 使用标准的DataFlows库在本地开发和测试处理流程。
## 使用方法
### 创建流程
使用[dataflows](https://github.com/datahq/dataflows)库在本地开发流程。
将`Flow`类替换为`ServerlessFlow`,并将与无服务器处理相关的步骤用`serverless_step`包装
一个简单的示例,从URL列表中下载
```
from dataflows_serverless.flow import ServerlessFlow, serverless_step
from dataflows import dump_to_path
import requests
URLS = ['http://httpbin.org/get?page={}'.format(page) for page in range(100)]
def download(row)
print('下载{}'.format(url))
row['content'] = str(requests.get(row['url']).json())
ServerlessFlow(({'url': url, 'content': ''} for url in URLS),
serverless_step(download),
dump_to_path('url_contents')).serverless().process()
```
将其保存为`flow.py`,安装dataflows-serverless包,并在本地运行流程,不使用无服务器
```
pip3 install dataflows-serverless
python3 flow.py
```
输出数据位于`./url_contents/res_1.csv`
### 设置Kubernetes集群
任何最近的Kubernetes集群都应该可以工作,以下是一些创建集群的推荐方法
###### 使用Google Kubernetes Engine
这是测试/开发和运行生产工作负载的推荐方法
* 如果在本地运行时出现问题,请尝试使用[Google Cloud Shell](https://cloud.google.com/shell/),它可以使该过程更加简单。
* [安装Google Cloud SDK](https://cloud.google.com/sdk/docs/downloads-interactive)
* 安装kubectl
* `gcloud components install kubectl`
* 使用应用程序默认凭据(ADC)登录Google云
* `gcloud auth application-default login`
* 连接到现有的dataflows集群或如果不存在则创建一个新集群
* `$(dataflows_serverless_bin)/gke_connect_or_create.sh <GOOGLE_PROJECT_ID>`
* 您可以配置一些关于创建的集群的额外参数,运行 `gke_connect_or_create` 脚本,不带任何参数来查看可用的参数。
###### 使用 Minikube
Minikube 是一个在虚拟机上本地运行的 Kubernetes 集群
* 安装 [minikube](https://kubernetes.ac.cn/docs/tasks/tools/install-minikube/)
* 启动 minikube 集群
* `minikube start`
* 切换到 minikube 上下文
* `kubectl config use-context minikube`
* 创建 dataflows 命名空间
* `kubectl create ns datafows`
* 将当前上下文设置为默认使用此命名空间
* `kubectl config set-context minikube --namespace=dataflows`
### 在集群上运行流程
以下命令启动 1 个主作业,运行主流程,以及 10 个辅助作业,运行无服务器步骤。
```
python3 flow.py --serverless --secondaries=10 --output-datadir=url_contents
```
输出数据可在本地以非无服务器流程的输出路径相同的路径中找到:`./url_contents/res_1.csv`
首次运行可能需要较长时间,因为需要拉取 Docker 镜像,后续流程将显著更快。
如果您使用 Google Kubernetes Engine,请记住在完成后删除集群
```
$(dataflows_serverless_bin)/gke_cleanup.sh <GOOGLE_PROJECT_ID>
```
## 高级用法
### 安装要求/系统依赖项
为了安装流程的附加要求,您需要从 dataflows 镜像创建一个 Docker 镜像
通过运行 `dataflows_serverless_image` 获取相关的 Docker 镜像
以下示例 Dockerfile 添加了 Python 图像库
```
FROM orihoch/dataflows-serverless:9
RUN apk add --update --no-cache zlib-dev jpeg-dev && pip3 install pillow
```
构建并推送修改后的镜像。Kubernetes 如果镜像已存在则不会拉取镜像,因此请确保修改每个构建的标签。
使用您修改的镜像运行流程
```
python3 flow.py --serverless --secondaries=10 --output-datadir=url_contents --image=your-username/dataflows-serverless-pillow:0.0.1
```
### 提供输入数据
您可以在作业启动之前提供输入数据,所有输入数据应相对于当前工作目录的相对路径
```
python3 flow.py --serverless --secondaries=10 --output-datadir=url_contents --input-datadir=data/input_dir_1 --input-datadir=data/input_dir_2
```
### 高级数据初始化
为了提供大量数据和更高级的数据初始化,您可以提供一个数据初始化容器。
以下示例展示了如何使用 Google Cloud Storage。
创建一个数据初始化 Docker 镜像,将数据复制到 `/exports/data` 目录
```
FROM google/cloud-sdk
ENTRYPOINT ["bash", "-c", "\
mkdir -p /exports/data/ &&\
gcloud --project=GOOGLE_PROJECT_ID auth activate-service-account --key-file=/secrets/service-account.json &&\
gsutil -m cp -r gs://BUCKET_NAME/data/ /exports/
"]
```
该镜像需要凭证以访问 Google Storage,以下脚本创建一个服务账户以认证 Google Cloud,
相关的服务账户密钥和 Kubernetes 机密
```
GOOGLE_PROJECT_ID=
SERVICE_ACCOUNT_NAME=
SECRET_NAME=
gcloud --project=$GOOGLE_PROJECT_ID iam service-accounts create $SERVICE_ACCOUNT_NAME &&\
gcloud projects add-iam-policy-binding $GOOGLE_PROJECT_ID \
--member "serviceAccount:${SERVICE_ACCOUNT_NAME}@${GOOGLE_PROJECT_ID}.iam.gserviceaccount.com" \
--role "roles/storage.objectAdmin" &&\
gcloud iam service-accounts keys create secret-service-account.json --iam-account ${SERVICE_ACCOUNT_NAME}@${GOOGLE_PROJECT_ID}.iam.gserviceaccount.com &&\
kubectl create secret generic ${SECRET_NAME} --from-file=service-account.json=secret-service-account.json
```
您可能需要修改您的流程以从正确的目录获取输入数据。
您可以通过检查 DATAFLOWS_WORKDIR 环境变量来有条件地在服务器和本地使用正确的路径
```
data_dir = os.path.join(os.environ.get('DATAFLOWS_WORKDIR', '.'), 'data')
```
运行无服务器流程
```
python3 flow.py --serverless --secondaries=10 --output-datadir=url_contents \
--data-init-image=your-username/data-init-image --data-init-secret=${SECRET_NAME}
```
为了防止数据的重新加载,您可以保持数据服务器运行,为 --nfs-uuid= 参数提供一个唯一的值
```
python3 flow.py --serverless --secondaries=10 --output-datadir=url_contents \
--data-init-image=your-username/data-init --data-init-secret=${SECRET_NAME}
--nfs-uuid=my-nfs
```
请注意,在这种情况下,您必须确保每个数据服务器上运行一个主作业。
### 防止清理
防止清理已创建的资源 - 允许调试
```
python3 flow.py --serverless --secondaries=10 --output-datadir=url_contents --no-cleanup
```
由于NFS的动态性和使用特性,清理可能会出现问题,但是以下命令可以完成清理
```
kubectl delete jobs --all && kubectl delete all --all
```
### 远程调试流
以调试模式启动无服务器流
```
python3 flow.py --serverless --secondaries=2 --output-datadir=url_contents --debug
```
现在您需要手动运行作业,在相关的Pod上执行,例如
```
kubectl exec -it PRIMARY_POD_NAME /entrypoint.sh
kubectl exec -it SECONDARY_1_POD_NAME /entrypoint.sh
kubectl exec -it SECONDARY_2_POD_NAME /entrypoint.sh
```
## 更新和发布 dataflows-serverless 版本
更新 `VERSION.txt` 中的版本
更新 `dataflows_serverless/constants.py` 中的 `DEFAULT_IMAGE` 常量到新的Docker镜像标签。
构建并发布Docker镜像:`docker build -t orihoch/dataflows-serverless:v$(cat VERSION.txt) . && docker push orihoch/dataflows-serverless:v$(cat VERSION.txt)`
构建并发布PyPI上的包:`python setup.py sdist && twine upload dist/dataflows_serverless-$(cat VERSION.txt).tar.gz`
项目详情
关闭
dataflows_serverless-0.0.2.tar.gz 的哈希
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 2919ba4bd9f0f01c39969b1e4b9856e9356410ae819dc99804b59442122a17ef |
|
MD5 | 0affb1798043957926b09cdf37919b2b |
|
BLAKE2b-256 | 78344a3fb1402a8e5fa0f4a491985c3123d5dbbcf9f61eb89e6f912ada94caf3 |