Python分布式Redis队列工作者
项目描述
qw
==
qw (或QueueWorker)用于运行工作进程,这些进程监听Redis列表以处理作业。
## 安装
### pip
`pip install qw`
### git
```
git clone git://github.com/brettlangdon/qw.git
cd ./qw
python setup.py install
```
## 设计
### 管理器
管理器只是一个进程管理器。其任务是启动/停止工作子进程。
### 工作者
工作者是等待并监听几个队列上的作业的进程,然后处理这些作业。
。
### 目标
工作者/管理器接受一个`target`,它可以是一个函数或一个字符串(可导入的函数)。
```python
def target(job_id, job_data)
pass
manager = Manager(target)
# OR
manager = Manager('__main__.target')
```
### 队列
有一些不同的队列被使用。作业队列只是Redis列表,管理器/工作者列表是集合,作业是散列。
工作者从`all:jobs`、`<manager>:jobs`或`<worker>:jobs`中获取作业,拉取相应的`job:<job_id>`键,
并使用提供的`target`处理它,处理完毕后,将`job:<job_id>`键以及作业ID从`<worker>:jobs`队列中删除。
。
* `all:managers` - 所有管理器的集合
* `all:jobs` - 所有工作者都可以从中拉取作业的队列,值只是作业ID
* `job:<job_id>` - 作业数据的散列
* `<manager>:workers` - 属于特定管理器的所有工作者的集合
* `<manager>:jobs` - 特定管理器的作业队列,工作者会尝试从这里拉取,然后再从`all:jobs`中拉取,值只是作业ID
* `<worker>:jobs` - 某个工作者的工作队列,这被视为每个工作者的进行中队列,工作者将从 `<manager>:jobs` 或 `all:jobs` 中将工作拉入此队列,值仅为工作ID
## 基本用法
```python
from qw.manager import Manager
def job_printer(job_id, job_data)
print(job_id)
print(job_data)
manager = Manager(job_printer)
manager.start()
manager.join()
```
## API
### Manager(object)
* `__init__(self, target, host="localhost", port=6379, db=0, num_workers=None, name=None)`
* `start(self)`
* `stop(self)`
* `join(self)`
### Worker(multiprocess.Process)
* `__init__(self, client, target, manager_name=None, timeout=10)`
* `run(self)`
* `shutdown(self)`
### Client(redis.StrictRedi)
* `__init__(self, host="localhost", port=6379, db=0)`
* `register_manager(self, name)`
* `deregister_manager(self, name)`
* `register_worker(self, manager, name)`
* `deregister_worker(self, manager, name)`
* `queue_job(self, job_data, manager=None, worker=None)`
* `fetch_next_job(self, manager, worker, timeout=10)`
* `finish_job(self, job_id, worker_name)`
* `get_all_managers(self)`
* `get_manager_workers(self, manager_name)`
* `get_worker_pending_jobs(self, worker_name)`
* `get_manager_queued_jobs(self, manager_name)`
* `get_all_queued_jobs(self)`
* `get_all_pending_jobs(self)`
## CLI工具
### qw-manager
`qw-manager`工具用于启动一个新的管理进程,该进程使用提供的`target`字符串运行,该字符串为每个由工作者处理的工作而运行。
```
$ qw-manager --help
用法
qw-manager [--level=<日志级别>] [--workers=<工作者数量>] [--name=<名称>] [--host=<主机>] [--port=<端口>] [--db=<数据库>] <目标>
qw-manager (--help | --version)
选项
--help 显示此帮助信息
--version 显示版本信息
-l --level=<日志级别> 设置日志级别(调试、信息、警告、错误)[默认:信息]
-w --workers=<工作者数量> 设置要启动的工作者数量,默认为CPU数量
-n --name=<名称> 设置管理器名称,默认为主机名
-h --host=<主机> 设置要使用的redis主机[默认:localhost]
-p --port=<端口> 设置要使用的redis端口[默认:6379]
-d --db=<数据库> 设置要使用的redis数据库编号[默认:0]
```
### qw-client
`qw-client`命令用于查看正在运行的管理器、工作者和作业队列的基本统计信息,以及将json数据以字符串或文件形式推送到主队列或特定管理器队列。
```
$ qw-client --help
用法
qw-client [--host=<主机>] [--port=<端口>] [--db=<数据库>] managers
qw-client [--host=<主机>] [--port=<端口>] [--db=<数据库>] workers [<管理器>]
qw-client [--host=<主机>] [--port=<端口>] [--db=<数据库>] jobs [<管理器>]
qw-client [--host=<主机>] [--port=<端口>] [--db=<数据库>] queue string <数据> [<管理器>]
qw-client [--host=<主机>] [--port=<端口>] [--db=<数据库>] queue file <文件> [<管理器>]
qw-client (--help | --version)
选项
--help 显示此帮助信息
--version 显示版本信息
-h --host=<主机> 设置要使用的redis主机[默认:localhost]
-p --port=<端口> 设置要使用的redis端口[默认:6379]
-d --db=<数据库> 设置要使用的redis数据库编号[默认:0]
```
==
qw (或QueueWorker)用于运行工作进程,这些进程监听Redis列表以处理作业。
## 安装
### pip
`pip install qw`
### git
```
git clone git://github.com/brettlangdon/qw.git
cd ./qw
python setup.py install
```
## 设计
### 管理器
管理器只是一个进程管理器。其任务是启动/停止工作子进程。
### 工作者
工作者是等待并监听几个队列上的作业的进程,然后处理这些作业。
。
### 目标
工作者/管理器接受一个`target`,它可以是一个函数或一个字符串(可导入的函数)。
```python
def target(job_id, job_data)
pass
manager = Manager(target)
# OR
manager = Manager('__main__.target')
```
### 队列
有一些不同的队列被使用。作业队列只是Redis列表,管理器/工作者列表是集合,作业是散列。
工作者从`all:jobs`、`<manager>:jobs`或`<worker>:jobs`中获取作业,拉取相应的`job:<job_id>`键,
并使用提供的`target`处理它,处理完毕后,将`job:<job_id>`键以及作业ID从`<worker>:jobs`队列中删除。
。
* `all:managers` - 所有管理器的集合
* `all:jobs` - 所有工作者都可以从中拉取作业的队列,值只是作业ID
* `job:<job_id>` - 作业数据的散列
* `<manager>:workers` - 属于特定管理器的所有工作者的集合
* `<manager>:jobs` - 特定管理器的作业队列,工作者会尝试从这里拉取,然后再从`all:jobs`中拉取,值只是作业ID
* `<worker>:jobs` - 某个工作者的工作队列,这被视为每个工作者的进行中队列,工作者将从 `<manager>:jobs` 或 `all:jobs` 中将工作拉入此队列,值仅为工作ID
## 基本用法
```python
from qw.manager import Manager
def job_printer(job_id, job_data)
print(job_id)
print(job_data)
manager = Manager(job_printer)
manager.start()
manager.join()
```
## API
### Manager(object)
* `__init__(self, target, host="localhost", port=6379, db=0, num_workers=None, name=None)`
* `start(self)`
* `stop(self)`
* `join(self)`
### Worker(multiprocess.Process)
* `__init__(self, client, target, manager_name=None, timeout=10)`
* `run(self)`
* `shutdown(self)`
### Client(redis.StrictRedi)
* `__init__(self, host="localhost", port=6379, db=0)`
* `register_manager(self, name)`
* `deregister_manager(self, name)`
* `register_worker(self, manager, name)`
* `deregister_worker(self, manager, name)`
* `queue_job(self, job_data, manager=None, worker=None)`
* `fetch_next_job(self, manager, worker, timeout=10)`
* `finish_job(self, job_id, worker_name)`
* `get_all_managers(self)`
* `get_manager_workers(self, manager_name)`
* `get_worker_pending_jobs(self, worker_name)`
* `get_manager_queued_jobs(self, manager_name)`
* `get_all_queued_jobs(self)`
* `get_all_pending_jobs(self)`
## CLI工具
### qw-manager
`qw-manager`工具用于启动一个新的管理进程,该进程使用提供的`target`字符串运行,该字符串为每个由工作者处理的工作而运行。
```
$ qw-manager --help
用法
qw-manager [--level=<日志级别>] [--workers=<工作者数量>] [--name=<名称>] [--host=<主机>] [--port=<端口>] [--db=<数据库>] <目标>
qw-manager (--help | --version)
选项
--help 显示此帮助信息
--version 显示版本信息
-l --level=<日志级别> 设置日志级别(调试、信息、警告、错误)[默认:信息]
-w --workers=<工作者数量> 设置要启动的工作者数量,默认为CPU数量
-n --name=<名称> 设置管理器名称,默认为主机名
-h --host=<主机> 设置要使用的redis主机[默认:localhost]
-p --port=<端口> 设置要使用的redis端口[默认:6379]
-d --db=<数据库> 设置要使用的redis数据库编号[默认:0]
```
### qw-client
`qw-client`命令用于查看正在运行的管理器、工作者和作业队列的基本统计信息,以及将json数据以字符串或文件形式推送到主队列或特定管理器队列。
```
$ qw-client --help
用法
qw-client [--host=<主机>] [--port=<端口>] [--db=<数据库>] managers
qw-client [--host=<主机>] [--port=<端口>] [--db=<数据库>] workers [<管理器>]
qw-client [--host=<主机>] [--port=<端口>] [--db=<数据库>] jobs [<管理器>]
qw-client [--host=<主机>] [--port=<端口>] [--db=<数据库>] queue string <数据> [<管理器>]
qw-client [--host=<主机>] [--port=<端口>] [--db=<数据库>] queue file <文件> [<管理器>]
qw-client (--help | --version)
选项
--help 显示此帮助信息
--version 显示版本信息
-h --host=<主机> 设置要使用的redis主机[默认:localhost]
-p --port=<端口> 设置要使用的redis端口[默认:6379]
-d --db=<数据库> 设置要使用的redis数据库编号[默认:0]
```