跳转到主要内容

使用服务器无服务器概念使用DataFlows库。

项目描述

# dataflows-serverless

DataFlows-serverless允许使用[DataFlows](https://github.com/datahq/dataflows)库,采用无服务器概念。

* 在Kubernetes上运行,根据需要启动和停止集群以节省成本。
* 支持适合的步骤的并行处理(例如,独立处理每行的步骤)。
* 使用标准的DataFlows库在本地开发和测试处理流程。

## 使用方法

### 创建流程

使用[dataflows](https://github.com/datahq/dataflows)库在本地开发流程。

将`Flow`类替换为`ServerlessFlow`,并将与无服务器处理相关的步骤用`serverless_step`包装

一个简单的示例,从URL列表中下载

```
from dataflows_serverless.flow import ServerlessFlow, serverless_step
from dataflows import dump_to_path
import requests

URLS = ['http://httpbin.org/get?page={}'.format(page) for page in range(100)]

def download(row)
print('下载{}'.format(url))
row['content'] = str(requests.get(row['url']).json())

ServerlessFlow(({'url': url, 'content': ''} for url in URLS),
serverless_step(download),
dump_to_path('url_contents')).serverless().process()
```

将其保存为`flow.py`,安装dataflows-serverless包,并在本地运行流程,不使用无服务器

```
pip3 install dataflows-serverless
python3 flow.py
```

输出数据位于`./url_contents/res_1.csv`

### 设置Kubernetes集群

任何最近的Kubernetes集群都应该可以工作,以下是一些创建集群的推荐方法

###### 使用Google Kubernetes Engine

这是测试/开发和运行生产工作负载的推荐方法

* 如果在本地运行时出现问题,请尝试使用[Google Cloud Shell](https://cloud.google.com/shell/),它可以使该过程更加简单。
* [安装Google Cloud SDK](https://cloud.google.com/sdk/docs/downloads-interactive)
* 安装kubectl
* `gcloud components install kubectl`
* 使用应用程序默认凭据(ADC)登录Google云
* `gcloud auth application-default login`
* 连接到现有的dataflows集群或如果不存在则创建一个新集群
* `$(dataflows_serverless_bin)/gke_connect_or_create.sh <GOOGLE_PROJECT_ID>`
* 您可以配置一些关于创建的集群的额外参数,运行 `gke_connect_or_create` 脚本,不带任何参数来查看可用的参数。


###### 使用 Minikube

Minikube 是一个在虚拟机上本地运行的 Kubernetes 集群

* 安装 [minikube](https://kubernetes.ac.cn/docs/tasks/tools/install-minikube/)
* 启动 minikube 集群
* `minikube start`
* 切换到 minikube 上下文
* `kubectl config use-context minikube`
* 创建 dataflows 命名空间
* `kubectl create ns datafows`
* 将当前上下文设置为默认使用此命名空间
* `kubectl config set-context minikube --namespace=dataflows`

### 在集群上运行流程

以下命令启动 1 个主作业,运行主流程,以及 10 个辅助作业,运行无服务器步骤。

```
python3 flow.py --serverless --secondaries=10 --output-datadir=url_contents
```

输出数据可在本地以非无服务器流程的输出路径相同的路径中找到:`./url_contents/res_1.csv`

首次运行可能需要较长时间,因为需要拉取 Docker 镜像,后续流程将显著更快。

如果您使用 Google Kubernetes Engine,请记住在完成后删除集群

```
$(dataflows_serverless_bin)/gke_cleanup.sh <GOOGLE_PROJECT_ID>
```

## 高级用法

### 安装要求/系统依赖项

为了安装流程的附加要求,您需要从 dataflows 镜像创建一个 Docker 镜像

通过运行 `dataflows_serverless_image` 获取相关的 Docker 镜像

以下示例 Dockerfile 添加了 Python 图像库

```
FROM orihoch/dataflows-serverless:9
RUN apk add --update --no-cache zlib-dev jpeg-dev && pip3 install pillow
```

构建并推送修改后的镜像。Kubernetes 如果镜像已存在则不会拉取镜像,因此请确保修改每个构建的标签。

使用您修改的镜像运行流程

```
python3 flow.py --serverless --secondaries=10 --output-datadir=url_contents --image=your-username/dataflows-serverless-pillow:0.0.1
```

### 提供输入数据

您可以在作业启动之前提供输入数据,所有输入数据应相对于当前工作目录的相对路径

```
python3 flow.py --serverless --secondaries=10 --output-datadir=url_contents --input-datadir=data/input_dir_1 --input-datadir=data/input_dir_2
```

### 高级数据初始化

为了提供大量数据和更高级的数据初始化,您可以提供一个数据初始化容器。

以下示例展示了如何使用 Google Cloud Storage。

创建一个数据初始化 Docker 镜像,将数据复制到 `/exports/data` 目录

```
FROM google/cloud-sdk
ENTRYPOINT ["bash", "-c", "\
mkdir -p /exports/data/ &&\
gcloud --project=GOOGLE_PROJECT_ID auth activate-service-account --key-file=/secrets/service-account.json &&\
gsutil -m cp -r gs://BUCKET_NAME/data/ /exports/
"]
```

该镜像需要凭证以访问 Google Storage,以下脚本创建一个服务账户以认证 Google Cloud,
相关的服务账户密钥和 Kubernetes 机密

```
GOOGLE_PROJECT_ID=
SERVICE_ACCOUNT_NAME=
SECRET_NAME=
gcloud --project=$GOOGLE_PROJECT_ID iam service-accounts create $SERVICE_ACCOUNT_NAME &&\
gcloud projects add-iam-policy-binding $GOOGLE_PROJECT_ID \
--member "serviceAccount:${SERVICE_ACCOUNT_NAME}@${GOOGLE_PROJECT_ID}.iam.gserviceaccount.com" \
--role "roles/storage.objectAdmin" &&\
gcloud iam service-accounts keys create secret-service-account.json --iam-account ${SERVICE_ACCOUNT_NAME}@${GOOGLE_PROJECT_ID}.iam.gserviceaccount.com &&\
kubectl create secret generic ${SECRET_NAME} --from-file=service-account.json=secret-service-account.json
```

您可能需要修改您的流程以从正确的目录获取输入数据。

您可以通过检查 DATAFLOWS_WORKDIR 环境变量来有条件地在服务器和本地使用正确的路径

```
data_dir = os.path.join(os.environ.get('DATAFLOWS_WORKDIR', '.'), 'data')
```

运行无服务器流程

```
python3 flow.py --serverless --secondaries=10 --output-datadir=url_contents \
--data-init-image=your-username/data-init-image --data-init-secret=${SECRET_NAME}
```

为了防止数据的重新加载,您可以保持数据服务器运行,为 --nfs-uuid= 参数提供一个唯一的值

```
python3 flow.py --serverless --secondaries=10 --output-datadir=url_contents \
--data-init-image=your-username/data-init --data-init-secret=${SECRET_NAME}
--nfs-uuid=my-nfs
```

请注意,在这种情况下,您必须确保每个数据服务器上运行一个主作业。

### 防止清理

防止清理已创建的资源 - 允许调试

```
python3 flow.py --serverless --secondaries=10 --output-datadir=url_contents --no-cleanup
```

由于NFS的动态性和使用特性,清理可能会出现问题,但是以下命令可以完成清理

```
kubectl delete jobs --all && kubectl delete all --all
```

### 远程调试流

以调试模式启动无服务器流

```
python3 flow.py --serverless --secondaries=2 --output-datadir=url_contents --debug
```

现在您需要手动运行作业,在相关的Pod上执行,例如

```
kubectl exec -it PRIMARY_POD_NAME /entrypoint.sh
kubectl exec -it SECONDARY_1_POD_NAME /entrypoint.sh
kubectl exec -it SECONDARY_2_POD_NAME /entrypoint.sh
```

## 更新和发布 dataflows-serverless 版本

更新 `VERSION.txt` 中的版本

更新 `dataflows_serverless/constants.py` 中的 `DEFAULT_IMAGE` 常量到新的Docker镜像标签。

构建并发布Docker镜像:`docker build -t orihoch/dataflows-serverless:v$(cat VERSION.txt) . && docker push orihoch/dataflows-serverless:v$(cat VERSION.txt)`

构建并发布PyPI上的包:`python setup.py sdist && twine upload dist/dataflows_serverless-$(cat VERSION.txt).tar.gz`

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源分布

dataflows_serverless-0.0.2.tar.gz (14.9 kB 查看哈希)

上传时间

由以下机构支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误日志 StatusPage StatusPage 状态页面