Skip to content

Commit

Permalink
removed tag isolation for iceberg retries retries tests
Browse files Browse the repository at this point in the history
  • Loading branch information
VanTudor committed Jan 17, 2025
1 parent 8dbdf7c commit b0309a9
Showing 1 changed file with 23 additions and 40 deletions.
63 changes: 23 additions & 40 deletions dbt-athena/tests/functional/adapter/test_retries_iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@

import copy
import os
import time

import pytest

from dbt.artifacts.schemas.results import RunStatus
from dbt.tests.util import check_relations_equal, run_dbt, run_dbt_and_capture

PARALLELISM = 2
PARALLELISM = 10

base_dbt_profile = {
"type": "athena",
Expand Down Expand Up @@ -41,48 +40,36 @@
"""

"""
Using an unique tag here to avoid some issues that pop up when running tests in parallel.
"""


def get_models__source(unique_tag: str):
return {
f"model_{i}.sql": f"""
{{{{
config(
table_type='iceberg',
materialized='table',
tags=['{unique_tag}'],
pre_hook='insert into target values ({i}, {i})'
)
}}}}
select 1 as col
"""
for i in range(PARALLELISM)
}
models__source = {
f"model_{i}.sql": f"""
{{{{
config(
table_type='iceberg',
materialized='table',
tags=['src'],
pre_hook='insert into target values ({i}, {i})'
)
}}}}
select 1 as col
"""
for i in range(PARALLELISM)
}

seeds__expected_target_init = "id,status"
seeds__expected_target_post = "id,status\n" + "\n".join([f"{i},{i}" for i in range(PARALLELISM)])


@pytest.mark.flaky
class TestIcebergRetriesDisabled:
@pytest.fixture(scope="class")
def unique_tag(self):
return str(time.time())

@pytest.fixture(scope="class")
def dbt_profile_target(self):
profile = copy.deepcopy(base_dbt_profile)
profile["num_iceberg_retries"] = 0
return profile

@pytest.fixture(scope="class")
def models(self, unique_tag):
return {**{"target.sql": models__target}, **get_models__source(unique_tag)}
def models(self):
return {**{"target.sql": models__target}, **models__source}

@pytest.fixture(scope="class")
def seeds(self):
Expand All @@ -91,7 +78,7 @@ def seeds(self):
"expected_target_post.csv": seeds__expected_target_post,
}

def test__retries_iceberg(self, project, unique_tag):
def test__retries_iceberg(self, project):
"""Seed should match the model after run"""

expected__init_seed_name = "expected_target_init"
Expand All @@ -106,16 +93,12 @@ def test__retries_iceberg(self, project, unique_tag):
expected__post_seed_name = "expected_target_post"
run_dbt(["seed", "--select", expected__post_seed_name, "--full-refresh"])

run, log = run_dbt_and_capture(["run", "--select", f"tag:{unique_tag}"], expect_pass=False)
run, log = run_dbt_and_capture(["run", "--select", "tag:src"], expect_pass=False)
assert any(model_run_result.status == RunStatus.Error for model_run_result in run.results)
assert "ICEBERG_COMMIT_ERROR" in log


class TestIcebergRetriesEnabled:
@pytest.fixture(scope="class")
def unique_tag(self):
return str(time.time())

@pytest.fixture(scope="class")
def dbt_profile_target(self):
profile = copy.deepcopy(base_dbt_profile)
Expand All @@ -124,8 +107,8 @@ def dbt_profile_target(self):
return profile

@pytest.fixture(scope="class")
def models(self, unique_tag):
return {**{"target.sql": models__target}, **get_models__source(unique_tag)}
def models(self):
return {**{"target.sql": models__target}, **models__source}

@pytest.fixture(scope="class")
def seeds(self):
Expand All @@ -134,7 +117,7 @@ def seeds(self):
"expected_target_post.csv": seeds__expected_target_post,
}

def test__retries_iceberg(self, project, unique_tag):
def test__retries_iceberg(self, project):
"""Seed should match the model after run"""

expected__init_seed_name = "expected_target_init"
Expand All @@ -149,7 +132,7 @@ def test__retries_iceberg(self, project, unique_tag):
expected__post_seed_name = "expected_target_post"
run_dbt(["seed", "--select", expected__post_seed_name, "--full-refresh"])

run = run_dbt(["run", "--select", f"tag:{unique_tag}"])
run = run_dbt(["run", "--select", "tag:src"])
assert all(
[model_run_result.status == RunStatus.Success for model_run_result in run.results]
)
Expand Down

0 comments on commit b0309a9

Please sign in to comment.