用于不平衡帧大小的流缓冲和事件触发。
项目描述
ocycle
用于不平衡帧大小的流缓冲和事件触发。数据以一定的帧大小进入,并以不同的大小输出。
它支持
- 线程/多进程回调(除了串行执行外)
- 创建了一个进程/线程池,将回调发送到该池。
- 始终可用的,可回收的缓冲区
- 当将缓冲区传递给异步回调时,将创建一个新的缓冲区用于写入,而另一个被使用。
- 一旦回调函数完成,它将被添加回堆栈,以便可以重复使用。
- 时间采样 - 您可以传递一个函数或标量,该函数定义在发出回调后多长时间再次开始写入。
- 例如,如果您想获得50%的覆盖率,可以传递
sampler=size / fs
,这将等待size / fs
秒再次写入。
- 例如,如果您想获得50%的覆盖率,可以传递
示例
import ocycle
# this is some random data that represents our
# incoming data buffer
SOME_DATA = 'asdfghjkl;'
# how full should the buffer be before emitting data
CONSUME_SIZE = 29
# a data consumer
def create_consumer(size):
# this is called when the buffer reaches/exceeds size.
def callback(value, t0):
assert len(value) >= CONSUME_SIZE
print('Emitting:', t0, value)
return ocycle.BufferEmit(callback, size)
buff = create_consumer(CONSUME_SIZE)
# a generator then writes to the buffer.
# once it is full enough it will call `callback(buff, t0)`
while True:
print('Writing to buffer:', SOME_DATA)
buff.write(SOME_DATA)
示例用例
音频处理
您想从麦克风流式传输音频,并对该流执行多个独立任务。每个特征提取器都需要不同大小的音频块。
例如,您想
- 提取SPL(每0.5秒)
- 写入加密的flac文件(每10秒)
- 运行tensorflow模型(每2秒)
而不是使用单个块大小收集数据,然后手动跟踪并每次每个缓冲区填满时启动新进程,BufferEmit
将为您处理所有这些。
用法
回调
您需要传入一个回调函数,callback
,该函数将在缓冲区超出大小size
时被调用。这将接收已填充的缓冲区和第一个缓冲区帧到达的时间戳。
您还可以传递一个额外的回调on_done
,该回调将在主进程中调用,如果您需要在内存中的某个对象上存储回调的结果,则非常有用。
执行方法
您可以选择如何执行回调
mode='serial'
:回调将在执行线程/进程中运行mode='thread'
:回调将在新线程中运行mode='process'
:回调将在新进程中运行
您可以通过传递例如npool=10
来使用10线程/进程工作池。
值类型
值类型
默认情况下,回调将接收一个bytes
和时间戳对象。如果您传递asbuffer=True
,则它将传递io.BytesIO
而不调用buff.getvalue()
。
剪辑
默认情况下,回调将在缓冲区超出size
后接收整个缓冲区。如果您不想剪辑部分消息,这很有用,但这也意味着len(buff) >= size
。如果您传递clip=True
,它将返回大小为size
的精确块。
采样器
在某些情况下,您可能不希望获取所有数据,只想在一段时间内取一些子样本。
例如,假设您的机器学习模型不是实时模型。您已经尽力而为,但只能将其降低到1.7倍。您可以定义一个采样器,该采样器会在您发出一个缓冲区后等待x
秒,然后再次开始向缓冲区写入。因此,您可以设置sampler=1.0
,它将在再次开始写入缓冲区之前暂停1秒。
采样器也可以是任何不需要参数并且返回int/float的可调用对象。如果您想实现随机/动态采样策略,这很有用。例如,您可以设置
sampler=lambda: (np.random.randn() * 1.3 + 4)
这将延迟使用均值为4和标准差为1.3的正态分布。
配置示例
这显示了BufferEmit
可以存在的不同配置。
待办事项:有不同缓冲区大小的示例将更有说明性...我不知道我在想什么 lol
import time
import ocycle
# build a list of receivers
buffers = []
SIZE = 15
def on_emit(label): # callback
return lambda buff, t0: print(
label, buff.getvalue(), t0)
# default operation
buffers.append(ocycle.BufferEmit(
on_emit('default'), size=SIZE))
# execute the callback in a thread
buffers.append(ocycle.BufferEmit(
on_emit('im in a thread!'),
size=SIZE, mode='thread'))
# the callback for multiprocessing must be pickleable
def on_process_emit(buff, t0): # callback
x = buff.getvalue()
print('Im in a process!', x, t0)
return x[::-1]
# execute the callback in a separate process
# execute `on_done` in the main process
buffers.append(ocycle.BufferEmit(
on_process_emit,
on_done=lambda res:
print('The process returned:', res),
size=SIZE, mode='process'))
# send data to each receiver
def generate_data(length=4, n=1):
for _ in range(n):
for i in range(10):
yield str(i).encode('utf-8') * length
for x in generate_data(4):
print('writing', x)
for b in buffers:
b.write(x)
time.sleep(1)
输出
writing b'0000'
writing b'1111'
writing b'2222'
writing b'3333'
default b'0000111122223333' 1585253921.2961438
im in a thread! b'0000111122223333' 1585253921.296198
gets value, not buffer object b'0000111122223333' 1585253921.2967098
value clipped to size b'000011112222333' 1585253921.296758
Im in a process! b'0000111122223333' 1585253921.296656
The process returned: b'3333222211110000'
writing b'4444'
writing b'5555'
writing b'6666'
writing b'7777'
default b'4444555566667777' 1585253924.3013031
im in a thread! b'4444555566667777' 1585253924.3013911
gets value, not buffer object b'4444555566667777' 1585253924.3263922
value clipped to size b'344445555666677' 1585253924.326594
Im in a process! b'4444555566667777' 1585253924.301939
The process returned: b'7777666655554444'
writing b'8888'
writing b'9999'
writing b'0000'
writing b'1111'
default b'8888999900001111' 1585253928.3309472
im in a thread! b'8888999900001111' 1585253928.331071
gets value, not buffer object b'8888999900001111' 1585253928.3323038
value clipped to size b'778888999900001' 1585253928.33305
Im in a process! b'8888999900001111' 1585253928.332133
The process returned: b'1111000099998888'
writing b'2222'
writing b'3333'
项目详情
ocycle-0.1.0.tar.gz的散列
算法 | 散列摘要 | |
---|---|---|
SHA256 | a7d9b5712f2b3f67934ce15b8409d23aef4e513b26e55844d3a5eaa106c659f3 |
|
MD5 | 0f1a3fb1cd06504ffbb2389c1e483dbf |
|
BLAKE2b-256 | c1bd11e642cdcf0a85792ea1262f5fca373e3295b6afc57b71cbd1644358b932 |