InfluxDB 3.0社区Python客户端
项目描述
InfluxDB 3.0 Python客户端
简介
influxdb_client_3
是一个Python模块,提供了一种简单方便的方式与InfluxDB 3.0交互。该模块支持将数据写入InfluxDB,并使用Flight客户端查询数据,允许您在InfluxDB 3.0上执行SQL和InfluxQL查询。
我们提供了一个"入门:InfluxDB 3.0 Python客户端库"视频,介绍了如何使用库以及示例。
依赖关系
pyarrow
(自动安装)pandas
(可选)
安装
您可以使用pip
安装'influxdb3-python'。
pip install influxdb3-python
注意:此安装不包括Pandas支持。如果您想使用诸如to_pandas()
和write_file()
等关键功能,您需要单独安装pandas
。
注意:请确保您使用的是3.6或更高版本。为了获得最佳性能,请使用3.11或更高版本。
使用方法
开始使用的一种最简单的方法是查看"宝可梦训练师食谱"。这个场景将引导您了解客户端库和Pyarrow的基础知识。
导入模块
from influxdb_client_3 import InfluxDBClient3, Point
初始化
如果您使用InfluxDB Cloud,请注意:
- 您需要提供组织ID,对于InfluxDB专用版则无需提供。
- 使用桶名称作为
database
参数。
client = InfluxDBClient3(token="your-token",
host="your-host",
org="your-org",
database="your-database")
写入数据
您可以使用Point类或提供行协议来写入数据。
使用Points
point = Point("measurement").tag("location", "london").field("temperature", 42)
client.write(point)
使用行协议
point = "measurement fieldname=0"
client.write(point)
从文件写入
用户可以从CSV、JSON、Feather、ORC、Parquet导入数据。
import influxdb_client_3 as InfluxDBClient3
import pandas as pd
import numpy as np
from influxdb_client_3 import write_client_options, WritePrecision, WriteOptions, InfluxDBError
class BatchingCallback(object):
def success(self, conf, data: str):
print(f"Written batch: {conf}, data: {data}")
def error(self, conf, data: str, exception: InfluxDBError):
print(f"Cannot write batch: {conf}, data: {data} due: {exception}")
def retry(self, conf, data: str, exception: InfluxDBError):
print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")
callback = BatchingCallback()
write_options = WriteOptions(batch_size=500,
flush_interval=10_000,
jitter_interval=2_000,
retry_interval=5_000,
max_retries=5,
max_retry_delay=30_000,
exponential_base=2)
wco = write_client_options(success_callback=callback.success,
error_callback=callback.error,
retry_callback=callback.retry,
write_options=write_options
)
with InfluxDBClient3.InfluxDBClient3(
token="INSERT_TOKEN",
host="eu-central-1-1.aws.cloud2.influxdata.com",
org="6a841c0c08328fb1",
database="python", write_client_options=wco) as client:
client.write_file(
file='./out.csv',
timestamp_column='time', tag_columns=["provider", "machineID"])
client.write_file(
file='./out.json',
timestamp_column='time', tag_columns=["provider", "machineID"], date_unit='ns' )
Pandas DF
client._write_api.write(bucket="pokemon-codex", record=pd_df, data_frame_measurement_name='caught', data_frame_tag_columns=['trainer', 'id', 'num'], data_frame_timestamp_column='timestamp')
Polars DF
client._write_api.write(bucket="pokemon-codex", record=pl_df, data_frame_measurement_name='caught', data_frame_tag_columns=['trainer', 'id', 'num'], data_frame_timestamp_column='timestamp')
查询
使用SQL查询
query = "select * from measurement"
reader = client.query(query=query, language="sql")
table = reader.read_all()
print(table.to_pandas().to_markdown())
使用influxql查询
query = "select * from measurement"
reader = client.query(query=query, language="influxql")
table = reader.read_all()
print(table.to_pandas().to_markdown())
Windows用户
目前,Windows用户在通过Flight原生查询时需要额外安装。这是由于gRPC无法找到Windows根证书。为了解决这个问题,请按照以下步骤操作:安装certifi
pip install certifi
接下来,在flight客户端选项中包含certifi
import influxdb_client_3 as InfluxDBClient3
import pandas as pd
import numpy as np
from influxdb_client_3 import flight_client_options
import certifi
fh = open(certifi.where(), "r")
cert = fh.read()
fh.close()
client = InfluxDBClient3.InfluxDBClient3(
token="",
host="b0c7cce5-8dbc-428e-98c6-7f996fb96467.a.influxdb.io",
org="6a841c0c08328fb1",
database="flightdemo",
flight_client_options=flight_client_options(
tls_root_certs=cert))
table = client.query(
query="SELECT * FROM flight WHERE time > now() - 4h",
language="influxql")
print(table.to_pandas())
您也可以通过这种方式包含您自己的根证书。
项目详情
下载文件
下载适用于您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。
源代码发行版
influxdb3_python-0.9.0.tar.gz (69.4 kB 查看哈希值)
构建发行版
influxdb3_python-0.9.0-py3-none-any.whl (81.1 kB 查看哈希值)