跳转到主要内容

InfluxDB 3.0社区Python客户端

项目描述

Your Image

PyPI version PyPI downloads CodeQL analysis CircleCI Code Cov Community Slack

InfluxDB 3.0 Python客户端

简介

influxdb_client_3是一个Python模块,提供了一种简单方便的方式与InfluxDB 3.0交互。该模块支持将数据写入InfluxDB,并使用Flight客户端查询数据,允许您在InfluxDB 3.0上执行SQL和InfluxQL查询。

我们提供了一个"入门:InfluxDB 3.0 Python客户端库"视频,介绍了如何使用库以及示例。

依赖关系

  • pyarrow(自动安装)
  • pandas(可选)

安装

您可以使用pip安装'influxdb3-python'。

pip install influxdb3-python

注意:此安装不包括Pandas支持。如果您想使用诸如to_pandas()write_file()等关键功能,您需要单独安装pandas

注意:请确保您使用的是3.6或更高版本。为了获得最佳性能,请使用3.11或更高版本。

使用方法

开始使用的一种最简单的方法是查看"宝可梦训练师食谱"。这个场景将引导您了解客户端库和Pyarrow的基础知识。

导入模块

from influxdb_client_3 import InfluxDBClient3, Point

初始化

如果您使用InfluxDB Cloud,请注意:

  1. 您需要提供组织ID,对于InfluxDB专用版则无需提供。
  2. 使用桶名称作为database参数。
client = InfluxDBClient3(token="your-token",
                         host="your-host",
                         org="your-org",
                         database="your-database")

写入数据

您可以使用Point类或提供行协议来写入数据。

使用Points

point = Point("measurement").tag("location", "london").field("temperature", 42)
client.write(point)

使用行协议

point = "measurement fieldname=0"
client.write(point)

从文件写入

用户可以从CSV、JSON、Feather、ORC、Parquet导入数据。

import influxdb_client_3 as InfluxDBClient3
import pandas as pd
import numpy as np
from influxdb_client_3 import write_client_options, WritePrecision, WriteOptions, InfluxDBError


class BatchingCallback(object):

    def success(self, conf, data: str):
        print(f"Written batch: {conf}, data: {data}")

    def error(self, conf, data: str, exception: InfluxDBError):
        print(f"Cannot write batch: {conf}, data: {data} due: {exception}")

    def retry(self, conf, data: str, exception: InfluxDBError):
        print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")

callback = BatchingCallback()

write_options = WriteOptions(batch_size=500,
                                        flush_interval=10_000,
                                        jitter_interval=2_000,
                                        retry_interval=5_000,
                                        max_retries=5,
                                        max_retry_delay=30_000,
                                        exponential_base=2)

wco = write_client_options(success_callback=callback.success,
                          error_callback=callback.error,
                          retry_callback=callback.retry,
                          write_options=write_options
                        )

with  InfluxDBClient3.InfluxDBClient3(
    token="INSERT_TOKEN",
    host="eu-central-1-1.aws.cloud2.influxdata.com",
    org="6a841c0c08328fb1",
    database="python", write_client_options=wco) as client:


    client.write_file(
        file='./out.csv',
        timestamp_column='time', tag_columns=["provider", "machineID"])

    client.write_file(
        file='./out.json',
        timestamp_column='time', tag_columns=["provider", "machineID"], date_unit='ns' )

Pandas DF

client._write_api.write(bucket="pokemon-codex", record=pd_df, data_frame_measurement_name='caught', data_frame_tag_columns=['trainer', 'id', 'num'], data_frame_timestamp_column='timestamp')

Polars DF

client._write_api.write(bucket="pokemon-codex", record=pl_df, data_frame_measurement_name='caught', data_frame_tag_columns=['trainer', 'id', 'num'], data_frame_timestamp_column='timestamp')

查询

使用SQL查询

query = "select * from measurement"
reader = client.query(query=query, language="sql")
table = reader.read_all()
print(table.to_pandas().to_markdown())

使用influxql查询

query = "select * from measurement"
reader = client.query(query=query, language="influxql")
table = reader.read_all()
print(table.to_pandas().to_markdown())

Windows用户

目前,Windows用户在通过Flight原生查询时需要额外安装。这是由于gRPC无法找到Windows根证书。为了解决这个问题,请按照以下步骤操作:安装certifi

pip install certifi

接下来,在flight客户端选项中包含certifi

import influxdb_client_3 as InfluxDBClient3
import pandas as pd
import numpy as np
from influxdb_client_3 import flight_client_options
import certifi

fh = open(certifi.where(), "r")
cert = fh.read()
fh.close()


client = InfluxDBClient3.InfluxDBClient3(
    token="",
    host="b0c7cce5-8dbc-428e-98c6-7f996fb96467.a.influxdb.io",
    org="6a841c0c08328fb1",
    database="flightdemo",
    flight_client_options=flight_client_options(
        tls_root_certs=cert))


table = client.query(
    query="SELECT * FROM flight WHERE time > now() - 4h",
    language="influxql")

print(table.to_pandas())

您也可以通过这种方式包含您自己的根证书。

项目详情


下载文件

下载适用于您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源代码发行版

influxdb3_python-0.9.0.tar.gz (69.4 kB 查看哈希值)

上传时间: 源代码

构建发行版

influxdb3_python-0.9.0-py3-none-any.whl (81.1 kB 查看哈希值)

上传时间: Python 3

由以下支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面