跳转到主要内容

Elasticsearch的Python客户端

项目描述

Elasticsearch DSL是一个高层库,其目标是帮助编写和运行针对Elasticsearch的查询。它构建在官方低级客户端(elasticsearch-py)之上。

它提供了一种更方便、更符合习惯的方法来编写和操作查询。它紧随Elasticsearch JSON DSL,反映其术语和结构。它通过Python直接使用定义的类或类似查询集的表达式暴露了整个DSL的范围。

它还提供了一个可选的包装器,用于将文档作为Python对象进行操作:定义映射、检索和保存文档、将文档数据包装在用户定义的类中。

要使用其他 Elasticsearch API(例如集群健康),只需使用底层客户端。

安装

pip install elasticsearch-dsl

示例

请参阅 示例目录,以查看使用 elasticsearch-dsl 的复杂示例。

兼容性

该库自 2.x 版本的 Elasticsearch 以来都与所有版本兼容,但您必须使用匹配的主版本

对于 Elasticsearch 7.0 及以后的版本,使用库的主版本 7(《7.x.y》)。

对于 Elasticsearch 6.0 及以后的版本,使用库的主版本 6(《6.x.y》)。

对于 Elasticsearch 5.0 及以后的版本,使用库的主版本 5(《5.x.y》)。

对于 Elasticsearch 2.0 及以后的版本,使用库的主版本 2(《2.x.y》)。

在您的 setup.pyrequirements.txt 中设置要求的推荐方法是

# Elasticsearch 7.x
elasticsearch-dsl>=7.0.0,<8.0.0

# Elasticsearch 6.x
elasticsearch-dsl>=6.0.0,<7.0.0

# Elasticsearch 5.x
elasticsearch-dsl>=5.0.0,<6.0.0

# Elasticsearch 2.x
elasticsearch-dsl>=2.0.0,<3.0.0

开发在 master 上进行,较旧的分支仅获得错误修复版本

搜索示例

让我们直接以 dict 的形式编写一个典型的搜索请求

from elasticsearch import Elasticsearch
client = Elasticsearch()

response = client.search(
    index="my-index",
    body={
      "query": {
        "bool": {
          "must": [{"match": {"title": "python"}}],
          "must_not": [{"match": {"description": "beta"}}],
          "filter": [{"term": {"category": "search"}}]
        }
      },
      "aggs" : {
        "per_tag": {
          "terms": {"field": "tags"},
          "aggs": {
            "max_lines": {"max": {"field": "lines"}}
          }
        }
      }
    }
)

for hit in response['hits']['hits']:
    print(hit['_score'], hit['_source']['title'])

for tag in response['aggregations']['per_tag']['buckets']:
    print(tag['key'], tag['max_lines']['value'])

这种方法的缺点是它非常冗长,容易出错,如不正确的嵌套,难以修改(例如添加另一个过滤器),并且肯定不是一件有趣的事情。

让我们使用 Python DSL 重新编写这个示例

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search

client = Elasticsearch()

s = Search(using=client, index="my-index") \
    .filter("term", category="search") \
    .query("match", title="python")   \
    .exclude("match", description="beta")

s.aggs.bucket('per_tag', 'terms', field='tags') \
    .metric('max_lines', 'max', field='lines')

response = s.execute()

for hit in response:
    print(hit.meta.score, hit.title)

for tag in response.aggregations.per_tag.buckets:
    print(tag.key, tag.max_lines.value)

如您所见,库负责

  • 根据名称(例如“match”)创建适当的 Query 对象

  • 将查询组合成复合 bool 查询

  • term 查询放入 bool 查询的过滤器上下文中

  • 提供对响应数据的方便访问

  • 无需在所有地方都使用花括号或方括号

持久性示例

让我们有一个简单的 Python 类,表示博客系统中的一个文章

from datetime import datetime
from elasticsearch_dsl import Document, Date, Integer, Keyword, Text, connections

# Define a default Elasticsearch client
connections.create_connection(hosts=['localhost'])

class Article(Document):
    title = Text(analyzer='snowball', fields={'raw': Keyword()})
    body = Text(analyzer='snowball')
    tags = Keyword()
    published_from = Date()
    lines = Integer()

    class Index:
        name = 'blog'
        settings = {
          "number_of_shards": 2,
        }

    def save(self, ** kwargs):
        self.lines = len(self.body.split())
        return super(Article, self).save(** kwargs)

    def is_published(self):
        return datetime.now() > self.published_from

# create the mappings in elasticsearch
Article.init()

# create and save and article
article = Article(meta={'id': 42}, title='Hello world!', tags=['test'])
article.body = ''' looong text '''
article.published_from = datetime.now()
article.save()

article = Article.get(id=42)
print(article.is_published())

# Display cluster health
print(connections.get_connection().cluster.health())

在这个例子中,您可以看到

  • 提供一个默认连接

  • 定义带有映射配置的字段

  • 设置索引名称

  • 定义自定义方法

  • 重写内置的 .save() 方法以挂钩到持久性生命周期

  • 检索并将对象保存到 Elasticsearch 中

  • 访问底层客户端以使用其他 API

您可以在文档的持久性章节中看到更多内容。

elasticsearch-py 迁移

您不需要将整个应用程序移植到 Python DSL 的优点,您可以从创建一个从现有 dict 创建的 Search 对象开始,使用 API 修改它,并将其反序列化为 dict

body = {...} # insert complicated query here

# Convert to Search object
s = Search.from_dict(body)

# Add some filters, aggregations, queries, ...
s.filter("term", tags="python")

# Convert back to dict to plug back into existing code
body = s.to_dict()

开发

激活虚拟环境(《虚拟环境》)

$ virtualenv venv
$ source venv/bin/activate

要安装所有必要的开发依赖项,请运行

$ pip install -e '.[develop]'

要运行 elasticsearch-dsl-py 的所有测试,请运行

$ python setup.py test

或者,您可以使用 test_elasticsearch_dsl 中的 run_tests.py 脚本,该脚本包装 pytest,以运行测试套件的子集。以下是一些示例

# Run all of the tests in `test_elasticsearch_dsl/test_analysis.py`
$ ./run_tests.py test_analysis.py

# Run only the `test_analyzer_serializes_as_name` test.
$ ./run_tests.py test_analysis.py::test_analyzer_serializes_as_name

pytest 会跳过来自 test_elasticsearch_dsl/test_integration 的测试,除非存在一个可以建立连接的 Elasticsearch 实例。默认情况下,测试连接尝试在 localhost:9200 进行,根据 elasticsearch-py 连接 类中指定的默认值。 由于运行集成测试会导致 Elasticsearch 集群发生破坏性更改,因此只有在相关的集群为空时才运行它们。 因此,如果 localhost:9200 上的 Elasticsearch 实例不符合这些要求,可以通过 TEST_ES_SERVER 环境变量指定不同的测试 Elasticsearch 服务器。

$ TEST_ES_SERVER=my-test-server:9201 ./run_tests

文档

文档可在 https://elasticsearch-dsl.elastic.ac.cn 获得。

贡献指南

想要对 Elasticsearch DSL 进行黑客攻击?太棒了!我们有一个 贡献指南

许可

版权 2013 Elasticsearch

根据 Apache 许可证 2.0 版(“许可证”);除非您遵守许可证,否则不得使用此文件。您可以在以下位置获得许可证副本:

https://apache.ac.cn/licenses/LICENSE-2.0

除非适用法律要求或书面同意,否则根据许可证分发的软件按“原样”基础分发,不提供任何形式的保证或条件,无论是明示的还是隐含的。有关许可证中规定的特定语言管理权限和限制,请参阅许可证。

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解有关 安装包 的更多信息。

源分发

elasticsearch7-dsl-7.3.0.post1.tar.gz (54.1 kB 查看哈希)

上传时间

构建分发

elasticsearch7_dsl-7.3.0.post1-py2.py3-none-any.whl (63.1 kB 查看哈希)

上传时间 Python 2 Python 3

由以下赞助商支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面