Programmatically Creating Embulk Configuration Files
Embulk needs a YAML configuration file for each data load. It's a simple, very human-readable format. But there are cases where I want to generate the YAML files dynamically. Embulk does support an experimental feature that uses Liquid templates, but my team is well versed in Python and Jinja2, so that is what we use.
All our jobs are orchestrated using Digdag. If we assume that we are running an Embulk job with a manually prepared configuration file, then the dig file will look like this.
timezone: UTC
+run_embulk_job:
  sh>: embulk run /data/config/embulk_configuration.yaml
But I have a job that runs before this one and creates the Embulk configuration. It also uses Digdag's built-in secrets and _env features to pass in the credentials. How you build the YAML in Python is up to you. You can go the Jinja2 way and render a template, or you can create dictionaries and use PyYAML's dump to write the file, reading the secrets from the ENV variables.
timezone: UTC
+create_embulk_configurations:
  py>: tasks.embulk_jobs.create_all_embulk_configurations
  _env:
    PASSWORD: ${secret:PASSWORD}
    USERNAME: ${secret:USERNAME}
+run_embulk_job:
  sh>: embulk run /data/config/embulk_configuration.yaml
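As a minimal sketch of the dictionary-plus-PyYAML route, the function referenced by the py> operator could look roughly like the following. The input plugin (embulk-input-postgresql), the host, database, and table names are all assumptions made up for illustration; only the USERNAME and PASSWORD variables and the output path come from the dig file above.

```python
import os
from pathlib import Path

# Path taken from the dig file; everything inside the dictionary is illustrative.
CONFIG_PATH = Path("/data/config/embulk_configuration.yaml")


def build_embulk_config(username, password):
    """Return the Embulk configuration as a plain dictionary."""
    return {
        "in": {
            "type": "postgresql",  # assumption: embulk-input-postgresql is installed
            "host": "localhost",   # hypothetical host
            "user": username,
            "password": password,
            "database": "sales",   # hypothetical database
            "table": "orders",     # hypothetical table
        },
        "out": {"type": "stdout"},
    }


def create_all_embulk_configurations(path=CONFIG_PATH):
    """Entry point for the py> operator: write the YAML file Embulk will read."""
    # Imported here so build_embulk_config() can be unit-tested without PyYAML.
    import yaml

    # Digdag exposes the secrets as environment variables through _env.
    config = build_embulk_config(os.environ["USERNAME"], os.environ["PASSWORD"])

    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w") as handle:
        # sort_keys=False keeps "in" before "out", which reads more naturally.
        yaml.safe_dump(config, handle, default_flow_style=False, sort_keys=False)
```

Keeping the dictionary construction in its own function means the configuration logic can be tested without touching the filesystem or the environment.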
Once the Embulk job is complete, you can have a cleanup job to tidy things up. For example, you may want to destroy the YAML file. Below I am using the _check and _error tasks to handle success and failure.
timezone: UTC
+create_embulk_configurations:
  py>: tasks.embulk_jobs.create_all_embulk_configurations
  _env:
    PASSWORD: ${secret:PASSWORD}
    USERNAME: ${secret:USERNAME}
+run_embulk_job:
  sh>: embulk run /data/config/embulk_configuration.yaml
  _check:
    py>: tasks.embulk_jobs.success_handler
  _error:
    py>: tasks.embulk_jobs.error_handler
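The two handlers can be very small. Here is one possible sketch, assuming the only cleanup needed is deleting the generated file. The helper remove_config and the print messages are hypothetical; the handler names and the path match the dig file above.

```python
from pathlib import Path

CONFIG_PATH = Path("/data/config/embulk_configuration.yaml")


def remove_config(path=CONFIG_PATH):
    """Delete the generated file; a file that is already gone is not an error."""
    Path(path).unlink(missing_ok=True)  # missing_ok needs Python 3.8+


def success_handler(path=CONFIG_PATH):
    """Called from _check after the Embulk run succeeds."""
    remove_config(path)
    print(f"Embulk job finished, removed {path}")


def error_handler(path=CONFIG_PATH):
    """Called from _error when the Embulk run fails."""
    # The file contains credentials, so it is removed on failure as well.
    remove_config(path)
    print(f"Embulk job failed, removed {path}")
```

Removing the file in both handlers means the credentials written into the YAML do not linger on disk, whichever way the job ends.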
You could also make the +create_embulk_configurations task generate more than one configuration file and run Embulk in a loop if the situation needs it.
timezone: UTC
+create_embulk_configurations:
  py>: tasks.embulk_jobs.create_all_embulk_configurations
  _env:
    PASSWORD: ${secret:PASSWORD}
    USERNAME: ${secret:USERNAME}
+run_embulk_jobs:
  for_each>:
    embulk_configuration: [embulk_config_01.yml, embulk_config_02.yml]
  _do:
    sh>: embulk run /data/config/$embulk_configuration
  _check:
    py>: tasks.embulk_jobs.success_handler
  _error:
    py>: tasks.embulk_jobs.error_handler
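For the multi-file case, the Jinja2 route might look something like this sketch. The template body, the table list, and the connection details are assumptions for illustration; the file names follow the embulk_config_01.yml pattern used in the for_each> list above.

```python
import os
from pathlib import Path

CONFIG_DIR = Path("/data/config")

# Hypothetical template. The tojson filter quotes the credentials so that
# special characters in a password cannot break the YAML.
TEMPLATE = """\
in:
  type: postgresql
  host: localhost
  user: {{ user | tojson }}
  password: {{ password | tojson }}
  database: sales
  table: {{ table }}
out:
  type: stdout
"""

# Hypothetical tables, one configuration file per table.
TABLES = ["orders", "customers"]


def config_name(index):
    """Build the file name the dig file loops over, e.g. embulk_config_01.yml."""
    return f"embulk_config_{index:02d}.yml"


def create_all_embulk_configurations(out_dir=CONFIG_DIR):
    """Render one Embulk configuration per table into out_dir."""
    # Imported here so config_name() can be tested without Jinja2 installed.
    from jinja2 import Template

    template = Template(TEMPLATE)
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    for index, table in enumerate(TABLES, start=1):
        rendered = template.render(
            user=os.environ["USERNAME"],
            password=os.environ["PASSWORD"],
            table=table,
        )
        (out_dir / config_name(index)).write_text(rendered + "\n")
```

The list in for_each> and the names produced by config_name have to stay in sync, so it is worth keeping the naming rule in one small function.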
Embulk and Digdag work very well together. This is one of the ways I make my workflows dynamic and testable. I hope it was helpful to you.