Integrate amqp into guillotina
guillotina_amqp Docs
Integrates aioamqp into guillotina, providing an execution framework for asyncio tasks:
Guillotina command to start a worker: amqp-worker
Workers consume tasks from RabbitMQ through the aioamqp integration
Redis state manager implementation to keep a global view of running tasks
Utilities and endpoints for adding new tasks and for task cancellation
Its distributed design, with no central worker manager, makes it more robust. Task cancellation is signaled over the state manager, and workers are responsible for stopping canceled tasks.
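The cancellation flow described above can be sketched as follows. This is a minimal illustration, not the package's implementation: InMemoryStateManager is a hypothetical stand-in for the Redis state manager, and run_with_cancellation and the polling interval are assumptions made for the example.

```python
import asyncio


class InMemoryStateManager:
    """Hypothetical stand-in for a shared state manager (Redis in the real setup)."""

    def __init__(self):
        self._cancelled = set()

    async def cancel(self, task_id):
        # Any process that shares the state can flag a task as cancelled.
        self._cancelled.add(task_id)

    async def is_cancelled(self, task_id):
        return task_id in self._cancelled


async def run_with_cancellation(state, task_id, coro, poll_interval=0.01):
    """Run coro as an asyncio task and stop it once the shared state flags it."""
    task = asyncio.ensure_future(coro)
    while not task.done():
        if await state.is_cancelled(task_id):
            task.cancel()
            break
        await asyncio.sleep(poll_interval)
    try:
        return await task
    except asyncio.CancelledError:
        return "cancelled"


async def demo():
    state = InMemoryStateManager()
    worker = asyncio.ensure_future(
        run_with_cancellation(state, "task-1", asyncio.sleep(10, result="finished"))
    )
    await asyncio.sleep(0.05)
    await state.cancel("task-1")  # the signal travels through the state, not to the worker
    return await worker
```

Running asyncio.run(demo()) stops the ten-second job shortly after the flag is set. The point of the design is that the caller never needs to know which worker holds the task.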
A watchdog on the asyncio loop can be launched with the auto-kill-timeout command argument, which will kill the worker if one of its tasks has captured the loop for too long.
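One way such a watchdog can work is to have a coroutine stamp a heartbeat on the loop while a separate thread checks how old the last stamp is. The sketch below assumes that approach; LoopWatchdog, its on_stuck callback and the timings are hypothetical, and a real worker would terminate the process where this example only sets a flag.

```python
import asyncio
import threading
import time


class LoopWatchdog:
    """Hypothetical watchdog: detects an event loop that has stopped scheduling."""

    def __init__(self, timeout, on_stuck):
        self.timeout = timeout
        self.on_stuck = on_stuck
        self._last_beat = time.monotonic()
        self._stop = threading.Event()

    async def heartbeat(self, interval):
        # Runs on the loop, so it only executes while the loop is not blocked.
        while not self._stop.is_set():
            self._last_beat = time.monotonic()
            await asyncio.sleep(interval)

    def watch(self, interval):
        # Runs in its own thread, so it keeps checking even if the loop is blocked.
        while not self._stop.wait(interval):
            if time.monotonic() - self._last_beat > self.timeout:
                self.on_stuck()
                return

    def stop(self):
        self._stop.set()


async def demo():
    fired = threading.Event()
    dog = LoopWatchdog(timeout=0.5, on_stuck=fired.set)
    thread = threading.Thread(target=dog.watch, args=(0.05,), daemon=True)
    thread.start()
    beat = asyncio.ensure_future(dog.heartbeat(0.05))

    await asyncio.sleep(0.2)        # cooperative wait: heartbeats keep flowing
    healthy = not fired.is_set()

    time.sleep(1.0)                 # blocking call: this task captures the loop
    thread.join(timeout=2)
    dog.stop()
    await beat
    return healthy, fired.is_set()
```

In the demo the watchdog stays quiet during the cooperative wait and fires only during the blocking time.sleep call.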
When a task fails, the worker sends it to the delay queue, which is configured to re-queue tasks to the main queue after a certain TTL. Failed tasks are retried a limited number of times.
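A delay queue of this kind is commonly built in RabbitMQ by combining a per-queue message TTL with a dead-letter exchange, so that expired messages are routed back to the main queue. The sketch below assumes that pattern. The x-* keys are standard RabbitMQ queue arguments, while the helper names, the "delay" and "error" labels and the retry limit are hypothetical and not taken from guillotina_amqp.

```python
def delay_queue_arguments(main_exchange, main_routing_key, ttl_ms):
    """Queue arguments that make RabbitMQ re-route messages once the TTL expires."""
    return {
        "x-message-ttl": ttl_ms,                       # how long a message waits, in ms
        "x-dead-letter-exchange": main_exchange,       # where expired messages are sent
        "x-dead-letter-routing-key": main_routing_key,
    }


def route_failed_task(retries, max_retries):
    """Decide where a failed task goes next (hypothetical retry policy).

    Returns a (destination, retry_count) tuple.
    """
    if retries < max_retries:
        return "delay", retries + 1   # wait out the TTL, then run again
    return "error", retries           # retry budget exhausted


args = delay_queue_arguments("guillotina", "guillotina", 60 * 1000)
first_failure = route_failed_task(retries=0, max_retries=3)
last_failure = route_failed_task(retries=3, max_retries=3)
```

Keeping the retry count with the task lets any worker apply the limit without coordinating with the others.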
Configuration
Example configuration:
{
    "amqp": {
        "host": "localhost",
        "port": 5673,
        "login": "guest",
        "password": "guest",
        "vhost": "/",
        "heartbeat": 800,
        "queue": "guillotina",  # Main consuming queue for workers
        "persistent_manager": "redis",
        "delayed_ttl_ms": 60 * 1000,
        "errored_ttl_ms": 1000 * 60 * 60 * 24 * 7,
    }
}
host and port: should point to the RabbitMQ instance
login and password: should match the RabbitMQ access credentials
queue: main queue from which tasks are consumed
persistent_manager: named utility used to keep task state
delayed_ttl_ms and errored_ttl_ms: can be used to configure queue delays. Default to 2 minutes and 1 week, respectively.
max_running_tasks: maximum number of simultaneous asyncio tasks that workers are allowed to run
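To illustrate what a max_running_tasks limit means in asyncio terms, the sketch below caps concurrency with a semaphore. It is an assumption about how such a limit can be enforced, not the worker's actual code; run_bounded and job are hypothetical names.

```python
import asyncio


async def run_bounded(coro_factories, max_running_tasks):
    """Run all jobs, never letting more than max_running_tasks execute at once."""
    semaphore = asyncio.Semaphore(max_running_tasks)
    running = 0
    peak = 0

    async def guarded(factory):
        nonlocal running, peak
        async with semaphore:          # waits here while the limit is reached
            running += 1
            peak = max(peak, running)
            try:
                return await factory()
            finally:
                running -= 1

    results = await asyncio.gather(*(guarded(f) for f in coro_factories))
    return results, peak


async def job(n):
    await asyncio.sleep(0.01)
    return n * 2


results, peak = asyncio.run(
    run_bounded([lambda n=n: job(n) for n in range(10)], max_running_tasks=3)
)
```

Here peak records the highest number of jobs that were inside the semaphore at the same time, which never exceeds the configured limit.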
Dependencies
Python >= 3.7
Installation
This example will use virtualenv:
virtualenv .
./bin/pip install .[test]
Running
The simplest way to get running:
./bin/guillotina
Queue tasks
code:
from guillotina_amqp import add_task
await add_task(my_func, 'foobar', kw_arg='blah')
With decorators
code:
from guillotina_amqp import task

@task
async def my_func(foo):
    print(foo)

await my_func('bar')
Run the worker
command:
g amqp-worker
You can use a couple of additional parameters:
--auto-kill-timeout: time of inactivity after which the worker will restart itself, assuming it got stuck.
--max-running-tasks: maximum number of simultaneous asyncio tasks in the event loop. Overrides the configuration parameter.
API
GET /@amqp-tasks - get list of tasks
GET /@amqp-tasks/{task_id} - get task info
DELETE /@amqp-tasks/{task_id} - delete task
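A small client-side sketch of these endpoints, using only the standard library. The base URL (host, port and container path) is an assumption for illustration, authentication headers are omitted, and the helper names are hypothetical. The requests are only built here, not sent.

```python
from urllib.request import Request

# Assumption: a local Guillotina instance with a container at /db/container.
BASE_URL = "http://localhost:8080/db/container"


def list_tasks_request(base_url=BASE_URL):
    """GET /@amqp-tasks: list of tasks."""
    return Request(f"{base_url}/@amqp-tasks", method="GET")


def task_info_request(task_id, base_url=BASE_URL):
    """GET /@amqp-tasks/{task_id}: info for one task."""
    return Request(f"{base_url}/@amqp-tasks/{task_id}", method="GET")


def delete_task_request(task_id, base_url=BASE_URL):
    """DELETE /@amqp-tasks/{task_id}: delete a task."""
    return Request(f"{base_url}/@amqp-tasks/{task_id}", method="DELETE")


req = delete_task_request("my-task-id")
```

Each request can be sent with urllib.request.urlopen(req) once the credentials the server requires have been added as headers.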