How to create and maintain task metadata with Celery's MongoDB backend.
Celery is the de facto standard framework for managing task queues in the Python ecosystem. While its MongoDB result backend works well for most use cases, it has limitations when you need to persist custom task metadata in a database throughout a task’s lifecycle.
This metadata might include the original filenames of uploaded files, user-specific processing parameters, or other task-specific data that you later want to query in the database.
In particular, when Celery workers start or complete a task, they overwrite the task’s corresponding document in MongoDB. As a result, any custom metadata added to these task documents in advance will be erased.
This blog post demonstrates how to extend Celery’s MongoDB backend to preserve this metadata.
In this post, let’s assume we’re working with a Celery app configured roughly as follows (a sketch using a Redis broker and a MongoDB result backend, mirroring the full configuration shown at the end of the post):
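# tasks.py: baseline settings assumed throughout this post
from celery import Celery

celery_app = Celery(
    "tasks",
    broker="redis://redis:6379/0",  # Redis as the message broker
    backend="mongodb://localhost:27017/celery_result_db",  # MongoDB result backend
    mongodb_backend_settings={
        "database": "tasks",
        "taskmeta_collection": "task_results",
    },
)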
By default, Celery workers will create a document for a corresponding task in the MongoDB backend only after completing the task. This document has a result field, which contains the output of the completed task, along with a few other fields created by Celery.
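To make this concrete, here is roughly the shape of such a document (the field names are the ones Celery’s MongoDB backend writes; the values are illustrative, not output from a real run):

# Approximate shape of a task document written by Celery's MongoDB backend
{
    "_id": "7f1b9c2e-5a14-4d0b-9c3a-1f2e3d4c5b6a",  # the task ID
    "status": "SUCCESS",  # final task state
    "result": 42,  # return value of the task (stored encoded)
    "date_done": "2024-01-01T00:00:00",  # completion timestamp
    "traceback": None,  # populated if the task failed
    "children": [],
}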
In my use case, I wanted to achieve the following goals:
1. Create a task document containing custom metadata as soon as the task is queued.
2. Set the status field in the task’s document to STARTED once a worker begins processing it.
3. Preserve the custom metadata when the task completes.
The complete solution involves three key components:
1. A before_task_publish signal handler that inserts the initial task document.
2. Celery’s task_track_started setting, which makes workers report the STARTED state.
3. A custom MongoDB backend that updates the task document instead of replacing it.
Let’s dive into each component and understand how they work together.
I’ve created a repo showcasing a demo of how this works. Check it out here!
The first step is to create an initial document in the database backend containing the custom metadata. Celery provides a before_task_publish signal whose connect method can be used as a decorator, which makes this easy to implement:
# tasks.py: runs on the publishing side, before the task is sent to the broker
from celery.signals import before_task_publish

@before_task_publish.connect
def on_task_publish(headers=None, body=None, exchange=None, routing_key=None, **kwargs):
    task_id = headers.get("id")
    # Body structure is (args, kwargs, embed)
    task_args, task_kwargs, _ = body
    meta = {
        "upload_date": task_kwargs.get("upload_date"),
        "group_id": task_kwargs.get("group_id"),
        "filename": task_kwargs.get("filename"),
    }
    # Insert the initial document, keyed by the task ID
    celery_app.backend.collection.insert_one(
        {
            "_id": task_id,
            "metadata": meta,
            "status": "QUEUED",
            "result": None,
            "traceback": None,
            "children": [],
        }
    )
Here, we use the insert_one method to insert a document into the database with the fields specified by the input dictionary. For consistency, the document’s ID needs to be the task ID; this ID is how the Celery worker will know which document to update after completing the task, and we can obtain it from the headers argument.
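For context, here is a hypothetical task whose keyword arguments carry the metadata the handler reads; the task name and body are made up for illustration. Note that the arguments must be passed as keyword arguments, since the handler looks them up in task_kwargs:

# tasks.py: a hypothetical task carrying the custom metadata in its kwargs
@celery_app.task
def process_upload(upload_date=None, group_id=None, filename=None):
    # ... the actual processing would happen here ...
    return {"processed": filename}

# Call with keyword arguments so on_task_publish can read them from task_kwargs
process_upload.delay(
    upload_date="2024-01-01",
    group_id="group-42",
    filename="report.csv",
)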
Achieving this step only requires setting task_track_started=True in the Celery app like so:

celery_app = Celery(task_track_started=True)
When this setting is enabled, Celery will automatically update the task’s document with the status STARTED as soon as a worker begins processing it. See the Celery documentation for more details about this setting.
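To see the transition yourself, you can watch the task’s document directly with pymongo; this is a small sketch assuming the database and collection names used in this post:

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
collection = client["tasks"]["task_results"]

task_id = "..."  # e.g. the .id of the AsyncResult returned by delay/apply_async
doc = collection.find_one({"_id": task_id})
print(doc["status"])  # QUEUED before pickup, STARTED while running, then SUCCESS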
By default, when a Celery worker completes a task, it overwrites the task’s corresponding document in the MongoDB backend. As a consequence, the custom metadata added in the before_task_publish signal handler will be deleted. Let’s examine how this happens.
When a Celery app is created, it instantiates an object from a backend class that handles logic for how workers should interface with the database backend.
Because we’re using MongoDB as the backend, Celery will select the MongoBackend class defined in the celery.backends.mongodb module. This class has a method called _store_result that is called whenever the task’s document needs to be updated or created. In the default implementation, reproduced below, the task’s document is overwritten every time this method is called:
def _store_result(self, task_id, result, state,
                  traceback=None, request=None, **kwargs):
    """Store return value and state of an executed task."""
    meta = self._get_result_meta(result=self.encode(result), state=state,
                                 traceback=traceback, request=request,
                                 format_date=False)
    # Add the _id for mongodb
    meta['_id'] = task_id

    try:
        # this line below replaces the stored document
        self.collection.replace_one({'_id': task_id}, meta, upsert=True)
    except InvalidDocument as exc:
        raise EncodeError(exc)

    return result
To my knowledge, the easiest way to prevent this from happening is to create a custom Celery backend that overrides this method. This requires creating a new class that inherits from the original MongoBackend class:
# custom_backend.py
from celery.backends.mongodb import MongoBackend
from kombu.exceptions import EncodeError
from pymongo.errors import InvalidDocument


class CustomMongoBackend(MongoBackend):
    def _store_result(
        self, task_id, result, state, traceback=None, request=None, **kwargs
    ):
        # contains task data: result, date_done, traceback, etc.
        task_info = self._get_result_meta(
            result=self.encode(result),
            state=state,
            traceback=traceback,
            request=request,
            format_date=False,
        )
        task_info["_id"] = task_id

        try:
            # changed to update the document instead of replacing it
            self.collection.update_one(
                {"_id": task_id}, {"$set": task_info}, upsert=True
            )
        except InvalidDocument as exc:
            raise EncodeError(exc)

        return result
The logic in the new _store_result is almost identical to the original, with one minor change made to preserve the task document’s custom metadata: it now uses update_one instead of replace_one, so only the specified fields are modified and all others are preserved.
To use this class, you’ll have to reference it in the Celery config like so:
# tasks.py
from celery import Celery

MONGODB_HOST = "mongodb://localhost:27017/celery_result_db"
CELERY_BROKER_URL = "redis://redis:6379/0"

celery_app = Celery(
    "tasks",
    broker=CELERY_BROKER_URL,
    backend="app.custom_backend.CustomMongoBackend",
    task_track_started=True,
    mongodb_backend_settings={
        "host": MONGODB_HOST,
        "database": "tasks",
        "taskmeta_collection": "task_results",
    },
)
This assumes the following project structure:
app/
├── __init__.py
├── main.py # FastAPI app entry point
├── tasks.py # Celery app configuration and task definitions
└── custom_backend.py # Custom MongoDB backend implementation
In this post, we explored how to extend Celery’s MongoDB backend to preserve custom task metadata. By creating task documents in advance via a signal handler and modifying how the backend stores results, we prevented metadata loss while maintaining Celery’s core functionality.