Declarative dependency ETL workflows

DAG workflows usually start at the beginning and work towards some end goal. They are action focussed, not results focussed: "start this workflow at 8:00am every day" rather than "the data in this table needs to be up to date". The big drawbacks here are:

  • What you want in the end is separated from the process that produces it. This means you end up with a data quality monitor as a separate process. That is make-work: when the data quality monitor flags an error, you then have to go and find out how the data was supposed to get there and why it failed.
  • Workflows are difficult to properly resume, and difficult to break apart in general. Let's say a workflow failed because a file could not be downloaded. Generally you need to restart the entire workflow.
  • If a workflow failed on the previous day but succeeded today, is that ok? Are we missing some data or are we good?

They exist because they are a natural way to develop a workflow. For example: "We need this data downloaded daily". Ok, set up a cron job that downloads the files and runs daily. Then one day it fails: "We need a monitor to check that the files are downloaded". Ok, add some code to the end of the download job that flags that it was done correctly. Then one day: "The files were downloaded but the email at the end wasn't sent to tell users the files were ready". And on it goes until you end up with a spaghetti of cron jobs.

Declarative workflows have advantages:

  • The system does not need to care how the dependency was satisfied. If it all went to hell and someone had to manually insert the final data, administrators don't need to go back and mark previously failed tasks as "ok, we don't care that this failed anymore".
  • If some data is accidentally deleted, the workflow engine will automatically "make it right" without anyone needing to manually go and schedule jobs to make that happen.
  • Let's say in the middle of a workflow there is a file download process, and the password gets changed. An administrator can test the file download manually, and once it works, the manual act of downloading the file resolves the dependency.
  • "Backfill" jobs occur automatically when you change your "desired outcome" requirements.

An important consideration here is that when a dependency becomes unsatisfied it needs to somehow "know" that something is working on it, because satisfying a dependency can take some amount of time. How much time? So there has to be a "working on it" state, and an acceptable amount of time that something is allowed to be working on it. Any dependency that relies on another dependency will be "working on it" while its sub-dependencies are "working on it".
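
As a rough sketch of that state machine (everything here is invented for illustration, not an existing API), a dependency could carry a "working on it" state together with a deadline:

    from dataclasses import dataclass
    from datetime import datetime, timedelta
    from enum import Enum
    from typing import Optional


    class DepState(Enum):
        UNSATISFIED = "unsatisfied"
        WORKING_ON_IT = "working on it"
        SATISFIED = "satisfied"


    @dataclass
    class Dependency:
        name: str
        state: DepState = DepState.UNSATISFIED
        # How long something is allowed to be "working on it" before we treat
        # the attempt as dead and consider triggering actions again.
        allowed_work_time: timedelta = timedelta(hours=1)
        started_at: Optional[datetime] = None

        def start_work(self) -> None:
            self.state = DepState.WORKING_ON_IT
            self.started_at = datetime.now()

        def work_has_stalled(self, now: datetime) -> bool:
            return (
                self.state is DepState.WORKING_ON_IT
                and self.started_at is not None
                and now - self.started_at > self.allowed_work_time
            )

A dependency that relies on other dependencies would simply report itself as "working on it" while any of its sub-dependencies are.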

Consider Kubernetes: there is a daemon running that is constantly checking the state of the system and comparing it to the desired state. When the desired state is not met, Kubernetes will attempt to modify the system until it is. In order to do that it must know: a) whether the state is how it wants it (e.g. is this program running?) and b) what actions alter the state (e.g. start this program).

So declarative workflows are continuously asking (a minimal loop is sketched after this list):

  • Does my desired state look ok?
  • If not, is there anything "working on it"?
  • If not, trigger any actions required
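
Here is a minimal sketch of that loop, assuming placeholder callables check, in_progress and trigger that you would supply yourself (none of these names refer to a real library):

    import time


    def reconcile_forever(expectations, check, in_progress, trigger, poll_seconds=60):
        """Continuously compare the desired state to reality and react."""
        while True:
            for expectation in expectations:
                if check(expectation):        # does my desired state look ok?
                    continue
                if in_progress(expectation):  # is something already "working on it"?
                    continue
                trigger(expectation)          # if not, trigger the required actions
            time.sleep(poll_seconds)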

Most ETL workflow management software is ass-backwards. It comes at the problem from the same angle: you want to execute something regularly (a schedule), and that something is likely a complex chain of interdependent sub-tasks. So the focus is: trigger this workflow at this specific time of the day and it will execute all the steps in the dependency tree (DAG) until it's complete.

The problem of scheduling is apparent when you ask the business "when do you want your data?". They are not going to say "0:00am plus however long the workflow takes". They are going to say "whenever it's ready" or "as soon as possible". They don't really care what time a workflow is started - they care when the data they want is available. Their focus is on the result. They are focussed on "pulling" the data, but the infrastructure is focussed on "pushing" the data. What if there was an ETL system that was focussed on pulling the data?

The problem of action-based workflows is apparent when looking at failures. When can a workflow fail and it still be ok? What do you do when a workflow fails? The problem with "pushing" the data down the pipeline is that you end up slapping a data quality monitor at the end of each flow, so that you are able to report to the business: yes, this system really worked and here is your data. Because the data quality monitor is completely disconnected from your ETL workflow system, you end up with alerts that need to be "acknowledged" by humans - because how else does your system know that things were resolved? Or you run your data quality monitor frequently and hope you can handle the noise.

We have been here before with server management. Looked at from the perspective of scheduled actions, it has the same problems. The solution there was to move to a declarative state manager ("this is what the system should look like"): when the system deviates from the expectations, actions are triggered to bring it back up to them.

My proposal is to look at ETL workflows in the same manner. We start at "this is what the data should look like", and when the system deviates from the expectations, actions are triggered to bring it back up to them. Let's take an example:

Doug has been asked to implement a workflow DAG for a complex data load. The workflow must:

  • Call an API to get a list of 1000 campaigns from an Ad network
  • For each of those campaigns log in via SFTP and download a huge CSV file of data (1000 CSV file downloads)
  • Munge that data and load it into Hadoop tables
  • Wait for a separate Hadoop job that will populate dependent data we need to join on
  • Run a Hadoop job to join our new data with that dependency and produce the final "results" Hadoop table

Some questions:

  • What if one of the CSV file downloads fails because the Ad network is an hour late generating it?
  • What if we discover that we are missing a campaign and we should have been downloading 1001 CSV files this month?
  • What if that separate Hadoop job that we depend on fails?
  • What time of the day should we schedule this so we don't get bombarded with dependency warnings?
  • What if the Ad network changes our SFTP password and doesn't tell us?

What if we reverse the problem, and instead declare: "We expect our resulting Hadoop table to contain data for our 1000 campaigns daily, starting from date A and ending at date B". We then declare that in order to have this data, "the dependency Hadoop table needs to have data from date A to date B". We also need to have "downloaded this CSV file for this campaign for this day" for each of our 1000 campaigns. So we build a tree of dependencies starting from the results we want to see, building "downwards" towards the first step.

Each day we update our expectations of date B. The system recognises this and knows that we need to have 1000 CSV files downloaded for every day for every campaign. Because the states for the previous days are marked completed, we only add 1000 new "expectations" for the new date. Those new expectations can see that the CSV files are not there and will trigger actions to execute the downloads.
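
As a sketch of what those declared expectations could look like (the Expectation class and expectations_for helper are made up for illustration, and the dates are arbitrary):

    from dataclasses import dataclass
    from datetime import date, timedelta


    @dataclass(frozen=True)
    class Expectation:
        """One unit of desired state: this campaign's CSV exists for this day."""
        campaign_id: int
        day: date


    def expectations_for(campaign_ids, start: date, end: date):
        """Expand the declared outcome ("data for these campaigns from date A
        to date B") into one expectation per campaign per day."""
        day = start
        while day <= end:
            for campaign_id in campaign_ids:
                yield Expectation(campaign_id, day)
            day += timedelta(days=1)


    # Moving date B forward by one day only introduces the new day's
    # expectations; everything already satisfied for earlier days is untouched.
    existing = set(expectations_for(range(1000), date(2024, 3, 1), date(2024, 3, 24)))
    desired = set(expectations_for(range(1000), date(2024, 3, 1), date(2024, 3, 25)))
    new_work = desired - existing  # 1000 fresh expectations for the new date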

Now, if the Ad network changes our SFTP password there is a big scramble and an administrator needs to log into a box and test the new password. To test that it works, she manually SFTPs the missing files over to our server. Boom, she has just satisfied all the "downloaded this CSV file for this campaign" dependencies. She does not need to go and restart the whole job from scratch, because the system is constantly checking the current state. She doesn't need to do anything else, because satisfying that dependency automatically bubbles up the chain.

Now we discover that it should have been 1001 campaigns this month, so we update our desired state to include that missing campaign. Immediately our system recognises that campaign id 1001 is missing from our results for the past month! It then notices that we are missing 31 CSV files! However, it knows how to satisfy those dependencies.

Let's say Doug mistakenly deletes campaign data for campaign id 501 on the 25th of March from the final results table - whoops! It's ok because our system knows immediately that this happened and can repair the data automatically. If we cleaned up the CSV files and the Ad network doesn't have this one anymore, it's tough luck for us, but at least we know exactly why this data is missing and why we can't recover it. Perhaps we can reconstruct the data from estimates and insert that into our results table? That will satisfy the desired state and we won't keep trying to download CSV files that are not there anymore, and no one needs to go into the UI and cancel an ETL job that's failing and spamming everyone.

In order to have this system we need to build a bunch of state checkers: bits of code that know how to check whether the state is correct. These usually need to execute quickly if they will be used in polling mode, because our system is going to be checking them all the time. Alternatively, they can inform our system via a message if polling is not a good idea.

We also need to build a bunch of actions that know what to do to move the system to the desired state.

Finally, we need a place to store and manage the expected states. These chunks of data will be passed to the state checkers in order for them to verify whether our system state is ok or whether something is missing. If something is missing, these state checkers can create additional expected states for downstream dependencies, and/or trigger specific actions to make the system state meet expectations.
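
For example, a state checker and an action for the CSV-download expectations might look like the following. The file layout, host name and credentials are hypothetical, and paramiko is just one possible SFTP client:

    import os

    import paramiko  # assumed available; any SFTP client would do


    def csv_is_downloaded(blob: dict) -> bool:
        """State checker: the expectation is satisfied when the file exists and
        is non-empty, no matter how it got there (a triggered download or a
        human copying it over by hand)."""
        path = f"/data/csv/{blob['campaign_id']}/{blob['day']}.csv"  # hypothetical layout
        return os.path.exists(path) and os.path.getsize(path) > 0


    def download_csv(blob: dict) -> None:
        """Action: try to bring the system to the expected state."""
        local = f"/data/csv/{blob['campaign_id']}/{blob['day']}.csv"
        remote = f"/exports/{blob['campaign_id']}/{blob['day']}.csv"
        os.makedirs(os.path.dirname(local), exist_ok=True)
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect("sftp.adnetwork.example", username="etl", password="***")
        try:
            client.open_sftp().get(remote, local)
        finally:
            client.close()

The engine then just runs a simple loop over the stored expected-state blobs: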

  • For every blob of state:
      • Send the blob to a state checker callback
      • If the system state matches the expected state, mark the state blob as ok
      • If the system state does not match the expected state, send the state blob to an actions callback

The state checker callback does not return the state, but rather informs the system of the current state via a function call. This way a state checker could fire up a job to check the state and return immediately; later on, in an async way, once we know the state, the system can be updated. If a state checker is really fast it can also just make that call inline before returning.

Action callbacks should trigger specific actions to occur; however, they should also be able to fork off in an async way.
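
Putting those pieces together, a bare-bones version of the loop could look like this. report_state is the function-call channel described above; the blob dicts, the checker and the action in the example wiring are stand-ins purely to show the shape:

    from concurrent.futures import ThreadPoolExecutor

    pool = ThreadPoolExecutor(max_workers=8)


    def make_engine(blobs, check_state, run_actions):
        """blobs holds the expected-state chunks; check_state and run_actions
        are the state checker and actions callbacks described above."""

        def report_state(blob: dict, ok: bool) -> None:
            # Called by checkers (inline, or later from an async job) instead
            # of returning the state; the engine only ever learns about the
            # system through this call.
            if ok:
                blob["status"] = "ok"
            else:
                blob["status"] = "pending"
                pool.submit(run_actions, blob)  # actions may fork off async too

        def tick() -> None:
            # One pass of the loop in the list above.
            for blob in blobs:
                if blob.get("status") == "ok":
                    continue
                check_state(blob, report_state)

        return tick


    # Example wiring with a trivially fast checker that reports inline:
    blobs = [{"campaign_id": 1, "day": "2024-03-25"}]
    tick = make_engine(
        blobs,
        check_state=lambda blob, report: report(blob, ok=False),
        run_actions=lambda blob: print("would trigger a download for", blob),
    )
    tick()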

Conclusion

I do not know how to build such a system, but I wish it existed.