

Celery with Subcollectors

Overview

This proof of concept (PoC) demonstrates a distributed task queue implementation using Celery with RabbitMQ as the message broker and Redis as the result backend. The project simulates a data collection and processing workflow using a system of subcollectors. Each subcollector generates a random list of integers for a given topic, and the results are aggregated, cleaned, and processed.

Purpose

The primary goal of this PoC is to test and demonstrate the capability of Celery to manage complex workflows involving multiple asynchronous tasks. It showcases how to:

  • Dispatch tasks to subcollectors in parallel.
  • Aggregate results from all subcollectors.
  • Perform cleanup and processing on aggregated data.

Project Structure

  • celery_app.py: Initializes the Celery app and includes the configuration for the Celery connection to RabbitMQ as the broker and Redis as the result backend.
  • tasks.py: Contains the definition of all Celery tasks, including subcollectors, data aggregation, cleanup, and processing tasks.
  • tests/: Directory containing test files to validate the functionality of the Celery tasks.
    • test_tasks.py: Implements tests for each Celery task using pytest.
  • conda/base.yaml: Conda environment file listing all necessary Python dependencies for the project.

Workflow

The Celery workflow in this project orchestrates a series of tasks to simulate a data collection and processing pipeline using subcollector tasks. The workflow leverages Celery's capabilities for asynchronous task execution, task grouping, and result aggregation. Below is a step-by-step breakdown of the workflow:

1. Task Initiation with collector_request

  • Functionality: The workflow begins with the collector_request task, which initiates the data collection process based on a specified topic.
  • Key Methods:
    • generate_collector_request(topic: str) -> str: Generates a unique request ID for the collection request.
    • group(): Groups multiple subcollector tasks (collector_1, collector_2, collector_3) to be executed in parallel.
    • chord(group())(callback): A chord is a Celery primitive that takes a group of tasks and a callback task. It ensures that the callback task (collector_gathering) is executed only after all tasks in the group have completed.

2. Parallel Execution of Subcollector Tasks

  • Subcollectors: collector_1, collector_2, collector_3
  • Functionality: Each subcollector task generates a random list of integers simulating the collection of data for the given topic.
  • Execution: These tasks run in parallel as part of the group passed to the chord. The group is dispatched asynchronously, so each task can be picked up by any available worker and executed concurrently without waiting for the others.
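A subcollector body along these lines would match the behavior described; the size and value ranges are assumptions:

```python
import random

def collect(topic: str) -> list[int]:
    """Simulate collecting data for a topic as a random list of integers."""
    return [random.randint(0, 100) for _ in range(random.randint(1, 10))]
```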

3. Aggregation and Processing

  • Callback Task: collector_gathering
    • Functionality: Aggregates the results from all subcollector tasks. This task acts as the callback for the chord, which means it automatically receives the aggregated results of the group as its input.
    • Method Calls:
      • cleanup.delay(combined_result, request_id): After aggregation, the cleanup task is called asynchronously with the .delay() method, passing the combined results for further processing.
  • Cleanup Task: cleanup
    • Functionality: Performs preliminary processing or cleanup on the aggregated data.
    • Method Calls:
      • process.delay(data, request_id): Calls the process task asynchronously for final processing.
  • Process Task: process
    • Functionality: Conducts the final processing of the data. In this example, it counts the total items and prints the result.
    • Returns: A dictionary with the request_id and the item_count, providing a simple summary of the processing outcome.
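The downstream stages can be sketched like this; the bodies are illustrative stand-ins, and in the real flow each stage hands off via .delay() rather than a direct call:

```python
def process(data, request_id):
    """Final stage: count the items and report a summary."""
    item_count = len(data)
    print(f"{request_id}: processed {item_count} items")
    return {"request_id": request_id, "item_count": item_count}

def cleanup(combined_result, request_id):
    """Drop empty/invalid entries, then hand off to the final stage."""
    data = [x for x in combined_result if x is not None]
    return process(data, request_id)  # real flow: process.delay(data, request_id)
```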

Summary

This workflow demonstrates the power of Celery for handling complex asynchronous task pipelines. It showcases task parallelization (group), synchronized execution of a callback once every task in a group has completed (chord), and chaining further processing steps (delay). Each task is designed to perform a specific role within the data collection and processing pipeline, from initiating collection requests to final data processing.

Setup Instructions

Prerequisites

  • Docker
  • Conda or Mamba

Environment Setup

  1. Clone the repository and navigate to the project directory.

  2. Create a new Conda environment using the base.yaml file:

    mamba env create -f conda/base.yaml

    Or, if you're using Conda:

    conda env create -f conda/base.yaml
  3. Activate the new environment:

    mamba activate poc-celery

RabbitMQ and Redis Setup Using Docker

Run RabbitMQ and Redis containers with the following commands:

docker run --name rabbitmq -d -p 5672:5672 rabbitmq
docker run --name redis -d -p 6379:6379 redis

or

bash scripts/setup.sh

These commands start RabbitMQ and Redis servers in Docker containers named rabbitmq and redis, respectively.

Monitoring Celery Tasks with Flower

Using the Startup Script

To facilitate an efficient development and monitoring environment, we've prepared a bash script start_celery_and_flower.sh that simultaneously starts a Celery worker and Flower, a real-time monitoring tool. This script fetches the RabbitMQ and Redis container IP addresses dynamically, ensuring that both Celery and Flower are correctly configured to communicate with RabbitMQ as the broker and Redis as the result backend.

To start both the Celery worker and Flower, navigate to your project's root directory and run:

bash scripts/start_celery_and_flower.sh

This command executes the script that:

  1. Starts a Celery Worker: Launches a Celery worker instance using poc_celery.poc1.celery_app as the application module. This worker listens for tasks dispatched to the queues and executes them as they arrive.

  2. Launches Flower: Initiates Flower on the default port (5555), allowing you to access a web-based user interface to monitor and manage the Celery worker and tasks. Flower provides insights into task progress, worker status, task history, and much more, making it an invaluable tool for debugging and optimizing your task workflows.

Executing the Tests with Pytest

With your environment prepared and the Celery worker launched, you can now run the tests using pytest. Navigate to the root directory of your project and execute:

pytest -vvv tests/

Explanation of Provided Test Cases

The test cases aim to validate the behavior of various Celery tasks within the project, including both asynchronous tasks (tasks_async.py) and collectors tasks (tasks_collectors.py). Here's an overview of each test case:

Asynchronous Tasks (tasks_async.py)

  1. test_clean_data: This test verifies that the clean_data task behaves correctly when called with a file path. It uses mock_open to simulate file I/O operations and ensures that the open function is not called, indicating that the task does not perform any file operations directly.

  2. test_create_project: This test validates the behavior of the create_project task by mocking file I/O operations and asserting that the task correctly writes data to the specified file path. It generates sample data rows and calls the task with each row, mocking the open function to capture the contents written to the file.

  3. test_create_project_stress: This asynchronous test case simulates a stress scenario by executing a large number of create_project tasks concurrently. It repeats a set of sample data rows multiple times to reach the desired number of task calls and then waits for a short duration to allow all tasks to complete. The test asserts True at the end to indicate successful execution.

Collector Tasks (tasks_collectors.py)

  1. test_generate_collector_request: Validates that generate_collector_request produces a valid UUID string, confirming the generation of unique request identifiers.

  2. test_collector_request_triggers_sub_collectors: Verifies the orchestration within collector_request to trigger subcollector tasks. It mocks the group method and the signature of collector_gathering to intercept and assert their usage without actual task execution.

Interpreting results: Pytest will print a summary indicating which tests passed, failed, or were skipped. For any failure, a detailed error message and traceback are provided to aid debugging.
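The UUID check in test_generate_collector_request can be sketched as below; the stand-in function mirrors the task's described behavior rather than importing the project module:

```python
import uuid

def generate_collector_request(topic: str) -> str:
    # Stand-in for the real Celery task.
    return str(uuid.uuid4())

def test_generate_collector_request():
    request_id = generate_collector_request("climate")
    # uuid.UUID raises ValueError if the string is not a valid UUID.
    assert str(uuid.UUID(request_id)) == request_id
```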

These test cases ensure that the Celery tasks perform as expected under different scenarios, including normal operation, stress testing, and error conditions. They help verify the correctness and robustness of the task implementations and provide confidence in the overall reliability of the Celery workflow.

Best Practices in Asynchronous Task Management with Celery and Flower

Embracing Celery's Asynchronous Capabilities

In this project, we harness Celery's powerful asynchronous task execution to bolster performance and scalability. Here are some key best practices we've applied:

  • Decoupling Components: By employing Celery, we effectively decouple task execution from the main application flow. This separation allows for a more scalable and maintainable code architecture.

  • Optimizing Task Execution: Utilizing Celery's group, chain, and chord primitives, we've structured complex task workflows that maximize concurrency and minimize processing time. This structured approach ensures tasks are executed in an optimal sequence and only after all required dependencies have been satisfied.

  • Robust Error Handling: We've implemented strategic error handling within our tasks to ensure resilience. By catching exceptions and using retry mechanisms where applicable, we maintain the integrity of our task pipeline, even in the face of transient failures.

Leveraging Flower for Enhanced Task Monitoring and Management

Flower is a critical tool for our project, providing comprehensive monitoring and management capabilities for our Celery workers and tasks. Here's why Flower is indispensable:

  • Visibility: Flower's real-time monitoring gives us instant visibility into our task queues, worker status, and the progress of task execution. This level of insight is invaluable for quickly identifying bottlenecks or failures in the task pipeline.

  • Direct Task Management: Through Flower's web interface, we gain direct control over task execution. The ability to cancel tasks, restart workers, and adjust task priorities on-the-fly empowers us to maintain optimal task flow under varying load conditions.

  • Insights for Debugging and Optimization: Flower's detailed execution statistics for each task guide our debugging and optimization efforts. Analyzing task durations, success rates, and retry counts helps us pinpoint inefficiencies and improve the overall performance of our task workflows.

  • Remote Accessibility: The web-based interface of Flower means we can monitor and manage our task environment from any location, facilitating remote debugging and administration without the need for direct server access.

Additional Practices

  • Environment Isolation: Utilizing virtual environments and containerization (e.g., Docker) for our Celery and RabbitMQ/Redis setup ensures consistency across development, testing, and production environments, reducing the "it works on my machine" syndrome.

  • Automated Testing: Our project includes a suite of automated tests to validate task logic, asynchronous behavior, and failure handling scenarios. These tests, integrated into a CI/CD pipeline, ensure code quality and prevent regressions.

  • Documentation and Logging: Comprehensive documentation, including clear docstrings and READMEs, alongside detailed logging within tasks, facilitates easier maintenance and accelerates the onboarding process for new developers.

Conclusion

This PoC demonstrates a scalable and efficient way to manage distributed tasks using Celery with RabbitMQ as the message broker and Redis as the result backend. It exemplifies a practical application of Celery's capabilities in handling complex workflows with multiple asynchronous tasks, showcasing a system that can be adapted for various data collection and aggregation pipelines.
