跳转到主要内容

用于不平衡帧大小的流缓冲和事件触发。

项目描述

ocycle

用于不平衡帧大小的流缓冲和事件触发。数据以一定的帧大小进入,并以不同的大小输出。

它支持

  • 线程/多进程回调(除了串行执行外)
    • 创建了一个进程/线程池,将回调发送到该池。
  • 始终可用的,可回收的缓冲区
    • 当将缓冲区传递给异步回调时,将创建一个新的缓冲区用于写入,而另一个被使用。
    • 一旦回调函数完成,它将被添加回堆栈,以便可以重复使用。
  • 时间采样 - 您可以传递一个函数或标量,该函数定义在发出回调后多长时间再次开始写入。
    • 例如,如果您想获得50%的覆盖率,可以传递 sampler=size / fs,这将等待 size / fs 秒再次写入。

示例

import ocycle

# this is some random data that represents our
# incoming data buffer
SOME_DATA = 'asdfghjkl;'

# how full should the buffer be before emitting data
CONSUME_SIZE = 29

# a data consumer
def create_consumer(size):
    # this is called when the buffer reaches/exceeds size.
    def callback(value, t0):
        assert len(value) >= CONSUME_SIZE
        print('Emitting:', t0, value)

    return ocycle.BufferEmit(callback, size)

buff = create_consumer(CONSUME_SIZE)

# a generator then writes to the buffer.
# once it is full enough it will call `callback(buff, t0)`
while True:
    print('Writing to buffer:', SOME_DATA)
    buff.write(SOME_DATA)

示例用例

音频处理

您想从麦克风流式传输音频,并对该流执行多个独立任务。每个特征提取器都需要不同大小的音频块。

例如,您想

  • 提取SPL(每0.5秒)
  • 写入加密的flac文件(每10秒)
  • 运行tensorflow模型(每2秒)

而不是使用单个块大小收集数据,然后手动跟踪并每次每个缓冲区填满时启动新进程,BufferEmit 将为您处理所有这些。

用法

回调

您需要传入一个回调函数,callback,该函数将在缓冲区超出大小size时被调用。这将接收已填充的缓冲区和第一个缓冲区帧到达的时间戳。

您还可以传递一个额外的回调on_done,该回调将在主进程中调用,如果您需要在内存中的某个对象上存储回调的结果,则非常有用。

执行方法

您可以选择如何执行回调

  • mode='serial':回调将在执行线程/进程中运行
  • mode='thread':回调将在新线程中运行
  • mode='process':回调将在新进程中运行

您可以通过传递例如npool=10来使用10线程/进程工作池。

值类型

值类型

默认情况下,回调将接收一个bytes和时间戳对象。如果您传递asbuffer=True,则它将传递io.BytesIO而不调用buff.getvalue()

剪辑

默认情况下,回调将在缓冲区超出size后接收整个缓冲区。如果您不想剪辑部分消息,这很有用,但这也意味着len(buff) >= size。如果您传递clip=True,它将返回大小为size的精确块。

采样器

在某些情况下,您可能不希望获取所有数据,只想在一段时间内取一些子样本。

例如,假设您的机器学习模型不是实时模型。您已经尽力而为,但只能将其降低到1.7倍。您可以定义一个采样器,该采样器会在您发出一个缓冲区后等待x秒,然后再次开始向缓冲区写入。因此,您可以设置sampler=1.0,它将在再次开始写入缓冲区之前暂停1秒。

采样器也可以是任何不需要参数并且返回int/float的可调用对象。如果您想实现随机/动态采样策略,这很有用。例如,您可以设置

sampler=lambda: (np.random.randn() * 1.3 + 4)

这将延迟使用均值为4和标准差为1.3的正态分布。

配置示例

这显示了BufferEmit可以存在的不同配置。

待办事项:有不同缓冲区大小的示例将更有说明性...我不知道我在想什么 lol

import time
import ocycle

# build a list of receivers

buffers = []
SIZE = 15

def on_emit(label): # callback
    return lambda buff, t0: print(
        label, buff.getvalue(), t0)

# default operation
buffers.append(ocycle.BufferEmit(
    on_emit('default'), size=SIZE))

# execute the callback in a thread
buffers.append(ocycle.BufferEmit(
    on_emit('im in a thread!'),
    size=SIZE, mode='thread'))

# the callback for multiprocessing must be pickleable
def on_process_emit(buff, t0): # callback
    x = buff.getvalue()
    print('Im in a process!', x, t0)
    return x[::-1]

# execute the callback in a separate process
# execute `on_done` in the main process
buffers.append(ocycle.BufferEmit(
    on_process_emit,
    on_done=lambda res:
        print('The process returned:', res),
    size=SIZE, mode='process'))

# send data to each receiver

def generate_data(length=4, n=1):
    for _ in range(n):
        for i in range(10):
            yield str(i).encode('utf-8') * length

for x in generate_data(4):
    print('writing', x)
    for b in buffers:
        b.write(x)
    time.sleep(1)

输出

writing b'0000'
writing b'1111'
writing b'2222'
writing b'3333'
default b'0000111122223333' 1585253921.2961438
im in a thread! b'0000111122223333' 1585253921.296198
gets value, not buffer object b'0000111122223333' 1585253921.2967098
value clipped to size b'000011112222333' 1585253921.296758
Im in a process! b'0000111122223333' 1585253921.296656
The process returned: b'3333222211110000'
writing b'4444'
writing b'5555'
writing b'6666'
writing b'7777'
default b'4444555566667777' 1585253924.3013031
im in a thread! b'4444555566667777' 1585253924.3013911
gets value, not buffer object b'4444555566667777' 1585253924.3263922
value clipped to size b'344445555666677' 1585253924.326594
Im in a process! b'4444555566667777' 1585253924.301939
The process returned: b'7777666655554444'
writing b'8888'
writing b'9999'
writing b'0000'
writing b'1111'
default b'8888999900001111' 1585253928.3309472
im in a thread! b'8888999900001111' 1585253928.331071
gets value, not buffer object b'8888999900001111' 1585253928.3323038
value clipped to size b'778888999900001' 1585253928.33305
Im in a process! b'8888999900001111' 1585253928.332133
The process returned: b'1111000099998888'
writing b'2222'
writing b'3333'

项目详情


下载文件

下载您平台上的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。

源分布

ocycle-0.1.0.tar.gz (8.9 kB 查看散列)

上传时间

支持者

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面