响应式日志记录
项目描述
giving — 响应式记录器
giving
是一个简单而神奇的库,允许您在程序中记录或“提供”任意数据,然后将其作为事件流进行处理。您可以使用它将日志记录到终端、到 wandb 或 mlflow,计算最小值、最大值、滚动平均值等,而无需更改程序的核心逻辑。
- 在您的代码中,使用
give()
记录或计算您可能想要记录或度量指标的所有对象或数据。 - 使用
given()
包装主循环,并定义管道来映射、过滤和减少您提供的数据。
示例
代码 | 输出 |
---|---|
简单日志记录 # All calls to give() will log to the configured console
with given().display():
a, b = 10, 20
# Without parameters: last expression + result
give()
# With parameters:
# parameter is just value: value => value
# parameter is key and value: key => value
give(a * b, c=30)
|
|
将值提取到列表中 # give(key=value) with key == "s" will add value to `results`
with given()["s"].values() as results:
s = 0
for i in range(5):
s += i
give(s)
print(results)
|
|
减少(最小值、最大值、计数等) def collatz(n):
while n != 1:
give(n)
n = (3 * n + 1) if n % 2 else (n // 2)
with given() as gv:
gv["n"].max().print("max: {}")
gv["n"].count().print("steps: {}")
collatz(2021)
|
|
使用 st, = given()["n"].count().eval(collatz, 2021)
print(st)
|
|
with given() as gv:
gv.kscan().display()
give(elk=1)
give(rabbit=2)
give(elk=3, wolf=4)
|
|
with given() as gv:
gv.throttle(1).display()
for i in range(50):
give(i)
time.sleep(0.1)
|
|
上述示例仅展示了所有可用操作符的一小部分。查看所有操作符。
提供
您可以使用多种方式使用 give
。除非 give
被赋予单个位置参数,否则它返回 None,在这种情况下,它返回该参数的值。
-
give(key=value)
这是使用
give
的最直接方式:您写出了键和关联的值。返回: None
-
x = give(value)
当没有提供键,但
give
的结果被分配给变量时,键是该变量的名称。换句话说,上面的代码等同于give(x=value)
。返回: 值
-
give(x)
当没有提供键且结果未分配给变量时,
give(x)
等同于give(x=x)
。如果参数是表达式,如x * x
,则键将是字符串"x * x"
。返回: 值
-
give(x, y, z)
可以提供多个参数。上面的代码等同于
give(x=x, y=y, z=z)
。返回: None
-
x = value; give()
如果
give
没有任何参数,它将查看紧接之前的语句并推断你的意图。上述代码等同于x = value; give(x=value)
。返回: None
重要函数和方法
- print 和 display:用于打印输出流
- values 和 accum:用于累加到列表中
- subscribe 和 ksubscribe:对每个元素执行任务
- where,where_any,keep,
gv["key"]
,gv["?key"]
:基于键进行过滤
运算符总结
这里并没有列出所有运算符。请点击此处查看完整列表。
过滤
- filter:使用函数进行过滤
- kfilter:使用函数进行过滤(关键字参数)
- where:基于键和简单条件进行过滤
- where_any:基于键进行过滤
- keep:基于键进行过滤(+删除其他内容)
- distinct:仅发射不同的元素
- norepeat:仅发射不同的连续元素
- first:仅发射第一个元素
- last:仅发射最后一个元素
- take:仅发射前n个元素
- take_last:仅发射最后n个元素
- skip:抑制前n个元素
- skip_last:抑制最后n个元素
映射
归约
算术归约
大多数这些减少可以通过将 scan
参数设置为 True
来使用 scan
而不是 reduce
。 scan
也可以设置为整数,在这种情况下使用 roll
。
包装
- wrap:在块的开始和结束时提供特殊键
- wrap_inherit:在块的开始和结束时提供特殊键
- inherit:在块中的每个 give() 上添加默认键/值
- wrap:在
give.wrap
的位置插入上下文管理器 - kwrap:与 wrap 相同,但传递关键字参数
计时
调试
- breakpoint:在数据到来时设置断点。与过滤器一起使用。
- tag:为每个条目分配一个特殊词。与
breakword
一起使用。 - breakword:使用
BREAKWORD
环境变量在由tag
设置的特定词集上设置断点。
其他
- accum:累积到列表中
- display:打印流(美化)。
- print:打印流。
- values:累积到列表中(上下文管理器)
- subscribe:在每个元素上运行任务
- ksubscribe:在每个元素上运行任务(关键字参数)
机器学习想法
这里有一些关于在机器学习模型训练环境中使用 giving 的想法
from giving import give, given
def main():
model = Model()
for i in range(niters):
# Give the model. give looks at the argument string, so
# give(model) is equivalent to give(model=model)
give(model)
loss = model.step()
# Give the iteration number and the loss (equivalent to give(i=i, loss=loss))
give(i, loss)
# Give the final model. The final=True key is there so we can filter on it.
give(model, final=True)
if __name__ == "__main__":
with given() as gv:
# ===========================================================
# Define our pipeline **before** running main()
# ===========================================================
# Filter all the lines that have the "loss" key
# NOTE: Same as gv.filter(lambda values: "loss" in values)
losses = gv.where("loss")
# Print the losses on stdout
losses.display() # always
losses.throttle(1).display() # OR: once every second
losses.slice(step=10).display() # OR: every 10th loss
# Log the losses (and indexes i) with wandb
# >> is shorthand for .subscribe()
losses >> wandb.log
# Print the minimum loss at the end
losses["loss"].min().print("Minimum loss: {}")
# Print the mean of the last 100 losses
# * affix adds columns, so we will display i, loss and meanloss together
# * The scan argument outputs the mean incrementally
# * It's important that each affixed column has the same length as
# the losses stream (or "table")
losses.affix(meanloss=losses["loss"].mean(scan=100)).display()
# Store all the losses in a list
losslist = losses["loss"].accum()
# Set a breakpoint whenever the loss is nan or infinite
losses["loss"].filter(lambda loss: not math.isfinite(loss)).breakpoint()
# Filter all the lines that have the "model" key:
models = gv.where("model")
# Write a checkpoint of the model at most once every 30 minutes
models["model"].throttle(30 * 60).subscribe(
lambda model: model.checkpoint()
)
# Watch with wandb, but only once at the very beginning
models["model"].first() >> wandb.watch
# Write the final model (you could also use models.last())
models.where(final=True)["model"].subscribe(
lambda model: model.save()
)
# ===========================================================
# Finally, execute the code. All the pipelines we defined above
# will proceed as we give data.
# ===========================================================
main()
项目细节
下载文件
下载您平台的文件。如果您不确定选择哪个,请了解更多关于 安装软件包 的信息。