Setting Up Celery, Celery Beat, Redis, and Django


"Passionate software developer with a focus on Python. Driven by a love for technology and a constant desire to explore and learn new skills. Constantly striving to push the limits and create innovative solutions.”

This guide walks through setting up Celery with Redis in a Django application: creating tasks, enqueuing them, using Redis as the message broker, processing tasks with Celery workers, and scheduling periodic tasks with Celery Beat.

Overview

  1. Celery: A task queue for managing and executing asynchronous tasks.

  2. Celery Beat: A scheduler for periodic tasks.

  3. Redis: An in-memory data store, used here as the message broker that holds the task queues and as the result backend.

  4. Django: A web framework to create and manage web applications.
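To make the division of labor concrete, here is a minimal standard-library sketch of the same pattern. It is only an analogy, not how Celery is implemented: a queue.Queue stands in for the Redis broker, a dict stands in for the result backend, and a thread stands in for the Celery worker. All names in it (broker, results, REGISTRY, enqueue) are made up for illustration.

```python
import queue
import threading

broker = queue.Queue()  # stands in for Redis holding pending task messages
results = {}            # stands in for the result backend


def add(x, y):
    return x + y


# Maps task names to functions, so messages only need to carry a name.
REGISTRY = {"add": add}


def worker():
    """Pull messages off the queue and run the named task until told to stop."""
    while True:
        message = broker.get()
        if message is None:  # sentinel: shut the worker down
            break
        task_id, name, args = message
        results[task_id] = REGISTRY[name](*args)


def enqueue(task_id, name, *args):
    """Producer side: put a message on the queue and return immediately."""
    broker.put((task_id, name, args))


thread = threading.Thread(target=worker)
thread.start()
enqueue("task-1", "add", 4, 4)
broker.put(None)
thread.join()
print(results)  # {'task-1': 8}
```

The caller of enqueue never waits for add to finish, which is the property that keeps web requests fast once the real Celery worker takes over this role.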

Step-by-Step Guide

1. Install Required Packages

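First, install the necessary packages using pip. Note that the redis package is only the Python client; a Redis server must also be installed and running (by default on port 6379):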

pip install celery redis django

2. Configure Django Project

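Create a Django project and application if you haven't already, then add 'myapp' to INSTALLED_APPS in myproject/settings.py so that Celery can discover its tasks later: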

django-admin startproject myproject
cd myproject
django-admin startapp myapp

3. Setup Celery

Create a celery.py file in your project directory (myproject/celery.py) to configure Celery:

# myproject/celery.py

import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
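The namespace='CELERY' argument means that Celery reads only the Django settings whose names start with CELERY_, and treats the rest of the name as the setting it configures. The sketch below illustrates that naming convention with plain dictionaries. It is a simplified illustration, not Celery's actual loading code, and the function name celery_keys_from_settings is made up for this example.

```python
def celery_keys_from_settings(settings, namespace="CELERY"):
    """Keep only NAMESPACE_-prefixed settings, stripped and lowercased."""
    prefix = namespace + "_"
    return {
        name[len(prefix):].lower(): value
        for name, value in settings.items()
        if name.startswith(prefix)
    }


django_settings = {
    "DEBUG": True,  # ignored: no CELERY_ prefix
    "CELERY_BROKER_URL": "redis://127.0.0.1:6379/0",
    "CELERY_TASK_SERIALIZER": "json",
}

print(celery_keys_from_settings(django_settings))
# {'broker_url': 'redis://127.0.0.1:6379/0', 'task_serializer': 'json'}
```

This is why a setting documented by Celery as broker_url is written as CELERY_BROKER_URL in settings.py.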

4. Ensure Celery Loads When Django Starts

Modify the __init__.py file in your project directory (myproject/__init__.py):

# myproject/__init__.py

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

5. Configure Django Settings

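Add the Celery and Redis configuration to your Django settings (myproject/settings.py). The CELERY_ prefix matches the namespace='CELERY' argument passed to config_from_object in celery.py: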

# settings.py

CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'
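The broker and result backend URLs above pack the host, port, and Redis database number into a single string. As a quick way to read such a URL, here is a small standard-library sketch that splits it into its parts. The helper name describe_redis_url is hypothetical, and the fallbacks to port 6379 and database 0 are assumptions of this sketch.

```python
from urllib.parse import urlparse


def describe_redis_url(url):
    """Split a redis:// URL into scheme, host, port, and database number."""
    parts = urlparse(url)
    db = parts.path.lstrip("/") or "0"  # assume database 0 when the path is empty
    return {
        "scheme": parts.scheme,
        "host": parts.hostname,
        "port": parts.port or 6379,  # assume the default Redis port when omitted
        "db": int(db),
    }


print(describe_redis_url("redis://127.0.0.1:6379/0"))
# {'scheme': 'redis', 'host': '127.0.0.1', 'port': 6379, 'db': 0}
```

Changing the trailing /0 to, say, /1 for CELERY_RESULT_BACKEND would keep results in a separate Redis database from the task queue.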

6. Define Tasks

Create tasks in your Django app (myapp/tasks.py):

# myapp/tasks.py

from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def some_periodic_task():
    # Task code here
    pass
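Because the settings in step 5 select the JSON serializer, task arguments and return values have to survive a trip through JSON. The sketch below uses only the standard json module to show what that implies; it does not call Celery, and json_round_trip is a made-up helper name.

```python
import json
from datetime import date


def json_round_trip(args):
    """Encode and decode args the way a JSON-serialized message would be."""
    return json.loads(json.dumps(args))


# Numbers pass through unchanged, but the tuple comes back as a list.
print(json_round_trip((4, 4)))  # [4, 4]

# A date object is not JSON serializable, so json.dumps raises TypeError.
try:
    json_round_trip((date(2024, 1, 1),))
except TypeError as exc:
    print(f"cannot serialize: {exc}")

# Converting to a string first keeps the argument JSON-friendly.
print(json_round_trip((date(2024, 1, 1).isoformat(),)))  # ['2024-01-01']
```

In practice this means passing simple values such as IDs, strings, and numbers to tasks, and loading richer objects inside the task itself.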

7. Enqueue Tasks

Enqueue tasks from your Django views or other parts of your application (myapp/views.py):

# myapp/views.py

from django.http import JsonResponse
from .tasks import add

def my_view(request):
    result = add.delay(4, 4)  # Enqueues the task to Redis
    return JsonResponse({'task_id': result.id})
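The view above still needs a URL route before it can be requested. A minimal sketch of that wiring follows, assuming a hypothetical add/ path; the route is an illustration and not part of the original project.

```python
# myproject/urls.py

from django.contrib import admin
from django.urls import path

from myapp.views import my_view

urlpatterns = [
    path('admin/', admin.site.urls),
    # Hypothetical route: a request to /add/ enqueues add(4, 4)
    # and responds with the task ID as JSON.
    path('add/', my_view),
]
```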

8. Setup Celery Beat for Periodic Tasks

Configure periodic tasks by adding the following to myproject/celery.py, after the app is created:

# myproject/celery.py

from celery.schedules import crontab

app.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'myapp.tasks.add',
        'schedule': 30.0,
        'args': (16, 16)
    },
    'run-every-monday-morning': {
        'task': 'myapp.tasks.some_periodic_task',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': ()
    },
}
app.conf.timezone = 'UTC'

  • add-every-30-seconds: Runs every 30 seconds and calls the add function with the arguments 16 and 16.

  • run-every-monday-morning: Runs every Monday at 7:30 AM (UTC, per the timezone setting) and calls the some_periodic_task function without any arguments.
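To sanity-check what the Monday schedule means, the following standard-library sketch computes the next Monday 07:30 after a given moment. It is only a hand calculation for this one schedule, not Celery's crontab logic, and next_monday_0730 is a made-up name. Note one difference in conventions: crontab uses day_of_week=1 for Monday, while Python's datetime.weekday() returns 0 for Monday.

```python
from datetime import datetime, timedelta, timezone


def next_monday_0730(now):
    """Return the first Monday 07:30 strictly after `now` (same timezone)."""
    candidate = now.replace(hour=7, minute=30, second=0, microsecond=0)
    # weekday() is 0 for Monday, so this is the number of days until Monday.
    days_ahead = (0 - candidate.weekday()) % 7
    candidate += timedelta(days=days_ahead)
    if candidate <= now:
        # Today is Monday but 07:30 has already passed: jump a week ahead.
        candidate += timedelta(days=7)
    return candidate


# 2024-01-03 is a Wednesday, so the next run falls on Monday 2024-01-08.
now = datetime(2024, 1, 3, 12, 0, tzinfo=timezone.utc)
print(next_monday_0730(now).isoformat())  # 2024-01-08T07:30:00+00:00
```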

9. Start Celery Worker and Celery Beat

Start a Celery worker to process tasks:

celery -A myproject worker --loglevel=info

Start Celery Beat to schedule periodic tasks:

celery -A myproject beat --loglevel=info

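Alternatively, during development you can embed beat in the worker and run both with one command. This is not recommended for production, where beat should run as its own single process: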

celery -A myproject worker -B --loglevel=info

10. Monitoring and Management

Flower

Flower is a real-time web-based monitoring tool for Celery.

Install Flower:

pip install flower

Run Flower:

celery -A myproject flower

Access Flower in your web browser at http://localhost:5555.

Redis CLI

Inspect the queues and tasks using Redis CLI.

Start the Redis CLI:

redis-cli

Example commands:

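  • KEYS *: Lists all keys in the current Redis database. It scans the whole keyspace, so avoid it on large production databases.

  • LLEN celery: Shows the number of messages waiting in the default queue, which Celery names celery.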
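The same queue-length check can be scripted from Python. The sketch below is written against any client object that has an llen method, and uses a tiny fake client so that it runs without a Redis server. The names queue_depths and FakeRedis are made up for this example.

```python
def queue_depths(client, queue_names=("celery",)):
    """Return {queue_name: number of waiting messages} using LLEN."""
    return {name: client.llen(name) for name in queue_names}


class FakeRedis:
    """Stand-in client so the sketch runs without a Redis server."""

    def __init__(self, lists):
        self._lists = lists

    def llen(self, name):
        # Like Redis, report 0 for a list that does not exist.
        return len(self._lists.get(name, []))


fake = FakeRedis({"celery": ["message-1", "message-2"]})
print(queue_depths(fake))  # {'celery': 2}
```

Against a real server, a client created with redis.Redis.from_url('redis://127.0.0.1:6379/0') from the redis package exposes the same llen method and can be passed in place of the fake.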

Example Django Project Structure

Here's an example directory structure for a Django project with Celery tasks, shown relative to the outer project directory that contains manage.py:

myproject/
    __init__.py
    settings.py
    urls.py
    wsgi.py
    celery.py
myapp/
    __init__.py
    tasks.py
    views.py

Summary

  1. Install Packages: Install Celery, the Redis client, and Django.

  2. Configure Celery: Create and configure celery.py.

  3. Load Celery: Ensure Celery loads with Django.

  4. Configure Settings: Add Celery and Redis configurations in Django settings.

  5. Define Tasks: Create tasks in your Django app.

  6. Enqueue Tasks: Use the delay method to add tasks to the Redis queue.

  7. Periodic Tasks: Configure periodic tasks with Celery Beat.

  8. Start Processes: Run Celery worker and beat to process tasks and schedule periodic tasks.

  9. Monitoring: Use tools like Flower and Redis CLI to monitor and manage tasks.

With these steps in place, your Django application can run asynchronous and periodic tasks using Celery, Celery Beat, and Redis. Slow work moves to background workers, so request handling stays responsive.

Bonus: Retrieving the Result

The task ID (result.id) can be used to query the result:

from celery.result import AsyncResult

def get_task_result(task_id):
    result = AsyncResult(task_id)
    if result.ready():
        return result.result  # The result of the task
    else:
        return 'Task not yet completed'
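If the caller wants to wait a bounded amount of time rather than check once, the ready() check can be wrapped in a polling loop. The sketch below works with any object that has a ready() method and a result attribute, and uses a fake result so that it runs without a worker. The names wait_for_result and FakeResult are made up for this example. Celery's AsyncResult also offers get(timeout=...), which blocks in a similar way.

```python
import time


def wait_for_result(result, timeout=10.0, interval=0.5):
    """Poll result.ready() until it is True or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        if result.ready():
            return result.result
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task not finished after {timeout} seconds")
        time.sleep(interval)


class FakeResult:
    """Stand-in for AsyncResult that becomes ready after a few checks."""

    def __init__(self, value, ready_after):
        self.result = value
        self._checks_left = ready_after

    def ready(self):
        self._checks_left -= 1
        return self._checks_left < 0


print(wait_for_result(FakeResult(8, ready_after=2), interval=0.01))  # 8
```

Keep in mind that for a failed task, Celery stores the raised exception in result.result, so callers may want to check the task state before trusting the value.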

The End :D