Environment Based Airflow DAGs

Joaquín Menchaca (智裕)
Jul 31, 2019

You can inject per-run values, such as the target environment, when you trigger a DAG, using either the command line or the REST API. From the command line, it looks like this:

airflow trigger_dag my_workflow --conf '{"org_env":"stage"}'
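For the REST API route, here is a minimal sketch that uses only the Python standard library. It makes several assumptions: the experimental API of Airflow 1.10 (`POST /api/experimental/dags/<dag_id>/dag_runs`), a webserver at a hypothetical `http://localhost:8080` with no authentication, and `conf` sent as a JSON-encoded string. Some 1.10 releases parse `conf` with `json.loads`, so check what your version expects.

```python
import json
import urllib.request


def build_trigger_request(base_url, dag_id, conf):
    """Build (but do not send) a request that triggers dag_id with conf.

    Assumes the Airflow 1.10 experimental endpoint
    /api/experimental/dags/<dag_id>/dag_runs and no authentication.
    """
    url = "{}/api/experimental/dags/{}/dag_runs".format(base_url.rstrip("/"), dag_id)
    # conf is encoded twice: once as a JSON string value, once as the request body.
    body = json.dumps({"conf": json.dumps(conf)}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_trigger_request("http://localhost:8080", "my_workflow", {"org_env": "stage"})
# urllib.request.urlopen(req) would send it to a running webserver.
```

The request is built separately from sending it, so you can inspect or log it before calling `urlopen`.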

Operators can read these values from the dag_run.conf dictionary, which is available in Jinja templates. For example, you can test it with a BashOperator that prints the value:

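from airflow.operators.bash_operator import BashOperator

# Define inside a `with DAG(...) as dag:` block, or pass dag=dag.
test = BashOperator(
    task_id="bash_task",
    bash_command="""
    echo ORG_ENV='{{ dag_run.conf.get("org_env", "") if dag_run and dag_run.conf else "" }}'
    """,
)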

To use these values at the DAG level, you can trigger another DAG with TriggerDagRunOperator. That operator passes its payload along as the triggered run's dag_run.conf, which a Python callable can then read from kwargs.
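Here is a minimal sketch of both callables, assuming the Airflow 1.10 `TriggerDagRunOperator` (whose `python_callable(context, dag_run_obj)` sets `dag_run_obj.payload`) and a `PythonOperator` with `provide_context=True`. The DAG ids, task ids, and the `"dev"` default are hypothetical. The Airflow wiring is shown as comments so that the callables can be checked on their own with stand-in objects.

```python
from types import SimpleNamespace  # stand-in for DagRun objects in the local check


def set_org_env_payload(context, dag_run_obj):
    """python_callable for an Airflow 1.10 TriggerDagRunOperator.

    Forwards org_env from the controller run's conf; the payload becomes
    dag_run.conf of the triggered run. "dev" is a hypothetical default.
    """
    dag_run = context.get("dag_run")
    conf = (dag_run.conf if dag_run else None) or {}
    dag_run_obj.payload = {"org_env": conf.get("org_env", "dev")}
    return dag_run_obj  # returning None would skip the trigger


def print_org_env(**kwargs):
    """python_callable for a PythonOperator in the triggered DAG.

    On Airflow 1.10 the operator needs provide_context=True so that
    kwargs contains dag_run.
    """
    dag_run = kwargs.get("dag_run")
    conf = (dag_run.conf if dag_run else None) or {}
    org_env = conf.get("org_env", "dev")
    print("ORG_ENV=" + org_env)
    return org_env


# Wiring inside the two DAG files (not executed here):
# TriggerDagRunOperator(task_id="trigger_target", trigger_dag_id="my_target_dag",
#                       python_callable=set_org_env_payload)
# PythonOperator(task_id="print_org_env", python_callable=print_org_env,
#                provide_context=True)

# Local check with stand-in objects instead of real DagRuns:
triggered = set_org_env_payload(
    {"dag_run": SimpleNamespace(conf={"org_env": "stage"})},
    SimpleNamespace(payload=None),
)
print_org_env(dag_run=SimpleNamespace(conf=triggered.payload))
```

Both callables fall back to a default when `conf` is missing, because scheduled runs (as opposed to manually triggered ones) usually carry no conf at all.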

Alternatively, you can create DAGs dynamically from a set of per-environment configurations, keyed by the same values used for the org_env setting.

For example, iterate through a sub-dictionary of those settings and register each generated DAG in the module's globals:

globals()[dag_id] = create_dag(dag_id,
                               backup_schedule,
                               default_args)

Before that loop, define the factory function that builds each DAG:

from airflow import DAG


def create_dag(dag_id, schedule, default_args):
    dag = DAG(dag_id=dag_id,
              default_args=default_args,
              schedule_interval=schedule)
    with dag:
        # create the tasks here
        # and set their dependencies (the workflow)
        pass
    return dag
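To tie these pieces together, here is a minimal sketch of the loop. It assumes a hypothetical `ENVIRONMENTS` dictionary keyed by org_env values, a hypothetical `backup_<env>` naming scheme, and example cron schedules. The factory is passed in as an argument so that the loop can be exercised without Airflow installed. In the DAG file, you would pass the `create_dag` function above and `globals()`.

```python
from datetime import datetime, timedelta

# Hypothetical per-environment settings, keyed by the same values used for org_env.
ENVIRONMENTS = {
    "stage": {"backup_schedule": "0 3 * * *"},
    "prod": {"backup_schedule": "0 1 * * *"},
}

default_args = {
    "owner": "airflow",
    "start_date": datetime(2019, 7, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def register_dags(environments, create_dag, namespace, default_args):
    """Create one DAG per environment and bind it to a module-level name.

    Airflow discovers DAG objects among a module's top-level names,
    which is why namespace is globals() in a real DAG file.
    """
    for org_env, settings in environments.items():
        dag_id = "backup_{}".format(org_env)  # hypothetical naming scheme
        namespace[dag_id] = create_dag(dag_id, settings["backup_schedule"], default_args)


# In the DAG file: register_dags(ENVIRONMENTS, create_dag, globals(), default_args)

# Local check with a stub factory instead of Airflow's DAG:
registry = {}
register_dags(ENVIRONMENTS, lambda dag_id, schedule, args: (dag_id, schedule), registry, default_args)
```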

In my dynamic DAGs, I also pass in the names of connections (Slack tokens, databases), which I then fetch and pass to the operators. Each task_id is given a unique name derived from the variable that holds that environment's configuration.
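Here is a minimal sketch of that idea. It assumes each environment entry names its connection ids; the `db_conn_id` and `slack_conn_id` keys and their values are hypothetical. The helper only derives names. Inside `create_dag`, you would pass each `conn_id` to the matching operator parameter, or resolve it at run time with `BaseHook.get_connection(conn_id)` (imported from `airflow.hooks.base_hook` in Airflow 1.10).

```python
# Hypothetical environment config: connections are referenced by name only;
# the secrets themselves stay in Airflow's connection store.
ENV_CONFIG = {
    "stage": {"db_conn_id": "postgres_stage", "slack_conn_id": "slack_stage"},
    "prod": {"db_conn_id": "postgres_prod", "slack_conn_id": "slack_prod"},
}


def task_settings(org_env, env_cfg):
    """Return per-task keyword arguments for one environment.

    Suffixing task_id with org_env keeps ids unique even if tasks for
    several environments end up in the same DAG. The generic "conn_id" key
    maps to an operator-specific parameter (e.g. postgres_conn_id).
    """
    return {
        "backup": {"task_id": "backup_db_" + org_env, "conn_id": env_cfg["db_conn_id"]},
        "notify": {"task_id": "notify_slack_" + org_env, "conn_id": env_cfg["slack_conn_id"]},
    }


all_settings = {env: task_settings(env, cfg) for env, cfg in ENV_CONFIG.items()}
all_task_ids = [t["task_id"] for s in all_settings.values() for t in s.values()]
assert len(all_task_ids) == len(set(all_task_ids))  # no collisions across environments
```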

A related aside: you can also configure Airflow's variables and connections from the command line. You can import all variables at once from a JSON file or set them one by one, and you can add connections from the command line as well. This lets you automate the setup with your favorite configuration tooling (kubectl, Helm) or templating system. If you use containers, consider building a utility container that runs these configuration commands as needed; the results are stored in Airflow's metadata database. That way, you can share the same configuration between Airflow and other tools (Salt pillars, Ansible group_vars, Chef data bags, etc.), for example per dev/test/stage/prod environment.
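Here is a minimal sketch of what such a utility container might run. It generates the configuration commands from one shared settings dictionary. It assumes the Airflow 1.10 CLI syntax (`airflow variables --import FILEPATH` and `airflow connections --add --conn_id ... --conn_uri ...`); Airflow 2 renamed these subcommands (for example `airflow variables import`). The settings, connection URI, and file path are hypothetical.

```python
import json
import shlex

# Hypothetical shared settings, e.g. loaded from the same YAML that feeds
# Ansible group_vars or Salt pillars.
SETTINGS = {
    "variables": {"org_env": "stage", "backup_bucket": "example-backups-stage"},
    "connections": {"postgres_stage": "postgres://airflow:secret@db-stage:5432/app"},
}


def airflow_config_commands(settings, variables_path):
    """Return (variables JSON text, list of argv lists) in Airflow 1.10 CLI syntax.

    The JSON text should be written to variables_path before the commands run.
    """
    variables_json = json.dumps(settings["variables"], indent=2, sort_keys=True)
    commands = [["airflow", "variables", "--import", variables_path]]
    for conn_id, conn_uri in sorted(settings["connections"].items()):
        commands.append(["airflow", "connections", "--add",
                         "--conn_id", conn_id, "--conn_uri", conn_uri])
    return variables_json, commands


text, cmds = airflow_config_commands(SETTINGS, "/tmp/variables.json")
for argv in cmds:
    # In the container you might run subprocess.run(argv, check=True) instead.
    print(" ".join(shlex.quote(a) for a in argv))
```

Keeping the commands as argument lists, rather than shell strings, avoids quoting problems when connection URIs contain special characters.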
