
Celery-Inception: Tasks Inside Tasks

Introduction

Celery has been the go-to tool for scheduling Python jobs for quite some time. With growing interest in Python, often driven by machine learning, Celery is frequently chosen as a solution to the problem of executing lengthy computations on the server side. Regardless of whether your application was designed to handle extensive calculations from the start or it just grew bigger and you are looking for a way to improve its responsiveness, chances are you have already stumbled across Celery.

In this post, we show a simple way of organizing computations as a series of atomic Celery tasks, with one master task appointed to keep an eye on the progress of its children. What is more, we will also discuss how to organize the communication so that it is easy for the user to track progress.

Preparation

Assuming you have your Django project set up and Celery installed and integrated into the project, you are pretty much ready to start. Now, imagine that you have a number of heavy computational tasks that you would like to execute in order. It matters less what these tasks are. They could be some bulky signal processing or optimization calculations as much as a simple for-loop counting to one billion. What matters is that they are atomic and take long enough to complete that it is justified to define them separately.

For simplicity, we will assume they are simple for-loops with a time delay. Also for simplicity, they can all be defined in the same tasks.py file belonging to one application. In case your project consists of several applications in the Django sense, all you need to take care of are two things:

  1. Ensure the correct import (e.g. from yourapp.tasks import some_task_function) in the parent tasks.py file.
  2. Register all modules containing Celery tasks in the celery.py file, so that they are recognized as Celery tasks.

Registering tasks

MainProj/MainProj/celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'MainProj.settings')

app = Celery('MainProj',
        broker='amqp://',       # example broker
        backend='amqp://',      # example backend
        include=['parent.tasks', 'children.tasks']) # <- here
# Read the settings prefixed with CELERY_ from Django's settings.py.
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

Here, we have implicitly assumed we have two applications: parent and children, each having its own tasks.py file. However, there can be more, or there can be just one application, in which case tasks.py will contain both parent and children tasks.

Children tasks

As mentioned before, children tasks can be anything. Here, we will define just one, and yes… a for-loop.

MainProj/children/tasks.py

from __future__ import absolute_import, unicode_literals

from celery import current_task, shared_task
from celery.result import AsyncResult
from django_celery_results.models import TaskResult
from billiard.exceptions import Terminated

from time import sleep

@shared_task(throws=(Terminated,))
def func_1(limit, testmode=False):
    print ("Calculating something veeery heavy.")
    try:
        for counter in range(limit):
            print ("Iteration: {} out of {}.".format(counter, limit))

            if not testmode:
                current_task.update_state(
                    state="PROGRESS",
                    meta={
                        'iteration':  counter,
                        'completion': counter/limit*100,
                        'status':     "PROGRESS",
                        'message':    "Executing func_1...",
                        'error':      None
                        }
                    )
            sleep(1)
    except Exception as e:
        print ("ERROR occurred: {}.".format(e))
        outcome = {
            'iteration':  0,
            'completion': 0,
            'status':     "ERROR",
            'message':    "Executing func_1 failed!",
            'error':      "Error in func_1: {}.".format(e),
        }
        return outcome
    else:
        outcome = {
            'iteration':  limit,
            'completion': 100,
            'status':     "OK",
            'message':    "Executing func_1 completed.",
            'error':      None,
        }
        return outcome

There are some interesting things related to this implementation. First of all, we define an optional argument testmode that defaults to False. It is there purely for debugging. When designing tasks to calculate something specific, we will certainly want a way of testing the function before we execute it in asynchronous mode. The testmode argument prevents the error that would otherwise occur when the function runs outside of a Celery task and current_task is not available.
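As a minimal sketch of how testmode helps, the stand-in below mirrors the loop of func_1 without Celery. The names count_with_progress and report are hypothetical and not part of the original code; report only plays the role that current_task.update_state has in the real task.

```python
def count_with_progress(limit, report=None, testmode=False):
    """Hypothetical stand-in for func_1: loop and optionally report progress."""
    for counter in range(limit):
        if not testmode and report is not None:
            # In the real task this would be current_task.update_state(...).
            report({'iteration': counter, 'completion': counter / limit * 100})
    return {'iteration': limit, 'completion': 100, 'status': "OK"}


# Test mode: the reporter is never touched, so this runs as plain Python.
result = count_with_progress(3, testmode=True)

# Normal mode: progress updates are collected by the stand-in reporter.
updates = []
count_with_progress(4, report=updates.append)
```

With this shape, the loop itself can be checked in an ordinary test or shell session before the function is ever sent to a worker.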

Secondly, exceptions occurring in asynchronous tasks can be a real pain to handle, as they occur in a parallel process. For this reason, we surround the body of the function with a try-except statement and let possible exceptions escalate in a controlled way. By defining a dedicated field in meta, as well as in the outcome, to store the information, we can propagate the error information all the way to the front end. Having this set up, we are free to define custom exceptions and raise them with the raise statement, which we can use for assertions. This is especially useful when designing tools that are research oriented.
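The idea of using a custom exception as an assertion can be sketched as follows. InputRangeError and checked_step are hypothetical names chosen for illustration; the try-except-else layout is the same as in func_1, only without the Celery parts.

```python
class InputRangeError(Exception):
    """Hypothetical custom exception used as an assertion on the input."""


def checked_step(value):
    """Hypothetical child body: validate the input, then 'compute'."""
    try:
        if value < 0:
            raise InputRangeError(
                "value must be non-negative, got {}".format(value))
        result = value ** 0.5
    except Exception as e:
        # Same idea as in func_1: turn the exception into a regular outcome.
        return {
            'status':  "ERROR",
            'message': "checked_step failed!",
            'error':   "Error in checked_step: {}.".format(e),
        }
    else:
        return {
            'status':  "OK",
            'message': "checked_step completed.",
            'error':   None,
            'result':  result,
        }


ok = checked_step(9)
failed = checked_step(-1)
```

Because the exception is converted into a dictionary, the caller never has to catch anything; it only needs to look at the error field.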

Finally, it is also important that we pass a consistent output from the function regardless of whether the function exits cleanly or has raised an exception. As we are soon going to monitor this task with another task, we can save ourselves a lot of trouble just by giving outcome the same structure as meta.
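One possible way to keep the two structures in sync is to build both through a single helper. The function make_meta below is a hypothetical addition, not part of the original code; it simply fixes the set of keys in one place.

```python
def make_meta(iteration, completion, status, message, error=None):
    """Hypothetical helper: one place that fixes the keys of meta and outcome."""
    return {
        'iteration':  iteration,
        'completion': completion,
        'status':     status,
        'message':    message,
        'error':      error,
    }


# The same helper serves the progress updates and the final return value.
progress = make_meta(3, 30.0, "PROGRESS", "Executing func_1...")
outcome = make_meta(10, 100, "OK", "Executing func_1 completed.")
```

A mistyped key then becomes an error at the call site instead of a silent mismatch that only shows up in the monitoring task.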

Parent task

Once we have all our children task functions defined, we are ready to define the parent task. The parent task can (and should) be used solely for executing and monitoring the progress of its children.

MainProj/parent/tasks.py

from __future__ import absolute_import, unicode_literals

from celery import current_task, shared_task
from celery.result import AsyncResult
from django_celery_results.models import TaskResult
from billiard.exceptions import Terminated

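from time import sleep  # used by main_flow and by watch

from children.tasks import func_1  # children is a sibling app, so import it absolutely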


@shared_task(throws=(Terminated,))
def main_flow(limit):
    print ("Calculating everything...")
    job = func_1.delay(limit)
    outcome = watch(job)
    if outcome['error'] is not None:
        print ("Func_1 failed or revoked!")
        parent_state = "FAILURE"  # plain string; a trailing comma here would create a tuple
        parent_meta ={
            'iteration':  0,
            'completion': 0,
            'status':     "FAILURE",
            'message':    "ERROR: {}.".format(outcome['error']),
            }
        current_task.update_state(
            state=parent_state,
            meta=parent_meta
            )
        return parent_meta
    else:
        parent_state = "PROGRESS"
        parent_meta ={
            'iteration':  0,
            'completion': 100,
            'status':     "SUCCESS",
            'message':    "Execution of func_1 finished.",
            }
        current_task.update_state(
            state=parent_state,
            meta=parent_meta
            )
    # Here there can be more children...
    # So you can repeat the routine above.
    # In the end...

    parent_state = "SUCCESS"
    parent_meta  = {
        'iteration': 0,
        'completion': 100,
        'status':   "SUCCESS",
        'message': "Congratulations! You calculated everything."
        }
    current_task.update_state(
        state=parent_state,
        meta=parent_meta
        )
    sleep(10)
    return parent_meta

This function defines the execution flow of the sequence of operations of its children. In case one of the children raises an exception, we know that it will exit and return its outcome to the parent task, which will also exit, passing the exception information further.
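When there are several children, the repeated "run, check the error, stop or continue" routine can be sketched as a loop. The helper run_in_sequence and the stand-in steps are hypothetical; in the actual parent task each step would be a call such as watch(func_1.delay(limit)).

```python
def run_in_sequence(steps):
    """Run hypothetical (name, callable) steps in order; stop at the first error."""
    completed = []
    for name, step in steps:
        outcome = step()
        if outcome['error'] is not None:
            # A child failed: report it and skip the remaining children.
            return {
                'status':    "FAILURE",
                'completed': completed,
                'message':   "ERROR in {}: {}".format(name, outcome['error']),
            }
        completed.append(name)
    return {
        'status':    "SUCCESS",
        'completed': completed,
        'message':   "All steps finished.",
    }


def ok_step():
    return {'error': None}


def bad_step():
    return {'error': "boom"}


result = run_in_sequence(
    [('first', ok_step), ('second', bad_step), ('third', ok_step)])
```

Here the third step never runs, which mirrors the early return in main_flow when a child reports an error.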

To monitor the progress of a child, we use another function, watch, which main_flow calls right after func_1.delay(limit). For simplicity, we can define this function in the same file.

def watch(job):
    task = AsyncResult(job.id)
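    task_process = 'RUNNING'
    # Default meta, so that the except clause below can always write to it,
    # even if the very first status lookup raises.
    meta = {
        "iteration":    0,
        "completion":   0,
        "status":       'PENDING',
        "task_message": "",
        "error":        None,
        }
    try: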
        while task_process == 'RUNNING':

            # check for STATUS:
            if  task.status == 'REVOKED':
                meta = {
                    "iteration":    1,
                    "completion":   100,
                    "status":       task.result['status'],
                    "task_message": task.result['message'],
                    "error":        task.result['error'],
                    }
            elif task.status == 'PROGRESS':
                meta = {
                    "iteration":    task.result['iteration'],
                    "completion":   task.result['completion'],
                    "status":       task.result['status'],
                    "task_message": task.result['message'],
                    "error":        task.result['error'],
                    }
            elif task.status == 'SUCCESS':
                meta = {
                    "iteration":    0,
                    "completion":   100,
                    "status":       "SUCCESS",
                    "task_message": "completed",
                    "error":        task.result['error'],
                    }
            elif task.status == 'PENDING':
                meta = {
                    "iteration":    0,
                    "completion":   0,
                    "status":       'PENDING',
                    "task_message": "Preparing... ",
                    "error":        None,
                    }
            else:
                meta = {
                    "iteration":    0,
                    "completion":   0,
                    "status":       "?",
                    "task_message": "",
                    "error":        "",
                    }

            # check for RESULT:
            if task.result is not None and task.status != "PENDING":
                if  task.result['status'] == 'OK':
                    task_process = "OK"
                    return meta
                elif task.result['status'] == 'ERROR':
                    task_process = "ERROR"
                    break

            # update status
            current_task.update_state(
                    state=task.status,
                    meta=meta)
            sleep(0.1)
    except Exception as e:
        meta['error'] = 'Error in Task Monitoring: ' + str(e)
    finally:
        task_obj = TaskResult.objects.filter(task_id=task.id)
        task_obj.delete()
    return meta

The essence of this function is a while loop that keeps the parent task monitoring a child. For this reason, it is executed synchronously inside main_flow (not through the .delay() method).

For as long as the child process is running, we can pass all of its meta information to the parent process, simply by assigning its meta data to the parent’s. However, because it is a while loop, it is extremely important to be careful about the exit condition(s). For this reason, we define two ways this function can break the loop and exit:

  1. The child process can exit with either "OK" or "ERROR" status, making the monitoring no longer necessary.
  2. The watch function can itself raise an exception, for example due to delays in the operating system that leave the child unable to pass any information, or simply because we mislabeled one of the keys in meta during development! Either way, we need to make sure we are prepared for this case. Otherwise, the parent task can end up locked forever, which makes killing the process a rather difficult operation. In fact, we should define a third mechanism, a timeout, in case the child task stays “PENDING” and never gets started. However, this is beyond the scope of this post.
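Although a full timeout mechanism is out of scope here, the basic idea can be sketched in a few lines. The function poll_until_started and its get_status argument are hypothetical; in the real watch function, get_status would read task.status, and the timeout and interval values are arbitrary assumptions.

```python
import time


def poll_until_started(get_status, timeout=5.0, interval=0.1):
    """Poll get_status() until it leaves 'PENDING' or the timeout expires.

    get_status is a hypothetical zero-argument callable that returns
    the current status string of the child.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = get_status()
        if status != 'PENDING':
            return status
        time.sleep(interval)  # avoid polling too aggressively
    return 'TIMEOUT'


# Stand-in child that reports 'PENDING' twice and then 'PROGRESS'.
statuses = iter(['PENDING', 'PENDING', 'PROGRESS'])
started = poll_until_started(lambda: next(statuses), timeout=1.0, interval=0.01)

# Stand-in child that never starts.
stuck = poll_until_started(lambda: 'PENDING', timeout=0.05, interval=0.01)
```

The deadline is computed with time.monotonic() so that changes to the system clock cannot extend or shorten the wait.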

Another thing is that the monitoring function should not be too “oppressive” in polling for the children’s status, or else it can make things slow. This is why we include a small delay in the watch loop, e.g. 100 ms.

Finally, when results are stored through django_celery_results, all Celery task results end up in the database. In case we do not intend to keep that information any longer, we can delete the record from the database in the finally clause, which ensures it is done irrespective of what else happens.

Executing in views

The only thing that remains is to execute the main_flow function somewhere, and it will take care of the rest. This is the simplest part. All we need to do is define a view function and call main_flow from it using the .delay() method.

Somewhere in views.py

from .tasks import main_flow

def some_view(request):
    # ...
    job = main_flow.delay(limit)
    # ...
    return SomeResponse(...)

As long as we pass the task.id along, we can use other views and AJAX to monitor progress and pass on information about potential exceptions, just like we did earlier.
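A monitoring view mostly needs to turn the task's state and meta into something JSON-serializable. The helper progress_payload below is a hypothetical sketch of that step in plain Python; in a Django view, state and info would presumably come from AsyncResult(task_id).state and AsyncResult(task_id).info, and the payload would be returned with JsonResponse.

```python
import json


def progress_payload(task_id, state, info):
    """Build a JSON-serializable payload for the front end (hypothetical helper)."""
    if isinstance(info, dict):
        # Progress updates and regular outcomes arrive as dictionaries.
        payload = dict(info)
    else:
        # info may be None while pending, or an exception instance on failure.
        payload = {'error': None if info is None else str(info)}
    payload.update({'task_id': task_id, 'state': state})
    return payload


body = json.dumps(
    progress_payload("abc123", "PROGRESS", {'iteration': 2, 'completion': 20.0}))
```

Converting exceptions to strings here keeps the response serializable even when the task did not return one of the dictionaries described earlier.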

Conclusion

In this post, we have demonstrated a way to use a dedicated Celery task to monitor several other Celery tasks executed in sequence. We have also discussed propagating information about potential exceptions and ways to ensure that we do not end up in an infinite loop when monitoring the tasks.