实现GraphQL API的高性能库
项目描述
GraphLayer是一个Python库,用于实现高性能的GraphQL API。通过为请求中的每个节点而不是响应中的每个节点运行解析函数,可以减少异步函数调用的开销。查询也可以直接编写,以直接获取批量数据,从而避免中间层如DataLoader的N+1问题。
为什么选择GraphLayer?
是什么导致GraphQL API运行缓慢?在大多数GraphQL实现中,解析函数为响应中的每个节点运行。这可能导致性能不佳,主要原因有两个。第一个是N+1问题。第二个是调用响应中每个节点的可能异步解析函数的开销。虽然每次调用的开销很小,但对于大型数据集,这种开销的总和可能是响应时间的大部分。
GraphLayer建议解析函数应根据请求的形状而不是响应的形状来调用。这避免了N+1问题,同时没有引入额外的复杂性,如DataLoader和类似库中的批量请求,并允许以更自然的方式编写映射到如SQL数据库等数据存储的解析函数。其次,尽管仍然存在调用解析函数的开销,但这种开销是乘以请求中的节点数量而不是响应中的节点数量:对于大型数据集,这是一个巨大的节省。
作为一个具体的例子,考虑以下查询
query { books { title author { name } } }
一个简单的GraphQL实现将发出一个SQL查询以获取所有书籍的列表,然后发出N个查询以获取每本书的作者。使用DataLoader,获取每本书作者的所有N个查询将合并为一个查询。使用GraphLayer,将有一个获取作者而不进行批量的单个查询。
安装
pip install git+https://github.com/mwilliamson/python-graphlayer.git#egg=graphlayer[graphql]
教程
本教程通过使用 SQLAlchemy 和 GraphLayer 构建了一个简单的应用程序。目标是要执行以下查询
query { books(genre: "comedy") { title author { name } } }
也就是说,获取所有喜剧类别的书籍列表,以及每本书的标题和作者姓名。
在本教程中,我们将从头开始构建必要的代码,仅使用 GraphLayer 的核心,以了解 GraphLayer 的工作原理。实际上,有一些辅助函数可以使实现更加简单。我们将在最后看到如何使用这些辅助函数编写我们的示例。
环境
您需要一个安装了 Python 3.5 或更高版本的 Python 环境,并且已经安装了 graphlayer、graphql 和 SQLAlchemy。例如,在命令行
python3 -m venv .venv . .venv/bin/activate pip install --upgrade pip setuptools wheel pip install git+https://github.com/mwilliamson/python-graphlayer.git#egg=graphlayer[graphql] pip install SQLAlchemy
入门指南
让我们从一个简单的查询开始,获取书籍的数量
query { bookCount }
所有查询都共享相同的根对象,但可以要求任何它们想要的字段。作为第一步,我们将定义根对象的模式。现在,我们将定义一个名为 book_count 的单个整数字段(请注意,大小写会在驼峰式和蛇形之间自动转换)
import graphlayer as g
Root = g.ObjectType("Root", fields=(
g.field("book_count", type=g.Int),
))
我们还需要定义如何通过定义解析器函数来解决书籍数量。每个解析器函数都接受图和特定类型的查询,并返回该查询的结果。装饰器 g.resolver() 用于标记解析器针对哪种类型的查询。在这种情况下,我们需要创建一个针对根类型的解析器。现在,我们将定义一个返回固定对象并打印查询以便我们可以查看的解析器。
import graphlayer as g
from graphlayer.graphql import execute
Root = g.ObjectType("Root", fields=(
g.field("book_count", type=g.Int),
))
@g.resolver(Root)
def resolve_root(graph, query):
print("query:", query)
return query.create_object({
"bookCount": 3,
})
resolvers = (resolve_root, )
graph_definition = g.define_graph(resolvers=resolvers)
graph = graph_definition.create_graph({})
execute(
"""
query {
bookCount
}
""",
graph=graph,
query_type=Root,
)
运行此代码将打印出
query: ObjectQuery( type=Root, field_queries=( FieldQuery( key="bookCount", field=Root.fields.book_count, type_query=ScalarQuery(type=Int), args=(), ), ), )
请注意,FieldQuery 有一个 key 属性。由于用户可以在查询中重命名字段,我们应该使用传递给字段查询的键。
@g.resolver(Root)
def resolve_root(graph, query):
field_query = query.field_queries[0]
return query.create_object({
field_query.key: 3,
})
目前,由于在根上只定义了一个字段,我们可以始终假设正在请求该字段。然而,情况通常不会是这样。例如,我们可以在根上添加一个作者数量
Root = g.ObjectType("Root", fields=(
g.field("author_count", type=g.Int),
g.field("book_count", type=g.Int),
))
现在我们需要检查请求的字段是什么。
@g.resolver(Root)
def resolve_root(graph, query):
def resolve_field(field):
if field == Root.fields.author_count:
return 2
elif field == Root.fields.book_count:
return 3
else:
raise Exception("unknown field: {}".format(field))
field_query = query.field_queries[0]
return query.create_object({
field_query.key: resolve_field(field_query.field),
})
更重要的是,用户可能会请求多个字段,因此我们在生成结果时应遍历 query.field_queries。
@g.resolver(Root)
def resolve_root(graph, query):
def resolve_field(field):
if field == Root.fields.author_count:
return 2
elif field == Root.fields.book_count:
return 3
else:
raise Exception("unknown field: {}".format(field))
return query.create_object(dict(
(field_query.key, resolve_field(field_query.field))
for field_query in query.field_queries
))
如果我们打印执行结果中的数据
print("result:", execute(
"""
query {
bookCount
}
""",
graph=graph,
query_type=Root,
).data)
然后我们应该得到以下输出
result: {'bookCount': 3}
添加 SQLAlchemy
到目前为止,我们返回的是硬编码的值。让我们添加一个使用 SQLAlchemy 的数据库以及一个内存中的 SQLite 数据库。在脚本开始时,我们将添加一些设置数据库模式和添加数据的代码
import sqlalchemy.ext.declarative
import sqlalchemy.orm
Base = sqlalchemy.ext.declarative.declarative_base()
class AuthorRecord(Base):
__tablename__ = "author"
id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
name = sqlalchemy.Column(sqlalchemy.Unicode, nullable=False)
class BookRecord(Base):
__tablename__ = "book"
id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
title = sqlalchemy.Column(sqlalchemy.Unicode, nullable=False)
genre = sqlalchemy.Column(sqlalchemy.Unicode, nullable=False)
author_id = sqlalchemy.Column(sqlalchemy.Integer, sqlalchemy.ForeignKey(AuthorRecord.id), nullable=False)
engine = sqlalchemy.create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
session = sqlalchemy.orm.Session(engine)
author_wodehouse = AuthorRecord(name="PG Wodehouse")
author_bernières = AuthorRecord(name="Louis de Bernières")
session.add_all((author_wodehouse, author_bernières))
session.flush()
session.add(BookRecord(title="Leave It to Psmith", genre="comedy", author_id=author_wodehouse.id))
session.add(BookRecord(title="Right Ho, Jeeves", genre="comedy", author_id=author_wodehouse.id))
session.add(BookRecord(title="Captain Corelli's Mandolin", genre="historical_fiction", author_id=author_bernières.id))
session.flush()
接下来,我们将更新我们的解析器以使用数据库
@g.resolver(Root)
def resolve_root(graph, query):
def resolve_field(field):
if field == Root.fields.author_count:
return session.query(AuthorRecord).count()
elif field == Root.fields.book_count:
return session.query(BookRecord).count()
else:
raise Exception("unknown field: {}".format(field))
return query.create_object(dict(
(field_query.key, resolve_field(field_query.field))
for field_query in query.field_queries
))
在根中添加书籍
到目前为止,我们在根中添加了两个标量字段。让我们添加一个 books 字段,这将更有趣一些。我们的目标是能够运行以下查询
query { books { title } }
我们首先创建一个 Book 对象类型,并使用它来在 Root 上定义 books 字段
Book = g.ObjectType("Book", fields=(
g.field("title", type=g.String),
g.field("genre", type=g.String),
))
Root = g.ObjectType("Root", fields=(
g.field("author_count", type=g.Int),
g.field("book_count", type=g.Int),
g.field("books", type=g.ListType(Book)),
))
我们需要更新根解析器来处理新字段。尽管我们可以在根解析器中直接处理该字段,但我们将要求图为我们解析查询。这允许我们有共同的方式来解析书籍,无论它们在查询中的位置如何。
@g.resolver(Root)
def resolve_root(graph, query):
def resolve_field(field_query):
if field_query.field == Root.fields.author_count:
return session.query(AuthorRecord).count()
elif field_query.field == Root.fields.book_count:
return session.query(BookRecord).count()
elif field_query.field == Root.fields.books:
return graph.resolve(field_query.type_query)
else:
raise Exception("unknown field: {}".format(field_query.field))
return query.create_object(dict(
(field_query.key, resolve_field(field_query))
for field_query in query.field_queries
))
这意味着我们需要定义一个针对书籍列表的解析器。现在,让我们只打印查询并返回一个空列表,这样我们就可以看到查询的样子。
@g.resolver(g.ListType(Book))
def resolve_books(graph, query):
print("books query:", query)
return []
resolvers = (resolve_root, resolve_books)
如果我们更新传递给 execute 的查询
print("result:", execute(
"""
query {
books {
title
}
}
""",
graph=graph,
query_type=Root,
))
然后我们的脚本现在应该产生以下输出
books query: ListQuery( type=List(Book), element_query=ObjectQuery( type=Book, field_queries=( FieldQuery( key="title", field=Book.fields.title, type_query=ScalarQuery(type=String), args=(), ), ), ), ) result: {'books': []}
与在解析根对象时使用的 ObjectQuery 类似,我们还有一个针对 Book 的 ObjectQuery。由于请求的是一个列表,因此它被包裹在一个 ListQuery 中,对象查询可以通过 element_query 属性访问。
我们可以通过首先获取所有书籍,然后将每个获取的书籍映射到查询中请求的对象字段,来编写一个书籍列表的解析器。
@g.resolver(g.ListType(Book))
def resolve_books(graph, query):
books = session.query(BookRecord.title, BookRecord.genre).all()
def resolve_field(book, field):
if field == Book.fields.title:
return book.title
elif field == Book.fields.genre:
return book.genre
else:
raise Exception("unknown field: {}".format(field))
return [
query.element_query.create_object(dict(
(field_query.key, resolve_field(book, field_query.field))
for field_query in query.element_query.field_queries
))
for book in books
]
运行此代码应该会得到以下输出
result: {'books': [{'title': 'Leave It to Psmith'}, {'title': 'Right Ho, Jeeves'}, {'title': "Captain Corelli's Mandolin"}]}
我们可以通过只获取查询所需的列来使解析器更高效。尽管这对于目前我们拥有的数据来说影响相对较小,但这可以在用户可以请求更多字段和大数据集时帮助提高性能。
@g.resolver(g.ListType(Book))
def resolve_books(graph, query):
field_to_expression = {
Book.fields.title: BookRecord.title,
Book.fields.genre: BookRecord.genre,
}
expressions = frozenset(
field_to_expression[field_query.field]
for field_query in query.element_query.field_queries
)
books = session.query(*expressions).all()
def resolve_field(book, field):
if field == Book.fields.title:
return book.title
elif field == Book.fields.genre:
return book.genre
else:
raise Exception("unknown field: {}".format(field))
return [
query.element_query.create_object(dict(
(field_query.key, resolve_field(book, field_query.field))
for field_query in query.element_query.field_queries
))
for book in books
]
将类别参数添加到书籍字段中
到目前为止,books 字段返回数据库中的所有书籍。让我们添加一个可选的 genre 参数,这样我们就可以运行以下查询
query { books(genre: "comedy") { title } }
在我们开始实际添加参数之前,我们需要更改解析书籍的方式。目前,代码解析书籍列表的查询,这并不提供一种方便的方式来告诉解析器只获取子集书籍。为了解决这个问题,我们将对象查询包装在我们的自定义查询类中。
class BookQuery(object):
def __init__(self, object_query):
self.type = (BookQuery, object_query.type)
self.object_query = object_query
然后我们可以在根解析器中创建一个 BookQuery。
elif field_query.field == Root.fields.books:
return graph.resolve(BookQuery(field_query.type_query.element_query))
并且我们需要相应地更新 resolve_books。具体来说,我们需要将 g.resolver(g.ListType(Book)) 替换为 g.resolver((BookQuery, Book)),并将 query.element_query 替换为 query.object_query。
@g.resolver((BookQuery, Book))
def resolve_books(graph, query):
field_to_expression = {
Book.fields.title: BookRecord.title,
Book.fields.genre: BookRecord.genre,
}
expressions = frozenset(
field_to_expression[field_query.field]
for field_query in query.object_query.field_queries
)
books = session.query(*expressions).all()
def resolve_field(book, field):
if field == Book.fields.title:
return book.title
elif field == Book.fields.genre:
return book.genre
else:
raise Exception("unknown field: {}".format(field))
return [
query.object_query.create_object(dict(
(field_query.key, resolve_field(book, field_query.field))
for field_query in query.object_query.field_queries
))
for book in books
]
现在我们可以继续添加参数。我们首先需要更新 Root 上 books 字段的定义。
Root = g.ObjectType("Root", fields=(
g.field("author_count", type=g.Int),
g.field("book_count", type=g.Int),
g.field("books", type=g.ListType(Book), params=(
g.param("genre", type=g.String, default=None),
)),
))
接下来,我们将更新 BookQuery 以支持通过添加 where 方法进行过滤。
class BookQuery(object):
def __init__(self, object_query, genre=None):
self.type = (BookQuery, object_query.type)
self.object_query = object_query
self.genre = genre
def where(self, *, genre):
return BookQuery(self.object_query, genre=genre)
我们可以在根解析器中解析 books 字段时使用此 where 方法。
elif field_query.field == Root.fields.books:
book_query = BookQuery(field_query.type_query.element_query)
if field_query.args.genre is not None:
book_query = book_query.where(genre=field_query.args.genre)
return graph.resolve(book_query)
最后,我们需要过滤从数据库中获取的书籍。我们将
books = session.query(*expressions).all()
替换为
sqlalchemy_query = session.query(*expressions)
if query.genre is not None:
sqlalchemy_query = sqlalchemy_query.filter(BookRecord.genre == query.genre)
books = sqlalchemy_query.all()
如果我们用新的查询更新我们的脚本
print("result:", execute(
"""
query {
books(genre: "comedy") {
title
}
}
""",
graph=graph,
query_type=Root,
))
我们应该只看到输出中的喜剧类别的书籍
result: {'books': [{'title': 'Leave It to Psmith'}, {'title': 'Right Ho, Jeeves'}]}
依赖注入
在我们之前的例子中,我们将SQLAlchemy会话视为一个全局变量。在实践中,有时显式传递会话(以及其他依赖项)是有用的。解析器的依赖项使用装饰器g.dependencies标记,它允许将依赖项作为关键字参数传递给解析器。例如,为了将SQLAlchemy会话的依赖项添加到resolve_root
@g.resolver(Root)
@g.dependencies(session=sqlalchemy.orm.Session)
def resolve_root(graph, query, *, session):
依赖项可以由任何值标识。在这种情况下,我们通过其类标识会话依赖项,即sqlalchemy.orm.Session。在创建图时,我们需要传递依赖项
graph = graph_definition.create_graph({
sqlalchemy.orm.Session: session,
})
提取重复项
在实现解析器时,存在一些常见的模式。通过将这些常见模式提取到构建解析器的函数中,我们可以减少重复并简化解析器的定义。例如,我们可以将根解析器重写为
resolve_root = g.root_object_resolver(Root)
@resolve_root.field(Root.fields.author_count)
@g.dependencies(session=sqlalchemy.orm.Session)
def root_resolve_author_count(graph, query, args, *, session):
return session.query(AuthorRecord).count()
@resolve_root.field(Root.fields.authors)
def root_resolve_authors(graph, query, args):
return graph.resolve(AuthorQuery(query.element_query))
@resolve_root.field(Root.fields.book_count)
@g.dependencies(session=sqlalchemy.orm.Session)
def root_resolve_book_count(graph, query, args, *, session):
return session.query(BookRecord).count()
@resolve_root.field(Root.fields.books)
def root_resolve_books(graph, query, args):
book_query = BookQuery(query.element_query)
if args.genre is not None:
book_query = book_query.where(genre=args.genre)
return graph.resolve(book_query)
同样,我们可以使用graphlayer.sqlalchemy模块来定义作者和书籍的解析器
import graphlayer.sqlalchemy as gsql
@resolve_root.field(Root.fields.authors)
def root_resolve_authors(graph, query, args):
return graph.resolve(gsql.select(query))
@resolve_root.field(Root.fields.books)
def root_resolve_books(graph, query, args):
book_query = gsql.select(query)
if args.genre is not None:
book_query = book_query.where(BookRecord.genre == args.genre)
return graph.resolve(book_query)
resolve_authors = gsql.sql_table_resolver(
Author,
AuthorRecord,
fields={
Author.fields.name: gsql.expression(AuthorRecord.name),
},
)
resolve_books = gsql.sql_table_resolver(
Book,
BookRecord,
fields={
Book.fields.title: gsql.expression(BookRecord.title),
Book.fields.genre: gsql.expression(BookRecord.genre),
Book.fields.author: lambda graph, field_query: gsql.join(
key=BookRecord.author_id,
resolve=lambda author_ids: graph.resolve(
gsql.select(field_query.type_query).by(AuthorRecord.id, author_ids),
),
),
},
)
项目详情
下载文件
下载适合您平台的文件。如果您不确定选择哪一个,请了解更多关于安装包的信息。
源分发
构建分发
graphlayer-0.2.8.tar.gz的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 1e09ed9ef00cdd6b7b490b89b582e2bfb139742ca98d7b8fee1919e18c5024c5 |
|
MD5 | 55901f7445c96541668c21933e126bf0 |
|
BLAKE2b-256 | ff05d1ac4a6514c8d4fd9f2e6c65569a7c45c3e3cd352a882756f1c984c79641 |