Python的Asyncio DataLoader实现
项目描述
DataLoader是一个通用实用工具,可作为您应用程序数据获取层的一部分使用,通过批处理和缓存提供简化和一致的API,以覆盖各种远程数据源,如数据库或Web服务。
这是2010年由[@schrockn][]在Facebook开发的“Loader”API的移植,当时作为简化现有各种键值存储后端API的工具。在Facebook中,“Loader”成为“Ent”框架的实现细节之一,这是一个隐私感知的数据实体加载和缓存层,位于Web服务器产品代码中。这最终成为Facebook的GraphQL服务器实现和类型定义的基础。
DataLoader是针对AsyncIO服务实现的原始想法的简化版本。DataLoader在实现graphene服务时经常使用,尽管它在其他情况下也非常有用。
DataLoader提供,以便它不仅对使用AsyncIO构建GraphQL服务有用,而且还作为该概念公开可用的参考实现,希望它可以移植到其他语言。如果您将DataLoader移植到另一种语言,请打开一个问题,包括从本存储库的链接。
入门
首先,使用pip安装DataLoader。
pip install aiodataloader
开始使用前,创建一个DataLoader。每个DataLoader实例代表一个独特的缓存。通常,当在类似Sanic这样的web服务器中使用时,每个请求都会创建一个实例,如果不同用户可以看到不同的内容。
注意:DataLoader假设是在支持async/await的AsyncIO环境中,且仅在Python 3.5+中可用。
批处理
批处理不是高级功能,而是DataLoader的主要功能。通过提供一个批加载函数来创建加载器。
from aiodataloader import DataLoader
class UserLoader(DataLoader):
async def batch_load_fn(self, keys):
return await my_batch_get_users(keys)
user_loader = UserLoader()
批加载函数接受一组键的迭代器,并返回一个解析为值列表的Promise*。
然后从加载器中加载单个值。DataLoader会将执行帧(事件循环的单次跳动)内发生的所有单个加载合并,然后使用所有请求的键调用您的批处理函数。
user1_future = user_loader.load(1)
user2_future = user_loader.load(2)
user1 = await user1_future
user2 = await user2_future
user1_invitedby = user_loader.load(user1.invited_by_id)
user2_invitedby = user_loader.load(user2.invited_by_id)
print("User 1 was invited by", await user1_invitedby)
print("User 2 was invited by", await user2_invitedby)
一个简单的应用可能需要向后端发出四次往返以获取所需信息,但使用DataLoader,这个应用最多只做两次。
DataLoader允许您在不牺牲批数据加载性能的情况下解耦应用程序的不相关部分。虽然加载器提供了一个加载单个值的API,但所有并发请求都将合并并呈现给您的批加载函数。这允许您的应用程序在应用程序中安全地分配数据检索需求,并保持最小的外出数据请求。
批处理函数
批加载函数接受一个键列表,并返回一个解析为值列表的Future。必须遵守一些约束条件
值列表的长度必须与键列表的长度相同。
值列表中的每个索引必须与键列表中的相同索引相对应。
例如,如果您的批处理函数提供了一个键列表:[ 2, 9, 6, 1 ],并且从后端服务加载返回了以下值
{ 'id': 9, 'name': 'Chicago' }
{ 'id': 1, 'name': 'New York' }
{ 'id': 2, 'name': 'San Francisco' }
我们的后端服务返回的结果顺序与我们请求的不同,这可能是由于它这样做更有效率。此外,它省略了键6的结果,我们可以将其解释为没有为该键存在值。
为了遵守批处理函数的约束,它必须返回一个长度与键列表相同的值列表,并重新排序以确保每个索引与原始键[ 2, 9, 6, 1 ]对齐
[
{ 'id': 2, 'name': 'San Francisco' },
{ 'id': 9, 'name': 'Chicago' },
None,
{ 'id': 1, 'name': 'New York' }
]
缓存
DataLoader为您的应用程序的单个请求中发生的所有加载提供了记忆缓存。在用给定的键调用.load()之后,结果被缓存以消除重复加载。
除了减轻数据存储的压力外,按请求缓存结果还会创建更少的对象,这可能会减轻应用程序的内存压力。
user_future1 = user_loader.load(1)
user_future2 = user_loader.load(1)
assert user_future1 == user_future2
按请求缓存
DataLoader的缓存不代替Redis、Memcache或其他任何共享应用程序级缓存。DataLoader首先和 foremost是一个数据加载机制,其缓存仅在单个请求的上下文中避免重复加载相同的数据。为此,它维护一个简单的内存记忆缓存(更准确地讲:.load()是一个记忆化的函数)。
避免使用DataLoader实例从不同用户发出多个请求,这可能导致缓存数据在每个请求中错误地出现。通常,DataLoader实例在请求开始时创建,在请求结束时不再使用。
例如,当使用Sanic时
def create_loaders(auth_token) {
return {
'users': user_loader,
}
}
app = Sanic(__name__)
@app.route("/")
async def test(request):
auth_token = authenticate_user(request)
loaders = create_loaders(auth_token)
return render_page(request, loaders)
清除缓存
在某些不常见的情况下,清除请求缓存可能是必要的。
清除加载器缓存最常见的情况是在同一请求中进行变更或更新后,当缓存值可能已过时,未来的加载不应使用任何可能缓存的值。
以下是一个使用SQL UPDATE来演示的简单示例。
# Request begins...
user_loader = ...
# And a value happens to be loaded (and cached).
user4 = await user_loader.load(4)
# A mutation occurs, invalidating what might be in cache.
await sql_run('UPDATE users WHERE id=4 SET username="zuck"')
user_loader.clear(4)
# Later the value load is loaded again so the mutated data appears.
user4 = await user_loader.load(4)
# Request completes.
缓存异常
如果批加载失败(即批处理函数抛出错误或返回一个被拒绝的Promise),则请求的值不会被缓存。但是,如果批处理函数为单个值返回一个Exception实例,则该Exception将被缓存,以避免频繁加载相同的Exception。
在某些情况下,您可能希望清除这些单个错误的缓存
try:
user_loader.load(1)
except Exception as e:
user_loader.clear(1)
raise
禁用缓存
在某些不常见的情况下,可能希望有一个不缓存的数据加载器。调用DataLoader(batch_fn, cache=false)将确保每次调用.load()都会生成一个新的Future,并且请求的键将不会保存在内存中。
然而,当禁用记忆缓存时,您的批处理函数将收到一个可能包含重复项的键数组!每个键将与每个.load()调用相关联。您的批加载器应为请求的每个键实例提供一个值。
例如
class MyLoader(DataLoader):
cache = False
async def batch_load_fn(self, keys):
print(keys)
return keys
my_loader = MyLoader()
my_loader.load('A')
my_loader.load('B')
my_loader.load('A')
# > [ 'A', 'B', 'A' ]
通过调用.clear()或.clear_all()而不是完全禁用缓存,可以实现更复杂的缓存行为。例如,由于启用了记忆缓存,此数据加载器将为批处理函数提供唯一的键,但在批处理函数被调用时会立即清除其缓存,因此后续请求将加载新值。
class MyLoader(DataLoader):
cache = False
async def batch_load_fn(self, keys):
self.clear_all()
return keys
API
class DataLoader
DataLoader创建了一个公共API,用于在给定的批加载函数的特定数据后端中加载数据,这些数据具有唯一键,例如SQL表的id列或MongoDB数据库中的文档名称。
每个DataLoader实例都包含一个唯一的记忆缓存。在使用长时间运行的应用程序或在为具有不同访问权限的许多用户提供服务的应用程序中请谨慎使用,并考虑在每个Web请求中创建一个新的实例。
new DataLoader(batch_load_fn, **options)
根据批加载函数和选项创建一个新的DataLoader。
batch_load_fn: 一个接受键列表并返回解析为值列表的Future的异步函数(协程)。
options:
batch: 默认为True。设置为False以禁用批处理,而不是立即使用单个加载键调用batch_load_fn。
max_batch_size: 默认为Infinity。限制传递给batch_load_fn的项数。
cache: 默认为True。设置为False以禁用记忆缓存,而是在每次加载相同键时在batch_load_fn中创建一个新的Promise和一个新的键。
cache_key_fn: 一个用于生成给定加载键的缓存键的函数。默认为key => key。当Python对象是键且两个形状相似的对象应被视为等效时很有用。
cache_map: 一个字典(或具有类似API的对象)的实例,用作此加载器的底层缓存。默认为{}。
load(key)
加载一个键,返回表示该键的值的Future。
key:要加载的键值。
load_many(keys)
加载多个键,承诺一个值数组。
a, b = await my_loader.load_many([ 'a', 'b' ]);
这与更冗长的描述等效:
from asyncio import gather
a, b = await gather(
my_loader.load('a'),
my_loader.load('b')
)
keys:要加载的键值列表。
clear(key)
清除缓存中key处的值,如果存在。返回自身以实现方法链。
key:要清除的键值。
clear_all()
清除整个缓存。在某个事件导致特定DataLoader中的未知无效化时使用。返回自身以实现方法链。
prime(key, value)
使用提供的键和值预充缓存。如果键已存在,则不进行更改。(要强制预充缓存,请先使用loader.clear(key).prime(key, value)清除键。)返回自身以实现方法链。
与GraphQL一起使用
DataLoader与GraphQL配合良好。GraphQL字段设计为独立的函数。如果没有缓存或批处理机制,那么一个天真的GraphQL服务器在解析每个字段时很容易发出新的数据库请求。
考虑以下GraphQL请求
{ me { name bestFriend { name } friends(first: 5) { name bestFriend { name } } } }
天真地,如果me、bestFriend和friends每个都需要请求后端,则最多可能发出13次数据库请求!
当使用DataLoader时,我们可以使用SQLite示例以更清晰的代码定义User类型,最多发出4次数据库请求,并且如果发生缓存命中,可能还会更少。
class User(graphene.ObjectType):
name = graphene.String()
best_friend = graphene.Field(lambda: User)
friends = graphene.List(lambda: User)
def resolve_best_friend(self, args, context, info):
return user_loader.load(self.best_friend_id)
def resolve_friends(self, args, context, info):
return user_loader.load_many(self.friend_ids)
常见模式
每个请求创建一个新的DataLoader。
在许多应用程序中,使用DataLoader的Web服务器为许多不同用户的不同权限请求提供服务。使用一个缓存跨多个用户可能是危险的,因此建议每个请求创建一个新的DataLoader。
def create_loaders(auth_token):
return {
'users': DataLoader(lambda ids: gen_users(auth_token, ids)),
'cdn_urls': DataLoader(lambda raw_urls: gen_cdn_urls(auth_token, raw_urls)),
'stories': DataLoader(lambda keys: gen_stories(auth_token, keys)),
}
}
# When handling an incoming web request:
loaders = create_loaders(request.query.auth_token)
# Then, within application logic:
user = await loaders.users.load(4)
pic = await loaders.cdn_urls.load(user.raw_pic_url)
创建一个对象,其中每个键都是一个DataLoader,这是一种常见的模式,该模式为需要执行数据加载的代码提供单个值,例如[graphql][]请求中的root_value的一部分。
通过替代键进行加载。
有时,某些值可以通过多种方式访问。例如,可能一个“User”类型不仅可以通过“id”加载,还可以通过“username”值加载。如果通过两个键加载相同的用户,那么在用户从任一来源加载时填充两个缓存可能很有用。
async def user_by_id_batch_fn(ids):
users = await gen_users_by_id(ids)
for user in users:
username_loader.prime(user.username, user)
return users
user_by_id_loader = DataLoader(user_by_id_batch_fn)
async def username_batch_fn(names):
users = await gen_usernames(names)
for user in users:
user_by_id_loader.prime(user.id, user)
return users
username_loader = DataLoader(username_batch_fn)
自定义缓存
DataLoader可以选择提供自定义字典实例,用作其记忆化缓存。更具体地说,任何实现get()、set()、delete()和clear()方法的对象都可以提供。这允许提供实现各种缓存算法的自定义字典。默认情况下,DataLoader使用标准的dict,它简单地增长,直到DataLoader被释放。默认情况下,当应用程序的请求是短暂的生命周期时,这是合适的。
视频源代码演示
DataLoader源代码演示(YouTube)
项目详细信息
下载文件
下载您平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。