Celery is a Python-based task queuing package that enables execution of asynchronous computational workloads, driven by information contained in messages that are produced in application code (Django in this example) and destined for a Celery task queue. The 'timetasks.settings' value in os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'timetasks.settings') is also set in manage.py, so, when in doubt, use the value from manage.py for your project. This is for our Celery worker process. My Django project name is timetasks, so I use it for Celery as well, with app = Celery('timetasks').

Installing Celery

For Django projects, we will install django-celery, which in turn installs celery as a dependency. Run this command to install django-celery: pip install django-celery.

Configuring Celery

Add the django-celery configuration to settings.py: import djcelery and add it to INSTALLED_APPS, as sketched below.
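Putting those pieces together, a minimal sketch of the two files involved might look as follows (the djcelery.setup_loader() call is an assumption based on django-celery's usual setup):

```python
# timetasks/celery.py - minimal sketch of the lines quoted above
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'timetasks.settings')

app = Celery('timetasks')
app.config_from_object('django.conf:settings')  # read Celery settings from Django
app.autodiscover_tasks()                        # find tasks.py in installed apps
```

```python
# timetasks/settings.py - excerpt
import djcelery

djcelery.setup_loader()  # let django-celery hook into Django's settings

INSTALLED_APPS = [
    # ...
    'djcelery',  # add django-celery to the installed apps
]
```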
PeriodicTask: this model defines a single periodic task to be run.
It must be associated with a schedule, which defines how often the task should run.
IntervalSchedule: a schedule that runs at a specific interval (e.g. every 5 seconds).
CrontabSchedule: a schedule with fields like entries in cron: minute, hour, day_of_week, day_of_month, month_of_year.
PeriodicTasks: this model is only used as an index to keep track of when the schedule has changed.
Whenever you update a PeriodicTask, a counter in this table is also incremented, which tells the celery beat service to reload the schedule from the database.
If you update periodic tasks in bulk, you will need to update the counter manually:
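For example (import path assumes django-celery-beat, whose models this section describes; older django-celery setups use djcelery.models instead):

```python
from django_celery_beat.models import PeriodicTasks

PeriodicTasks.update_changed()  # bump the counter so beat reloads the schedule
```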
Example creating interval-based periodic task
To create a periodic task executing at an interval you must first create the interval object:
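A sketch of creating such an interval object:

```python
from django_celery_beat.models import IntervalSchedule

# executes every 10 seconds
schedule, created = IntervalSchedule.objects.get_or_create(
    every=10,
    period=IntervalSchedule.SECONDS,
)
```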
That’s all the fields you need: a period type and the frequency.
You can choose between a specific set of periods:
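Roughly, the available period constants are:

```python
from django_celery_beat.models import IntervalSchedule

# the period constants defined on IntervalSchedule
IntervalSchedule.DAYS
IntervalSchedule.HOURS
IntervalSchedule.MINUTES
IntervalSchedule.SECONDS
IntervalSchedule.MICROSECONDS
```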
If you have multiple periodic tasks executing every 10 seconds, then they should all point to the same schedule object.
There’s also a “choices tuple” available should you need to present this to the user:
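```python
IntervalSchedule.PERIOD_CHOICES  # e.g. suitable for a Django form's choices=
```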
Now that we have defined the schedule object, we can create the periodic task entry:
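For example (the task name below is a placeholder):

```python
from django_celery_beat.models import PeriodicTask

PeriodicTask.objects.create(
    interval=schedule,                  # the schedule created above
    name='Importing contacts',          # human-readable description
    task='proj.tasks.import_contacts',  # dotted path to the task
)
```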
Note that this is a very basic example: you can also specify the arguments and keyword arguments used to execute the task, the queue to send it to [*], and set an expiry time.
Here’s an example specifying the arguments; note how JSON serialization is required:
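```python
import json
from datetime import datetime, timedelta

PeriodicTask.objects.create(
    interval=schedule,
    name='Importing contacts',
    task='proj.tasks.import_contacts',
    args=json.dumps(['arg1', 'arg2']),          # positional arguments as JSON
    kwargs=json.dumps({'be_careful': True}),    # keyword arguments as JSON
    expires=datetime.utcnow() + timedelta(seconds=30),
)
```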
[*] You can also use low-level AMQP routing using the exchange and routing_key fields.
Example creating crontab-based periodic task
A crontab schedule has the fields: minute, hour, day_of_week, day_of_month and month_of_year, so if you want the equivalent of a `30 * * * *` (execute at 30 minutes past the hour every hour) crontab entry, you specify:
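```python
from django_celery_beat.models import CrontabSchedule

schedule, _ = CrontabSchedule.objects.get_or_create(
    minute='30',
    hour='*',
    day_of_week='*',
    day_of_month='*',
    month_of_year='*',
)
```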
Then to create a periodic task using this schedule, use the same approach as the interval-based periodic task earlier in this document, but instead of interval=schedule, specify crontab=schedule:
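```python
PeriodicTask.objects.create(
    crontab=schedule,                   # crontab= instead of interval=
    name='Importing contacts',
    task='proj.tasks.import_contacts',
)
```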
Temporarily disable a periodic task
You can use the enabled flag to temporarily disable a periodic task:
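```python
periodic_task.enabled = False
periodic_task.save()
```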
Example running periodic tasks
The periodic tasks still need ‘workers’ to execute them. So make sure the default Celery package is installed. (If not installed, please follow the installation instructions here: https://github.com/celery/celery)
Both the worker and beat services need to be running at the same time.
Start a Celery worker service (specify your Django project name):
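```
$ celery -A [project-name] worker --loglevel=info
```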
As a separate process, start the beat service (specify the Django scheduler):
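For example, assuming the database-backed scheduler from django-celery-beat:

```
$ celery -A [project-name] beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
```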
Or you can use the -S (scheduler) flag; for more options, see `celery beat --help`:
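```
$ celery -A [project-name] beat -l info -S django
```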
Also, as an alternative, you can run the two steps above (worker and beat services) with only one command (recommended for development environment only):
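```
$ celery -A [project-name] worker --beat --scheduler django --loglevel=info
```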
- Now you can add and manage your periodic tasks from the Django Admin interface.
In the last post, we created the “foundations” for this work, including basic project set-up, package installation and a virtual environment. If you are familiar with Django, this post may be your starting point. However, if you are not, or if you wish to consult the ‘reference’, feel free to take a look at it anytime.
In this post, we will develop our project further to make our task report its progress and render it in the front-end. Since the focus is on the integration between Celery and Django, we will keep the task simple: mainly incrementing a number and watching it change in our HTML template.
If you already have the application up and running (e.g. taken from the GitHub repo), take a look at the next post that discusses deployment.
We assume your basic application works and you are able to move between the index and test pages. It is now time to add Celery into the project. We will be using Celery v4.1.0, which is the most recent version at the time of writing of this post.
Before we get started, we must ensure that we have RabbitMQ installed on our system. It is going to be our message broker of choice to make Django and Celery communicate. For other options, see Celery brokers. As for Ubuntu users, it is often the case that the shipped version is rather outdated, so it is recommended that we reinstall it anyway:
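On Ubuntu, that would be something along these lines (the exact commands may differ for your setup):

```
$ sudo apt-get update
$ sudo apt-get install rabbitmq-server
```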
Next, we can use pip to install the newest Celery:
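One plausible set of commands, inferred from the packages discussed below (amqp typically arrives as a Celery dependency):

```
$ pip install celery==4.1.0
$ pip install django-celery-results sqlalchemy
```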
As you can see, we have in addition installed amqp, which is necessary to ensure that the status of Celery's tasks is stored somewhere. Furthermore, django-celery-results allows us to integrate this functionality well into the project. SQLAlchemy will add one more piece of functionality, mainly letting us operate on the tasks just as if they were normal Django models. We will see more of that later.
There exist multiple ways to integrate Celery into a project. Although our application is rather simple, it is considered good general practice to separate all definitions related to the configuration of Celery from the definitions of its tasks. This is especially important once our project grows in size, in which case it makes sense to include separate definitions of Celery's tasks for each individual application. Therefore, we will include Celery-specific definitions in four places:
- hello_celery/hello_celery/celery.py - for storing 'global' configurations of Celery.
- hello_celery/hello_celery/__init__.py - for making Django include Celery's configuration upon start.
- hello_celery/demo/tasks.py - for including tasks that are specific to the demo application.
- hello_celery/hello_celery/settings.py - for establishing a link to the earlier three.
This file we need to create ourselves. Here, a Celery object is defined, and through a series of optional arguments we can control what services are to be the broker and backend for Celery. The last two lines tell Django to search within settings.py for further definitions related to individual tasks.
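A minimal sketch of the file; the exact broker/backend arguments are assumptions based on the description above:

```python
# hello_celery/hello_celery/celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'hello_celery.settings')

app = Celery('hello_celery',
             broker='amqp://localhost',  # RabbitMQ as the message broker
             backend='django-db')        # django-celery-results as the backend

# the last two lines: let Django's settings.py contribute further definitions
app.config_from_object('django.conf:settings')
app.autodiscover_tasks()
```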
__init__.py files are usually empty files used by Python to recognize entire directories of files as Python modules. Here, however, we leverage that to inform Django of the existence of the Celery object instance.
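This follows the standard pattern from the Celery documentation:

```python
# hello_celery/hello_celery/__init__.py
from __future__ import absolute_import, unicode_literals

# make sure the Celery app is loaded when Django starts
from .celery import app as celery_app

__all__ = ['celery_app']
```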
In this file, we define just one simple task that we will soon use to test our django-celery connection.
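Based on the add(x, y) function referenced later in this post, the file likely looks like this:

```python
# hello_celery/demo/tasks.py
from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def add(x, y):
    return x + y
```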
In settings.py, we add 'django_celery_results' to INSTALLED_APPS to allow Django to store the status of Celery tasks. In addition, we also need to specify the mechanism, which is done by adding CELERY_RESULT_BACKEND = 'django-db' to the last line of the file.
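A sketch of the relevant excerpt:

```python
# hello_celery/hello_celery/settings.py - excerpt
INSTALLED_APPS = [
    # ...
    'demo',
    'django_celery_results',  # lets Django store Celery task results
]

CELERY_RESULT_BACKEND = 'django-db'
```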
Our Celery should now be properly integrated into our project. Since our goal is to use Celery in the background, while keeping the application flexible, it is perhaps a good time to test whether this connection simply works, before moving on to something slightly more complicated. For this reason, we will try to use the add(x, y) function defined in tasks.py to print out the result of this operation in the terminal.
To create a simple test, let's make a few modifications to our views.py. Here, line #2 imports AsyncResult, which allows us to retrieve results from the asynchronous tasks. Also, we have directly imported our function add(), which we call in line #12, but without passing its result to the front-end.
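A hypothetical sketch of what was added (the original listing and its line numbers differ):

```python
# hello_celery/demo/views.py - sketch
from django.shortcuts import render
from celery.result import AsyncResult  # imported here, used in later sections
from .tasks import add

def quick_test(request):
    add.delay(4, 4)  # the result is printed in the worker's terminal
    return render(request, 'test.html')
```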
As before, we must register a new URL, and (as before) we do it through urls.py:
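A sketch of the URL configuration (view names are assumptions based on the pages mentioned so far):

```python
# hello_celery/demo/urls.py - sketch
from django.conf.urls import url
from . import views

urlpatterns = [
    url(r'^$', views.index, name='index'),
    url(r'^test/$', views.test_page, name='test_page'),
    url(r'^quick_test/$', views.quick_test, name='quick_test'),
]
```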
Finally, we add a “trigger” to our template in line #12.
Now, to test this mechanism, we need to open a separate terminal window and execute the following command (with the virtual environment activated):
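Something along these lines (the exact paths depend on where you run it from):

```
$ celery --workdir=hello_celery --app=hello_celery worker --loglevel=info
```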
This command launches a Celery worker, with the --workdir flag pointing it to the project's location and the --app flag assigning the module.
Watching numbers change
Django Celery App
Finally, we have reached the stage where we are ready to take the tasks to the next level. Our project is now prepared to integrate our “changing numbers” promise.
Principle behind it
The principle behind it is to use tasks.py, in which we will define one more function that will increment numbers. At each iteration, we shall put a “watch” on it, namely update some user-defined meta-data, making it accessible to views.py while keeping the task running. Then, we will define a series of auxiliary functions (also in views.py) with corresponding URLs to manipulate the task and update the front-end. Finally, we will supplement test.html with a small script that polls the back-end for the mentioned meta-data using AJAX, thus making it possible to update the front-end without the necessity of reloading the webpage.
The key to success is to keep track of the task id. If you take a look again at Figure 3, you should observe a long sequence of characters within demo.tasks.add[...]. This sequence is auto-generated each time we execute add.delay(...) and it is our reference to the executed task.
Let’s first define our task with customized meta-data.
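A sketch of the task matching the description below (exact field values are assumptions):

```python
# hello_celery/demo/tasks.py - addition
from __future__ import absolute_import, unicode_literals
import time
from celery import shared_task, current_task

@shared_task
def increment(iterations, delay=0.5):
    """An ordinary for loop with a time delay, reporting progress as meta-data."""
    for i in range(iterations):
        current_task.update_state(
            state='PROGRESS',
            meta={'iteration': i,               # the number itself
                  'status': 'incrementing...'}) # what this number represents
        time.sleep(delay)
    return {'iteration': iterations, 'status': 'done'}
```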
increment() is nothing but an ordinary for loop with a time delay (the default is 0.5 s). Here, however, we use current_task.update_state(...) to define an auxiliary dictionary called meta for capturing the progress. Whenever the state of the task equals 'PROGRESS', we will be able to retrieve meta for as long as we know the task ID. As there is no limit to what we can put in meta, we take the liberty to define two fields: 'iteration' for keeping the number, and 'status' to pass custom-defined information on what this number represents.
Stitching front- and back-end together
Handling the task in the back-end is a bit more complicated. To simplify, however, we can say that it boils down to handling the four following cases:
- starting the task,
- updating the status/state of the progress,
- moving back to where we can see it, and
- aborting the task.
Each one of these cases should be handled by one particular function in views.py. Let's leave aborting for now.
Since we have already defined the task itself, before we proceed let's add a few imports to views.py and remove the quick_test() function along with its corresponding URL.
views.py: import statements
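A sketch of the import block (the original list may differ slightly):

```python
# hello_celery/demo/views.py - imports
from django.shortcuts import render, redirect
from django.http import JsonResponse
from celery.result import AsyncResult
from .tasks import increment
```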
Starting the task
The task in itself is now defined. Having views.py prepared, we can define the function that starts the task.
The function works in two steps. With no prior task running (i.e. no job_id passed through the request), it will execute our increment task in line #15, setting it to run through 120 iterations. This will automatically define a new task, returning the reference object. Line #17 will then redirect us back to the same page, but this time with the job id appended to our URL. Naturally, this mechanism will re-call the same function, passing this (appended) URL through a GET request. The request gets processed in lines #4-6, defining a new reference to the already-running task.
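A hypothetical sketch of this view (names and line numbers differ from the original listing; it relies on the imports added above):

```python
def start_page(request):
    job_id = request.GET.get('job_id')
    if job_id:
        # a task is already running: pass its id on to the template
        job = AsyncResult(job_id)
        return render(request, 'test.html', {'task_id': job.id})
    # no prior task: launch increment() for 120 iterations...
    job = increment.delay(120)
    # ...and redirect back to the same page with the job id appended
    return redirect(request.path + '?job_id=' + job.id)
```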
We register the corresponding URL.
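For example (the exact pattern is an assumption, based on the URLs appearing later in this post):

```python
# appended to urlpatterns in hello_celery/demo/urls.py
url(r'^start_page/$', views.start_page, name='start_page'),
```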
A button wired to this URL in the template will start the Celery process in the background. Although the progress tracking is not yet integrated with the front-end, the task should be observable in the terminal.
Updating task status
This is perhaps the most challenging part of the whole project, as it requires simultaneous tweaking of both back-end and front-end functions. We are, however, well prepared to take on this challenge now.
Perhaps you have noticed that our URL in the web browser has changed: it now contains the job id. If we only create a script in the front-end to call a separate URL containing this ID, we should be able to check the task status - all the things we have defined as meta-data. This time, let's start with the front-end.
The template has been expanded considerably. Let's analyze it a bit.
Lines #14-25 define the fields where we can watch the changes, with task_id being the field value that is passed to the template using the liquid syntax we have already used before. The fields have been categorized on purpose. The first two will directly reflect our definition of the meta-data, while the latter two refer more to the Celery backend. Indeed, Celery has its own convention on naming particular states. Looking back at views.py, we do not define 'status' on purpose. If not yet defined, Celery has no way of knowing its status, has it?
update() is the essence. Using $.ajax, it requests the back-end's URL for the particular function associated with update_status, appending the task id through the GET method. Assuming a successful response, it enforces updating of the fields' values with the corresponding back-end data. More information on AJAX APIs can be found here.
Finally, lines #47-51 “activate” the update() function on an interval of 200 ms and force it to stop depending on the check_status obtained from the back-end.
Knowing that the $.ajax method expects a JSON-like format from the back-end, while calling it at a particular URL via GET, we can move on to the corresponding back-end function.
Again, let’s break it down.
Lines #4-8 (in this snippet) obtain the task status and result, retrieved from the AJAX request via GET. In case of the task being undefined, asking for either will cause an error. Later, in lines #12-17, we define a dictionary (JSON) using both the Celery task status and our own meta-data. Once the task is finished, our meta-data will no longer be available, which also needs to be handled.
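A hypothetical sketch of this view (line numbers differ from the original listing):

```python
def update_status(request):
    task_id = request.GET.get('task_id')
    try:
        job = AsyncResult(task_id)
        state, result = job.state, job.result  # may fail for an undefined task
    except Exception:
        state, result = 'UNDEFINED', None
    # while state == 'PROGRESS', result holds our meta dictionary
    meta = result if isinstance(result, dict) else {}
    return JsonResponse({
        'state': state,                          # Celery's own task state
        'iteration': meta.get('iteration', ''),  # our meta-data
        'status': meta.get('status', ''),
    })
```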
Last, but not least, we register another URL.
The ^[\s\S]*/ part of the pattern will allow the back-end to recognize the URL irrespective of what other sequence stands before it. For example, if the task has been triggered using the button, the corresponding AJAX request URL will read as /start_page/update_status/?task_id=.... We require, however, to also be able to track the progress after, for example, clicking on the index page and then going back to test. In this case, the request will read as /test/update_status/?task_id=....
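For instance (the exact pattern is an assumption):

```python
# appended to urlpatterns in hello_celery/demo/urls.py
url(r'^[\s\S]*/update_status/$', views.update_status, name='update_status'),
```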
Possibility to come back
Right now, we lose track of the processed task when moving away from and back to the test page. To keep it, we need a mechanism for storing the ID somewhere. A database would be a good idea. Fortunately, Celery integrates well with Django, making it possible to retrieve any processed task ID using TaskResult. All we need to do is to add one more import statement to views.py:
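```python
from django_celery_results.models import TaskResult
```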
Furthermore, we need to modify the test_page() function:
views.py: test_page (modified)
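A hypothetical sketch of the modified view (line numbers differ from the original listing):

```python
def test_page(request):
    try:
        last_task = TaskResult.objects.last()  # the most recent task, if any
        job = AsyncResult(last_task.task_id)
        if job.state == 'PROGRESS':
            # the task is still running: redirect so the template keeps tracking it
            return redirect('/test/?job_id=' + last_task.task_id)
    except AttributeError:
        # no tasks exist in the database yet: .last() returned None
        pass
    return render(request, 'test.html')
```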
In this application, we are only interested in the last task. The try/except statements are needed in case no tasks exist within the database. If this is so, TaskResult.objects.last() would return an empty object. Line #6 reuses the same mechanism of redirecting that we had used earlier. In case the task is finished, no progress tracking is needed anymore, hence we can just render the test.html template as before.
Check it out. You should now be able to move across the pages and, for as long as the task is being processed, track its progress.
One more thing: with TaskResult, we are able to perform operations on tasks just like on any other Django models. An interesting thing happens when we look into the admin panel, but we leave it as a bonus for the reader.
Aborting the task
The last thing remaining is the possibility to revoke a task while it is being processed. This is a natural thing to have, especially if Celery queues are full of lengthy tasks, and under “normal” circumstances they will be.
Compared to what we have just achieved, this is an easy thing. Before proceeding, we need to add one more import. Then, it is the usual procedure of adding a function to views.py, registering a URL and adding a trigger to the template.
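A hypothetical sketch of the abort view; the import shown is one way to do it and the names are assumptions:

```python
from hello_celery.celery import app

def abort_task(request):
    task_id = request.GET.get('task_id')
    if task_id:
        # unconditionally terminate the running task - use with caution
        app.control.revoke(task_id, terminate=True)
    return redirect('/test/')
```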
With this snippet being rather intuitive, it is perhaps worth mentioning that revoking a task, as performed in line #6, forces the application to unconditionally terminate it. This is a pretty “brutal” mechanism. Therefore, it must be used with caution, or even not used at all if the task performs some critical functions, where the application could be compromised if data is lost.
Once done, we register a new URL in urls.py.
Finally, we add one more line to our test.html template. Here, it will be a new link-button, just under the Start button.
We have made it! The application is now finished, featuring all the functionality we promised.
As the last step, there are two more things that we should do before proceeding with the deployment. Mainly, it makes sense to record all of the package dependencies. Indeed, once we move on to the next machine, we will be required to re-create the environment so that our project runs smoothly.
A standard practice to do it in Python is to freeze all installations we have made using pip and save them to a special file. Assuming we operate within the virtual environment, this is what we do:
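```
$ pip freeze > requirements.txt
```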
For the sake of reference, the requirements.txt for our project should read:
Again, it makes perfect sense to keep this file revisioned through git alongside the rest of the project.
Also, just for the sake of reference, the project file structure should read:
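A partial reconstruction, limited to the files discussed in this post (the original tree may contain more):

```
hello_celery/
├── hello_celery/
│   ├── __init__.py
│   ├── celery.py
│   ├── settings.py
│   └── urls.py
├── demo/
│   ├── urls.py
│   ├── views.py
│   ├── tasks.py
│   └── templates/
│       └── test.html
├── manage.py
└── requirements.txt
```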
If you wish, you can use the finished code on github and move on to part III, which discusses deployment.
Hey! Do you mind helping me out?
It's been 4 years since I launched this blog. Now, I would like to bring it to the next level. I want to record some screencast tutorial videos on the very topics that brought you here!
If you want more of the stuff, you will help me greatly by filling out a survey I have prepared for you. By clicking below, you will be redirected to Google Forms with a few questions. Please, answer them. They won't take more than 5 minutes and I do not collect any personal data.
Thank you! I appreciate it.