Simple and Easy Background Tasks with Dramatiq

For a long time, I have been using Celery to do background tasks. It does work, but has limitations. Limitations become apparent when you teach it to someone or when someone new tries to use it. So I am always looking for simple (not trivial) and easier ways to achieve this.

I have been watching the Dramatiq for a while, and now I think it’s good enough to use. The other option was RQ, but I found it very tightly bound to Redis. Dramatiq can work with Redis, but can also use other brokers.

Dramatiq: background tasks
Dramatiq: background tasks

Dramatiq is a background task processing library for Python with a focus on simplicity, reliability, and performance.

The things I like about Dramatiq are

  1. It’s simple and easy to understand
  2. Works on all operating systems
  3. Easy to use
  4. Works with many brokers and includes a stub broker for unit testing.
  5. Integrates well with Flask, Django, and other frameworks. It can be used independently as well.
  6. It has all the features, such as scheduled tasks, delayed execution, task prioritization, callbacks, rate limiting, grouping, and pipelines.
  7. The message format is JSON. And the tasks can be queued from other languages or the Redis CLI by pushing a JSON-encoded dictionary to the queue. This is a handy feature. I will write a separate blog post. ⚠️ On the other side, any message that can’t be encoded as JSON with standard json package will fail.

Before we start, let’s get some definitions clear. So it’s easy to

  1. Actor – A Python function that Dramatiq can run asynchronously. You can make a function an actor by annotating it with @dramatiq.actor. You can queue it (or call it asynchronously) by calling .send() on it.
  2. Broker – The message transport layer or a queue. Like Redis, RabbitMQ, etc
  3. Worker – It’s a process that runs actors. It gets the jobs from the broker and calls a specific actor.

Now, let’s have a function that sums two numbers and returns the value. Assume this was a CPU-heavy function that needs to be called asynchronously and executed by workers. We also have an IO-heavy emailer that doesn’t return anything.

def count_words(text):
    count = len(text.split())
    print(f"Actor: There are {count} words.")
    return {"count": count, "text": text}

@dramatiq.actor
def send_welcome_email(user_id):
    print(f"Send welcome email to {user_id}")

Now convert them into actors. It’s straightforward: I am just adding the @dramatiq.actor annotation and, when the results need to be returned to the caller, passing store_results=True.

# actors.py
import dramatiq
import brokers

@dramatiq.actor(store_results=True)
def count_words(text):
    count = len(text.split())
    print(f"Actor: There are {count} words.")
    return {"count": count, "text": text}

@dramatiq.actor
def send_welcome_email(user_id, msg):
    print(f"Send welcome email to {user_id} {msg}")

Also, you can see I have imported brokers. Let’s look into that brokers Python file/module

# broker.py
import os
import dramatiq
from dramatiq.brokers.stub import StubBroker
from dramatiq.results.backends.stub import StubBackend
from dramatiq.brokers.redis import RedisBroker
from dramatiq.results.backends import RedisBackend
from dramatiq.results import Results

ENV = os.getenv("ENV", "PROD")
broker = None
backend = None
if ENV == "PROD":
    print("Redis broker and backend")
    backend = RedisBackend()
    broker = RedisBroker(url="redis://localhost:6379/0")
elif ENV == "UNIT_TESTING":
    # no external service,
    # messages stay in-memory,
    # so it has to be in the same process
    # good for testing
    print("Stubs broker and backend")
    backend = StubBackend()
    broker = StubBroker()
broker.add_middleware(Results(backend=backend))
dramatiq.set_broker(broker)

It defines the broker for us. In this case, I am using Redis or Stub based on the ENV, and I am also adding middle-ware to store the results. We are using the same Redis as the backend to store the results. This same broker and backend will be used by the caller of the functions.

# pull Redict and run it using podman
podman run --name redict -d -p 6379:6379 registry.redict.io/redict

# now install dramatiq as a cli. Once done you will be able to call dramatiq
uv tool -U 'dramatiq[redis]'

# start workers using dramatiq and point it to actors.py as a module
dramatiq --verbose --threads 1 --processes 4 actors

This should start several workers based on the parameters you passed, connect to the broker, and wait for the tasks to execute.

In the first set of calls, count_words, we send the task, wait, and then ask for the results. In the second set, send_welcome_email is a fire-and-forget call.

# /// script
# requires-python = ">=3.10"
# dependencies = [
#   "dramatiq",
#   "redis"
# ]
# ///
"""
Usage:
  uv run send_tasks.py
"""
import time
import dramatiq
import brokers
# Import or reference the actor
# Assuming an actor named actors.count_words exists
from actors import count_words, send_welcome_email

# send job
messages = []
for x in ["Dramatiq works with many backends", "Welcome world","My name is Thej", "is this true ?"]:
    print("sending")
    print(f"Current time in nanoseconds: {time.time_ns()} ns")
    msg = count_words.send(x)
    messages.append(msg)

for msg in messages:
    print(msg)
    result = msg.get_result(backend=brokers.backend, block=True, timeout=3000)
    print(f"Current time in nanoseconds: {time.time_ns()} ns")
    print(f"Word count: {result}")


for x in range(1, 10):
    print("sending")
    send_welcome_email.send(user_id=x, msg="Welcome to app")

You can run it using uv

  uv run send_tasks.py

You can also schedule tasks, delay their execution, prioritize tasks, create groups, pipelines etc. Since this is an introduction post, I am stopping it here.

What do you think of Dramatiq? Is it simple and easy for you to pick up and use?


You can read this blog using RSS Feed. But if you are the person who loves getting emails, then you can join my readers by signing up.

Join 2,259 other subscribers

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.