More @tasks
To continue implementing your workflow, you need to add another step that parses the downloaded file and saves the result. The Dag process_sales is already defined and already includes the pull_file task. The Python function for the new step, parse_file(inputfile, outputfile), is also defined for you.
Note that when implementing Airflow tasks, you often won't fully understand the individual steps you are given. As long as you understand how to wrap those steps within Airflow's structure, you'll be able to implement the desired workflow.
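Because a task starts out as an ordinary Python function, the parse step can be tried on its own before it is wrapped in Airflow. The following is a minimal sketch, assuming the downloaded file contains JSON. The sample records and the temporary directory are made up for illustration and are not part of the exercise.

```python
import json
import tempfile
from pathlib import Path


def parse_file(inputfile: str, outputfile: str) -> None:
    """Load JSON from inputfile and write it back out to outputfile."""
    with open(inputfile) as infile:
        data = json.load(infile)
    with open(outputfile, "w") as outfile:
        json.dump(data, outfile)


# Hypothetical sample records standing in for the downloaded sales file.
sample_sales = [{"item": "widget", "amount": 3}, {"item": "gadget", "amount": 5}]

# Work in a throwaway directory so no real files are touched.
workdir = Path(tempfile.mkdtemp())
source_path = workdir / "latestsales.json"
parsed_path = workdir / "latestsales_parsed.json"
source_path.write_text(json.dumps(sample_sales))

# Run the step exactly as a plain function call.
parse_file(str(source_path), str(parsed_path))

# Read the output back to check that the data survived the round trip.
with open(parsed_path) as f:
    round_tripped = json.load(f)
print(round_tripped == sample_sales)
```

Once the function behaves as expected in isolation, wrapping it as an Airflow task does not change its body, only how it is scheduled and called within the Dag.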
This exercise is part of the course
Introduction to Apache Airflow in Python
Exercise instructions
- Create an Airflow task using the parse_file function.
- Call the task with the necessary arguments.
Hands-on interactive exercise
Have a go at this exercise by completing this sample code.
@dag(dag_id='process_sales')
def process_sales():
    # Decorate parse_file as a task
    ____
    def parse_file(inputfile: str, outputfile: str):
        with open(inputfile) as infile:
            data = json.load(infile)
        with open(outputfile, 'w') as outfile:
            json.dump(data, outfile)

    pull_file('http://dataserver/sales.json', 'latestsales.json')

    # Call the parse_file task
    ____('latestsales.json', 'latestsales_parsed.json')

process_sales()