Persisting Custom Task Metadata in MongoDB with Celery

How to create and maintain task metadata with Celery's MongoDB backend.

Bryan Anthonio
Lago di Braies, Dolomites

Celery is the de facto standard framework for managing task queues in the Python ecosystem. While its built-in MongoDB result backend works well for most use cases, it has limitations when you need to persist custom task metadata in the database throughout a task’s lifecycle.

This metadata might include the original filenames of uploaded files, user-specific processing parameters, or any other task-specific data you want to query later.

In particular, when Celery workers start or complete a task, they overwrite the task’s corresponding document in MongoDB. As a result, any custom metadata added to these task documents in advance will be erased.

This blog post demonstrates how to extend Celery’s MongoDB backend to preserve this metadata.

Project Setup

In this post, let’s assume we’re working with a Celery app that has these settings (an example task definition follows the list):

  • Redis as the message broker for distributing tasks
  • MongoDB as the result backend for storing task states and outputs
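
For concreteness, the examples below use a hypothetical task called process_file, whose keyword arguments (upload_date, group_id, and filename) are names I’ve chosen purely for illustration:

# tasks.py (excerpt; assumes the celery_app instance configured later in this post)
@celery_app.task
def process_file(upload_date=None, group_id=None, filename=None):
    # Placeholder for the real work, e.g. parsing an uploaded file
    return {"filename": filename, "processed": True}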

Problem

By default, Celery workers create a document for a task in the MongoDB backend only after completing it. This document has a result field containing the output of the completed task, along with a few other fields created by Celery.

In my use case, I wanted to achieve the following goals (an example of the resulting document follows the list):

  • Before a task is dispatched to the Redis queue, the app should create a document for it in the MongoDB backend containing custom metadata.
  • When a Celery worker starts working on the task, the status field in the task’s document should be updated to STARTED.
  • After the worker completes the task, the task’s document should be updated to contain the outputs of the completed task.
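
To make the end state concrete, here is roughly what a finished task’s document should look like once all three goals are met (the values are illustrative):

{
    "_id": "<task id>",
    "metadata": {
        "upload_date": "2024-01-15",
        "group_id": "group-42",
        "filename": "report.pdf",
    },
    "status": "SUCCESS",
    "result": "...",
    "traceback": None,
    "children": [],
    "date_done": "...",
}

The metadata field holds our custom data, while the remaining fields mirror what Celery’s MongoDB backend normally writes.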

Solution

The complete solution involves three key components:

  • Implementing a signal handler to create initial task documents.
  • Setting a parameter in Celery’s configuration to track when tasks start.
  • Creating a custom MongoDB backend to preserve metadata.

Let’s dive into each component and understand how they work together.

GitHub Repo

I’ve created a repo showcasing a demo of how this works. Check it out here!

Create an Initial MongoDB Document with Custom Metadata

The first step is to create an initial document in the database backend containing the custom metadata. Celery provides the before_task_publish signal, whose connect method serves as a decorator, making this easy to implement:

from celery.signals import before_task_publish

# This handler lives alongside the Celery app (e.g. in tasks.py)
@before_task_publish.connect
def on_task_publish(headers=None, body=None, exchange=None, routing_key=None, **kwargs):
    # The task ID is carried in the message headers
    task_id = headers.get("id")

    # Body structure is (args, kwargs, embed)
    task_args, task_kwargs, _ = body

    meta = {
        "upload_date": task_kwargs.get("upload_date"),
        "group_id": task_kwargs.get("group_id"),
        "filename": task_kwargs.get("filename"),
    }

    celery_app.backend.collection.insert_one(
        {
            "_id": task_id,
            "metadata": meta,
            "status": "QUEUED",
            "result": None,
            "traceback": None,
            "children": [],
        }
    )

Here, we use the insert_one method to create a document whose fields are given by the input dictionary.

For consistency, the document’s ID needs to be the task ID. This is how the Celery worker will know which document to update after completing the task. We can obtain this ID from the headers argument.
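
With the handler connected, dispatching the task with keyword arguments is enough to trigger the document creation. Using the hypothetical process_file task from earlier:

result = process_file.apply_async(
    kwargs={
        "upload_date": "2024-01-15",
        "group_id": "group-42",
        "filename": "report.pdf",
    }
)
print(result.id)  # matches the _id of the newly created MongoDB document

Note that the arguments must be passed as keyword arguments, since the handler reads them from task_kwargs.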

Updating Task Status When a Worker Starts a Task

Achieving this step only requires setting task_track_started=True in the Celery app like so:

celery_app = Celery(task_track_started=True)

When this setting is enabled, Celery will automatically update the task’s document with the status STARTED as soon as a worker begins processing it.

See here for more details about this setting.
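
If you want to verify the transition, you can inspect the task’s document directly with pymongo while a worker is running. This sketch reuses result from the dispatch example above, and the database and collection names match the configuration shown later in this post:

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
doc = client["tasks"]["task_results"].find_one({"_id": result.id})
print(doc["status"])  # QUEUED before pickup, STARTED once a worker begins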

Prevent Custom Metadata from Being Overwritten

By default, when a Celery worker completes a task, it overwrites the task’s corresponding document in the MongoDB backend. As a consequence, the custom metadata added in the before_task_publish signal handler will be deleted. Let’s examine how this happens.

When a Celery app is created, it instantiates an object from a backend class that handles logic for how workers should interface with the database backend.

Because we’re using MongoDB as the backend, Celery selects the MongoBackend class defined in the celery.backends.mongodb module. This class has a method named _store_result that is invoked whenever a task’s document needs to be created or updated. In the default implementation, reproduced below, the task’s document is overwritten every time this method is called:

def _store_result(self, task_id, result, state,
                  traceback=None, request=None, **kwargs):
    """Store return value and state of an executed task."""
    meta = self._get_result_meta(result=self.encode(result), state=state,
                                 traceback=traceback, request=request,
                                 format_date=False)
    # Add the _id for mongodb
    meta['_id'] = task_id

    try:
        # this line below replaces the stored document
        self.collection.replace_one({'_id': task_id}, meta, upsert=True)
    except InvalidDocument as exc:
        raise EncodeError(exc)

    return result

To my knowledge, the easiest way to prevent this from happening is to create a custom Celery backend to override this method. This requires creating a new class that inherits from the original MongoBackend class:

# custom_backend.py
from celery.backends.mongodb import MongoBackend
from kombu.exceptions import EncodeError
from pymongo.errors import InvalidDocument

class CustomMongoBackend(MongoBackend):

    def _store_result(
        self, task_id, result, state, traceback=None, request=None, **kwargs
    ):

        # contains task data: result, date_done, traceback, etc..
        task_info = self._get_result_meta(
            result=self.encode(result),
            state=state,
            traceback=traceback,
            request=request,
            format_date=False,
        )

        task_info["_id"] = task_id

        try:
            # change to update the document instead of replacing it
            self.collection.update_one(
                {"_id": task_id}, {"$set": task_info}, upsert=True
            )
        except InvalidDocument as exc:
            raise EncodeError(exc)

        return result

The logic in the new _store_result is almost the same as the original except for a minor change made to preserve the task document’s custom metadata. It now uses update_one instead of replace_one to modify the specified fields while preserving others.
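
The difference matters because $set only touches the fields it lists. Here is a quick pymongo illustration with hypothetical values:

from pymongo import MongoClient

col = MongoClient("mongodb://localhost:27017")["tasks"]["task_results"]
col.insert_one({"_id": "t1", "metadata": {"filename": "report.pdf"}})

# update_one with $set merges fields into the existing document
col.update_one({"_id": "t1"}, {"$set": {"status": "SUCCESS"}}, upsert=True)
print(col.find_one({"_id": "t1"}))
# {'_id': 't1', 'metadata': {'filename': 'report.pdf'}, 'status': 'SUCCESS'}

# replace_one, by contrast, would have swapped in {"status": "SUCCESS"}
# wholesale, dropping the metadata field entirely.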

To use this class, point Celery’s result backend at it in the app configuration like so:

# tasks.py
from celery import Celery

MONGODB_HOST = "mongodb://localhost:27017/celery_result_db"
CELERY_BROKER_URL = "redis://redis:6379/0"

celery_app = Celery(
    "tasks",
    broker=CELERY_BROKER_URL,
    backend="app.custom_backend.CustomBackend",
    task_track_started=True,
    mongodb_backend_settings={
        "host": MONGODB_HOST,
        "database": "tasks",
        "taskmeta_collection": "task_results",
    },
)

This assumes the following project structure:

app/
├── __init__.py
├── main.py # FastAPI app entry point
├── tasks.py # Celery app configuration and task definitions
└── custom_backend.py # Custom MongoDB backend implementation
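
With this layout in place, you can start a worker against the app module to exercise the whole flow (and, if you’re following the FastAPI setup, serve the API in a second terminal):

# start a worker that consumes from Redis and writes results to MongoDB
celery -A app.tasks worker --loglevel=info

# in another terminal, serve the FastAPI app that dispatches tasks
uvicorn app.main:app --reload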

Conclusion

In this post, we explored how to extend Celery’s MongoDB backend to preserve custom task metadata. By creating task documents in advance via a signal handler and modifying how the backend stores results, we prevented metadata loss while maintaining Celery’s core functionality.