Commit
Merge pull request #80 from bcodell/bryce/timespine-poc
Feature: Time Spine Datasets
bcodell authored Jan 16, 2025
2 parents 8249a09 + ea35c50 commit ce7549e
Showing 12 changed files with 285 additions and 35 deletions.
27 changes: 0 additions & 27 deletions .github/workflows/ci.yml
@@ -47,33 +47,6 @@ jobs:
env:
GCP_KEYFILE_PATH: ./gcp_keyfile.json
DBT_PROFILES_DIR: . # Use integration_tests/profiles.yml
- name: dbt CI - snowflake - with stream
id: snowflake_ci_with_stream
if: github.repository == 'bcodell/dbt-activity-schema'
run: |
localstack extensions install localstack-extension-snowflake
localstack start -d
cd ./integration_tests
sed -i 's/skip_stream: true/skip_stream: false/' dbt_project.yml
dbt build -x --target snowflake --exclude dataset__select_all_aggregate_all
env:
DBT_PROFILES_DIR: . # Use integration_tests/profiles.yml
DEBUG: 1

- name: dbt CI - snowflake - skip stream
id: snowflake_ci_skip_stream
if: github.repository == 'bcodell/dbt-activity-schema'
run: |
cd ./integration_tests
sed -i 's/skip_stream: false/skip_stream: true/' dbt_project.yml
dbt run -s activities+ -x --target snowflake --exclude dataset__select_all_aggregate_all
dbt test -s activities+ -x --target snowflake --exclude dataset__select_all_aggregate_all
env:
DBT_PROFILES_DIR: . # Use integration_tests/profiles.yml
DEBUG: 1
- name: localstack logs
if: failure() && (steps.snowflake_ci_with_stream.outcome == 'failure' || steps.snowflake_ci_skip_stream.outcome == 'failure' )
run: localstack logs
- name: dbt CI - bigquery - with stream
if: github.repository == 'bcodell/dbt-activity-schema'
run: |
56 changes: 56 additions & 0 deletions README.md
@@ -647,6 +647,39 @@ Each included column can optionally be aliased to a whitespace-free friendly name

# **Advanced Usage**

## **Combining SQL and AQL**
AQL is meant to be a SQL-esque way to seamlessly and consistently transform data from event structures to the denormalized structure needed for analysis and visualization tasks. But there are cases where AQL alone can't produce the desired dataset - for example, building a pre-aggregated table (e.g. weekly or monthly) or combining data from multiple entities/streams. For these cases, having an escape hatch is essential, and the obvious choice here is SQL. And since AQL simply renders to SQL during the dbt compilation process, the two can be combined within a single model. It's as easy as wrapping the `dbt_activity_schema.dataset` macro call in a CTE. See the example below for a basic idea:

```sql
{% set aql %}
select all bought_something (
activity_id as activity_id,
entity_uuid as customer_id,
ts as bought_something_at,
revenue_impact as order_revenue
)
append first ever visited_page (
ts as first_page_visit_at
filter {ts} >= '2023-01-01'
)
{% endset %}


with base as ( -- wrap the dataset macro with a CTE
{{dbt_activity_schema.dataset(aql=aql)}} -- this macro renders to sql
)
-- apply an arbitrary follow-up sql transformation
select
date_trunc('month', first_page_visit_at)::date as first_pageview_month,
count(distinct customer_id) as total_customers,
sum(order_revenue) as total_revenue,
    sum(order_revenue) / count(distinct customer_id) as revenue_per_customer
from base
group by 1
```
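
The follow-up SQL need not be an aggregation. As a hypothetical variation on the model above (reusing the same `aql` variable and column aliases), the wrapped dataset can feed window functions just as easily:

```sql
with base as (
    {{dbt_activity_schema.dataset(aql=aql)}}
)
-- rank customers by total revenue within their acquisition-month cohort
select
    customer_id,
    date_trunc('month', first_page_visit_at)::date as first_pageview_month,
    sum(order_revenue) as total_revenue,
    rank() over (
        partition by date_trunc('month', first_page_visit_at)
        order by sum(order_revenue) desc
    ) as revenue_rank_in_cohort
from base
group by 1, 2
```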

## **Extra Join Criteria for Joined Activities**
Example code:
```sql
@@ -755,6 +788,29 @@ aggregate all bought_something (
)
```

## **Building Datasets Over Time**
As of version 0.5.0, there is an AQL-centric way to build datasets over time: the `time_spine` relationship selector for the primary activity. See the code example and relevant arguments below:
```sql
using customer_stream
select time_spine(interval=month, end_period=current) visited_page (
entity_uuid as customer_id
)
aggregate between visited_page (
count(activity_id) as monthly_page_visits
)
aggregate between bought_something (
sum(revenue_impact) as monthly_revenue
)
```
The above will produce a dataset with one row per entity per month for all entities with at least one `visited_page` activity, starting with the month of the entity's first event and continuing through the current month (as of query execution), with no months skipped. The period timestamp is included in the output, as are all of the columns defined in the AQL statement, and appending and aggregating joined activities operates as normal. In effect, this feature lets the developer create a synthetic event for easy construction of time series reporting and analysis datasets. Usage notes:

#### Arguments:
* __interval__: the frequency of the time spine. Currently supported values are `day`, `week`, `month`, `quarter`, and `year`.
* __end_period__: the last period to include for a given entity instance. Options are `max`, which uses the last period observed for that entity in the primary activity, and `current`, which extends to the period in which the query runs. With `max`, different entity instances can have different final periods; with `current`, the final period is the same for all entities.

#### Functionality Notes:
* The query will ensure one row per entity-period even if the entity has no events in a given period. In the above example, if customer *1* has a pageview in January 2024 but not February 2024, there will still be a row for customer *1* and time period February 2024 in the resulting dataset.
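
For intuition, the sketch below approximates the SQL that the `time_spine` selector renders (a simplified, hand-written version assuming DuckDB syntax, simplified column names, and `end_period=current`; the actual rendered SQL in `_build_dataset.sql` also emits activity metadata columns such as `activity_id` and `activity_occurrence`):

```sql
with time_spine_entities as (
    -- one row per entity: first period plus the number of elapsed periods
    select
        entity_uuid as customer_id,
        date_trunc('month', min(ts)) as period_start,
        date_diff('month', date_trunc('month', min(ts)), current_timestamp) as active_periods
    from customer_stream
    where activity = 'visited_page'
    group by 1
),
number_spine as (
    -- recursive integer spine sized to the longest-lived entity
    with recursive ns as (
        select 1 as n
        union all
        select n + 1 from ns
        where n <= (select max(active_periods) from time_spine_entities)
    )
    select * from ns
)
select
    tse.customer_id,
    date_add(tse.period_start, (ns.n - 1) * interval 1 month) as ts
from time_spine_entities tse
join number_spine ns
    on tse.active_periods >= ns.n - 1  -- n runs 1..active_periods+1, so the final period is included
```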

## **Adding Custom Aggregation Functions**
Placeholder. Documentation coming soon.

16 changes: 16 additions & 0 deletions integration_tests/models/datasets/time_spine/dataset__time_spine.sql
@@ -0,0 +1,16 @@
{% set aql %}
using customer_stream
select time_spine(interval=month, end_period=max) visited_page (
entity_uuid as customer_id
)
aggregate between bought_something(
sum(total_items_purchased) as total_items_purchased_between,
sum(total_sales) as total_sales_between,
count(activity_id) as total_purchases_between
)
{% endset %}

-- depends_on: {{ ref('output__time_spine') }}
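-- (the dataset SQL above is macro-generated, so dbt cannot infer the seed dependency from a literal ref; the depends_on hint declares it explicitly)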

{{ dbt_activity_schema.dataset(aql) }}
order by 3,1
9 changes: 9 additions & 0 deletions integration_tests/models/datasets/time_spine/schema.yml
@@ -0,0 +1,9 @@
version: 2

models:

- name: dataset__time_spine
description: A test to validate the functionality of macro-based dataset generation.
tests:
- dbt_utils.equality:
compare_model: ref("output__time_spine")
4 changes: 2 additions & 2 deletions integration_tests/profiles.yml
@@ -23,8 +23,8 @@ integration_tests:
bigquery:
type: bigquery
method: service-account
project: dbt-activity-schema
dataset: dbt_activity_schema_integration_tests
project: dbt-aql
dataset: dbt_aql_integration_tests
threads: 4 # Must be a value of 1 or greater
keyfile: "{{ env_var('GCP_KEYFILE_PATH') }}"
13 changes: 13 additions & 0 deletions integration_tests/seeds/datasets/time_spine/output__time_spine.csv
@@ -0,0 +1,13 @@
ts,activity_id,customer_id,total_items_purchased_between,total_sales_between,total_purchases_between
2022-01-01,ac78479a9f4e0a82141e63285e0bd937,1,6,350,2
2022-02-01,40baeaad6b5e05afb0f2346f3f73f2c4,1,5,250,1
2022-03-01,c258c810fde59ef5e4d5f0c5b3b59c50,1,0,0,0
2022-01-01,d242adbbac9ebdbba8caf4d669a13db8,4,2,80,1
2022-02-01,b040c6925c8a8b4fda30d87bb3c828fe,4,6,600,2
2022-03-01,414fd7624da769643b82b680a0476d01,4,0,0,0
2022-01-01,e93fa0bce5965387e1ee99086d13ebba,7,3,120,1
2022-02-01,6b62dceb80db68b6260c1ab313a13d36,7,2,150,2
2022-03-01,e554eceabffb4739dffa03e6d96d90b0,7,0,0,0
2022-01-01,5168a14e1555818beb23a394f9e2f95f,10,4,50,1
2022-02-01,91ee7a6ca7f704b424afae0af772078c,10,18,1600,2
2022-03-01,97e03bb3fa450367f3fff74634a2f886,10,0,0,0
8 changes: 6 additions & 2 deletions macros/activity_schema/activity/activity.sql
@@ -7,7 +7,9 @@
columns,
nth=none,
filters=none,
extra_joins=none
extra_joins=none,
interval=none,
end_period=none
) %}

{%- set join_clauses = dbt_activity_schema._join_clause_map() -%}
@@ -50,7 +52,9 @@ aql query in model '{{ model.unique_id }}' has invalid syntax. Parsed invalid re
columns=columns,
nth=nth,
filters=filters,
extra_joins=extra_joins
extra_joins=extra_joins,
interval=interval,
end_period=end_period
)) -%}


70 changes: 69 additions & 1 deletion macros/activity_schema/dataset/_build_dataset.sql
@@ -75,6 +75,70 @@ aql query in model '{{ model.unique_id }}' has invalid syntax. Please choose a v


with
{% if primary_activity.relationship_selector == rs.time_spine -%}
time_spine_entities as (
select
{{primary}}.{{columns.customer}} as {{req}}{{columns.customer}},
{% if columns.anonymous_customer_id is defined %}
{{primary}}.{{columns.anonymous_customer_id}} as {{req}}{{columns.anonymous_customer_id}},
{% endif %}
{%- for column in primary_activity.columns %}
{{ dbt_activity_schema.select_column(stream, primary, column).column_sql }} as {{column.alias}},
{%- endfor %}
date_trunc('{{primary_activity.interval}}', min({{primary}}.{{columns.ts}})) as period_start,
{% if primary_activity.end_period=='current' %}current_timestamp{% else %}date_trunc('{{primary_activity.interval}}', max({{primary}}.{{columns.ts}})){% endif %} as period_end,
date_diff('{{primary_activity.interval}}', period_start::timestamp, {% if primary_activity.end_period=='current' %}current_timestamp{% else %}date_trunc('{{primary_activity.interval}}', max({{primary}}.{{columns.ts}})){% endif %}::timestamp) as active_periods
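-- active_periods counts whole {{primary_activity.interval}} boundaries between the first and last period, so the inclusive range spans active_periods + 1 periods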
from {% if primary_activity.filters is none %}{% if not skip_stream %}{{ stream_relation }}{% else %}{{ ref(primary_activity.model_name) }}{% endif %}{% else %}{{primary_activity_alias}}{{fs}}{% endif %} as {{primary}}
where true
{% if not skip_stream %}
and {{primary}}.{{columns.activity}} = {{dbt_activity_schema.clean_activity_name(stream, primary_activity.activity_name)}}
{% endif %}
group by
{{req}}{{columns.customer}},
{% if columns.anonymous_customer_id is defined %}
{{req}}{{columns.anonymous_customer_id}},
{% endif %}
{%- for column in primary_activity.columns %}
{{column.alias}}{% if not loop.last %},{% endif %}
{%- endfor %}
),
time_spine_metadata as (
select
min(period_start) as first_period_start,
max(active_periods) as max_active_periods
from time_spine_entities
),
number_spine as (
with recursive number_spine as (
select 1 as n -- start the spine at 1
union all
select n + 1
from number_spine
where n <= (select max_active_periods from time_spine_metadata) -- adjust the upper limit as needed
)
select * from number_spine),
{{primary_activity_alias}} as (
select
{%- for column in primary_activity.columns %}
tse.{{column.alias}},
{%- endfor %}
ns.n-1 as n0,
{{ dbt_activity_schema.dateadd(primary_activity.interval, 'n0', 'tse.period_start') }} as {{columns.ts}},
{{ dbt_activity_schema.dateadd(primary_activity.interval, 1, columns.ts) }} as {{columns.activity_repeated_at}},
ns.n as {{columns.activity_occurrence}},
md5(tse.{{req}}{{columns.customer}} || {{columns.ts}}) as {{req}}{{columns.activity_id}},
tse.{{req}}{{columns.customer}},
{% if columns.anonymous_customer_id is defined %}
tse.{{req}}{{columns.anonymous_customer_id}},
{% endif %}
{{columns.ts}} as {{req}}{{columns.ts}},
{{columns.activity_occurrence}} as {{req}}{{columns.activity_occurrence}},
{{columns.activity_repeated_at}} as {{req}}{{columns.activity_repeated_at}}
from time_spine_entities tse
left join number_spine ns
on tse.active_periods >= ns.n - 1
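-- ns.n runs from 1 to active_periods + 1, so each entity's final period is included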
)
{% else %}
{% if primary_activity.filters is not none %}
{{primary_activity_alias}}{{fs}} as (
select
@@ -134,7 +198,7 @@ with
and {{primary}}.{{columns.activity}} = {{dbt_activity_schema.clean_activity_name(stream, primary_activity.activity_name)}}
{% endif %}
and {{ primary_activity.relationship_clause }}
){% if joined_activities|length > 0 %},{% endif %}
){% endif %}{% if joined_activities|length > 0 %},{% endif %}
{% for ja in joined_activities %}

{# cte below only applies to filtered append activities since activity occurrence and next activity need to be recomputed for use in the join #}
@@ -239,6 +303,10 @@
){% if not loop.last %},{% endif %}
{% endfor %}
select
{%- if primary_activity.relationship_selector == rs.time_spine %}
{{primary}}.{{columns.ts}},
{{primary}}.{{req}}{{columns.activity_id}} as {{columns.activity_id}},
{%- endif %}
{%- for column in primary_activity.columns %}
{{primary}}.{{column.alias}}{% if loop.last and joined_activities|length == 0 -%}{% else -%},{%- endif -%}
{%- endfor %}
@@ -58,12 +58,17 @@ true
{{alias}}.{{activity_occurrence}} = {{nth}}
{% endmacro %}

{% macro _relationship_clause_time_spine(verb, join_condition, nth) %}
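{# intentionally renders nothing: the time_spine selector builds its own primary-activity CTE in _build_dataset.sql, so no relationship clause is needed #}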

{% endmacro %}


{% macro _relationship_clause_map() %}
{%- do return(namespace(
first=dbt_activity_schema._relationship_clause_first,
last=dbt_activity_schema._relationship_clause_last,
all=dbt_activity_schema._relationship_clause_all,
nth=dbt_activity_schema._relationship_clause_nth
nth=dbt_activity_schema._relationship_clause_nth,
time_spine=dbt_activity_schema._relationship_clause_time_spine,
)) -%}
{% endmacro %}
27 changes: 27 additions & 0 deletions macros/activity_schema/dataset/utils.sql
@@ -0,0 +1,27 @@
{% macro dateadd(interval, periods, ts) %}
{{ return(adapter.dispatch('dateadd', 'dbt_activity_schema')(interval, periods, ts)) }}
{% endmacro %}

{% macro default__dateadd(interval, periods, ts) %}
dateadd({{interval}}, {{periods}}, {{ts}})
{% endmacro %}

{% macro duckdb__dateadd(interval, periods, ts) %}
date_add({{ts}}, {{periods}} * interval 1 {{interval}})
{% endmacro %}

{% macro snowflake__dateadd(interval, periods, ts) %}
dateadd({{interval}}, {{periods}}, {{ts}})
{% endmacro %}

{% macro redshift__dateadd(interval, periods, ts) %}
dateadd({{interval}}, {{periods}}, {{ts}})
{% endmacro %}

{% macro bigquery__dateadd(interval, periods, ts) %}
date_add({{ts}}, interval {{periods}} {{interval}})
{% endmacro %}
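
For reference, hand-derived renderings of the dispatch above — e.g. `{{ dbt_activity_schema.dateadd('month', 2, 'ts') }}` — per adapter:

```sql
-- duckdb
date_add(ts, 2 * interval 1 month)
-- bigquery
date_add(ts, interval 2 month)
-- snowflake, redshift, and the default implementation
dateadd(month, 2, ts)
```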


