Professional Task Queues in Python with Celery, RabbitMQ & Redis
By NeuralNine
Key Concepts
Celery, RabbitMQ, Redis, Task Queues, Asynchronous Task Processing, Message Broker, Celery Client, Celery Worker, Docker Compose, Task Scheduling, Concurrency, Django Integration, Celery Beat, Groups and Chords, Asynchronous Results, OpenAI API integration.
Celery, RabbitMQ, and Redis: An Overview
Celery is a distributed task queue that enables asynchronous task processing and task scheduling. It uses a message broker (like RabbitMQ or Redis) to decouple task producers (clients) from task consumers (workers). The client submits tasks to the message broker, and the worker retrieves and processes them, storing the results in a backend (like Redis or a database).
- Celery: A "simple, flexible, and reliable distributed system to process vast amounts of messages." Use cases include asynchronous task processing and scheduled tasks.
- RabbitMQ: A message broker that facilitates communication between the Celery client and worker. It offers features like guaranteed delivery and message loss prevention.
- Redis: Can be used as both a message broker and a result backend. It's simpler than RabbitMQ but lacks guaranteed delivery mechanisms.
- Message Broker Goal: To decouple the task producer (client) from the task consumer (worker) for asynchronous communication.
- Backend: Stores the results of Celery tasks. Can be Redis, a PostgreSQL database, in-memory storage, or Amazon S3.
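As a rough sketch of this wiring, a Celery application is pointed at a broker and a result backend when it is created. The URLs and task below are illustrative placeholders (RabbitMQ as broker, Redis as backend, as used later in the video), not the video's exact code:

```python
from celery import Celery

# The broker receives submitted tasks; the backend stores their results.
# Hostnames and credentials here are placeholders.
app = Celery(
    "tasks",
    broker="amqp://user:password@rabbitmq:5672//",  # RabbitMQ as message broker
    backend="redis://redis:6379/0",                 # Redis as result backend
)

@app.task
def add(x, y):
    # Any function decorated with @app.task can be submitted to the queue.
    return x + y
```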
Docker Compose Setup
The video emphasizes using Docker Compose for setting up Celery, RabbitMQ, and Redis. This approach is preferred because it mirrors real-world application deployments.
- Docker Compose YAML File: Defines the services (RabbitMQ, Redis, Celery worker, Celery client) and their configurations.
- RabbitMQ Service: Uses the `rabbitmq:3-management` image, sets environment variables for user credentials (`RABBITMQ_DEFAULT_USER`, `RABBITMQ_DEFAULT_PASS`), defines a volume (`rabbitmq_data`) for data persistence, maps port `15672` for the management UI, and includes a health check.
- Redis Service: Uses the `redis:7-alpine` image, sets the command to `redis-server --save 60 1` (save to disk if at least one key changed within 60 seconds), defines a volume (`redis_data`) for data persistence, and includes a health check.
- Celery Worker Service: Builds from the current directory (or a specified backend directory), uses the `celery -A worker worker` command to start the worker, loads environment variables from an `.env` file, and depends on RabbitMQ and Redis being healthy.
- Celery Client Service: Builds from the same directory as the worker, runs a Python script (`client.py`), loads environment variables, and depends on RabbitMQ, Redis, and the Celery worker.
- .env File: Stores environment variables like the RabbitMQ credentials (`RABBITMQ_USER`, `RABBITMQ_PASS`) and the Celery broker/backend URLs (`CELERY_BROKER_URL`, `CELERY_RESULT_BACKEND`).
- Docker Prerequisite: Basic Docker knowledge is assumed for understanding the setup.
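Putting the items above together, a minimal compose file could look roughly like this. It is a sketch, not the video's exact file: the mount paths, health-check commands, and intervals are assumptions, while the image tags, commands, and volume names follow the description above.

```yaml
# docker-compose.yml -- minimal sketch
services:
  rabbitmq:
    image: rabbitmq:3-management
    environment:
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_USER}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASS}
    ports:
      - "15672:15672"            # management UI
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "ping"]
      interval: 10s
      retries: 5

  redis:
    image: redis:7-alpine
    command: redis-server --save 60 1   # persist if at least 1 key changed in 60s
    volumes:
      - redis_data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      retries: 5

  celery_worker:
    build: .
    command: celery -A worker worker
    env_file: .env
    depends_on:
      rabbitmq:
        condition: service_healthy
      redis:
        condition: service_healthy

  celery_client:
    build: .
    command: python client.py
    env_file: .env
    depends_on:
      - rabbitmq
      - redis
      - celery_worker

volumes:
  rabbitmq_data:
  redis_data:
```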
Simple Celery Task Example
This example demonstrates a basic Celery task that generates a random number with a delay.
- worker.py: Defines a Celery application instance and a task named `random_number`, decorated with `@app.task`. The task takes a `max_value` argument, waits for 5 seconds, and returns a random integer between 0 and `max_value`.
- client.py: Submits the `random_number` task using the `delay()` method, retrieves the asynchronous result, and prints it when it's ready. It uses `celery.result.AsyncResult` to check the task's status.
- Asynchronous Result: The `delay()` method returns an `AsyncResult` object, which represents a promise of a future result.
- Result States: The `AsyncResult` object has a `state` attribute that indicates the task's status (e.g., "PENDING", "SUCCESS").
- Result Retrieval: The `result.get()` method blocks until the result is ready. To avoid blocking, check `result.ready()` before calling `result.get()`.
- Dockerfile: A basic Python Dockerfile that installs Celery and Redis from `requirements.txt`.
- requirements.txt: Contains the dependencies: `celery` and `redis`.
- Execution: The example is run with `docker-compose up -d --build`.
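A compact sketch of what `worker.py` and `client.py` might look like, based on the description above. The environment variable names (`CELERY_BROKER_URL`, `CELERY_RESULT_BACKEND`) come from the .env description; everything else is illustrative rather than the video's exact code:

```python
# worker.py -- illustrative sketch
import os
import random
import time

from celery import Celery

app = Celery(
    "worker",
    broker=os.environ["CELERY_BROKER_URL"],
    backend=os.environ["CELERY_RESULT_BACKEND"],
)

@app.task
def random_number(max_value):
    time.sleep(5)                        # simulate slow work
    return random.randint(0, max_value)
```

```python
# client.py -- submits the task and polls the AsyncResult
import time

from worker import random_number

result = random_number.delay(100)        # returns an AsyncResult immediately

while not result.ready():                # non-blocking status check
    print("State:", result.state)        # e.g. "PENDING"
    time.sleep(1)

print("State:", result.state)            # "SUCCESS" once finished
print("Result:", result.get())           # safe to call now that it's ready
```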
OpenAI Integration Example
This example demonstrates integrating Celery with the OpenAI API to extract movie information.
- Dependencies: Requires the `openai` and `pydantic` packages, added to `requirements.txt`.
- Movie Class: Defines a `Movie` class using Pydantic to structure the extracted movie information (title, release year, director, genre).
- movie_info Task: A Celery task that takes a prompt (movie title) as input, sends it to the OpenAI API, and returns a structured `Movie` object.
- API Key: Requires an OpenAI API key, stored in the `.env` file as `OPENAI_API_KEY`.
- Client Interaction: The client submits multiple movie information requests using `delay()`, retrieves the asynchronous results, and prints the extracted movie information.
- Concurrency: Celery uses worker concurrency to process multiple requests simultaneously.
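A sketch of how the `Movie` model and `movie_info` task could be written. It assumes a recent OpenAI Python SDK with the structured-output `parse` helper and Pydantic v2; the model name, prompt wording, and the choice to return a plain dict (so Celery's default JSON serializer can handle it) are assumptions, not taken from the video:

```python
# worker.py additions -- illustrative sketch
import os

from celery import Celery
from openai import OpenAI
from pydantic import BaseModel

app = Celery(
    "worker",
    broker=os.environ["CELERY_BROKER_URL"],
    backend=os.environ["CELERY_RESULT_BACKEND"],
)

class Movie(BaseModel):
    title: str
    release_year: int
    director: str
    genre: str

@app.task
def movie_info(prompt: str) -> dict:
    client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
    completion = client.beta.chat.completions.parse(
        model="gpt-4o-mini",                       # assumed model name
        messages=[
            {"role": "system", "content": "Extract structured movie information."},
            {"role": "user", "content": prompt},
        ],
        response_format=Movie,                     # Pydantic model used as the schema
    )
    movie = completion.choices[0].message.parsed   # a Movie instance
    return movie.model_dump()                      # JSON-serializable for the backend
```

On the client side, several prompts could then be dispatched with `movie_info.delay("Inception")`, `movie_info.delay("The Matrix")`, and so on, with each `AsyncResult` collected once it is ready.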
Splitting Tasks with Groups and Chords
This example demonstrates how to split a complex task into multiple subtasks and combine their results using Celery groups and chords.
- Movie Model Parts: The `Movie` model is split into three parts: `MoviePartA` (title, release year), `MoviePartB` (director, genre), and `MoviePartC` (list of actors).
- Partial Tasks: Three Celery tasks are defined: `movie_info_a`, `movie_info_b`, and `movie_info_c`, each responsible for filling one part of the `Movie` model.
- Combine Task: A `combine_parts` task merges the results of the partial tasks into a single dictionary.
- Groups: A group is used to execute the partial tasks in parallel: `group(movie_info_a.s(prompt), movie_info_b.s(prompt), movie_info_c.s(prompt))`.
- Chords: A chord combines the results of the group with the `combine_parts` task: `chord(header)(combine_parts.s())`. The `header` is the group of partial tasks, and `combine_parts.s()` is the callback task that receives the results of the group.
- Result: The client retrieves the result of the chord, which is a single JSON object containing all the movie information.
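A condensed sketch of the group/chord wiring, reusing the task names from the list above. The bodies of the partial tasks are stubbed out (in the video they call the OpenAI API); the point here is the canvas primitives:

```python
# chord sketch -- task bodies stubbed for brevity
import os

from celery import Celery, chord, group

app = Celery(
    "worker",
    broker=os.environ["CELERY_BROKER_URL"],
    backend=os.environ["CELERY_RESULT_BACKEND"],  # a result backend is required for chords
)

@app.task
def movie_info_a(prompt):   # title, release year
    return {"title": "...", "release_year": 0}

@app.task
def movie_info_b(prompt):   # director, genre
    return {"director": "...", "genre": "..."}

@app.task
def movie_info_c(prompt):   # list of actors
    return {"actors": []}

@app.task
def combine_parts(parts):
    # 'parts' is the list of results from the group, in header order.
    combined = {}
    for part in parts:
        combined.update(part)
    return combined

# Client side: run the three partial tasks in parallel, then the callback.
prompt = "Inception"
header = group(movie_info_a.s(prompt), movie_info_b.s(prompt), movie_info_c.s(prompt))
result = chord(header)(combine_parts.s())   # AsyncResult of combine_parts
print(result.get())                          # single merged dictionary
```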
Task Scheduling with Celery Beat
This example demonstrates how to schedule tasks to run periodically using Celery Beat.
- Celery Beat Service: A new service is added to the `docker-compose.yml` file, using the `celery -A worker beat` command.
- Volume Mapping: A volume is mapped to persist the Celery Beat schedule.
- celery.py Modifications: The `celery.py` file is modified to include the Celery Beat settings.
- Task Scheduling: Tasks can be scheduled using either the `schedule` or `crontab` helpers from `celery.schedules`.
- schedule: Executes a task every n seconds (e.g., `schedule=10.0`).
- crontab: Executes a task at a specific time and date, using cron-like syntax (e.g., `crontab(hour=16, minute=48, tzinfo=timezone('Europe/Vienna'))`).
- Example Task: The example task writes the current timestamp to a file (`/data/timestamp.txt`) every 5 seconds or at a specific time.
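A sketch of what the Beat configuration might look like via `app.conf.beat_schedule`. The task behaviour and file path follow the description above; the task and entry names are placeholders, and the timezone is set on the app (one common way to handle it) rather than per-schedule:

```python
# beat schedule sketch -- assumes the worker app from the earlier examples
import os
from datetime import datetime

from celery import Celery
from celery.schedules import crontab

app = Celery(
    "worker",
    broker=os.environ["CELERY_BROKER_URL"],
    backend=os.environ["CELERY_RESULT_BACKEND"],
)
app.conf.timezone = "Europe/Vienna"

@app.task
def write_timestamp():
    # Append the current timestamp to a file on the mapped volume.
    with open("/data/timestamp.txt", "a") as f:
        f.write(datetime.now().isoformat() + "\n")

app.conf.beat_schedule = {
    # Run every 5 seconds.
    "timestamp-every-5s": {
        "task": write_timestamp.name,
        "schedule": 5.0,
    },
    # Or run at a specific wall-clock time, cron-style.
    "timestamp-daily": {
        "task": write_timestamp.name,
        "schedule": crontab(hour=16, minute=48),
    },
}
```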
Django Integration
This example demonstrates how to integrate Celery with a Django application.
- Django Web Service: A new service is added to the `docker-compose.yml` file to run the Django application.
- Django DB Backend: Redis is replaced with the Django database (SQLite in this case) as the Celery result backend.
- celery.py: A `celery.py` file is created in the Django project to configure Celery. It imports the Django settings, creates a Celery application instance, and enables task discovery.
- __init__.py: The project's `__init__.py` file is modified to import the Celery app instance.
- settings.py: The Django settings file is updated to include Celery-related settings, such as `CELERY_BROKER_URL`, `CELERY_RESULT_BACKEND`, `CELERY_TIMEZONE`, and `CELERY_ENABLE_UTC`. `CELERY_RESULT_BACKEND` is set to `django-db`.
- tasks.py: Celery tasks are defined in a `tasks.py` file within a Django app. The `@shared_task` decorator is used instead of `@app.task`.
- Task Execution: Tasks are executed asynchronously from Django views.
- Notes: The video provides a link to a GitHub repository with a complete Django Celery example. The presenter recommends searching for "NOTE" in the code to find the changes made for Celery integration.
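A sketch of the standard Django wiring described above. The project name `myproject` and the example task are placeholders, and the `django-db` result backend assumes the `django-celery-results` package is installed (not explicitly named in the summary):

```python
# myproject/celery.py -- project name is a placeholder
import os

from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")

app = Celery("myproject")
app.config_from_object("django.conf:settings", namespace="CELERY")  # reads the CELERY_* settings
app.autodiscover_tasks()                                            # finds tasks.py in installed apps
```

```python
# myproject/__init__.py -- load the Celery app whenever Django starts
from .celery import app as celery_app

__all__ = ("celery_app",)
```

```python
# someapp/tasks.py -- app-level tasks use @shared_task instead of @app.task
from celery import shared_task

@shared_task
def send_report(user_id):
    ...  # called from a view as send_report.delay(user_id)
```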
Conclusion
The video provides a comprehensive guide to building professional task queues in Python using Celery, RabbitMQ, and Redis. It covers the fundamental concepts, the Docker Compose setup, basic and advanced examples (OpenAI integration, task splitting with groups and chords), task scheduling with Celery Beat, and Django integration. The video emphasizes practical application and provides a GitHub repository with complete code examples. The key takeaways are the importance of asynchronous task processing for building scalable applications, the flexibility of Celery in integrating with various message brokers and backends, and the power of Docker Compose for managing complex application deployments.