Zope3的Elasticsearch客户端
项目描述
此软件包提供Zope3的Elasticsearch客户端。
README
此软件包提供Elasticsearch客户端。注意,我们在Elasticsearch服务器存根中使用了不同的端口(45299而不是9200)。有关更多信息,请参阅elasticsearch/config。
>>> from pprint import pprint >>> from p01.elasticsearch import interfaces >>> from p01.elasticsearch.pool import ServerPool >>> from p01.elasticsearch.pool import ElasticSearchConnectionPool>>> servers = ['localhost:45299'] >>> serverPool = ServerPool(servers, retryDelay=10, timeout=5)>>> import p01.elasticsearch.testing >>> statusRENormalizer = p01.elasticsearch.testing.statusRENormalizer
ElasticSearchConnectionPool
我们需要设置一个Elasticsearch连接池
>>> connectionPool = ElasticSearchConnectionPool(serverPool)
连接池将连接存储在线程局部中。您可以设置重连时间,默认设置为60秒
>>> connectionPool <ElasticSearchConnectionPool localhost:45299>>>> connectionPool.reConnectIntervall 60>>> connectionPool.reConnectIntervall = 30 >>> connectionPool.reConnectIntervall 30
ElasticSearchConnection
现在我们可以从池中获取一个持久连接,该连接由线程局部观察
>>> conn = connectionPool.connection >>> conn <ElasticSearchConnection localhost:45299>
这种连接提供了一个服务器池,连接可以从中选择。如果某个服务器宕机,另一个服务器将被使用。连接也会在所有服务器之间平衡HTTP连接。
>>> conn.serverPool <ServerPool retryDelay:10 localhost:45299>>>> conn.serverPool.info 'localhost:45299'
还提供了一个maxRetries值。如果默认值为None,连接将选择活动的服务器的最大重试次数,例如len(self.serverPool.aliveServers)。
>>> conn.maxRetries is None True
另一个名为autoRefresh的属性负责在先前的连接调用更改搜索索引时隐式调用刷新,例如索引调用应该做的那样。
>>> conn.autoRefresh False
还有一个批量大小的标记。这意味着如果我们使用某些方法提供的批量标记,bulkMaxSize值确保在发送到服务器之前不会缓存超过给定数量的项目。
>>> conn.bulkMaxSize 400
映射配置
我们的测试设置使用预定义的映射配置。我想,这可能是大多数项目中的常见用例。我并不是特别喜欢动态映射,至少在迁移和旧数据处理方面是这样。但当然,对于某些用例,动态映射是一个很好的功能。至少,如果你必须索引被检索的数据并搜索所有(所有)字段,这会很有用。让我们测试我们的预定义映射。
直到Elasticsearch版本19.1,这会返回{},但现在它返回状态404,因此我们的代码会抛出异常。这将在Elasticsearch 19.5中修复。
>>> conn.getMapping() {}
正如你所看到的,我们还没有默认的映射。首先我们需要索引至少一个项目。让我们索引一个第一个工作项目。
>>> job = {'title': u'Wir suchen einen Marketingplaner', ... 'description': u'Wir bieten eine gute Anstellung'}>>> pprint(conn.index(job, 'testing', 'job', 1)) {u'_id': u'1', u'_index': u'testing', u'_type': u'job', u'_version': 1, u'ok': True}>>> statusRENormalizer.pprint(conn.getMapping()) {u'testing': {u'job': {u'_all': {u'store': u'yes'}, u'_id': {u'store': u'yes'}, u'_index': {u'enabled': True}, u'_type': {u'store': u'yes'}, u'properties': {u'__name__': {u'boost': 2.0, u'include_in_all': False, u'null_value': u'na', u'type': u'string'}, u'contact': {u'include_in_all': False, u'properties': {u'firstname': {u'include_in_all': False, u'type': u'string'}, u'lastname': {u'include_in_all': False, u'type': u'string'}}}, u'description': {u'include_in_all': True, u'null_value': u'na', u'type': u'string'}, u'location': {u'geohash': True, u'lat_lon': True, u'type': u'geo_point'}, u'published': {u'format': u'date_optional_time', u'type': u'date'}, u'requirements': {u'properties': {u'description': {u'type': u'string'}, u'name': {u'type': u'string'}}}, u'tags': {u'index_name': u'tag', u'type': u'string'}, u'title': {u'boost': 2.0, u'include_in_all': True, u'null_value': u'na', u'type': u'string'}}}}}
让我们定义另一个包含更多数据的项目并索引它们。
>>> import datetime >>> job = {'title': u'Wir suchen einen Buchhalter', ... 'description': u'Wir bieten Ihnen eine gute Anstellung', ... 'requirements': [ ... {'name': u'MBA', 'description': u'MBA Abschluss'} ... ], ... 'tags': [u'MBA', u'certified'], ... 'published': datetime.datetime(2011, 02, 24, 12, 0, 0), ... 'contact': { ... 'firstname': u'Jessy', ... 'lastname': u'Ineichen', ... }, ... 'location': [-71.34, 41.12]} >>> pprint(conn.index(job, 'testing', 'job', 2)) {u'_id': u'2', u'_index': u'testing', u'_type': u'job', u'_version': 1, u'ok': True}>>> import time >>> time.sleep(1)
获取
现在让我们通过其id从我们的索引中获取工作项目。但在这样做之前,刷新我们的索引。
>>> statusRENormalizer.pprint(conn.get(2, "testing", "job")) {u'_id': u'2', u'_index': u'testing', u'_source': {u'contact': {u'firstname': u'Jessy', u'lastname': u'Ineichen'}, u'description': u'Wir bieten Ihnen eine gute Anstellung', u'location': [..., ...], u'published': datetime.datetime(2011, 2, 24, 12, 0), u'requirements': [{u'description': u'MBA Abschluss', u'name': u'MBA'}], u'tags': [u'MBA', u'certified'], u'title': u'Wir suchen einen Buchhalter'}, u'_type': u'job', u'_version': 1, u'exists': True}
搜索
现在也让我们尝试进行搜索。
>>> response = conn.search("title:Buchhalter", 'testing', 'job') >>> response <SearchResponse testing/job/_search>>>> statusRENormalizer.pprint(response.data) {u'_shards': {u'failed': 0, u'successful': 5, u'total': 5}, u'hits': {u'hits': [{u'_id': u'2', u'_index': u'testing', u'_score': ..., u'_source': {u'contact': {u'firstname': u'Jessy', u'lastname': u'Ineichen'}, u'description': u'Wir bieten Ihnen eine gute Anstellung', u'location': [..., ...], u'published': datetime.datetime(2011, 2, 24, 12, 0), u'requirements': [{u'description': u'MBA Abschluss', u'name': u'MBA'}], u'tags': [u'MBA', u'certified'], u'title': u'Wir suchen einen Buchhalter'}, u'_type': u'job'}], u'max_score': ..., u'total': 1}, u'timed_out': False, u'took': ...}
正如你所看到的,我们的搜索响应包装器了解一些重要值。
>>> response.start 0>>> response.size 0>>> response.total 1>>> response.pages 1>>> pprint(response.hits) [{u'_id': u'2', u'_index': u'testing', u'_score': ..., u'_source': {u'contact': {u'firstname': u'Jessy', u'lastname': u'Ineichen'}, u'description': u'Wir bieten Ihnen eine gute Anstellung', u'location': [..., ...], u'published': datetime.datetime(2011, 2, 24, 12, 0), u'requirements': [{u'description': u'MBA Abschluss', u'name': u'MBA'}], u'tags': [u'MBA', u'certified'], u'title': u'Wir suchen einen Buchhalter'}, u'_type': u'job'}]
现在让我们搜索多个工作项目。
>>> response = conn.search("Anstellung", 'testing', 'job') >>> pprint(response.data) {u'_shards': {u'failed': 0, u'successful': 5, u'total': 5}, u'hits': {u'hits': [{u'_id': u'1', u'_index': u'testing', u'_score': ..., u'_source': {u'description': u'Wir bieten eine gute Anstellung', u'title': u'Wir suchen einen Marketingplaner'}, u'_type': u'job'}, {u'_id': u'2', u'_index': u'testing', u'_score': ..., u'_source': {u'contact': {u'firstname': u'Jessy', u'lastname': u'Ineichen'}, u'description': u'Wir bieten Ihnen eine gute Anstellung', u'location': [..., ...], u'published': datetime.datetime(2011, 2, 24, 12, 0), u'requirements': [{u'description': u'MBA Abschluss', u'name': u'MBA'}], u'tags': [u'MBA', u'certified'], u'title': u'Wir suchen einen Buchhalter'}, u'_type': u'job'}], u'max_score': ..., u'total': 2}, u'timed_out': False, u'took': ...}
现在尝试使用form和size参数限制搜索结果。
>>> params = {'from': 0, 'size': 1} >>> response = conn.search("Anstellung", 'testing', 'job', **params) >>> pprint(response.data) {u'_shards': {u'failed': 0, u'successful': 5, u'total': 5}, u'hits': {u'hits': [{u'_id': u'1', u'_index': u'testing', u'_score': ..., u'_source': {u'description': u'Wir bieten eine gute Anstellung', u'title': u'Wir suchen einen Marketingplaner'}, u'_type': u'job'}], u'max_score': ..., u'total': 2}, u'timed_out': False, u'took': ...}>>> response.start 0>>> response.size 1>>> response.total 2>>> response.pages 2>>> params = {'from': 1, 'size': 1} >>> response = conn.search("Anstellung", 'testing', 'job', **params) >>> pprint(response.data) {u'_shards': {u'failed': 0, u'successful': 5, u'total': 5}, u'hits': {u'hits': [{u'_id': u'2', u'_index': u'testing', u'_score': ..., u'_source': {u'contact': {u'firstname': u'Jessy', u'lastname': u'Ineichen'}, u'description': u'Wir bieten Ihnen eine gute Anstellung', u'location': [..., ...], u'published': datetime.datetime(2011, 2, 24, 12, 0), u'requirements': [{u'description': u'MBA Abschluss', u'name': u'MBA'}], u'tags': [u'MBA', u'certified'], u'title': u'Wir suchen einen Buchhalter'}, u'_type': u'job'}], u'max_score': ..., u'total': 2}, u'timed_out': False, u'took': ...}>>> response.start 1>>> response.size 1>>> response.total 2>>> response.pages 2
正如上述示例中所示,由于我们的size=1参数,每个查询中只有一个命中,两个搜索结果都显示总数为2,这是我们可以从服务器中获得的,而不使用size和from。
索引
此测试将在我们的测试设置方法中设置一些示例数据。之后,将为本次测试在另一个沙箱中启动一个新的Elasticsearch实例。有关示例数据和Elasticsearch服务器设置的信息,请参阅p01/elasticsearch/test.py文件。
我们将测试是否可以删除现有的索引并使用相同的映射再次创建它们。
>>> import json >>> from pprint import pprint >>> import p01.elasticsearch.testing >>> statusRENormalizer = p01.elasticsearch.testing.statusRENormalizer
现在让我们根据我们的服务器池定义一个新的Elasticsearch连接。
>>> conn = p01.elasticsearch.testing.getTestConnection()
现在我们已准备好访问Elasticsearch服务器。检查状态。
>>> statusRENormalizer.pprint(conn.status()) {u'_shards': {u'failed': 0, u'successful': 1, u'total': 1}, u'indices': {u'companies': {u'docs': {u'deleted_docs': 0, u'max_doc': ..., u'num_docs': ...}, u'flush': {u'total': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'index': {u'primary_size': u'...', u'primary_size_in_bytes': ..., u'size': u'...', u'size_in_bytes': ...}, u'merges': {u'current': 0, u'current_docs': 0, u'current_size': u'0b', u'current_size_in_bytes': 0, u'total': 0, u'total_docs': 0, u'total_size': u'0b', u'total_size_in_bytes': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'refresh': {u'total': ..., u'total_time': u'...', u'total_time_in_millis': ...}, u'shards': {u'0': [{u'docs': {u'deleted_docs': 0, u'max_doc': ..., u'num_docs': ...}, u'flush': {u'total': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'index': {u'size': u'...', u'size_in_bytes': ...}, u'merges': {u'current': 0, u'current_docs': 0, u'current_size': u'0b', u'current_size_in_bytes': 0, u'total': 0, u'total_docs': 0, u'total_size': u'0b', u'total_size_in_bytes': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'refresh': {u'total': ..., u'total_time': u'...', u'total_time_in_millis': ...}, u'routing': {u'index': u'companies', u'node': u'...', u'primary': True, u'relocating_node': None, u'shard': 0, u'state': u'STARTED'}, u'state': u'STARTED', u'translog': {u'id': ..., u'operations': 0}}]}, u'translog': {u'operations': 0}}}, u'ok': True}
正如你所看到的,我们可以测试我们创建的示例数据映射。
>>> pprint(conn.getMapping('companies', 'company')) {u'company': {u'properties': {u'__name__': {u'type': u'string'}, u'city': {u'type': u'string'}, u'number': {u'ignore_malformed': False, u'type': u'long'}, u'street': {u'type': u'string'}, u'text': {u'type': u'string'}, u'zip': {u'type': u'string'}}}}
并在我们的测试设置中的示例数据生成器中添加搜索我们的示例数据。
>>> pprint(conn.search('street').total) 100
deleteIndex
现在我们将删除索引。
>>> conn.deleteIndex('companies') {u'acknowledged': True, u'ok': True}
正如你所看到的,索引已经不再存在。
>>> statusRENormalizer.pprint(conn.status()) {u'_shards': {u'failed': 0, u'successful': 0, u'total': 0}, u'indices': {}, u'ok': True}
createIndex
现在我们可以再次创建索引。让我们获取我们的示例数据映射。
>>> import os.path >>> import json >>> import p01.elasticsearch >>> mFile = os.path.join(os.path.dirname(p01.elasticsearch.__file__), ... 'sample', 'config', 'companies', 'company.json')>>> f = open(mFile) >>> data = f.read() >>> f.close() >>> mappings = json.loads(data) >>> pprint(mappings) {u'company': {u'_all': {u'enabled': True, u'store': u'yes'}, u'_id': {u'store': u'yes'}, u'_index': {u'enabled': True}, u'_source': {u'enabled': False}, u'_type': {u'store': u'yes'}, u'properties': {u'__name__': {u'include_in_all': False, u'index': u'not_analyzed', u'store': u'yes', u'type': u'string'}, u'_id': {u'include_in_all': False, u'index': u'no', u'store': u'yes', u'type': u'string'}, u'city': {u'boost': 1.0, u'include_in_all': True, u'index': u'not_analyzed', u'null_value': u'na', u'store': u'yes', u'type': u'string'}, u'street': {u'boost': 1.0, u'include_in_all': True, u'index': u'not_analyzed', u'null_value': u'na', u'store': u'yes', u'type': u'string'}, u'text': {u'boost': 1.0, u'include_in_all': True, u'index': u'not_analyzed', u'null_value': u'na', u'store': u'yes', u'type': u'string'}, u'zip': {u'boost': 1.0, u'include_in_all': True, u'index': u'not_analyzed', u'null_value': u'na', u'store': u'yes', u'type': u'string'}}}}
现在我们可以使用给定的映射创建一个新的索引。
>>> conn.createIndex('companies', mappings=mappings) {u'acknowledged': True, u'ok': True}
正如你所看到的,索引和映射又回来了。
>>> statusRENormalizer.pprint(conn.status()) {u'_shards': {u'failed': 0, u'successful': 1, u'total': 1}, u'indices': {u'companies': {u'docs': {u'deleted_docs': 0, u'max_doc': ..., u'num_docs': ...}, u'flush': {u'total': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'index': {u'primary_size': u'...', u'primary_size_in_bytes': ..., u'size': u'...', u'size_in_bytes': ...}, u'merges': {u'current': 0, u'current_docs': 0, u'current_size': u'0b', u'current_size_in_bytes': 0, u'total': 0, u'total_docs': 0, u'total_size': u'0b', u'total_size_in_bytes': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'refresh': {u'total': ..., u'total_time': u'...', u'total_time_in_millis': ...}, u'shards': {u'0': [{u'docs': {u'deleted_docs': 0, u'max_doc': ..., u'num_docs': ...}, u'flush': {u'total': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'index': {u'size': u'...', u'size_in_bytes': ...}, u'merges': {u'current': 0, u'current_docs': 0, u'current_size': u'0b', u'current_size_in_bytes': 0, u'total': 0, u'total_docs': 0, u'total_size': u'0b', u'total_size_in_bytes': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'refresh': {u'total': ..., u'total_time': u'...', u'total_time_in_millis': ...}, u'routing': {u'index': u'companies', u'node': u'...', u'primary': True, u'relocating_node': None, u'shard': 0, u'state': u'STARTED'}, u'state': u'STARTED', u'translog': {u'id': ..., u'operations': 0}}]}, u'translog': {u'operations': 0}}}, u'ok': True}>>> pprint(conn.getMapping('companies', 'company')) {u'company': {u'_all': {u'store': u'yes'}, u'_id': {u'store': u'yes'}, u'_index': {u'enabled': True}, u'_source': {u'enabled': False}, u'_type': {u'store': u'yes'}, u'properties': {u'__name__': {u'include_in_all': False, u'index': u'not_analyzed', u'store': u'yes', u'type': u'string'}, u'city': {u'include_in_all': True, u'index': u'not_analyzed', u'null_value': u'na', u'store': u'yes', u'type': u'string'}, u'street': {u'include_in_all': True, u'index': u'not_analyzed', u'null_value': u'na', u'store': u'yes', u'type': u'string'}, u'text': {u'include_in_all': True, u'index': u'not_analyzed', u'null_value': u'na', u'store': u'yes', u'type': u'string'}, u'zip': {u'include_in_all': True, u'index': u'not_analyzed', u'null_value': u'na', u'store': u'yes', u'type': u'string'}}}}
正如你所看到的,索引是空的。
>>> pprint(conn.search('street').total) 0
映射
注意:此测试将在端口45299上启动和运行Elasticsearch服务器!
此测试对一些映射配置进行了实验。由于Elasticsearch文档对我来说并不十分清晰,我试图找出映射部分应该如何完成。
>>> from pprint import pprint >>> from p01.elasticsearch import interfaces >>> from p01.elasticsearch.pool import ServerPool >>> from p01.elasticsearch.pool import ElasticSearchConnectionPool
设置连接
>>> servers = ['localhost:45299'] >>> serverPool = ServerPool(servers) >>> connectionPool = ElasticSearchConnectionPool(serverPool) >>> conn = connectionPool.connection
让我们设置映射定义。
>>> mapping = { ... 'item': { ... 'properties': { ... 'boolean': { ... 'type': 'boolean' ... }, ... 'date': { ... 'type': 'date' ... }, ... 'datetime': { ... 'type': 'date' ... }, ... 'double': { ... 'type': 'double' ... }, ... 'float': { ... 'type': 'float' ... }, ... 'integer': { ... 'type': 'integer' ... }, ... 'long': { ... 'type': 'long' ... }, ... 'string': { ... 'type': 'string', ... 'null_value' : 'nada' ... }, ... } ... } ... }
现在让我们使用我们的putMapping方法添加映射并调用刷新。
>>> conn.putMapping(mapping, 'test-mapping', 'item') Traceback (most recent call last): ... IndexMissingException: [test-mapping] missing
正如你所看到的,由于我们的索引还不存在,抛出了一个异常。让我们添加我们的测试-mapping索引并再次尝试。
>>> conn.createIndex('test-mapping') {u'acknowledged': True, u'ok': True}>>> pprint(conn.refresh('test-mapping', 4)) {u'_shards': {u'failed': 0, u'successful': ..., u'total': 10}, u'ok': True}>>> conn.putMapping(mapping, 'test-mapping', 'item') {u'acknowledged': True, u'ok': True}>>> pprint(conn.refresh('test-mapping', 4)) {u'_shards': {u'failed': 0, u'successful': ..., u'total': 10}, u'ok': True}
并获取我们的映射
>>> pprint(conn.getMapping('test-mapping', 'item'), width=60) {u'item': {u'properties': {u'boolean': {u'type': u'boolean'}, u'date': {u'format': u'dateOptionalTime', u'type': u'date'}, u'datetime': {u'format': u'dateOptionalTime', u'type': u'date'}, u'double': {u'type': u'double'}, u'float': {u'type': u'float'}, u'integer': {u'type': u'integer'}, u'long': {u'type': u'long'}, u'string': {u'null_value': u'nada', u'type': u'string'}}}}
现在让我们索引一个新项目。
>>> import datetime >>> doc = {'boolean': True, ... 'datetime': datetime.datetime(2011, 02, 24, 12, 0, 0), ... 'date': datetime.date(2011, 02, 24), ... 'float': float(42), ... 'integer': int(42), ... 'long': long(42*10000000000000000), ... 'string': 'string'} >>> conn.index(doc, 'test-mapping', 'item', 1) {u'_type': u'item', u'_id': u'1', u'ok': True, u'_version': 1, u'_index': u'test-mapping'}
刷新索引
>>> pprint(conn.refresh('test-mapping', 4)) {u'_shards': {u'failed': 0, u'successful': 5, u'total': 10}, u'ok': True}
并搜索我们的索引项
>>> response = conn.search('string', 'test-mapping', 'item') >>> data = response.data >>> pprint(data) {u'_shards': {u'failed': 0, u'successful': 5, u'total': 5}, u'hits': {u'hits': [{u'_id': u'1', u'_index': u'test-mapping', u'_score': ..., u'_source': {u'boolean': True, u'date': datetime.datetime(2011, 2, 24, 0, 0), u'datetime': datetime.datetime(2011, 2, 24, 12, 0), u'float': 42.0, u'integer': 42, u'long': 420000000000000000L, u'string': u'string'}, u'_type': u'item'}], u'max_score': ..., u'total': 1}, u'timed_out': False, u'took': ...}
现在检查我们的值
>>> source = data['hits']['hits'][0]['_source'] >>> pprint(source) {u'boolean': True, u'date': datetime.datetime(2011, 2, 24, 0, 0), u'datetime': datetime.datetime(2011, 2, 24, 12, 0), u'float': 42.0, u'integer': 42, u'long': 420000000000000000L, u'string': u'string'}>>> isinstance(source['boolean'], bool) True>>> isinstance(source['datetime'], datetime.datetime) True>>> isinstance(source['date'], datetime.date) True>>> isinstance(source['float'], float) True>>> isinstance(source['integer'], int) True>>> isinstance(source['long'], long) True>>> isinstance(source['string'], basestring) True>>> isinstance(source['string'], unicode) True
注意,datetime和date也是datetime和date项。
>>> isinstance(source['date'], datetime.datetime) True>>> isinstance(source['datetime'], datetime.date) True
扫描搜索类型
注意:此测试将在端口45299上启动和运行Elasticsearch服务器!
让我们做一些不使用连接池的简单测试。
>>> from pprint import pprint >>> from p01.elasticsearch.connection import ElasticSearchConnection >>> from p01.elasticsearch.exceptions import ElasticSearchServerException >>> from p01.elasticsearch.pool import ServerPool>>> servers = ['localhost:45299'] >>> serverPool = ServerPool(servers)
现在我们能够获取一个由线程局部观察的持久连接。
>>> conn = ElasticSearchConnection(serverPool)
设置一个测试映射并添加一些文档
>>> conn.createIndex('scanning') {u'acknowledged': True, u'ok': True}>>> for i in range(1000): ... _id = unicode(i) ... doc = {'_id': _id, 'dummy': u'dummy'} ... ignored = conn.index(doc, 'scanning', 'doc')>>> conn.refresh('scanning') {u'ok': True, u'_shards': {u'successful': 5, u'failed': 0, u'total': 10}}
让我们展示如何使用扫描方法批量处理大量搜索结果。
>>> pprint(conn.search('dummy', 'scanning').total) 1000>>> result = list(conn.scan('dummy', 'scanning')) >>> len(result) 1000>>> pprint(sorted(result)[:5]) [{u'_id': u'0', u'_index': u'scanning', u'_score': 0.0, u'_source': {u'_id': u'0', u'dummy': u'dummy'}, u'_type': u'doc'}, {u'_id': u'1', u'_index': u'scanning', u'_score': 0.0, u'_source': {u'_id': u'1', u'dummy': u'dummy'}, u'_type': u'doc'}, {u'_id': u'10', u'_index': u'scanning', u'_score': 0.0, u'_source': {u'_id': u'10', u'dummy': u'dummy'}, u'_type': u'doc'}, {u'_id': u'100', u'_index': u'scanning', u'_score': 0.0, u'_source': {u'_id': u'100', u'dummy': u'dummy'}, u'_type': u'doc'}, {u'_id': u'101', u'_index': u'scanning', u'_score': 0.0, u'_source': {u'_id': u'101', u'dummy': u'dummy'}, u'_type': u'doc'}]
批量操作
注意:此测试将在端口45299上启动和运行Elasticsearch服务器!
这个测试展示了如何使用批量概念索引项目。
>>> from pprint import pprint >>> from p01.elasticsearch import interfaces >>> from p01.elasticsearch.pool import ServerPool >>> from p01.elasticsearch.pool import ElasticSearchConnectionPool>>> servers = ['localhost:45299'] >>> serverPool = ServerPool(servers)
现在我们可以从池中获取一个持久连接,该连接由线程局部观察
>>> connectionPool = ElasticSearchConnectionPool(serverPool) >>> conn = connectionPool.connection >>> conn <ElasticSearchConnection localhost:45299>
将bulkMaxSize设置为5。这意味着如果我们索引5个项目,索引方法将隐式地向服务器发送索引请求
>>> conn.bulkMaxSize = 5>>> conn.bulkMaxSize 5
让我们批量索引一些项目
>>> doc = {'title': u'Wir suchen einen Marketingplaner', ... 'description': u'Wir bieten Ihnen eine gute Anstellung'} >>> conn.bulkIndex(doc, 'testing', 'job', 1)>>> doc = {'title': u'Wir suchen einen Buchhalter', ... 'description': u'Wir bieten Ihnen eine gute Anstellung'} >>> conn.bulkIndex(doc, 'testing', 'job', 2)
现在提交我们的批量数据。即使我们没有索引完整的bulkMaxSize
>>> pprint(conn.bulkCommit()) {u'items': [{u'index': {u'_id': u'1', u'_index': u'testing', u'_type': u'job', u'_version': 1, u'ok': True}}, {u'index': {u'_id': u'2', u'_index': u'testing', u'_type': u'job', u'_version': 1, u'ok': True}}], u'took': ...}>>> conn.bulkCounter 0
现在我们搜索这些项目
>>> response = conn.search("Anstellung", 'testing', 'job') >>> pprint(response.data) {u'_shards': {u'failed': 0, u'successful': 5, u'total': 5}, u'hits': {u'hits': [], u'max_score': None, u'total': 0}, u'timed_out': False, u'took': ...}
如您所见,我们没有提交数据,因为我们没有使用refresh参数。现在让我们调用refresh
>>> conn.refresh('testing') {u'ok': True, u'_shards': {u'successful': 5, u'failed': 0, u'total': 10}}
- 然后再次搜索
>>> response = conn.search("Anstellung", 'testing', 'job') >>> pprint(response.data) {u'_shards': {u'failed': 0, u'successful': 5, u'total': 5}, u'hits': {u'hits': [{u'_id': u'1', u'_index': u'testing', u'_score': ..., u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung', u'title': u'Wir suchen einen Marketingplaner'}, u'_type': u'job'}, {u'_id': u'2', u'_index': u'testing', u'_score': ..., u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung', u'title': u'Wir suchen einen Buchhalter'}, u'_type': u'job'}], u'max_score': ..., u'total': 2}, u'timed_out': False, u'took': ...}
让我们继续索引项目,直到达到bulkMaxSize
>>> len(conn.bulkItems) 0>>> doc = {'title': u'Wir suchen einen Koch', ... 'description': u'Wir bieten Ihnen eine gute Anstellung'} >>> conn.bulkIndex(doc, 'testing', 'job', 3)>>> conn.bulkCounter 1>>> doc = {'title': u'Wir suchen einen Sachbearbeiter', ... 'description': u'Wir bieten Ihnen eine gute Anstellung'} >>> conn.bulkIndex(doc, 'testing', 'job', 4)>>> conn.bulkCounter 2>>> doc = {'title': u'Wir suchen einen Mechaniker', ... 'description': u'Wir bieten Ihnen eine gute Anstellung'} >>> conn.bulkIndex(doc, 'testing', 'job', 5)>>> conn.bulkCounter 3>>> doc = {'title': u'Wir suchen einen Exportfachmann', ... 'description': u'Wir bieten Ihnen eine gute Anstellung'} >>> conn.bulkIndex(doc, 'testing', 'job', 6)>>> conn.bulkCounter 4
现在,我们的bulkMaxSize强制提交数据
>>> doc = {'title': u'Wir suchen einen Entwickler', ... 'description': u'Wir bieten Ihnen eine gute Anstellung'} >>> pprint(conn.bulkIndex(doc, 'testing', 'job', 7)) {u'items': [{u'index': {u'_id': u'3', u'_index': u'testing', u'_type': u'job', u'_version': 1, u'ok': True}}, {u'index': {u'_id': u'4', u'_index': u'testing', u'_type': u'job', u'_version': 1, u'ok': True}}, {u'index': {u'_id': u'5', u'_index': u'testing', u'_type': u'job', u'_version': 1, u'ok': True}}, {u'index': {u'_id': u'6', u'_index': u'testing', u'_type': u'job', u'_version': 1, u'ok': True}}, {u'index': {u'_id': u'7', u'_index': u'testing', u'_type': u'job', u'_version': 1, u'ok': True}}], u'took': ...}
只需等待服务器默认每秒自动调用refresh
>>> import time >>> time.sleep(1)>>> len(conn.bulkItems) 0
如您所见,我们已经全部索引了7个项目
>>> response = conn.search("Anstellung", 'testing', 'job') >>> pprint(response.data) {u'_shards': {u'failed': 0, u'successful': 5, u'total': 5}, u'hits': {u'hits': [{u'_id': u'1', u'_index': u'testing', u'_score': ..., u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung', u'title': u'Wir suchen einen Marketingplaner'}, u'_type': u'job'}, {u'_id': u'6', u'_index': u'testing', u'_score': ..., u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung', u'title': u'Wir suchen einen Exportfachmann'}, u'_type': u'job'}, {u'_id': u'2', u'_index': u'testing', u'_score': ..., u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung', u'title': u'Wir suchen einen Buchhalter'}, u'_type': u'job'}, {u'_id': u'7', u'_index': u'testing', u'_score': ..., u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung', u'title': u'Wir suchen einen Entwickler'}, u'_type': u'job'}, {u'_id': u'4', u'_index': u'testing', u'_score': ..., u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung', u'title': u'Wir suchen einen Sachbearbeiter'}, u'_type': u'job'}, {u'_id': u'5', u'_index': u'testing', u'_score': ..., u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung', u'title': u'Wir suchen einen Mechaniker'}, u'_type': u'job'}, {u'_id': u'3', u'_index': u'testing', u'_score': ..., u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung', u'title': u'Wir suchen einen Koch'}, u'_type': u'job'}], u'max_score': ..., u'total': 7}, u'timed_out': False, u'took': ...}
简单索引和搜索
注意:此测试将在端口45299上启动和运行Elasticsearch服务器!
这个测试只是使用了非预定义的映射。让我们做一些不使用连接池的简单测试。
>>> from pprint import pprint >>> from p01.elasticsearch.connection import ElasticSearchConnection >>> from p01.elasticsearch.exceptions import ElasticSearchServerException >>> from p01.elasticsearch.pool import ServerPool>>> import p01.elasticsearch.testing >>> statusRENormalizer = p01.elasticsearch.testing.statusRENormalizer>>> servers = ['localhost:45299'] >>> serverPool = ServerPool(servers)
现在我们能够获取一个由线程局部观察的持久连接。
>>> conn = ElasticSearchConnection(serverPool)
添加一些文档
>>> pprint(conn.index({"name":"Document One"}, "testdocs", "doc", 1)) {u'_id': u'1', u'_index': u'testdocs', u'_type': u'doc', u'_version': 1, u'ok': True}>>> pprint(conn.index({"name":"Document Two"}, "testdocs", "doc", 2)) {u'_id': u'2', u'_index': u'testdocs', u'_type': u'doc', u'_version': 1, u'ok': True}
注意,我们在这里调用refresh,这将确保文档在服务器端被索引。通常,在生产环境中不应显式执行此操作。Elasticsearch服务器默认配置为每秒在服务器端进行刷新
>>> pprint(conn.refresh("testdocs")) {u'_shards': {u'failed': 0, u'successful': 5, u'total': 10}, u'ok': True}
获取一个
>>> pprint(conn.get(1, "testdocs", "doc")) {u'_id': u'1', u'_index': u'testdocs', u'_source': {u'name': u'Document One'}, u'_type': u'doc', u'_version': 1, u'exists': True}
统计文档数量
>>> pprint(conn.count("name:Document One")) {u'_shards': {u'failed': 0, u'successful': 5, u'total': 5}, u'count': 2}>>> pprint(conn.count("name:Document")) {u'_shards': {u'failed': 0, u'successful': 5, u'total': 5}, u'count': 2}
搜索文档
>>> response = conn.search("name:Document One") >>> pprint(response.data) {u'_shards': {u'failed': 0, u'successful': 5, u'total': 5}, u'hits': {u'hits': [{u'_id': u'1', u'_index': u'testdocs', u'_score': 0.2712221, u'_source': {u'name': u'Document One'}, u'_type': u'doc'}, {u'_id': u'2', u'_index': u'testdocs', u'_score': 0.028130025, u'_source': {u'name': u'Document Two'}, u'_type': u'doc'}], u'max_score': 0.2712221, u'total': 2}, u'timed_out': False, u'took': ...}
类似文档
>>> pprint(conn.index({"name":"Document Three"}, "testdocs", "doc", 3)) {u'_id': u'3', u'_index': u'testdocs', u'_type': u'doc', u'_version': 1, u'ok': True}>>> pprint(conn.refresh("testdocs")) {u'_shards': {u'failed': 0, u'successful': 5, u'total': 10}, u'ok': True}>>> pprint(conn.moreLikeThis(1, "testdocs", "doc", ... fields='name', min_term_freq=1, min_doc_freq=1)) {u'_shards': {u'failed': 0, u'successful': 5, u'total': 5}, u'hits': {u'hits': [{u'_id': u'2', u'_index': u'testdocs', u'_score': 0.19178301, u'_source': {u'name': u'Document Two'}, u'_type': u'doc'}, {u'_id': u'3', u'_index': u'testdocs', u'_score': 0.19178301, u'_source': {u'name': u'Document Three'}, u'_type': u'doc'}], u'max_score': 0.19178301, u'total': 2}, u'timed_out': False, u'took': ...}
删除文档二
>>> pprint(conn.delete('2', "testdocs", "doc")) {u'_id': u'2', u'_index': u'testdocs', u'_type': u'doc', u'_version': 2, u'found': True, u'ok': True}
删除文档三
>>> pprint(conn.delete('3', "testdocs", "doc")) {u'_id': u'3', u'_index': u'testdocs', u'_type': u'doc', u'_version': 2, u'found': True, u'ok': True}
删除索引
>>> pprint(conn.deleteIndex("testdocs")) {u'acknowledged': True, u'ok': True}
创建新索引
>>> pprint(conn.createIndex("testdocs")) {u'acknowledged': True, u'ok': True}
尝试再次创建索引,这将失败
>>> conn.createIndex("testdocs") Traceback (most recent call last): ... IndexAlreadyExistsException: Already exists
如您所见,错误提供了错误信息
>>> try: ... conn.createIndex("testdocs") ... except ElasticSearchServerException, e: ... e.args[0] 'Already exists'
添加新映射
>>> mapping = {"doc" : {"properties" : ... {"name" : {"type" : "string", "store" : "yes"}}}} >>> pprint(conn.putMapping(mapping, 'testdocs', 'doc')) {u'acknowledged': True, u'ok': True}
获取状态
>>> statusRENormalizer.pprint(conn.status("testdocs")) {u'_shards': {u'failed': 0, u'successful': 5, u'total': 10}, u'indices': {u'testdocs': {u'docs': {u'deleted_docs': 0, u'max_doc': ..., u'num_docs': ...}, u'flush': {u'total': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'index': {u'primary_size': u'...', u'primary_size_in_bytes': ..., u'size': u'...', u'size_in_bytes': ...}, u'merges': {u'current': 0, u'current_docs': 0, u'current_size': u'0b', u'current_size_in_bytes': 0, u'total': 0, u'total_docs': 0, u'total_size': u'0b', u'total_size_in_bytes': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'refresh': {u'total': ..., u'total_time': u'...', u'total_time_in_millis': ...}, u'shards': {u'0': [{u'docs': {u'deleted_docs': 0, u'max_doc': ..., u'num_docs': ...}, u'flush': {u'total': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'index': {u'size': u'...', u'size_in_bytes': ...}, u'merges': {u'current': 0, u'current_docs': 0, u'current_size': u'0b', u'current_size_in_bytes': 0, u'total': 0, u'total_docs': 0, u'total_size': u'0b', u'total_size_in_bytes': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'refresh': {u'total': ..., u'total_time': u'...', u'total_time_in_millis': ...}, u'routing': {u'index': u'testdocs', u'node': u'...', u'primary': True, u'relocating_node': None, u'shard': 0, u'state': u'STARTED'}, u'state': u'STARTED', u'translog': {u'id': ..., u'operations': 0}}], u'1': [{u'docs': {u'deleted_docs': 0, u'max_doc': ..., u'num_docs': ...}, u'flush': {u'total': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'index': {u'size': u'...', u'size_in_bytes': ...}, u'merges': {u'current': 0, u'current_docs': 0, u'current_size': u'0b', u'current_size_in_bytes': 0, u'total': 0, u'total_docs': 0, u'total_size': u'0b', u'total_size_in_bytes': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'refresh': {u'total': ..., u'total_time': u'...', u'total_time_in_millis': ...}, u'routing': {u'index': u'testdocs', u'node': u'...', u'primary': True, u'relocating_node': None, u'shard': 1, u'state': u'STARTED'}, u'state': u'STARTED', u'translog': {u'id': ..., u'operations': 0}}], u'2': [{u'docs': {u'deleted_docs': 0, u'max_doc': ..., u'num_docs': ...}, u'flush': {u'total': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'index': {u'size': u'...', u'size_in_bytes': ...}, u'merges': {u'current': 0, u'current_docs': 0, u'current_size': u'0b', u'current_size_in_bytes': 0, u'total': 0, u'total_docs': 0, u'total_size': u'0b', u'total_size_in_bytes': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'refresh': {u'total': ..., u'total_time': u'...', u'total_time_in_millis': ...}, u'routing': {u'index': u'testdocs', u'node': u'...', u'primary': True, u'relocating_node': None, u'shard': 2, u'state': u'STARTED'}, u'state': u'STARTED', u'translog': {u'id': ..., u'operations': 0}}], u'3': [{u'docs': {u'deleted_docs': 0, u'max_doc': ..., u'num_docs': ...}, u'flush': {u'total': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'index': {u'size': u'...', u'size_in_bytes': ...}, u'merges': {u'current': 0, u'current_docs': 0, u'current_size': u'0b', u'current_size_in_bytes': 0, u'total': 0, u'total_docs': 0, u'total_size': u'0b', u'total_size_in_bytes': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'refresh': {u'total': ..., u'total_time': u'...', u'total_time_in_millis': ...}, u'routing': {u'index': u'testdocs', u'node': u'...', u'primary': True, u'relocating_node': None, u'shard': 3, u'state': u'STARTED'}, u'state': u'STARTED', u'translog': {u'id': ..., u'operations': 0}}], u'4': [{u'docs': {u'deleted_docs': 0, u'max_doc': ..., u'num_docs': ...}, u'flush': {u'total': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'index': {u'size': u'...', u'size_in_bytes': ...}, u'merges': {u'current': 0, u'current_docs': 0, u'current_size': u'0b', u'current_size_in_bytes': 0, u'total': 0, u'total_docs': 0, u'total_size': u'0b', u'total_size_in_bytes': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'refresh': {u'total': ..., u'total_time': u'...', u'total_time_in_millis': ...}, u'routing': {u'index': u'testdocs', u'node': u'...', u'primary': True, u'relocating_node': None, u'shard': 4, u'state': u'STARTED'}, u'state': u'STARTED', u'translog': {u'id': ..., u'operations': 0}}]}, u'translog': {u'operations': 0}}}, u'ok': True}
测试使用自动id生成添加。
>>> pprint(conn.index({"name":"Document Four"}, "testdocs", "doc")) Traceback (most recent call last): ... ValueError: You must explicit define id=None without doc['_id']
如您所见,这需要我们显式设置id=None
>>> pprint(conn.index({"name":"Document Four"}, "testdocs", "doc", id=None)) {u'_id': u'...', u'_index': u'testdocs', u'_type': u'doc', u'_version': 1, u'ok': True}
设置显式id=None的原因是我们还支持doc[‘_id’]作为id
>>> pprint(conn.index({"name":"Document Five", "_id":"5"}, "testdocs", "doc")) {u'_id': u'...', u'_index': u'testdocs', u'_type': u'doc', u'_version': 1, u'ok': True}
变更记录
0.6.0 (2014-03-24)
功能:实现了使用PUT请求在_template端点实现的putTemplate方法
0.5.2 (2013-06-28)
错误修复:改进错误处理。如果没有错误信息,则使用JSON响应字符串。
0.5.1 (2012-12-22)
实现了put settings (putSettings)方法
基于更改的Elasticsearch 0.20.1输出修复测试
切换到p01.recipe.setup:importchecker
0.5.0 (2012-11-18)
初始发布
项目详情
p01.elasticsearch-0.6.0.zip的散列
算法 | 散列摘要 | |
---|---|---|
SHA256 | 70198462a8b1fe2ec212bfeb214fe1bff8b166c35ae35095dd0ed148415b9915 |
|
MD5 | 33140050e4bd8a4c3a18fd554dd4c442 |
|
BLAKE2b-256 | 6b21541459ab93f9efe72b2b7471f23c3f07e8d9ab315884e90940d148fbf27c |