Airflow is a robust workflow pipeline framework that we've used at Precocity with a number of clients, with great success. This blog is not geared towards introducing you to Airflow and all that it can do; it focuses on a couple of XCOM use cases that may be beneficial to your own projects.

I'm going to walk you through my own introduction to XCOM, which came from Michal Karzynski's great blog post. Rather than reinvent the wheel, I suggest you read his blog and reference his code where applicable; I will call out the changes I made.

## The Problem

First, a little bit of background: as part of our Airflow implementations, we've developed custom plugins that do a great job of encapsulating the need for querying databases, storing the results in a CSV file in an S3 or GCS bucket and then ingesting that data into a cloud data warehouse. Other plugins handle the SFTP transfer of files, GnuPG encryption and decryption tasks, etc. The merits of plugins vs. writing code in your DAGs can be covered in a separate blog post.

For the SFTP handling, dynamic tasks were written that downloaded each file, decrypted it and then deleted the source file on the SFTP site. The SFTP implementation went through a couple of revisions, primarily due to the chattiness of the DAG. This is what some of the code looked like:

```python
# List the files on the remote SFTP site at DAG definition (parse) time.
directory_list = sftp_handler('sftp', None, '/Remote/', None, SFTPToS3Operation.LIST)

for file in directory_list:
    # Download the file from the SFTP site into the raw S3 landing area.
    get_remote_file_task = SFTPToS3Operator(
        task_id='get_remote_file' + '-' + file,
        destination_file_path='s3://' + bucket_name + '/SFTP/raw/' + file,
        dag=dag)  # connection and credential parameters omitted

    # Load the landed file into the data warehouse.
    db_load_task = DBLoadOperator(
        task_id='data_load' + '-' + file,
        dag=dag)  # load parameters omitted

    # Remove the source file from the SFTP site once it has been loaded.
    delete_remote_file_task = SFTPToS3Operator(
        task_id='delete_remote_file' + '-' + file,
        dag=dag)  # connection parameters omitted

    get_remote_file_task.set_downstream(db_load_task)
    db_load_task.set_downstream(delete_remote_file_task)
```

It's not a complicated DAG, and you can see that the task_ids are uniquely named based on the files found on the remote SFTP site, by appending the filename to each task_id.

The problem came down to one that currently exists in Airflow 1.9.0: the scheduler refreshes the DAG list every second, because it doesn't maintain DAG state and the scheduler interval isn't currently configurable. AFAIK, this hasn't changed for Airflow 1.10.0. Because the code is retrieving a remote file list every second, there's an excessive amount of chattiness from Airflow to the SFTP server. Additionally, any log entries generated from executing the top-level code essentially create "noise" that is best left unseen.

Prior to trying XCOM, my solution was to create an Airflow variable that tracked actual execution of the DAG. Only if the DAG was really executing would the LIST command run on the SFTP site and generate the dynamic tasks from there. While it generated less noise, it added additional complexity: if there was a failure in one of the upstream tasks, the Airflow variable had to be cleared so the DAG would run again on the next scheduled interval.
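As a rough illustration, the guard looked something like the sketch below. The variable name is a hypothetical stand-in, not the actual production code, and it reuses the sftp_handler helper from the snippet above:

```python
from airflow.models import Variable

# Hypothetical guard variable: only hit the SFTP server when the DAG is
# actually executing, not on every once-per-second scheduler parse.
if Variable.get('sftp_dag_is_executing', default_var='false') == 'true':
    directory_list = sftp_handler('sftp', None, '/Remote/', None, SFTPToS3Operation.LIST)
else:
    # Skip the remote LIST call; the DAG renders without the dynamic tasks.
    directory_list = []
```

The downside is apparent immediately: the shape of the DAG now depends on a piece of external state.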
From a debugging standpoint, because an Airflow variable was being read to prevent task execution, the DAG always showed the task that tested for the Airflow variable and rarely showed the fully populated DAG graph with the dynamically rendered tasks. Occasionally, you would see the full graph during execution, only to have it disappear when the DAG completed. This meant digging through the file system logs instead of viewing them through the web UI. Not terribly elegant, especially when combined with the next problem.

Oddly enough, the DAG wouldn't always register properly in one environment, while other DAGs following the same pattern were fine. Sometimes a restart of the services helped, sometimes not. More time was spent troubleshooting this type of use case than was warranted. That led me to finally take a closer look at XCOM and Michal's post.

## The Solution Part 1: Flexible Task Parameters

What I realized would work better was to retrieve the file list once, store it in an XCOM variable and pass the same file list to each of the downstream tasks, letting each one iterate through the file list.

Michal's sample code was missing a couple of small changes that would help productionalize it. Below is my slightly modified version of his MyFirstSensor operator. The change here is to take the XCOM key as a constructor argument rather than hard-coding it in the operator, so that poke can push under self.xcom_task_id_key instead of a fixed key:

```python
from datetime import datetime

from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class MyFirstSensor(BaseSensorOperator):

    @apply_defaults
    def __init__(self, xcom_task_id_key, *args, **kwargs):
        # Accept the XCOM key as a parameter so the sensor can be
        # reused across DAGs without editing the operator itself.
        self.xcom_task_id_key = xcom_task_id_key
        super(MyFirstSensor, self).__init__(*args, **kwargs)
```
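To make the file-list use case concrete, here is a minimal sketch of the pattern under the same assumptions as the earlier snippets (the sftp_handler helper; task and key names here are illustrative, not the production code): one task performs the LIST at execution time and pushes the result to XCOM, and each downstream task pulls the same list and iterates over it.

```python
from airflow.operators.python_operator import PythonOperator


def list_remote_files(**context):
    # Run the LIST once, inside a task rather than at parse time,
    # and push the result so every downstream task can reuse it.
    directory_list = sftp_handler('sftp', None, '/Remote/', None, SFTPToS3Operation.LIST)
    context['task_instance'].xcom_push(key='file_list', value=directory_list)


def load_files(**context):
    # Pull the same list pushed above and iterate through it.
    file_list = context['task_instance'].xcom_pull(task_ids='list_remote_files', key='file_list')
    for file in file_list:
        pass  # download, decrypt and load each file here


list_task = PythonOperator(task_id='list_remote_files', python_callable=list_remote_files,
                           provide_context=True, dag=dag)
load_task = PythonOperator(task_id='load_files', python_callable=load_files,
                           provide_context=True, dag=dag)
list_task.set_downstream(load_task)
```

Because the LIST now happens inside a task, the scheduler's once-per-second parse no longer touches the SFTP server, and the DAG graph keeps the same shape in the web UI regardless of what the remote site contains.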