I have a task defined using KubernetesPodOperator and I am passing value of the templated variable run_id as an environment variable.
What baffles me is that when I use the Jinja template syntax, it works. However when I use the Variable.get() syntax, I got a key error from Airflow.
Here is the code:
# this code works!
with DAG(
dag_id="test",
default_args={'owner': 'xxx'},
start_date=days_ago(0),
schedule_interval=None,
) as dag:
task_dry_run = KubernetesPodOperator(
name="test-dry-run",
image="debian",
cmds=["bash", "-cx"],
arguments=["echo", "10"],
labels={"foo": "bar"},
task_id="dry_run_demo",
do_xcom_push=True,
in_cluster=False,
get_logs=True,
env_vars={'run_id': "{{ run_id }}"}
)
# this code does NOT work!
with DAG(
dag_id="test",
default_args={'owner': 'xxx'},
start_date=days_ago(0),
schedule_interval=None,
) as dag:
task_dry_run = KubernetesPodOperator(
name="test-dry-run",
image="debian",
cmds=["bash", "-cx"],
arguments=["echo", "10"],
labels={"foo": "bar"},
task_id="dry_run_demo",
do_xcom_push=True,
in_cluster=False,
get_logs=True,
env_vars={'run_id': Variable.get("run_id")}
)