Skip to main content

Decorators for running functions in Thread/ThreadPool/IOLoop

Project description

threaded

https://travis-ci.org/python-useful-helpers/threaded.svg?branch=master https://coveralls.io/repos/github/python-useful-helpers/threaded/badge.svg?branch=master Documentation Status https://img.shields.io/pypi/v/threaded.svg https://img.shields.io/pypi/pyversions/threaded.svg https://img.shields.io/pypi/status/threaded.svg https://img.shields.io/github/license/python-useful-helpers/threaded.svg

threaded is a set of decorators, which wrap functions in:

  • concurrent.futures.ThreadPool

  • threading.Thread

  • asyncio.Task in Python 3.

  • gevent.threadpool.ThreadPool if gevent is installed.

Why? Because copy-paste of loop.create_task, threading.Thread and thread_pool.submit is boring, especially if target functions is used by this way only.

Pros:

Python 2.7
PyPy

Decorators:

  • ThreadPooled - native concurrent.futures.ThreadPool usage on Python 3 and it’s backport on Python 2.7.

  • threadpooled is alias for ThreadPooled.

  • Threaded - wrap in threading.Thread.

  • threaded is alias for Threaded.

  • AsyncIOTask - wrap in asyncio.Task. Uses the same API, as Python 3 ThreadPooled.

  • asynciotask is alias for AsyncIOTask.

  • GThreadPooled - wrap function in gevent.threadpool.ThreadPool.

  • gthreadpooled is alias for GThreadPooled.

Usage

ThreadPooled

Mostly it is required decorator: submit function to ThreadPoolExecutor on call.

threaded.ThreadPooled.configure(max_workers=3)

Python 2.7 usage:

@threaded.ThreadPooled
def func():
    pass

concurrent.futures.wait([func()])

Python 3.3+ usage:

@threaded.ThreadPooled
def func():
    pass

concurrent.futures.wait([func()])

Python 3.3+ usage with asyncio:

loop = asyncio.get_event_loop()
@threaded.ThreadPooled(loop_getter=loop, loop_getter_need_context=False)
def func():
    pass

loop.run_until_complete(asyncio.wait_for(func(), timeout))

Python 3.3+ usage with asyncio and loop extraction from call arguments:

loop_getter = lambda tgt_loop: tgt_loop
@threaded.ThreadPooled(loop_getter=loop_getter, loop_getter_need_context=True)  # loop_getter_need_context is required
def func(*args, **kwargs):
    pass

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(func(loop), timeout))

During application shutdown, pool can be stopped (while it will be recreated automatically, if some component will request).

threaded.ThreadPooled.shutdown()

Threaded

Classic threading.Thread. Useful for running until close and self-closing threads without return.

Usage example:

@threaded.Threaded
def func(*args, **kwargs):
    pass

thread = func()
thread.start()
thread.join()

Without arguments, thread name will use pattern: 'Threaded: ' + func.__name__

Override name can be don via corresponding argument:

@threaded.Threaded(name='Function in thread')
def func(*args, **kwargs):
    pass

Thread can be daemonized automatically:

@threaded.Threaded(daemon=True)
def func(*args, **kwargs):
    pass

Also, if no any addition manipulations expected before thread start, it can be started automatically before return:

@threaded.Threaded(started=True)
def func(*args, **kwargs):
    pass

AsyncIOTask

Wrap in asyncio.Task.

usage with asyncio:

@threaded.AsyncIOTask
def func():
    pass

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(func(), timeout))

Provide event loop directly:

loop = asyncio.get_event_loop()
@threaded.AsyncIOTask(loop_getter=loop)
def func():
    pass

loop.run_until_complete(asyncio.wait_for(func(), timeout))

Usage with loop extraction from call arguments:

loop_getter = lambda tgt_loop: tgt_loop
@threaded.AsyncIOTask(loop_getter=loop_getter, loop_getter_need_context=True)
def func(*args, **kwargs):
    pass

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(func(loop), timeout))

GThreadPooled

Post function to gevent.threadpool.ThreadPool.

threaded.GThreadPooled.configure(max_workers=3)

Basic usage example:

@threaded.GThreadPooled
def func():
    pass

func().wait()

Testing

The main test mechanism for the package threaded is using tox. Available environments can be collected via tox -l

CI systems

For code checking several CI systems is used in parallel:

  1. Travis CI: is used for checking: PEP8, pylint, bandit, installation possibility and unit tests. Also it’s publishes coverage on coveralls.

  2. coveralls: is used for coverage display.

CD system

Travis CI: is used for package delivery on PyPI.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page