I'm trying to write a DAG that conditionally executes another task. The simplified version of what I'm working with is this:
    to_be_triggered = EmptyOperator(task_id="to_be_triggered")

    @task.branch()
    def trigger_dag(**kwargs):
        config = kwargs.get("dag_run_config")
        if config.get("run_trigger") is True:
            return ["to_be_triggered"]
        return None

    with DAG("example") as dag:
        dag_run_config = {
            "run_trigger": True
        }
        t0 = trigger_dag(dag_run_config=dag_run_config)
        t1 = EmptyOperator(task_id="end", trigger_rule=TriggerRule.ONE_SUCCESS)
        t0 >> t1

So I want to conditionally run to_be_triggered if the run_trigger variable in the config is True. I am unable to do this because branch_task_ids must contain only valid task_ids, and for some reason, to_be_triggered is invalid:
    Following branch {'to_be_triggered'}
    Task failed with exception
    AirflowException: 'branch_task_ids' must contain only valid task_ids. Invalid tasks found: {'to_be_triggered'}

From what I can tell from Google, this is usually because a task is in a task group, and needs to be specified with the group id, but I don't have a task group here. Does anyone know if a task group is implicitly set anywhere, or if there's another possible cause for to_be_triggered to be invalid?
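One possible cause worth checking, independent of task groups: in the snippet, to_be_triggered is instantiated before the `with DAG("example")` block and is never given a `dag=` argument, so it may simply not be attached to the DAG whose task ids the branch result is validated against. The sketch below illustrates that idea with a toy stand-in written in plain Python. `ToyDag`, `ToyTask`, and `invalid_branch_ids` are hypothetical names invented for illustration and are not Airflow classes or Airflow's actual implementation.

```python
# Toy model of "tasks register with the DAG whose context is open".
# These classes are illustrative stand-ins, NOT Airflow's real API.


class ToyDag:
    _current = None  # the DAG whose `with` block is currently open, if any

    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.task_ids = set()

    def __enter__(self):
        ToyDag._current = self
        return self

    def __exit__(self, *exc):
        ToyDag._current = None
        return False


class ToyTask:
    def __init__(self, task_id):
        self.task_id = task_id
        # A task only registers itself if a DAG context is open right now.
        if ToyDag._current is not None:
            ToyDag._current.task_ids.add(task_id)


def invalid_branch_ids(dag, branch_task_ids):
    """Return the branch targets that the given DAG does not know about."""
    return set(branch_task_ids) - dag.task_ids


# Mirrors the question: the target task is created before the context opens.
outside = ToyTask("to_be_triggered")

with ToyDag("example") as dag:
    ToyTask("trigger_dag")
    ToyTask("end")

bad = invalid_branch_ids(dag, ["to_be_triggered"])

# Same tasks, but the target is now created inside the context.
with ToyDag("example_fixed") as fixed_dag:
    ToyTask("to_be_triggered")
    ToyTask("trigger_dag")
    ToyTask("end")

ok = invalid_branch_ids(fixed_dag, ["to_be_triggered"])

print(bad)
print(ok)
```

If this is what is happening in the real DAG, the corresponding change to try would be moving the `to_be_triggered = EmptyOperator(...)` line inside the `with DAG("example")` block and placing it directly downstream of the branch task, for example `t0 >> to_be_triggered >> t1`. That wiring is an assumption about the intended graph, not something confirmed by the error output.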
