跳转到主要内容

用Python编写的Google Dataproc模板

项目描述

Build Status Python Integration Test Status Python Dataproc Cluster Integration Tests Status

数据流程模板(Python - PySpark)

Dataproc Templates (Python - PySpark) 支持使用 batches submit pyspark 提交作业到 Dataproc Serverless,以及使用 jobs submit pyspark 提交到 Dataproc 集群。

使用 PyPi 包运行

在本 README 中,您可以看到如何运行模板的说明。
目前,有 3 种选项被描述

  • 使用 bin/start.sh
  • 使用 gcloud CLI
  • 使用 Vertex AI

这三种选项需要您克隆此存储库并开始运行模板。
Dataproc Templates PyPi 包 是运行模板的第四种方法,可以直接在 PySpark 环境中运行(Dataproc 或本地/其他)。
示例

!pip3 install --user google-dataproc-templates==0.0.3

from dataproc_templates.bigquery.bigquery_to_gcs import BigQueryToGCSTemplate
from pyspark.sql import SparkSession

args = dict()
args["bigquery.gcs.input.table"] = "<bq_dataset>.<bq_table>"
args["bigquery.gcs.input.location"] = "<location>"
args["bigquery.gcs.output.format"] = "<format>"
args["bigquery.gcs.output.mode"] = "<mode>"
args["bigquery.gcs.output.location"] = "gs://<bucket_name/path>"

spark = SparkSession.builder \
        .appName("BIGQUERYTOGCS") \
        .enableHiveSupport() \
        .getOrCreate()

template = BigQueryToGCSTemplate()
template.run(spark, args)

小贴士:在 Vertex AI 管理笔记本中 启动 Dataproc Serverless Spark 会话,并利用无服务器 Spark 会话,其中您的作业将使用 Dataproc Serverless 运行,而不是您的本地 PySpark 环境。

虽然这提供了一种简单的方法来开始,但请记住,bin/start.sh 已经提供了一种简单的方法,例如,您可以指定所需的 .jar 依赖项。使用 PyPi 包,您需要根据特定模板的要求配置您的 PySpark 会话。例如,您需要指定 spark.driver.extraClassPath 配置

spark = SparkSession.builder \
        ... \
        .config('spark.driver.extraClassPath', '<template_required_dependency>.jar')
        ... \
        .getOrCreate()

设置本地环境

建议在设置本地环境时使用 虚拟环境。此设置对于提交模板不是必需的,仅用于本地运行和开发。

# Create a virtual environment, activate it and install requirements
mkdir venv
python -m venv venv/
source venv/bin/activate
pip install -r requirements.txt

运行单元测试

单元测试使用 pytest 开发。

要运行所有单元测试,只需运行 pytest

pytest

要生成覆盖率报告,请使用 coverage 运行测试

coverage run \
  --source=dataproc_templates \
  --module pytest \
  --verbose \
  test

coverage report --show-missing

运行模板

Dataproc Templates (Python - PySpark) 支持无服务器和集群模式。默认情况下,使用无服务器模式。要运行这些模板,请直接使用 gcloud CLI 或提供的 start.sh 脚本。

无服务器模式(默认)

使用batches submit pyspark命令将作业提交给Dataproc Serverless。

集群模式

使用jobs submit pyspark命令将作业提交给Dataproc标准集群。

要在现有集群上运行模板,必须另外指定环境变量JOB_TYPE=CLUSTERCLUSTER=<full clusterId>。例如

export GCP_PROJECT=my-gcp-project
export REGION=gcp-region
export GCS_STAGING_LOCATION=gs://my-bucket/temp
export JOB_TYPE=CLUSTER
export CLUSTER=${DATAPROC_CLUSTER_NAME}
./bin/start.sh \
-- --template HIVETOBIGQUERY

注意:一些需要自定义镜像来执行的HBase模板在集群模式下尚不支持。

提交模板

提供了一个shell脚本来

  • 构建Python包
  • 根据环境变量设置Dataproc参数
  • 使用提供的模板参数将所需的模板提交给Dataproc

提交时,用户需要提供3种类型的属性/参数。

  • Spark属性:请参阅此文档以查看可用的Spark属性。
  • 每个模板的特定参数:请参阅每个模板的README。
  • 通用参数:--template_name和--log_level
    • --log_level参数是可选的,默认为INFO。
      • 可能的选项是Spark日志级别:["ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN"]。

bin/start.sh用法:

# Set required environment variables
export GCP_PROJECT=<project_id>
export REGION=<region>
export GCS_STAGING_LOCATION=<gs://path>

# Set optional environment variables
export SUBNET=<subnet>
export JARS="gs://additional/dependency.jar"
export HISTORY_SERVER_CLUSTER=projects/{projectId}/regions/{regionId}/clusters/{clusterId}
export METASTORE_SERVICE=projects/{projectId}/locations/{regionId}/services/{serviceId}

# Submit to Dataproc passing template parameters
./bin/start.sh [--properties=<spark.something.key>=<value>] \
               -- --template=TEMPLATENAME \
                  --log_level=INFO \
                  --my.property="<value>" \
                  --my.other.property="<value>"
                  (etc...)

gcloud CLI用法:

也可以直接使用gcloud CLI提交作业。这可以通过以下方式实现

  1. dataproc_templates包构建为.egg
PACKAGE_EGG_FILE=dist/dataproc_templates_distribution.egg
python setup.py bdist_egg --output=${PACKAGE_EGG_FILE}
  1. 提交作业
  • 应该将main.py文件作为主Python脚本
  • 必须使用--py-files标志将包的.egg文件打包
gcloud dataproc batches submit pyspark \
      --region=<region> \
      --project=<project_id> \
      --jars="<required_jar_dependencies>" \
      --deps-bucket=<gs://path> \
      --subnet=<subnet> \
      --py-files=${PACKAGE_EGG_FILE} \
      [--properties=<spark.something.key>=<value>] \
      main.py \
      -- --template=TEMPLATENAME \
         --log_level=INFO \
         --<my.property>="<value>" \
         --<my.other.property>="<value>"
         (etc...)

Vertex AI用法:

按照Dataproc Templates (Jupyter Notebooks) README从Vertex AI笔记本提交Dataproc模板。

项目详情


下载文件

下载您平台上的文件。如果您不确定要选择哪个,请了解更多关于安装包的信息。

源分布

google_dataproc_templates-0.6.0b0.tar.gz (62.5 kB 查看散列)

上传时间

构建分布

google_dataproc_templates-0.6.0b0-py2.py3-none-any.whl (92.2 kB 查看散列)

上传时间 Python 2 Python 3