Run and monitor celery tasks

Summary

Run, monitor and log celery tasks.

Installation and setup

Declare tasks using celery tasks or cubicweb-celery cwtasks.

On the worker side, install cw-celerytask-helpers.

celeryconfig.py example:

BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = BROKER_URL
CUBICWEB_CELERYTASK_REDIS_URL = BROKER_URL
CELERY_IMPORTS = ('cw_celerytask_helpers.helpers', 'module.containing.tasks')
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json', 'msgpack', 'yaml']

In this example configuration, the cw_celerytask_helpers.helpers entry in CELERY_IMPORTS is required so that log data emitted inside tasks is sent back to the CubicWeb instance via Redis. CUBICWEB_CELERYTASK_REDIS_URL is the Redis endpoint used by this logging mechanism.
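To illustrate the general idea of forwarding task logs to a shared store, here is a minimal sketch using only the standard library. It is not the cw_celerytask_helpers implementation, whose internals are not described here. TaskLogStore, StoreLogHandler and run_with_captured_logs are hypothetical names, and an in-memory dictionary stands in for Redis.

```python
import logging
from collections import defaultdict


class TaskLogStore:
    """Hypothetical in-memory stand-in for the Redis endpoint."""

    def __init__(self):
        self._logs = defaultdict(list)

    def append(self, task_id, line):
        self._logs[task_id].append(line)

    def get(self, task_id):
        return list(self._logs[task_id])


class StoreLogHandler(logging.Handler):
    """Forward each formatted log record to the store under one task id."""

    def __init__(self, store, task_id):
        super().__init__()
        self.store = store
        self.task_id = task_id

    def emit(self, record):
        self.store.append(self.task_id, self.format(record))


def run_with_captured_logs(store, task_id, func, *args, **kwargs):
    """Run func with a logger whose output is collected in the store."""
    logger = logging.getLogger("demo.task")
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep the demo output out of the root logger
    handler = StoreLogHandler(store, task_id)
    handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
    logger.addHandler(handler)
    try:
        return func(logger, *args, **kwargs)
    finally:
        # Always detach the handler so later tasks do not log to this id.
        logger.removeHandler(handler)


def my_task(logger, arg, kw=0):
    logger.info("HI %s %s!", arg, kw)
    return 42


store = TaskLogStore()
result = run_with_captured_logs(store, "task-1", my_task, "THERE", kw=42)
captured = store.get("task-1")
```

The consuming side (here, whatever reads store.get()) can then display the lines per task, which is the role the CubicWeb instance plays in the configuration above.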

Start a worker:

# running cubicweb tasks (celeryconfig.py will be imported from your instance config directory)
celery -A cubicweb_celery -i <CW_INSTANCE_NAME> worker -l info

# running pure celery tasks
celery worker -l info

Task state synchronization requires running the celery-monitor command:

cubicweb-ctl celery-monitor <instance-name>

Make sure the same celeryconfig.py is loaded by both the CubicWeb instance and the celery worker. You can enforce this by setting the CELERY_CONFIG_MODULE environment variable, whose value must be an importable Python module.
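A quick way to check that both processes would pick up a usable configuration is to import the module named by CELERY_CONFIG_MODULE and look for the expected settings. The sketch below assumes Celery's default module name, celeryconfig, when the variable is unset. missing_settings is a hypothetical helper, not part of this cube.

```python
import importlib
import os


def missing_settings(required, default_module="celeryconfig"):
    """Return the names in `required` that the config module does not define.

    Raises ImportError if the module named by CELERY_CONFIG_MODULE
    (or the default) cannot be imported.
    """
    module_name = os.environ.get("CELERY_CONFIG_MODULE", default_module)
    config = importlib.import_module(module_name)
    return [name for name in required if not hasattr(config, name)]
```

Running it in the environment of the CubicWeb instance and in that of the worker, for instance with ['BROKER_URL', 'CUBICWEB_CELERYTASK_REDIS_URL'], should give an empty list in both cases.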

Running tasks

Create a task:

from celery import current_app as app
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task(name='hi_there')
def my_task(arg, kw=0):
    logger.info('HI %s %s!', arg, kw)
    return 42

Run a task:

from cubes.celerytask.entities import start_async_task

cwtask = start_async_task(cnx, 'hi_there', 'THERE', kw=42)
cnx.commit()

start_async_task() accepts task names, task objects or task signatures: http://docs.celeryproject.org/en/latest/userguide/canvas.html#signatures

For instance, to start the above task in a dedicated queue named myqueue:

import celery

start_async_task(cnx, celery.signature('hi_there', args=('THERE',),
                                       kwargs={'kw': 42}, queue='myqueue'))

Testing task-based applications

In CubicWeb test mode, tasks do not run automatically. Use cubes.celerytask.entities.get_tasks() to inspect them and cubes.celerytask.entities.run_all_tasks() to run them.

Also, CELERY_ALWAYS_EAGER and CELERY_EAGER_PROPAGATES_EXCEPTIONS are set to True by default.

Demo

A simple demo is supplied with the source code.

We assume that the cubicweb-celerytask cube is properly installed in a working CubicWeb environment and that a Redis server is available at redis://localhost:6379/0 (you can change this in demo/celeryconfig.py if needed).

Then:

user@host:~/celerytask$ cubicweb-ctl create celerytask demo

For the sake of simplicity, choose sqlite as the database driver, and answer ‘yes’ to the question “Allow anonymous access ?”

Start the web application:

user@host:~/celerytask$ cubicweb-ctl start -D -linfo demo

Open your web browser on http://127.0.0.1:8080/

In another terminal, start a celery worker:

user@host:~$ cd celerytask/demo
user@host:~/celerytask/demo$ celery worker -l info -E

In a third terminal, launch some tasks:

user@host:~$ cd celerytask/demo
user@host:~/celerytask/demo$ cubicweb-ctl shell demo launchtasks.py

You should be able to see 3 tasks on http://127.0.0.1:8080/CeleryTask
