There are a lot of things to love about dbt … and then there are some omissions. One that I find particularly disturbing is how tests work. With the common invocation of dbt run
, tests don’t run at all. You can of course do a dbt test
call afterwards, but if the data broke, broken data is unfortunately already updated. Any downstream that reads that data, will read data that didn’t pass the tests.
There are several ways around this. One is to run dbt in steps, and only trigger the next step if the previous succeeded. A nice helper for this is the dbt build
command, which does both a run and a test at the same time — however you’ll still need a bash script or workflow orchestration tool to set up the workflow.
There are many patterns that can be hacked in place, to ensure that data is created in some table, then moved somewhere else only if tests pass. You can configure this as a post-hook on the pipeline, and it’s pretty smart! However, it also risks breaking assumptions people make about what data to actually read, given that the result of your model is no longer stored in a table named as your model (you can’t read it with a {{ ref(…
in a downstream query).
There are of course ways around this, overriding table names and dataset names, moving data back and forth.
… But if you’re using BigQuery, there’s a potentially nicer way, use the time travel functionality to restore the tables to the state prior to the run, if there’s a failure. This should work well both with regular tables, and incremental models (though in fairness I didn’t try) → and the data will rest where you expect it to. The only downside is that between a run that fails, and the automatic roll-back, there is broken data that downstream consumers risk reading.
{% macro roll_back_on_failure() %} | |
{# Only run this macro on actual executions of build and run (as e.g. 'test' wouldnt know how far to roll back) #} | |
{% if execute and invocation_args_dict.which in ("build", "run") %} | |
{# Iterate over all the results of this run to find failures #} | |
{% for result in results %} | |
{% if result.status in ("fail", "error") %} | |
{# We found a failure! Let's roll back things and break out of the loop #} | |
{{ log("", info=True) }} | |
{{ log("╭"+"─"*50, info=True) }} | |
{{ log("│ Failure detected in: " + result.node.name , info=True) }} | |
{{ log("├"+"─"*50, info=True) }} | |
{{ log("│ Reverting tables to " + run_started_at.strftime("%Y-%m-%d %H:%M:%S.%s")[:-4], info=True) }} | |
{# Let's find all tables that were successfully refreshed in this run #} | |
{% for inner_result in results %} | |
{% if inner_result.node.resource_type == "model" and inner_result.node.config.materialized == "table" and inner_result.status == "success" %} | |
{{ log("│ Rolling back " + inner_result.node.name, info=True) }} | |
{# Create the time-travel query to BigQuery #} | |
{% set query %} | |
CREATE OR REPLACE TABLE {{ inner_result.node.relation_name }} AS | |
SELECT * FROM | |
{{ inner_result.node.relation_name }} | |
FOR SYSTEM_TIME AS OF '{{ run_started_at.astimezone(modules.pytz.timezone("America/Los_Angeles")).strftime("%Y-%m-%d %H:%M:%S.%s")[:-4] }}'; | |
{% endset %} | |
{# Run the rollback #} | |
{% do run_query(query) %} | |
{% endif %} | |
{% endfor %} | |
{{ log("╰"+"─"*50+"\n", info=True) }} | |
{% break %} | |
{% endif %} | |
{% endfor %} | |
{% endif %} | |
{% endmacro %} |
(the git warning for unicode chars is likely due to the box drawing chars I use to highlight the output)
Above macro can be called automatically on run end, setting up the dbt_project.yml like this:
on-run-end:
- "{{ roll_back_on_failure() }}"
The code is one of several handy run-end macros I use, if you feel like hacking on this, a nice exercise is to query the test-output tables and log some of the resulting records for failed tests.