DAG workflows usually start at the beginning and work towards some end goal. They are action-focussed, not results-focussed: "start this workflow at 8:00am every day" rather than "the data in this table needs to be up to date". There are big drawbacks here, which I'll get to below.
They exist because they are a natural way to develop a workflow. For example: "We need this data downloaded daily". Ok, set up a cron job that downloads the files and runs daily. Then one day it fails: "We need a monitor to check that files are downloaded". Ok, add some code to the end of the download job that flags that it was done correctly. Then one day: "The files were downloaded but the email at the end wasn't sent to tell users the files were ready". And on it goes until you end up with a spaghetti of cron jobs.
An important consideration here is that when a dependency becomes unsatisfied, it needs to somehow "know" that something is working on it, because satisfying a dependency can take some amount of time. How much time? So there has to be a "working on it" state, and an acceptable amount of time that something is allowed to be working on it. Any dependency that relies on another dependency will be "working on it" while its sub-dependencies are "working on it".
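Here is a minimal sketch of that idea, assuming nothing beyond the paragraph above (the names Dependency, State and allowed_working_time are mine, not from any existing tool): a dependency tracks its own state plus a deadline, and a parent counts as "working on it" while any of its sub-dependencies still are.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum, auto


class State(Enum):
    UNSATISFIED = auto()
    WORKING = auto()      # something has claimed it; give it some time
    SATISFIED = auto()


@dataclass
class Dependency:
    name: str
    allowed_working_time: timedelta
    state: State = State.UNSATISFIED
    working_since: datetime | None = None
    sub_dependencies: list[Dependency] = field(default_factory=list)

    def start_working(self) -> None:
        self.state = State.WORKING
        self.working_since = datetime.now()

    def effective_state(self) -> State:
        # A parent is "working on it" while any sub-dependency still is.
        if any(d.effective_state() is State.WORKING for d in self.sub_dependencies):
            return State.WORKING
        # If the acceptable working time has elapsed, it counts as unsatisfied again.
        if self.state is State.WORKING and self.working_since is not None:
            if datetime.now() - self.working_since > self.allowed_working_time:
                return State.UNSATISFIED
        return self.state
```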
Consider Kubernetes: there is a daemon running that is constantly checking the state of the system and comparing it to the desired state. When the desired state is not met, Kubernetes will attempt to modify the system until it is. In order to do that it must know: a) is the state how I want it (e.g. is this program running) and b) what actions will alter the state (e.g. start this program).
So declarative workflows are continuously asking: a) is the state how I want it, and b) if not, what actions will bring it there?
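As a rough sketch of that loop (this is only an analogy to how Kubernetes controllers behave, not its actual machinery; check_state and make_it_so are hypothetical placeholders for the two questions above):

```python
import time
from typing import Callable


def reconcile_forever(
    check_state: Callable[[], bool],   # a) is the state how I want it?
    make_it_so: Callable[[], None],    # b) an action that alters the state
    poll_interval_seconds: float = 30.0,
) -> None:
    """Continuously compare the actual state to the desired state and act on drift."""
    while True:
        if not check_state():
            make_it_so()
        time.sleep(poll_interval_seconds)


# "This program should be running", expressed declaratively -
# is_program_running and start_program would be real checks/actions:
# reconcile_forever(is_program_running, start_program)
```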
Most ETL workflow management software is ass-backwards. It all comes at the problem from the same approach: you want to execute something regularly (a schedule) and that something is likely a complex chain of interdependent sub-tasks. So the focus is: trigger this workflow at this specific time of the day and it will execute all the steps in the dependency tree (DAG) until it's complete.
The problem of scheduling is apparent when you ask the business "when do you want your data?". They are not going to say "0:00am plus however long the workflow takes". They are going to say "whenever it's ready" or "as soon as possible". They don't really care what time a workflow is started - they care when the data they want is available. Their focus is on the result. They are focussed on "pulling" the data, but the infrastructure is focussed on "pushing" the data. What if there was an ETL system that was focussed on pulling the data?
The problem of action-based workflows is apparent when looking at failures. When is it OK for a workflow to fail? What do you do when a workflow fails? The problem with "pushing" the data down the pipeline is that you end up slapping a data quality monitor at the end of each flow, so that you are able to report to the business: yes, this system really worked and here is your data. Because the data quality monitor is completely disconnected from your ETL workflow system, you end up with alerts that need to be "acknowledged" by humans - because how else does your system know that things were resolved? Or you run your data quality monitor frequently and hope you can handle the noise.
We have been here before with server management. If we look at server management from the perspective of scheduled actions we encounter the same problems. With server management the solution was to move to a declarative state manager ("this is what the system should look like"), and when the system deviates from those expectations, actions are triggered to bring it back up to expectations.
My proposal is to look at ETL workflows in the same manner. We start at "this is what the data should look like", and when the system deviates from those expectations, actions are triggered to bring it back up to expectations. Let's take an example:
Doug has been asked to implement a workflow DAG for a complex data load: each day, download a CSV file from an Ad network's SFTP server for each of 1000 campaigns, load them into a dependency Hadoop table, and roll that up into a resulting Hadoop table covering date A to date B.
What if we reverse the problem, and instead declare: "We expect our resulting Hadoop table to contain data for our 1000 campaigns daily, starting from date A and ending at date B". We then declare that in order to have this data "The dependency Hadoop table needs to have data from date A to date B". We also need to have "downloaded this CSV file for this campaign for this day" for each of our 1000 campaigns. So we build a tree of dependencies starting from the results we want to see, building "downwards" towards the first step.
Each day we update our expectations of date B. The system recognises this and knows that we need to have 1000 CSV files downloaded for every day for every campaign. Because the states for the previous days are marked completed, we only add 1000 new "expectations" for the new date. Those new expectations can see that the CSV files are not there and will trigger actions to execute the downloads.
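A sketch of what those expectations could look like for this example, with csv_exists and download_csv as hypothetical helpers (one would check our local storage, the other would fetch a single file from the Ad network's SFTP server). The point is that every (campaign, day) pair is one expectation, and only the unsatisfied ones trigger a download:

```python
from datetime import date, timedelta

CAMPAIGN_IDS = range(1, 1001)     # our 1000 campaigns
DATE_A = date(2015, 3, 1)         # illustrative start of the expected range


def expected_csvs(date_b: date):
    """Yield every (campaign_id, day) pair we expect to have downloaded."""
    day = DATE_A
    while day <= date_b:
        for campaign_id in CAMPAIGN_IDS:
            yield campaign_id, day
        day += timedelta(days=1)


def reconcile_downloads(date_b: date, csv_exists, download_csv) -> None:
    """Trigger downloads only for expectations that are not yet satisfied."""
    for campaign_id, day in expected_csvs(date_b):
        if not csv_exists(campaign_id, day):
            download_csv(campaign_id, day)   # the action that satisfies it
```

Moving date B forward is just calling this with a later date_b: the already-downloaded days are satisfied, so only the new day's 1000 files trigger actions.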
Now, if the Ad network changes our SFTP password there is a big scramble and an administrator needs to log into a box and test the new password. To test that it is working, she manually SFTPs the missing files over to our server. Boom, she has just satisfied all the "downloaded this CSV file for this campaign" dependencies. She does not need to go and restart the whole job from scratch because the system is constantly checking the current state. She doesn't need to do anything because satisfying that dependency automatically bubbles up the chain.
Now we discover that it should have been 1001 campaigns this month, so we update our desired state to include that missing campaign. Immediately our system recognises that campaign id 1001 is missing from our results for the past month! It then notices that we are missing 31 CSV files! However, it knows how to satisfy those dependencies.
Let's say Doug mistakenly deletes campaign data for campaign id 501 on the 25th of March from the final results table - whoops! It's ok because our system knows this happened immediately and can repair the data automatically. If we cleaned up the CSV files and the Ad network doesn't have this one anymore it's tough luck for us, but at least we know exactly why this data is missing and why we can't recover it. Perhaps we can reconstruct the data from estimates and insert that into our results table? That will satisfy the desired state and we won't keep trying to download CSV files that are not there anymore, and no one needs to go into the UI and cancel an ETL job that's failing and spamming everyone.
In order to have this system we need to build a bunch of state checkers: bits of code that know how to check if the state is correct. These usually need to be fast-executing if they will be used in polling mode, because our system is going to be checking them all the time. Alternatively they can inform our system via a message if polling is not a good idea.
We also need to build a bunch of actions that know what to do to move the system to the desired state.
Finally, we need a place to store and manage the expected states. These chunks of data will be passed to the state checkers in order for them to verify if our system state is ok or if something is missing. If something is missing these state checkers can create additional expected states for downstream dependencies, and/or trigger specific actions to make the system state meet expectations.
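One possible shape for those three pieces, purely as a sketch (every name here is invented, not an existing API): an expected-state store, state checkers keyed by the kind of expectation, and actions keyed the same way.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class Expectation:
    kind: str            # e.g. "csv_downloaded", "hadoop_partition_loaded"
    params: dict         # e.g. {"campaign_id": 501, "day": "2015-03-25"}
    satisfied: bool = False


@dataclass
class DeclarativeEtl:
    checkers: Dict[str, Callable[[dict], bool]] = field(default_factory=dict)
    actions: Dict[str, Callable[[dict], None]] = field(default_factory=dict)
    store: List[Expectation] = field(default_factory=list)

    def declare(self, kind: str, **params) -> None:
        """Add an expected state to the store."""
        self.store.append(Expectation(kind, params))

    def reconcile_once(self) -> None:
        """Pass each expectation to its checker; trigger the matching action on a miss."""
        for expectation in self.store:
            expectation.satisfied = self.checkers[expectation.kind](expectation.params)
            if not expectation.satisfied:
                self.actions[expectation.kind](expectation.params)
```

A real system would also persist the store and let a checker declare downstream expectations, but the reconcile loop stays this simple.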
The state checker callback does not return the state, but rather informs the system of the state via a function call. This way a state checker could fire up a job to check the state and return immediately. Later on, once the state is known, the system can be updated asynchronously. If a state checker is really fast it can also just make that call inline before returning.
Action callbacks should trigger specific actions to occur; however, they should also be able to fork off in an async way.
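A sketch of what that callback shape could look like (report_state is the function call mentioned above; the thread is just a stand-in for whatever async mechanism is used, and expensive_probe / do_download are hypothetical stand-ins for real checks and jobs):

```python
import threading
from typing import Callable

ReportState = Callable[[bool], None]


def expensive_probe(expectation: dict) -> bool:
    """Hypothetical slow check, e.g. querying Hadoop for a partition."""
    return False


def do_download(expectation: dict) -> bool:
    """Hypothetical download job; returns True on success."""
    return False


def fast_checker(expectation: dict, report_state: ReportState) -> None:
    # A cheap check can report its result inline before returning.
    report_state(bool(expectation.get("already_done")))


def slow_checker(expectation: dict, report_state: ReportState) -> None:
    # An expensive check fires off the work and returns immediately; the
    # system learns the state later, when the callback is invoked.
    threading.Thread(
        target=lambda: report_state(expensive_probe(expectation)),
        daemon=True,
    ).start()


def download_action(expectation: dict, report_state: ReportState) -> None:
    # Actions can fork off the same way: start the job, return, and report
    # success or failure asynchronously.
    threading.Thread(
        target=lambda: report_state(do_download(expectation)),
        daemon=True,
    ).start()
```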
I do not know how to build such a system, but I wish it existed.