跳转到主要内容

轻松托管您的深度学习模型。

项目描述

Ventu

pypi versions Python Test Python document Language grade: Python

轻松提供深度学习模型。

安装

pip install ventu

功能

  • 只需实现Model(preprocess, postprocess, inferencebatch_inference)
  • 使用 pydantic 进行请求和响应数据验证
  • API文档使用 SpecTree (在运行run_http时)
  • 后端服务使用 falcon 支持JSON和 msgpack
  • 使用 batching 进行动态批处理,使用Unix域套接字或TCP
    • 一个请求中的错误不会影响同一批中的其他请求
    • 负载均衡
  • 支持所有运行时
  • 健康检查
  • 监控指标(Prometheus)
    • 如果您有多个工作进程,请记住将prometheus_multiproc_dir环境变量设置为一个目录
  • 推理预热

如何使用

  • 使用pydantic定义您的请求数据模式和响应数据模式
    • 将示例添加到schema.Config.schema_extra[examples]以进行预热和健康检查(可选)
  • 继承ventu.Ventu,实现preprocesspostprocess方法
  • 对于独立的HTTP服务,实现inference方法,使用run_http运行
  • 对于动态批处理服务后面的工作进程,实现batch_inference方法,使用run_socket运行

查看文档以获取API详情

示例

示例代码可以在examples中找到。

服务

安装需求 pip install numpy torch transformers httpx

import argparse
import logging

import numpy as np
import torch
from pydantic import BaseModel, confloat, constr
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification

from ventu import Ventu


# request schema used for validation
class Req(BaseModel):
    # the input sentence should be at least 2 characters
    text: constr(min_length=2)

    class Config:
        # examples used for health check and warm-up
        schema_extra = {
            'example': {'text': 'my cat is very cut'},
            'batch_size': 16,
        }


# response schema used for validation
class Resp(BaseModel):
    positive: confloat(ge=0, le=1)
    negative: confloat(ge=0, le=1)


class ModelInference(Ventu):
    def __init__(self, *args, **kwargs):
        # initialize super class with request & response schema, configs
        super().__init__(*args, **kwargs)
        # initialize model and other tools
        self.tokenizer = DistilBertTokenizer.from_pretrained(
            'distilbert-base-uncased')
        self.model = DistilBertForSequenceClassification.from_pretrained(
            'distilbert-base-uncased-finetuned-sst-2-english')

    def preprocess(self, data: Req):
        # preprocess a request data (as defined in the request schema)
        tokens = self.tokenizer.encode(data.text, add_special_tokens=True)
        return tokens

    def batch_inference(self, data):
        # batch inference is used in `socket` mode
        data = [torch.tensor(token) for token in data]
        with torch.no_grad():
            result = self.model(torch.nn.utils.rnn.pad_sequence(data, batch_first=True))[0]
        return result.numpy()

    def inference(self, data):
        # inference is used in `http` mode
        with torch.no_grad():
            result = self.model(torch.tensor(data).unsqueeze(0))[0]
        return result.numpy()[0]

    def postprocess(self, data):
        # postprocess a response data (returned data as defined in the response schema)
        scores = (np.exp(data) / np.exp(data).sum(-1, keepdims=True)).tolist()
        return {'negative': scores[0], 'positive': scores[1]}


def create_model():
    logger = logging.getLogger()
    formatter = logging.Formatter(
        fmt='%(asctime)s - %(levelname)s - %(module)s - %(message)s')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)

    model = ModelInference(Req, Resp, use_msgpack=True)
    return model


def create_app():
    """for gunicorn"""
    return create_model().app


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Ventu service')
    parser.add_argument('--mode', '-m', default='http', choices=('http', 'unix', 'tcp'))
    parser.add_argument('--host', default='localhost')
    parser.add_argument('--port', '-p', default=8080, type=int)
    parser.add_argument('--socket', '-s', default='batching.socket')
    args = parser.parse_args()

    model = create_model()
    if args.mode == 'unix':
        model.run_unix(args.socket)
    elif args.mode == 'tcp':
        model.run_tcp(args.host, args.port)
    else:
        model.run_http(args.host, args.port)

您可以以以下方式运行此脚本

  • 单线程HTTP服务: python examples/app.py
  • 具有多个工作进程的HTTP服务: gunicorn -w 2 -b localhost:8080 'examples.app:create_app()'
    • 作为HTTP服务运行时,可以检查以下链接
      • /metrics Prometheus指标
      • /health 健康检查
      • /inference 推理
      • /apidoc/redoc/apidoc/swagger OpenAPI文档
  • 在批处理服务后面的推理工作进程: python examples/app.py -m socket (Unix域套接字) 或 python examples/app.py -m tcp --host localhost --port 8888 (TCP) (需要先运行批处理服务)

客户端

from concurrent import futures

import httpx
import msgpack

URL = 'http://localhost:8080/inference'
HEADER = {'Content-Type': 'application/msgpack'}
packer = msgpack.Packer(
    autoreset=True,
    use_bin_type=True,
)


def request(text):
    return httpx.post(URL, data=packer.pack({'text': text}), headers=HEADER)


if __name__ == "__main__":
    with futures.ThreadPoolExecutor() as executor:
        text = [
            'They are smart',
            'what is your problem?',
            'I hate that!',
            'x',
        ]
        results = executor.map(request, text)
        for i, resp in enumerate(results):
            print(
                f'>> {text[i]} -> [{resp.status_code}]\n'
                f'{msgpack.unpackb(resp.content)}'
            )

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。

源分布

ventu-0.4.5.tar.gz (11.6 kB 查看哈希值)

上传时间

构建分布

ventu-0.4.5-py3-none-any.whl (10.2 kB 查看哈希值)

上传时间 Python 3

支持者