跳转到主要内容

Elasticsearch Watcher的Python客户端

项目描述

这是官方Elasticsearch Python客户端的一个插件,它为Watcher插件添加了功能。

安装

您可以使用pip安装此插件

pip install elasticsearch-watcher

使用方法

您可以使用此客户端独立使用

from elasticsearch import Elasticsearch
from elasticsearch_watcher import WatcherClient

client = Elasticsearch()
watcher = WatcherClient(client)

watcher.get_watch(id=42)

或者,您可以将watcher命名空间添加到官方客户端,以模拟其他命名空间的行为

WatcherClient.infect_client(client)

client.watcher.get_watch(id=42)

复杂示例

from time import sleep
from datetime import datetime
from random import randint

from elasticsearch import Elasticsearch
from elasticsearch_watcher import WatcherClient

# initialize the standard client as usual
es = Elasticsearch()
# add the .watcher namespace to it
WatcherClient.infect_client(es)

# clear the index fiorst
es.indices.delete(
    index=['alerts', 'test', '.watches', '.watch_history*'], ignore=404)

# get the watcher plugin version
print('Using watcher', es.watcher.info()['version']['number'])

# Register a new watch
es.watcher.put_watch(
    id='error_500',
    body={
        # label the watch
        'metadata': {'tags': ['errors']},

        # Run the watch every 10 seconds
        'trigger': { 'schedule': { 'interval': '10s' } },

        # Search for at least 3 documents matching the condition
        'condition': {  'script': { 'inline': 'ctx.payload.hits.total > 3' } },

        # Throttle the watch execution for 30 seconds
        'throttle_period': '30s',

        # The search request to execute
        'input':   {
            'search': {
                'request': {
                    'indices': ['test'],
                    'body': {
                        'query': {
                            'filtered': {
                                'query': { 'match': { 'status': 500 } },
                                'filter': { 'range': { 'timestamp': { 'from': '{{ctx.trigger.scheduled_time}}||-5m', 'to': '{{ctx.trigger.triggered_time}}' } } }
                            }
                        },
                        # Return statistics about different hosts
                        'aggregations': {
                            'hosts': { 'terms': { 'field': 'host' } }
                        }
        }}}},

        # The actions to perform
        'actions': {
            'send_email':    {
                'transform': {
                    # Transform the data for the template
                    'script': '''return [
                            total: ctx.payload.hits.total,
                            hosts: ctx.payload.aggregations.hosts.buckets.collect { [ host: it.key, errors: it.doc_count ] },
                            errors: ctx.payload.hits.hits.collect { it._source }
                        ];'''
                },
                'email': {
                    'to': 'you@example.com',
                    'subject': '[ALERT] {{ctx.watch_id}}',
                    'attach_data': True,
                    'body':  '''
                        Received {{ctx.payload.total}} error documents in the last 5 minutes.

                        Hosts:

                        {{#ctx.payload.hosts}}* {{host}} ({{errors}})
                        {{/ctx.payload.hosts}}'''.replace('\n'+' '*24, '\n').strip(),
                }
            },
            'index_payload': {
                # Transform the data to be stored
                'transform': { 'script': 'return [ watch_id: ctx.watch_id, payload: ctx.payload ]' },
                'index': { 'index': 'alerts', 'doc_type': 'alert' }
            },
            'ping_webhook': {
                'webhook': {
                    'method': 'POST',
                    'host': 'localhost',
                    'port': 8000,
                    'path': '/',
                    'body': '{"watch_id" : "{{ctx.watch_id}}", "payload" : "{{ctx.payload}}"}'
                }
            }
        }
    }
)

# index documents to trigger the watch
for _ in range(5):
    es.index(
        index='test',
        doc_type='d',
        body={
            'timestamp': datetime.utcnow(),
            'status': 500,
            'host': '10.0.0.%d' % randint(1, 3)
        }
    )

# wait a bit...
for _ in range(30):
    sleep(1)
    print('.', sep='', end='', flush=True)
print()

# display information about watch execution
print('=' * 80)
s = es.search(
    index='.watch_history*',
    q='watch_id:error_500',
    sort='trigger_event.schedule.triggered_time:asc'
)
for hit in s['hits']['hits']:
    print('%s: %s' % (hit['_id'], hit['_source']['state']))

# delete the watch
es.watcher.delete_watch(id='error_500', force=True)

许可证

版权所有 2015 Elasticsearch

根据Apache许可证版本2.0(“许可证”)许可;除非遵守许可证规定,否则不得使用此文件。您可以在以下位置获得许可证副本:

https://apache.ac.cn/licenses/LICENSE-2.0

除非适用法律要求或书面同意,否则根据许可证分发的软件按“原样”提供,不提供任何明示或暗示的保证。有关许可证规定的权限和限制的具体语言,请参阅许可证。

项目详情


下载文件

下载您平台对应的文件。如果您不确定该选择哪一个,请了解更多关于 安装包 的信息。

源代码分发

elasticsearch-watcher-0.4.0.tar.gz (4.8 kB 查看哈希值)

上传时间 源代码

构建分发

elasticsearch_watcher-0.4.0-py2.py3-none-any.whl (7.7 kB 查看哈希值)

上传时间 Python 2 Python 3

由以下支持