I have a task defined using KubernetesPodOperator, and I am passing the value of the templated variable run_id as an environment variable.

What baffles me is that it works when I use the Jinja template syntax, but when I use the Variable.get() syntax I get a KeyError from Airflow.

Here is the code:

# this code works!

with DAG(
    dag_id="test",
    default_args={'owner': 'xxx'},
    start_date=days_ago(0),
    schedule_interval=None,
) as dag:
    task_dry_run = KubernetesPodOperator(
        name="test-dry-run",
        image="debian",
        cmds=["bash", "-cx"],
        arguments=["echo", "10"],
        labels={"foo": "bar"},
        task_id="dry_run_demo",
        do_xcom_push=True,
        in_cluster=False,
        get_logs=True,
        env_vars={'run_id': "{{ run_id }}"}
    )

# this code does NOT work!

with DAG(
    dag_id="test",
    default_args={'owner': 'xxx'},
    start_date=days_ago(0),
    schedule_interval=None,
) as dag:
    task_dry_run = KubernetesPodOperator(
        name="test-dry-run",
        image="debian",
        cmds=["bash", "-cx"],
        arguments=["echo", "10"],
        labels={"foo": "bar"},
        task_id="dry_run_demo",
        do_xcom_push=True,
        in_cluster=False,
        get_logs=True,
        env_vars={'run_id': Variable.get("run_id")}
    )

1 Answer

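Presumably the "run_id" Airflow Variable doesn't exist, which is why you are seeing a KeyError. The Variable.get() call is top-level code, so it is executed every time the Scheduler parses the DAG file (parse time, every 30 seconds by default). Jinja expressions, by contrast, are not evaluated until the task actually runs. Note also that {{ run_id }} in a template refers to the run_id of the current DAG run from the template context, not to an Airflow Variable, so the two snippets are not looking up the same thing. Check out this documentation for more info.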
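To make the parse-time versus runtime distinction concrete, here is a minimal, Airflow-free sketch. Everything in it (VARIABLES, variable_get, render, runtime_context) is a hypothetical stand-in invented for illustration, not Airflow's API: the eager lookup fails as soon as the arguments are built, while the templated string is only resolved later, once a context exists.

```python
import re

# Hypothetical stand-in for the Airflow Variable store; "run_id" was never created.
VARIABLES = {}


def variable_get(key):
    """Eager lookup, analogous to calling Variable.get() in top-level DAG code."""
    return VARIABLES[key]  # raises KeyError when the key is missing


def render(template, context):
    """Deferred lookup: fill {{ name }} placeholders from a context dict."""
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda match: str(context[match.group(1)]),
        template,
    )


# "Parse time": building the arguments already executes the lookup, so it fails here.
try:
    env_eager = {"run_id": variable_get("run_id")}
    parse_error = None
except KeyError as exc:
    parse_error = exc

# The templated version is just a string at parse time, so nothing can fail yet.
env_templated = {"run_id": "{{ run_id }}"}

# "Runtime": a context is available and the placeholder gets filled in.
runtime_context = {"run_id": "manual__2024-01-01T00:00:00"}
rendered = {key: render(value, runtime_context) for key, value in env_templated.items()}

print(type(parse_error).__name__, rendered)
```

The real template engine in Airflow is Jinja and the real context is much richer, but the ordering is the same: the string is stored when the DAG is parsed and rendered only when the task executes.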

You can access Airflow Variables in Jinja templates as well using {{ var.value.run_id }}.
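As a rough sketch of what that could look like for the env_vars argument in the question: the dictionary below holds plain strings that Airflow would render when the task runs. The key names and the 'fallback' value are made up for illustration, and the var.value.get(...) form with a fallback is the one described in the Airflow templates reference, so it is worth checking against the version you run.

```python
# Plain strings at parse time; Airflow renders them when the task runs,
# because env_vars is a templated field of KubernetesPodOperator.
env_vars = {
    # run_id of the current DAG run, taken from the template context
    "run_id": "{{ run_id }}",
    # Airflow Variable named "run_id"; rendering fails at runtime if it does not exist
    "run_id_variable": "{{ var.value.run_id }}",
    # same Variable, with an illustrative fallback used when it is missing
    "run_id_variable_or_default": "{{ var.value.get('run_id', 'fallback') }}",
}
```

Because nothing is looked up until render time, a missing Variable can no longer break DAG parsing; at worst the individual task fails.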

Finally, you can also set a default value in the Variable.get() call itself, for example Variable.get("run_id", default_var="1234"). In Airflow 1.10 and 2.x the keyword argument is default_var, not default.
