
Low effort dbt logging

I’ve been quiet for a while, mostly because I’ve been hard at work at a new day job as a data engineer at a large UK-based retailer.

A lot of my day-to-day work involves building and maintaining data pipelines, and it has become clear that the tool of choice for this sort of work is dbt, which I have grown to love.

An aspect of our work which has been somewhat neglected is observability, the ability to measure the internal states of a system by examining its outputs (yes, I googled that definition). Basically, setting up logging, and sending those logs to a data store which you can plonk a dashboard on top of. This is super useful when things break, or when your boss starts asking awkward questions about cloud compute costs.

Now, you can get really fancy with your observability tooling, and there exists a pretty full-featured dbt package that does just that. But it’s a bit too heavyweight for what we need, and it’s quite a bit of technical debt to take on.

I put in a couple of days to set up what I’d call minimum viable logging for dbt, and it basically amounts to a single macro, two models, and an on-run-end hook in dbt_project.yml.

├── project
│   ├── macros/upload_dbt_results.sql
│   ├── models/observability/_dbt_results_json.sql
│   ├── models/observability/_dbt_results.sql

...
on-run-end: 
  - "{{ upload_dbt_results(results, 1000000) }}"

How it works is that, at the end of a dbt run, the on-run-end hook calls the macro. The macro iterates through the dbt results object, converts each result into a JSON record, and inserts those records into a table in your {{ target.schema }}. That table is called _dbt_results_json, and you can reference it in your models, so you can build out whatever you want on top of it.

I’d say the _dbt_results_json table is where the reusability of the code below ends. Because what you’re putting into the results table is unparsed JSON, and the structure of the adapter_response object varies between database backends, you’ll have to adapt your JSON parsing of that table for your own environment. That said, I’ve included a sample _dbt_results tabular view below, which works on BigQuery and gives you visibility of things like status, bytes billed and execution time.
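Whatever backend you’re on, the quickest way to work out what you can parse is to pull the raw adapter_response out of a few recent rows and have a look. A rough sketch of that (BigQuery syntax; my_schema is a placeholder for wherever your _dbt_results_json table lives):

-- Peek at the raw adapter_response payload for the most recent results
select
    log_write_time,
    json_query(log, '$.adapter_response') as adapter_response
from my_schema._dbt_results_json
order by log_write_time desc
limit 10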

What’s really nice about this is that I can just drop it into every dbt project we have, and they all automagically start saving their logs to a _dbt_results_json table in their schema. I’ve set up an observability dbt project that basically just does a UNION ALL over all my other projects’ _dbt_results_json tables, and plonked a view on top of that.
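That union model is nothing clever; a sketch of what it looks like, with made-up dataset names standing in for the real projects:

-- Sketch of the cross-project union model; dataset names are placeholders
select 'orders' as dbt_project, * from orders_dataset._dbt_results_json
union all
select 'marketing' as dbt_project, * from marketing_dataset._dbt_results_json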

The results have been pretty good so far. We’ve been running this in production for about a week, and we’ve already managed to reduce our BigQuery costs by about 75%, mostly through identifying and modifying inefficient or redundant tests.

We have complete visibility of execution times, execution costs, and successes and failures at a per-model level, and it’s all in one place rather than spread across a whole lot of jobs, so we can slice and dice as we choose and we know exactly where to go to find out what’s wrong.
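As an example of the kind of query this enables, here’s a sketch of a daily cost-per-model rollup over the parsed view (schema name is again a placeholder):

-- Estimated daily spend per model, using the parsed _dbt_results view
select
    date(log_write_time) as run_date,
    node_name,
    sum(billed_dollar_estimate) as estimated_spend
from my_schema._dbt_results
group by run_date, node_name
order by estimated_spend desc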

Code

Here are the macro and models I use for my ‘low effort logging’:

upload_dbt_results.sql:

{% macro upload_dbt_results(results, batch_size=50000) %}
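{# /* batch_size caps the character length of each batched INSERT's VALUES clause, not the number of rows */ #}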
{# /* If we are in execute mode */ #}
  {% if execute and results | length > 0 %}

  {{ log('Uploading run statistics to database', info=True) }}

{# /* Create a namespace for the query string to avoid scoping issues */ #}
  {% set ns = namespace() %}
{# /* Start with an empty query string */ #}
  {% set ns.batch_query = "" %}
  {% set loops = results | length %} 

{# /* Loop through native dbt results objects */ #}
  {% for res in results -%}
{# /* Convert to dict */ #}
    {%- set res_dict = res.to_dict()  %}
{# /* Remove two troublesome keys (too long / special characters) */ #}
    {%- set _ = res_dict.node.pop('raw_code', None)  %}
    {%- set _ = res_dict.node.pop('compiled_code', None)  %}

{# /* Convert to JSON and strip special characters */ #}
    {%- set res_json = res_dict | tojson | replace("'", " ") | replace("\\n", " ") | replace("`", "") | replace("\\", "\\\\") %}

{# /* Wrap json in '' so we can pass in as string */ #}
    {%- set results_object = ["'", res_json, "'"] | join(" ")  %}
{# /* Construct insert query for this result */ #}
    {%- set results_query = [ "(CURRENT_TIMESTAMP, JSON  ",results_object,")"] | join(" ") %}

{# /* If the batch insert string is empty, replace it with the results_query... */ #}
    {% if ns.batch_query|length == 0 %}
      {% set ns.batch_query = results_query %}
{# /* If the batch insert string is small enough... */ #}
    {% elif ns.batch_query | length < batch_size %}
{# /* Append this current query to the batch insert string */ #}
      {% set ns.batch_query = [ns.batch_query, results_query] | join(", ") %}
{# /* But if the batch insert string is big enough... */ #}
    {% else %}
{# /* Append this current query to the batch insert string */ #}
      {% set ns.batch_query = [ns.batch_query, results_query] | join(", ") %}
{# /* Log the loop... */ #}
      {{ log('Running insert operation on loop ' ~ loop.index ~ '/' ~ loops, info=True) }}
{# /* Run the batch insert query... */ #}
      {%- call statement('upload', fetch_result=False) -%}
       INSERT INTO {{ target.schema }}._dbt_results_json (log_write_time, log) VALUES {{ ns.batch_query }}
      {%- endcall -%}
{# /* And clear it back to an empty string */ #}
      {% set ns.batch_query = "" %}
    {% endif %}

{# /* Once you've worked through all the objects... */ #}
  {% endfor %}

{# /* Insert whatever is left in the batch query string (it can be empty if the last loop iteration already flushed it) */ #}
  {% if ns.batch_query | length > 0 %}
  {{ log('Running final insert operation', info=True) }}

  {%- call statement('upload', fetch_result=False) -%}
       INSERT INTO {{ target.schema }}._dbt_results_json (log_write_time, log) VALUES {{ ns.batch_query }}
  {%- endcall -%}
  {% endif %}

  {% endif %} 

{% endmacro %}

_dbt_results_json.sql:

{{
  config(
    materialized = 'incremental',
    unique_key = 'log_write_time',
    partition_by = {
      "field": "log_write_time",
      "data_type": "timestamp",
      "granularity": "day"
    },
    partition_expiration_days = 30
  )
}}
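-- This model only creates an empty, day-partitioned shell table;
-- the upload_dbt_results macro appends the actual log rows at the end of each run.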

with empty_table as (
    select
        cast(null as TIMESTAMP) as log_write_time,
        cast(null as JSON) as log
)

select * from empty_table
-- This is a filter so we will never actually insert these values
where 1 = 0

_dbt_results.sql:

-- Tabular view over the raw JSON log table. The adapter_response paths below
-- are BigQuery-specific, so adjust them for your own warehouse.
SELECT
    log_write_time,
    cast(json_value(log, '$.execution_time') AS numeric) AS execution_time,
    cast(json_value(log, '$.adapter_response.bytes_processed') AS numeric)
        AS bytes_processed,
    json_value(log, '$.node.name') AS node_name,
    json_value(log, '$.node.schema') AS schema_name,
    json_value(log, '$.node.database') AS database_name,
    json_value(log, '$.node.unique_id') AS unique_id,
    json_value(log, '$.adapter_response.job_id') AS bigquery_job,
    concat(
        'https://console.cloud.google.com/bigquery?project=',
        json_value(log, '$.adapter_response.project_id'),
        '&j=bq:',
        json_value(log, '$.adapter_response.location'),
        ':',
        json_value(log, '$.adapter_response.job_id'),
        '&page=queryresults'
    ) AS url,
    -- BigQuery on-demand queries are billed per TiB (2^40 bytes), at roughly $6.25 per TiB
    cast(json_value(log, '$.adapter_response.bytes_billed') AS numeric)
    / pow(2, 40) AS tebi_bytes_billed,
    cast(json_value(log, '$.adapter_response.bytes_billed') AS numeric)
    / pow(2, 40)
    * 6.25 AS billed_dollar_estimate,
    json_value(log, '$.status') AS status,
    json_value(log, '$.message') AS log_message
FROM
    {{ ref('_dbt_results_json') }}
ORDER BY log_write_time DESC