未提供项目描述
项目描述
mack
mack提供了一组辅助方法,使您能够轻松执行常见的Delta Lake操作。
安装
使用pip install mack
安装mack。
以下是使用mack执行单行代码的Type 2 SCD upsert的示例
import mack
mack.type_2_scd_upsert(path, updatesDF, "pkey", ["attr1", "attr2"])
Type 2 SCD Upserts
此库提供了一种基于约定的Type 2 SCD管理方法。在介绍利用该功能所需约定之前,让我们看看一个示例。
假设您有一个以下SCD表,其中包含pkey
主键
+----+-----+-----+----------+-------------------+--------+
|pkey|attr1|attr2|is_current| effective_time|end_time|
+----+-----+-----+----------+-------------------+--------+
| 1| A| A| true|2019-01-01 00:00:00| null|
| 2| B| B| true|2019-01-01 00:00:00| null|
| 4| D| D| true|2019-01-01 00:00:00| null|
+----+-----+-----+----------+-------------------+--------+
您想对此数据进行upsert操作
+----+-----+-----+-------------------+
|pkey|attr1|attr2| effective_time|
+----+-----+-----+-------------------+
| 2| Z| null|2020-01-01 00:00:00| // upsert data
| 3| C| C|2020-09-15 00:00:00| // new pkey
+----+-----+-----+-------------------+
以下是执行upsert的方法
mack.type_2_scd_upsert(delta_table, updatesDF, "pkey", ["attr1", "attr2"])
以下是upsert后的表
+----+-----+-----+----------+-------------------+-------------------+
|pkey|attr1|attr2|is_current| effective_time| end_time|
+----+-----+-----+----------+-------------------+-------------------+
| 2| B| B| false|2019-01-01 00:00:00|2020-01-01 00:00:00|
| 4| D| D| true|2019-01-01 00:00:00| null|
| 1| A| A| true|2019-01-01 00:00:00| null|
| 3| C| C| true|2020-09-15 00:00:00| null|
| 2| Z| null| true|2020-01-01 00:00:00| null|
+----+-----+-----+----------+-------------------+-------------------+
如果您的SCD表满足以下要求,则可以重用upsert代码
- 包含一个唯一主键列
- 任何属性列的变化都会触发upsert操作
- 通过
effective_time
、end_time
和is_current
列公开SCD逻辑(您还可以使用日期或版本列进行SCD upserts)
删除重复项
kill_duplicate
函数会完全从Delta表中删除所有重复行。
假设您有以下表
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| A| A| # duplicate
| 2| A| B|
| 3| A| A| # duplicate
| 4| A| A| # duplicate
| 5| B| B| # duplicate
| 6| D| D|
| 9| B| B| # duplicate
+----+----+----+
运行kill_duplicates
函数
mack.kill_duplicates(deltaTable, ["col2", "col3"])
以下是表的最终状态
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 2| A| B|
| 6| D| D|
+----+----+----+
使用主键删除重复项
drop_duplicates_pkey
函数从Delta表中删除除一个重复行之外的所有重复行。**警告**:您必须提供一个主键列,该列必须包含唯一值,否则该方法将默认删除重复项。如果无法提供唯一主键,则可以使用drop_duplicates
方法。
假设您有以下表
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
| 1| A| A| C| # duplicate1
| 2| A| B| C|
| 3| A| A| D| # duplicate1
| 4| A| A| E| # duplicate1
| 5| B| B| C| # duplicate2
| 6| D| D| C|
| 9| B| B| E| # duplicate2
+----+----+----+----+
运行drop_duplicates
函数
mack.drop_duplicates_pkey(delta_table=deltaTable, primary_key="col1", duplication_columns=["col2", "col3"])
以下是表的最终状态
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
| 1| A| A| C|
| 2| A| B| C|
| 5| B| B| C|
| 6| D| D| C|
+----+----+----+----+
删除重复项
drop_duplicates
函数从 Delta 表中移除除一行外的所有重复行。它的行为与 drop_duplicates
DataFrame API 完全相同。警告:此方法会覆盖整个表,因此效率非常低。如果可能,请改用 drop_duplicates_pkey
方法。
假设您有以下表
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
| 1| A| A| C| # duplicate
| 1| A| A| C| # duplicate
| 2| A| A| C|
+----+----+----+----+
运行drop_duplicates
函数
mack.drop_duplicates(delta_table=deltaTable, duplication_columns=["col1"])
以下是表的最终状态
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
| 1| A| A| C| # duplicate
| 2| A| A| C| # duplicate
+----+----+----+----+
复制表
copy_table
函数复制现有的 Delta 表。当你复制一个表时,它将在指定的目标处重新创建。此目标可以是路径或元数据存储中的表。复制包括
- 数据
- 分区
- 表属性
复制不包括 delta 日志,这意味着您无法将新表恢复到原始表的旧版本。
以下是执行复制的方法
mack.copy_table(delta_table=deltaTable, target_path=path)
验证追加
validate_append
函数提供了一种机制,允许某些列进行模式演进,但拒绝包含未明确允许的列的追加。
假设您有以下 Delta 表
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 2| b| B|
| 1| a| A|
+----+----+----+
以下是一个包装 validate_append
的追加函数
def append_fun(delta_table, append_df):
mack.validate_append(
delta_table,
append_df,
required_cols=["col1", "col2"],
optional_cols=["col4"],
)
您可以追加以下 DataFrame,它包含所需的列和可选列
+----+----+----+
|col1|col2|col4|
+----+----+----+
| 3| c| cat|
| 4| d| dog|
+----+----+----+
追加这些数据后,Delta 表将包含以下内容
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
| 3| c|null| cat|
| 4| d|null| dog|
| 2| b| B|null|
| 1| a| A|null|
+----+----+----+----+
您不能追加以下 DataFrame,它包含所需的列,但还包含一个未指定为可选列的列(col5
)。
+----+----+----+
|col1|col2|col5|
+----+----+----+
| 4| b| A|
| 5| y| C|
| 6| z| D|
+----+----+----+
尝试此写入时,您将得到以下错误:“TypeError:列 'col5' 不属于当前 Delta 表。如果要将列添加到表中,您必须设置 optional_cols 参数。”
您也不能追加以下 DataFrame,它缺少一个所需的列。
+----+----+
|col1|col4|
+----+----+
| 4| A|
| 5| C|
| 6| D|
+----+----+
您将得到以下错误:“TypeError:基础 Delta 表包含以下列 '['col1', 'col4']',但需要以下列 '['col1', 'col2']'。”
追加无重复数据
append_without_duplicates
函数有助于将记录追加到现有的 Delta 表,而不会将重复记录追加到记录中。
假设您有以下 Delta 表
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| A| B|
| 2| C| D|
| 3| E| F|
+----+----+----+
以下是待追加的数据
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 2| R| T| # duplicate col1
| 8| A| B|
| 8| C| D| # duplicate col1
| 10| X| Y|
+----+----+----+
运行 append_without_duplicates
函数
mack.append_without_duplicates(deltaTable, append_df, ["col1"])
以下是最终结果
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| A| B|
| 2| C| D|
| 3| E| F|
| 8| A| B|
| 10| X| Y|
+----+----+----+
请注意,重复的 col1
值没有被追加。如果运行正常的追加操作,那么 Delta 表将包含两行数据,其中 col1
等于 2。
Delta 文件大小
delta_file_sizes
函数返回一个字典,其中包含给定 Delta 表的总字节数、文件数量和平均文件大小。
假设您有以下分区为 col1
的 Delta 表
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| A| A|
| 2| A| B|
+----+----+----+
在表中运行 mack.delta_file_sizes(delta_table)
将返回
{"size_in_bytes": 1320,
"number_of_files": 2,
"average_file_size_in_bytes": 660}
显示 Delta 文件大小
show_delta_file_sizes
函数打印 delta 表的文件数量、表大小和平均文件大小。
假设您有以下分区为 col1
的 Delta 表
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| A| A|
| 2| A| B|
+----+----+----+
在表中运行 mack.delta_file_sizes(delta_table)
将打印
Delta 表包含 2 个大小为 1.32 kB 的文件。平均文件大小为 660.0 B
人类化字节
humanize_bytes
函数以易于人类阅读的格式格式化表示字节数的整数。
mack.humanize_bytes(1234567890) # "1.23 GB"
mack.humanize_bytes(1234567890000) # "1.23 TB"
与 1234567890 字节相比,1.23 GB 对于人类来说更容易理解。
是否为复合键候选
is_composite_key_candidate
函数返回一个布尔值,表示一组列是否唯一,并可以形成一个复合键。
假设您有以下 Delta 表
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| A| A|
| 2| B| B|
| 2| C| B|
+----+----+----+
在表中运行 mack.is_composite_key_candidate(delta_table, ["col1"])
将返回 False
。在表中运行 mack.is_composite_key_candidate(delta_table, ["col1", "col2"])
将返回 True
。
在 Delta 表中查找复合键候选
find_composite_key_candidates
函数帮助您找到唯一标识 Delta 表中的行的复合键。它返回可以用作复合键的列列表。
假设您有以下 Delta 表
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| a| z|
| 1| a| b|
| 3| c| b|
+----+----+----+
在表中运行 mack.find_composite_key_candidates(delta_table)
将返回 ["col1", "col3"]
。
追加 md5 列
with_md5_cols
函数将指定列的 md5
哈希值追加到 DataFrame 中。如果选择的列形成一个复合键,则可以用作唯一键。
您可以使用此函数与 find_composite_key_candidates
中识别的列一起使用,将唯一键追加到 DataFrame 中。
假设您有以下 Delta 表
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| a|null|
| 2| b| b|
| 3| c| c|
+----+----+----+
在表格上运行 mack.with_md5_cols(delta_table, ["col2", "col3"])
将按以下方式追加 md5_col2_col3
+----+----+----+--------------------------------+
|col1|col2|col3|md5_col2_col3 |
+----+----+----+--------------------------------+
|1 |a |null|0cc175b9c0f1b6a831c399e269772661|
|2 |b |b |1eeaac3814eb80cc40efb005cf0b9141|
|3 |c |c |4e202f8309e7b00349c70845ab02fce9|
+----+----+----+--------------------------------+
获取最新的 Delta 表版本
latest_version
函数获取最新的 Delta 表版本号并返回。
delta_table = DeltaTable.forPath(spark, path)
mack.latest_version(delta_table)
>> 2
带有约束的追加数据
constraint_append
函数有助于将记录追加到现有的 Delta 表中,即使追加 DataFrame 中的记录违反了表约束(包括检查和非空约束),这些记录也会追加到现有的隔离 Delta 表中,而不是目标表。如果隔离 Delta 表设置为 None
,则违反表约束的记录将被简单地丢弃。
假设您有以下目标 Delta 表,具有以下模式和约束
schema:
col1 int not null
col2 string null
col3 string null
check constraints:
col1_constraint: (col1 > 0)
col2_constraint: (col2 != 'Z')
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| A| B|
| 2| C| D|
| 3| E| F|
+----+----+----+
假设您有一个与相同模式但无约束的隔离 Delta 表。
以下是待追加的数据
+----+----+----+
|col1|col2|col3|
+----+----+----+
| | H| H| # violates col1 not null constraint
| 0| Z| Z| # violates both col1_constraint and col2_constraint
| 4| A| B|
| 5| C| D|
| 6| E| F|
| 9| G| G|
| 11| Z| Z| # violates col2_constraint
+----+----+----+
运行 constraint_append
函数
mack.constraint_append(delta_table, append_df, quarantine_table)
这是 delta_table 中的最终结果
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| A| B|
| 2| C| D|
| 3| E| F|
| 4| A| B|
| 5| C| D|
| 6| E| F|
| 9| G| G|
+----+----+----+
这是 quarantine_table 中的最终结果
+----+----+----+
|col1|col2|col3|
+----+----+----+
| | H| H|
| 0| Z| Z|
| 11| Z| Z|
+----+----+----+
请注意,违反了任一约束的记录都被追加到了隔离表中,其他记录被追加到目标表中,追加操作没有失败。如果运行了正常的追加操作,那么它会在约束违规时失败。如果 quarantine_table
设置为 None
,则违反任一约束的记录将被简单地丢弃。
重命名 Delta 表
此函数旨在重命名 Delta 表。它可以在 Databricks 环境内或使用独立 Spark 会话操作。
参数
delta_table
(DeltaTable
):表示要重命名的 Delta 表的对象。new_table_name
(str
):表的新名称。table_location
(str
,可选):表存储的文件路径。如果未提供,则函数尝试从DeltaTable
对象中推断位置。默认为None
。databricks
(bool
,可选):一个标志,表示函数的操作环境。如果是在 Databricks 中运行,则设置为True
,否则为False
。默认为False
。spark_session
(pyspark.sql.SparkSession
,可选):Spark 会话。当databricks
设置为True
时,这是必需的。默认为None
。
返回
None
引发
TypeError
:如果提供的delta_table
不是一个 DeltaTable 对象,或者如果databricks
设置为True
且spark_session
为None
。
示例用法
rename_delta_table(existing_delta_table, "new_table_name")
字典
我们在这里使用了以下定义这里的术语。
- 自然键:一个可以唯一标识行且存在于现实世界中的属性。
- 代理键:一个可以唯一标识行且不存在于现实世界中的属性。
- 复合键:多个属性组合在一起可以唯一标识一个行。
- 主键:行的唯一标识符。
- 候选键:可能作为主键的属性。
- 备选键:不是主键的候选键。
- 唯一键:可以在表上唯一的属性。也可以称为备选键。
- 外键:用于引用另一个表中记录的属性。
项目维护者
- Matthew Powers,又名 MrPowers
- Robert Kossendey,又名 robertkossendey
- Souvik Pratiher,又名 souvik-databricks
项目理念
mack库旨在让常见的Delta Lake数据任务更简单。
当然,您不需要使用mack。您可以自己编写逻辑。
如果您不想在项目中添加依赖项,也可以轻松复制/粘贴mack中的函数。这个库中的函数特意设计为易于复制和粘贴。
让我们看看您可能想要将mack作为依赖项的一些原因。
提供良好的公共接口
公共接口(仅公共接口)通过mack
命名空间提供。
当您运行import mack
时,您可以访问全部公共接口。在mack
命名空间中不暴露任何私有实现细节。
最小依赖
Mack仅依赖于Spark & Delta Lake。不会向Mack添加其他依赖项。
Spark用户利用了各种运行时,添加依赖项并不总是那么容易。您可以运行pip install mack
,而无需担心解决大量的依赖项冲突。您还可以仅将mack wheel文件附加到集群以利用该项目。
为社区提供最佳实践示例
Mack力求成为PySpark / Delta Lake社区的优质代码库示例。
开源Delta Lake项目并不多。使用良好软件工程实践(如CI和单元测试)的项目就更少了。您可以使用mack来帮助指导您在专有代码库中的设计决策。
稳定的公共接口和1.0版本发布后的长期支持
Mack有权在1.0版本发布之前做出破坏性的公共接口更改。我们将始终尽可能地最小化破坏性更改。
1.0版本发布后,Mack将严格遵循语义版本化2.0,并在主要版本中仅进行破坏性的公共接口更改。希望1.0将是唯一的主要版本,并且不需要进行任何破坏性更改。
代码设计
以下是Mack中使用的部分代码设计原则
- 我们尽可能避免使用类。类使得将代码的小块复制/粘贴到笔记本中变得困难。最好是停止编写类。
- 我们试图编写易于复制的函数。我们通过限制依赖于其他函数或类的函数来实现这一点。我们宁愿将单个使用函数嵌套在公共接口方法中,也不愿将其单独制作。
- 先开发后抽象。所有代码都在单个文件中,直到正确的抽象变得明显。我们宁愿有一个大文件,也不愿有错误的抽象。
Docker环境
Dockerfile
和docker-compose
文件提供了使用mack进行运行和开发容器化的方式。
- 第一次运行
docker build --tag=mack .
来构建镜像。 - 要执行Docker容器内的单元测试,运行
docker-compose up test
- 要进入运行的Docker容器进行开发,运行
docker run -it mack /bin/bash
社区
博客
视频
项目详情
下载文件
下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。
源分布
构建的发行版
mack-0.5.0.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 9b4a621c8955451fea0fa819f5ba52410912825f746162c475b985c553192b25 |
|
MD5 | 28658a99c80e9f37bd6952046280517b |
|
BLAKE2b-256 | 1242aeef0fabef18362a41434ed30e83f0d7bda2d46dd02fd49374082f536128 |
mack-0.5.0-py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 83d1bb349df2262a49270bc72866e8548a23c044ac8cae5294ce243bce26d886 |
|
MD5 | 286f76453bc0eeeeb84dfbe47c2a78f1 |
|
BLAKE2b-256 | 1e6a0fe0d17c4b7f6c45bda51f19c6a41ad7021d2a4ebd608adb13e7bc8b8216 |