跳转到主要内容

将函数包装起来,以在独立的线程/进程中运行。

项目描述

remoteit

具有 concurrent.futures 类型界面的基本多进程/多线程。

安装

pip install remoteit

使用方法

import time
import remoteit

@remoteit.remote
def do_something(x):
    time.sleep(1)
    return x * 2

@remoteit.threaded
def do_something_else(x):
    time.sleep(1)
    return x * 2


# simple - just call and get results

future = do_something(5)
future2 = do_something_else(6)

assert future.result() == 10
assert future2.result() == 12

# concurrent processes - run multiple together

# get results in order
futures = [do_something(i) for i in range(4)]
assert list(remoteit.results(futures)) == [0, 2, 4, 6]

# get results as soon as they finish
futures = [do_something(i) for i in range(4)]
assert set(remoteit.as_completed(futures)) == {0, 2, 4, 6}

描述 & 动机

这与 concurrent.futures 的不同之处在于

  • 它是一个单独的工作进程
  • 每次都会派生一个新的进程。

我制作了这个,而没有直接使用 concurrent.futures 的原因是我尝试提交一个引用了 Queue 对象的作业时遇到了错误,错误信息是关于某些对象只能通过继承来共享。

所以我回到了使用 multiprocessing.Process,但是传递和抛出远程异常非常麻烦,这是我发现我经常需要解决和解决的问题。所以我决定这将是最后一次思考消息需要如何传递。

此包基本上是对 multiprocessing.Processthreading.Thread 的薄包装,它将结果发送回来并抛出远程工作进程中的任何异常。超级简单!

注意事项 & 声明

Future 对象应由单个进程使用。
  • 目前,我通过生成随机ID以及将ID与结果一起返回来处理多个并发Future+结果。
  • 然后如果另一个Future获取了不匹配其result_id的结果,它将结果放入一个结果字典中,该字典由其他Future检查其result_ids。
  • 但问题是字典在进程之间不可共享,所以如果你在两个不同的进程中从结果队列中读取项目,则会导致死锁,因为一个进程可能会弹出另一个进程的结果,并且Future将永远看不到其结果。
  • 如果有任何关于如何在不产生太多开销的情况下修复此问题的想法,请发布一个issue!!

缺失接口

  • fut.cancel() - 取消任务。我们尚未处理已取消的结果
  • fut.result(timeout) - 目前我们没有结果超时。
  • fut.add_done_callback() - 我们没有运行结果的监控线程。目前最好直接调用 future.result()

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。

源分发

remoteit-0.0.1.tar.gz (3.9 kB 查看哈希值)

上传时间

由以下支持