
PyCodeHash


Data pipelines are essential in data science, engineering and analytics. Often, parts of a pipeline do not change between runs. Recomputing these nodes is wasteful, especially for larger datasets. PyCodeHash is a generic data and code hashing library that facilitates downstream caching.

🚩 The `hash_func` output for both functions below is identical: 38d6e9f262ab77f6536e14c74be687ce2cb44cdebb7045e5b2f51946215cf4d0! 🚩

Learn more on the [documentation website](https://pycodehash.github.io/pycodehash/).

def func(data, key_col, value_col, **kwargs):
    if not isinstance(key_col, str) or not isinstance(value_col, str):
        raise TypeError(
            f"Column names must be strings, got {key_col}:{type(key_col)} and {value_col}:{type(value_col)}"
        )

    reserved_col = "index"
    if reserved_col in (key_col, value_col):
        raise ValueError(f"Reserved keyword: `{reserved_col}`")

    data = data[~data.isnull().any(axis=1)].copy()
    data[key_col] = data[key_col].astype(int)

    column_names = [key_col, value_col]
    for column_name in column_names:
        print(f"Unique values in {column_name}", list(data[column_name].unique()))

    return dict(zip(data[key_col], data[value_col]))

Example 1: an implementation of a function that creates a mapping between two columns of a pandas DataFrame. Hash: 38d6e9f262ab77f6536e14c74be687ce2cb44cdebb7045e5b2f51946215cf4d0

from __future__ import annotations

import logging  # on purpose unused import

import pandas as pd


def create_df_mapping(data: pd.DataFrame, key_col: str, value_col: str, **kwargs) -> dict[int, str]:
    """Example function to demonstrate PyCodeHash.
    This function takes a pandas DataFrame and two column names, and turns them into a dictionary.

    Args:
        data: DataFrame containing the data
        key_col: column
    """
    legacy_variable = None
    if not isinstance(key_col, str) or not isinstance(value_col, str):
        raise TypeError(
            "Column names must be strings, got {key_col}:{key_type} and {value_col}:{value_type}".format(
                key_col=key_col,
                key_type=type(key_col),
                value_col=value_col,
                value_type=type(value_col),
            )
        )
    else:
        reserved_col = str("index")
        if key_col == reserved_col:
            raise ValueError("Reserved keyword: `{}`".format(reserved_col))
        elif value_col == reserved_col:
            raise ValueError("Reserved keyword: `{}`".format(reserved_col))

        data = data[~data.isnull().any(axis=1)].copy()
        data[key_col] = data[key_col].astype(int)

        column_names = [key_col, value_col]
        for index, column_name in enumerate(column_names):
            print(f"Unique values in {column_names[index]}", list(data[column_names[index]].unique()))

        return {
            key: value
            for key, value in zip(data[key_col], data[value_col])
        }

Example 2: an alternative implementation of the snippet above. Hash: 38d6e9f262ab77f6536e14c74be687ce2cb44cdebb7045e5b2f51946215cf4d0
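
To see this for yourself, here is a minimal sketch (assuming the two snippets above are saved as the hypothetical modules `example_1.py` and `example_2.py`) that hashes both implementations with the FunctionHasher introduced below and asserts that they match:

from pycodehash import FunctionHasher

# Hypothetical module names for the two snippets above
from example_1 import func
from example_2 import create_df_mapping

fh = FunctionHasher()

# PyCodeHash normalises semantically equivalent code (docstrings,
# unused imports, formatting, etc.), so both hashes match
assert fh.hash_func(func) == fh.hash_func(create_df_mapping)
print(fh.hash_func(func))
# 38d6e9f262ab77f6536e14c74be687ce2cb44cdebb7045e5b2f51946215cf4d0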

Detecting changes in data pipelines

The standard way to check whether two things are equal is to compare their hashes. Read more on [how PyCodeHash detects changes in Python functions, SQL queries and datasets](https://pycodehash.github.io/pycodehash/datasets/).
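
As an illustration, here is a minimal caching sketch (not part of the PyCodeHash API; the `cache` dictionary and `run_cached` helper are hypothetical) that skips recomputation when a function's hash has been seen before:

from pycodehash import FunctionHasher

fh = FunctionHasher()

# Hypothetical in-memory cache mapping function hashes to results
cache = {}

def run_cached(func, data):
    # Recompute func(data) only when the function's code has changed
    key = fh.hash_func(func)
    if key not in cache:
        # Cache miss: the code is new or has changed, so recompute
        cache[key] = func(data)
    return cache[key]

In a real pipeline the cache key would also incorporate the hash of the input data and be persisted to disk, but the comparison logic stays the same.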

Installation

PyCodeHash is available on [PyPI](https://pypi.org/project/pycodehash/):

pip install pycodehash

Examples

Python

Obtain the hash of a Python function object using the FunctionHasher:

from pycodehash import FunctionHasher
from tliba import compute_moments
from tliba.etl import add_bernoulli_samples, combine_random_samples

fh = FunctionHasher()
# Hash the function `add_bernoulli_samples`
h1 = fh.hash_func(add_bernoulli_samples)
print("Hash for `add_bernoulli_samples`", h1)

# Hash the function `compute_moments`
h2 = fh.hash_func(compute_moments)
print("Hash for `compute_moments`", h2)

# Hash the function `combine_random_samples`
h3 = fh.hash_func(combine_random_samples)
print("Hash for `combine_random_samples`", h3)

Python usage example

SQL

Hash SQL queries and files using the SQLHasher (requires `pip install pycodehash[sql]`):

from pathlib import Path

from pycodehash.sql.sql_hasher import SQLHasher

# First query
query_1 = "SELECT * FROM db.templates"

# The second query is equivalent, but has additional whitespace
query_2 = "SELECT\n    * \nFROM \n    db.templates"

# Write the second query to a file
query_2_file = Path("/tmp/query.sql")
query_2_file.write_text(query_2)

# Create the SQLHasher object for SparkSQL
hasher = SQLHasher(dialect="sparksql")

# We can hash a string
print(hasher.hash_query(query_1))

# Or pass a path
print(hasher.hash_file(query_2_file))

SQL usage example

Datasets

Hash data, such as files, directories and database tables:

from pathlib import Path

from pycodehash.datasets import LocalFileHash, LocalDirectoryHash


# Hash a single file
fh = LocalFileHash()

print(fh.collect_metadata("example.py"))
# {'last_modified': datetime.datetime(2023, 11, 24, 23, 38, 17, 524024), 'size': 543}

print(fh.compute_hash("example.py"))
# 6189721d3ecdf86503a82c07eed82743069ebbf39e974f33ca684809e67e9e0e

# Hash a directory
dh = LocalDirectoryHash()

# Recursively hash all files in a directory
print(len(dh.collect_partitions(Path(__file__).parent / "src")))
# 29

Dataset usage example

Python Package Dependencies

Hash the list of Python packages that your code depends on. This may well be a subset of the full dependency list: for example, only the key libraries your code relies on, whose version changes you want to track in order to trigger a rerun of the pipeline. The hasher retrieves the installed versions of these packages and creates a hash of them. We emphasise that the responsibility for providing the list of relevant dependencies lies with the user.

from pycodehash.dependency import PythonDependencyHash

# hash a list of dependencies
hasher = PythonDependencyHash()

print(hasher.collect_metadata(dependencies=["pycodehash", "rope"], add_python_version=True))
# hasher retrieves the installed package versions found
# {'pycodehash': '0.2.0', 'rope': '1.11.0', 'Python': '3.11'}

print(hasher.compute_hash(dependencies=["pycodehash", "rope"], add_python_version=True))
# cecb8036ad61235c2577db9943f519b824f7a25e449da9cd332bc600fb5dccf0

Dependency usage example

License

PyCodeHash is completely free, open source and licensed under the MIT license.
