跳转到主要内容

Python分布式Redis队列工作者

项目描述

qw
==

qw (或QueueWorker)用于运行工作进程,这些进程监听Redis列表以处理作业。

## 安装
### pip

`pip install qw`

### git

```
git clone git://github.com/brettlangdon/qw.git
cd ./qw
python setup.py install
```

## 设计
### 管理器
管理器只是一个进程管理器。其任务是启动/停止工作子进程。

### 工作者
工作者是等待并监听几个队列上的作业的进程,然后处理这些作业。


### 目标
工作者/管理器接受一个`target`,它可以是一个函数或一个字符串(可导入的函数)。

```python
def target(job_id, job_data)
pass

manager = Manager(target)
# OR
manager = Manager('__main__.target')
```
### 队列
有一些不同的队列被使用。作业队列只是Redis列表,管理器/工作者列表是集合,作业是散列。

工作者从`all:jobs`、`<manager>:jobs`或`<worker>:jobs`中获取作业,拉取相应的`job:<job_id>`键,
并使用提供的`target`处理它,处理完毕后,将`job:<job_id>`键以及作业ID从`<worker>:jobs`队列中删除。


* `all:managers` - 所有管理器的集合
* `all:jobs` - 所有工作者都可以从中拉取作业的队列,值只是作业ID
* `job:<job_id>` - 作业数据的散列
* `<manager>:workers` - 属于特定管理器的所有工作者的集合
* `<manager>:jobs` - 特定管理器的作业队列,工作者会尝试从这里拉取,然后再从`all:jobs`中拉取,值只是作业ID
* `<worker>:jobs` - 某个工作者的工作队列,这被视为每个工作者的进行中队列,工作者将从 `<manager>:jobs` 或 `all:jobs` 中将工作拉入此队列,值仅为工作ID

## 基本用法

```python
from qw.manager import Manager


def job_printer(job_id, job_data)
print(job_id)
print(job_data)


manager = Manager(job_printer)
manager.start()
manager.join()
```

## API
### Manager(object)
* `__init__(self, target, host="localhost", port=6379, db=0, num_workers=None, name=None)`
* `start(self)`
* `stop(self)`
* `join(self)`

### Worker(multiprocess.Process)
* `__init__(self, client, target, manager_name=None, timeout=10)`
* `run(self)`
* `shutdown(self)`

### Client(redis.StrictRedi)
* `__init__(self, host="localhost", port=6379, db=0)`
* `register_manager(self, name)`
* `deregister_manager(self, name)`
* `register_worker(self, manager, name)`
* `deregister_worker(self, manager, name)`
* `queue_job(self, job_data, manager=None, worker=None)`
* `fetch_next_job(self, manager, worker, timeout=10)`
* `finish_job(self, job_id, worker_name)`
* `get_all_managers(self)`
* `get_manager_workers(self, manager_name)`
* `get_worker_pending_jobs(self, worker_name)`
* `get_manager_queued_jobs(self, manager_name)`
* `get_all_queued_jobs(self)`
* `get_all_pending_jobs(self)`

## CLI工具
### qw-manager
`qw-manager`工具用于启动一个新的管理进程,该进程使用提供的`target`字符串运行,该字符串为每个由工作者处理的工作而运行。

```
$ qw-manager --help
用法
qw-manager [--level=<日志级别>] [--workers=<工作者数量>] [--name=<名称>] [--host=<主机>] [--port=<端口>] [--db=<数据库>] <目标>
qw-manager (--help | --version)

选项
--help 显示此帮助信息
--version 显示版本信息
-l --level=<日志级别> 设置日志级别(调试、信息、警告、错误)[默认:信息]
-w --workers=<工作者数量> 设置要启动的工作者数量,默认为CPU数量
-n --name=<名称> 设置管理器名称,默认为主机名
-h --host=<主机> 设置要使用的redis主机[默认:localhost]
-p --port=<端口> 设置要使用的redis端口[默认:6379]
-d --db=<数据库> 设置要使用的redis数据库编号[默认:0]
```
### qw-client
`qw-client`命令用于查看正在运行的管理器、工作者和作业队列的基本统计信息,以及将json数据以字符串或文件形式推送到主队列或特定管理器队列。

```
$ qw-client --help
用法
qw-client [--host=<主机>] [--port=<端口>] [--db=<数据库>] managers
qw-client [--host=<主机>] [--port=<端口>] [--db=<数据库>] workers [<管理器>]
qw-client [--host=<主机>] [--port=<端口>] [--db=<数据库>] jobs [<管理器>]
qw-client [--host=<主机>] [--port=<端口>] [--db=<数据库>] queue string <数据> [<管理器>]
qw-client [--host=<主机>] [--port=<端口>] [--db=<数据库>] queue file <文件> [<管理器>]
qw-client (--help | --version)

选项
--help 显示此帮助信息
--version 显示版本信息
-h --host=<主机> 设置要使用的redis主机[默认:localhost]
-p --port=<端口> 设置要使用的redis端口[默认:6379]
-d --db=<数据库> 设置要使用的redis数据库编号[默认:0]
```

项目详情


下载文件

下载适用于您的平台的文件。如果您不确定要选择哪个,请了解有关安装软件包的更多信息。

源分布

qw-0.1.1.tar.gz (5.6 kB 查看散列)

上传时间: 来源

由以下支持