MQTT版本5.0/3.1.1客户端类
项目描述
本文档描述了Eclipse Paho MQTT Python客户端库的源代码,该库实现了MQTT协议的5.0、3.1.1和3.1版本。
此代码提供了一个客户端类,允许应用程序连接到MQTT代理来发布消息,并订阅主题以接收发布的消息。它还提供了一些辅助函数,使得向MQTT服务器发布一次性消息变得非常简单。
它支持Python 2.7.9+或3.5+。
MQTT协议是一种机器到机器(M2M)/“物联网”连接协议。设计为一个极轻量级的发布/订阅消息传输,它适用于需要小代码足迹和/或网络带宽受限的远程位置连接。
Paho是Eclipse基金会的一个项目。
内容
安装
最新稳定版本可在Python包索引(PyPi)中获取,并可以使用以下方式安装:
pip install paho-mqtt
或者使用 virtualenv
virtualenv paho-mqtt source paho-mqtt/bin/activate pip install paho-mqtt
要获取完整代码,包括示例和测试,您可以从git仓库克隆
git clone https://github.com/eclipse/paho.mqtt.python
一旦您有了代码,也可以从您的仓库中安装
cd paho.mqtt.python python setup.py install
要执行所有测试(包括MQTT v5测试),您还需要在paho.mqtt.python文件夹中克隆paho.mqtt.testing
git clone https://github.com/eclipse/paho.mqtt.testing.git
已知限制
以下是一些已知未实现MQTT功能。
当clean_session为False时,会话仅存储在内存中,不会持久化。这意味着当客户端重启时(不仅仅是重新连接,通常是因为程序重启),会话会丢失。这可能导致消息丢失。
以下部分客户端会话丢失
从服务器接收的QoS 2消息,但尚未完全确认。
由于客户端会盲目确认任何PUBCOMP(QoS 2事务的最后一个消息),因此它不会挂起,但会丢失此QoS 2消息。
已发送到服务器的QoS 1和QoS 2消息,但尚未完全确认。
这意味着传递给publish()的消息可能会丢失。可以通过确保所有传递给publish()的消息都有一个相应的on_publish()调用来减轻这种情况。
这也意味着代理可能具有会话中的Qos2消息。由于客户端以空会话开始,它不知道这一点,并且会重新使用mid。这尚未修复。
此外,当clean_session为True时,此库将在网络重新连接时重新发布QoS > 0消息。这意味着QoS > 0消息不会丢失。但是,标准规定我们应该丢弃任何已发送发布包的消息。我们的选择意味着我们不遵守标准,QoS 2可能被接收两次。如果您需要QoS 2的仅一次投递保证,则应使用clean_session = False。
用法和API
详细的API文档可通过 pydoc 获取。示例在 examples 目录中可用。
此包提供两个模块,一个是完整客户端,一个是简单发布的辅助工具。
入门指南
以下是一个非常简单的示例,它订阅到代理$SYS主题树,并打印出结果消息
import paho.mqtt.client as mqtt
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe("$SYS/#")
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.payload))
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("mqtt.eclipse.org", 1883, 60)
# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()
客户端
您可以将客户端类作为一个实例使用,在一个类中或通过子类化。一般使用流程如下:
创建一个客户端实例
使用其中一个 connect*() 函数连接到代理
调用其中一个 loop*() 函数以保持与代理的网络流量
使用 subscribe() 订阅到主题并接收消息
使用 publish() 将消息发布到代理
使用 disconnect() 从代理断开连接
回调将被调用以允许应用程序根据需要处理事件。以下将描述这些回调。
构造函数/重新初始化
Client()
Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")
Client() 构造函数接受以下参数:
- client_id
在连接到代理时使用的唯一客户端ID字符串。如果client_id为空或None,则将随机生成一个。在这种情况下,clean_session参数必须为True。
- clean_session
一个布尔值,用于确定客户端类型。如果为True,则在客户端断开连接时,代理将删除关于此客户端的所有信息。如果为False,则客户端是持久客户端,在客户端断开连接时将保留订阅信息和队列中的消息。
请注意,客户端在断开连接时永远不会丢弃自己的出站消息。调用connect()或reconnect()会导致消息重新发送。使用reinitialise()将客户端重置为其原始状态。
- userdata
用户定义的任何类型的数据,作为userdata参数传递给回调函数。可以在稍后使用user_data_set()函数更新它。
- protocol
此客户端使用的MQTT协议版本。可以是MQTTv31或MQTTv311。
- transport
设置为“websockets”以通过WebSockets发送MQTT。保留默认值“tcp”以使用原始TCP。
构造函数示例
import paho.mqtt.client as mqtt
mqttc = mqtt.Client()
reinitialise()
reinitialise(client_id="", clean_session=True, userdata=None)
reinitialise()函数将客户端重置为其起始状态,就像刚刚创建一样。它接受与Client()构造函数相同的参数。
重置示例
mqttc.reinitialise()
选项函数
这些函数表示可以在客户端上设置的选项,以修改其行为。在大多数情况下,这必须在连接到代理之前完成。
max_inflight_messages_set()
max_inflight_messages_set(self, inflight)
设置一次可以部分完成网络流的QoS>0消息的最大数量。
默认值为20。增加此值将消耗更多内存,但可以增加吞吐量。
max_queued_messages_set()
max_queued_messages_set(self, queue_size)
设置可以处于待发队列中的QoS>0出站消息的最大数量。
默认值为0。0表示无限。当队列满时,任何进一步的出站消息都会被丢弃。
message_retry_set()
message_retry_set(retry)
设置在代理没有响应时,QoS>0消息重试前的秒数。
默认设置为5秒,通常不需要更改。
ws_set_options()
ws_set_options(self, path="/mqtt", headers=None)
设置Websocket连接选项。只有当在Client()构造函数中传递了transport="websockets"时,这些选项才会被使用。
- path
在代理上使用的mqtt路径。
- headers
指定附加到标准Websocket头部的额外头部的字典,或者是一个可调用对象,它接受正常的Websocket头并返回一个新的字典,其中包含连接到代理的头信息。
必须在connect*()之前调用。如何在examples文件夹中使用此功能与AWS IoT平台的一个示例。
tls_set()
tls_set(ca_certs=None, certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED,
tls_version=ssl.PROTOCOL_TLS, ciphers=None)
配置网络加密和身份验证选项。启用SSL/TLS支持。
- ca_certs
指向要由客户端视为可信的证书机构证书文件字符串路径。如果这是唯一提供的选项,则客户端将类似于网络浏览器运行。也就是说,它将要求代理的证书由ca_certs中的证书机构签名,并使用TLS v1进行通信,但不会尝试任何形式的身份验证。这提供了基本的网络加密,但这可能不足以根据代理的配置方式。默认情况下,在Python 2.7.9+或3.4+上使用系统的默认证书机构。在较旧的Python版本中,此参数是必需的。
- 证书文件,密钥文件
指向PEM编码的客户证书和私钥的字符串。如果这些参数不是None,则它们将被用作基于TLS的认证的客户信息。此功能的支持取决于代理。注意,如果这些文件之一被加密并且需要密码来解密,Python将在命令行中请求密码。目前无法定义回调以提供密码。
- cert_reqs
定义客户端对代理施加的证书要求。默认情况下是ssl.CERT_REQUIRED,这意味着代理必须提供证书。有关此参数的更多信息,请参阅ssl pydoc。
- tls_version
指定要使用的SSL/TLS协议版本。默认情况下(如果Python版本支持),将检测最高TLS版本。如果不可用,则使用TLS v1。以前的版本(所有以SSL开始的版本)是可能的,但不推荐使用,因为这些版本可能存在安全风险。
- ciphers
一个字符串,指定允许此连接的加密加密套件,或使用None以使用默认设置。有关更多信息,请参阅ssl pydoc。
必须在调用connect*()之前调用。
tls_set_context()
tls_set_context(context=None)
配置网络加密和认证上下文。启用SSL/TLS支持。
- context
一个ssl.SSLContext对象。默认情况下,如果可用(自Python 3.4起添加),则由ssl.create_default_context()提供。
如果您不确定是否使用此方法,则可以使用默认上下文或使用tls_set方法。有关更多信息,请参阅ssl模块文档中关于安全考虑的部分。
必须在调用connect*()之前调用。
tls_insecure_set()
tls_insecure_set(value)
配置服务器证书中服务器主机名的验证。
如果value设置为True,则无法保证您连接到的主机不是在冒充您的服务器。这在初始服务器测试中可能很有用,但可能导致恶意第三方通过DNS欺骗等方式冒充您的服务器。
请勿在实际系统中使用此功能。将值设置为True意味着使用加密是没有意义的。
必须在调用connect*()之前以及调用tls_set()或tls_set_context()之后调用。
enable_logger()
enable_logger(logger=None)
使用标准的Python日志包启用日志记录(参见PEP 282)。这可以与on_log回调方法同时使用。
如果指定了logger,则将使用该logging.Logger对象,否则将自动创建一个。
根据以下映射将Paho日志级别转换为标准日志级别
Paho |
logging |
---|---|
MQTT_LOG_ERR |
logging.ERROR |
MQTT_LOG_WARNING |
logging.WARNING |
MQTT_LOG_NOTICE |
logging.INFO (无直接对应项) |
MQTT_LOG_INFO |
logging.INFO |
MQTT_LOG_DEBUG |
logging.DEBUG |
disable_logger()
disable_logger()
使用标准的Python日志包禁用日志记录。这不会影响on_log回调。
username_pw_set()
username_pw_set(username, password=None)
设置用于代理认证的用户名和可选密码。必须在调用connect*()之前调用。
user_data_set()
user_data_set(userdata)
设置在生成事件时传递给回调的私有用户数据。用于您自己的目的以支持您的应用程序。
will_set()
will_set(topic, payload=None, qos=0, retain=False)
设置要发送给代理的遗嘱。如果客户端在未调用disconnect()的情况下断开连接,代理将代表其发布消息。
- topic
遗嘱消息应发布的主题。
- payload
将发送的消息作为遗嘱。如果未指定或设置为None,则使用长度为零的消息作为遗嘱。传递整数或浮点数将导致有效负载转换为表示该数字的字符串。如果您希望发送真正的整数/浮点数,请使用struct.pack()创建所需的负载。
- qos
用于遗嘱的服务质量级别。
- retain
如果设置为True,遗嘱消息将作为主题的“最后已知良好”/保留消息。
如果qos不是0、1或2,或者如果topic为None或字符串长度为零,则引发ValueError。
reconnect_delay_set
reconnect_delay_set(min_delay=1, max_delay=120)
客户端将自动重试连接。在每个尝试之间,它将在min_delay和max_delay之间等待几秒钟。
连接丢失时,最初的重连尝试延迟为min_delay秒。在后续尝试中翻倍,直到达到max_delay。
当连接完成时(例如,接收到CONNACK而不是仅建立TCP连接时),延迟重置为min_delay。
连接/重新连接/断开连接
connect()
connect(host, port=1883, keepalive=60, bind_address="")
connect()函数将客户端连接到代理。这是一个阻塞函数。它接受以下参数
- host
远程代理的主机名或IP地址
- port
要连接到的服务器主机的网络端口。默认为1883。请注意,MQTT over SSL/TLS的默认端口为8883,因此如果您正在使用tls_set()或tls_set_context(),则可能需要手动提供端口。
- keepalive
允许与代理之间通信的最大秒数。如果没有其他消息正在交换,这将控制客户端发送ping消息到代理的速率。
- bind_address
要绑定此客户端的本地网络接口的IP地址,假设存在多个接口。
回调
当客户端从代理接收到响应连接的CONNACK消息时,它将生成一个on_connect()回调。
连接示例
mqttc.connect("mqtt.eclipse.org")
connect_async()
connect_async(host, port=1883, keepalive=60, bind_address="")
与loop_start()一起使用,以非阻塞方式连接。连接将在调用loop_start()之前不会完成。
连接回调
当客户端从代理接收到响应连接的CONNACK消息时,它将生成一个on_connect()回调。
connect_srv()
connect_srv(domain, keepalive=60, bind_address="")
使用SRV DNS查找连接到代理,以获取代理地址。接受以下参数
- domain
要搜索SRV记录的DNS域。如果None,则尝试确定本地域名。
有关keepalive和bind_address参数的描述,请参阅connect()。
连接SRV回调
当客户端从代理接收到响应连接的CONNACK消息时,它将生成一个on_connect()回调。
SRV连接示例
mqttc.connect_srv("eclipse.org")
reconnect()
reconnect()
使用之前提供的详细信息重新连接到代理。在调用此函数之前,您必须调用connect*()。
重新连接回调
当客户端从代理接收到响应连接的CONNACK消息时,它将生成一个on_connect()回调。
disconnect()
disconnect()
干净地断开与代理的连接。使用disconnect()不会导致代理发送遗嘱消息。
断开连接不会等待所有队列中的消息发送,为了确保所有消息都发送,应使用MQTTMessageInfo的wait_for_publish()。有关详细信息,请参阅publish()。
断开连接回调
当客户端发送断开连接消息时,它将生成一个on_disconnect()回调。
网络环路
这些函数是客户端背后的驱动力。如果不调用它们,传入的网络数据将不会被处理,并且传出网络数据可能不会及时发送。管理网络循环有四种选项。这里描述了其中三种,第四种在下面的“外部事件循环支持”中描述。不要混合使用不同的循环函数。
loop()
loop(timeout=1.0, max_packets=1)
定期调用以处理网络事件。此调用在 select() 中等待,直到网络套接字可用于读取或写入,如果适用,然后处理传入/传出数据。此函数最多阻塞 timeout 秒。 timeout 不应超过客户端的 keepalive 值,否则您的客户端将被定期断开连接。
max_packets 参数已过时,应保持未设置。
循环示例
run = True
while run:
mqttc.loop()
loop_start() / loop_stop()
loop_start()
loop_stop(force=False)
这些函数实现了对网络循环的线程接口。在调用 connect*() 之前或之后调用 loop_start() 一次,在后台运行一个线程来自动调用 loop()。这将为其他可能阻塞的工作释放主线程。此调用还会处理与代理的重连。调用 loop_stop() 停止后台线程。 force 参数目前被忽略。
循环启动/停止示例
mqttc.connect("mqtt.eclipse.org")
mqttc.loop_start()
while True:
temperature = sensor.blocking_read()
mqttc.publish("paho/temperature", temperature)
loop_forever()
loop_forever(timeout=1.0, max_packets=1, retry_first_connection=False)
这是网络循环的阻塞形式,直到客户端调用 disconnect() 才返回。它会自动处理重连。
使用 connect_async 时,除了第一次连接尝试外,使用 retry_first_connection=True 使其重试第一次连接。警告:这可能导致客户端不断连接到不存在的服务器而不失败。
timeout 和 max_packets 参数已过时,应保持未设置。
发布
从客户端向代理发送消息。
publish()
publish(topic, payload=None, qos=0, retain=False)
这将导致消息被发送到代理,然后从代理发送到订阅匹配主题的任何客户端。它需要以下参数
- topic
消息应该发布的主题
- payload
要发送的实际消息。如果没有提供或设置为 None,将使用零长度消息。传递 int 或 float 将导致有效负载转换为表示该数字的字符串。如果您想发送真正的 int/float,请使用 struct.pack() 创建所需的有效负载。
- qos
要使用的服务质量级别
- retain
如果设置为 True,则消息将设置为该主题的“最后已知良好”/保留消息。
返回一个 MQTTMessageInfo,它公开以下属性和方法
rc,发布的结果。它可以表示成功的 MQTT_ERR_SUCCESS,如果客户端当前未连接,则表示 MQTT_ERR_NO_CONN,或在使用 max_queued_messages_set 时表示消息既未排队也未发送的 MQTT_ERR_QUEUE_SIZE。
mid 是发布请求的消息 ID。mid 值可以用于通过检查 on_publish() 回调中的 mid 参数来跟踪发布请求。根据您的用例,wait_for_publish 可能更容易。
wait_for_publish() 将会阻塞,直到消息被发布。如果消息未被排队(rc == MQTT_ERR_QUEUE_SIZE),则会抛出 ValueError。
is_published 如果消息已被发布则返回 True。如果消息未被排队(rc == MQTT_ERR_QUEUE_SIZE),则会抛出 ValueError。
如果主题 None、长度为零或无效(包含通配符),如果 qos 不是 0、1 或 2 中的一个,或者如果负载长度超过 268435455 字节,将会抛出 ValueError。
回调(发布)
当消息已发送到代理时,将会生成一个 on_publish() 回调。
订阅/退订
subscribe()
subscribe(topic, qos=0)
订阅客户端到一个或多个主题。
此函数可以以三种不同的方式调用
简单字符串和整数
例如 subscribe("my/topic", 2)
- topic
指定要订阅的订阅主题的字符串。
- qos
订阅所需的期望服务质量等级。默认为 0。
字符串和整数元组
例如 subscribe(("my/topic", 1))
- topic
一个包含 (topic, qos) 的元组。元组中必须包含主题和 qos。
- qos
未使用。
字符串和整数元组列表
例如 subscribe([("my/topic", 0), ("another/topic", 2)])
这允许在单个 SUBSCRIPTION 命令中订阅多个主题,这比使用多个 subscribe() 调用更高效。
- topic
一个格式为 (topic, qos) 的元组列表。所有元组中都必须包含主题和 qos。
- qos
未使用。
函数返回一个元组 (result, mid),其中 result 表示成功为 MQTT_ERR_SUCCESS,如果客户端当前未连接则为 (MQTT_ERR_NO_CONN, None)。 mid 是订阅请求的消息 ID。mid 值可以用来通过检查 on_subscribe() 回调中的 mid 参数来跟踪订阅请求(如果已定义)。
如果 qos 不是 0、1 或 2,或者主题 None 或字符串长度为零,或者主题不是字符串、元组或列表,则会抛出 ValueError。
回调(订阅)
当代理已确认订阅时,将会生成一个 on_subscribe() 回调。
unsubscribe()
unsubscribe(topic)
从客户端取消订阅一个或多个主题。
- topic
一个字符串或字符串列表,表示要取消订阅的订阅主题。
返回一个元组 (result, mid),其中 result 表示成功为 MQTT_ERR_SUCCESS,如果客户端当前未连接则为 (MQTT_ERR_NO_CONN, None)。 mid 是取消订阅请求的消息 ID。mid 值可以用来通过检查 on_unsubscribe() 回调中的 mid 参数来跟踪取消订阅请求(如果已定义)。
如果主题 None 或字符串长度为零,或者不是字符串或列表,则会抛出 ValueError。
回调(取消订阅)
当代理已确认取消订阅时,将会生成一个 on_unsubscribe() 回调。
回调
on_connect()
on_connect(client, userdata, flags, rc)
当代理响应我们的连接请求时调用。
- client
此回调的客户端实例
- userdata
在 Client() 或 user_data_set() 中设置的私有用户数据
- flags
由代理发送的响应标志
- rc
连接结果
- flags是一个包含代理响应标志的字典
- flags['session present'] - 这个标志对于将clean session设置为0的客户很有用
。如果一个客户具有clean session=0,并重新连接到之前连接过的代理,这个标志指示代理是否仍然保存该客户的会话信息。如果为1,表示会话仍然存在。
rc的值表示是否成功
0: 连接成功 1: 连接被拒绝 - 协议版本不正确 2: 连接被拒绝 - 客户端标识符无效 3: 连接被拒绝 - 服务器不可用 4: 连接被拒绝 - 用户名或密码错误 5: 连接被拒绝 - 未授权 6-255: 目前未使用。
连接示例
def on_connect(client, userdata, flags, rc):
print("Connection returned result: "+connack_string(rc))
mqttc.on_connect = on_connect
...
on_disconnect()
on_disconnect(client, userdata, rc)
当客户端从代理断开连接时被调用。
- client
此回调的客户端实例
- userdata
在 Client() 或 user_data_set() 中设置的私有用户数据
- rc
断开连接的结果
rc参数表示断开连接的状态。如果MQTT_ERR_SUCCESS (0),则回调是在响应disconnect()调用时调用的。如果任何其他值,表示断开连接是意外的,例如可能是由于网络错误导致的。
断开示例
def on_disconnect(client, userdata, rc):
if rc != 0:
print("Unexpected disconnection.")
mqttc.on_disconnect = on_disconnect
...
on_message()
on_message(client, userdata, message)
当客户端订阅的主题接收到消息,且消息不匹配现有的主题过滤器回调时被调用。使用message_callback_add()定义一个将针对特定主题过滤器调用的回调。如果没有任何匹配项,则on_message将作为后备使用。
- client
此回调的客户端实例
- userdata
在 Client() 或 user_data_set() 中设置的私有用户数据
- 消息
MQTTMessage的一个实例。这是一个包含topic、payload、qos、retain成员的类。
消息示例
def on_message(client, userdata, message):
print("Received message '" + str(message.payload) + "' on topic '"
+ message.topic + "' with QoS " + str(message.qos))
mqttc.on_message = on_message
...
message_callback_add()
此函数允许您定义用于处理特定订阅过滤器接收到的消息的回调,包括使用通配符。这使得您能够,例如,订阅sensors/#,并有一个回调来处理sensors/temperature,另一个回调来处理sensors/humidity。
message_callback_add(sub, callback)
- sub
此回调要匹配的订阅过滤器。每个字面量子字符串只能定义一个回调。
- callback
要使用的回调。其形式与on_message回调相同。
如果在使用message_callback_add()和on_message时,只有不匹配特定订阅过滤器的消息才会传递给on_message回调。
如果有多个子匹配主题,则每个回调都会被调用(例如,子sensors/#和子+/humidity都匹配主题sensors/humidity的消息,因此两个回调都将处理此消息)。
message_callback_remove()
删除之前使用message_callback_add()注册的特定主题/订阅回调。
message_callback_remove(sub)
- sub
要删除的订阅过滤器
on_publish()
on_publish(client, userdata, mid)
当使用publish()调用要发送的消息完成传输到代理时被调用。对于QoS级别1和2的消息,这意味着已完成适当的握手。对于QoS 0,这意味着消息已经离开客户端。变量mid与从相应的publish()调用返回的变量mid相匹配,以便跟踪出站消息。
此回调很重要,因为即使publish()调用返回成功,也并不意味着消息已发送。
on_subscribe()
on_subscribe(client, userdata, mid, granted_qos)
当代理响应订阅请求时调用。变量 mid 与对应 subscribe() 调用返回的 mid 变量相匹配。变量 granted_qos 是一个整数列表,表示代理为每个不同的订阅请求授予的 QoS 级别。
on_unsubscribe()
on_unsubscribe(client, userdata, mid)
当代理响应取消订阅请求时调用。变量 mid 与对应 unsubscribe() 调用返回的 mid 变量相匹配。
on_log()
on_log(client, userdata, level, buf)
当客户端有日志信息时调用。用于调试,定义允许调试。变量 level 表示消息的严重性,可以是 MQTT_LOG_INFO、MQTT_LOG_NOTICE、MQTT_LOG_WARNING、MQTT_LOG_ERR 和 MQTT_LOG_DEBUG 之一。消息本身在 buf 中。
这可以与标准 Python 日志记录同时使用,可以通过 enable_logger 方法启用。
on_socket_open()
on_socket_open(client, userdata, sock)
当套接字已打开时调用。用于将套接字注册到外部事件循环以进行读取。
on_socket_close()
on_socket_close(client, userdata, sock)
当套接字即将关闭时调用。用于将套接字从外部事件循环中注销以进行读取。
on_socket_register_write()
on_socket_register_write(client, userdata, sock)
当对套接字的写入操作失败,例如输出缓冲区满时调用。用于将套接字注册到外部事件循环以进行写入。
on_socket_unregister_write()
on_socket_unregister_write(client, userdata, sock)
当套接字的写入操作在之前失败后成功时调用。用于将套接字从外部事件循环中注销以进行写入。
外部事件循环支持
loop_read()
loop_read(max_packets=1)
当套接字准备读取时调用。变量 max_packets 已过时,应保持未设置。
loop_write()
loop_write(max_packets=1)
当套接字准备写入时调用。变量 max_packets 已过时,应保持未设置。
loop_misc()
loop_misc()
每几秒钟调用一次以处理消息重试和心跳。
socket()
socket()
返回客户端正在使用的套接字对象,以允许与其他事件循环进行接口。此调用对于基于 select 的循环特别有用。请参阅 examples/loop_select.py。
want_write()
want_write()
如果还有等待写入的数据,则返回 true,以允许与其他事件循环接口。此调用对于基于 select 的循环特别有用。请参阅 examples/loop_select.py。
状态回调
on_socket_open on_socket_close on_socket_register_write on_socket_unregister_write
使用这些回调来获取有关套接字状态变化的通知。对于您在读取+写入时注册或注销套接字的事件循环特别有用。请参阅 examples/loop_asyncio.py 中的示例。
当套接字打开时,调用 on_socket_open。将套接字注册到您的事件循环以进行读取。
当套接字即将关闭时,调用 on_socket_close。从您的事件循环中注销套接字以进行读取。
当写入套接字失败,例如输出缓冲区满时,调用 on_socket_register_write。将套接字注册到您的事件循环以进行写入。
当下一次写入套接字成功时,调用 on_socket_unregister_write。从您的事件循环中注销套接字以进行写入。
回调始终按此顺序调用
on_socket_open
零次或多次
on_socket_register_write
on_socket_unregister_write
on_socket_close
全局辅助函数
客户端模块还提供了一些全局辅助函数。
topic_matches_sub(sub, topic) 可以用来检查一个 topic 是否与一个 subscription 匹配。
例如
主题 foo/bar 会匹配订阅 foo/# 或 +/bar
主题 non/matching 不会匹配订阅 non/+/+
connack_string(connack_code) 返回与 CONNACK 结果相关联的错误字符串。
error_string(mqtt_errno) 返回与 Paho MQTT 错误号相关联的错误字符串。
发布
本模块提供了一些辅助函数,以便以单次方式直接发布消息。换句话说,它们适用于你只想将单个/多个消息发布到代理,然后断开连接,不再需要其他操作的情况。
提供的两个函数是 single() 和 multiple()。
单个
向代理发布单个消息,然后干净地断开连接。
single(topic, payload=None, qos=0, retain=False, hostname="localhost",
port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None,
protocol=mqtt.MQTTv311, transport="tcp")
发布单个函数参数
- topic
唯一的必需参数必须是将要发布负载的顶级字符串。
- payload
要发布的负载。如果为“”或 None,则发布一个零长度的负载。
- qos
发布时使用的 qos,默认为 0。
- retain
设置消息为保留(True)或非保留(False)。
- hostname
包含要连接的代理地址的字符串。默认为主机。
- port
连接到代理的端口。默认为 1883。
- client_id
要使用的 MQTT 客户端 ID。如果为“”或 None,Paho 库将自动生成客户端 ID。
- keepalive
客户端的 keepalive 超时值。默认为 60 秒。
- will
包含客户端 will 参数的字典
will = {‘topic’: “<topic>”, ‘payload’:”<payload”>, ‘qos’:<qos>, ‘retain’:<retain>}.
主题是必需的,所有其他参数都是可选的,默认分别为 None、0 和 False。
默认为 None,表示不使用 will。
- auth
包含客户端认证参数的字典
auth = {‘username’:”<username>”, ‘password’:”<password>”}
用户名是必需的,密码是可选的,如果未提供,则默认为 None。
默认为 None,表示不使用认证。
- tls
包含客户端 TLS 配置参数的字典
dict = {‘ca_certs’:”<ca_certs>”, ‘certfile’:”<certfile>”, ‘keyfile’:”<keyfile>”, ‘tls_version’:”<tls_version>”, ‘ciphers’:”<ciphers”>}
ca_certs 是必需的,所有其他参数都是可选的,如果未提供,则默认为 None,这将导致客户端使用默认行为 - 请参阅 paho.mqtt.client 文档。
默认为 None,表示不使用 TLS。
- protocol
选择要使用的 MQTT 协议版本。使用 MQTTv31 或 MQTTv311。
- transport
设置为“websockets”以通过WebSockets发送MQTT。保留默认值“tcp”以使用原始TCP。
发布单个示例
import paho.mqtt.publish as publish
publish.single("paho/test/single", "payload", hostname="mqtt.eclipse.org")
多个
向代理发布多个消息,然后干净地断开连接。
multiple(msgs, hostname="localhost", port=1883, client_id="", keepalive=60,
will=None, auth=None, tls=None, protocol=mqtt.MQTTv311, transport="tcp")
发布多个函数参数
- msgs
要发布的消息列表。每个消息可以是字典或元组。
如果为字典,则必须有顶级存在。对于任何缺失的参数,将使用默认值。字典必须具有以下形式
msg = {‘topic’:”<topic>”, ‘payload’:”<payload>”, ‘qos’:<qos>, ‘retain’:<retain>}
主题必须存在且不能为空。如果负载为“”,None 或不存在,则发布一个零长度的负载。如果 qos 不存在,则使用默认值 0。如果 retain 不存在,则使用默认值 False。
如果为元组,则必须具有以下形式
(“<topic>”, “<payload>”, qos, retain)
请参阅 single() 了解 hostname、port、client_id、keepalive、will、auth、tls、protocol、transport 的说明。
发布多个示例
import paho.mqtt.publish as publish
msgs = [{'topic':"paho/test/multiple", 'payload':"multiple 1"},
("paho/test/multiple", "multiple 2", 0, False)]
publish.multiple(msgs, hostname="mqtt.eclipse.org")
订阅
此模块提供了一些辅助函数,以便可以直观地订阅和处理消息。
提供的两个函数是 simple() 和 callback()。
简单
订阅一组主题并返回接收到的消息。这是一个阻塞函数。
simple(topics, qos=0, msg_count=1, retained=False, hostname="localhost",
port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None,
protocol=mqtt.MQTTv311)
简单订阅函数参数
- topics
唯一必需的参数是客户端将订阅的主题字符串。这可以是一个字符串,也可以是一个字符串列表,如果需要订阅多个主题。
- qos
订阅时使用的qos,默认为0。
- msg_count
从代理程序中检索的消息数量。默认为1。如果为1,则返回单个 MQTTMessage 对象。如果 >1,则返回 MQTTMessages 列表。
- retained
设置为 True 以考虑保留消息,设置为 False 以忽略设置了保留标志的消息。
- hostname
包含要连接的代理地址的字符串。默认为主机。
- port
连接到代理的端口。默认为 1883。
- client_id
要使用的 MQTT 客户端 ID。如果为“”或 None,Paho 库将自动生成客户端 ID。
- keepalive
客户端的 keepalive 超时值。默认为 60 秒。
- will
包含客户端 will 参数的字典
will = {‘topic’: “<topic>”, ‘payload’:”<payload”>, ‘qos’:<qos>, ‘retain’:<retain>}.
主题是必需的,所有其他参数都是可选的,默认分别为 None、0 和 False。
默认为 None,表示不使用 will。
- auth
包含客户端认证参数的字典
auth = {‘username’:”<username>”, ‘password’:”<password>”}
用户名是必需的,密码是可选的,如果未提供,则默认为 None。
默认为 None,表示不使用认证。
- tls
包含客户端 TLS 配置参数的字典
dict = {‘ca_certs’:”<ca_certs>”, ‘certfile’:”<certfile>”, ‘keyfile’:”<keyfile>”, ‘tls_version’:”<tls_version>”, ‘ciphers’:”<ciphers”>}
ca_certs 是必需的,所有其他参数都是可选的,如果未提供,则默认为 None,这将导致客户端使用默认行为 - 请参阅 paho.mqtt.client 文档。
默认为 None,表示不使用 TLS。
- protocol
选择要使用的 MQTT 协议版本。使用 MQTTv31 或 MQTTv311。
简单示例
import paho.mqtt.subscribe as subscribe
msg = subscribe.simple("paho/test/simple", hostname="mqtt.eclipse.org")
print("%s %s" % (msg.topic, msg.payload))
使用回调
订阅一组主题,并使用用户提供的回调处理接收到的消息。
callback(callback, topics, qos=0, userdata=None, hostname="localhost",
port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None,
protocol=mqtt.MQTTv311)
回调订阅函数参数
- callback
一个“on_message”回调,它将用于每个接收到的消息,形式为
def on_message(client, userdata, message)
- topics
客户端将订阅的主题字符串。这可以是一个字符串,也可以是一个字符串列表,如果需要订阅多个主题。
- qos
订阅时使用的qos,默认为0。
- userdata
用户提供的对象,将在收到消息时传递给 on_message 回调。
请参阅 simple() 了解 hostname、port、client_id、keepalive、will、auth、tls、protocol 的说明。
回调示例
import paho.mqtt.subscribe as subscribe
def on_message_print(client, userdata, message):
print("%s %s" % (message.topic, message.payload))
subscribe.callback(on_message_print, "paho/test/callback", hostname="mqtt.eclipse.org")
报告错误
请在 https://github.com/eclipse/paho.mqtt.python/issues 的问题跟踪器中报告错误。
更多信息
有关 Paho 客户端的讨论在 Eclipse paho-dev 邮件列表 上进行。
有关 MQTT 协议本身(不是此库)的通用问题在 MQTT Google Group 上讨论。
有关 MQTT 社区的更多信息可通过 MQTT 社区网站 获取。
发布流程
# get in a Python 3 venv git checkout ib/1.5 python setup.py bdist_wheel twine upload dist/* # then get in a Python 2 venv # and release it
项目详细信息
下载文件
下载适合您平台的文件。如果您不确定选择哪个,请了解有关 安装软件包 的更多信息。