跳转到主要内容

Async ModBus python库

项目描述

Async ModBus

Async Modbus Python Versions CI Status License pre-commit

异步(即python async/await)modbus python 3客户端库。在备受尊敬的umodbus库之上提供一个薄层,提供面向对象的客户端API。

async_modbus与异步库无关。您应该能够使用它与asynciocuriotrioanyio一起使用。

它期望一个具有与StreamReaderStreamWriter相同接口的对象,因此如果您不使用asyncio,您可能需要编写一个薄层包装器。下面的示例显示了如何与curio一起使用它。

注意:`modbus_for_url()`依赖于connio库,该库依赖于asyncio事件循环,因此它的使用仅限于asyncio应用程序。

为什么还需要另一个modbus库?

此库不是ModBus通信协议的重新实现。您可以将它视为umodbus库的补充。

以下是async_modbus在umodbus之上提供的内容

  • 简单而强大的面向对象API
  • 方便的`modbus_for_url()`辅助函数。输入一个URL,您就可以开始了。
  • 在适当的情况下,使用numpy数组。其使用不仅减少了内存占用并提高了速度,而且还使得用户能够高效地重新格式化数据。
  • conniosockioserialio库兼容,这些库提供了透明的套接字重新连接等功能。

安装

在您喜欢的Python环境中输入

$ pip install async_modbus

如果已安装,将使用Numpy。您可以自行安装,或者包含可选依赖项

$ pip install async_modbus[numpy]

async_modbus库的核心是一个modbus_for_url()函数和两个类AsyncTCPClientAsyncRTUClient

以下是一些示例

asyncio示例

简单的TCP客户端

import asyncio

import numpy

from async_modbus import modbus_for_url


async def main():

    client = modbus_for_url("tcp://localhost:15020")

    values = numpy.array([1, 0, 1, 1])  # would also work with list<bool or int>
    reply = await client.write_coils(slave_id=1, starting_address=1, values=values)
    assert reply is len(values)

    reply = await client.read_coils(slave_id=1, starting_address=1, quantity=len(values))
    assert (reply == values).all()


asyncio.run(main())

使用自定义串行选项的本地串行线上的RTU

import asyncio

from async_modbus import modbus_for_url


async def main():

    client = modbus_for_url("serial:///dev/ttyS0", {"baudrate":19200, "parity":"E"})

    values = [1, 0, 1, 1]
    reply = await client.write_coils(slave_id=1, starting_address=1, values=values)
    assert reply is len(values)

    reply = await client.read_discrete_inputs(slave_id=1, starting_address=1, quantity=len(values))
    assert (reply == values).all()


asyncio.run(main())

使用RFC2217的远程串行线上的RTU

import asyncio

from async_modbus import modbus_for_url


async def main():

    client = modbus_for_url("rfc2217://moxa.acme.org:6610")

    values = [1, 0, 1, 1]
    reply = await client.write_coils(slave_id=1, starting_address=1, values=values)
    assert reply is len(values)

    reply = await client.read_discrete_inputs(slave_id=1, starting_address=1, quantity=len(values))
    assert (reply == values).all()


asyncio.run(main())

asyncio TCP流

import asyncio

import numpy

from async_modbus import AsyncTCPClient


async def main():

    reader, writer = await asyncio.open_connection('localhost', 15020)
    client = AsyncTCPClient((reader, writer))

    values = numpy.array([0, 2**15 - 1, 10, 3, 32766])
    reply = await client.write_registers(slave_id=1, starting_address=1, values=values)
    assert reply is len(values)

    reply = await client.read_holding_registers(slave_id=1, starting_address=1, quantity=len(values))
    assert (reply == values).all()

    writer.close()
    await writer.wait_closed()


asyncio.run(main())

使用远程原始TCP的异步串行行RTU

import asyncio

import numpy

from async_modbus import AsyncRTUClient
from serial_asyncio import open_serial_connection


async def main():

    reader, writer = await open_serial_connection(url="socket://moxa.acme.org:6610")
    client = AsyncRTUClient((reader, writer))

    values = [0, 2**15 - 1, 10, 3, 32766]
    reply = await client.write_registers(slave_id=1, starting_address=1, values=values)
    assert reply is len(values)

    reply = await client.read_input_registers(slave_id=1, starting_address=1, quantity=len(values))
    assert (reply == values).all()

    writer.close()
    await writer.wait_closed()


asyncio.run(main())

curio示例

curio TCP流

import curio
from async_modbus import AsyncTCPClient


async def main():

    sock = await curio.open_connection("0", 15020)
    client = AsyncTCPClient(sock.as_stream())

    values = [1, 0, 1, 1]
    reply = await client.write_coils(slave_id=1, starting_address=1, values=values)
    assert reply is len(values)

    reply = await client.read_coils(slave_id=1, starting_address=1, quantity=len(values))
    assert (reply == values).all()

    await sock.close()

致谢

开发负责人

贡献者

暂无。为何不成为第一个呢?

特别感谢

项目详情


下载文件

下载适合您平台的应用程序。如果您不确定选择哪个,请了解有关安装包的更多信息。

源代码分发

此版本中没有可用的源代码分发文件。请参阅生成分发存档的教程

构建分发

async_modbus-0.2.1-py3-none-any.whl (10.9 kB 查看散列值)

上传时间 Python 3

由以下机构支持