Lebih banyak @tasks
Untuk melanjutkan penerapan alur kerja Anda, Anda perlu menambahkan langkah lain untuk mengurai dan menyimpan perubahan dari file yang diunduh. Dag process_sales sudah didefinisikan dan memiliki task pull_file yang telah ditambahkan. Dalam kasus ini, fungsi Python sudah disediakan untuk Anda, parse_file(inputfile, outputfile).
Perhatikan bahwa saat menerapkan task Airflow, Anda sering kali tidak perlu memahami setiap langkah secara terperinci. Selama Anda memahami cara membungkus langkah-langkah tersebut dalam struktur Airflow, Anda akan dapat menerapkan alur kerja yang diinginkan.
Latihan ini adalah bagian dari kursus
Pengantar Apache Airflow dengan Python
Petunjuk latihan
- Buat task Airflow menggunakan metode
parse_file. - Panggil task tersebut dengan argumen yang diperlukan.
Latihan interaktif praktis
Cobalah latihan ini dengan menyelesaikan kode contoh berikut.
@dag(dag_id='process_sales')
def process_sales():
# Decorate parse_file as a task
____
def parse_file(inputfile: str, outputfile: str):
with open(inputfile) as infile:
data = json.load(infile)
with open(outputfile, 'w') as outfile:
json.dump(data, outfile)
pull_file('http://dataserver/sales.json', 'latestsales.json')
# Call the parse_file task
____('latestsales.json', 'latestsales_parsed.json')
process_sales()