跳转到主要内容

未提供项目描述

项目描述

mack

image image image PyPI - Downloads PyPI version

mack提供了一组辅助方法,使您能够轻松执行常见的Delta Lake操作。

mack

安装

使用pip install mack安装mack。

以下是使用mack执行单行代码的Type 2 SCD upsert的示例

import mack

mack.type_2_scd_upsert(path, updatesDF, "pkey", ["attr1", "attr2"])

Type 2 SCD Upserts

此库提供了一种基于约定的Type 2 SCD管理方法。在介绍利用该功能所需约定之前,让我们看看一个示例。

假设您有一个以下SCD表,其中包含pkey主键

+----+-----+-----+----------+-------------------+--------+
|pkey|attr1|attr2|is_current|     effective_time|end_time|
+----+-----+-----+----------+-------------------+--------+
|   1|    A|    A|      true|2019-01-01 00:00:00|    null|
|   2|    B|    B|      true|2019-01-01 00:00:00|    null|
|   4|    D|    D|      true|2019-01-01 00:00:00|    null|
+----+-----+-----+----------+-------------------+--------+

您想对此数据进行upsert操作

+----+-----+-----+-------------------+
|pkey|attr1|attr2|     effective_time|
+----+-----+-----+-------------------+
|   2|    Z| null|2020-01-01 00:00:00| // upsert data
|   3|    C|    C|2020-09-15 00:00:00| // new pkey
+----+-----+-----+-------------------+

以下是执行upsert的方法

mack.type_2_scd_upsert(delta_table, updatesDF, "pkey", ["attr1", "attr2"])

以下是upsert后的表

+----+-----+-----+----------+-------------------+-------------------+
|pkey|attr1|attr2|is_current|     effective_time|           end_time|
+----+-----+-----+----------+-------------------+-------------------+
|   2|    B|    B|     false|2019-01-01 00:00:00|2020-01-01 00:00:00|
|   4|    D|    D|      true|2019-01-01 00:00:00|               null|
|   1|    A|    A|      true|2019-01-01 00:00:00|               null|
|   3|    C|    C|      true|2020-09-15 00:00:00|               null|
|   2|    Z| null|      true|2020-01-01 00:00:00|               null|
+----+-----+-----+----------+-------------------+-------------------+

如果您的SCD表满足以下要求,则可以重用upsert代码

  • 包含一个唯一主键列
  • 任何属性列的变化都会触发upsert操作
  • 通过effective_timeend_timeis_current列公开SCD逻辑(您还可以使用日期或版本列进行SCD upserts)

删除重复项

kill_duplicate函数会完全从Delta表中删除所有重复行。

假设您有以下表

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   A|   A| # duplicate
|   2|   A|   B|
|   3|   A|   A| # duplicate
|   4|   A|   A| # duplicate
|   5|   B|   B| # duplicate
|   6|   D|   D|
|   9|   B|   B| # duplicate
+----+----+----+

运行kill_duplicates函数

mack.kill_duplicates(deltaTable, ["col2", "col3"])

以下是表的最终状态

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   2|   A|   B|
|   6|   D|   D|
+----+----+----+

使用主键删除重复项

drop_duplicates_pkey函数从Delta表中删除除一个重复行之外的所有重复行。**警告**:您必须提供一个主键列,该列必须包含唯一值,否则该方法将默认删除重复项。如果无法提供唯一主键,则可以使用drop_duplicates方法。

假设您有以下表

+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   1|   A|   A|   C| # duplicate1
|   2|   A|   B|   C|
|   3|   A|   A|   D| # duplicate1
|   4|   A|   A|   E| # duplicate1
|   5|   B|   B|   C| # duplicate2
|   6|   D|   D|   C|
|   9|   B|   B|   E| # duplicate2
+----+----+----+----+

运行drop_duplicates函数

mack.drop_duplicates_pkey(delta_table=deltaTable, primary_key="col1", duplication_columns=["col2", "col3"])

以下是表的最终状态

+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   1|   A|   A|   C|
|   2|   A|   B|   C|
|   5|   B|   B|   C|
|   6|   D|   D|   C|
+----+----+----+----+

删除重复项

drop_duplicates 函数从 Delta 表中移除除一行外的所有重复行。它的行为与 drop_duplicates DataFrame API 完全相同。警告:此方法会覆盖整个表,因此效率非常低。如果可能,请改用 drop_duplicates_pkey 方法。

假设您有以下表

+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   1|   A|   A|   C| # duplicate
|   1|   A|   A|   C| # duplicate
|   2|   A|   A|   C|
+----+----+----+----+

运行drop_duplicates函数

mack.drop_duplicates(delta_table=deltaTable, duplication_columns=["col1"])

以下是表的最终状态

+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   1|   A|   A|   C| # duplicate
|   2|   A|   A|   C| # duplicate
+----+----+----+----+

复制表

copy_table 函数复制现有的 Delta 表。当你复制一个表时,它将在指定的目标处重新创建。此目标可以是路径或元数据存储中的表。复制包括

  • 数据
  • 分区
  • 表属性

复制包括 delta 日志,这意味着您无法将新表恢复到原始表的旧版本。

以下是执行复制的方法

mack.copy_table(delta_table=deltaTable, target_path=path)

验证追加

validate_append 函数提供了一种机制,允许某些列进行模式演进,但拒绝包含未明确允许的列的追加。

假设您有以下 Delta 表

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   2|   b|   B|
|   1|   a|   A|
+----+----+----+

以下是一个包装 validate_append 的追加函数

def append_fun(delta_table, append_df):
    mack.validate_append(
        delta_table,
        append_df,
        required_cols=["col1", "col2"],
        optional_cols=["col4"],
    )

您可以追加以下 DataFrame,它包含所需的列和可选列

+----+----+----+
|col1|col2|col4|
+----+----+----+
|   3|   c| cat|
|   4|   d| dog|
+----+----+----+

追加这些数据后,Delta 表将包含以下内容

+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   3|   c|null| cat|
|   4|   d|null| dog|
|   2|   b|   B|null|
|   1|   a|   A|null|
+----+----+----+----+

您不能追加以下 DataFrame,它包含所需的列,但还包含一个未指定为可选列的列(col5)。

+----+----+----+
|col1|col2|col5|
+----+----+----+
|   4|   b|   A|
|   5|   y|   C|
|   6|   z|   D|
+----+----+----+

尝试此写入时,您将得到以下错误:“TypeError:列 'col5' 不属于当前 Delta 表。如果要将列添加到表中,您必须设置 optional_cols 参数。”

您也不能追加以下 DataFrame,它缺少一个所需的列。

+----+----+
|col1|col4|
+----+----+
|   4|   A|
|   5|   C|
|   6|   D|
+----+----+

您将得到以下错误:“TypeError:基础 Delta 表包含以下列 '['col1', 'col4']',但需要以下列 '['col1', 'col2']'。”

追加无重复数据

append_without_duplicates 函数有助于将记录追加到现有的 Delta 表,而不会将重复记录追加到记录中。

假设您有以下 Delta 表

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   A|   B|
|   2|   C|   D|
|   3|   E|   F|
+----+----+----+

以下是待追加的数据

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   2|   R|   T| # duplicate col1
|   8|   A|   B|
|   8|   C|   D| # duplicate col1
|  10|   X|   Y|
+----+----+----+

运行 append_without_duplicates 函数

mack.append_without_duplicates(deltaTable, append_df, ["col1"])

以下是最终结果


+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   A|   B|
|   2|   C|   D|
|   3|   E|   F|
|   8|   A|   B|
|  10|   X|   Y|
+----+----+----+

请注意,重复的 col1 值没有被追加。如果运行正常的追加操作,那么 Delta 表将包含两行数据,其中 col1 等于 2。

Delta 文件大小

delta_file_sizes 函数返回一个字典,其中包含给定 Delta 表的总字节数、文件数量和平均文件大小。

假设您有以下分区为 col1 的 Delta 表

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   A|   A|
|   2|   A|   B|
+----+----+----+

在表中运行 mack.delta_file_sizes(delta_table) 将返回

{"size_in_bytes": 1320,
"number_of_files": 2,
"average_file_size_in_bytes": 660}

显示 Delta 文件大小

show_delta_file_sizes 函数打印 delta 表的文件数量、表大小和平均文件大小。

假设您有以下分区为 col1 的 Delta 表

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   A|   A|
|   2|   A|   B|
+----+----+----+

在表中运行 mack.delta_file_sizes(delta_table) 将打印

Delta 表包含 2 个大小为 1.32 kB 的文件。平均文件大小为 660.0 B

人类化字节

humanize_bytes 函数以易于人类阅读的格式格式化表示字节数的整数。

mack.humanize_bytes(1234567890) # "1.23 GB"
mack.humanize_bytes(1234567890000) # "1.23 TB"

与 1234567890 字节相比,1.23 GB 对于人类来说更容易理解。

是否为复合键候选

is_composite_key_candidate 函数返回一个布尔值,表示一组列是否唯一,并可以形成一个复合键。

假设您有以下 Delta 表

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   A|   A|
|   2|   B|   B|
|   2|   C|   B|
+----+----+----+

在表中运行 mack.is_composite_key_candidate(delta_table, ["col1"]) 将返回 False。在表中运行 mack.is_composite_key_candidate(delta_table, ["col1", "col2"]) 将返回 True

在 Delta 表中查找复合键候选

find_composite_key_candidates 函数帮助您找到唯一标识 Delta 表中的行的复合键。它返回可以用作复合键的列列表。

假设您有以下 Delta 表

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   a|   z|
|   1|   a|   b|
|   3|   c|   b|
+----+----+----+

在表中运行 mack.find_composite_key_candidates(delta_table) 将返回 ["col1", "col3"]

追加 md5 列

with_md5_cols 函数将指定列的 md5 哈希值追加到 DataFrame 中。如果选择的列形成一个复合键,则可以用作唯一键。

您可以使用此函数与 find_composite_key_candidates 中识别的列一起使用,将唯一键追加到 DataFrame 中。

假设您有以下 Delta 表

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   a|null|
|   2|   b|   b|
|   3|   c|   c|
+----+----+----+

在表格上运行 mack.with_md5_cols(delta_table, ["col2", "col3"]) 将按以下方式追加 md5_col2_col3

+----+----+----+--------------------------------+
|col1|col2|col3|md5_col2_col3                   |
+----+----+----+--------------------------------+
|1   |a   |null|0cc175b9c0f1b6a831c399e269772661|
|2   |b   |b   |1eeaac3814eb80cc40efb005cf0b9141|
|3   |c   |c   |4e202f8309e7b00349c70845ab02fce9|
+----+----+----+--------------------------------+

获取最新的 Delta 表版本

latest_version 函数获取最新的 Delta 表版本号并返回。

delta_table = DeltaTable.forPath(spark, path)
mack.latest_version(delta_table)
>> 2

带有约束的追加数据

constraint_append 函数有助于将记录追加到现有的 Delta 表中,即使追加 DataFrame 中的记录违反了表约束(包括检查和非空约束),这些记录也会追加到现有的隔离 Delta 表中,而不是目标表。如果隔离 Delta 表设置为 None,则违反表约束的记录将被简单地丢弃。

假设您有以下目标 Delta 表,具有以下模式和约束

schema:
col1 int not null
col2 string null
col3 string null

check constraints:
col1_constraint: (col1 > 0)
col2_constraint: (col2 != 'Z')

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   A|   B|
|   2|   C|   D|
|   3|   E|   F|
+----+----+----+

假设您有一个与相同模式但无约束的隔离 Delta 表。

以下是待追加的数据

+----+----+----+
|col1|col2|col3|
+----+----+----+
|    |   H|   H| # violates col1 not null constraint
|   0|   Z|   Z| # violates both col1_constraint and col2_constraint
|   4|   A|   B|
|   5|   C|   D|
|   6|   E|   F|
|   9|   G|   G|
|  11|   Z|   Z| # violates col2_constraint
+----+----+----+

运行 constraint_append 函数

mack.constraint_append(delta_table, append_df, quarantine_table)

这是 delta_table 中的最终结果


+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   A|   B|
|   2|   C|   D|
|   3|   E|   F|
|   4|   A|   B|
|   5|   C|   D|
|   6|   E|   F|
|   9|   G|   G|
+----+----+----+

这是 quarantine_table 中的最终结果


+----+----+----+
|col1|col2|col3|
+----+----+----+
|    |   H|   H|
|   0|   Z|   Z|
|  11|   Z|   Z|
+----+----+----+

请注意,违反了任一约束的记录都被追加到了隔离表中,其他记录被追加到目标表中,追加操作没有失败。如果运行了正常的追加操作,那么它会在约束违规时失败。如果 quarantine_table 设置为 None,则违反任一约束的记录将被简单地丢弃。

重命名 Delta 表

此函数旨在重命名 Delta 表。它可以在 Databricks 环境内或使用独立 Spark 会话操作。

参数

  • delta_table (DeltaTable):表示要重命名的 Delta 表的对象。
  • new_table_name (str):表的新名称。
  • table_location (str,可选):表存储的文件路径。如果未提供,则函数尝试从 DeltaTable 对象中推断位置。默认为 None
  • databricks (bool,可选):一个标志,表示函数的操作环境。如果是在 Databricks 中运行,则设置为 True,否则为 False。默认为 False
  • spark_session (pyspark.sql.SparkSession,可选):Spark 会话。当 databricks 设置为 True 时,这是必需的。默认为 None

返回

  • None

引发

  • TypeError:如果提供的 delta_table 不是一个 DeltaTable 对象,或者如果 databricks 设置为 Truespark_sessionNone

示例用法

rename_delta_table(existing_delta_table, "new_table_name")

字典

我们在这里使用了以下定义这里的术语。

  • 自然键:一个可以唯一标识行且存在于现实世界中的属性。
  • 代理键:一个可以唯一标识行且不存在于现实世界中的属性。
  • 复合键:多个属性组合在一起可以唯一标识一个行。
  • 主键:行的唯一标识符。
  • 候选键:可能作为主键的属性。
  • 备选键:不是主键的候选键。
  • 唯一键:可以在表上唯一的属性。也可以称为备选键。
  • 外键:用于引用另一个表中记录的属性。

项目维护者

项目理念

mack库旨在让常见的Delta Lake数据任务更简单。

当然,您不需要使用mack。您可以自己编写逻辑。

如果您不想在项目中添加依赖项,也可以轻松复制/粘贴mack中的函数。这个库中的函数特意设计为易于复制和粘贴。

让我们看看您可能想要将mack作为依赖项的一些原因。

提供良好的公共接口

公共接口(仅公共接口)通过mack命名空间提供。

当您运行import mack时,您可以访问全部公共接口。在mack命名空间中不暴露任何私有实现细节。

最小依赖

Mack仅依赖于Spark & Delta Lake。不会向Mack添加其他依赖项。

Spark用户利用了各种运行时,添加依赖项并不总是那么容易。您可以运行pip install mack,而无需担心解决大量的依赖项冲突。您还可以仅将mack wheel文件附加到集群以利用该项目。

为社区提供最佳实践示例

Mack力求成为PySpark / Delta Lake社区的优质代码库示例。

开源Delta Lake项目并不多。使用良好软件工程实践(如CI和单元测试)的项目就更少了。您可以使用mack来帮助指导您在专有代码库中的设计决策。

稳定的公共接口和1.0版本发布后的长期支持

Mack有权在1.0版本发布之前做出破坏性的公共接口更改。我们将始终尽可能地最小化破坏性更改。

1.0版本发布后,Mack将严格遵循语义版本化2.0,并在主要版本中仅进行破坏性的公共接口更改。希望1.0将是唯一的主要版本,并且不需要进行任何破坏性更改。

代码设计

以下是Mack中使用的部分代码设计原则

  • 我们尽可能避免使用类。类使得将代码的小块复制/粘贴到笔记本中变得困难。最好是停止编写类
  • 我们试图编写易于复制的函数。我们通过限制依赖于其他函数或类的函数来实现这一点。我们宁愿将单个使用函数嵌套在公共接口方法中,也不愿将其单独制作。
  • 先开发后抽象。所有代码都在单个文件中,直到正确的抽象变得明显。我们宁愿有一个大文件,也不愿有错误的抽象。

Docker环境

Dockerfiledocker-compose文件提供了使用mack进行运行和开发容器化的方式。

  • 第一次运行docker build --tag=mack .来构建镜像。
  • 要执行Docker容器内的单元测试,运行docker-compose up test
  • 要进入运行的Docker容器进行开发,运行docker run -it mack /bin/bash

社区

博客

视频

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分布

mack-0.5.0.tar.gz (17.0 kB 查看哈希值)

上传时间: 源码

构建的发行版

mack-0.5.0-py3-none-any.whl (12.2 kB 查看哈希值)

上传时间: Python 3

由以下支持