1. Anuncie Aqui ! Entre em contato fdantas@4each.com.br

[Python] Dag with task based on condition

Discussão em 'Python' iniciado por Stack, Setembro 12, 2024.

  1. Stack

    Stack Membro Participativo

    I need to create a dag with some tasks that should be executed daily, and one task that should be executed monthly on the 1st of each month:

    task_1: should be executed daily task_2: should be executed monthly, on the 1st of each month task_3: should be executed dailly

    So basically my dag should be either a simple sequence of tasks where task_2 should somehow be skipped if the execution_date is not the 1st of the month, or a dag with some branching that should execute task_1 and task_3 always, and task_2 only if the execution_date is the 1st of the month.

    I've tried the short circuit decorator, and also a BranchPythonOperator using a Python function that returns the task depending on the date logic, however I don't seem to be able to do this. How can I accomplish this using TaskFlow API? Thanks

    @dag(
    start_date=datetime(2024,9,9),
    schedule_interval=None
    )
    def my_dag():

    task_1 = EmptyOperator(task_id='task_1')
    task_2 = EmptyOperator(task_id='task_2')
    task_3 = EmptyOperator(task_id='task_3')


    def monthly_branch(**kwargs):
    execution_date = kwargs['execution_date']
    if execution_date.day == 1:
    return 'task_2'
    elif execution_date.day > 1:
    return 'task_3'
    else:
    return None

    branch = BranchPythonOperator(
    task_id = "branch",
    python_callable = monthly_branch
    )


    task_1 >> branch
    branch >> task_2 >> task_3
    branch >> task_3

    my_dag()

    Continue reading...

Compartilhe esta Página