跳转到主要内容

分布式数据任务数据连接器的统一

项目描述

Tentaclio

CircleCI status Documentation Status

Python库,简化

  • 处理不同协议(如 file:ftp:sftp:s3: 等)的流处理。
  • 打开数据库连接。
  • 管理分布式系统中的凭证。

设计中的主要考虑因素

  • 易于使用:所有流都通过 tentaclio.open 打开,所有数据库连接通过 tentaclio.db
  • URL是基本资源定位器和数据库连接字符串。
  • 对受保护资源的自动认证。
  • 可扩展性:您可以添加自己的处理器以处理其他方案。
  • Pandas交互。

快速示例。

读取和写入流。

import tentaclio
contents = "👋 🐙"

with tentaclio.open("ftp://localhost:2021/upload/file.txt", mode="w") as writer:
    writer.write(contents)

# Using boto3 authentication under the hood.
bucket = "s3://my-bucket/octopus/hello.txt"
with tentaclio.open(bucket) as reader:
    print(reader.read())

复制流

import tentaclio

tentaclio.copy("/home/constantine/data.csv", "sftp://constantine:tentacl3@sftp.octoenergy.com/uploads/data.csv")

删除资源

import tentaclio

tentaclio.remove("s3://my-bucket/octopus/the-9th-tentacle.txt")

列出资源

import tentaclio

for entry in tentaclio.listdir("s3:://mybucket/path/to/dir"):
    print("Entry", entry)

认证资源。

import os

import tentaclio

print("env ftp credentials", os.getenv("OCTOIO__CONN__OCTOENERGY_FTP"))
# This prints `sftp://constantine:tentacl3@sftp.octoenergy.com/`

# Credentials get automatically injected.

with tentaclio.open("sftp://sftp.octoenergy.com/uploads/data.csv") as reader:
    print(reader.read())

数据库连接。

import os

import tentaclio

print("env TENTACLIO__CONN__DB", os.getenv("TENTACLIO__CONN__DB"))

# This prints `postgresql://octopus:tentacle@localhost:5444/example`

# hostname is a wildcard, the credentials get injected.
with tentaclio.db("postgresql://hostname/example") as pg:
    results = pg.query("select * from my_table")

Pandas交互。

import pandas as pd  # 🐼🐼
import tentaclio  # 🐙

df = pd.DataFrame([[1, 2, 3], [10, 20, 30]], columns=["col_1", "col_2", "col_3"])

bucket = "s3://my-bucket/data/pandas.csv"

with tentaclio.open(bucket, mode="w") as writer:  # supports more pandas readers
    df.to_csv(writer, index=False)

with tentaclio.open(bucket) as reader:
    new_df = pd.read_csv(reader)

# another example: using pandas.DataFrame.to_sql() with tentaclio to upload
with tentaclio.db(
        connection_info,
        connect_args={'options': '-csearch_path=schema_name'}
    ) as client:
    df.to_sql(
        name='observations', # table name
        con=client.conn,
    )

安装

您可以使用pip或pipenv获取tentaclio。

pip install tentaclio

或pipenv

pipenv install tentaclio

开发。

克隆此存储库并安装pipenv

Makefile中,您会发现一些用于linting、测试等的有用目标,例如。

make test

如何使用

这是如何使用tentaclio来满足您的日常数据摄取和存储需求。

为了打开用于加载数据或存储数据的流,通用的函数是

import tentaclio

with tentaclio.open("/path/to/my/file") as reader:
    contents = reader.read()

with tentaclio.open("s3://bucket/file", mode='w') as writer:
    writer.write(contents)

允许的模式是 rwrbwb。您可以使用 t 来代替 b 指示文本流,但这默认是有效的。

为了使 tentaclio 尽可能轻量级,它默认只包括 fileftpsftphttphttps 协议。然而,通过安装额外的包,可以轻松获得更多。

默认

  • /local/file
  • file:///local/file
  • ftp://path/to/file
  • sftp://path/to/file
  • http://host.com/path/to/resource
  • https://host.com/path/to/resource

tentaclio-s3

  • s3://bucket/file

tentaclio-gs

  • gs://bucket/file
  • gsc://bucket/file

tentaclio-gdrive

  • gdrive:/My Drive/file
  • googledrive:/My Drive/file

tentaclio-postgres

  • postgresql://host/database::table 允许您将 csv 格式的数据写入具有相同列名的数据库(注意:表名位于 :: 之后 :warning:)。

您可以为任何 URL 添加凭据以访问受保护资源。

您可以使用这些读取器和写入器与 pandas 函数一起使用

import pandas as pd
import tentaclio

with tentaclio.open("/path/to/my/file") as reader:
    df = pd.read_csv(reader)

[...]

with tentaclio.open("s3::/path/to/my/file", mode='w') as writer:
    df.to_parquet(writer)

ReadersWriters 以及它们的可关闭版本可以在任何期望文件对象的地方使用;pandas 或 pickle 就是这样的函数示例。

关于为 Spark、Presto 和类似下游系统写入文件的说明

Python 中 open 上下文管理器的默认行为是在可写模式打开时创建一个空文件。如果创建数据的进程在 with 子句中产生空数据框并且没有任何内容被写入,这可能会很烦人。这会使 Spark 和 Presto 处于恐慌状态。

为了避免这种情况,我们可以使流 空安全,这样如果没有任何写入操作,就不会刷新空缓冲区,从而不会创建空文件。

with tio.make_empty_safe(tio.open("s3://bucket/file.parquet", mode="wb")) as writer:
    if not df.empty:
        df.to_parquet(writer)

类似文件系统的资源操作

列出资源

一些 URL 方案允许以 Pythonic 方式列出资源

import tentaclio

for entry in tentaclio.listdir("s3:://mybucket/path/to/dir"):
    print("Entry", entry)

虽然 listdir 可能很方便,但我们还提供了 scandir,它返回一个 DirEntry 列表,以及 walk。所有函数尽可能接近标准库的定义。

数据库访问

要打开数据库连接,您可以使用 tentaclio.db 并立即访问 postgres、sqlite、athena 和 mssql。

import tentaclio

[...]

query = "select 1";
with tentaclio.db(POSTGRES_TEST_URL) as client:
    result =client.query(query)
[...]

支持的数据库方案包括

默认

  • sqlite://
  • mssql://
    • 由 sqlalchemy 支持的任何其他方案。

tentaclio-postgres

  • postgresql://

tentaclio-athena

  • awsathena+rest://

tentaclio-databricks

  • databricks+thrift://

tentaclio-snowflake

  • snowflake://

数据库的额外功能

对于 postgres,您可以设置变量 TENTACLIO__PG_APPLICATION_NAME,连接到数据库时将注入该值。

自动凭据注入

  1. 通过使用以 TENTACLIO__CONN__ 为前缀的环境变量来配置凭据(例如,TENTACLIO__CONN__DATA_FTP=sfpt://real_user:132ldsf@ftp.octoenergy.com)。

  2. 打开流

with tentaclio.open("sftp://ftp.octoenergy.com/file.csv") as reader:
    reader.read()

凭据被注入到 URL 中。

  1. 打开数据库客户端
import tentaclio

with tentaclio.db("postgresql://hostname/my_data_base") as client:
    client.query("select 1")

请注意,URL 中要认证的 hostname 是一个通配符,它将匹配任何主机名。因此,如果存在 http://user:pass@octo.co/ 的凭据,则 authenticate("http://hostname/file.txt") 将被注入到 http://user:pass@octo.co/file.txt

URL 的不同部分以不同的方式设置

  • 方案和路径将从 URL 中设置,如果缺失则为空。
  • 用户名、密码和主机名将从存储的凭据中设置。
  • 如果存在,端口将从存储的凭据中设置,否则从 URL 中设置。
  • 如果存在,查询将从 URL 中设置,否则从存储的凭据中设置(因此可以重写)

凭据文件

您还可以设置一个类似以下格式的凭据文件

secrets:
    db_1: postgresql://user1:pass1@myhost.com/database_1
    db_2: mssql://user2:pass2@otherhost.com/database_2?driver=ODBC+Driver+17+for+SQL+Server
    ftp_server: ftp://fuser:fpass@ftp.myhost.com

并通过设置环境变量 TENTACLIO__SECRETS_FILE 使其可供 tentaclio 使用。每个 URL 的实际名称用于跟踪,对功能没有任何影响。

(注意:您可能需要在 mssql 数据库连接字符串中添加 ?driver={driver from /usr/local/etc/odbcinst.ini};请参见上面示例)

或者,您可以通过运行 curl https://raw.githubusercontent.com/octoenergy/tentaclio/master/extras/init_tentaclio.sh 来创建 ~/.tentaclio.yml 中的密钥文件并自动配置您的环境。

环境变量可以通过使用 ${ENV_VARIABLE} 如下方式包含在凭据文件中

secrets:
    db: postgresql://${DB_USER}:${DB_PASS}@myhost.com/database

Tentaclio 将在环境中搜索 DB_USERDB_PASS,并将它们的值与密钥文件内容进行插值。

关于协议结构子类型的简要说明。

为了从数据相关函数的实现中抽象出具体依赖项(或系统任何部分实际上),我们使用类型化的 协议。这比使用子类化或 更复杂的方法 提供了更灵活的依赖注入。这种想法深受在 go 中如何做这一点的启发。在我们的 技术博客 中了解更多关于此原则的信息。

项目详情


下载文件

下载您平台上的文件。如果您不确定选择哪个,请了解更多关于 安装软件包 的信息。

源代码分发

tentaclio-1.3.0.tar.gz (33.8 kB 查看哈希)

源代码

构建分发

tentaclio-1.3.0-py2.py3-none-any.whl (39.9 kB 查看哈希)

上传于 Python 2 Python 3

由以下组织支持