Professional Task Queues in Python with Celery, RabbitMQ & Redis

By NeuralNine


Key Concepts

Celery, RabbitMQ, Redis, Task Queues, Asynchronous Task Processing, Message Broker, Celery Client, Celery Worker, Docker Compose, Task Scheduling, Concurrency, Django Integration, Celery Beat, Groups and Chords, Asynchronous Results, OpenAI API integration.

Celery, RabbitMQ, and Redis: An Overview

Celery is a distributed task queue that enables asynchronous task processing and task scheduling. It uses a message broker (like RabbitMQ or Redis) to decouple task producers (clients) from task consumers (workers). The client submits tasks to the message broker, and the worker retrieves and processes them, storing the results in a backend (like Redis or a database). A minimal configuration sketch appears after the list below.

  • Celery: A "simple, flexible, and reliable distributed system to process vast amounts of messages." Use cases include asynchronous task processing and scheduled tasks.
  • RabbitMQ: A message broker that facilitates communication between the Celery client and worker. It offers features like guaranteed delivery and message loss prevention.
  • Redis: Can be used as both a message broker and a result backend. It's simpler than RabbitMQ but lacks guaranteed delivery mechanisms.
  • Message Broker Goal: To decouple the task producer (client) from the task consumer (worker) for asynchronous communication.
  • Backend: Stores the results of Celery tasks. Can be Redis, a PostgreSQL database, in-memory storage, or Amazon S3.
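
To make the moving pieces concrete, here is a minimal sketch of a Celery application wired to a RabbitMQ broker and a Redis result backend. The module name, URLs, and credentials are placeholders, not values taken from the video.

```python
# worker.py -- minimal sketch: a Celery app using RabbitMQ as broker and Redis as backend.
# URLs and credentials are placeholders.
from celery import Celery

app = Celery(
    "tasks",
    broker="amqp://user:pass@rabbitmq:5672//",  # RabbitMQ receives submitted tasks
    backend="redis://redis:6379/0",             # Redis stores the task results
)

@app.task
def add(x, y):
    # Any function decorated with @app.task can be submitted by a client
    # and executed asynchronously by a worker process.
    return x + y
```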

Docker Compose Setup

The video emphasizes using Docker Compose to set up Celery, RabbitMQ, and Redis, since this mirrors how real-world applications are deployed. A sketch of such a compose file appears after the list below.

  • Docker Compose YAML File: Defines the services (RabbitMQ, Redis, Celery worker, Celery client) and their configurations.
  • RabbitMQ Service: Uses the rabbitmq:3-management image, sets environment variables for user credentials (RABBITMQ_DEFAULT_USER, RABBITMQ_DEFAULT_PASS), defines a volume (rabbitmq_data) for data persistence, maps port 15672 for the management UI, and includes a health check.
  • Redis Service: Uses the redis:7-alpine image, sets the command to redis-server --save 60 1 (save to disk if at least one key changed in 60 seconds), defines a volume (redis_data) for data persistence, and includes a health check.
  • Celery Worker Service: Builds from the current directory (or a specified backend directory), uses the celery -A worker worker command to start the worker, loads environment variables from an .env file, and depends on RabbitMQ and Redis being healthy.
  • Celery Client Service: Builds from the same directory as the worker, runs a Python script (client.py), loads environment variables, and depends on RabbitMQ, Redis, and the Celery worker.
  • .env File: Stores environment variables like RabbitMQ credentials (RABBITMQ_USER, RABBITMQ_PASS) and Celery broker/backend URLs (CELERY_BROKER_URL, CELERY_RESULT_BACKEND).
  • Docker Prerequisite: Basic Docker knowledge is assumed for understanding the setup.
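
Based on the points above, a docker-compose.yml for this setup could look roughly like the sketch below. The image tags match those mentioned in the video, but the service names, port mappings, and health-check commands are approximations rather than the exact file.

```yaml
# docker-compose.yml -- sketch reconstructed from the description above.
services:
  rabbitmq:
    image: rabbitmq:3-management
    environment:
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_USER}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASS}
    ports:
      - "15672:15672"                    # management UI
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "ping"]
      interval: 10s
      retries: 5

  redis:
    image: redis:7-alpine
    command: redis-server --save 60 1    # persist if at least one key changed in 60s
    volumes:
      - redis_data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      retries: 5

  celery_worker:
    build: .
    command: celery -A worker worker
    env_file: .env
    depends_on:
      rabbitmq:
        condition: service_healthy
      redis:
        condition: service_healthy

  celery_client:
    build: .
    command: python client.py
    env_file: .env
    depends_on:
      - rabbitmq
      - redis
      - celery_worker

volumes:
  rabbitmq_data:
  redis_data:
```

The accompanying .env file would then hold placeholder values along these lines:

```bash
# .env -- placeholder values only
RABBITMQ_USER=user
RABBITMQ_PASS=pass
CELERY_BROKER_URL=amqp://user:pass@rabbitmq:5672//
CELERY_RESULT_BACKEND=redis://redis:6379/0
```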

Simple Celery Task Example

This example demonstrates a basic Celery task that generates a random number after a delay; sketches of the two files appear after the list below.

  • worker.py: Defines a Celery application instance and a random_number task decorated with @app.task. The task takes a max_value argument, waits for 5 seconds, and returns a random integer between 0 and max_value.
  • client.py: Submits the random_number task using the delay() method, retrieves the asynchronous result, and prints the result when it's ready. It uses celery.result.AsyncResult to check the task's status.
  • Asynchronous Result: The delay() method returns an AsyncResult object, which represents a promise of a future result.
  • Result States: The AsyncResult object has a state attribute that indicates the task's status (e.g., "PENDING", "SUCCESS").
  • Result Retrieval: The result.get() method blocks until the result is ready. To avoid blocking, check result.ready() before calling result.get().
  • Dockerfile: A basic Python Dockerfile that installs the dependencies listed in requirements.txt.
  • requirements.txt: Lists the dependencies: celery and the redis client library.
  • Execution: The example is run using docker-compose up -d --build.
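
Putting the pieces together, worker.py and client.py might look roughly like this; the structure follows the description above, but variable names and the polling details are illustrative.

```python
# worker.py -- sketch of the random-number task described above.
import os
import random
import time

from celery import Celery

app = Celery(
    "worker",
    broker=os.environ["CELERY_BROKER_URL"],
    backend=os.environ["CELERY_RESULT_BACKEND"],
)

@app.task
def random_number(max_value):
    time.sleep(5)                      # simulate slow work
    return random.randint(0, max_value)
```

```python
# client.py -- sketch: submit the task and poll the AsyncResult until it is ready.
import time

from worker import random_number

result = random_number.delay(100)      # returns an AsyncResult immediately
print("state:", result.state)          # e.g. "PENDING"

while not result.ready():              # avoid blocking inside result.get()
    time.sleep(0.5)

print("state:", result.state)          # "SUCCESS"
print("value:", result.get())
```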

OpenAI Integration Example

This example demonstrates integrating Celery with the OpenAI API to extract movie information; a sketch of the task appears after the list below.

  • Dependencies: Requires the openai and pydantic packages, added to requirements.txt.
  • Movie Class: Defines a Movie class using Pydantic to structure the extracted movie information (title, release year, director, genre).
  • movie_info Task: A Celery task that takes a prompt (movie title) as input, sends it to the OpenAI API, and returns a structured Movie object.
  • API Key: Requires an OpenAI API key, stored in the .env file as OPENAI_API_KEY.
  • Client Interaction: The client submits multiple movie information requests using delay(), retrieves the asynchronous results, and prints the extracted movie information.
  • Concurrency: The Celery worker processes the submitted tasks concurrently (by default, the prefork pool starts one worker process per CPU core), so multiple requests run in parallel.
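
A rough sketch of such a task is shown below. The model name, prompt wording, and the plain-JSON parsing approach (rather than the SDK's structured-output helpers) are assumptions for illustration, not necessarily what the video uses.

```python
# worker.py (excerpt) -- sketch of an OpenAI-backed Celery task.
# Model name, prompt, and parsing approach are illustrative assumptions.
import os

from openai import OpenAI
from pydantic import BaseModel

openai_client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])

class Movie(BaseModel):
    title: str
    release_year: int
    director: str
    genre: str

@app.task
def movie_info(prompt: str) -> dict:
    response = openai_client.chat.completions.create(
        model="gpt-4o-mini",                      # assumed model
        response_format={"type": "json_object"},  # request JSON output
        messages=[
            {"role": "system",
             "content": "Return JSON with keys: title, release_year, director, genre."},
            {"role": "user", "content": f"Extract information about the movie: {prompt}"},
        ],
    )
    movie = Movie.model_validate_json(response.choices[0].message.content)
    return movie.model_dump()                     # plain dicts serialize cleanly as results
```

On the client side, several prompts can be submitted in a loop with movie_info.delay(title); the worker's concurrency then handles them in parallel.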

Splitting Tasks with Groups and Chords

This example demonstrates how to split a complex task into multiple subtasks and combine their results using Celery groups and chords; a sketch of the wiring follows the list below.

  • Movie Model Parts: The Movie model is split into three parts: MoviePartA (title, release year), MoviePartB (director, genre), and MoviePartC (list of actors).
  • Partial Tasks: Three Celery tasks are defined: movie_info_a, movie_info_b, and movie_info_c, each responsible for filling one part of the Movie model.
  • Combine Task: A combine_parts task merges the results of the partial tasks into a single dictionary.
  • Groups: A group is used to execute the partial tasks in parallel: group(movie_info_a.s(prompt), movie_info_b.s(prompt), movie_info_c.s(prompt)).
  • Chords: A chord is used to combine the results of the group with the combine_parts task: chord(header)(combine_parts.s()). The header is the group of partial tasks, and the combine_parts.s() is the callback task that receives the results of the group.
  • Result: The client retrieves the result of the chord, which is a single JSON object containing all the movie information.
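
A sketch of the canvas wiring described above, assuming movie_info_a, movie_info_b, and movie_info_c are Celery tasks that each return a dictionary (their bodies are omitted here):

```python
# Sketch: run the partial tasks in parallel with a group, then merge with a chord.
from celery import chord, group

@app.task
def combine_parts(parts):
    # The chord callback receives a list with one result per task in the group.
    combined = {}
    for part in parts:
        combined.update(part)
    return combined

# Client side: build the header group and attach the callback.
prompt = "Inception"                       # example title, not from the video
header = group(
    movie_info_a.s(prompt),                # title, release year
    movie_info_b.s(prompt),                # director, genre
    movie_info_c.s(prompt),                # list of actors
)
result = chord(header)(combine_parts.s())  # runs the group, then combine_parts
print(result.get())                        # one merged dictionary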

Task Scheduling with Celery Beat

This example demonstrates how to schedule tasks to run periodically using Celery Beat; a configuration sketch follows the list below.

  • Celery Beat Service: A new service is added to the docker-compose.yml file, using the celery -A worker beat command.
  • Volume Mapping: A volume is mapped to persist the Celery Beat schedule.
  • celery.py Modifications: The celery.py file is modified to include Celery Beat settings.
  • Task Scheduling: Tasks can be scheduled using either the schedule or crontab functions from celery.schedules.
  • schedule: Executes a task every n seconds (e.g., schedule=10.0).
  • crontab: Executes a task at a specific time and date, using cron-like syntax (e.g., crontab(hour=16, minute=48, tzinfo=timezone('Europe/Vienna'))).
  • Example Task: The example task writes the current timestamp to a file (/data/timestamp.txt) every 5 seconds or at a specific time.
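
A configuration sketch along these lines could appear in worker.py; the task name, file path, and schedule entries are illustrative, and the timezone is set via app.conf.timezone here rather than passed to crontab.

```python
# worker.py (excerpt) -- sketch of a Celery Beat schedule.
from datetime import datetime

from celery.schedules import crontab

@app.task
def write_timestamp():
    # Append the current timestamp to a file on the mapped volume.
    with open("/data/timestamp.txt", "a") as f:
        f.write(datetime.now().isoformat() + "\n")

app.conf.timezone = "Europe/Vienna"

app.conf.beat_schedule = {
    "timestamp-every-5-seconds": {
        "task": "worker.write_timestamp",
        "schedule": 5.0,                          # every 5 seconds
    },
    "timestamp-at-specific-time": {
        "task": "worker.write_timestamp",
        "schedule": crontab(hour=16, minute=48),  # once a day at 16:48
    },
}
```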

Django Integration

This example demonstrates how to integrate Celery with a Django application; sketches of the key files follow the list below.

  • Django Web Service: A new service is added to the docker-compose.yml file to run the Django application.
  • Django DB Backend: Redis is replaced with the Django database (SQLite in this case) as the Celery result backend.
  • celery.py: A celery.py file is created in the Django project to configure Celery. It imports Django settings, creates a Celery application instance, and enables task discovery.
  • __init__.py: The __init__.py file is modified to import the Celery app instance.
  • settings.py: The Django settings file is updated to include Celery-related settings, such as CELERY_BROKER_URL, CELERY_RESULT_BACKEND, CELERY_TIMEZONE, and CELERY_ENABLE_UTC. The CELERY_RESULT_BACKEND is set to django-db.
  • tasks.py: Celery tasks are defined in a tasks.py file within a Django app. The @shared_task decorator is used instead of @app.task.
  • Task Execution: Tasks are executed asynchronously from Django views.
  • Notes: The video provides a link to a GitHub repository with a complete Django Celery example. The presenter recommends searching for "NOTE" in the code to find the changes made for Celery integration.
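
The standard Django wiring looks roughly like the sketches below; the project name (myproject) and app name (myapp) are placeholders, and the exact code is in the linked repository. Note that the django-db result backend typically relies on the django-celery-results package being installed and added to INSTALLED_APPS.

```python
# myproject/celery.py -- sketch of the usual Django/Celery wiring.
import os

from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")

app = Celery("myproject")
app.config_from_object("django.conf:settings", namespace="CELERY")  # read CELERY_* settings
app.autodiscover_tasks()                                            # find tasks.py in each app
```

```python
# myproject/__init__.py -- load the Celery app whenever Django starts.
from .celery import app as celery_app

__all__ = ("celery_app",)
```

```python
# myapp/tasks.py -- sketch of a task defined with @shared_task.
from celery import shared_task

@shared_task
def long_running_job(n: int) -> int:
    return sum(range(n))
```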

Conclusion

The video provides a comprehensive guide to building professional task queues in Python using Celery, RabbitMQ, and Redis. It covers the fundamental concepts, Docker Compose setup, basic and advanced examples (Open AI integration, task splitting), task scheduling with Celery Beat, and Django integration. The video emphasizes practical application and provides a GitHub repository with complete code examples. The key takeaways are the importance of asynchronous task processing for building scalable applications, the flexibility of Celery in integrating with various message brokers and backends, and the power of Docker Compose for managing complex application deployments.
