Send Dramatiq Tasks directly by Pushing JSON to Redis

Dramatiq uses JSON as its message format, so the sender or caller of a Job doesn’t need access to the Actor’s code or need to know much about it. This is a considerable advantage: once the workers are set up to execute the Job, all that is needed to call it is to post a JSON message to the Broker. The JSON payload (from the docs) looks like this:

{
  "queue_name": "",            // The name of the queue the message is being pushed on
  "actor_name": "",            // The name of the actor that should handle this message
  "args": [],                  // A list of positional arguments that are passed to the actor
  "kwargs": {},                // A dictionary of keyword arguments that are passed to the actor
  "options": {},               // Arbitrary options that are used by middleware. Leave this empty
  "message_id": "unique-id",   // A UUID4 value representing the message's unique id in the system
  "message_timestamp": 0       // The UNIX timestamp in milliseconds representing when the message was first enqueued
}
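
As a rough illustration, the envelope above could be assembled in Python along these lines. The helper name `build_message` and the actor name `some_actor` are made up for this sketch and are not part of Dramatiq; only the field names come from the payload shown above.

```python
import json
import time
import uuid


def build_message(actor_name, args=(), kwargs=None, queue_name="default"):
    """Assemble a Dramatiq-style message envelope (hypothetical helper)."""
    return {
        "queue_name": queue_name,
        "actor_name": actor_name,
        "args": list(args),            # positional arguments for the actor
        "kwargs": dict(kwargs or {}),  # keyword arguments for the actor
        "options": {},                 # left empty, as the docs suggest
        "message_id": str(uuid.uuid4()),
        "message_timestamp": int(time.time() * 1000),  # UNIX time in ms
    }


# "some_actor" is a placeholder actor name used only for this example.
example = build_message("some_actor", [1, "x"])
print(json.dumps(example, indent=2))
```

Generating `message_id` and `message_timestamp` inside the helper keeps callers from reusing an ID by accident.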

This feature enables us to call the Job from any programming language that can communicate with the Broker and format a JSON payload. Let’s examine an example Actor from our previous blog post, called send_welcome_email, which takes two arguments, user_id and msg, in that order. Now, let’s create a payload to call send_welcome_email:

{
  "queue_name": "default",
  "actor_name": "send_welcome_email",
  "args": [
    1234,
    "Message for email send to redis directly"
  ],
  "kwargs": {},
  "options": {
    "redis_message_id": "f3bcdcb4-1e18-41fa-9190-bf34d77a8fbe"
  },
  "message_id": "425564d6-f762-455f-b8dd-69c9da1dd8fd",
  "message_timestamp": 1762113802478
}

In our example, we are using Redis as the Broker, and the default queue is called default. We also need to send redis_message_id as part of options. It is a UUID4 value that is different from message_id, and it must be unique as well. We are using generated UUIDs here. Additionally, you can see that args contains the positional arguments passed to the Actor. The remaining attributes are self-explanatory.

Now we need to post it to the Redis broker, under the default namespace, which is called dramatiq. Dramatiq stores the message in a hash (HSET) and then pushes (RPUSH) the message’s redis_message_id onto a queue. The hash key is dramatiq:default.msgs and the field is the redis_message_id. For the queue, the key is dramatiq:default.
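
To make the key layout concrete, here is a small sketch that derives both keys and lists the two commands in the order described above. The function names `queue_keys` and `enqueue_commands` are my own, and it only builds the commands as tuples rather than talking to Redis.

```python
def queue_keys(queue_name, namespace="dramatiq"):
    """Return (hash_key, list_key) for a queue, following the layout above."""
    list_key = f"{namespace}:{queue_name}"
    hash_key = f"{list_key}.msgs"
    return hash_key, list_key


def enqueue_commands(payload, redis_message_id, queue_name="default", namespace="dramatiq"):
    """Describe the two Redis commands needed to enqueue one message."""
    hash_key, list_key = queue_keys(queue_name, namespace)
    return [
        # 1. Store the JSON payload in the hash, keyed by redis_message_id.
        ("HSET", hash_key, redis_message_id, payload),
        # 2. Push only the ID onto the queue list.
        ("RPUSH", list_key, redis_message_id),
    ]


print(queue_keys("default"))
```

Writing the hash entry first means the ID never appears on the queue before its payload exists.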

Next, open redict-cli. I am running Redict using docker/podman, so I will just shell into the container:

podman exec -it redict sh

Then run the commands against DB 1, as that is the Redis DB I am using.

# PART 1 - Add message to hash
redict-cli -n 1 HSET dramatiq:default.msgs f3bcdcb4-1e18-41fa-9190-bf34d77a8fbe '{
  "queue_name": "default",
  "actor_name": "send_welcome_email",
  "args": [
    1234,
    "Message for email send to redis directly"
  ],
  "kwargs": {},
  "options": {
    "redis_message_id": "f3bcdcb4-1e18-41fa-9190-bf34d77a8fbe"
  },
  "message_id": "425564d6-f762-455f-b8dd-69c9da1dd8fd",
  "message_timestamp": 1762113802478
}'

# PART 2 - Push the id to queue
redict-cli -n 1 RPUSH dramatiq:default f3bcdcb4-1e18-41fa-9190-bf34d77a8fbe

That’s it. That should run the Job on one of the workers. Below, I have written a simple Python script that posts a similar message directly to Redis.

# /// script
# requires-python = ">=3.10"
# dependencies = [
#   "redis"
# ]
# ///
"""
Usage:
  uv run send_tasks_directly.py
"""
import json
import time
import uuid

import redis

# connect to redis
r = redis.Redis(host='localhost', port=6379, db=1)
namespace = "dramatiq"
queue_name = "default"
redis_message_id = str(uuid.uuid4())
msg = {
  "queue_name": queue_name,
  "actor_name": "send_welcome_email",
  "args": [1234, "Message for email send to redis directly"],
  "kwargs": {},
  "options": {"redis_message_id": redis_message_id},
  "message_id": str(uuid.uuid4()),
  "message_timestamp": int(time.time() * 1000)
}
payload = json.dumps(msg, separators=(",", ":"))

r.hset(f"{namespace}:{queue_name}.msgs", redis_message_id, payload)
r.rpush(f"{namespace}:{queue_name}", redis_message_id)
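
If you would rather send the two writes together, one option is to wrap them in a redis-py transactional pipeline (MULTI/EXEC), so the hash entry and the queue entry are applied as one unit. This is only a sketch of that idea, not how Dramatiq’s own broker enqueues messages; the function name `enqueue_atomically` is hypothetical, and `client` is assumed to be a `redis.Redis` instance like `r` above.

```python
def enqueue_atomically(client, payload, redis_message_id,
                       queue_name="default", namespace="dramatiq"):
    """Send HSET + RPUSH in a single MULTI/EXEC transaction (hypothetical helper)."""
    list_key = f"{namespace}:{queue_name}"
    hash_key = f"{list_key}.msgs"
    # transaction=True makes redis-py wrap the queued commands in MULTI/EXEC.
    with client.pipeline(transaction=True) as pipe:
        pipe.hset(hash_key, redis_message_id, payload)
        pipe.rpush(list_key, redis_message_id)
        # execute() sends everything and returns one result per command.
        return pipe.execute()


# Usage with the variables from the script above:
#   enqueue_atomically(r, payload, redis_message_id)
```

The client is passed in as an argument, so the helper can be exercised with a stand-in object during testing.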

I can do the same from Rust (run using rust-script) as well.

Post to Dramatiq from Rust Lang
//! ```cargo
//! [dependencies]
//! redis = "0.25"
//! serde = { version = "1", features = ["derive"] }
//! serde_json = "1"
//! uuid = { version = "1", features = ["v4"] }
//! chrono = "0.4"
//! ```

// Run using
// rust-script send_tasks_directly.rs

use chrono::Utc;
use redis::Commands;
use redis::ConnectionLike;
use serde::Serialize;
use uuid::Uuid;

#[derive(Serialize)]
struct Message {
    queue_name: String,
    actor_name: String,
    args: Vec<serde_json::Value>,
    kwargs: serde_json::Value,
    options: serde_json::Value,
    message_id: String,
    message_timestamp: i64,
}

fn main() -> redis::RedisResult<()> {
    let client = redis::Client::open("redis://127.0.0.1:6379/1")?;
    let mut con = client.get_connection()?;

    let namespace = "dramatiq";
    let queue_name = "default";
    let redis_message_id = Uuid::new_v4().to_string();
    println!("redis_message_id '{}'", redis_message_id);

    let msg = Message {
        queue_name: queue_name.into(),
        actor_name: "send_welcome_email".into(),
        args: vec![
            serde_json::json!(1234),
            serde_json::json!("Message for email send to redis directly from Rust"),
        ],
        kwargs: serde_json::json!({}),
        options: serde_json::json!({ "redis_message_id": redis_message_id }),
        message_id: Uuid::new_v4().to_string(),
        message_timestamp: Utc::now().timestamp_millis(),
    };

    let payload = serde_json::to_string_pretty(&msg).unwrap();
    println!("payload '{}'", payload);

    let hash_key = format!("{}:{}.msgs", namespace, queue_name);
    let list_key = format!("{}:{}", namespace, queue_name);

    con.hset::<_, _, _, ()>(&hash_key, &redis_message_id, payload)?;
    println!("Sent message to hash_key '{}'", hash_key);

    let stored_value: String = con.hget(&hash_key, &redis_message_id)?;
    println!("🔍 Retrieved from Redis:\n{}", stored_value);

    // Optional connectivity check: send a raw PING before pushing the id onto the queue.
    con.req_packed_command(&redis::cmd("PING").get_packed_command())?;
    con.rpush::<_, _, ()>(&list_key, &redis_message_id)?;

    println!("Sent message to Redis queue list_key '{}'", list_key);
    Ok(())
}

Isn’t that nifty?


Question: Why do we need redis_message_id when there is already a message_id at the Dramatiq level?

We are adding redis_message_id manually here because we are doing the same work that RedisBroker does internally. If you use the standard APIs, this happens automatically. Internally, a new unique redis_message_id is generated every time a message is enqueued, including on retries, whereas the message_id stays the same. So if a message is tried only once, there is a single message_id and a single redis_message_id. But if it is tried five times, there is still a single message_id, with five different redis_message_id values.
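
The relationship between the two IDs can be mimicked in a few lines of Python. This is a simplified model of the behaviour described above, not Dramatiq’s actual code, and `with_fresh_redis_id` is a name invented for the example.

```python
import uuid


def with_fresh_redis_id(message):
    """Return a copy of `message` with a new redis_message_id; message_id is untouched."""
    options = {**message.get("options", {}), "redis_message_id": str(uuid.uuid4())}
    return {**message, "options": options}


original = {"message_id": str(uuid.uuid4()), "options": {}}

# Simulate the same message being enqueued five times.
attempts = [with_fresh_redis_id(original) for _ in range(5)]

message_ids = {m["message_id"] for m in attempts}
redis_ids = {m["options"]["redis_message_id"] for m in attempts}
print(len(message_ids), len(redis_ids))  # 1 5
```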

