Einen BranchPythonOperator definieren
Nachdem du die Möglichkeiten bedingter Logik in Airflow kennengelernt hast, möchtest du den BranchPythonOperator ausprobieren. Du möchtest einen anderen Codepfad ausführen, wenn das aktuelle Ausführungsdatum ein neues Jahr darstellt (z. B. 2020 vs. 2019).
Der DAG ist zusammen mit den betreffenden Tasks bereits für dich definiert. Deine aktuelle Aufgabe ist es, den BranchPythonOperator zu implementieren.
Diese Übung ist Teil des Kurses
Einführung in Apache Airflow mit Python
Anleitung zur Übung
- Konfiguriere in der Funktion
year_checkden Code so, dass geprüft wird, ob sich das Jahr des aktuellen Ausführungsdatums vom vorherigen Ausführungsdatum unterscheidet (d. h., ob sich das Jahr zwischen den entsprechenden Airflow-Template-Variablen unterscheidet). - Vervollständige den
BranchPythonOperator, indem du die passenden Argumente hinzufügst. - Setze die Abhängigkeiten zu
current_year_taskundnew_year_task.
Interaktive Übung
Vervollständige den Beispielcode, um diese Übung erfolgreich abzuschließen.
# Create a function to determine if years are different
def year_check(**kwargs):
current_year = int(kwargs[____][0:4])
previous_year = int(____[____][0:4])
if current_year == previous_year:
return 'current_year_task'
else:
return 'new_year_task'
# Define the BranchPythonOperator
branch_task = BranchPythonOperator(task_id='branch_task', dag=branch_dag,
python_callable=____, ____=True)
# Define the dependencies
branch_task ____ current_year_task
branch_task ____ new_year_task