Programmatically Creating Embulk Configuration Files

Embulk needs a YAML configuration file for each data load. The format is simple and very human-readable, but there are cases where I want the YAML files to be generated dynamically. Embulk does support an experimental feature based on Liquid templates, but my team is well versed in Python and Jinja2, so that is what we use.

All our jobs are orchestrated using Digdag. If we assume that we are running an Embulk job with a manually prepared configuration file, the dig file will look like this.

timezone: UTC

+run_embulk_job:
    sh>: embulk run /data/config/embulk_configuration.yaml

But I have a job that runs before this, which creates the Embulk configuration. It also uses Digdag's built-in secrets and _env features to pass credentials to the Python task as environment variables. How you build the YAML in Python is up to you: you can go the Jinja2 way and render a template, or you can just create dictionaries and use PyYAML's dump to write the file, reading the credentials from the ENV variables.

timezone: UTC

+create_embulk_configurations:
    py>: tasks.embulk_jobs.create_all_embulk_configurations
    _env:
      PASSWORD: ${secret:PASSWORD}
      USERNAME: ${secret:USERNAME}

+run_embulk_job:
    sh>: embulk run /data/config/embulk_configuration.yaml
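
As a rough illustration, the function named in the py> operator above could look something like the following Jinja2 sketch. The template contents (PostgreSQL input, stdout output, host, database and table names) and the helper render_embulk_configuration are assumptions made for the example; only the function name create_all_embulk_configurations, the output path and the USERNAME and PASSWORD variables come from the workflow.

```python
import os

from jinja2 import Environment, StrictUndefined

# Hypothetical template: the plugin types, host, database and table are
# placeholders for illustration, not taken from the original workflow.
EMBULK_TEMPLATE = """\
in:
  type: postgresql
  host: {{ host | tojson }}
  user: {{ username | tojson }}
  password: {{ password | tojson }}
  database: {{ database | tojson }}
  table: {{ table | tojson }}
out:
  type: stdout
"""


def render_embulk_configuration(username, password, host="localhost",
                                database="example_db", table="example_table"):
    """Render the Embulk YAML as a string (hypothetical helper)."""
    # StrictUndefined turns a missing variable into an error instead of
    # silently rendering an empty string.
    env = Environment(undefined=StrictUndefined, keep_trailing_newline=True)
    template = env.from_string(EMBULK_TEMPLATE)
    # The tojson filter in the template emits double-quoted scalars, so
    # special characters in a password do not break the YAML.
    return template.render(username=username, password=password,
                           host=host, database=database, table=table)


def create_all_embulk_configurations(
        output_path="/data/config/embulk_configuration.yaml"):
    """Entry point referenced by the py> operator."""
    # USERNAME and PASSWORD are injected by the _env block of the task.
    rendered = render_embulk_configuration(os.environ["USERNAME"],
                                           os.environ["PASSWORD"])
    with open(output_path, "w", encoding="utf-8") as handle:
        handle.write(rendered)
    return output_path
```

Keeping the rendering in a separate function that returns a string makes it easy to unit test the template without touching the file system or Digdag.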

Once the Embulk job is complete, you can have a cleanup job to tidy things up; for example, you may want to destroy the YAML file. Below I am using the _check and _error tasks to handle completion: _check runs after its parent task succeeds, and the top-level _error runs when the workflow fails.

timezone: UTC

+create_embulk_configurations:
    py>: tasks.embulk_jobs.create_all_embulk_configurations
    _env:
      PASSWORD: ${secret:PASSWORD}
      USERNAME: ${secret:USERNAME}

+run_embulk_job:
    sh>: embulk run /data/config/embulk_configuration.yaml
    _check:
        py>: tasks.embulk_jobs.success_handler    

_error:
    py>: tasks.embulk_jobs.error_handler
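
The two handlers referenced above could be as small as the sketch below, which only deletes the generated file and prints a message. The helper remove_configuration, the config_path argument and its default are assumptions for illustration; what your handlers actually do (notifications, metrics, and so on) is up to you.

```python
import sys
from pathlib import Path

# Same path the workflow passes to `embulk run`.
DEFAULT_CONFIG_PATH = "/data/config/embulk_configuration.yaml"


def remove_configuration(config_path=DEFAULT_CONFIG_PATH):
    """Delete the generated file; return True if a file was removed."""
    path = Path(config_path)
    if path.is_file():
        path.unlink()
        return True
    return False


def success_handler(config_path=DEFAULT_CONFIG_PATH):
    """Called from _check after the Embulk task succeeds."""
    removed = remove_configuration(config_path)
    print(f"Embulk job succeeded; configuration removed: {removed}")
    return removed


def error_handler(config_path=DEFAULT_CONFIG_PATH):
    """Called from _error when the workflow fails."""
    # The file holds credentials, so it is removed on failure as well.
    removed = remove_configuration(config_path)
    print(f"Embulk job failed; configuration removed: {removed}",
          file=sys.stderr)
    return removed
```

Both handlers are safe to call when the file is already gone, which matters because a failure can happen before the configuration was ever written.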

You could also make the +create_embulk_configurations task generate more than one configuration file and run Embulk in a loop if the situation needs it.

timezone: UTC

+create_embulk_configurations:
    py>: tasks.embulk_jobs.create_all_embulk_configurations
    _env:
      PASSWORD: ${secret:PASSWORD}
      USERNAME: ${secret:USERNAME}

+run_embulk_jobs:
  for_each>:
    embulk_configuration: [embulk_config_01.yml, embulk_config_02.yml]
  _do:
    sh>: embulk run /data/config/$embulk_configuration
    _check:
      py>: tasks.embulk_jobs.success_handler

_error:
    py>: tasks.embulk_jobs.error_handler
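
For the multi-file case, here is a minimal sketch of the dictionary plus PyYAML approach. It writes one file per table, using the same file names as the for_each> list above. The table names, the PostgreSQL and stdout plugin settings, and the build_configuration helper are all hypothetical.

```python
import os
from pathlib import Path

import yaml  # PyYAML


def build_configuration(table, username, password):
    """Return one Embulk configuration as a plain dictionary."""
    # Hypothetical input/output settings, used only for illustration.
    return {
        "in": {
            "type": "postgresql",
            "host": "localhost",
            "user": username,
            "password": password,
            "database": "example_db",
            "table": table,
        },
        "out": {"type": "stdout"},
    }


def create_all_embulk_configurations(
        config_dir="/data/config",
        tables=("example_table_a", "example_table_b")):
    """Write embulk_config_01.yml, embulk_config_02.yml, ... and return
    the list of file names that were written."""
    # USERNAME and PASSWORD are injected by the _env block of the task.
    username = os.environ["USERNAME"]
    password = os.environ["PASSWORD"]
    file_names = []
    for index, table in enumerate(tables, start=1):
        file_name = f"embulk_config_{index:02d}.yml"
        config = build_configuration(table, username, password)
        with open(Path(config_dir) / file_name, "w",
                  encoding="utf-8") as handle:
            # safe_dump handles quoting and escaping of the values.
            yaml.safe_dump(config, handle, default_flow_style=False)
        file_names.append(file_name)
    return file_names
```

Because the configuration is an ordinary dictionary until the last step, it can be asserted on directly in tests, and PyYAML takes care of quoting any awkward characters in the credentials.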
 

Embulk and Digdag work very well together. This is one of the ways I make my workflows dynamic and testable. I hope it was helpful to you.

