跳转到主要内容

KNXD的纯Python异步客户端,用于其本地第4层

项目描述

KNXDclient

KNXD(EIBD)的纯Python异步客户端,用于其本地第4层KNX协议。

EIBD是一个*NIX守护进程,用于通过各种接口路由EIB/KNX报文,并程序化访问EIB/KNX总线。它是已不再维护的BCUSDK的一部分。然而,有一个名为KNXD的分支,仍在积极维护。

此软件包基于异步协程(asyncio)以纯Python 3重实现了EIBD客户端的小部分功能(参见BCUSDK文档,第7.7节)。目前,它允许通过KNXD打开组套接字,以发送和接收任何组地址的KNX报文。此外,此软件包还包括将发送/接收数据从/转换为已知KNX数据点类型(DPT)的辅助方法encode_value()decode_value()

使用示例

import asyncio
import knxdclient


def handler(packet: knxdclient.ReceivedGroupAPDU) -> None:
    print("Received group telegram: {}".format(packet))

async def main() -> None:
    # Raises a TimeoutError after 30 seconds of not receiving any traffic. This argument is optional
    connection = knxdclient.KNXDConnection(timeout=30.0) 
    connection.set_group_apdu_handler(handler)
    await connection.connect()
    # Connection was successful. Start receive loop:
    run_task = asyncio.create_task(connection.run())
    # Now that the receive loop is running, we can open the KNXd Group Socket:
    await connection.open_group_socket()

    # Startup completed. Now our `handler()` will receive incoming telegrams and we can send some:
    await connection.group_write(knxdclient.GroupAddress(1,3,2),
                                 knxdclient.KNXDAPDUType.WRITE,
                                 knxdclient.encode_value(True, knxdclient.KNXDPT.BOOLEAN))
    
    await asyncio.sleep(5)
    # Let's stop the connection and wait for graceful termination of the receive loop:
    await connection.stop()
    await run_task


asyncio.run(main())

或者,可以使用异步迭代器接收组报文

import asyncio
import knxdclient

# KNX Datapoint Types of some known group addresses, used for decoding incoming values and encoding group RESPONSE
DPTs = {
    knxdclient.GroupAddress(1, 2, 3): knxdclient.KNXDPT.BOOLEAN,
    knxdclient.GroupAddress(4, 5, 6): knxdclient.KNXDPT.FLOAT16,
}

# Some values for responding to group READ telegrams
CURRENT_VALUE = {
    knxdclient.GroupAddress(1, 2, 3): True
}


async def main() -> None:
    connection = knxdclient.KNXDConnection()
    await connection.connect()
    try:
        # Start run task and open group socket
        run_task = asyncio.create_task(connection.run())
        await connection.open_group_socket()

        # Iterate asynchronously over incoming group telegrams
        packet: knxdclient.ReceivedGroupAPDU
        async for packet in connection.iterate_group_telegrams():
            # Respond to GROUP READ telegrams with known values from CURRENT_VALUE dict
            if packet.payload.type == knxdclient.KNXDAPDUType.READ:
                address = packet.dst
                if address in CURRENT_VALUE:
                    await connection.group_write(address,
                                                 knxdclient.KNXDAPDUType.RESPONSE,
                                                 knxdclient.encode_value(CURRENT_VALUE[address], DPTs[address]))
            # Decode and log incoming group WRITE and RESPONSE telegrams 
            else:
                if packet.dst not in DPTs:
                    # Skip telegrams with unknown datatype
                    continue
                value = knxdclient.decode_value(packet.payload.value, DPTs[packet.dst])
                print(f"Telegram from {packet.src} to GAD {packet.dst}: {value}")

    finally:
        # Let's stop the connection and wait for graceful termination of the receive loop:
        await connection.stop()
        await run_task


asyncio.run(main())

许可证

此软件包根据Apache许可证2.0条款发布。

项目详情


下载文件

下载适用于您平台上的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源代码分发

knxdclient-1.1.0.tar.gz (22.4 kB 查看哈希值)

上传时间 源代码

构建分发

knxdclient-1.1.0-py3-none-any.whl (20.0 kB 查看哈希值)

上传时间 Python 3

支持