适用于DataFrame、大数据、机器学习以及Elasticsearch中ETL的Python客户端和工具包
项目描述
关于
Eland是一个Python Elasticsearch客户端,它使用与Pandas兼容的API来探索和分析Elasticsearch中的数据。
尽可能使用现有的Python API和数据结构,使您能够轻松地在numpy、pandas或scikit-learn及其Elasticsearch等效项之间切换。一般来说,数据驻留在Elasticsearch中,而不是内存中,这使得Eland能够访问存储在Elasticsearch中的大型数据集。
Eland还提供了工具,可以从scikit-learn、XGBoost和LightGBM等常用库中上传训练好的机器学习模型到Elasticsearch。
入门指南
可以使用Pip从PyPI安装Eland
$ python -m pip install eland
如果使用Eland上传NLP模型到Elasticsearch,请安装PyTorch扩展
$ python -m pip install 'eland[pytorch]'
也可以使用Conda从Conda Forge安装Eland
$ conda install -c conda-forge eland
兼容性
- 支持Python 3.8、3.9、3.10、3.11和Pandas 1.5
- 支持7.11+版本的Elasticsearch集群,建议使用8.13或更高版本以实现所有功能。如果您正在使用与PyTorch的NLP功能,请确保Eland的次要版本与Elasticsearch集群的次要版本匹配。对于所有其他功能,主要版本匹配就足够了。
- 您需要安装适当的PyTorch版本来导入NLP模型。运行
python -m pip install 'eland[pytorch]'
来安装该版本。
先决条件
在基于Debian的发行版上安装Eland的用户可能需要安装Eland的传递依赖项的先决包
$ sudo apt-get install -y \
build-essential pkg-config cmake \
python3-dev libzip-dev libjpeg-dev
请注意,CentOS、RedHat、Arch等其他发行版可能需要使用不同的包管理器和指定不同的包名。
Docker
如果您只想运行可用的脚本而不安装Eland,请使用Docker镜像。它可以交互式使用
$ docker run -it --rm --network host docker.elastic.co/eland/eland
也可以在没有交互式shell的情况下运行已安装的脚本,例如
$ docker run -it --rm --network host \
docker.elastic.co/eland/eland \
eland_import_hub_model \
--url http://host.docker.internal:9200/ \
--hub-model-id elastic/distilbert-base-cased-finetuned-conll03-english \
--task-type ner
连接到Elasticsearch
Eland使用Elasticsearch低级客户端连接到Elasticsearch。此客户端支持一系列连接选项和认证选项。
您可以将elasticsearch.Elasticsearch
的实例传递给Eland API,或者传递一个包含要连接的主机的字符串
import eland as ed
# Connecting to an Elasticsearch instance running on 'https://127.0.0.1:9200'
df = ed.DataFrame("https://127.0.0.1:9200", es_index_pattern="flights")
# Connecting to an Elastic Cloud instance
from elasticsearch import Elasticsearch
es = Elasticsearch(
cloud_id="cluster-name:...",
basic_auth=("elastic", "<password>")
)
df = ed.DataFrame(es, es_index_pattern="flights")
Eland中的DataFrame
eland.DataFrame
将Elasticsearch索引包装在类似Pandas的API中,并将所有数据处理和过滤操作延迟到Elasticsearch,而不是您的本地机器。这意味着您可以在不超载机器的情况下,从Jupyter Notebook中处理Elasticsearch中的大量数据。
>>> import eland as ed
>>> # Connect to 'flights' index via localhost Elasticsearch node
>>> df = ed.DataFrame('https://127.0.0.1:9200', 'flights')
# eland.DataFrame instance has the same API as pandas.DataFrame
# except all data is in Elasticsearch. See .info() memory usage.
>>> df.head()
AvgTicketPrice Cancelled ... dayOfWeek timestamp
0 841.265642 False ... 0 2018-01-01 00:00:00
1 882.982662 False ... 0 2018-01-01 18:27:00
2 190.636904 False ... 0 2018-01-01 17:11:14
3 181.694216 True ... 0 2018-01-01 10:33:28
4 730.041778 False ... 0 2018-01-01 05:13:00
[5 rows x 27 columns]
>>> df.info()
<class 'eland.dataframe.DataFrame'>
Index: 13059 entries, 0 to 13058
Data columns (total 27 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 AvgTicketPrice 13059 non-null float64
1 Cancelled 13059 non-null bool
2 Carrier 13059 non-null object
...
24 OriginWeather 13059 non-null object
25 dayOfWeek 13059 non-null int64
26 timestamp 13059 non-null datetime64[ns]
dtypes: bool(2), datetime64[ns](1), float64(5), int64(2), object(17)
memory usage: 80.0 bytes
Elasticsearch storage usage: 5.043 MB
# Filtering of rows using comparisons
>>> df[(df.Carrier=="Kibana Airlines") & (df.AvgTicketPrice > 900.0) & (df.Cancelled == True)].head()
AvgTicketPrice Cancelled ... dayOfWeek timestamp
8 960.869736 True ... 0 2018-01-01 12:09:35
26 975.812632 True ... 0 2018-01-01 15:38:32
311 946.358410 True ... 0 2018-01-01 11:51:12
651 975.383864 True ... 2 2018-01-03 21:13:17
950 907.836523 True ... 2 2018-01-03 05:14:51
[5 rows x 27 columns]
# Running aggregations across an index
>>> df[['DistanceKilometers', 'AvgTicketPrice']].aggregate(['sum', 'min', 'std'])
DistanceKilometers AvgTicketPrice
sum 9.261629e+07 8.204365e+06
min 0.000000e+00 1.000205e+02
std 4.578263e+03 2.663867e+02
Eland中的机器学习
回归和分类
Eland允许将scikit-learn、XGBoost和LightGBM库中训练好的回归和分类模型转换为序列化格式,并在Elasticsearch中用作推理模型。
>>> from sklearn import datasets
>>> from xgboost import XGBClassifier
>>> from eland.ml import MLModel
# Train and exercise an XGBoost ML model locally
>>> training_data = datasets.make_classification(n_features=5)
>>> xgb_model = XGBClassifier(booster="gbtree")
>>> xgb_model.fit(training_data[0], training_data[1])
>>> xgb_model.predict(training_data[0])
[0 1 1 0 1 0 0 0 1 0]
# Import the model into Elasticsearch
>>> es_model = MLModel.import_model(
es_client="https://127.0.0.1:9200",
model_id="xgb-classifier",
model=xgb_model,
feature_names=["f0", "f1", "f2", "f3", "f4"],
)
# Exercise the ML model in Elasticsearch with the training data
>>> es_model.predict(training_data[0])
[0 1 1 0 1 0 0 0 1 0]
PyTorch中的NLP
对于NLP任务,Eland允许将PyTorch训练的BERT模型导入到Elasticsearch。模型可以是普通的PyTorch模型,也可以是从Hugging Face模型库支持的transformers模型。
$ eland_import_hub_model \
--url https://127.0.0.1:9200/ \
--hub-model-id elastic/distilbert-base-cased-finetuned-conll03-english \
--task-type ner \
--start
上述示例将自动启动模型部署。这对于初始实验来说是一个很好的快捷方式,但对于需要良好吞吐量的任何事物,您应从Eland命令行中省略--start
参数,并改用Kibana中的ML UI启动模型。--start
参数将以一个分配和一个线程每分配的配置部署模型,这不会提供良好的性能。当您使用Kibana中的ML UI或Elasticsearch API启动模型部署时,您将能够设置线程选项以充分利用您的硬件。
>>> import elasticsearch
>>> from pathlib import Path
>>> from eland.common import es_version
>>> from eland.ml.pytorch import PyTorchModel
>>> from eland.ml.pytorch.transformers import TransformerModel
>>> es = elasticsearch.Elasticsearch("http://elastic:mlqa_admin@localhost:9200")
>>> es_cluster_version = es_version(es)
# Load a Hugging Face transformers model directly from the model hub
>>> tm = TransformerModel(model_id="elastic/distilbert-base-cased-finetuned-conll03-english", task_type="ner", es_version=es_cluster_version)
Downloading: 100%|██████████| 257/257 [00:00<00:00, 108kB/s]
Downloading: 100%|██████████| 954/954 [00:00<00:00, 372kB/s]
Downloading: 100%|██████████| 208k/208k [00:00<00:00, 668kB/s]
Downloading: 100%|██████████| 112/112 [00:00<00:00, 43.9kB/s]
Downloading: 100%|██████████| 249M/249M [00:23<00:00, 11.2MB/s]
# Export the model in a TorchScrpt representation which Elasticsearch uses
>>> tmp_path = "models"
>>> Path(tmp_path).mkdir(parents=True, exist_ok=True)
>>> model_path, config, vocab_path = tm.save(tmp_path)
# Import model into Elasticsearch
>>> ptm = PyTorchModel(es, tm.elasticsearch_model_id())
>>> ptm.import_model(model_path=model_path, config_path=None, vocab_path=vocab_path, config=config)
100%|██████████| 63/63 [00:12<00:00, 5.02it/s]
反馈 🗣️
Elastic的工程团队正在寻找开发者参与研究和反馈会议,以了解更多关于您如何使用Eland以及我们可以在设计和工作流程中做哪些改进。如果您对分享您对开发者体验和语言客户端设计的见解感兴趣,请填写此简短表格。根据我们收到的回复数量,我们可能会联系您进行一对一对话,或者与使用相同客户端的其他开发者进行焦点小组讨论。提前感谢您——您的反馈对提高所有Elasticsearch开发者的用户体验至关重要!
项目详情
下载文件
下载适用于您平台的应用程序。如果您不确定该选择哪一个,请了解有关安装包的更多信息。