分布式数据任务数据连接器的统一
项目描述
Tentaclio
Python库,简化
- 处理不同协议(如
file:
、ftp:
、sftp:
、s3:
等)的流处理。 - 打开数据库连接。
- 管理分布式系统中的凭证。
设计中的主要考虑因素
- 易于使用:所有流都通过
tentaclio.open
打开,所有数据库连接通过tentaclio.db
。 - URL是基本资源定位器和数据库连接字符串。
- 对受保护资源的自动认证。
- 可扩展性:您可以添加自己的处理器以处理其他方案。
- Pandas交互。
快速示例。
读取和写入流。
import tentaclio
contents = "👋 🐙"
with tentaclio.open("ftp://localhost:2021/upload/file.txt", mode="w") as writer:
writer.write(contents)
# Using boto3 authentication under the hood.
bucket = "s3://my-bucket/octopus/hello.txt"
with tentaclio.open(bucket) as reader:
print(reader.read())
复制流
import tentaclio
tentaclio.copy("/home/constantine/data.csv", "sftp://constantine:tentacl3@sftp.octoenergy.com/uploads/data.csv")
删除资源
import tentaclio
tentaclio.remove("s3://my-bucket/octopus/the-9th-tentacle.txt")
列出资源
import tentaclio
for entry in tentaclio.listdir("s3:://mybucket/path/to/dir"):
print("Entry", entry)
认证资源。
import os
import tentaclio
print("env ftp credentials", os.getenv("OCTOIO__CONN__OCTOENERGY_FTP"))
# This prints `sftp://constantine:tentacl3@sftp.octoenergy.com/`
# Credentials get automatically injected.
with tentaclio.open("sftp://sftp.octoenergy.com/uploads/data.csv") as reader:
print(reader.read())
数据库连接。
import os
import tentaclio
print("env TENTACLIO__CONN__DB", os.getenv("TENTACLIO__CONN__DB"))
# This prints `postgresql://octopus:tentacle@localhost:5444/example`
# hostname is a wildcard, the credentials get injected.
with tentaclio.db("postgresql://hostname/example") as pg:
results = pg.query("select * from my_table")
Pandas交互。
import pandas as pd # 🐼🐼
import tentaclio # 🐙
df = pd.DataFrame([[1, 2, 3], [10, 20, 30]], columns=["col_1", "col_2", "col_3"])
bucket = "s3://my-bucket/data/pandas.csv"
with tentaclio.open(bucket, mode="w") as writer: # supports more pandas readers
df.to_csv(writer, index=False)
with tentaclio.open(bucket) as reader:
new_df = pd.read_csv(reader)
# another example: using pandas.DataFrame.to_sql() with tentaclio to upload
with tentaclio.db(
connection_info,
connect_args={'options': '-csearch_path=schema_name'}
) as client:
df.to_sql(
name='observations', # table name
con=client.conn,
)
安装
您可以使用pip或pipenv获取tentaclio。
pip install tentaclio
或pipenv
pipenv install tentaclio
开发。
克隆此存储库并安装pipenv
在Makefile
中,您会发现一些用于linting、测试等的有用目标,例如。
make test
如何使用
这是如何使用tentaclio
来满足您的日常数据摄取和存储需求。
流
为了打开用于加载数据或存储数据的流,通用的函数是
import tentaclio
with tentaclio.open("/path/to/my/file") as reader:
contents = reader.read()
with tentaclio.open("s3://bucket/file", mode='w') as writer:
writer.write(contents)
允许的模式是 r
、w
、rb
和 wb
。您可以使用 t
来代替 b
指示文本流,但这默认是有效的。
为了使 tentaclio 尽可能轻量级,它默认只包括 file
、ftp
、sftp
、http
和 https
协议。然而,通过安装额外的包,可以轻松获得更多。
默认
/local/file
file:///local/file
ftp://path/to/file
sftp://path/to/file
http://host.com/path/to/resource
https://host.com/path/to/resource
s3://bucket/file
gs://bucket/file
gsc://bucket/file
gdrive:/My Drive/file
googledrive:/My Drive/file
postgresql://host/database::table
允许您将 csv 格式的数据写入具有相同列名的数据库(注意:表名位于::
之后 :warning:)。
您可以为任何 URL 添加凭据以访问受保护资源。
您可以使用这些读取器和写入器与 pandas 函数一起使用
import pandas as pd
import tentaclio
with tentaclio.open("/path/to/my/file") as reader:
df = pd.read_csv(reader)
[...]
with tentaclio.open("s3::/path/to/my/file", mode='w') as writer:
df.to_parquet(writer)
Readers
、Writers
以及它们的可关闭版本可以在任何期望文件对象的地方使用;pandas 或 pickle 就是这样的函数示例。
关于为 Spark、Presto 和类似下游系统写入文件的说明
Python 中 open
上下文管理器的默认行为是在可写模式打开时创建一个空文件。如果创建数据的进程在 with
子句中产生空数据框并且没有任何内容被写入,这可能会很烦人。这会使 Spark 和 Presto 处于恐慌状态。
为了避免这种情况,我们可以使流 空安全,这样如果没有任何写入操作,就不会刷新空缓冲区,从而不会创建空文件。
with tio.make_empty_safe(tio.open("s3://bucket/file.parquet", mode="wb")) as writer:
if not df.empty:
df.to_parquet(writer)
类似文件系统的资源操作
列出资源
一些 URL 方案允许以 Pythonic 方式列出资源
import tentaclio
for entry in tentaclio.listdir("s3:://mybucket/path/to/dir"):
print("Entry", entry)
虽然 listdir
可能很方便,但我们还提供了 scandir
,它返回一个 DirEntry 列表,以及 walk
。所有函数尽可能接近标准库的定义。
数据库访问
要打开数据库连接,您可以使用 tentaclio.db
并立即访问 postgres、sqlite、athena 和 mssql。
import tentaclio
[...]
query = "select 1";
with tentaclio.db(POSTGRES_TEST_URL) as client:
result =client.query(query)
[...]
支持的数据库方案包括
默认
sqlite://
mssql://
-
- 由 sqlalchemy 支持的任何其他方案。
postgresql://
awsathena+rest://
databricks+thrift://
snowflake://
数据库的额外功能
对于 postgres,您可以设置变量 TENTACLIO__PG_APPLICATION_NAME
,连接到数据库时将注入该值。
自动凭据注入
-
通过使用以
TENTACLIO__CONN__
为前缀的环境变量来配置凭据(例如,TENTACLIO__CONN__DATA_FTP=sfpt://real_user:132ldsf@ftp.octoenergy.com
)。 -
打开流
with tentaclio.open("sftp://ftp.octoenergy.com/file.csv") as reader:
reader.read()
凭据被注入到 URL 中。
- 打开数据库客户端
import tentaclio
with tentaclio.db("postgresql://hostname/my_data_base") as client:
client.query("select 1")
请注意,URL 中要认证的 hostname
是一个通配符,它将匹配任何主机名。因此,如果存在 http://user:pass@octo.co/
的凭据,则 authenticate("http://hostname/file.txt")
将被注入到 http://user:pass@octo.co/file.txt
。
URL 的不同部分以不同的方式设置
- 方案和路径将从 URL 中设置,如果缺失则为空。
- 用户名、密码和主机名将从存储的凭据中设置。
- 如果存在,端口将从存储的凭据中设置,否则从 URL 中设置。
- 如果存在,查询将从 URL 中设置,否则从存储的凭据中设置(因此可以重写)
凭据文件
您还可以设置一个类似以下格式的凭据文件
secrets:
db_1: postgresql://user1:pass1@myhost.com/database_1
db_2: mssql://user2:pass2@otherhost.com/database_2?driver=ODBC+Driver+17+for+SQL+Server
ftp_server: ftp://fuser:fpass@ftp.myhost.com
并通过设置环境变量 TENTACLIO__SECRETS_FILE
使其可供 tentaclio 使用。每个 URL 的实际名称用于跟踪,对功能没有任何影响。
(注意:您可能需要在 mssql 数据库连接字符串中添加 ?driver={driver from /usr/local/etc/odbcinst.ini}
;请参见上面示例)
或者,您可以通过运行 curl https://raw.githubusercontent.com/octoenergy/tentaclio/master/extras/init_tentaclio.sh
来创建 ~/.tentaclio.yml
中的密钥文件并自动配置您的环境。
环境变量可以通过使用 ${ENV_VARIABLE}
如下方式包含在凭据文件中
secrets:
db: postgresql://${DB_USER}:${DB_PASS}@myhost.com/database
Tentaclio 将在环境中搜索 DB_USER
和 DB_PASS
,并将它们的值与密钥文件内容进行插值。
关于协议结构子类型的简要说明。
为了从数据相关函数的实现中抽象出具体依赖项(或系统任何部分实际上),我们使用类型化的 协议。这比使用子类化或 更复杂的方法 提供了更灵活的依赖注入。这种想法深受在 go 中如何做这一点的启发。在我们的 技术博客 中了解更多关于此原则的信息。
项目详情
下载文件
下载您平台上的文件。如果您不确定选择哪个,请了解更多关于 安装软件包 的信息。