跳转到主要内容

促进跨进程编程。

项目描述

remoteobj

与跨进程的对象交互。

这使用multiprocessing管道从主进程发送消息和操作到远程进程,并返回返回值。

基本上,这允许对发送到另一个进程的对象进行诸如调用.close().pause(),或.ready等操作。但它也支持更多功能!

包含内容

  • 代理:与通过fork的对象通信的远程代理。这是此包的初始目的
    class Blah:
        def __init__(self):
            self.remote = remoteobj.Proxy(self)
        def say_hi(self):
            print('hi from', mp.current_process().name)
        def run(self):  # remote process function
            with self.remote.listen_():
                while True:
                    ... # do other stuff
                    self.remote.process_requests()
    
    obj = Blah()
    mp.Process(target=obj.run).start()
    obj.remote.wait_until_listening()
    obj.remote.say_hi()
    # hi from Process-1
    ...
    
  • util.process:比multiprocessing.Process更少障碍的版本。使代码更像是常规Python。处理远程异常和返回/生成值。
    def func(x):
        yield from range(x)
        raise ValueError('error from {}'.format(
           mp.current_process().name))
    
    try:
       with remoteobj.util.process(func, 4) as p:
           for i in p.result:
               print(i)
    except ValueError as e:
       print('woah!!', e)
    # prints 0, 1, 2, 3, woah!! error from Process-1
    
  • 异常:捕获远程异常、将它们分配到不同的组并将它们发送回主进程的上下文管理器
    exc = remoteobj.Except()
    
    def remote_func(exc):
        with exc(raises=False):
            try:
                with exc('init'):
                    do_init()
                with exc('work'):
                    do_work()
            finally:
                with exc('cleanup'):
                    do_cleanup()
    
    p = mp.Process(target=remote_func, args=(exc,))
    p.start()
    ...
    p.join()
    exc.raise_any('work')  # if none, its a noop
    exc.raise_any('cleanup')
    exc.raise_any()  # will raise the last caught exception
    
  • 局部异常:与上下文管理器接口相同,但没有进程间通信
    exc = remoteobj.LocalExcept()
    
    with exc(raises=False):
        try:
            with exc('init'):
                do_init()
            do_other_stuff()
            with exc('work'):
                do_work()
        finally:
            with exc('cleanup'):
                do_cleanup()
    
    exc.raise_any('work')
    exc.raise_any('cleanup')
    exc.raise_any()
    

注意: 这仍然是alpha版本,接口仍在演变(尽管事物开始稳定),所以请确保锁定版本或保持对更改的了解。另外,如果有什么想法/建议/请求,请告诉我!有关如何工作的信息,请参阅这里

安装

pip install remoteobj

完整示例(代理)

这里有一个我们希望在独立进程中运行的对象。我们希望在对象运行时能够获取/设置其状态,因此我们将其封装在一个远程的Proxy对象中,该对象将在远程进程的背景中运行并执行我们在主进程中发出的调用。

import time
import remoteobj
import multiprocessing as mp

class Object:
    '''Remote object to do some counting.'''
    def __init__(self):
        self.remote = remoteobj.Proxy(self)

    count = 0
    on, stop = False, False
    def toggle(self, state=True):
        self.on = state

    def run(self):
        '''Remote process'''
        # starts thread to execute requests
        with self.remote.listen_(bg=True):
            while not self.stop and self.remote.listening_:
                if self.on:
                    self.count += 1
                time.sleep(0.1)

# start process

obj = Object()

p = mp.Process(target=obj.run)
p.start()
# make sure we've started up
# pass p so we aren't left hanging if the process dies
obj.remote.wait_until_listening(p)

# turn on, get starting count, count should be increasing

obj.remote.toggle(True)
x = obj.remote.count.get_()
time.sleep(1)
assert obj.remote.count.get_() > x  # 10 > 1

# turn off, count should stay the same

obj.remote.toggle(False)
x = obj.remote.count.get_()
time.sleep(1)
assert obj.remote.count.get_() == x  # 10 == 10

# turn back on, count should increase again

obj.remote.toggle(True)
x = obj.remote.count.__  # alias for get_()
time.sleep(1)
assert obj.remote.count.__ > x  # 20 > 10

# set the remote stop attribute to exit and join
obj.remote.stop = True  # you can set attrs too?? (゚ロ゚*)
p.join()

基本用法

代理

这更详细地解释了正在进行的机制。

注意:为了避免名称冲突,许多代理方法使用下划线后缀(例如 get_())。这也适用于util.process参数。下面将进行说明。

import remoteobj

# building some arbitrary object
class Idk(list):
    something = 5
    another = []

obj = Idk()

# creating a remote proxy to interact with
# we want to make sure that the proxy gets
# sent along with it so we can handle remote
# requests.
r_obj = obj.remote = remoteobj.Proxy(obj)

# ... now send obj to mp.Process and start listener thread ...

# then meanwhile back in the *main* process

# call a method
r_obj.append(5)
# now the remote object has 5 appended to it
assert len(r_obj) == 1

# getting an attribute returns a proxy so you can chain
assert isinstance(r_obj.another.append, remoteobj.Proxy)

# calling will automatically resolve a proxy
r_obj.another.append(6)
# now another == [6]

# you can access keys, but we still allow chaining
# so they're proxies too
assert isinstance(r_obj[0], remoteobj.Proxy)
assert isinstance(r_obj.another[0], remoteobj.Proxy)

# to make the request and get the value, do

assert remoteobj.get( r_obj[0] ) == 5
# or more concisely
assert r_obj[0].__ == 5
# or if you prefer a less witch-y way
assert r_obj[0].get_() == 5

# you can even set keys and attributes on remote objects
r_obj[0] = 6
r_obj.something = 10


# len() is a special case, but in most cases, you can't
# just blindly pass a remote object to a function and
# expect it to work all the time.
assert str(r_obj) == <Remote <Idk object at ...> : (?)>
assert r_obj.passto(str) == <Idk object at ...>
# the first is the local instance, while the second is the
# remote instance. To pass an object to a function to execute
# on the remote side, use `passto` which will pickle the function
# reference.
# So you probably wouldn't want to pass an object method like that
# because it would pickle and unpickle the object instance which
# is most likely **not** what you want.
# but I'm not your mom.

注意:不能获取/设置以下划线开头的属性。所有以下划线属性都引用代理对象本身。

现在在远程端

def run(obj):
    # indicate that we're listening
    with obj.remote:
        # do whatever nonsense you need
        value = 0
        while True:  # do nonsense
            for x in obj:
                value = x * obj.something
            for x, y in zip(obj, obj.another):
                value -= y / x * obj.something
            time.sleep(0.4)
            # this will make the requests
            obj.remote.process_requests()
    # after exiting, listening is set to false,
    # clients will fail or return their default
    # immediately because we have notified that
    # we will not be processing any more requests.

# Here's how to run the listener in a background thread

def run(obj):
    # start background thread to listen.
    with obj.remote.listen_():
        while obj.remote.listening_:
            ...
            # don't need to call obj.remote.process_requests()

代理操作

这些是远程视图可以处理的操作,涵盖了从对象访问信息的主要方式。如果还有其他遗漏,请告知。

注意:任何声明返回Proxy的操作都可以进行链式调用。如果一个远程方法返回self,将会捕获它并返回一个远程代理,以便进行链式调用。

  • __call__ (obj(*a, **kw)):获取返回值。
    • 要返回代理,可以执行以下操作之一
      • Proxy(obj, eager_proxy=True)以获取所有代理或
      • obj.method(_proxy=True)进行一次性操作
  • __getitem__ (obj[key]):返回代理
  • __getattr__ (obj.attr):返回代理
  • __setitem__ (obj[key] = value):评估
  • __setattr__ (obj.attr = value):评估
  • __delitem__ (del obj[key]):评估
  • __delattr__ (del obj.attr):评估
  • __contains__ (x in obj):评估
  • len (len(obj)):评估
  • passto (func(obj)):将对象传递给一个函数
    • 例如,obj.passto(str)等同于str(obj)
    • 也可以传递参数:obj.passto(func, *a, **kw)

要解析代理,可以执行以下三种等效的样式之一

  • remoteobj.get(obj.attr, default=False) - 使obj.attr被发送到主进程的情况更加清晰
  • obj.attr.get_(default='asdf') - 通过链式访问 - 方便,有些清晰
  • obj.attr.__ - 尝试简化接口,不处理默认值,不够清晰。一旦你知道其含义,它就很容易看懂,但我同意这种模糊性是一个问题。

增强进程

有时multiprocessing.Process在接口方面有些不足,所以我编写了一个轻量级的包装器,

  • 具有更干净的签名 - process(func, *a, **kw) => func(*a, **kw)
    • 其他参数,如namegroup,具有下划线后缀(例如name_group_daemon_
  • 可以用作上下文管理器with process(...):,在退出时将加入
  • 默认从函数名中获取进程名
  • 默认为daemon_=True
  • 将抛出远程异常(使用remoteobj.Except()
  • 通过p.result发送回returnyield
def remote_func(x, y):
    time.sleep(1)
    return 10

with remoteobj.util.process(remote_func, 5, 2) as p:
    assert p.name == 'remote_func-1'
    assert p.result == None  # called before return
    ... # do some other stuff

# now the process has joined
# and we can access the return value of the process!
assert p.result == 10


# now...
# wait for it....

def remote_func(x, y):
    for i in range(x, y):
        yield i

with remoteobj.util.process(remote_func, 5, 10) as p:
    # p.result returns a generator which will yield the values from the
    # remote side
    a_generator = p.result
    assert list(a_generator) == list(range(5, 10))

发送进程异常

从另一个进程发送异常始终很痛苦,因为你必须处理所有进程间通信的框架,设置队列等,这可能会使你的代码变得混乱。

Except类允许你捕获异常并将其添加到不同的命名组中。如果你需要区分设置错误、处理错误或清理错误,这将很有用。

工作原理:定义一个Except对象。在远程进程中使用catch作为上下文管理器,在此上下文中引发的任何匹配异常将与它的堆栈跟踪一起序列化并附加到其队列中。

# define an exception handler
catch = remoteobj.Except()
# or be more specific
catch = remoteobj.Except(ValueError, TypeError)

def remote_process(catch):
    with catch:
        raise ValueError('!!!')
    with catch('hi'):  # named exception contexts
        raise TypeError('hi')

p = mp.Process(target=remote_process, args=(catch,))
p.start()
p.join()
catch.raise_any('hi')  # will raise hi
# or
catch.raise_any()  # will raise any exception
# or
catch.raise_any(None)  # will raise any exception in the default context

本地异常

我们可以使用相同的语法和上下文机制,而不需要进程间通信来捕获本地错误。

# define an exception handler
catch = remoteobj.LocalExcept(raises=True)

try:
    with catch:
        raise ValueError('!!!')
except:
    with catch('hi', raises=False):
        raise TypeError('hello')

catch.raise_any('hi')
catch.raise_any()

Proxy的工作原理

我们覆盖了基本的Python操作符,以便它们返回表示操作链(ProxyView对象)的对象。

  • View对象表示操作链 - 这主要是内部接口。
  • Proxy对象表示与对象关联的操作链。

当我们去解析操作链时,我们

  • 首先获取一个锁,这样监听状态就不会改变,并且不能同时发起其他请求。
  • 检查远程实例是否正在监听
  • 我们通过管道发送操作集,然后等待值从另一侧出来
  • 返回后,我们检查返回值,引发任何异常,然后返回。

在远程端,我们

  • 轮询连接,检查操作请求,一旦找到其中一个
  • 获取一个写锁
  • 在代理对象上评估视图
  • 处理异常,然后将结果和异常放入管道以发送回去

如果没有监听进程,则将返回默认值(如果你通过get_(default=False)提供了默认值)或者引发RuntimeError

在远程进程启动时调用proxy.wait_until_listening()是有用的,这样你不会因为监听器还没有启动而得到RuntimeError

如果远程对象在监听进程相同的进程中调用,则它将绕过管道并直接评估。这意味着如果你使用线程而不是进程,则不会通过管道发送数据。

高级

import remoteobj

class A:
    def __init__(self):
        self.remote = remoteobj.Proxy(self)

    def asdf(self):
        return 5

class B(A):
    x = 0
    def asdf(self):
        return 6

    def chain(self):
        x += 1
        return self

obj = B()

访问super()

# call super method
assert obj.remote.asdf() == 6
assert obj.remote.super.asdf() == 6
# is equivalent to: super(type(obj), obj).asdf()

远程方法链

一个常见的模式是让函数返回self,以便你可以链接方法。但是,当你从另一个进程发送对象时,这不起作用,因为它会被序列化,并且它将不再是同一个对象。

因此有一个特殊情况 - 如果返回值是self,它将标记它,在另一端,它将返回基本的代理对象。

# remote chaining
assert obj.remote.x.__ == 0  # check start value
assert obj.remote.chain().chain().chain().x.__ == 3

# equivalent to doing this locally
assert obj.x == 0
assert obj.chain().chain().chain().x == 3

死锁

在处理并发编程时,你总是必须担心你的程序会死锁。

死锁可能成为问题的领域之一是如果客户端进程开始请求操作,而监听进程开始清理。

为了防止这种情况,当监听进程关闭时,它将完成任何未完成的请求(默认行为)或拒绝它们(Proxy(fulfill_final=False))。

项目详情


下载文件

下载适用于您平台的文件。如果您不确定该选择哪个,请了解更多关于安装包的信息。

源代码分发

remoteobj-0.4.0.tar.gz (24.5 kB 查看哈希值)

上传时间 源代码

支持者