轻松托管您的深度学习模型。
项目描述
Ventu
轻松提供深度学习模型。
安装
pip install ventu
功能
- 只需实现Model(
preprocess
,postprocess
,inference
或batch_inference
) - 使用 pydantic 进行请求和响应数据验证
- API文档使用 SpecTree (在运行
run_http
时) - 后端服务使用 falcon 支持JSON和 msgpack
- 使用 batching 进行动态批处理,使用Unix域套接字或TCP
- 一个请求中的错误不会影响同一批中的其他请求
- 负载均衡
- 支持所有运行时
- 健康检查
- 监控指标(Prometheus)
- 如果您有多个工作进程,请记住将
prometheus_multiproc_dir
环境变量设置为一个目录
- 如果您有多个工作进程,请记住将
- 推理预热
如何使用
- 使用
pydantic
定义您的请求数据模式和响应数据模式- 将示例添加到
schema.Config.schema_extra[examples]
以进行预热和健康检查(可选)
- 将示例添加到
- 继承
ventu.Ventu
,实现preprocess
和postprocess
方法 - 对于独立的HTTP服务,实现
inference
方法,使用run_http
运行 - 对于动态批处理服务后面的工作进程,实现
batch_inference
方法,使用run_socket
运行
查看文档以获取API详情
示例
示例代码可以在examples中找到。
服务
安装需求 pip install numpy torch transformers httpx
import argparse
import logging
import numpy as np
import torch
from pydantic import BaseModel, confloat, constr
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification
from ventu import Ventu
# request schema used for validation
class Req(BaseModel):
# the input sentence should be at least 2 characters
text: constr(min_length=2)
class Config:
# examples used for health check and warm-up
schema_extra = {
'example': {'text': 'my cat is very cut'},
'batch_size': 16,
}
# response schema used for validation
class Resp(BaseModel):
positive: confloat(ge=0, le=1)
negative: confloat(ge=0, le=1)
class ModelInference(Ventu):
def __init__(self, *args, **kwargs):
# initialize super class with request & response schema, configs
super().__init__(*args, **kwargs)
# initialize model and other tools
self.tokenizer = DistilBertTokenizer.from_pretrained(
'distilbert-base-uncased')
self.model = DistilBertForSequenceClassification.from_pretrained(
'distilbert-base-uncased-finetuned-sst-2-english')
def preprocess(self, data: Req):
# preprocess a request data (as defined in the request schema)
tokens = self.tokenizer.encode(data.text, add_special_tokens=True)
return tokens
def batch_inference(self, data):
# batch inference is used in `socket` mode
data = [torch.tensor(token) for token in data]
with torch.no_grad():
result = self.model(torch.nn.utils.rnn.pad_sequence(data, batch_first=True))[0]
return result.numpy()
def inference(self, data):
# inference is used in `http` mode
with torch.no_grad():
result = self.model(torch.tensor(data).unsqueeze(0))[0]
return result.numpy()[0]
def postprocess(self, data):
# postprocess a response data (returned data as defined in the response schema)
scores = (np.exp(data) / np.exp(data).sum(-1, keepdims=True)).tolist()
return {'negative': scores[0], 'positive': scores[1]}
def create_model():
logger = logging.getLogger()
formatter = logging.Formatter(
fmt='%(asctime)s - %(levelname)s - %(module)s - %(message)s')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
model = ModelInference(Req, Resp, use_msgpack=True)
return model
def create_app():
"""for gunicorn"""
return create_model().app
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Ventu service')
parser.add_argument('--mode', '-m', default='http', choices=('http', 'unix', 'tcp'))
parser.add_argument('--host', default='localhost')
parser.add_argument('--port', '-p', default=8080, type=int)
parser.add_argument('--socket', '-s', default='batching.socket')
args = parser.parse_args()
model = create_model()
if args.mode == 'unix':
model.run_unix(args.socket)
elif args.mode == 'tcp':
model.run_tcp(args.host, args.port)
else:
model.run_http(args.host, args.port)
您可以以以下方式运行此脚本
- 单线程HTTP服务:
python examples/app.py
- 具有多个工作进程的HTTP服务:
gunicorn -w 2 -b localhost:8080 'examples.app:create_app()'
- 作为HTTP服务运行时,可以检查以下链接
/metrics
Prometheus指标/health
健康检查/inference
推理/apidoc/redoc
或/apidoc/swagger
OpenAPI文档
- 作为HTTP服务运行时,可以检查以下链接
- 在批处理服务后面的推理工作进程:
python examples/app.py -m socket
(Unix域套接字) 或python examples/app.py -m tcp --host localhost --port 8888
(TCP) (需要先运行批处理服务)
客户端
from concurrent import futures
import httpx
import msgpack
URL = 'http://localhost:8080/inference'
HEADER = {'Content-Type': 'application/msgpack'}
packer = msgpack.Packer(
autoreset=True,
use_bin_type=True,
)
def request(text):
return httpx.post(URL, data=packer.pack({'text': text}), headers=HEADER)
if __name__ == "__main__":
with futures.ThreadPoolExecutor() as executor:
text = [
'They are smart',
'what is your problem?',
'I hate that!',
'x',
]
results = executor.map(request, text)
for i, resp in enumerate(results):
print(
f'>> {text[i]} -> [{resp.status_code}]\n'
f'{msgpack.unpackb(resp.content)}'
)
项目详情
下载文件
下载适合您平台的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。
源分布
ventu-0.4.5.tar.gz (11.6 kB 查看哈希值)
构建分布
ventu-0.4.5-py3-none-any.whl (10.2 kB 查看哈希值)
关闭
ventu-0.4.5.tar.gz的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | c5a53abe57bed569fa6f8ea3d4633c0a8c3b8e38a912fe86223fcdbb5fe8ced9 |
|
MD5 | f0599b17c0d9f3b1e490c14ff788d669 |
|
BLAKE2b-256 | c9ec877ce10daf7858e6f0f71eb7c4c7a167776b172a3eeea791bb4d9e6e1830 |
关闭
ventu-0.4.5-py3-none-any.whl的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | cb9809de702e85fbf3705cf2347fc6037ca5efc369194f39bc2ee9e69637b97f |
|
MD5 | 98a83bd597fbe3c7d1e5051335728941 |
|
BLAKE2b-256 | 37b1a6d81754370065b1a7c9dc558a1a664b878c276432aca8e06dddb6319490 |