跳转到主要内容

Python轻量级通信和打包(LCM)工具库。

项目描述

lcmutils

Python轻量级通信和打包(LCM)工具库。

安装

pip install lcmutils

使用

类型

lcmutils 定义了通过 lcm-gen 生成的 LCM 类型类的 LCMType 协议。您可以使用此协议在 Python 代码中对 LCM 类型类进行类型提示。

from lcmutils import LCMType

def publish_lcm_message(lcm: LCM, message: LCMType) -> None:
    lcm.publish("channel", message.encode())

LCM守护进程

对于许多应用程序,在后台线程中运行 LCM 处理是理想的。lcmutils 提供了一个 LCMDaemon 类,它包装一个 LCM 实例,并在后台线程中反复调用其 handle 方法。

from lcmutils import LCMDaemon

lcm = LCMDaemon()

@lcm.subscribe("channel1", "channel2")
def handle_message(channel: str, data: bytes) -> None:
    print(f"Received message {data} on channel {channel}")

lcm.start()
...
lcm.stop()  # will stop when the next message is handled

默认情况下,LCMDaemon 实例将使用阻塞的 LCM.handle 方法。如果您想使用 LCM.handle_timeout 方法,可以向构造函数传递一个 timeout_millis 参数。您还可以传递一个 start 参数来自动启动守护进程。

from lcmutils import LCMDaemon

# Use LCM.handle_timeout with a 100ms timeout and start the daemon
lcm = LCMDaemon(timeout_millis=100, start=True)
...
lcm.stop()  # will stop within 100ms

LCMDaemon 可以传递一个现有的 LCM 实例以进行包装,如果没有提供,它将创建一个新的 LCM 实例。

from lcm import LCM
from lcmutils import LCMDaemon

lcm = LCM("udpm://239.255.76.67:7667?ttl=1")  # create an LCM instance with ttl=1
lcm_daemon = LCMDaemon(lcm, start=True)
...
lcm_daemon.stop()

类型注册表

lcmutils 提供了一个 LCMTypeRegistry 类,可以用于通过指纹注册和查找 LCM 类型类。

from lcmutils import LCMTypeRegistry
from my_lcm_types import MyLCMType

registry = LCMTypeRegistry(MyLCMType)
# or:
#   registry = LCMTypeRegistry()
#   registry.register(MyLCMType)

msg = MyLCMType()
msg_encoded = msg.encode()

print(registry.detect(msg_encoded))         # MyLCMType
print(registry.decode(msg_encoded) == msg)  # True

项目详情


下载文件

下载适用于您的平台的文件。如果您不确定选择哪个,请了解有关 安装包 的更多信息。

源分发

lcmutils-0.1.1.tar.gz (3.4 kB 查看哈希)

上传时间

构建分发

lcmutils-0.1.1-py3-none-any.whl (4.5 kB 查看散列值)

上传时间 Python 3

由以下支持