Celery-Inception: Tasks Inside Tasks
Introduction
Celery has been the go-to solution for scheduling Python jobs for quite some time. With the rising interest in Python, often driven by machine learning, Celery is frequently reached for as a solution to the problem of executing lengthy computations on the server side. Regardless of whether your application was designed to handle extensive calculations from the start, or it simply grew bigger and you are looking for a way to improve its responsiveness, chances are you have already stumbled across Celery.
In this post, we show a simple way of organizing computations as a series of atomic Celery tasks, appointing one master task to keep an eye on the progress of its children. We will also discuss how to organize the communication so that it is easy for the user to track progress.
Preparation
Assuming you have your Django project set up, with Celery installed and integrated into it, you are pretty much ready to start. Now, imagine that you have a number of heavy computational tasks that you would like to execute in order. What these tasks are matters less. They could be some bulky signal processing or optimization calculations, or just a simple for-loop counting to one billion. What matters is that they are atomic and take long enough to complete that defining them separately is justified.
For simplicity, we will assume they are simple for-loops with a time delay, and we will define them all in the same tasks.py file belonging to one application.
In case your project consists of several applications in the Django sense, there are only two things you need to take care of:
- Ensure correct imports (e.g. from .yourapp.tasks import some_task_function) in the parent tasks.py file.
- Register all files containing Celery tasks in the celery.py file, so that they are recognized as Celery tasks.
Registering tasks
MainProj/MainProj/celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'MainProj.settings')

app = Celery('MainProj',
             broker='amqp://',   # example broker
             backend='amqp://',  # example backend
             include=['parent.tasks', 'children.tasks'])  # <- here

# note: the namespace must be 'CELERY' (upper case) to match
# the CELERY_-prefixed options in settings.py
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
Here, we have implicitly assumed there are two applications, parent and children, each having its own tasks.py file. However, there can be more, or there can be just one application, in which case a single tasks.py will contain both the parent and the children tasks.
Children tasks
As mentioned before, children tasks can be anything. We will define just one here, and yes… a for-loop.
MainProj/children/tasks.py
from __future__ import absolute_import, unicode_literals
from celery import current_task, shared_task
from billiard.exceptions import Terminated
from time import sleep


@shared_task(throws=(Terminated,))
def func_1(limit, testmode=False):
    print("Calculating something veeery heavy.")
    try:
        for counter in range(limit):
            print("Iteration: {} out of {}.".format(counter, limit))
            if not testmode:
                # report progress through the task's meta data
                current_task.update_state(
                    state="PROGRESS",
                    meta={
                        'iteration': counter,
                        'completion': counter / limit * 100,
                        'status': "PROGRESS",
                        'message': "Executing func_1...",
                        'error': None,
                    }
                )
            sleep(1)
    except Exception as e:
        print("ERROR occurred: {}.".format(e))
        outcome = {
            'iteration': 0,
            'completion': 0,
            'status': "ERROR",
            'message': "Executing func_1 failed!",
            'error': "Error in func_1: {}.".format(e),
        }
        return outcome
    else:
        outcome = {
            'iteration': limit,
            'completion': 100,
            'status': "OK",
            'message': "Executing func_1 completed.",
            'error': None,
        }
        return outcome
There are some interesting things about this implementation.
First of all, we define an optional argument testmode that defaults to False.
It exists purely for debugging: when designing tasks to calculate something specific, we will certainly want a way of testing the function before we execute it in asynchronous mode.
The testmode argument prevents an error caused by current_task being undefined when the function is called directly rather than by a worker.
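For instance, a quick sanity check could look like the following minimal sketch (the argument value is arbitrary):

# direct, synchronous call for debugging -- no worker or broker needed:
outcome = func_1(3, testmode=True)
print(outcome['status'])  # prints "OK"

# asynchronous execution through Celery, once the function behaves:
job = func_1.delay(3)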
Secondly, exceptions occurring in asynchronous tasks can be a real pain to handle, as they occur in a parallel process.
For this reason, we surround the body of the function with a try-except statement and let possible exceptions escalate in a controlled way.
By defining a dedicated error field in meta, as well as in the returned outcome, we can propagate the error information all the way to the front end.
With this set up, we are free to define custom exceptions using the raise statement, which we can use for assertions.
This is especially useful when designing research-oriented tools.
Finally, it is also important that the function returns a consistent output regardless of whether it exits cleanly or has raised an exception.
As we are soon going to monitor this task with another task, we can save ourselves a lot of trouble simply by giving outcome the same structure as meta.
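As a sketch of the assertion idea, one could do something like this (the ComputationError name and the check itself are hypothetical, not part of the project):

class ComputationError(Exception):
    """Hypothetical custom exception used for task-level assertions."""
    pass

# somewhere inside the try block of func_1:
if counter > limit:  # hypothetical sanity check
    raise ComputationError("Counter exceeded the limit.")
# the surrounding try-except turns this into an 'ERROR' outcome dict,
# whose 'error' field then travels all the way to the front end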
Parent task
Once we have all our children task functions defined, we are ready to define the parent task. The parent task can (and should) be used solely for executing and monitoring the progress of its children.
MainProj/parent/tasks.py
from __future__ import absolute_import, unicode_literals
from celery import current_task, shared_task
from celery.result import AsyncResult
from django_celery_results.models import TaskResult
from billiard.exceptions import Terminated
from time import sleep

from children.tasks import func_1


@shared_task(throws=(Terminated,))
def main_flow(limit):
    print("Calculating everything...")
    job = func_1.delay(limit)
    outcome = watch(job)
    if outcome['error'] is not None:
        print("Func_1 failed or was revoked!")
        parent_state = "FAILURE"
        parent_meta = {
            'iteration': 0,
            'completion': 0,
            'status': "FAILURE",
            'message': "ERROR: {}.".format(outcome['error']),
            'error': outcome['error'],
        }
        current_task.update_state(
            state=parent_state,
            meta=parent_meta
        )
        return parent_meta
    else:
        parent_state = "PROGRESS"
        parent_meta = {
            'iteration': 0,
            'completion': 100,
            'status': "SUCCESS",
            'message': "Execution of func_1 finished.",
            'error': None,
        }
        current_task.update_state(
            state=parent_state,
            meta=parent_meta
        )
    # Here there can be more children...
    # So you can repeat the routine above.
    # In the end...
    parent_state = "SUCCESS"
    parent_meta = {
        'iteration': 0,
        'completion': 100,
        'status': "SUCCESS",
        'message': "Congratulations! You calculated everything.",
        'error': None,
    }
    current_task.update_state(
        state=parent_state,
        meta=parent_meta
    )
    # pause briefly before returning, so the final state can be read
    sleep(10)
    return parent_meta
This function defines the execution flow of the sequence of operations of its children.
In case one of the children raises an exception, we know it will exit and return its outcome to the parent task, which will also exit, passing the exception information further.
To monitor the progress of a child, we use another function, watch, called right after launching the child.
For simplicity, we can define this function in the same file.
def watch(job):
    task = AsyncResult(job.id)
    task_process = 'RUNNING'
    # pre-initialize meta, so the except/finally clauses can rely on it
    # even if an exception is raised on the very first iteration
    meta = {
        "iteration": 0,
        "completion": 0,
        "status": "PENDING",
        "task_message": "Preparing... ",
        "error": None,
    }
    try:
        while task_process == 'RUNNING':
            # check for STATUS:
            if task.status == 'REVOKED':
                meta = {
                    "iteration": 1,
                    "completion": 100,
                    "status": task.result['status'],
                    "task_message": task.result['message'],
                    "error": task.result['error'],
                }
            elif task.status == 'PROGRESS':
                meta = {
                    "iteration": task.result['iteration'],
                    "completion": task.result['completion'],
                    "status": task.result['status'],
                    "task_message": task.result['message'],
                    "error": task.result['error'],
                }
            elif task.status == 'SUCCESS':
                meta = {
                    "iteration": 0,
                    "completion": 100,
                    "status": "SUCCESS",
                    "task_message": "completed",
                    "error": task.result['error'],
                }
            elif task.status == 'PENDING':
                meta = {
                    "iteration": 0,
                    "completion": 0,
                    "status": 'PENDING',
                    "task_message": "Preparing... ",
                    "error": None,
                }
            else:
                meta = {
                    "iteration": 0,
                    "completion": 0,
                    "status": "?",
                    "task_message": "",
                    "error": "",
                }
            # check for RESULT:
            if task.result is not None and task.status != "PENDING":
                if task.result['status'] == 'OK':
                    task_process = "OK"
                    return meta
                elif task.result['status'] == 'ERROR':
                    task_process = "ERROR"
                    break
            # update the parent's status with the child's meta
            current_task.update_state(
                state=task.status,
                meta=meta)
            sleep(0.1)
    except Exception as e:
        meta['error'] = 'Error in Task Monitoring: ' + str(e)
    finally:
        # clean up the child's result row and provide a single exit point,
        # no matter how the loop terminated
        task_obj = TaskResult.objects.filter(task_id=task.id)
        task_obj.delete()
        return meta
The essence of this function is a while loop which keeps the parent task monitoring a child. For this reason, it is executed synchronously inside main_flow (not through the .delay() method).
For as long as the child process is running, we can pass all of its meta information to the parent process, simply by assigning the child's meta data to the parent's. However, because it is a while loop, it is extremely important to be careful about the exit condition(s). For this reason, we define two ways this function can break the loop and exit:
- The child process can exit with an "OK" or "ERROR" status, making the monitoring no longer necessary.
- The watch function can itself raise an exception, for example due to operating-system delays that leave the child unable to pass any information, or simply because we mislabel one of the keys in meta during development! Either way, we need to make sure we are prepared for this case. Otherwise, the parent task can end up locked forever, making killing the process a rather difficult operation. In fact, we should define a third mechanism, a timeout, in case the child task stays "PENDING" and never gets started (see the sketch below), although a full treatment is beyond the scope of this post.
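For illustration, such a timeout could be sketched inside watch's loop roughly as follows (the 60-second limit is an arbitrary assumption):

from time import time

start = time()
while task_process == 'RUNNING':
    # ... status checks as above ...
    if task.status == 'PENDING' and time() - start > 60:
        meta['error'] = 'Timeout: the child task never started.'
        break
    sleep(0.1)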
Another thing is that the monitoring function should not be too "oppressive" when polling for the children's status, or else it can slow things down. This is why we include a small delay in watch's loop, e.g. 100 ms.
Finally, with the django_celery_results backend, all Celery task results end up in the database. In case it is not our intention to keep that information any longer, we can delete the reference from the database in the finally clause, which ensures it is done irrespective of what happens.
Executing in views
The only thing that remains is to execute the main_flow function somewhere in views.py, and it will take care of the rest. This is by far the simplest part: all we need to do is define a view that launches it using the .delay() method.
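A minimal sketch of such a view might look as follows (the file path, the view name start_main_flow, and the hard-coded limit are illustrative assumptions):

MainProj/parent/views.py

from django.http import JsonResponse
from .tasks import main_flow


def start_main_flow(request):
    # launch the parent task asynchronously and hand the task id
    # back to the client, so it can poll for progress via AJAX
    job = main_flow.delay(10)
    return JsonResponse({'task_id': job.id})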
As long as we pass the task.id along, we can use other views and AJAX to monitor progress and pass on information about potential exceptions, just like we did earlier.
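A matching, equally hypothetical polling view could read the parent's state like this:

from celery.result import AsyncResult
from django.http import JsonResponse


def poll_state(request, task_id):
    # look up the parent task and forward its meta dict to the front end
    task = AsyncResult(task_id)
    response = {'state': task.state}
    if isinstance(task.result, dict):
        response.update(task.result)
    return JsonResponse(response)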
Conclusion
In this post, we have demonstrated a way to use a dedicated Celery task to monitor several other Celery tasks executed in sequence. We have also discussed propagating information about potential exceptions, and ways to ensure that we do not end up in an infinite loop when monitoring the tasks.