How to use DigDag Restful APIs to Trigger a Job

DigDag has been a tool of choice to run data workflows and some batch jobs at First Walkin Technologies. I liked DigDag from day one. Just like any Unix tool, It does a limited set of things but it does them quite well. 

One of the things I really missed is to trigger a job remotely. Of course it has integration with S3 where it can start a job when a file appears inside a bucket. But I wanted something more, that's when I started exploring the Restful APIs of DigDag.

As of now documentation on the APIs is sparse but if you start the DigDag server with enable-swagger flag, It will expose the Swagger UI and that should make using APIs a little easy. Other than that don't expect how-tos or example runs etc

digdag-current version.jar server --memory --enable-swagger

But here, in this how-to, I am going to take you through an example Workflow/Task triggered through API call.

A Simple Project and Workflow

Let us start by creating a very simple workflow. You can create a DigDag workflow by using the command


digdag init my_test_digdag

This command creates a directory called my_test_digdag and a dig file called my_test_digdag.dig with various tasks like setup, disp_current_date etc. Go ahead explore the project now. When you run they all run in sequence. 


digdag run my_test_digdag

So as you see it has Project, Workflow and Tasks. A workflow is represented by a .dig file. A workflow can have one more tasks. Tasks are defined using "+". A task can have operators that start with "type>:". DigDag has range of supported Operators. In the example above we can see a echo>: operator which just prints to the console.
A set of workflows is called a Project. A project usually contains all the related workflows. If a workflow is not related to others then it's a good idea to make it a part of different project

Let's create a simple Project and Task that can be used across this how-to. I will use a simple Python script here for task. I will also use a different name for the folder and .dig file. The folder structure will look

>example_python_workflow_project [folder]
 >example_python_workflow.dig [file]
 >my_workflow.py [file]
timezone: UTC
_export:
        person: "Thejesh GN"

+run_task_1:
  py>: my_workflow.MyWorkflow.task1
#!/usr/bin/env python
import digdag

class MyWorkflow(object):
	def task1(self):
		param_person = digdag.env.params["person"]
		print "I am going to person the task for {0}".format(param_person)

So our task1(self) just reads the exported environment param from the .dig file and prints it. You can run it locally by

digdag run example_python_workflow.dig

To run it on a server. Start the server (here I am running the server on the same machine) and deploy it to the server.

#start the server
digdag server --database . --enable-swagger

#deploy the workflow project
digdag push example_python_workflow_project --project ./example_python_workflow_project --revision v1 

You should be able to see the deployed workflow at http://localhost:65432

Trigger the task using API

Before we start using the APIs. Let's take a minute to know how DigDag is deployed and run. When a Project gets deployed (or uploaded), DigDag backs up the current project and deploy's the new version. The deployment command takes Project Name and Version as input.

digdag push {folder_name} --project {project} --revision {revision}

This is what you saw in the last step of the previous section. Now to run the workflow it needs a Session. A session is a context under which a workflow will run. An Attempt is an actual run/execution of a session. A sessions can be rerun if an attempt fails. In DigDag sessions are created based on TimeStamp.
With this knowledge let's start invoking the APIs. Our Goal is to make an Attemp to run a workflow. To do that we need current Workflow ID of our workflow. The workflowId changes every time you deploy a new version of code. So it's important to get the latest workFlowId. To get the WorkFlow details, you need ProjectID and hence we will start with getting Project details using Project Name.

Get Project

Request:
URL : http://localhost:65432/api/project
Query Param:
name=example_python_workflow_project

Response:
{
    "id": "3",
    "name": "example_python_workflow_project",
    "revision": "v1",
    "createdAt": "2019-10-15T05:47:59Z",
    "updatedAt": "2019-10-15T05:47:59Z",
    "deletedAt": null,
    "archiveType": "db",
    "archiveMd5": "+tY17JVIIt0/S5McYmMm/w=="
}

Here the projectId is 3 which will be used to get the Worflow in the below request.

Get Current workflowId

Request:
URL: http://localhost:65432/api/projects/3/workflow
Query Params:
Name: example_python_workflow

Response:
{
    "id": "42",
    "name": "example_python_workflow",
    "project": {
        "id": "3",
        "name": "example_python_workflow_project"
    },
    "revision": "v1",
    "timezone": "UTC",
    "config": {
        "_export": {
            "person": "Thejesh GN"
        },
        "+run_task_1": {
            "py>": "my_workflow.MyWorkflow.task1"
        }
    }
}

Make an Attempt to Run

Request:
URL: http://localhost:65432/api/attempts
Body:
{
    "workflowId": "42",
    "sessionTime": "2019-10-14T12:42:29Z",
    "params": {
    }
}

Response:
{
    "id": "11",
    "index": 1,
    "project": {
        "id": "3",
        "name": "example_python_workflow_project"
    },
    "workflow": {
        "name": "example_python_workflow",
        "id": "42"
    },
    "sessionId": "10",
    "sessionUuid": "39bd9454-d360-4e2b-a63e-58144c0f388a",
    "sessionTime": "2019-10-14T12:42:29+00:00",
    "retryAttemptName": null,
    "done": false,
    "success": false,
    "cancelRequested": false,
    "params": {},
    "createdAt": "2019-10-15T05:53:54Z",
    "finishedAt": null
}

Your digdag should print the name "Thejesh GN" as expected

Make an Attempt with Params

Lets make the request again with params. Here I am sending person="Rajesh" as the param.

Request:

{
    "workflowId": "42",
    "sessionTime": "2019-10-14T12:44:29Z",
    "params": {
        "person": "Rajesh"
    }
}

But the digdag out still prints "Thejesh GN", that's because the exported environment variable inside the .dig file takes the precedence. So to use the params passed as part of API request remove the _export part or use a different param name. Deploy it again with v2 as the version.

#!/usr/bin/env python
import digdag

class MyWorkflow(object):
	def task1(self):
		param_person = digdag.env.params["person"]
		print "I am going to person the task for {0}".format(param_person)

Make an Attempt, Do make sure get the new workFlowId first and change the SessionTime so it starts a new session.

Request:
{
	"workflowId": "43",
	"sessionTime": "2019-10-20T12:45:29Z",
	"params": {
		"person": "Rajesh"
	}
}

Response:
{
	"id": "14",
	"index": 1,
	"project": {
		"id": "3",
		"name": "example_python_workflow_project"
	},
	"workflow": {
		"name": "example_python_workflow",
		"id": "43"
	},
	"sessionId": "11",
	"sessionUuid": "885bb992-010b-48f6-ba77-960cdc42dae4",
	"sessionTime": "2019-10-14T12:45:29+00:00",
	"retryAttemptName": null,
	"done": false,
	"success": false,
	"cancelRequested": false,
	"params": {
		"person": "Rajesh"
	},
	"createdAt": "2019-10-15T05:58:09Z",
	"finishedAt": null
}

Now you should get the correct result. With that you have triggered a workflow with params remotely using APIs. That ends this how-to. Happy remote triggering.

1 Response

  1. November 15, 2019

    […] a set of Restful APIs. You can control the aspects of DigDag using it. I have written a fairly long blog post about using the DigDag APIs. For a quick start, pass –enable-swagger flag to DigDag server. This will expose the Swagger […]

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.