Celery is the de facto standard framework for managing task queues in the Python ecosystem. While its MongoDB result backend works well for most use cases, it has a limitation when you need to persist custom task metadata in the database throughout a task's lifecycle.
This metadata might include things like the original filenames of uploaded files, user-specific processing parameters, or other task-specific data needed to support later database queries.
In particular, when Celery workers start or complete a task, they overwrite the task’s corresponding document in MongoDB. As a result, any custom metadata added to these task documents in advance will be erased.
This blog post demonstrates how to extend Celery’s MongoDB backend to preserve this metadata.
Project Setup
In this post, let's assume we're working with a Celery app that has these settings, as sketched below:
- Redis as the message broker for distributing tasks
- MongoDB as the result backend for storing task states and outputs
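A minimal version of this baseline configuration might look like the following sketch; the connection URLs and app name are placeholders for illustration:

# tasks.py -- a minimal baseline setup (connection URLs are placeholders)
from celery import Celery

celery_app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",          # Redis distributes task messages
    backend="mongodb://localhost:27017/tasks",  # MongoDB stores task states and results
)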
Problem
By default, Celery workers create a document for a task in the MongoDB backend only after completing the task. This document has a result field, which contains the output of the completed task, along with a few other fields created by Celery.
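For instance, a completed task's document looks roughly like this (the values are illustrative; date_done is a timestamp Celery sets at completion time):

{
    "_id": "d9f1c3e0-...",         # the task ID (shortened here)
    "status": "SUCCESS",
    "result": "42",                # encoded return value of the task
    "traceback": None,
    "children": [],
    "date_done": datetime.datetime(2024, 1, 15, 12, 0),
}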
In my use case, I wanted to achieve the following goals:
- Before a task is dispatched to the Redis queue, the app should create a document for the task in the MongoDB backend, containing the custom metadata.
- When a Celery worker starts working on a task, the status field in the task's document should be updated to STARTED.
- After a worker completes a task, the task's document should be updated to contain the outputs of the completed task.
Solution
The complete solution involves three key components:
- Implementing a signal handler to create initial task documents.
- Setting a parameter in Celery’s configuration to track when tasks start.
- Creating a custom MongoDB backend to preserve metadata.
Let’s dive into each component and understand how they work together.
GitHub Repo
I’ve created a repo showcasing a demo of how this works. Check it out here!
Create an Initial MongoDB Document with Custom Metadata
The first step is to create an initial document in the database backend containing the custom metadata. Celery provides a signal decorator, @before_task_publish.connect, that makes this easy to implement:
# in tasks.py, where celery_app is defined
from celery.signals import before_task_publish


@before_task_publish.connect
def on_task_publish(headers=None, body=None, exchange=None, routing_key=None, **kwargs):
    task_id = headers.get("id")
    # Body structure is (args, kwargs, embed)
    task_args, task_kwargs, _ = body
    meta = {
        "upload_date": task_kwargs.get("upload_date"),
        "group_id": task_kwargs.get("group_id"),
        "filename": task_kwargs.get("filename"),
    }
    celery_app.backend.collection.insert_one(
        {
            "_id": task_id,
            "metadata": meta,
            "status": "QUEUED",
            "result": None,
            "traceback": None,
            "children": [],
        }
    )
Here, we use the insert_one method to insert a document into the database with the fields specified by the input dictionary. For consistency, the document's _id must be the task ID: this is how the Celery worker knows which document to update after completing the task. We can obtain this ID from the headers argument.
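For the handler above to find these keys, the task has to be dispatched with matching keyword arguments. A hypothetical dispatch could look like this (process_file is an assumed task name, not part of Celery):

# Dispatch a task with keyword arguments; before_task_publish fires
# before the message reaches Redis, so the document exists up front.
result = process_file.apply_async(
    kwargs={
        "filename": "report.pdf",
        "upload_date": "2024-01-15",
        "group_id": "team-42",
    }
)
print(result.id)  # matches the _id of the document created by the handler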
Updating Task Status When a Worker Starts a Task
Achieving this step only requires setting task_track_started=True in the Celery app like so:
celery_app = Celery(task_track_started=True)
When this setting is enabled, Celery automatically updates the task's document with the status STARTED as soon as a worker begins processing it.
See the Celery documentation for more details about this setting.
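One way to observe the transition is to poll the task's document directly; here is a quick sketch that assumes the dispatch example from earlier:

# Watch the status field change once a worker picks up the task.
doc = celery_app.backend.collection.find_one({"_id": result.id})
print(doc["status"])  # "STARTED" while a worker is processing the task

Note, however, that with the default backend this status update is itself a full document replacement, which is exactly where the custom metadata first gets wiped out. The next section fixes that.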
Prevent Custom Metadata from Being Overwritten
By default, when a Celery worker completes a task, it overwrites the task's corresponding document in the MongoDB backend. As a consequence, the custom metadata added in the before_task_publish signal handler is deleted. Let's examine how this happens.
When a Celery app is created, it instantiates an object from a backend class that handles how workers interface with the database backend. Because we're using MongoDB as the backend, Celery selects the MongoBackend class defined in the celery.backends.mongodb module. This class has a method called _store_result that is called whenever a task's document needs to be created or updated. In the default implementation, reproduced below, the task's document is overwritten every time this method is called:
def _store_result(self, task_id, result, state,
                  traceback=None, request=None, **kwargs):
    """Store return value and state of an executed task."""
    meta = self._get_result_meta(result=self.encode(result), state=state,
                                 traceback=traceback, request=request,
                                 format_date=False)
    # Add the _id for mongodb
    meta['_id'] = task_id

    try:
        # this line below replaces the stored document
        self.collection.replace_one({'_id': task_id}, meta, upsert=True)
    except InvalidDocument as exc:
        raise EncodeError(exc)

    return result
To my knowledge, the easiest way to prevent this is to create a custom Celery backend that overrides this method. This requires a new class that inherits from the original MongoBackend class:
# custom_backend.py
from celery.backends.mongodb import MongoBackend
from kombu.exceptions import EncodeError
from pymongo.errors import InvalidDocument


class CustomMongoBackend(MongoBackend):
    def _store_result(
        self, task_id, result, state, traceback=None, request=None, **kwargs
    ):
        # contains task data: result, date_done, traceback, etc.
        task_info = self._get_result_meta(
            result=self.encode(result),
            state=state,
            traceback=traceback,
            request=request,
            format_date=False,
        )
        task_info["_id"] = task_id

        try:
            # change to update the document instead of replacing it
            self.collection.update_one(
                {"_id": task_id}, {"$set": task_info}, upsert=True
            )
        except InvalidDocument as exc:
            raise EncodeError(exc)

        return result
The logic in the new _store_result is almost identical to the original, except for one minor change made to preserve the task document's custom metadata: it uses update_one instead of replace_one, so only the specified fields are modified and all others are preserved.
To use this class, you'll have to point to it in the Celery config like so:
# tasks.py
from celery import Celery

MONGODB_HOST = "mongodb://localhost:27017/celery_result_db"
CELERY_BROKER_URL = "redis://redis:6379/0"

celery_app = Celery(
    "tasks",
    broker=CELERY_BROKER_URL,
    backend="app.custom_backend.CustomMongoBackend",
    task_track_started=True,
    mongodb_backend_settings={
        "host": MONGODB_HOST,
        "database": "tasks",
        "taskmeta_collection": "task_results",
    },
)
This assumes the following project structure:
app/
├── __init__.py
├── main.py # FastAPI app entry point
├── tasks.py # Celery app configuration and task definitions
└── custom_backend.py # Custom MongoDB backend implementation
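To tie everything together, a hypothetical task definition in tasks.py could accept the same keyword arguments that the before_task_publish handler reads (process_file and its body are illustrative):

# tasks.py (continued)
@celery_app.task(name="process_file")
def process_file(filename=None, upload_date=None, group_id=None):
    # ... perform the actual file processing here ...
    return {"processed": filename, "group_id": group_id}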
Conclusion
In this post, we explored how to extend Celery’s MongoDB backend to preserve custom task metadata. By creating task documents in advance via a signal handler and modifying how the backend stores results, we prevented metadata loss while maintaining Celery’s core functionality.