HTTP任务队列
项目描述
# HTTP任务队列 (htq)
此项目的动机是提供一个标准接口,以便在后台向执行某些任务的服务发送请求,并提供一种标准机制来指示其被 *取消*。
在这种情况下,一个 *任务* 被定义为可能需要比典型HTTP响应更长的时间,或者允许最终完成的操作。例如,执行数据库查询、对某些数据进行分析,以及从网站或其他服务中摄取/抓取数据。客户端-服务器模型的一个副作用是服务器可能不知道客户端是否终止了请求。服务器将继续执行任务,占用其他任务或请求处理程序可能使用的资源。取消信号机制的目的是向服务发送后续的DELETE请求,以便可以处理以中断第一个请求。这当然需要服务支持DELETE方法并实现取消正在执行的任务的逻辑。下面是一个实现此接口的[服务](#service-example)的示例。
有关详细介绍,请阅读下面的[教程](#tutorial)。
## 依赖项
- Python 3.3+
- Redis 2.4+
## 命令行界面
```
$ htq -h
HTTP任务队列 (htq) 命令行界面
用法
htq server [--host <host>] [--port <port>] [--redis <redis>] [--debug]
htq worker [--threads <n>] [--redis <redis>] [--debug]
选项
-h --help 显示此屏幕。
-v --version 显示版本。
--debug 启用调试日志。
--host <host> HTTP服务的宿主机 [默认: localhost]。
--port <port> HTTP服务的端口号 [默认: 5000]。
--redis <redis> Redis服务器的宿主机/端口 [默认: localhost:6379]。
--threads <n> 工作进程应启动的线程数 [默认: 10]。
```
运行HTTP REST接口的服务器。
```
htq server
```
运行工作进程以发送请求并接收响应。
```
htq worker
```
## API
- `GET /` - 获取所有排队请求。
- `POST /` - 发送(排队)请求
- `GET /<uuid>/` - 通过UUID获取请求
- `DELETE /<uuid>/` - 取消请求,如果已收到响应则删除它
- `GET /<uuid>/response/` - 如果已收到,则获取请求的响应
- `DELETE /<uuid>/response/` - 删除请求的响应以清理空间
## 教程
启动HTQ服务器
```bash
$ htq server
启动htq REST服务器...
* 运行在 http://localhost:5000/
```
向服务发送一个POST请求,包含要发送的请求的JSON编码结构。这将立即返回一个`303 See Other`响应,其中包含指向请求的`Location`头部。
```bash
$ curl -i -X POST -H 'Content-Type: application/json' http://localhost:5000 -d '{"url": "http://httpbin.org/ip"}'
HTTP/1.0 303 SEE OTHER
Content-Type: text/html; charset=utf-8
Content-Length: 0
Location: http://localhost:5000/a936e1a1-68d8-4433-a0c0-4f4b2111670d/
Server: Werkzeug/0.9.6 Python/3.4.1
Date: Mon, 29 Sep 2014 17:18:49 GMT
```
查看请求资源的样子(使用`Location`头部中的URL作为输出)
```bash
$ curl -i http://localhost:5000/a936e1a1-68d8-4433-a0c0-4f4b2111670d/
HTTP/1.0 200 OK
Content-Type: application/json
Content-Length: 185
Link: <http://localhost:5000/a936e1a1-68d8-4433-a0c0-4f4b2111670d/response/>; rel="response",
<http://localhost:5000/a936e1a1-68d8-4433-a0c0-4f4b2111670d/>; rel="self",
<http://localhost:5000/a936e1a1-68d8-4433-a0c0-4f4b2111670d/status/>; rel="status"
Server: Werkzeug/0.9.6 Python/3.4.1
Date: Mon, 29 Sep 2014 17:26:05 GMT
{
"timeout": 60,
"status": "queued",
"data": null,
"headers": {},
"url": "http://httpbin.org/ip",
"method": "get",
"uuid": "a936e1a1-68d8-4433-a0c0-4f4b2111670d",
"time": 1412011537783
}
```
上述响应显示了请求的详细信息,例如URL、方法、头部和请求数据。此外,在请求排队时还捕获了元数据,包括UUID、请求排队的毫秒时间戳、请求状态和超时时间。`Link`头部显示了从该资源相关的链接,包括到响应和状态的链接。状态链接是轻量级的
```bash
$ curl -i http://localhost:5000/a936e1a1-68d8-4433-a0c0-4f4b2111670d/status/
HTTP/1.0 200 OK
Content-Type: application/json
Content-Length: 20
Server: Werkzeug/0.9.6 Python/3.4.1
Date: Mon, 29 Sep 2014 17:30:21 GMT
{"status": "queued"}
```
如果我们`curl`响应链接,它将阻塞,直到响应准备就绪。由于我们尚未启动工作进程,这将永远阻塞。让我们启动一个工作进程以实际发送请求。在另一个shell中执行此操作
```bash
$ htq worker --debug
启动了10个工作进程...
[a936e1a1-68d8-4433-a0c0-4f4b2111670d] 发送请求...
[a936e1a1-68d8-4433-a0c0-4f4b2111670d] 收到响应
```
立即启动工作进程守护程序开始消耗队列并发送请求。如您所见,上面发送的请求已经发送并收到响应。让我们检查请求的状态。
```bash
$ curl -i http://localhost:5000/a936e1a1-68d8-4433-a0c0-4f4b2111670d/status/
HTTP/1.0 200 OK
Content-Type: application/json
Content-Length: 21
Server: Werkzeug/0.9.6 Python/3.4.1
Date: Mon, 29 Sep 2014 17:36:56 GMT
{"status": "success"}
```
成功!现在让我们使用到响应本身的链接。
```bash
$ curl -i http://localhost:5000/a936e1a1-68d8-4433-a0c0-4f4b2111670d/response/
HTTP/1.0 200 OK
Content-Type: application/json
Content-Length: 19291
Link: <http://localhost:5000/a936e1a1-68d8-4433-a0c0-4f4b2111670d/>; rel="request",
<http://localhost:5000/a936e1a1-68d8-4433-a0c0-4f4b2111670d/response/>; rel="self"
Server: Werkzeug/0.9.6 Python/3.4.1
日期:Mon, 29 Sep 2014 17:38:22 GMT
{
"状态": "成功",
"时间": 1412012019457,
"代码": 200,
"原因": "OK",
"耗时": 79.81,
"UUID": "62db48a4-e511-4c8e-9c11-32b39758d1ff",
"头部": {
"Age": "0",
"Content-Length": "32",
"Connection": "Keep-Alive",
"Access-Control-Allow-Origin": "*",
"Server": "gunicorn/18.0",
"Access-Control-Allow-Credentials": "true",
"Date": "Mon, 29 Sep 2014 17:50:29 GMT",
"Content-Type": "application/json"
},
"数据": "{\n \"来源\": \"159.14.243.254\"\n}"
}
```
响应包含HTTP响应的所有元素,包括代码、原因、头部和数据(为了简洁已省略)。此外,响应接收的时间(以毫秒为单位)和耗时(以毫秒为单位)与UUID和状态元数据一起加入。
### 取消请求
HTQ定义了一个服务接口,允许实现取消请求的功能。例如,如果我发送的请求耗时超过预期(延迟30秒)
```bash
$ curl -i -X POST -H Content-Type:application/json http://localhost:5000 -d '{"url": "http://httpbin.org/delay/30"}'
HTTP/1.0 303 SEE OTHER
Content-Type: text/html; charset=utf-8
Content-Length: 0
位置:http://localhost:5000/1686e1b7-3b05-4d45-95e8-caf934f540aa/
Server: Werkzeug/0.9.6 Python/3.4.1
日期:Mon, 29 Sep 2014 18:00:28 GMT
```
然后我可以向请求URL发送一个DELETE请求
```bash
$ curl -i -X DELETE http://localhost:5000/1686e1b7-3b05-4d45-95e8-caf934f540aa/
HTTP/1.0 204 NO CONTENT
Content-Type: text/html; charset=utf-8
Content-Length: 0
Server: Werkzeug/0.9.6 Python/3.4.1
日期:Mon, 29 Sep 2014 18:00:41 GMT
```
现在如果我们检查状态,应该看到状态已变为“已取消”(响应也是空的)。
```bash
$ curl -i http://localhost:5000/9619b267-760d-4f0a-9f15-eb8ad99cd1c4/status/
HTTP/1.0 200 OK
Content-Type: application/json
Content-Length: 22
Server: Werkzeug/0.9.6 Python/3.4.1
日期:Mon, 29 Sep 2014 18:01:23 GMT
{"状态": "已取消"}
```
内部这会中断请求,但也会向端点(在此为`http://httpbin.org/delay/30`)发送一个DELETE请求。服务的实现者可以支持DELETE请求来取消正在进行的底层处理。当然,这取决于正在执行的任务,但这个简单的服务级合约提供了一种一致的机制来发出取消信号。
## 服务示例
以下是实现HTQ要求的服务的工作示例,该服务通过接收DELETE信号取消操作。此服务接收JSON编码的语句和参数,并在本地PostgreSQL实例中执行它们(为了简单起见,数据库设置是硬编码的)。
```python
import json
import logging
import psycopg2
from flask import Flask, abort, request
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
logger.addHandler(handler)
app = Flask(__name__)
# 数据库连接设置
settings = {
'dbname': 'example',
'host': 'localhost',
}
# 当前运行的任务按UUID映射到连接进程ID
# tasks = {}
tasks = {}
@app.route('/<uuid>/', methods=['POST'])
def run(uuid)
data = request.json
conn = psycopg2.connect(**settings)
c = conn.cursor()
# 存储执行此任务的连接的PID
pid = conn.get_backend_pid()
tasks[uuid] = pid
try
logger.debug('[{}] 执行查询{}...'.format(pid, uuid))
c.execute(data['statement'], data.get('parameters'))
except
conn.cancel()
abort(500)
finally
tasks.pop(uuid, None)
返回 json.dumps(c.fetchall()), 200
@app.route('/<uuid>/', methods=['DELETE'])
def cancel(uuid)
if uuid not in tasks
abort(404)
pid = tasks.pop(uuid)
# 打开新的连接以取消查询
conn = psycopg2.connect(**settings)
c = conn.cursor()
c.execute('select pg_terminate_backend(%s)', (pid,))
logger.debug('[{}] 取消查询{}'.format(pid, uuid))
返回 '', 204
if __name__ == '__main__'
app.run(threaded=True)
```
交互如下
```bash
% python example_service.py
* 运行在http://127.0.0.1:5000/
[57819] 执行查询abc123...
[57819] 取消查询abc123
127.0.0.1 - - [30/Sep/2014 11:16:01] "DELETE /abc123/ HTTP/1.1" 204 -
127.0.0.1 - - [30/九/2014 11:16:02] "POST /abc123/ HTTP/1.1" 500 -
```
首先发送了一个POST请求来执行查询。随后不久,发送了一个DELETE请求来取消查询并返回。当POST请求没有响应时,它返回了一个500错误,表示连接已关闭(这正是我们想要的)。
此项目的动机是提供一个标准接口,以便在后台向执行某些任务的服务发送请求,并提供一种标准机制来指示其被 *取消*。
在这种情况下,一个 *任务* 被定义为可能需要比典型HTTP响应更长的时间,或者允许最终完成的操作。例如,执行数据库查询、对某些数据进行分析,以及从网站或其他服务中摄取/抓取数据。客户端-服务器模型的一个副作用是服务器可能不知道客户端是否终止了请求。服务器将继续执行任务,占用其他任务或请求处理程序可能使用的资源。取消信号机制的目的是向服务发送后续的DELETE请求,以便可以处理以中断第一个请求。这当然需要服务支持DELETE方法并实现取消正在执行的任务的逻辑。下面是一个实现此接口的[服务](#service-example)的示例。
有关详细介绍,请阅读下面的[教程](#tutorial)。
## 依赖项
- Python 3.3+
- Redis 2.4+
## 命令行界面
```
$ htq -h
HTTP任务队列 (htq) 命令行界面
用法
htq server [--host <host>] [--port <port>] [--redis <redis>] [--debug]
htq worker [--threads <n>] [--redis <redis>] [--debug]
选项
-h --help 显示此屏幕。
-v --version 显示版本。
--debug 启用调试日志。
--host <host> HTTP服务的宿主机 [默认: localhost]。
--port <port> HTTP服务的端口号 [默认: 5000]。
--redis <redis> Redis服务器的宿主机/端口 [默认: localhost:6379]。
--threads <n> 工作进程应启动的线程数 [默认: 10]。
```
运行HTTP REST接口的服务器。
```
htq server
```
运行工作进程以发送请求并接收响应。
```
htq worker
```
## API
- `GET /` - 获取所有排队请求。
- `POST /` - 发送(排队)请求
- `GET /<uuid>/` - 通过UUID获取请求
- `DELETE /<uuid>/` - 取消请求,如果已收到响应则删除它
- `GET /<uuid>/response/` - 如果已收到,则获取请求的响应
- `DELETE /<uuid>/response/` - 删除请求的响应以清理空间
## 教程
启动HTQ服务器
```bash
$ htq server
启动htq REST服务器...
* 运行在 http://localhost:5000/
```
向服务发送一个POST请求,包含要发送的请求的JSON编码结构。这将立即返回一个`303 See Other`响应,其中包含指向请求的`Location`头部。
```bash
$ curl -i -X POST -H 'Content-Type: application/json' http://localhost:5000 -d '{"url": "http://httpbin.org/ip"}'
HTTP/1.0 303 SEE OTHER
Content-Type: text/html; charset=utf-8
Content-Length: 0
Location: http://localhost:5000/a936e1a1-68d8-4433-a0c0-4f4b2111670d/
Server: Werkzeug/0.9.6 Python/3.4.1
Date: Mon, 29 Sep 2014 17:18:49 GMT
```
查看请求资源的样子(使用`Location`头部中的URL作为输出)
```bash
$ curl -i http://localhost:5000/a936e1a1-68d8-4433-a0c0-4f4b2111670d/
HTTP/1.0 200 OK
Content-Type: application/json
Content-Length: 185
Link: <http://localhost:5000/a936e1a1-68d8-4433-a0c0-4f4b2111670d/response/>; rel="response",
<http://localhost:5000/a936e1a1-68d8-4433-a0c0-4f4b2111670d/>; rel="self",
<http://localhost:5000/a936e1a1-68d8-4433-a0c0-4f4b2111670d/status/>; rel="status"
Server: Werkzeug/0.9.6 Python/3.4.1
Date: Mon, 29 Sep 2014 17:26:05 GMT
{
"timeout": 60,
"status": "queued",
"data": null,
"headers": {},
"url": "http://httpbin.org/ip",
"method": "get",
"uuid": "a936e1a1-68d8-4433-a0c0-4f4b2111670d",
"time": 1412011537783
}
```
上述响应显示了请求的详细信息,例如URL、方法、头部和请求数据。此外,在请求排队时还捕获了元数据,包括UUID、请求排队的毫秒时间戳、请求状态和超时时间。`Link`头部显示了从该资源相关的链接,包括到响应和状态的链接。状态链接是轻量级的
```bash
$ curl -i http://localhost:5000/a936e1a1-68d8-4433-a0c0-4f4b2111670d/status/
HTTP/1.0 200 OK
Content-Type: application/json
Content-Length: 20
Server: Werkzeug/0.9.6 Python/3.4.1
Date: Mon, 29 Sep 2014 17:30:21 GMT
{"status": "queued"}
```
如果我们`curl`响应链接,它将阻塞,直到响应准备就绪。由于我们尚未启动工作进程,这将永远阻塞。让我们启动一个工作进程以实际发送请求。在另一个shell中执行此操作
```bash
$ htq worker --debug
启动了10个工作进程...
[a936e1a1-68d8-4433-a0c0-4f4b2111670d] 发送请求...
[a936e1a1-68d8-4433-a0c0-4f4b2111670d] 收到响应
```
立即启动工作进程守护程序开始消耗队列并发送请求。如您所见,上面发送的请求已经发送并收到响应。让我们检查请求的状态。
```bash
$ curl -i http://localhost:5000/a936e1a1-68d8-4433-a0c0-4f4b2111670d/status/
HTTP/1.0 200 OK
Content-Type: application/json
Content-Length: 21
Server: Werkzeug/0.9.6 Python/3.4.1
Date: Mon, 29 Sep 2014 17:36:56 GMT
{"status": "success"}
```
成功!现在让我们使用到响应本身的链接。
```bash
$ curl -i http://localhost:5000/a936e1a1-68d8-4433-a0c0-4f4b2111670d/response/
HTTP/1.0 200 OK
Content-Type: application/json
Content-Length: 19291
Link: <http://localhost:5000/a936e1a1-68d8-4433-a0c0-4f4b2111670d/>; rel="request",
<http://localhost:5000/a936e1a1-68d8-4433-a0c0-4f4b2111670d/response/>; rel="self"
Server: Werkzeug/0.9.6 Python/3.4.1
日期:Mon, 29 Sep 2014 17:38:22 GMT
{
"状态": "成功",
"时间": 1412012019457,
"代码": 200,
"原因": "OK",
"耗时": 79.81,
"UUID": "62db48a4-e511-4c8e-9c11-32b39758d1ff",
"头部": {
"Age": "0",
"Content-Length": "32",
"Connection": "Keep-Alive",
"Access-Control-Allow-Origin": "*",
"Server": "gunicorn/18.0",
"Access-Control-Allow-Credentials": "true",
"Date": "Mon, 29 Sep 2014 17:50:29 GMT",
"Content-Type": "application/json"
},
"数据": "{\n \"来源\": \"159.14.243.254\"\n}"
}
```
响应包含HTTP响应的所有元素,包括代码、原因、头部和数据(为了简洁已省略)。此外,响应接收的时间(以毫秒为单位)和耗时(以毫秒为单位)与UUID和状态元数据一起加入。
### 取消请求
HTQ定义了一个服务接口,允许实现取消请求的功能。例如,如果我发送的请求耗时超过预期(延迟30秒)
```bash
$ curl -i -X POST -H Content-Type:application/json http://localhost:5000 -d '{"url": "http://httpbin.org/delay/30"}'
HTTP/1.0 303 SEE OTHER
Content-Type: text/html; charset=utf-8
Content-Length: 0
位置:http://localhost:5000/1686e1b7-3b05-4d45-95e8-caf934f540aa/
Server: Werkzeug/0.9.6 Python/3.4.1
日期:Mon, 29 Sep 2014 18:00:28 GMT
```
然后我可以向请求URL发送一个DELETE请求
```bash
$ curl -i -X DELETE http://localhost:5000/1686e1b7-3b05-4d45-95e8-caf934f540aa/
HTTP/1.0 204 NO CONTENT
Content-Type: text/html; charset=utf-8
Content-Length: 0
Server: Werkzeug/0.9.6 Python/3.4.1
日期:Mon, 29 Sep 2014 18:00:41 GMT
```
现在如果我们检查状态,应该看到状态已变为“已取消”(响应也是空的)。
```bash
$ curl -i http://localhost:5000/9619b267-760d-4f0a-9f15-eb8ad99cd1c4/status/
HTTP/1.0 200 OK
Content-Type: application/json
Content-Length: 22
Server: Werkzeug/0.9.6 Python/3.4.1
日期:Mon, 29 Sep 2014 18:01:23 GMT
{"状态": "已取消"}
```
内部这会中断请求,但也会向端点(在此为`http://httpbin.org/delay/30`)发送一个DELETE请求。服务的实现者可以支持DELETE请求来取消正在进行的底层处理。当然,这取决于正在执行的任务,但这个简单的服务级合约提供了一种一致的机制来发出取消信号。
## 服务示例
以下是实现HTQ要求的服务的工作示例,该服务通过接收DELETE信号取消操作。此服务接收JSON编码的语句和参数,并在本地PostgreSQL实例中执行它们(为了简单起见,数据库设置是硬编码的)。
```python
import json
import logging
import psycopg2
from flask import Flask, abort, request
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
logger.addHandler(handler)
app = Flask(__name__)
# 数据库连接设置
settings = {
'dbname': 'example',
'host': 'localhost',
}
# 当前运行的任务按UUID映射到连接进程ID
# tasks = {}
tasks = {}
@app.route('/<uuid>/', methods=['POST'])
def run(uuid)
data = request.json
conn = psycopg2.connect(**settings)
c = conn.cursor()
# 存储执行此任务的连接的PID
pid = conn.get_backend_pid()
tasks[uuid] = pid
try
logger.debug('[{}] 执行查询{}...'.format(pid, uuid))
c.execute(data['statement'], data.get('parameters'))
except
conn.cancel()
abort(500)
finally
tasks.pop(uuid, None)
返回 json.dumps(c.fetchall()), 200
@app.route('/<uuid>/', methods=['DELETE'])
def cancel(uuid)
if uuid not in tasks
abort(404)
pid = tasks.pop(uuid)
# 打开新的连接以取消查询
conn = psycopg2.connect(**settings)
c = conn.cursor()
c.execute('select pg_terminate_backend(%s)', (pid,))
logger.debug('[{}] 取消查询{}'.format(pid, uuid))
返回 '', 204
if __name__ == '__main__'
app.run(threaded=True)
```
交互如下
```bash
% python example_service.py
* 运行在http://127.0.0.1:5000/
[57819] 执行查询abc123...
[57819] 取消查询abc123
127.0.0.1 - - [30/Sep/2014 11:16:01] "DELETE /abc123/ HTTP/1.1" 204 -
127.0.0.1 - - [30/九/2014 11:16:02] "POST /abc123/ HTTP/1.1" 500 -
```
首先发送了一个POST请求来执行查询。随后不久,发送了一个DELETE请求来取消查询并返回。当POST请求没有响应时,它返回了一个500错误,表示连接已关闭(这正是我们想要的)。
项目详情
关闭
htq-0.1.0.tar.gz的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 56d72b8dfe6f1d2998dcaad6186f60914e1ffdfe69948dee5246959646534329 |
|
MD5 | 63ee2f8acd6df6ddf3acc4ad8b3bd3d4 |
|
BLAKE2b-256 | 26fccac96ae8bf210d0f0fa108f077030b4daeae827cdeb8e8afe06bf15efe3a |