用Python编写的Google Dataproc模板
项目描述
数据流程模板(Python - PySpark)
- AzureBlobStorageToBigQuery
- BigQueryToGCS(博客文章 链接)
- CassandraToBigquery
- CassandraToGCS(博客文章 链接)
- ElasticsearchToBigQuery
- ElasticsearchToBigtable
- ElasticsearchToGCS
- GCSToBigQuery(博客文章 链接)
- GCSToBigTable(博客文章 链接)
- GCSToGCS(博客文章 链接)
- GCSToJDBC(博客文章 链接)
- GCSToMongo (博客文章 链接)
- HbaseToGCS
- HiveToBigQuery (博客文章 链接)
- HiveToGCS(博客文章 链接)
- JDBCToBigQuery (博客文章 链接)
- JDBCToGCS (博客文章 链接)
- JDBCToJDBC (博客文章 链接)
- KafkaToGCS
- KafkaToBigQuery
- MongoToGCS(博客文章 链接)
- MongoToBigQuery
- PubSubLiteToBigtable
- RedshiftToGCS(博客文章 链接)
- S3ToBigQuery
- SnowflakeToGCS(博客文章 链接)
- TextToBigQuery
Dataproc Templates (Python - PySpark) 支持使用 batches submit pyspark 提交作业到 Dataproc Serverless,以及使用 jobs submit pyspark 提交到 Dataproc 集群。
使用 PyPi 包运行
在本 README 中,您可以看到如何运行模板的说明。
目前,有 3 种选项被描述
- 使用 bin/start.sh
- 使用 gcloud CLI
- 使用 Vertex AI
这三种选项需要您克隆此存储库并开始运行模板。
Dataproc Templates PyPi 包 是运行模板的第四种方法,可以直接在 PySpark 环境中运行(Dataproc 或本地/其他)。
示例
!pip3 install --user google-dataproc-templates==0.0.3
from dataproc_templates.bigquery.bigquery_to_gcs import BigQueryToGCSTemplate
from pyspark.sql import SparkSession
args = dict()
args["bigquery.gcs.input.table"] = "<bq_dataset>.<bq_table>"
args["bigquery.gcs.input.location"] = "<location>"
args["bigquery.gcs.output.format"] = "<format>"
args["bigquery.gcs.output.mode"] = "<mode>"
args["bigquery.gcs.output.location"] = "gs://<bucket_name/path>"
spark = SparkSession.builder \
.appName("BIGQUERYTOGCS") \
.enableHiveSupport() \
.getOrCreate()
template = BigQueryToGCSTemplate()
template.run(spark, args)
小贴士:在 Vertex AI 管理笔记本中 启动 Dataproc Serverless Spark 会话,并利用无服务器 Spark 会话,其中您的作业将使用 Dataproc Serverless 运行,而不是您的本地 PySpark 环境。
虽然这提供了一种简单的方法来开始,但请记住,bin/start.sh 已经提供了一种简单的方法,例如,您可以指定所需的 .jar 依赖项。使用 PyPi 包,您需要根据特定模板的要求配置您的 PySpark 会话。例如,您需要指定 spark.driver.extraClassPath 配置
spark = SparkSession.builder \
... \
.config('spark.driver.extraClassPath', '<template_required_dependency>.jar')
... \
.getOrCreate()
设置本地环境
建议在设置本地环境时使用 虚拟环境。此设置对于提交模板不是必需的,仅用于本地运行和开发。
# Create a virtual environment, activate it and install requirements
mkdir venv
python -m venv venv/
source venv/bin/activate
pip install -r requirements.txt
运行单元测试
单元测试使用 pytest
开发。
要运行所有单元测试,只需运行 pytest
pytest
要生成覆盖率报告,请使用 coverage 运行测试
coverage run \
--source=dataproc_templates \
--module pytest \
--verbose \
test
coverage report --show-missing
运行模板
Dataproc Templates (Python - PySpark) 支持无服务器和集群模式。默认情况下,使用无服务器模式。要运行这些模板,请直接使用 gcloud
CLI 或提供的 start.sh
脚本。
无服务器模式(默认)
使用batches submit pyspark命令将作业提交给Dataproc Serverless。
集群模式
使用jobs submit pyspark命令将作业提交给Dataproc标准集群。
要在现有集群上运行模板,必须另外指定环境变量JOB_TYPE=CLUSTER
和CLUSTER=<full clusterId>
。例如
export GCP_PROJECT=my-gcp-project
export REGION=gcp-region
export GCS_STAGING_LOCATION=gs://my-bucket/temp
export JOB_TYPE=CLUSTER
export CLUSTER=${DATAPROC_CLUSTER_NAME}
./bin/start.sh \
-- --template HIVETOBIGQUERY
注意:一些需要自定义镜像来执行的HBase模板在集群模式下尚不支持。
提交模板
提供了一个shell脚本来
- 构建Python包
- 根据环境变量设置Dataproc参数
- 使用提供的模板参数将所需的模板提交给Dataproc
提交时,用户需要提供3种类型的属性/参数。
- Spark属性:请参阅此文档以查看可用的Spark属性。
- 每个模板的特定参数:请参阅每个模板的README。
- 通用参数:--template_name和--log_level
- --log_level参数是可选的,默认为INFO。
- 可能的选项是Spark日志级别:["ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN"]。
- --log_level参数是可选的,默认为INFO。
bin/start.sh用法:
# Set required environment variables
export GCP_PROJECT=<project_id>
export REGION=<region>
export GCS_STAGING_LOCATION=<gs://path>
# Set optional environment variables
export SUBNET=<subnet>
export JARS="gs://additional/dependency.jar"
export HISTORY_SERVER_CLUSTER=projects/{projectId}/regions/{regionId}/clusters/{clusterId}
export METASTORE_SERVICE=projects/{projectId}/locations/{regionId}/services/{serviceId}
# Submit to Dataproc passing template parameters
./bin/start.sh [--properties=<spark.something.key>=<value>] \
-- --template=TEMPLATENAME \
--log_level=INFO \
--my.property="<value>" \
--my.other.property="<value>"
(etc...)
gcloud CLI用法:
也可以直接使用gcloud CLI提交作业。这可以通过以下方式实现
- 将
dataproc_templates
包构建为.egg
PACKAGE_EGG_FILE=dist/dataproc_templates_distribution.egg
python setup.py bdist_egg --output=${PACKAGE_EGG_FILE}
- 提交作业
- 应该将
main.py
文件作为主Python脚本 - 必须使用
--py-files
标志将包的.egg
文件打包
gcloud dataproc batches submit pyspark \
--region=<region> \
--project=<project_id> \
--jars="<required_jar_dependencies>" \
--deps-bucket=<gs://path> \
--subnet=<subnet> \
--py-files=${PACKAGE_EGG_FILE} \
[--properties=<spark.something.key>=<value>] \
main.py \
-- --template=TEMPLATENAME \
--log_level=INFO \
--<my.property>="<value>" \
--<my.other.property>="<value>"
(etc...)
Vertex AI用法:
按照Dataproc Templates (Jupyter Notebooks) README从Vertex AI笔记本提交Dataproc模板。
项目详情
下载文件
下载您平台上的文件。如果您不确定要选择哪个,请了解更多关于安装包的信息。
源分布
构建分布
google_dataproc_templates-0.6.0b0.tar.gz的散列
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 22ad7883540b10144c2b563aca2ff16c8c893a0c8d8c954bdbee0530d624ee10 |
|
MD5 | a49101b8f3034e3f49435f2ad334be97 |
|
BLAKE2b-256 | 2f1613b73c5fedc32a399c852abd2c86ced2f1ddc74c84c13335cfda5b219242 |
google_dataproc_templates-0.6.0b0-py2.py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 0e9634fc17435b6f61eb0d26d45aab9db5e4464495d3024a99906d8ec4c71c85 |
|
MD5 | fb833949d1ce7455aaf32d09204c3dd1 |
|
BLAKE2b-256 | e6ad473ea3590b034785884d1a1f3d60769155e4e432d2b2562e80b42cf95c5a |