KNXD的纯Python异步客户端,用于其本地第4层
项目描述
KNXDclient
KNXD(EIBD)的纯Python异步客户端,用于其本地第4层KNX协议。
EIBD是一个*NIX守护进程,用于通过各种接口路由EIB/KNX报文,并程序化访问EIB/KNX总线。它是已不再维护的BCUSDK的一部分。然而,有一个名为KNXD的分支,仍在积极维护。
此软件包基于异步协程(asyncio)以纯Python 3重实现了EIBD客户端的小部分功能(参见BCUSDK文档,第7.7节)。目前,它允许通过KNXD打开组套接字,以发送和接收任何组地址的KNX报文。此外,此软件包还包括将发送/接收数据从/转换为已知KNX数据点类型(DPT)的辅助方法encode_value()
和decode_value()
。
使用示例
import asyncio
import knxdclient
def handler(packet: knxdclient.ReceivedGroupAPDU) -> None:
print("Received group telegram: {}".format(packet))
async def main() -> None:
# Raises a TimeoutError after 30 seconds of not receiving any traffic. This argument is optional
connection = knxdclient.KNXDConnection(timeout=30.0)
connection.set_group_apdu_handler(handler)
await connection.connect()
# Connection was successful. Start receive loop:
run_task = asyncio.create_task(connection.run())
# Now that the receive loop is running, we can open the KNXd Group Socket:
await connection.open_group_socket()
# Startup completed. Now our `handler()` will receive incoming telegrams and we can send some:
await connection.group_write(knxdclient.GroupAddress(1,3,2),
knxdclient.KNXDAPDUType.WRITE,
knxdclient.encode_value(True, knxdclient.KNXDPT.BOOLEAN))
await asyncio.sleep(5)
# Let's stop the connection and wait for graceful termination of the receive loop:
await connection.stop()
await run_task
asyncio.run(main())
或者,可以使用异步迭代器接收组报文
import asyncio
import knxdclient
# KNX Datapoint Types of some known group addresses, used for decoding incoming values and encoding group RESPONSE
DPTs = {
knxdclient.GroupAddress(1, 2, 3): knxdclient.KNXDPT.BOOLEAN,
knxdclient.GroupAddress(4, 5, 6): knxdclient.KNXDPT.FLOAT16,
}
# Some values for responding to group READ telegrams
CURRENT_VALUE = {
knxdclient.GroupAddress(1, 2, 3): True
}
async def main() -> None:
connection = knxdclient.KNXDConnection()
await connection.connect()
try:
# Start run task and open group socket
run_task = asyncio.create_task(connection.run())
await connection.open_group_socket()
# Iterate asynchronously over incoming group telegrams
packet: knxdclient.ReceivedGroupAPDU
async for packet in connection.iterate_group_telegrams():
# Respond to GROUP READ telegrams with known values from CURRENT_VALUE dict
if packet.payload.type == knxdclient.KNXDAPDUType.READ:
address = packet.dst
if address in CURRENT_VALUE:
await connection.group_write(address,
knxdclient.KNXDAPDUType.RESPONSE,
knxdclient.encode_value(CURRENT_VALUE[address], DPTs[address]))
# Decode and log incoming group WRITE and RESPONSE telegrams
else:
if packet.dst not in DPTs:
# Skip telegrams with unknown datatype
continue
value = knxdclient.decode_value(packet.payload.value, DPTs[packet.dst])
print(f"Telegram from {packet.src} to GAD {packet.dst}: {value}")
finally:
# Let's stop the connection and wait for graceful termination of the receive loop:
await connection.stop()
await run_task
asyncio.run(main())
许可证
此软件包根据Apache许可证2.0条款发布。
项目详情
下载文件
下载适用于您平台上的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。
源代码分发
knxdclient-1.1.0.tar.gz (22.4 kB 查看哈希值)
构建分发
knxdclient-1.1.0-py3-none-any.whl (20.0 kB 查看哈希值)
关闭
knxdclient-1.1.0.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 433ad64933850ff8b6ea673772567f575a6bcd7a5920f92b53b030cd36ac6f6c |
|
MD5 | ab1aa5fd4740ca95a1dd7eb25a443aad |
|
BLAKE2b-256 | ecef841b3331a6f0834d99fdfed1ff8e062d861c7d9701a7391b75519d6bfe7f |
关闭
knxdclient-1.1.0-py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | a9fa49adc605d01b69ae797b477e04794708460ff315ed8b8308b9a492336902 |
|
MD5 | 59cc1ad13f57c6aa4f686f1ec1b56187 |
|
BLAKE2b-256 | 0665332ef611caa1232702eb60cca280ea179a7bf0db75868ca4e4ebde2d0b7b |