跳转到主要内容

一致读取/写入[批量、在线、流]数据的数据集SDK。

项目描述

Tests Coverage Status Binder

欢迎使用zdatasets

==================================================

待办事项

import pandas as pd
from metaflow import FlowSpec, step

from zdatasets import Dataset, Mode
from zdatasets.metaflow import DatasetParameter
from zdatasets.plugins import BatchOptions


# Can also invoke from CLI:
#  > python zdatasets/tutorials/0_hello_dataset_flow.py run \
#    --hello_dataset '{"name": "HelloDataset", "mode": "READ_WRITE", \
#    "options": {"type": "BatchOptions", "partition_by": "region"}}'
class HelloDatasetFlow(FlowSpec):
    hello_dataset = DatasetParameter(
        "hello_dataset",
        default=Dataset("HelloDataset", mode=Mode.READ_WRITE, options=BatchOptions(partition_by="region")),
    )

    @step
    def start(self):
        df = pd.DataFrame({"region": ["A", "A", "A", "B", "B", "B"], "zpid": [1, 2, 3, 4, 5, 6]})
        print("saving data_frame: \n", df.to_string(index=False))

        # Example of writing to a dataset
        self.hello_dataset.write(df)

        # save this as an output dataset
        self.output_dataset = self.hello_dataset

        self.next(self.end)

    @step
    def end(self):
        print(f"I have dataset \n{self.output_dataset=}")

        # output_dataset to_pandas(partitions=dict(region="A")) only
        df: pd.DataFrame = self.output_dataset.to_pandas(partitions=dict(region="A"))
        print('self.output_dataset.to_pandas(partitions=dict(region="A")):')
        print(df.to_string(index=False))


if __name__ == "__main__":
    HelloDatasetFlow()

项目详情


下载文件

下载适用于您的平台的文件。如果您不确定要选择哪个,请了解更多关于安装包的信息。

源分布

zdatasets-1.2.5.tar.gz (54.8 kB 查看哈希值)

上传时间

构建分布

zdatasets-1.2.5-py3-none-any.whl (84.7 kB 查看哈希值)

上传时间 Python 3

支持