Define a BranchPythonOperator
Nachdem du die Möglichkeiten der bedingten Logik in Airflow kennengelernt hast, möchtest du den BranchPythonOperator ausprobieren. Du möchtest einen anderen Codepfad ausführen, wenn das aktuelle Ausführungsdatum ein neues Jahr ist (z. B. 2020 gegenüber 2019).
Die DAG wird für dich definiert, zusammen mit den jeweiligen Aufgaben. Deine aktuelle Aufgabe ist es, den BranchPythonOperator zu implementieren.
Diese Übung ist Teil des Kurses
Einführung in Apache Airflow in Python
Anleitung zur Übung
- Konfiguriere in der Funktion
year_check
den Code, um festzustellen, ob sich das Jahr des aktuellen Ausführungsdatums von dem des vorherigen Ausführungsdatums unterscheidet (d.h., ob sich das Jahr zwischen den entsprechenden Airflow-Vorlagenvariablen unterscheidet). - Schließe die
BranchPythonOperator
ab, indem du die entsprechenden Argumente hinzufügst. - Setze die Abhängigkeiten auf
current_year_task
undnew_year_task
.
Interaktive Übung zum Anfassen
Probieren Sie diese Übung aus, indem Sie diesen Beispielcode ausführen.
# Create a function to determine if years are different
def year_check(**kwargs):
current_year = int(kwargs[____][0:4])
previous_year = int(____[____][0:4])
if current_year == previous_year:
return 'current_year_task'
else:
return 'new_year_task'
# Define the BranchPythonOperator
branch_task = BranchPythonOperator(task_id='branch_task', dag=branch_dag,
python_callable=____, ____=True)
# Define the dependencies
branch_task ____ current_year_task
branch_task ____ new_year_task