Airflow and Dynamic SQL DAG Generation

Data has been at the heart of Oneflare for a number of years now. We have a very extensive data pipeline that helps us drive decisions, provide health checks and validate hypotheses. The core of the platform consists of Snowplow as our event stream collector, Airflow as our ETL tool and AWS Redshift as our data warehouse. I will be writing another piece on how our infrastructure all fits together in more detail, but I wanted to take the opportunity to provide some detail on how we manage SQL ETL tasks via Airflow.

One of the early decisions we made when picking a scheduler to manage our ETL jobs was what type of jobs we were going to execute. Would we write Python tasks? Start exploring Spark? After much excitement and over-engineered whiteboard sessions, we settled back into what our Insights team was good at: SQL. We had a hard look at what we really wanted to do and settled on a few concepts:

  • Write SQL queries to create and update tables
  • Execute update statements that transform raw table data from Redshift and load it back into modelled report tables
  • Build fairly complex pipelines including parallel queries and dependent steps

Premature optimization is the root of all evil

Donald Knuth

Instead of rushing to optimise this early, we settled for a very simple dynamic generator that allows us to write raw SQL queries and, with some clever folder and file naming, provide scheduling and DAG (directed acyclic graph) structure to our SQL scripts. The solution we built, creatively named ‘SQL DAG Generator’, allows the Insights team at Oneflare to focus on the SQL rather than the infrastructure, and we did not have to learn new technologies or tools.

Our Dynamic SQL DAG Generator

One of the great parts about Airflow is its ability to programmatically build DAGs via Python. We leverage this heavily with the Generator by looping over our ETL jobs when Airflow starts up and creating and modifying our DAGs. The basic form of the SQL jobs consists of a root folder sql_jobs that contains folders with a special naming convention, which in turn contain the SQL to execute like so:
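(A hypothetical layout; exactly how the DAG name and encoded schedule combine in the folder name is an assumption.)

    sql_jobs/
        my_sql_job 0_X_X_X_X/
            1_create_my_table.sql
            2_update_my_table.sql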

The above ‘job’ would generate the following DAG, with a scheduled time of every hour.

(Airflow UI: my_sql_job scheduled to be executed every hour, with the steps correctly ordered into a DAG.)

There is a lot going on behind the scenes here so I’ll break down each part of the parsing and generation.

Root SQL Jobs Folder

The root sql_jobs folder sits under version control in our Airflow DAGs repository. Alongside this folder is a single Python file called ‘sql_dag_generator.py’, which I will walk through in more detail further down. When Airflow starts, this Python file is picked up like a normal DAG; it looks for the sql_jobs folder and iterates over each folder it finds.

Folder Name

The folder name represents the DAG name and a CRON schedule to set the execution time. To allow for this folder structure, we need to make some substitutions in the CRON expression: X instead of *, % instead of / and _ instead of a space, so the names remain valid across various operating systems. The generator will replace these with their correct symbol as part of the initial parsing process.
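A minimal sketch of that substitution step in Python (the helper name is illustrative, not our exact code):

    def decode_cron(encoded_schedule):
        # Assumed convention from above: X -> *, % -> /, _ -> space.
        return (encoded_schedule
                .replace('X', '*')
                .replace('%', '/')
                .replace('_', ' '))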

Some examples that we use:
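(These particular schedules are illustrative rather than lifted directly from our jobs.)

    0_X_X_X_X     ->  0 * * * *     every hour, on the hour
    0_2_X_X_X     ->  0 2 * * *     02:00 every day
    X%15_X_X_X_X  ->  */15 * * * *  every 15 minutes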

The beauty of this folder syntax is that it simplifies deployments, as there is no additional setup or config required to match a job to an execution time. We can also easily change schedules in the future by renaming the folder. This can cause some bloat in version control with files being moved around, but we feel that keeping everything self-contained is more advantageous at the present time.

Step Ordering

The next parsing step is to generate the individual steps of the DAG for the job. SQL files contained in the folder are ordered based on their numeric prefix, e.g. 1_... for the first step, and the generator supports parallel execution by duplicating order prefixes. You can also group steps into logical blocks, which makes it easier to manage large jobs: you can make changes to the steps inside a block without having to alter the other areas of the DAG execution order. For example:
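(A hypothetical file listing: the two 1_ files share a prefix and so run in parallel, while the hundreds digits group related steps into blocks.)

    1_create_raw_table.sql
    1_create_report_table.sql
    101_transform_raw_events.sql
    201_load_report_table.sql
    202_vacuum_report_table.sql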

If we want to add an extra step after 202_..., we can just add a new SQL file prefixed with 203_... instead of having to rename every file downstream.

File Contents

The files themselves are raw SQL: nothing complicated and no special templating. One of the reasons behind this is that we can easily validate and test each script and its execution separately from the scheduling. We generally follow a principle of separating CREATE and UPDATE/INSERT statements into two subsequent files, like so:
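(Hypothetical file names.)

    101_create_report_table.sql
    102_update_report_table.sql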

This would be parsed into the following DAG:
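In this hypothetical case, the resulting order is simply:

    101_create_report_table -> 102_update_report_table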

We generally make CREATE statements parallel steps to speed up the execution of the DAG. The contents of the CREATE statements are structured similarly to this:
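Something along these lines, with a hypothetical table and columns:

    -- Idempotent: only builds the table on the very first run.
    CREATE TABLE IF NOT EXISTS analytics.report_table (
        id          BIGINT,
        created_at  TIMESTAMP,
        metric      INTEGER
    );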

This allows the step to be executed every time, and in the event that it's the first run, the table gets set up. The update queries are structured similarly to this:
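A rough sketch, again with hypothetical table names, rebuilding the full table each run:

    BEGIN;

    -- Stage the freshly transformed data.
    CREATE TEMP TABLE report_staging AS
    SELECT id, created_at, metric
    FROM analytics.raw_events;

    -- Full rebuild: clear the target, then reload it from staging.
    DELETE FROM analytics.report_table;

    INSERT INTO analytics.report_table
    SELECT id, created_at, metric
    FROM report_staging;

    COMMIT;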

By using a temporary staging table and insert, we can ensure the update is wrapped in a transaction, and it gives us a cleaner separation of concerns between creating and updating. In the event that ids may overlap, or we are not regenerating the full table each time, we use WHERE clauses in the DELETE block to allow for upserts. E.g.:
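For example, swapping the full-table DELETE in the sketch above for something like:

    -- Only remove the rows we are about to re-insert, so other rows survive.
    DELETE FROM analytics.report_table
    WHERE id IN (SELECT id FROM report_staging);

    INSERT INTO analytics.report_table
    SELECT id, created_at, metric
    FROM report_staging;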

sql_dag_generator.py

The Python file that generates the DAGs generally follows the steps outlined above:

  1. Iterate over the sql_jobs folder, and for each folder we find:
  2. Parse the name and execution time
  3. Build the order of steps
  4. Populate the steps with the correct SQL code
  5. Order the steps correctly
  6. Build and inject the DAG into Airflow

Here is a snippet of this process:
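What follows is a simplified sketch of that loop rather than our exact code; it assumes Airflow 1.x import paths, and the folder-name separator and helper names are assumptions:

    import os
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    SQL_JOBS_DIR = os.path.join(os.path.dirname(__file__), 'sql_jobs')


    def parse_folder_name(folder):
        # Assumption: the DAG name and the encoded schedule are separated by a space.
        dag_name, encoded_schedule = folder.rsplit(' ', 1)
        schedule = (encoded_schedule
                    .replace('X', '*')
                    .replace('%', '/')
                    .replace('_', ' '))
        return dag_name, schedule


    def build_job_steps(folder_path):
        # Group each SQL file's contents by its numeric order prefix, e.g. '101_...'.
        steps = {}
        for file_name in sorted(os.listdir(folder_path)):
            if not file_name.endswith('.sql'):
                continue
            order = int(file_name.split('_', 1)[0])
            with open(os.path.join(folder_path, file_name)) as sql_file:
                steps.setdefault(order, []).append((file_name, sql_file.read()))
        return steps


    for folder in os.listdir(SQL_JOBS_DIR):
        folder_path = os.path.join(SQL_JOBS_DIR, folder)
        if not os.path.isdir(folder_path):
            continue

        dag_name, schedule = parse_folder_name(folder)
        job_steps = build_job_steps(folder_path)

        dag = DAG(dag_name,
                  schedule_interval=schedule,
                  start_date=datetime(2019, 1, 1),  # illustrative start date
                  catchup=False)

        # ... create an operator for each step and wire them together (see below) ...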

Most of these steps are fairly straightforward, but with Airflow there are a few tricks we had to work out. To execute the SQL files, we decided to leverage the inbuilt PostgresHook that Airflow provides. This allows us to store the connection information securely in Airflow and wraps the execution with a few other niceties. To accomplish this we use the following method:
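A sketch of that method, assuming Airflow 1.x import paths and a connection configured in Airflow under an illustrative ID of 'redshift':

    from airflow.hooks.postgres_hook import PostgresHook


    def execute_sql(sql, **kwargs):
        # Redshift speaks the Postgres wire protocol, so PostgresHook works against it;
        # the connection details are stored securely in Airflow under this connection ID.
        hook = PostgresHook(postgres_conn_id='redshift')
        hook.run(sql)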

We then wrap the hook function in a PythonOperator, passing it in as the callable.

Finally, we create the DAG by looping over the job_steps and creating operators for each step:
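Continuing the sketch above, each group of files sharing an order prefix becomes a group of parallel operators:

    ops = []
    for order in sorted(job_steps):
        group = []
        for file_name, sql in job_steps[order]:
            # One PythonOperator per SQL file, named after the file itself.
            group.append(PythonOperator(
                task_id=file_name.replace('.sql', ''),
                python_callable=execute_sql,
                op_kwargs={'sql': sql},
                dag=dag))
        ops.append(group)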

We can then construct the graph by iterating over the reversed ops list, and inject the DAG into the module's global namespace so Airflow picks it up:
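Roughly like so, still within the per-folder loop of the sketch:

    # Wire each group of parallel steps to run after the group before it.
    reversed_ops = list(reversed(ops))
    for downstream_group, upstream_group in zip(reversed_ops, reversed_ops[1:]):
        for task in downstream_group:
            task.set_upstream(upstream_group)

    # Register the DAG in the module's global namespace so Airflow's DagBag discovers it.
    globals()[dag_name] = dag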

Next Steps

Whilst this process is happily working and has remained largely unchanged for almost 2 years, we are always looking at exploring new technologies and options. Some of the caveats of the DAG Generator are that we are still explicitly ordering files, and we have run into trouble once or twice where the order is wrong, or where we unfortunately discover a cyclic reference across a few files. Testing SQL ETL pipelines is an article, not to mention an art form, in itself.

As the complexity of our ETL jobs grows, we are evaluating more robust solutions, specifically DBT (https://www.getdbt.com/). The advantage of a tool like this is that we no longer have to create the graph manually; its inbuilt referencing tools help build the graphs out for us.

Feel free to comment if you have any questions or want some more detail about the process.

I lead the Platforms team and am responsible for Backend Engineering, DevOps and Data Engineering across a diverse stack of technologies including Ruby on Rails, Python, Go, Docker, Kubernetes, MySQL & Redshift.
