Skip to content

Commit

Permalink
[ADAP-353]: Fix for #581: Python incremental model (#594)
Browse files Browse the repository at this point in the history
* Fixes for incremental strategy py model.WIP

* doc string

* Remove extra comment

* Uncomment change schema test

* Update dbt/include/bigquery/macros/materializations/incremental.sql

Add python language exception for time_ingestion_partitioning

Co-authored-by: colin-rogers-dbt <[email protected]>

* Remove tox command flag for test python

* Adding the env vars in integration

* Switch to cluster execution for the python models

* Remove comment and add changie

* Skipping tests since dataproc is unstable. Restoring default to serverless

---------

Co-authored-by: colin-rogers-dbt <[email protected]>
  • Loading branch information
nssalian and colin-rogers-dbt authored Mar 10, 2023
1 parent 850296f commit 384f13c
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 9 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20230309-181313.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Fix for Python incremental model regression
time: 2023-03-09T18:13:13.512904-08:00
custom:
Author: nssalian
Issue: "581"
13 changes: 9 additions & 4 deletions dbt/include/bigquery/macros/materializations/incremental.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,17 @@

{% endmacro %}
{% macro bq_create_table_as(is_time_ingestion_partitioning, temporary, relation, compiled_code, language='sql') %}
{% if is_time_ingestion_partitioning %}
{% if is_time_ingestion_partitioning and language == 'python' %}
{% do exceptions.raise_compiler_error(
"Python models do not support ingestion time partitioning"
) %}
{% endif %}
{% if is_time_ingestion_partitioning and language == 'sql' %}
{#-- Create the table before inserting data as ingestion time partitioned tables can't be created with the transformed data --#}
{% do run_query(create_ingestion_time_partitioned_table_as_sql(temporary, relation, sql)) %}
{{ return(bq_insert_into_ingestion_time_partitioned_table_sql(relation, sql)) }}
{% do run_query(create_ingestion_time_partitioned_table_as_sql(temporary, relation, compiled_code)) %}
{{ return(bq_insert_into_ingestion_time_partitioned_table_sql(relation, compiled_code)) }}
{% else %}
{{ return(create_table_as(temporary, relation, sql)) }}
{{ return(create_table_as(temporary, relation, compiled_code, language)) }}
{% endif %}
{% endmacro %}

Expand Down
11 changes: 7 additions & 4 deletions tests/functional/adapter/test_python_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
from dbt.tests.util import run_dbt, write_file
import dbt.tests.adapter.python_model.test_python_model as dbt_tests

# ToDo: Fix and schedule these tests:
# https://github.com/dbt-labs/dbt-bigquery/issues/306
TEST_SKIP_MESSAGE = "Skipping the Tests since Dataproc serverless is not stable. " \
"TODO: Fix later"


@pytest.mark.skip(reason=TEST_SKIP_MESSAGE)
class TestPythonModelDataproc(dbt_tests.BasePythonModelTests):
pass


@pytest.mark.skip(reason="Currently Broken")
@pytest.mark.skip(reason=TEST_SKIP_MESSAGE)
class TestPythonIncrementalMatsDataproc(dbt_tests.BasePythonIncrementalTests):
pass

Expand Down Expand Up @@ -37,7 +40,7 @@ def model(dbt, spark):
"""


@pytest.mark.skip(reason="Currently Broken")
@pytest.mark.skip(reason=TEST_SKIP_MESSAGE)
class TestChangingSchemaDataproc:

@pytest.fixture(scope="class")
Expand Down
4 changes: 3 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ passenv =
DBT_*
BIGQUERY_TEST_*
PYTEST_ADDOPTS
DATAPROC_*
GCS_BUCKET
commands =
bigquery: {envpython} -m pytest {posargs} -vv tests/functional -k "not TestPython" --profile service_account
bigquery: {envpython} -m pytest {posargs} -vv tests/functional --profile service_account
deps =
-rdev-requirements.txt
-e.
Expand Down

0 comments on commit 384f13c

Please sign in to comment.