From ae02890b54db5e3b52954b8be6987a26ea8e657d Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Tue, 10 Sep 2024 11:10:58 -0400 Subject: [PATCH 1/9] base microbatch support + tests --- .../test_incremental_microbatch.py | 76 +++++++++++++++++++ dbt/adapters/base/impl.py | 2 +- .../models/incremental/strategies.sql | 13 ++++ 3 files changed, 90 insertions(+), 1 deletion(-) create mode 100644 dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py diff --git a/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py b/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py new file mode 100644 index 000000000..9a6abc675 --- /dev/null +++ b/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py @@ -0,0 +1,76 @@ +import os +from unittest import mock + +import pytest +from freezegun import freeze_time + +from dbt.tests.util import relation_from_name, run_dbt + +input_model_sql = """ +{{ config(materialized='table', event_time='event_time') }} +select 1 as id, TIMESTAMP '2020-01-01 00:00:00-0' as event_time +union all +select 2 as id, TIMESTAMP '2020-01-02 00:00:00-0' as event_time +union all +select 3 as id, TIMESTAMP '2020-01-03 00:00:00-0' as event_time +""" + +microbatch_model_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }} +select * from {{ ref('input_model') }} +""" + + +class BaseMicrobatch: + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": input_model_sql, + "microbatch_model.sql": microbatch_model_sql, + } + + def assert_row_count(self, project, relation_name: str, expected_row_count: int): + relation = relation_from_name(project.adapter, relation_name) + result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one") + + if result[0] != expected_row_count: + # running show for debugging + run_dbt(["show", "--inline", f"select * from {relation}"]) + + assert result[0] == expected_row_count + + @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + def test_run_with_event_time(self, project): + # initial run -- backfills all data + with freeze_time("2020-01-01 13:57:00"): + run_dbt(["run"]) + self.assert_row_count(project, "microbatch_model", 3) + + # our partition grain is "day" so running the same day without new data should produce the same results + with freeze_time("2020-01-03 14:57:00"): + run_dbt(["run"]) + self.assert_row_count(project, "microbatch_model", 3) + + # add next two days of data + test_schema_relation = project.adapter.Relation.create( + database=project.database, schema=project.test_schema + ) + project.run_sql( + f"insert into {test_schema_relation}.input_model(id, event_time) values (4, TIMESTAMP '2020-01-04 00:00:00-0'), (5, TIMESTAMP '2020-01-05 00:00:00-0')" + ) + self.assert_row_count(project, "input_model", 5) + + # re-run without changing current time => no insert + with freeze_time("2020-01-03 14:57:00"): + run_dbt(["run", "--select", "microbatch_model"]) + self.assert_row_count(project, "microbatch_model", 3) + + # re-run by advancing time by one day changing current time => insert 1 row + with freeze_time("2020-01-04 14:57:00"): + run_dbt(["run", "--select", "microbatch_model"]) + self.assert_row_count(project, "microbatch_model", 4) + + # re-run by advancing time by one more day changing current time => insert 1 more row + with freeze_time("2020-01-05 14:57:00"): + run_dbt(["run", "--select", "microbatch_model"]) + self.assert_row_count(project, "microbatch_model", 5) \ No newline at end of file diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index 541c98469..7d776b914 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -1572,7 +1572,7 @@ def valid_incremental_strategies(self): return ["append"] def builtin_incremental_strategies(self): - return ["append", "delete+insert", "merge", "insert_overwrite"] + return ["append", "delete+insert", "merge", "insert_overwrite", "microbatch"] @available.parse_none def get_incremental_strategy_macro(self, model_context, strategy: str): diff --git a/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql b/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql index 72082ccad..49cdc9e64 100644 --- a/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql +++ b/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql @@ -66,6 +66,19 @@ {% endmacro %} +{% macro get_incremental_microbatch_sql(arg_dict) %} + + {{ return(adapter.dispatch('get_incremental_microbatch_sql', 'dbt')(arg_dict)) }} + +{% endmacro %} + +{% macro default__get_incremental_microbatch_sql(arg_dict) %} + + {% do return(get_microbatch_sql(arg_dict) %} + +{% endmacro %} + + {% macro get_insert_into_sql(target_relation, temp_relation, dest_columns) %} {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} From 7a7ce3fa3b507427380391731a115819976dc040 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Wed, 11 Sep 2024 13:36:31 -0400 Subject: [PATCH 2/9] default microbatch: not implemented error --- .../tests/adapter/incremental/test_incremental_microbatch.py | 2 +- .../macros/materializations/models/incremental/strategies.sql | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py b/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py index 9a6abc675..d08d33f20 100644 --- a/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py +++ b/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py @@ -42,7 +42,7 @@ def assert_row_count(self, project, relation_name: str, expected_row_count: int) @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # initial run -- backfills all data - with freeze_time("2020-01-01 13:57:00"): + with freeze_time("2020-01-03 13:57:00"): run_dbt(["run"]) self.assert_row_count(project, "microbatch_model", 3) diff --git a/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql b/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql index 49cdc9e64..111d38877 100644 --- a/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql +++ b/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql @@ -74,7 +74,7 @@ {% macro default__get_incremental_microbatch_sql(arg_dict) %} - {% do return(get_microbatch_sql(arg_dict) %} + {{ exceptions.raise_not_implemented('microbatch materialization strategy not implemented for adapter ' + adapter.type()) }} {% endmacro %} From c4d707b9430bb81671498efda0ce83d3876629e1 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Wed, 11 Sep 2024 13:41:12 -0400 Subject: [PATCH 3/9] simplify EventTimeFilter class --- dbt/adapters/base/relation.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbt/adapters/base/relation.py b/dbt/adapters/base/relation.py index 0053265f0..80dbd34ba 100644 --- a/dbt/adapters/base/relation.py +++ b/dbt/adapters/base/relation.py @@ -37,8 +37,8 @@ SerializableIterable = Union[Tuple, FrozenSet] -@dataclass(frozen=True, eq=False, repr=False) -class EventTimeFilter(FakeAPIObject, Hashable): +@dataclass +class EventTimeFilter(FakeAPIObject): field_name: str start: Optional[datetime] = None end: Optional[datetime] = None From 36fc4dae441468ba7f16918da99418e5d1102535 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Wed, 11 Sep 2024 13:48:36 -0400 Subject: [PATCH 4/9] linting --- .../tests/adapter/incremental/test_incremental_microbatch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py b/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py index d08d33f20..ad9cc0636 100644 --- a/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py +++ b/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py @@ -73,4 +73,4 @@ def test_run_with_event_time(self, project): # re-run by advancing time by one more day changing current time => insert 1 more row with freeze_time("2020-01-05 14:57:00"): run_dbt(["run", "--select", "microbatch_model"]) - self.assert_row_count(project, "microbatch_model", 5) \ No newline at end of file + self.assert_row_count(project, "microbatch_model", 5) From 7bcbf78afebba520794b5d4601eceebafbe55433 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Wed, 11 Sep 2024 13:53:39 -0400 Subject: [PATCH 5/9] make tests easier to override inputs for --- .../incremental/test_incremental_microbatch.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py b/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py index ad9cc0636..d27706c91 100644 --- a/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py +++ b/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py @@ -6,7 +6,7 @@ from dbt.tests.util import relation_from_name, run_dbt -input_model_sql = """ +_input_model_sql = """ {{ config(materialized='table', event_time='event_time') }} select 1 as id, TIMESTAMP '2020-01-01 00:00:00-0' as event_time union all @@ -15,7 +15,7 @@ select 3 as id, TIMESTAMP '2020-01-03 00:00:00-0' as event_time """ -microbatch_model_sql = """ +_microbatch_model_sql = """ {{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }} select * from {{ ref('input_model') }} """ @@ -23,7 +23,15 @@ class BaseMicrobatch: @pytest.fixture(scope="class") - def models(self): + def microbatch_model_sql(self): + return _microbatch_model_sql + + @pytest.fixture(scope="class") + def input_model_sql(self): + return _input_model_sql + + @pytest.fixture(scope="class") + def models(self, microbatch_model_sql, input_model_sql): return { "input_model.sql": input_model_sql, "microbatch_model.sql": microbatch_model_sql, From 7b1297f68d0f83732e4fb6511db733f25b9311b9 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Wed, 11 Sep 2024 13:54:09 -0400 Subject: [PATCH 6/9] changelog entry --- .changes/unreleased/Features-20240911-135404.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Features-20240911-135404.yaml diff --git a/.changes/unreleased/Features-20240911-135404.yaml b/.changes/unreleased/Features-20240911-135404.yaml new file mode 100644 index 000000000..5ff9630d4 --- /dev/null +++ b/.changes/unreleased/Features-20240911-135404.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Default microbatch strategy implementation and base tests +time: 2024-09-11T13:54:04.231977-04:00 +custom: + Author: michelleark + Issue: "302" From 9ad30fc664602fc519286b7773cba77e1ee71fd4 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Wed, 11 Sep 2024 14:08:02 -0400 Subject: [PATCH 7/9] linting --- .../tests/adapter/incremental/test_incremental_microbatch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py b/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py index d27706c91..a023cf1bd 100644 --- a/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py +++ b/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py @@ -25,7 +25,7 @@ class BaseMicrobatch: @pytest.fixture(scope="class") def microbatch_model_sql(self): return _microbatch_model_sql - + @pytest.fixture(scope="class") def input_model_sql(self): return _input_model_sql From f249db06439cb2969fc89fc9620a72c146da96d2 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Wed, 11 Sep 2024 15:02:39 -0400 Subject: [PATCH 8/9] bump to 1.6.0a --- dbt/adapters/__about__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/__about__.py b/dbt/adapters/__about__.py index e3a0f0153..50e7f5d8c 100644 --- a/dbt/adapters/__about__.py +++ b/dbt/adapters/__about__.py @@ -1 +1 @@ -version = "1.5.0" +version = "1.6.0a" From c36d15680ccc39d51906c6c740f113c424743f33 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Wed, 11 Sep 2024 16:32:01 -0400 Subject: [PATCH 9/9] parametrize insert_two_rows_sql, clean up assert_row_count utility --- .../test_incremental_microbatch.py | 37 +++++++++++-------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py b/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py index a023cf1bd..dae91c974 100644 --- a/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py +++ b/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py @@ -1,4 +1,5 @@ import os +from pprint import pformat from unittest import mock import pytest @@ -23,13 +24,27 @@ class BaseMicrobatch: @pytest.fixture(scope="class") - def microbatch_model_sql(self): + def microbatch_model_sql(self) -> str: + """ + This is the SQL that defines the microbatch model, including any {{ config(..) }} + """ return _microbatch_model_sql @pytest.fixture(scope="class") - def input_model_sql(self): + def input_model_sql(self) -> str: + """ + This is the SQL that defines the input model to the microbatch model, including any {{ config(..) }}. + event_time is a required configuration of this input + """ return _input_model_sql + @pytest.fixture(scope="class") + def insert_two_rows_sql(self, project) -> str: + test_schema_relation = project.adapter.Relation.create( + database=project.database, schema=project.test_schema + ) + return f"insert into {test_schema_relation}.input_model (id, event_time) values (4, TIMESTAMP '2020-01-04 00:00:00-0'), (5, TIMESTAMP '2020-01-05 00:00:00-0')" + @pytest.fixture(scope="class") def models(self, microbatch_model_sql, input_model_sql): return { @@ -39,16 +54,12 @@ def models(self, microbatch_model_sql, input_model_sql): def assert_row_count(self, project, relation_name: str, expected_row_count: int): relation = relation_from_name(project.adapter, relation_name) - result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one") - - if result[0] != expected_row_count: - # running show for debugging - run_dbt(["show", "--inline", f"select * from {relation}"]) + result = project.run_sql(f"select * from {relation}", fetch="all") - assert result[0] == expected_row_count + assert len(result) == expected_row_count, f"{relation_name}:{pformat(result)}" @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) - def test_run_with_event_time(self, project): + def test_run_with_event_time(self, project, insert_two_rows_sql): # initial run -- backfills all data with freeze_time("2020-01-03 13:57:00"): run_dbt(["run"]) @@ -60,12 +71,8 @@ def test_run_with_event_time(self, project): self.assert_row_count(project, "microbatch_model", 3) # add next two days of data - test_schema_relation = project.adapter.Relation.create( - database=project.database, schema=project.test_schema - ) - project.run_sql( - f"insert into {test_schema_relation}.input_model(id, event_time) values (4, TIMESTAMP '2020-01-04 00:00:00-0'), (5, TIMESTAMP '2020-01-05 00:00:00-0')" - ) + project.run_sql(insert_two_rows_sql) + self.assert_row_count(project, "input_model", 5) # re-run without changing current time => no insert