Python的Asyncio DataLoader实现
项目描述
Asyncio DataLoader
DataLoader是一个通用工具,可以作为您应用程序数据获取层的一部分使用,通过批处理和缓存,提供对各种远程数据源(如数据库或Web服务)的简化且一致的API。
这是2010年Facebook的@schrockn最初为简化当时存在的各种键值存储后端API而开发的"Loader" API的移植。在Facebook,"Loader"成为"Ent"框架的实现细节之一,这是一个隐私感知的数据实体加载和缓存层,位于Web服务器产品代码中。这最终成为了Facebook的GraphQL服务器实现和类型定义的基础。
Asyncio DataLoader 是原始 JavaScript DataLoader 实现的 Python 版本。DataLoader 通常在实现 GraphQL 服务时使用,但它也在其他情况下具有广泛的应用价值。
入门指南
首先,使用 pip 安装 DataLoader。
pip install aiodataloader
要开始使用,创建一个 DataLoader
。每个 DataLoader
实例代表一个唯一的缓存。通常,当在类似 Sanic 的 web 服务器中使用时,会根据请求创建实例,如果不同用户可以看到不同内容。
注意:DataLoader 假设是在 Python 3.5+ 中具有
async/await
功能的 AsyncIO 环境中。
批处理
批处理不是高级功能,而是 DataLoader 的主要功能。通过提供一个批加载函数来创建加载器。
from aiodataloader import DataLoader
class UserLoader(DataLoader):
async def batch_load_fn(self, keys):
return await my_batch_get_users(keys)
user_loader = UserLoader()
批加载函数接受一个键的可迭代对象,并返回一个解析为值列表的 Promise*。
然后从加载器中加载单个值。DataLoader 会将单个加载在单个执行帧(事件循环的单次滴答)中发生的所有加载合并,然后使用所有请求的键调用您的批处理函数。
user1_future = user_loader.load(1)
user2_future = user_loader.load(2)
user1 = await user1_future
user2 = await user2_future
user1_invitedby = user_loader.load(user1.invited_by_id)
user2_invitedby = user_loader.load(user2.invited_by_id)
print("User 1 was invited by", await user1_invitedby)
print("User 2 was invited by", await user2_invitedby)
一个简单的应用程序可能会为所需信息发起四个后端往返,但使用 DataLoader 的应用程序最多只需进行两次。
DataLoader 允许您在不牺牲批数据加载性能的情况下解耦应用程序的不同部分。虽然加载器提供了一个加载单个值的 API,但所有并发请求都将合并并呈现给您的批加载函数。这允许您的应用程序安全地在应用程序中分配数据获取需求,并保持最小的输出数据请求。
批处理函数
批加载函数接受一个键列表,并返回一个解析为值列表的 Future。必须遵守一些约束条件
- 值的列表必须与键的列表长度相同。
- 值的列表中的每个索引必须与键的列表中的相同索引相对应。
例如,如果您的批处理函数提供了键列表:[ 2, 9, 6, 1 ]
,并且从后端服务加载返回了以下值
{ 'id': 9, 'name': 'Chicago' }
{ 'id': 1, 'name': 'New York' }
{ 'id': 2, 'name': 'San Francisco' }
我们的后端服务以我们请求的顺序返回了不同的结果,这很可能是由于它以这种方式更有效率。它还省略了键 6
的结果,我们可以将其解释为没有为该键存在值。
为了遵守批处理函数的约束,它必须返回一个与键的列表长度相同的值列表,并重新排序以确保每个索引与原始键 [ 2, 9, 6, 1 ]
对齐
[
{ 'id': 2, 'name': 'San Francisco' },
{ 'id': 9, 'name': 'Chicago' },
None,
{ 'id': 1, 'name': 'New York' }
]
缓存
DataLoader 为应用程序的单个请求中发生的所有加载提供了一个记忆缓存。在给定键上调用 .load()
一次后,结果将缓存起来以消除重复加载。
除了减轻对数据存储的压力外,按请求缓存结果还会创建更少的对象,这可能会减轻应用程序的内存压力。
user_future1 = user_loader.load(1)
user_future2 = user_loader.load(1)
assert user_future1 == user_future2
按请求缓存
DataLoader 缓存 不会 替代 Redis、Memcache 或任何其他共享应用程序级缓存。DataLoader 首先是数据加载机制,其缓存仅用于在单个应用程序请求的上下文中避免重复加载相同的数据。为此,它维护一个简单的内存记忆缓存(更准确地说是:.load()
是一个记忆化的函数)。
避免使用 DataLoader 实例的不同用户发起多次请求,这可能导致缓存数据在每个请求中错误地出现。通常,当请求开始时创建 DataLoader 实例,并在请求结束时不再使用。
例如,当使用 Sanic
def create_loaders(auth_token) {
return {
'users': user_loader,
}
}
app = Sanic(__name__)
@app.route("/")
async def test(request):
auth_token = authenticate_user(request)
loaders = create_loaders(auth_token)
return render_page(request, loaders)
清除缓存
在某些不常见的情况下,可能需要清除请求缓存。
在需要清除加载器缓存的情况下,最常见的例子是在同一请求内的变更或更新之后,当缓存的值可能过时时,未来的加载不应使用任何可能已缓存的值。
以下是一个使用SQL更新来简单说明的例子。
# Request begins...
user_loader = ...
# And a value happens to be loaded (and cached).
user4 = await user_loader.load(4)
# A mutation occurs, invalidating what might be in cache.
await sql_run('UPDATE users WHERE id=4 SET username="zuck"')
user_loader.clear(4)
# Later the value load is loaded again so the mutated data appears.
user4 = await user_loader.load(4)
# Request completes.
缓存异常
如果批量加载失败(即批量函数抛出异常或返回一个被拒绝的Promise),则请求的值将不会被缓存。然而,如果批量函数为单个值返回一个Exception
实例,那么这个Exception
将被缓存,以避免频繁加载相同的Exception
。
在某些情况下,您可能希望清除这些个别错误的缓存。
try:
user_loader.load(1)
except Exception as e:
user_loader.clear(1)
raise
禁用缓存
在某些不常见的情况下,可能需要 DataLoader 不缓存。调用 DataLoader(batch_fn, cache=false)
将确保每次调用 .load()
都会生成一个新的 Future,并且请求的键不会被保存在内存中。
然而,当禁用备忘录缓存时,您的批量函数将收到一个可能包含重复项的键数组!每个键都将与对 .load()
的每次调用相关联。您的批量加载器应为请求键的每个实例提供值。
例如
class MyLoader(DataLoader):
cache = False
async def batch_load_fn(self, keys):
print(keys)
return keys
my_loader = MyLoader()
my_loader.load('A')
my_loader.load('B')
my_loader.load('A')
# > [ 'A', 'B', 'A' ]
通过调用 .clear()
或 .clear_all()
而不是完全禁用缓存,可以实现更复杂的缓存行为。例如,由于备忘录缓存被启用,这个 DataLoader 将为批量函数提供唯一的键,但在调用批量函数时立即清除其缓存,因此后续请求将加载新的值。
class MyLoader(DataLoader):
cache = False
async def batch_load_fn(self, keys):
self.clear_all()
return keys
API
class DataLoader
DataLoader 创建了一个公共 API,用于使用批量加载函数从特定的数据后端加载数据,给定具有唯一键的数据加载函数,例如 SQL 表的 id
列或 MongoDB 数据库中的文档名称。
每个 DataLoader
实例包含一个唯一的备忘录缓存。在用于长期运行的应用程序或为具有不同访问权限的许多用户提供服务的应用程序中谨慎使用,并考虑为每个 Web 请求创建一个新实例。
DataLoader(batch_load_fn, **options)
根据批量加载函数和选项创建一个新的 DataLoader
。
-
batch_load_fn: 一个接受键列表并返回一个解析为值列表的 Future 的异步函数(协程)。
-
options:
-
batch: 默认值为
True
。设置为False
来禁用批量处理,而不是立即使用单个加载键调用batch_load_fn
。 -
max_batch_size: 默认值为
Infinity
。限制传递给batch_load_fn
的项目数量。 -
cache: 默认值为
True
。设置为False
来禁用备忘录缓存,而是在每次相同的键加载时,在batch_load_fn
中创建一个新的 Promise 和新的键。 -
cache_key_fn: 一个用于为给定加载键生成缓存键的函数。默认为
key => key
。当 Python 对象是键且形状相似的两个对象应被视为等效时,非常有用。 -
cache_map: 一个用于作为此加载器底层缓存的 dict(或具有类似 API 的对象)实例。默认为
{}
。
-
load(key)
加载一个键,返回一个表示该键的值的 Future
。
- key: 要加载的键值。
load_many(keys)
加载多个键,承诺一个值数组
a, b = await my_loader.load_many([ 'a', 'b' ]);
这等同于更冗长的
from asyncio import gather
a, b = await gather(
my_loader.load('a'),
my_loader.load('b')
)
- keys: 要加载的键值列表。
clear(key)
如果存在,从缓存中清除 key
的值。返回自身以用于方法链接。
- key: 要清除的键值。
clear_all()
清除整个缓存。在某个事件导致特定 DataLoader
的未知无效化时使用。返回自身以实现方法链。
prime(key, value)
使用提供的键和值预填充缓存。如果键已存在,则不进行任何更改。(要强制预填充缓存,请先使用 loader.clear(key).prime(key, value)
清除键。)返回自身以实现方法链。
与 GraphQL 一起使用
DataLoader 与 GraphQL 结合得很好。GraphQL 字段被设计为独立的函数。如果没有缓存或批处理机制,一个简单的 GraphQL 服务器每次解析一个字段时都可能会发出新的数据库请求。
考虑以下 GraphQL 请求
{
me {
name
bestFriend {
name
}
friends(first: 5) {
name
bestFriend {
name
}
}
}
}
如果 me
、bestFriend
和 friends
每个都需要请求后端,最多可能进行 13 次数据库请求!
当使用 DataLoader 与 graphene 时,我们可以定义具有更清晰代码的 User
类型,并且最多进行 4 次数据库请求,如果缓存命中,则可能更少。
class User(graphene.ObjectType):
name = graphene.String()
best_friend = graphene.Field(lambda: User)
friends = graphene.List(lambda: User)
def resolve_best_friend(self, args, context, info):
return user_loader.load(self.best_friend_id)
def resolve_friends(self, args, context, info):
return user_loader.load_many(self.friend_ids)
常见模式
为每个请求创建一个新的 DataLoader。
在许多应用程序中,使用 DataLoader 的 Web 服务器为具有不同访问权限的许多不同用户提供请求。在多个用户之间使用一个缓存可能是危险的,因此建议为每个请求创建一个新的 DataLoader。
def create_loaders(auth_token):
return {
'users': DataLoader(lambda ids: gen_users(auth_token, ids)),
'cdn_urls': DataLoader(lambda raw_urls: gen_cdn_urls(auth_token, raw_urls)),
'stories': DataLoader(lambda keys: gen_stories(auth_token, keys)),
}
}
# When handling an incoming web request:
loaders = create_loaders(request.query.auth_token)
# Then, within application logic:
user = await loaders.users.load(4)
pic = await loaders.cdn_urls.load(user.raw_pic_url)
创建一个对象,其中每个键都是一个 DataLoader
,这是一种常见模式,它为需要执行数据加载的代码提供了一个单一的值,例如在 GraphQL 请求的 root_value
的一部分。
通过备用键进行加载。
偶尔,可以通过多种方式访问某种类型的值。例如,可能一个 "User" 类型不仅可以按 "id" 加载,也可以按 "username" 值加载。如果同一个用户由两个键加载,那么当从任一来源加载用户时,填充两个缓存可能是有用的。
async def user_by_id_batch_fn(ids):
users = await gen_users_by_id(ids)
for user in users:
username_loader.prime(user.username, user)
return users
user_by_id_loader = DataLoader(user_by_id_batch_fn)
async def username_batch_fn(names):
users = await gen_usernames(names)
for user in users:
user_by_id_loader.prime(user.id, user)
return users
username_loader = DataLoader(username_batch_fn)
自定义缓存
DataLoader 可选地可以提供一个自定义字典实例作为其记忆化缓存。更具体地说,任何实现了 get()
、set()
、delete()
和 clear()
方法的对象都可以提供。这允许提供实现各种 缓存算法 的自定义字典。默认情况下,DataLoader 使用标准 dict,它简单地增长直到 DataLoader 释放。默认值适用于您的应用程序请求是短生命周期的。
视频源代码讲解
DataLoader 源代码讲解(YouTube)
项目详情
下载文件
下载适合您平台的应用程序。如果您不确定选择哪个,请了解更多关于 安装包 的信息。