跳转到主要内容

MQTT版本5.0/3.1.1客户端类

项目描述

本文档描述了Eclipse Paho MQTT Python客户端库的源代码,该库实现了MQTT协议的5.0、3.1.1和3.1版本。

此代码提供了一个客户端类,允许应用程序连接到MQTT代理来发布消息,并订阅主题以接收发布的消息。它还提供了一些辅助函数,使得向MQTT服务器发布一次性消息变得非常简单。

它支持Python 2.7.9+或3.5+。

MQTT协议是一种机器到机器(M2M)/“物联网”连接协议。设计为一个极轻量级的发布/订阅消息传输,它适用于需要小代码足迹和/或网络带宽受限的远程位置连接。

Paho是Eclipse基金会的一个项目。

内容

安装

最新稳定版本可在Python包索引(PyPi)中获取,并可以使用以下方式安装:

pip install paho-mqtt

或者使用 virtualenv

virtualenv paho-mqtt
source paho-mqtt/bin/activate
pip install paho-mqtt

要获取完整代码,包括示例和测试,您可以从git仓库克隆

git clone https://github.com/eclipse/paho.mqtt.python

一旦您有了代码,也可以从您的仓库中安装

cd paho.mqtt.python
python setup.py install

要执行所有测试(包括MQTT v5测试),您还需要在paho.mqtt.python文件夹中克隆paho.mqtt.testing

git clone https://github.com/eclipse/paho.mqtt.testing.git

已知限制

以下是一些已知未实现MQTT功能。

当clean_session为False时,会话仅存储在内存中,不会持久化。这意味着当客户端重启时(不仅仅是重新连接,通常是因为程序重启),会话会丢失。这可能导致消息丢失。

以下部分客户端会话丢失

  • 从服务器接收的QoS 2消息,但尚未完全确认。

    由于客户端会盲目确认任何PUBCOMP(QoS 2事务的最后一个消息),因此它不会挂起,但会丢失此QoS 2消息。

  • 已发送到服务器的QoS 1和QoS 2消息,但尚未完全确认。

    这意味着传递给publish()的消息可能会丢失。可以通过确保所有传递给publish()的消息都有一个相应的on_publish()调用来减轻这种情况。

    这也意味着代理可能具有会话中的Qos2消息。由于客户端以空会话开始,它不知道这一点,并且会重新使用mid。这尚未修复。

此外,当clean_session为True时,此库将在网络重新连接时重新发布QoS > 0消息。这意味着QoS > 0消息不会丢失。但是,标准规定我们应该丢弃任何已发送发布包的消息。我们的选择意味着我们不遵守标准,QoS 2可能被接收两次。如果您需要QoS 2的仅一次投递保证,则应使用clean_session = False。

用法和API

详细的API文档可通过 pydoc 获取。示例在 examples 目录中可用。

此包提供两个模块,一个是完整客户端,一个是简单发布的辅助工具。

入门指南

以下是一个非常简单的示例,它订阅到代理$SYS主题树,并打印出结果消息

import paho.mqtt.client as mqtt

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))

    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe("$SYS/#")

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

client.connect("mqtt.eclipse.org", 1883, 60)

# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()

客户端

您可以将客户端类作为一个实例使用,在一个类中或通过子类化。一般使用流程如下:

  • 创建一个客户端实例

  • 使用其中一个 connect*() 函数连接到代理

  • 调用其中一个 loop*() 函数以保持与代理的网络流量

  • 使用 subscribe() 订阅到主题并接收消息

  • 使用 publish() 将消息发布到代理

  • 使用 disconnect() 从代理断开连接

回调将被调用以允许应用程序根据需要处理事件。以下将描述这些回调。

构造函数/重新初始化

Client()
Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")

Client() 构造函数接受以下参数:

client_id

在连接到代理时使用的唯一客户端ID字符串。如果client_id为空或None,则将随机生成一个。在这种情况下,clean_session参数必须为True

clean_session

一个布尔值,用于确定客户端类型。如果为True,则在客户端断开连接时,代理将删除关于此客户端的所有信息。如果为False,则客户端是持久客户端,在客户端断开连接时将保留订阅信息和队列中的消息。

请注意,客户端在断开连接时永远不会丢弃自己的出站消息。调用connect()或reconnect()会导致消息重新发送。使用reinitialise()将客户端重置为其原始状态。

userdata

用户定义的任何类型的数据,作为userdata参数传递给回调函数。可以在稍后使用user_data_set()函数更新它。

protocol

此客户端使用的MQTT协议版本。可以是MQTTv31MQTTv311

transport

设置为“websockets”以通过WebSockets发送MQTT。保留默认值“tcp”以使用原始TCP。

构造函数示例
import paho.mqtt.client as mqtt

mqttc = mqtt.Client()
reinitialise()
reinitialise(client_id="", clean_session=True, userdata=None)

reinitialise()函数将客户端重置为其起始状态,就像刚刚创建一样。它接受与Client()构造函数相同的参数。

重置示例
mqttc.reinitialise()

选项函数

这些函数表示可以在客户端上设置的选项,以修改其行为。在大多数情况下,这必须在连接到代理之前完成。

max_inflight_messages_set()
max_inflight_messages_set(self, inflight)

设置一次可以部分完成网络流的QoS>0消息的最大数量。

默认值为20。增加此值将消耗更多内存,但可以增加吞吐量。

max_queued_messages_set()
max_queued_messages_set(self, queue_size)

设置可以处于待发队列中的QoS>0出站消息的最大数量。

默认值为0。0表示无限。当队列满时,任何进一步的出站消息都会被丢弃。

message_retry_set()
message_retry_set(retry)

设置在代理没有响应时,QoS>0消息重试前的秒数。

默认设置为5秒,通常不需要更改。

ws_set_options()
ws_set_options(self, path="/mqtt", headers=None)

设置Websocket连接选项。只有当在Client()构造函数中传递了transport="websockets"时,这些选项才会被使用。

path

在代理上使用的mqtt路径。

headers

指定附加到标准Websocket头部的额外头部的字典,或者是一个可调用对象,它接受正常的Websocket头并返回一个新的字典,其中包含连接到代理的头信息。

必须在connect*()之前调用。如何在examples文件夹中使用此功能与AWS IoT平台的一个示例。

tls_set()
tls_set(ca_certs=None, certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED,
    tls_version=ssl.PROTOCOL_TLS, ciphers=None)

配置网络加密和身份验证选项。启用SSL/TLS支持。

ca_certs

指向要由客户端视为可信的证书机构证书文件字符串路径。如果这是唯一提供的选项,则客户端将类似于网络浏览器运行。也就是说,它将要求代理的证书由ca_certs中的证书机构签名,并使用TLS v1进行通信,但不会尝试任何形式的身份验证。这提供了基本的网络加密,但这可能不足以根据代理的配置方式。默认情况下,在Python 2.7.9+或3.4+上使用系统的默认证书机构。在较旧的Python版本中,此参数是必需的。

证书文件,密钥文件

指向PEM编码的客户证书和私钥的字符串。如果这些参数不是None,则它们将被用作基于TLS的认证的客户信息。此功能的支持取决于代理。注意,如果这些文件之一被加密并且需要密码来解密,Python将在命令行中请求密码。目前无法定义回调以提供密码。

cert_reqs

定义客户端对代理施加的证书要求。默认情况下是ssl.CERT_REQUIRED,这意味着代理必须提供证书。有关此参数的更多信息,请参阅ssl pydoc。

tls_version

指定要使用的SSL/TLS协议版本。默认情况下(如果Python版本支持),将检测最高TLS版本。如果不可用,则使用TLS v1。以前的版本(所有以SSL开始的版本)是可能的,但不推荐使用,因为这些版本可能存在安全风险。

ciphers

一个字符串,指定允许此连接的加密加密套件,或使用None以使用默认设置。有关更多信息,请参阅ssl pydoc。

必须在调用connect*()之前调用。

tls_set_context()
tls_set_context(context=None)

配置网络加密和认证上下文。启用SSL/TLS支持。

context

一个ssl.SSLContext对象。默认情况下,如果可用(自Python 3.4起添加),则由ssl.create_default_context()提供。

如果您不确定是否使用此方法,则可以使用默认上下文或使用tls_set方法。有关更多信息,请参阅ssl模块文档中关于安全考虑的部分。

必须在调用connect*()之前调用。

tls_insecure_set()
tls_insecure_set(value)

配置服务器证书中服务器主机名的验证。

如果value设置为True,则无法保证您连接到的主机不是在冒充您的服务器。这在初始服务器测试中可能很有用,但可能导致恶意第三方通过DNS欺骗等方式冒充您的服务器。

请勿在实际系统中使用此功能。将值设置为True意味着使用加密是没有意义的。

必须在调用connect*()之前以及调用tls_set()tls_set_context()之后调用。

enable_logger()
enable_logger(logger=None)

使用标准的Python日志包启用日志记录(参见PEP 282)。这可以与on_log回调方法同时使用。

如果指定了logger,则将使用该logging.Logger对象,否则将自动创建一个。

根据以下映射将Paho日志级别转换为标准日志级别

Paho

logging

MQTT_LOG_ERR

logging.ERROR

MQTT_LOG_WARNING

logging.WARNING

MQTT_LOG_NOTICE

logging.INFO (无直接对应项)

MQTT_LOG_INFO

logging.INFO

MQTT_LOG_DEBUG

logging.DEBUG

disable_logger()
disable_logger()

使用标准的Python日志包禁用日志记录。这不会影响on_log回调。

username_pw_set()
username_pw_set(username, password=None)

设置用于代理认证的用户名和可选密码。必须在调用connect*()之前调用。

user_data_set()
user_data_set(userdata)

设置在生成事件时传递给回调的私有用户数据。用于您自己的目的以支持您的应用程序。

will_set()
will_set(topic, payload=None, qos=0, retain=False)

设置要发送给代理的遗嘱。如果客户端在未调用disconnect()的情况下断开连接,代理将代表其发布消息。

topic

遗嘱消息应发布的主题。

payload

将发送的消息作为遗嘱。如果未指定或设置为None,则使用长度为零的消息作为遗嘱。传递整数或浮点数将导致有效负载转换为表示该数字的字符串。如果您希望发送真正的整数/浮点数,请使用struct.pack()创建所需的负载。

qos

用于遗嘱的服务质量级别。

retain

如果设置为True,遗嘱消息将作为主题的“最后已知良好”/保留消息。

如果qos不是0、1或2,或者如果topicNone或字符串长度为零,则引发ValueError

reconnect_delay_set
reconnect_delay_set(min_delay=1, max_delay=120)

客户端将自动重试连接。在每个尝试之间,它将在min_delaymax_delay之间等待几秒钟。

连接丢失时,最初的重连尝试延迟为min_delay秒。在后续尝试中翻倍,直到达到max_delay

当连接完成时(例如,接收到CONNACK而不是仅建立TCP连接时),延迟重置为min_delay

连接/重新连接/断开连接

connect()
connect(host, port=1883, keepalive=60, bind_address="")

connect()函数将客户端连接到代理。这是一个阻塞函数。它接受以下参数

host

远程代理的主机名或IP地址

port

要连接到的服务器主机的网络端口。默认为1883。请注意,MQTT over SSL/TLS的默认端口为8883,因此如果您正在使用tls_set()tls_set_context(),则可能需要手动提供端口。

keepalive

允许与代理之间通信的最大秒数。如果没有其他消息正在交换,这将控制客户端发送ping消息到代理的速率。

bind_address

要绑定此客户端的本地网络接口的IP地址,假设存在多个接口。

回调

当客户端从代理接收到响应连接的CONNACK消息时,它将生成一个on_connect()回调。

连接示例
mqttc.connect("mqtt.eclipse.org")
connect_async()
connect_async(host, port=1883, keepalive=60, bind_address="")

loop_start()一起使用,以非阻塞方式连接。连接将在调用loop_start()之前不会完成。

连接回调

当客户端从代理接收到响应连接的CONNACK消息时,它将生成一个on_connect()回调。

connect_srv()
connect_srv(domain, keepalive=60, bind_address="")

使用SRV DNS查找连接到代理,以获取代理地址。接受以下参数

domain

要搜索SRV记录的DNS域。如果None,则尝试确定本地域名。

有关keepalivebind_address参数的描述,请参阅connect()

连接SRV回调

当客户端从代理接收到响应连接的CONNACK消息时,它将生成一个on_connect()回调。

SRV连接示例
mqttc.connect_srv("eclipse.org")
reconnect()
reconnect()

使用之前提供的详细信息重新连接到代理。在调用此函数之前,您必须调用connect*()

重新连接回调

当客户端从代理接收到响应连接的CONNACK消息时,它将生成一个on_connect()回调。

disconnect()
disconnect()

干净地断开与代理的连接。使用disconnect()不会导致代理发送遗嘱消息。

断开连接不会等待所有队列中的消息发送,为了确保所有消息都发送,应使用MQTTMessageInfowait_for_publish()。有关详细信息,请参阅publish()

断开连接回调

当客户端发送断开连接消息时,它将生成一个on_disconnect()回调。

网络环路

这些函数是客户端背后的驱动力。如果不调用它们,传入的网络数据将不会被处理,并且传出网络数据可能不会及时发送。管理网络循环有四种选项。这里描述了其中三种,第四种在下面的“外部事件循环支持”中描述。不要混合使用不同的循环函数。

loop()
loop(timeout=1.0, max_packets=1)

定期调用以处理网络事件。此调用在 select() 中等待,直到网络套接字可用于读取或写入,如果适用,然后处理传入/传出数据。此函数最多阻塞 timeout 秒。 timeout 不应超过客户端的 keepalive 值,否则您的客户端将被定期断开连接。

max_packets 参数已过时,应保持未设置。

循环示例
run = True
while run:
    mqttc.loop()
loop_start() / loop_stop()
loop_start()
loop_stop(force=False)

这些函数实现了对网络循环的线程接口。在调用 connect*() 之前或之后调用 loop_start() 一次,在后台运行一个线程来自动调用 loop()。这将为其他可能阻塞的工作释放主线程。此调用还会处理与代理的重连。调用 loop_stop() 停止后台线程。 force 参数目前被忽略。

循环启动/停止示例
mqttc.connect("mqtt.eclipse.org")
mqttc.loop_start()

while True:
    temperature = sensor.blocking_read()
    mqttc.publish("paho/temperature", temperature)
loop_forever()
loop_forever(timeout=1.0, max_packets=1, retry_first_connection=False)

这是网络循环的阻塞形式,直到客户端调用 disconnect() 才返回。它会自动处理重连。

使用 connect_async 时,除了第一次连接尝试外,使用 retry_first_connection=True 使其重试第一次连接。警告:这可能导致客户端不断连接到不存在的服务器而不失败。

timeoutmax_packets 参数已过时,应保持未设置。

发布

从客户端向代理发送消息。

publish()
publish(topic, payload=None, qos=0, retain=False)

这将导致消息被发送到代理,然后从代理发送到订阅匹配主题的任何客户端。它需要以下参数

topic

消息应该发布的主题

payload

要发送的实际消息。如果没有提供或设置为 None,将使用零长度消息。传递 int 或 float 将导致有效负载转换为表示该数字的字符串。如果您想发送真正的 int/float,请使用 struct.pack() 创建所需的有效负载。

qos

要使用的服务质量级别

retain

如果设置为 True,则消息将设置为该主题的“最后已知良好”/保留消息。

返回一个 MQTTMessageInfo,它公开以下属性和方法

  • rc,发布的结果。它可以表示成功的 MQTT_ERR_SUCCESS,如果客户端当前未连接,则表示 MQTT_ERR_NO_CONN,或在使用 max_queued_messages_set 时表示消息既未排队也未发送的 MQTT_ERR_QUEUE_SIZE

  • mid 是发布请求的消息 ID。mid 值可以用于通过检查 on_publish() 回调中的 mid 参数来跟踪发布请求。根据您的用例,wait_for_publish 可能更容易。

  • wait_for_publish() 将会阻塞,直到消息被发布。如果消息未被排队(rc == MQTT_ERR_QUEUE_SIZE),则会抛出 ValueError。

  • is_published 如果消息已被发布则返回 True。如果消息未被排队(rc == MQTT_ERR_QUEUE_SIZE),则会抛出 ValueError。

如果主题 None、长度为零或无效(包含通配符),如果 qos 不是 0、1 或 2 中的一个,或者如果负载长度超过 268435455 字节,将会抛出 ValueError

回调(发布)

当消息已发送到代理时,将会生成一个 on_publish() 回调。

订阅/退订

subscribe()
subscribe(topic, qos=0)

订阅客户端到一个或多个主题。

此函数可以以三种不同的方式调用

简单字符串和整数

例如 subscribe("my/topic", 2)

topic

指定要订阅的订阅主题的字符串。

qos

订阅所需的期望服务质量等级。默认为 0。

字符串和整数元组

例如 subscribe(("my/topic", 1))

topic

一个包含 (topic, qos) 的元组。元组中必须包含主题和 qos。

qos

未使用。

字符串和整数元组列表

例如 subscribe([("my/topic", 0), ("another/topic", 2)])

这允许在单个 SUBSCRIPTION 命令中订阅多个主题,这比使用多个 subscribe() 调用更高效。

topic

一个格式为 (topic, qos) 的元组列表。所有元组中都必须包含主题和 qos。

qos

未使用。

函数返回一个元组 (result, mid),其中 result 表示成功为 MQTT_ERR_SUCCESS,如果客户端当前未连接则为 (MQTT_ERR_NO_CONN, None)mid 是订阅请求的消息 ID。mid 值可以用来通过检查 on_subscribe() 回调中的 mid 参数来跟踪订阅请求(如果已定义)。

如果 qos 不是 0、1 或 2,或者主题 None 或字符串长度为零,或者主题不是字符串、元组或列表,则会抛出 ValueError

回调(订阅)

当代理已确认订阅时,将会生成一个 on_subscribe() 回调。

unsubscribe()
unsubscribe(topic)

从客户端取消订阅一个或多个主题。

topic

一个字符串或字符串列表,表示要取消订阅的订阅主题。

返回一个元组 (result, mid),其中 result 表示成功为 MQTT_ERR_SUCCESS,如果客户端当前未连接则为 (MQTT_ERR_NO_CONN, None)mid 是取消订阅请求的消息 ID。mid 值可以用来通过检查 on_unsubscribe() 回调中的 mid 参数来跟踪取消订阅请求(如果已定义)。

如果主题 None 或字符串长度为零,或者不是字符串或列表,则会抛出 ValueError

回调(取消订阅)

当代理已确认取消订阅时,将会生成一个 on_unsubscribe() 回调。

回调

on_connect()
on_connect(client, userdata, flags, rc)

当代理响应我们的连接请求时调用。

client

此回调的客户端实例

userdata

Client()user_data_set() 中设置的私有用户数据

flags

由代理发送的响应标志

rc

连接结果

flags是一个包含代理响应标志的字典
flags['session present'] - 这个标志对于将clean session设置为0的客户很有用

。如果一个客户具有clean session=0,并重新连接到之前连接过的代理,这个标志指示代理是否仍然保存该客户的会话信息。如果为1,表示会话仍然存在。

rc的值表示是否成功

0: 连接成功 1: 连接被拒绝 - 协议版本不正确 2: 连接被拒绝 - 客户端标识符无效 3: 连接被拒绝 - 服务器不可用 4: 连接被拒绝 - 用户名或密码错误 5: 连接被拒绝 - 未授权 6-255: 目前未使用。

连接示例
def on_connect(client, userdata, flags, rc):
    print("Connection returned result: "+connack_string(rc))

mqttc.on_connect = on_connect
...
on_disconnect()
on_disconnect(client, userdata, rc)

当客户端从代理断开连接时被调用。

client

此回调的客户端实例

userdata

Client()user_data_set() 中设置的私有用户数据

rc

断开连接的结果

rc参数表示断开连接的状态。如果MQTT_ERR_SUCCESS (0),则回调是在响应disconnect()调用时调用的。如果任何其他值,表示断开连接是意外的,例如可能是由于网络错误导致的。

断开示例
def on_disconnect(client, userdata, rc):
    if rc != 0:
        print("Unexpected disconnection.")

mqttc.on_disconnect = on_disconnect
...
on_message()
on_message(client, userdata, message)

当客户端订阅的主题接收到消息,且消息不匹配现有的主题过滤器回调时被调用。使用message_callback_add()定义一个将针对特定主题过滤器调用的回调。如果没有任何匹配项,则on_message将作为后备使用。

client

此回调的客户端实例

userdata

Client()user_data_set() 中设置的私有用户数据

消息

MQTTMessage的一个实例。这是一个包含topicpayloadqosretain成员的类。

消息示例
def on_message(client, userdata, message):
    print("Received message '" + str(message.payload) + "' on topic '"
        + message.topic + "' with QoS " + str(message.qos))

mqttc.on_message = on_message
...
message_callback_add()

此函数允许您定义用于处理特定订阅过滤器接收到的消息的回调,包括使用通配符。这使得您能够,例如,订阅sensors/#,并有一个回调来处理sensors/temperature,另一个回调来处理sensors/humidity

message_callback_add(sub, callback)
sub

此回调要匹配的订阅过滤器。每个字面量子字符串只能定义一个回调。

callback

要使用的回调。其形式与on_message回调相同。

如果在使用message_callback_add()on_message时,只有不匹配特定订阅过滤器的消息才会传递给on_message回调。

如果有多个子匹配主题,则每个回调都会被调用(例如,子sensors/#和子+/humidity都匹配主题sensors/humidity的消息,因此两个回调都将处理此消息)。

message_callback_remove()

删除之前使用message_callback_add()注册的特定主题/订阅回调。

message_callback_remove(sub)
sub

要删除的订阅过滤器

on_publish()
on_publish(client, userdata, mid)

当使用publish()调用要发送的消息完成传输到代理时被调用。对于QoS级别1和2的消息,这意味着已完成适当的握手。对于QoS 0,这意味着消息已经离开客户端。变量mid与从相应的publish()调用返回的变量mid相匹配,以便跟踪出站消息。

此回调很重要,因为即使publish()调用返回成功,也并不意味着消息已发送。

on_subscribe()
on_subscribe(client, userdata, mid, granted_qos)

当代理响应订阅请求时调用。变量 mid 与对应 subscribe() 调用返回的 mid 变量相匹配。变量 granted_qos 是一个整数列表,表示代理为每个不同的订阅请求授予的 QoS 级别。

on_unsubscribe()
on_unsubscribe(client, userdata, mid)

当代理响应取消订阅请求时调用。变量 mid 与对应 unsubscribe() 调用返回的 mid 变量相匹配。

on_log()
on_log(client, userdata, level, buf)

当客户端有日志信息时调用。用于调试,定义允许调试。变量 level 表示消息的严重性,可以是 MQTT_LOG_INFOMQTT_LOG_NOTICEMQTT_LOG_WARNINGMQTT_LOG_ERRMQTT_LOG_DEBUG 之一。消息本身在 buf 中。

这可以与标准 Python 日志记录同时使用,可以通过 enable_logger 方法启用。

on_socket_open()
on_socket_open(client, userdata, sock)

当套接字已打开时调用。用于将套接字注册到外部事件循环以进行读取。

on_socket_close()
on_socket_close(client, userdata, sock)

当套接字即将关闭时调用。用于将套接字从外部事件循环中注销以进行读取。

on_socket_register_write()
on_socket_register_write(client, userdata, sock)

当对套接字的写入操作失败,例如输出缓冲区满时调用。用于将套接字注册到外部事件循环以进行写入。

on_socket_unregister_write()
on_socket_unregister_write(client, userdata, sock)

当套接字的写入操作在之前失败后成功时调用。用于将套接字从外部事件循环中注销以进行写入。

外部事件循环支持

loop_read()
loop_read(max_packets=1)

当套接字准备读取时调用。变量 max_packets 已过时,应保持未设置。

loop_write()
loop_write(max_packets=1)

当套接字准备写入时调用。变量 max_packets 已过时,应保持未设置。

loop_misc()
loop_misc()

每几秒钟调用一次以处理消息重试和心跳。

socket()
socket()

返回客户端正在使用的套接字对象,以允许与其他事件循环进行接口。此调用对于基于 select 的循环特别有用。请参阅 examples/loop_select.py

want_write()
want_write()

如果还有等待写入的数据,则返回 true,以允许与其他事件循环接口。此调用对于基于 select 的循环特别有用。请参阅 examples/loop_select.py

状态回调
on_socket_open
on_socket_close
on_socket_register_write
on_socket_unregister_write

使用这些回调来获取有关套接字状态变化的通知。对于您在读取+写入时注册或注销套接字的事件循环特别有用。请参阅 examples/loop_asyncio.py 中的示例。

当套接字打开时,调用 on_socket_open。将套接字注册到您的事件循环以进行读取。

当套接字即将关闭时,调用 on_socket_close。从您的事件循环中注销套接字以进行读取。

当写入套接字失败,例如输出缓冲区满时,调用 on_socket_register_write。将套接字注册到您的事件循环以进行写入。

当下一次写入套接字成功时,调用 on_socket_unregister_write。从您的事件循环中注销套接字以进行写入。

回调始终按此顺序调用

  • on_socket_open

  • 零次或多次

    • on_socket_register_write

    • on_socket_unregister_write

  • on_socket_close

全局辅助函数

客户端模块还提供了一些全局辅助函数。

topic_matches_sub(sub, topic) 可以用来检查一个 topic 是否与一个 subscription 匹配。

例如

主题 foo/bar 会匹配订阅 foo/#+/bar

主题 non/matching 不会匹配订阅 non/+/+

connack_string(connack_code) 返回与 CONNACK 结果相关联的错误字符串。

error_string(mqtt_errno) 返回与 Paho MQTT 错误号相关联的错误字符串。

发布

本模块提供了一些辅助函数,以便以单次方式直接发布消息。换句话说,它们适用于你只想将单个/多个消息发布到代理,然后断开连接,不再需要其他操作的情况。

提供的两个函数是 single()multiple()

单个

向代理发布单个消息,然后干净地断开连接。

single(topic, payload=None, qos=0, retain=False, hostname="localhost",
    port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None,
    protocol=mqtt.MQTTv311, transport="tcp")
发布单个函数参数
topic

唯一的必需参数必须是将要发布负载的顶级字符串。

payload

要发布的负载。如果为“”或 None,则发布一个零长度的负载。

qos

发布时使用的 qos,默认为 0。

retain

设置消息为保留(True)或非保留(False)。

hostname

包含要连接的代理地址的字符串。默认为主机。

port

连接到代理的端口。默认为 1883。

client_id

要使用的 MQTT 客户端 ID。如果为“”或 None,Paho 库将自动生成客户端 ID。

keepalive

客户端的 keepalive 超时值。默认为 60 秒。

will

包含客户端 will 参数的字典

will = {‘topic’: “<topic>”, ‘payload’:”<payload”>, ‘qos’:<qos>, ‘retain’:<retain>}.

主题是必需的,所有其他参数都是可选的,默认分别为 None、0 和 False。

默认为 None,表示不使用 will。

auth

包含客户端认证参数的字典

auth = {‘username’:”<username>”, ‘password’:”<password>”}

用户名是必需的,密码是可选的,如果未提供,则默认为 None。

默认为 None,表示不使用认证。

tls

包含客户端 TLS 配置参数的字典

dict = {‘ca_certs’:”<ca_certs>”, ‘certfile’:”<certfile>”, ‘keyfile’:”<keyfile>”, ‘tls_version’:”<tls_version>”, ‘ciphers’:”<ciphers”>}

ca_certs 是必需的,所有其他参数都是可选的,如果未提供,则默认为 None,这将导致客户端使用默认行为 - 请参阅 paho.mqtt.client 文档。

默认为 None,表示不使用 TLS。

protocol

选择要使用的 MQTT 协议版本。使用 MQTTv31MQTTv311

transport

设置为“websockets”以通过WebSockets发送MQTT。保留默认值“tcp”以使用原始TCP。

发布单个示例
import paho.mqtt.publish as publish

publish.single("paho/test/single", "payload", hostname="mqtt.eclipse.org")

多个

向代理发布多个消息,然后干净地断开连接。

multiple(msgs, hostname="localhost", port=1883, client_id="", keepalive=60,
    will=None, auth=None, tls=None, protocol=mqtt.MQTTv311, transport="tcp")
发布多个函数参数
msgs

要发布的消息列表。每个消息可以是字典或元组。

如果为字典,则必须有顶级存在。对于任何缺失的参数,将使用默认值。字典必须具有以下形式

msg = {‘topic’:”<topic>”, ‘payload’:”<payload>”, ‘qos’:<qos>, ‘retain’:<retain>}

主题必须存在且不能为空。如果负载为“”,None 或不存在,则发布一个零长度的负载。如果 qos 不存在,则使用默认值 0。如果 retain 不存在,则使用默认值 False。

如果为元组,则必须具有以下形式

(“<topic>”, “<payload>”, qos, retain)

请参阅 single() 了解 hostnameportclient_idkeepalivewillauthtlsprotocoltransport 的说明。

发布多个示例
import paho.mqtt.publish as publish

msgs = [{'topic':"paho/test/multiple", 'payload':"multiple 1"},
    ("paho/test/multiple", "multiple 2", 0, False)]
publish.multiple(msgs, hostname="mqtt.eclipse.org")

订阅

此模块提供了一些辅助函数,以便可以直观地订阅和处理消息。

提供的两个函数是 simple()callback()

简单

订阅一组主题并返回接收到的消息。这是一个阻塞函数。

simple(topics, qos=0, msg_count=1, retained=False, hostname="localhost",
    port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None,
    protocol=mqtt.MQTTv311)
简单订阅函数参数
topics

唯一必需的参数是客户端将订阅的主题字符串。这可以是一个字符串,也可以是一个字符串列表,如果需要订阅多个主题。

qos

订阅时使用的qos,默认为0。

msg_count

从代理程序中检索的消息数量。默认为1。如果为1,则返回单个 MQTTMessage 对象。如果 >1,则返回 MQTTMessages 列表。

retained

设置为 True 以考虑保留消息,设置为 False 以忽略设置了保留标志的消息。

hostname

包含要连接的代理地址的字符串。默认为主机。

port

连接到代理的端口。默认为 1883。

client_id

要使用的 MQTT 客户端 ID。如果为“”或 None,Paho 库将自动生成客户端 ID。

keepalive

客户端的 keepalive 超时值。默认为 60 秒。

will

包含客户端 will 参数的字典

will = {‘topic’: “<topic>”, ‘payload’:”<payload”>, ‘qos’:<qos>, ‘retain’:<retain>}.

主题是必需的,所有其他参数都是可选的,默认分别为 None、0 和 False。

默认为 None,表示不使用 will。

auth

包含客户端认证参数的字典

auth = {‘username’:”<username>”, ‘password’:”<password>”}

用户名是必需的,密码是可选的,如果未提供,则默认为 None。

默认为 None,表示不使用认证。

tls

包含客户端 TLS 配置参数的字典

dict = {‘ca_certs’:”<ca_certs>”, ‘certfile’:”<certfile>”, ‘keyfile’:”<keyfile>”, ‘tls_version’:”<tls_version>”, ‘ciphers’:”<ciphers”>}

ca_certs 是必需的,所有其他参数都是可选的,如果未提供,则默认为 None,这将导致客户端使用默认行为 - 请参阅 paho.mqtt.client 文档。

默认为 None,表示不使用 TLS。

protocol

选择要使用的 MQTT 协议版本。使用 MQTTv31MQTTv311

简单示例
import paho.mqtt.subscribe as subscribe

msg = subscribe.simple("paho/test/simple", hostname="mqtt.eclipse.org")
print("%s %s" % (msg.topic, msg.payload))

使用回调

订阅一组主题,并使用用户提供的回调处理接收到的消息。

callback(callback, topics, qos=0, userdata=None, hostname="localhost",
    port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None,
    protocol=mqtt.MQTTv311)
回调订阅函数参数
callback

一个“on_message”回调,它将用于每个接收到的消息,形式为

def on_message(client, userdata, message)
topics

客户端将订阅的主题字符串。这可以是一个字符串,也可以是一个字符串列表,如果需要订阅多个主题。

qos

订阅时使用的qos,默认为0。

userdata

用户提供的对象,将在收到消息时传递给 on_message 回调。

请参阅 simple() 了解 hostnameportclient_idkeepalivewillauthtlsprotocol 的说明。

回调示例
import paho.mqtt.subscribe as subscribe

def on_message_print(client, userdata, message):
    print("%s %s" % (message.topic, message.payload))

subscribe.callback(on_message_print, "paho/test/callback", hostname="mqtt.eclipse.org")

报告错误

请在 https://github.com/eclipse/paho.mqtt.python/issues 的问题跟踪器中报告错误。

更多信息

有关 Paho 客户端的讨论在 Eclipse paho-dev 邮件列表 上进行。

有关 MQTT 协议本身(不是此库)的通用问题在 MQTT Google Group 上讨论。

有关 MQTT 社区的更多信息可通过 MQTT 社区网站 获取。

发布流程

# get in a Python 3 venv
git checkout ib/1.5
python setup.py bdist_wheel
twine upload dist/*

# then get in a Python 2 venv
# and release it

项目详细信息


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解有关 安装软件包 的更多信息。

源代码分发

此发布没有可用的源代码分发文件。请参阅有关 生成分发存档 的教程。

构建分发

iottalk_paho_mqtt-1.5.1-py3-none-any.whl (61.8 kB 查看散列)

上传于 Python 3

支持者