跳转到主要内容

Zope3的Elasticsearch客户端

项目描述

此软件包提供Zope3的Elasticsearch客户端。

README

此软件包提供Elasticsearch客户端。注意,我们在Elasticsearch服务器存根中使用了不同的端口(45299而不是9200)。有关更多信息,请参阅elasticsearch/config。

>>> from pprint import pprint
>>> from p01.elasticsearch import interfaces
>>> from p01.elasticsearch.pool import ServerPool
>>> from p01.elasticsearch.pool import ElasticSearchConnectionPool
>>> servers = ['localhost:45299']
>>> serverPool = ServerPool(servers, retryDelay=10, timeout=5)
>>> import p01.elasticsearch.testing
>>> statusRENormalizer = p01.elasticsearch.testing.statusRENormalizer

ElasticSearchConnectionPool

我们需要设置一个Elasticsearch连接池

>>> connectionPool = ElasticSearchConnectionPool(serverPool)

连接池将连接存储在线程局部中。您可以设置重连时间,默认设置为60秒

>>> connectionPool
<ElasticSearchConnectionPool localhost:45299>
>>> connectionPool.reConnectIntervall
60
>>> connectionPool.reConnectIntervall = 30
>>> connectionPool.reConnectIntervall
30

ElasticSearchConnection

现在我们可以从池中获取一个持久连接,该连接由线程局部观察

>>> conn = connectionPool.connection
>>> conn
<ElasticSearchConnection localhost:45299>

这种连接提供了一个服务器池,连接可以从中选择。如果某个服务器宕机,另一个服务器将被使用。连接也会在所有服务器之间平衡HTTP连接。

>>> conn.serverPool
<ServerPool retryDelay:10 localhost:45299>
>>> conn.serverPool.info
'localhost:45299'

还提供了一个maxRetries值。如果默认值为None,连接将选择活动的服务器的最大重试次数,例如len(self.serverPool.aliveServers)。

>>> conn.maxRetries is None
True

另一个名为autoRefresh的属性负责在先前的连接调用更改搜索索引时隐式调用刷新,例如索引调用应该做的那样。

>>> conn.autoRefresh
False

还有一个批量大小的标记。这意味着如果我们使用某些方法提供的批量标记,bulkMaxSize值确保在发送到服务器之前不会缓存超过给定数量的项目。

>>> conn.bulkMaxSize
400

映射配置

我们的测试设置使用预定义的映射配置。我想,这可能是大多数项目中的常见用例。我并不是特别喜欢动态映射,至少在迁移和旧数据处理方面是这样。但当然,对于某些用例,动态映射是一个很好的功能。至少,如果你必须索引被检索的数据并搜索所有(所有)字段,这会很有用。让我们测试我们的预定义映射。

直到Elasticsearch版本19.1,这会返回{},但现在它返回状态404,因此我们的代码会抛出异常。这将在Elasticsearch 19.5中修复。

>>> conn.getMapping()
{}

正如你所看到的,我们还没有默认的映射。首先我们需要索引至少一个项目。让我们索引一个第一个工作项目。

>>> job = {'title': u'Wir suchen einen Marketingplaner',
...        'description': u'Wir bieten eine gute Anstellung'}
>>> pprint(conn.index(job, 'testing', 'job', 1))
{u'_id': u'1',
 u'_index': u'testing',
 u'_type': u'job',
 u'_version': 1,
 u'ok': True}
>>> statusRENormalizer.pprint(conn.getMapping())
{u'testing': {u'job': {u'_all': {u'store': u'yes'},
                       u'_id': {u'store': u'yes'},
                       u'_index': {u'enabled': True},
                       u'_type': {u'store': u'yes'},
                       u'properties': {u'__name__': {u'boost': 2.0,
                                                     u'include_in_all': False,
                                                     u'null_value': u'na',
                                                     u'type': u'string'},
                                       u'contact': {u'include_in_all': False,
                                                    u'properties': {u'firstname': {u'include_in_all': False,
                                                                                   u'type': u'string'},
                                                                    u'lastname': {u'include_in_all': False,
                                                                                  u'type': u'string'}}},
                                       u'description': {u'include_in_all': True,
                                                        u'null_value': u'na',
                                                        u'type': u'string'},
                                       u'location': {u'geohash': True,
                                                     u'lat_lon': True,
                                                     u'type': u'geo_point'},
                                       u'published': {u'format': u'date_optional_time',
                                                      u'type': u'date'},
                                       u'requirements': {u'properties': {u'description': {u'type': u'string'},
                                                                         u'name': {u'type': u'string'}}},
                                       u'tags': {u'index_name': u'tag',
                                                 u'type': u'string'},
                                       u'title': {u'boost': 2.0,
                                                  u'include_in_all': True,
                                                  u'null_value': u'na',
                                                  u'type': u'string'}}}}}

让我们定义另一个包含更多数据的项目并索引它们。

>>> import datetime
>>> job = {'title': u'Wir suchen einen Buchhalter',
...        'description': u'Wir bieten Ihnen eine gute Anstellung',
...        'requirements': [
...            {'name': u'MBA', 'description': u'MBA Abschluss'}
...        ],
...        'tags': [u'MBA', u'certified'],
...        'published': datetime.datetime(2011, 02, 24, 12, 0, 0),
...        'contact': {
...            'firstname': u'Jessy',
...            'lastname': u'Ineichen',
...        },
...        'location':  [-71.34, 41.12]}
>>> pprint(conn.index(job, 'testing', 'job', 2))
{u'_id': u'2',
 u'_index': u'testing',
 u'_type': u'job',
 u'_version': 1,
 u'ok': True}
>>> import time
>>> time.sleep(1)

获取

现在让我们通过其id从我们的索引中获取工作项目。但在这样做之前,刷新我们的索引。

>>> statusRENormalizer.pprint(conn.get(2, "testing", "job"))
{u'_id': u'2',
 u'_index': u'testing',
 u'_source': {u'contact': {u'firstname': u'Jessy', u'lastname': u'Ineichen'},
              u'description': u'Wir bieten Ihnen eine gute Anstellung',
              u'location': [..., ...],
              u'published': datetime.datetime(2011, 2, 24, 12, 0),
              u'requirements': [{u'description': u'MBA Abschluss',
                                 u'name': u'MBA'}],
              u'tags': [u'MBA', u'certified'],
              u'title': u'Wir suchen einen Buchhalter'},
 u'_type': u'job',
 u'_version': 1,
 u'exists': True}

索引

此测试将在我们的测试设置方法中设置一些示例数据。之后,将为本次测试在另一个沙箱中启动一个新的Elasticsearch实例。有关示例数据和Elasticsearch服务器设置的信息,请参阅p01/elasticsearch/test.py文件。

我们将测试是否可以删除现有的索引并使用相同的映射再次创建它们。

>>> import json
>>> from pprint import pprint
>>> import p01.elasticsearch.testing
>>> statusRENormalizer = p01.elasticsearch.testing.statusRENormalizer

现在让我们根据我们的服务器池定义一个新的Elasticsearch连接。

>>> conn = p01.elasticsearch.testing.getTestConnection()

现在我们已准备好访问Elasticsearch服务器。检查状态。

>>> statusRENormalizer.pprint(conn.status())
{u'_shards': {u'failed': 0, u'successful': 1, u'total': 1},
 u'indices': {u'companies': {u'docs': {u'deleted_docs': 0,
                                       u'max_doc': ...,
                                       u'num_docs': ...},
                             u'flush': {u'total': 0,
                                        u'total_time': u'...',
                                        u'total_time_in_millis': ...},
                             u'index': {u'primary_size': u'...',
                                        u'primary_size_in_bytes': ...,
                                        u'size': u'...',
                                        u'size_in_bytes': ...},
                             u'merges': {u'current': 0,
                                         u'current_docs': 0,
                                         u'current_size': u'0b',
                                         u'current_size_in_bytes': 0,
                                         u'total': 0,
                                         u'total_docs': 0,
                                         u'total_size': u'0b',
                                         u'total_size_in_bytes': 0,
                                         u'total_time': u'...',
                                         u'total_time_in_millis': ...},
                             u'refresh': {u'total': ...,
                                          u'total_time': u'...',
                                          u'total_time_in_millis': ...},
                             u'shards': {u'0': [{u'docs': {u'deleted_docs': 0,
                                                           u'max_doc': ...,
                                                           u'num_docs': ...},
                                                 u'flush': {u'total': 0,
                                                            u'total_time': u'...',
                                                            u'total_time_in_millis': ...},
                                                 u'index': {u'size': u'...',
                                                            u'size_in_bytes': ...},
                                                 u'merges': {u'current': 0,
                                                             u'current_docs': 0,
                                                             u'current_size': u'0b',
                                                             u'current_size_in_bytes': 0,
                                                             u'total': 0,
                                                             u'total_docs': 0,
                                                             u'total_size': u'0b',
                                                             u'total_size_in_bytes': 0,
                                                             u'total_time': u'...',
                                                             u'total_time_in_millis': ...},
                                                 u'refresh': {u'total': ...,
                                                              u'total_time': u'...',
                                                              u'total_time_in_millis': ...},
                                                 u'routing': {u'index': u'companies',
                                                              u'node': u'...',
                                                              u'primary': True,
                                                              u'relocating_node': None,
                                                              u'shard': 0,
                                                              u'state': u'STARTED'},
                                                 u'state': u'STARTED',
                                                 u'translog': {u'id': ...,
                                                               u'operations': 0}}]},
                             u'translog': {u'operations': 0}}},
 u'ok': True}

正如你所看到的,我们可以测试我们创建的示例数据映射。

>>> pprint(conn.getMapping('companies', 'company'))
{u'company': {u'properties': {u'__name__': {u'type': u'string'},
                              u'city': {u'type': u'string'},
                              u'number': {u'ignore_malformed': False,
                                          u'type': u'long'},
                              u'street': {u'type': u'string'},
                              u'text': {u'type': u'string'},
                              u'zip': {u'type': u'string'}}}}

并在我们的测试设置中的示例数据生成器中添加搜索我们的示例数据。

>>> pprint(conn.search('street').total)
100

deleteIndex

现在我们将删除索引。

>>> conn.deleteIndex('companies')
{u'acknowledged': True, u'ok': True}

正如你所看到的,索引已经不再存在。

>>> statusRENormalizer.pprint(conn.status())
{u'_shards': {u'failed': 0, u'successful': 0, u'total': 0},
 u'indices': {},
 u'ok': True}

createIndex

现在我们可以再次创建索引。让我们获取我们的示例数据映射。

>>> import os.path
>>> import json
>>> import p01.elasticsearch
>>> mFile = os.path.join(os.path.dirname(p01.elasticsearch.__file__),
...     'sample', 'config', 'companies', 'company.json')
>>> f = open(mFile)
>>> data = f.read()
>>> f.close()
>>> mappings = json.loads(data)
>>> pprint(mappings)
{u'company': {u'_all': {u'enabled': True, u'store': u'yes'},
              u'_id': {u'store': u'yes'},
              u'_index': {u'enabled': True},
              u'_source': {u'enabled': False},
              u'_type': {u'store': u'yes'},
              u'properties': {u'__name__': {u'include_in_all': False,
                                            u'index': u'not_analyzed',
                                            u'store': u'yes',
                                            u'type': u'string'},
                              u'_id': {u'include_in_all': False,
                                       u'index': u'no',
                                       u'store': u'yes',
                                       u'type': u'string'},
                              u'city': {u'boost': 1.0,
                                        u'include_in_all': True,
                                        u'index': u'not_analyzed',
                                        u'null_value': u'na',
                                        u'store': u'yes',
                                        u'type': u'string'},
                              u'street': {u'boost': 1.0,
                                          u'include_in_all': True,
                                          u'index': u'not_analyzed',
                                          u'null_value': u'na',
                                          u'store': u'yes',
                                          u'type': u'string'},
                              u'text': {u'boost': 1.0,
                                        u'include_in_all': True,
                                        u'index': u'not_analyzed',
                                        u'null_value': u'na',
                                        u'store': u'yes',
                                        u'type': u'string'},
                              u'zip': {u'boost': 1.0,
                                       u'include_in_all': True,
                                       u'index': u'not_analyzed',
                                       u'null_value': u'na',
                                       u'store': u'yes',
                                       u'type': u'string'}}}}

现在我们可以使用给定的映射创建一个新的索引。

>>> conn.createIndex('companies', mappings=mappings)
{u'acknowledged': True, u'ok': True}

正如你所看到的,索引和映射又回来了。

>>> statusRENormalizer.pprint(conn.status())
{u'_shards': {u'failed': 0, u'successful': 1, u'total': 1},
 u'indices': {u'companies': {u'docs': {u'deleted_docs': 0,
                                       u'max_doc': ...,
                                       u'num_docs': ...},
                             u'flush': {u'total': 0,
                                        u'total_time': u'...',
                                        u'total_time_in_millis': ...},
                             u'index': {u'primary_size': u'...',
                                        u'primary_size_in_bytes': ...,
                                        u'size': u'...',
                                        u'size_in_bytes': ...},
                             u'merges': {u'current': 0,
                                         u'current_docs': 0,
                                         u'current_size': u'0b',
                                         u'current_size_in_bytes': 0,
                                         u'total': 0,
                                         u'total_docs': 0,
                                         u'total_size': u'0b',
                                         u'total_size_in_bytes': 0,
                                         u'total_time': u'...',
                                         u'total_time_in_millis': ...},
                             u'refresh': {u'total': ...,
                                          u'total_time': u'...',
                                          u'total_time_in_millis': ...},
                             u'shards': {u'0': [{u'docs': {u'deleted_docs': 0,
                                                           u'max_doc': ...,
                                                           u'num_docs': ...},
                                                 u'flush': {u'total': 0,
                                                            u'total_time': u'...',
                                                            u'total_time_in_millis': ...},
                                                 u'index': {u'size': u'...',
                                                            u'size_in_bytes': ...},
                                                 u'merges': {u'current': 0,
                                                             u'current_docs': 0,
                                                             u'current_size': u'0b',
                                                             u'current_size_in_bytes': 0,
                                                             u'total': 0,
                                                             u'total_docs': 0,
                                                             u'total_size': u'0b',
                                                             u'total_size_in_bytes': 0,
                                                             u'total_time': u'...',
                                                             u'total_time_in_millis': ...},
                                                 u'refresh': {u'total': ...,
                                                              u'total_time': u'...',
                                                              u'total_time_in_millis': ...},
                                                 u'routing': {u'index': u'companies',
                                                              u'node': u'...',
                                                              u'primary': True,
                                                              u'relocating_node': None,
                                                              u'shard': 0,
                                                              u'state': u'STARTED'},
                                                 u'state': u'STARTED',
                                                 u'translog': {u'id': ...,
                                                               u'operations': 0}}]},
                             u'translog': {u'operations': 0}}},
 u'ok': True}
>>> pprint(conn.getMapping('companies', 'company'))
{u'company': {u'_all': {u'store': u'yes'},
              u'_id': {u'store': u'yes'},
              u'_index': {u'enabled': True},
              u'_source': {u'enabled': False},
              u'_type': {u'store': u'yes'},
              u'properties': {u'__name__': {u'include_in_all': False,
                                            u'index': u'not_analyzed',
                                            u'store': u'yes',
                                            u'type': u'string'},
                              u'city': {u'include_in_all': True,
                                        u'index': u'not_analyzed',
                                        u'null_value': u'na',
                                        u'store': u'yes',
                                        u'type': u'string'},
                              u'street': {u'include_in_all': True,
                                          u'index': u'not_analyzed',
                                          u'null_value': u'na',
                                          u'store': u'yes',
                                          u'type': u'string'},
                              u'text': {u'include_in_all': True,
                                        u'index': u'not_analyzed',
                                        u'null_value': u'na',
                                        u'store': u'yes',
                                        u'type': u'string'},
                              u'zip': {u'include_in_all': True,
                                       u'index': u'not_analyzed',
                                       u'null_value': u'na',
                                       u'store': u'yes',
                                       u'type': u'string'}}}}

正如你所看到的,索引是空的。

>>> pprint(conn.search('street').total)
0

映射

注意:此测试将在端口45299上启动和运行Elasticsearch服务器!

此测试对一些映射配置进行了实验。由于Elasticsearch文档对我来说并不十分清晰,我试图找出映射部分应该如何完成。

>>> from pprint import pprint
>>> from p01.elasticsearch import interfaces
>>> from p01.elasticsearch.pool import ServerPool
>>> from p01.elasticsearch.pool import ElasticSearchConnectionPool

设置连接

>>> servers = ['localhost:45299']
>>> serverPool = ServerPool(servers)
>>> connectionPool = ElasticSearchConnectionPool(serverPool)
>>> conn = connectionPool.connection

让我们设置映射定义。

>>> mapping = {
...     'item': {
...         'properties': {
...             'boolean': {
...                 'type': 'boolean'
...             },
...             'date': {
...                 'type': 'date'
...             },
...             'datetime': {
...                 'type': 'date'
...             },
...             'double': {
...                 'type': 'double'
...             },
...             'float': {
...                 'type': 'float'
...             },
...             'integer': {
...                 'type': 'integer'
...             },
...             'long': {
...                 'type': 'long'
...             },
...             'string': {
...                 'type': 'string',
...                 'null_value' : 'nada'
...             },
...         }
...     }
... }

现在让我们使用我们的putMapping方法添加映射并调用刷新。

>>> conn.putMapping(mapping, 'test-mapping', 'item')
Traceback (most recent call last):
...
IndexMissingException: [test-mapping] missing

正如你所看到的,由于我们的索引还不存在,抛出了一个异常。让我们添加我们的测试-mapping索引并再次尝试。

>>> conn.createIndex('test-mapping')
{u'acknowledged': True, u'ok': True}
>>> pprint(conn.refresh('test-mapping', 4))
{u'_shards': {u'failed': 0, u'successful': ..., u'total': 10}, u'ok': True}
>>> conn.putMapping(mapping, 'test-mapping', 'item')
{u'acknowledged': True, u'ok': True}
>>> pprint(conn.refresh('test-mapping', 4))
{u'_shards': {u'failed': 0, u'successful': ..., u'total': 10}, u'ok': True}

并获取我们的映射

>>> pprint(conn.getMapping('test-mapping', 'item'), width=60)
{u'item': {u'properties': {u'boolean': {u'type': u'boolean'},
                           u'date': {u'format': u'dateOptionalTime',
                                     u'type': u'date'},
                           u'datetime': {u'format': u'dateOptionalTime',
                                         u'type': u'date'},
                           u'double': {u'type': u'double'},
                           u'float': {u'type': u'float'},
                           u'integer': {u'type': u'integer'},
                           u'long': {u'type': u'long'},
                           u'string': {u'null_value': u'nada',
                                       u'type': u'string'}}}}

现在让我们索引一个新项目。

>>> import datetime
>>> doc = {'boolean': True,
...        'datetime': datetime.datetime(2011, 02, 24, 12, 0, 0),
...        'date': datetime.date(2011, 02, 24),
...        'float': float(42),
...        'integer': int(42),
...        'long': long(42*10000000000000000),
...        'string': 'string'}
>>> conn.index(doc, 'test-mapping', 'item', 1)
{u'_type': u'item', u'_id': u'1', u'ok': True, u'_version': 1, u'_index': u'test-mapping'}

刷新索引

>>> pprint(conn.refresh('test-mapping', 4))
{u'_shards': {u'failed': 0, u'successful': 5, u'total': 10}, u'ok': True}

并搜索我们的索引项

>>> response = conn.search('string', 'test-mapping', 'item')
>>> data = response.data
>>> pprint(data)
{u'_shards': {u'failed': 0, u'successful': 5, u'total': 5},
 u'hits': {u'hits': [{u'_id': u'1',
                      u'_index': u'test-mapping',
                      u'_score': ...,
                      u'_source': {u'boolean': True,
                                   u'date': datetime.datetime(2011, 2, 24, 0, 0),
                                   u'datetime': datetime.datetime(2011, 2, 24, 12, 0),
                                   u'float': 42.0,
                                   u'integer': 42,
                                   u'long': 420000000000000000L,
                                   u'string': u'string'},
                      u'_type': u'item'}],
           u'max_score': ...,
           u'total': 1},
 u'timed_out': False,
 u'took': ...}

现在检查我们的值

>>> source = data['hits']['hits'][0]['_source']
>>> pprint(source)
{u'boolean': True,
 u'date': datetime.datetime(2011, 2, 24, 0, 0),
 u'datetime': datetime.datetime(2011, 2, 24, 12, 0),
 u'float': 42.0,
 u'integer': 42,
 u'long': 420000000000000000L,
 u'string': u'string'}
>>> isinstance(source['boolean'], bool)
True
>>> isinstance(source['datetime'], datetime.datetime)
True
>>> isinstance(source['date'], datetime.date)
True
>>> isinstance(source['float'], float)
True
>>> isinstance(source['integer'], int)
True
>>> isinstance(source['long'], long)
True
>>> isinstance(source['string'], basestring)
True
>>> isinstance(source['string'], unicode)
True

注意,datetime和date也是datetime和date项。

>>> isinstance(source['date'], datetime.datetime)
True
>>> isinstance(source['datetime'], datetime.date)
True

扫描搜索类型

注意:此测试将在端口45299上启动和运行Elasticsearch服务器!

让我们做一些不使用连接池的简单测试。

>>> from pprint import pprint
>>> from p01.elasticsearch.connection import ElasticSearchConnection
>>> from p01.elasticsearch.exceptions import ElasticSearchServerException
>>> from p01.elasticsearch.pool import ServerPool
>>> servers = ['localhost:45299']
>>> serverPool = ServerPool(servers)

现在我们能够获取一个由线程局部观察的持久连接。

>>> conn = ElasticSearchConnection(serverPool)

设置一个测试映射并添加一些文档

>>> conn.createIndex('scanning')
{u'acknowledged': True, u'ok': True}
>>> for i in range(1000):
...     _id = unicode(i)
...     doc = {'_id': _id, 'dummy': u'dummy'}
...     ignored = conn.index(doc, 'scanning', 'doc')
>>> conn.refresh('scanning')
{u'ok': True, u'_shards': {u'successful': 5, u'failed': 0, u'total': 10}}

让我们展示如何使用扫描方法批量处理大量搜索结果。

>>> pprint(conn.search('dummy', 'scanning').total)
1000
>>> result = list(conn.scan('dummy', 'scanning'))
>>> len(result)
1000
>>> pprint(sorted(result)[:5])
[{u'_id': u'0',
  u'_index': u'scanning',
  u'_score': 0.0,
  u'_source': {u'_id': u'0', u'dummy': u'dummy'},
  u'_type': u'doc'},
 {u'_id': u'1',
  u'_index': u'scanning',
  u'_score': 0.0,
  u'_source': {u'_id': u'1', u'dummy': u'dummy'},
  u'_type': u'doc'},
 {u'_id': u'10',
  u'_index': u'scanning',
  u'_score': 0.0,
  u'_source': {u'_id': u'10', u'dummy': u'dummy'},
  u'_type': u'doc'},
 {u'_id': u'100',
  u'_index': u'scanning',
  u'_score': 0.0,
  u'_source': {u'_id': u'100', u'dummy': u'dummy'},
  u'_type': u'doc'},
 {u'_id': u'101',
  u'_index': u'scanning',
  u'_score': 0.0,
  u'_source': {u'_id': u'101', u'dummy': u'dummy'},
  u'_type': u'doc'}]

批量操作

注意:此测试将在端口45299上启动和运行Elasticsearch服务器!

这个测试展示了如何使用批量概念索引项目。

>>> from pprint import pprint
>>> from p01.elasticsearch import interfaces
>>> from p01.elasticsearch.pool import ServerPool
>>> from p01.elasticsearch.pool import ElasticSearchConnectionPool
>>> servers = ['localhost:45299']
>>> serverPool = ServerPool(servers)

现在我们可以从池中获取一个持久连接,该连接由线程局部观察

>>> connectionPool = ElasticSearchConnectionPool(serverPool)
>>> conn = connectionPool.connection
>>> conn
<ElasticSearchConnection localhost:45299>

将bulkMaxSize设置为5。这意味着如果我们索引5个项目,索引方法将隐式地向服务器发送索引请求

>>> conn.bulkMaxSize = 5
>>> conn.bulkMaxSize
5

让我们批量索引一些项目

>>> doc = {'title': u'Wir suchen einen Marketingplaner',
...        'description': u'Wir bieten Ihnen eine gute Anstellung'}
>>> conn.bulkIndex(doc, 'testing', 'job', 1)
>>> doc = {'title': u'Wir suchen einen Buchhalter',
...        'description': u'Wir bieten Ihnen eine gute Anstellung'}
>>> conn.bulkIndex(doc, 'testing', 'job', 2)

现在提交我们的批量数据。即使我们没有索引完整的bulkMaxSize

>>> pprint(conn.bulkCommit())
{u'items': [{u'index': {u'_id': u'1',
                        u'_index': u'testing',
                        u'_type': u'job',
                        u'_version': 1,
                        u'ok': True}},
            {u'index': {u'_id': u'2',
                        u'_index': u'testing',
                        u'_type': u'job',
                        u'_version': 1,
                        u'ok': True}}],
 u'took': ...}
>>> conn.bulkCounter
0

现在我们搜索这些项目

>>> response = conn.search("Anstellung", 'testing', 'job')
>>> pprint(response.data)
{u'_shards': {u'failed': 0, u'successful': 5, u'total': 5},
 u'hits': {u'hits': [], u'max_score': None, u'total': 0},
 u'timed_out': False,
 u'took': ...}

如您所见,我们没有提交数据,因为我们没有使用refresh参数。现在让我们调用refresh

>>> conn.refresh('testing')
{u'ok': True, u'_shards': {u'successful': 5, u'failed': 0, u'total': 10}}
然后再次搜索
>>> response = conn.search("Anstellung", 'testing', 'job')
>>> pprint(response.data)
{u'_shards': {u'failed': 0, u'successful': 5, u'total': 5},
 u'hits': {u'hits': [{u'_id': u'1',
                      u'_index': u'testing',
                      u'_score': ...,
                      u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung',
                                   u'title': u'Wir suchen einen Marketingplaner'},
                      u'_type': u'job'},
                     {u'_id': u'2',
                      u'_index': u'testing',
                      u'_score': ...,
                      u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung',
                                   u'title': u'Wir suchen einen Buchhalter'},
                      u'_type': u'job'}],
           u'max_score': ...,
           u'total': 2},
 u'timed_out': False,
 u'took': ...}

让我们继续索引项目,直到达到bulkMaxSize

>>> len(conn.bulkItems)
0
>>> doc = {'title': u'Wir suchen einen Koch',
...        'description': u'Wir bieten Ihnen eine gute Anstellung'}
>>> conn.bulkIndex(doc, 'testing', 'job', 3)
>>> conn.bulkCounter
1
>>> doc = {'title': u'Wir suchen einen Sachbearbeiter',
...        'description': u'Wir bieten Ihnen eine gute Anstellung'}
>>> conn.bulkIndex(doc, 'testing', 'job', 4)
>>> conn.bulkCounter
2
>>> doc = {'title': u'Wir suchen einen Mechaniker',
...        'description': u'Wir bieten Ihnen eine gute Anstellung'}
>>> conn.bulkIndex(doc, 'testing', 'job', 5)
>>> conn.bulkCounter
3
>>> doc = {'title': u'Wir suchen einen Exportfachmann',
...        'description': u'Wir bieten Ihnen eine gute Anstellung'}
>>> conn.bulkIndex(doc, 'testing', 'job', 6)
>>> conn.bulkCounter
4

现在,我们的bulkMaxSize强制提交数据

>>> doc = {'title': u'Wir suchen einen Entwickler',
...        'description': u'Wir bieten Ihnen eine gute Anstellung'}
>>> pprint(conn.bulkIndex(doc, 'testing', 'job', 7))
{u'items': [{u'index': {u'_id': u'3',
                        u'_index': u'testing',
                        u'_type': u'job',
                        u'_version': 1,
                        u'ok': True}},
            {u'index': {u'_id': u'4',
                        u'_index': u'testing',
                        u'_type': u'job',
                        u'_version': 1,
                        u'ok': True}},
            {u'index': {u'_id': u'5',
                        u'_index': u'testing',
                        u'_type': u'job',
                        u'_version': 1,
                        u'ok': True}},
            {u'index': {u'_id': u'6',
                        u'_index': u'testing',
                        u'_type': u'job',
                        u'_version': 1,
                        u'ok': True}},
            {u'index': {u'_id': u'7',
                        u'_index': u'testing',
                        u'_type': u'job',
                        u'_version': 1,
                        u'ok': True}}],
 u'took': ...}

只需等待服务器默认每秒自动调用refresh

>>> import time
>>> time.sleep(1)
>>> len(conn.bulkItems)
0

如您所见,我们已经全部索引了7个项目

>>> response = conn.search("Anstellung", 'testing', 'job')
>>> pprint(response.data)
{u'_shards': {u'failed': 0, u'successful': 5, u'total': 5},
 u'hits': {u'hits': [{u'_id': u'1',
                      u'_index': u'testing',
                      u'_score': ...,
                      u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung',
                                   u'title': u'Wir suchen einen Marketingplaner'},
                      u'_type': u'job'},
                     {u'_id': u'6',
                      u'_index': u'testing',
                      u'_score': ...,
                      u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung',
                                   u'title': u'Wir suchen einen Exportfachmann'},
                      u'_type': u'job'},
                     {u'_id': u'2',
                      u'_index': u'testing',
                      u'_score': ...,
                      u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung',
                                   u'title': u'Wir suchen einen Buchhalter'},
                      u'_type': u'job'},
                     {u'_id': u'7',
                      u'_index': u'testing',
                      u'_score': ...,
                      u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung',
                                   u'title': u'Wir suchen einen Entwickler'},
                      u'_type': u'job'},
                     {u'_id': u'4',
                      u'_index': u'testing',
                      u'_score': ...,
                      u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung',
                                   u'title': u'Wir suchen einen Sachbearbeiter'},
                      u'_type': u'job'},
                     {u'_id': u'5',
                      u'_index': u'testing',
                      u'_score': ...,
                      u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung',
                                   u'title': u'Wir suchen einen Mechaniker'},
                      u'_type': u'job'},
                     {u'_id': u'3',
                      u'_index': u'testing',
                      u'_score': ...,
                      u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung',
                                   u'title': u'Wir suchen einen Koch'},
                      u'_type': u'job'}],
           u'max_score': ...,
           u'total': 7},
 u'timed_out': False,
 u'took': ...}

变更记录

0.6.0 (2014-03-24)

  • 功能:实现了使用PUT请求在_template端点实现的putTemplate方法

0.5.2 (2013-06-28)

  • 错误修复:改进错误处理。如果没有错误信息,则使用JSON响应字符串。

0.5.1 (2012-12-22)

  • 实现了put settings (putSettings)方法

  • 基于更改的Elasticsearch 0.20.1输出修复测试

  • 切换到p01.recipe.setup:importchecker

0.5.0 (2012-11-18)

  • 初始发布

项目详情


下载文件

为您的平台下载文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。

源分发

p01.elasticsearch-0.6.0.zip (76.7 kB 查看散列)

上传时间

支持者

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面