跳转到主要内容

使用连接实现GraphQL

项目描述

在参考GraphQL实现中,解析函数描述了如何为每个对象的实例完成请求数据的一部分。如果使用SQL后端天真地实现,这将导致N+1问题。例如,给定查询

{
    books(genre: "comedy") {
        title
        author {
            name
        }
    }
}

一个天真的GraphQL实现会发出一个SQL查询来获取所有喜剧类型书籍的列表,然后发出N个查询来获取每本书的作者(N是第一个查询返回的书籍数量)。

为此问题已提出了各种解决方案:GraphJoiner建议使用连接对于许多用例是自然的。对于此特定情况,我们只需要运行两个查询:一个用于查找所有喜剧类型书籍的列表,另一个用于获取喜剧类型书籍的作者。

示例

假设我们使用SQLAlchemy定义了一些模型。一本书有一个ID、一个标题、一个类型和一个作者ID。一个作者有一个ID和一个名字。

from sqlalchemy import Column, Integer, Unicode, ForeignKey
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class Author(Base):
    __tablename__ = "author"

    id = Column(Integer, primary_key=True)
    name = Column(Unicode, nullable=False)

class Book(Base):
    __tablename__ = "book"

    id = Column(Integer, primary_key=True)
    title = Column(Unicode, nullable=False)
    genre = Column(Unicode, nullable=False)
    author_id = Column(Integer, ForeignKey(Author.id))

然后我们定义根、书籍和作者的对象类型

from graphql import GraphQLInt, GraphQLString, GraphQLArgument
from graphjoiner import JoinType, RootJoinType, single, many, field
from sqlalchemy.orm import Query

def create_root():
    def fields():
        return {
            "books": many(
                book_join_type,
                books_query,
                args={"genre": GraphQLArgument(type=GraphQLString)}
            )
        }

    def books_query(args, _):
        query = Query([]).select_from(Book)

        if "genre" in args:
            query = query.filter(Book.genre == args["genre"])

        return query

    return RootJoinType(name="Root", fields=fields)

root = create_root()

def fetch_immediates_from_database(selections, query, context):
    query = query.with_entities(*(
        selection.field.column_name
        for selection in selections
    ))
    keys = tuple(selection.key for selection in selections)

    return [
        dict(zip(keys, row))
        for row in query.with_session(context.session).all()
    ]

def create_book_join_type():
    def fields():
        return {
            "id": field(column_name="id", type=GraphQLInt),
            "title": field(column_name="title", type=GraphQLString),
            "genre": field(column_name="genre", type=GraphQLString),
            "authorId": field(column_name="author_id", type=GraphQLInt),
            "author": single(author_join_type, author_query, join={"authorId": "id"}),
        }

    def author_query(args, book_query):
        books = book_query.with_entities(Book.author_id).distinct().subquery()
        return Query([]) \
            .select_from(Author) \
            .join(books, books.c.author_id == Author.id)

    return JoinType(
        name="Book",
        fields=fields,
        fetch_immediates=fetch_immediates_from_database,
    )

book_join_type = create_book_join_type()

def create_author_join_type():
    def fields():
        return {
            "id": field(column_name="id", type=GraphQLInt),
            "name": field(column_name="name", type=GraphQLString),
        }

    return JoinType(
        name="Author",
        fields=fields,
        fetch_immediates=fetch_immediates_from_database,
    )
author_join_type = create_author_join_type()

我们可以通过调用 execute 来执行查询

from graphjoiner import execute

query = """
    {
        books(genre: "comedy") {
            title
            author {
                name
            }
        }
    }
"""

class Context(object):
    def __init__(self, session):
        self.session = session

execute(root, query, context=Context(session))

它产生

{
    "books": [
        {
            "title": "Leave It to Psmith",
            "author": {
                "name": "PG Wodehouse"
            }
        },
        {
            "title": "Right Ho, Jeeves",
            "author": {
                "name": "PG Wodehouse"
            }
        },
        {
            "title": "Catch-22",
            "author": {
                "name": "Joseph Heller"
            }
        },
    ]
}

让我们稍微分解一下,从根对象的定义开始

def create_root():
    def fields():
        return {
            "books": many(
                book_join_type,
                books_query,
                args={"genre": GraphQLArgument(type=GraphQLString)}
            )
        }

    def books_query(args, _):
        query = Query([]).select_from(Book)

        if "genre" in args:
            query = query.filter(Book.genre == args["genre"])

        return query

    return RootJoinType(name="Root", fields=fields)

root = create_root()

对于每种对象类型,我们需要定义其字段。根节点只有一个字段,即 books,它表示一个一对一的关系,我们使用 many() 来定义。第一个参数 book_join_type 是我们定义关系的类型。第二个参数描述了如何创建一个查询来表示所有相关书籍:在这种情况下,所有书籍,可能通过一个类型参数进行筛选。

这意味着我们需要定义 book_join_type

def create_book_join_type():
    def fields():
        return {
            "id": field(column_name="id", type=GraphQLInt),
            "title": field(column_name="title", type=GraphQLString),
            "genre": field(column_name="genre", type=GraphQLString),
            "authorId": field(column_name="author_id", type=GraphQLInt),
            "author": single(author_join_type, author_query, join={"authorId": "id"}),
        }

    def author_query(args, book_query):
        books = book_query.with_entities(Book.author_id).distinct().subquery()
        return Query([]) \
            .select_from(Author) \
            .join(books, books.c.author_id == Author.id)

    return JoinType(
        name="Book",
        fields=fields,
        fetch_immediates=fetch_immediates_from_database,
    )

book_join_type = create_book_join_type()

author 字段被定义为从书籍到作者的单一映射。像之前一样,我们定义一个函数来生成请求的作者的查询。我们还提供一个 join 参数给 single(),以便 GraphJoiner 知道如何将作者查询和书籍查询的结果连接起来:在这种情况下,书籍中的 authorId 字段对应于作者中的 id 字段。(如果我们省略 join 参数,那么 GraphJoiner 将执行交叉连接,即笛卡尔积。由于始终存在一个根实例,这适用于在根上定义的关系。)

剩余的字段定义了 GraphQL 字段到数据库列的映射。这个映射由 fetch_immediates_from_database 处理。在 fetch_immediates() 中的 selections 的值是未定义为关系的字段的选择(使用 singlemany),这些字段要么在原始 GraphQL 查询中明确请求,要么作为连接的一部分是必需的。

def fetch_immediates_from_database(selections, query, context):
    query = query.with_entities(*(
        fields[selection.field_name].column_name
        for selection in selections
    ))
    keys = tuple(selection.key for selection in selections)

    return [
        dict(zip(keys, row))
        for row in query.with_session(context.session).all()
    ]

为了完整性,我们可以调整 author_join_type 的定义,以便我们可以通过作者请求书籍

def create_author_join_type():
    def fields():
        return {
            "id": field(column_name="id", type=GraphQLInt),
            "name": field(column_name="name", type=GraphQLString),
            "author": many(book_join_type, book_query, join={"id": "authorId"}),
        }

    def book_query(args, author_query):
        authors = author_query.with_entities(Author.id).distinct().subquery()
        return Query([]) \
            .select_from(Book) \
            .join(authors, authors.c.id == Book.author_id)

    return JoinType(
        name="Author",
        fields=fields,
        fetch_immediates=fetch_immediates_from_database,
    )

author_join_type = create_author_join_type()

安装

pip install graphjoiner

项目详细信息


下载文件

下载适用于您的平台的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。

源分发

graphjoiner-0.3.1.tar.gz (9.2 kB 查看哈希值)

上传时间

构建分发

graphjoiner-0.3.1-py2.py3-none-any.whl (11.1 kB 查看哈希值)

上传时间 Python 2 Python 3

支持

AWSAWS云计算和安全赞助商DatadogDatadog监控FastlyFastlyCDNGoogleGoogle下载分析MicrosoftMicrosoftPSF赞助商PingdomPingdom监控SentrySentry错误日志StatusPageStatusPage状态页面