Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/external catalog config #1215

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions dbt/adapters/snowflake/catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from typing import Dict, Optional

import textwrap

from dbt.adapters.base import BaseRelation
from dbt.adapters.contracts.catalog import CatalogIntegration, CatalogIntegrationType
from dbt.adapters.contracts.relation import RelationConfig


class SnowflakeManagedIcebergCatalogIntegration(CatalogIntegration):
catalog_type = CatalogIntegrationType.managed

def render_ddl_predicates(self, relation: RelationConfig) -> str:
"""
{{ optional('external_volume', dynamic_table.catalog.external_volume) }}
{{ optional('catalog', dynamic_table.catalog.name) }}
base_location = '{{ dynamic_table.catalog.base_location }}'
:param relation:
:return:
"""
base_location: str = f"_dbt/{relation.schema}/{relation.name}"

if sub_path := relation.config.get("base_location_subpath"):
base_location += f"/{sub_path}"

iceberg_ddl_predicates: str = f"""
external_volume = '{self.external_volume}'
catalog = 'snowflake'
base_location = '{base_location}'
"""
return textwrap.indent(textwrap.dedent(iceberg_ddl_predicates), " " * 10)


class SnowflakeGlueCatalogIntegration(CatalogIntegration):
catalog_type = CatalogIntegrationType.glue
auto_refresh: str = "FALSE"
replace_invalid_characters: str = "FALSE"

def _handle_adapter_configs(self, adapter_configs: Optional[Dict]) -> None:
if adapter_configs:
if "auto_refresh" in adapter_configs:
self.auto_refresh = adapter_configs["auto_refresh"]
if "replace_invalid_characters" in adapter_configs:
self.replace_invalid_characters = adapter_configs["replace_invalid_characters"]

def render_ddl_predicates(self, relation: BaseRelation) -> str:
ddl_predicate = f"""create or replace iceberg table {relation.render()}
external_volume = '{self.external_volume}
catalog = '{self.name}'
"""
if self.namespace:
ddl_predicate += "CATALOG_NAMESPACE = '{self.namespace}'"
if self.auto_refresh:
ddl_predicate += f"REPLACE_INVALID_CHARACTERS = {self.auto_refresh}"
if self.replace_invalid_characters:
ddl_predicate += f"AUTO_REFRESH = {self.replace_invalid_characters}"
return ddl_predicate
9 changes: 9 additions & 0 deletions dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
from dbt.adapters.base.impl import AdapterConfig, ConstraintSupport
from dbt.adapters.base.meta import available
from dbt.adapters.capability import CapabilityDict, CapabilitySupport, Support, Capability
from dbt.adapters.contracts.catalog import CatalogIntegrationType
from dbt.adapters.contracts.relation import RelationConfig
from dbt.adapters.snowflake.catalog import (
SnowflakeManagedIcebergCatalogIntegration,
SnowflakeGlueCatalogIntegration,
)
from dbt.adapters.sql import SQLAdapter
from dbt.adapters.sql.impl import (
LIST_SCHEMAS_MACRO_NAME,
Expand Down Expand Up @@ -63,6 +68,10 @@ class SnowflakeAdapter(SQLAdapter):
ConnectionManager = SnowflakeConnectionManager

AdapterSpecificConfigs = SnowflakeConfig
CatalogIntegrations = {
CatalogIntegrationType.managed: SnowflakeManagedIcebergCatalogIntegration,
CatalogIntegrationType.glue: SnowflakeGlueCatalogIntegration,
}

CONSTRAINT_SUPPORT = {
ConstraintType.check: ConstraintSupport.NOT_SUPPORTED,
Expand Down
4 changes: 4 additions & 0 deletions dbt/adapters/snowflake/relation_configs/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any

return config_dict

@classmethod
def from_relation_config(cls, relation_config: RelationConfig) -> Self:
return cls.from_dict(cls.parse_relation_config(relation_config))

@classmethod
def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]:
# this try block can be removed once enable_iceberg_materializations is retired
Expand Down
42 changes: 37 additions & 5 deletions dbt/adapters/snowflake/relation_configs/dynamic_table.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
from dataclasses import dataclass
from typing import Optional, Dict, Any, TYPE_CHECKING
from typing import Optional, Dict, Any, TYPE_CHECKING, Union

from dbt.adapters.contracts.catalog import CatalogIntegrationConfig, CatalogIntegrationType
from dbt.adapters.relation_configs import RelationConfigChange, RelationResults
from dbt.adapters.clients import catalogs as catalogs_client
from dbt.adapters.contracts.relation import RelationConfig
from dbt.adapters.contracts.relation import ComponentName
from dbt_common.dataclass_schema import StrEnum # doesn't exist in standard library until py3.11
from typing_extensions import Self

from dbt.adapters.relation_configs.formats import TableFormat
from dbt.adapters.snowflake.catalog import SnowflakeManagedIcebergCatalogIntegration
from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase
from dbt.adapters.snowflake.relation_configs.catalog import (
SnowflakeCatalogConfig,
SnowflakeCatalogConfigChange,
)


if TYPE_CHECKING:
import agate

Expand All @@ -37,6 +40,33 @@ def default(cls) -> Self:
return cls("ON_CREATE")


def _setup_catalog_integration(catalog_info: Union[Dict, RelationConfig]) -> Optional[str]:
breakpoint()
if not catalog_info:
return None
elif isinstance(catalog_info, dict):
catalog_config = SnowflakeCatalogConfig.from_dict(catalog_info)
else:
catalog_config = SnowflakeCatalogConfig.from_relation_config(catalog_info)

if catalog_config.table_format != TableFormat.default():
catalog_name = "snowflake_managed"
integration_config = CatalogIntegrationConfig(
catalog_name=catalog_name,
integration_name=catalog_config.name,
table_format=catalog_config.table_format,
catalog_type=CatalogIntegrationType.managed.value,
external_volume=catalog_config.external_volume,
)
catalogs_client.add_catalog(
SnowflakeManagedIcebergCatalogIntegration(integration_config),
catalog_name=catalog_name,
)
return catalog_name
else:
return None


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
"""
Expand All @@ -60,12 +90,13 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
query: str
target_lag: str
snowflake_warehouse: str
catalog: SnowflakeCatalogConfig
catalog: Optional[str] = None
refresh_mode: Optional[RefreshMode] = RefreshMode.default()
initialize: Optional[Initialize] = Initialize.default()

@classmethod
def from_dict(cls, config_dict: Dict[str, Any]) -> Self:
catalog = _setup_catalog_integration(config_dict["catalog"])
kwargs_dict = {
"name": cls._render_part(ComponentName.Identifier, config_dict.get("name")),
"schema_name": cls._render_part(ComponentName.Schema, config_dict.get("schema_name")),
Expand All @@ -75,7 +106,7 @@ def from_dict(cls, config_dict: Dict[str, Any]) -> Self:
"query": config_dict.get("query"),
"target_lag": config_dict.get("target_lag"),
"snowflake_warehouse": config_dict.get("snowflake_warehouse"),
"catalog": SnowflakeCatalogConfig.from_dict(config_dict["catalog"]),
"catalog": catalog,
"refresh_mode": config_dict.get("refresh_mode"),
"initialize": config_dict.get("initialize"),
}
Expand All @@ -84,14 +115,15 @@ def from_dict(cls, config_dict: Dict[str, Any]) -> Self:

@classmethod
def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any]:
catalog = _setup_catalog_integration(relation_config)
config_dict = {
"name": relation_config.identifier,
"schema_name": relation_config.schema,
"database_name": relation_config.database,
"query": relation_config.compiled_code,
"target_lag": relation_config.config.extra.get("target_lag"),
"snowflake_warehouse": relation_config.config.extra.get("snowflake_warehouse"),
"catalog": SnowflakeCatalogConfig.parse_relation_config(relation_config),
"catalog": catalog,
}

if refresh_mode := relation_config.config.extra.get("refresh_mode"):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

{%- set dynamic_table = relation.from_config(config.model) -%}

{%- if dynamic_table.catalog.table_format == 'iceberg' -%}
{%- if dynamic_table.catalog is not none -%}
{{ _get_create_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) }}
{%- else -%}
{{ _get_create_dynamic_standard_table_as_sql(dynamic_table, relation, sql) }}
Expand Down Expand Up @@ -70,12 +70,16 @@
-- A valid DDL statement which will result in a new dynamic iceberg table.
-#}

{% set catalog_integration = adapter.get_catalog_integration(dynamic_table.catalog) -%}

{% if not catalog_integration -%}
{{ raise('Catalog integration is required for iceberg tables') }}
{%- endif -%}

create dynamic iceberg table {{ relation }}
target_lag = '{{ dynamic_table.target_lag }}'
warehouse = {{ dynamic_table.snowflake_warehouse }}
{{ optional('external_volume', dynamic_table.catalog.external_volume) }}
{{ optional('catalog', dynamic_table.catalog.name) }}
base_location = '{{ dynamic_table.catalog.base_location }}'
{{ catalog_integration.render_ddl_predicates(relation) }}
{{ optional('refresh_mode', dynamic_table.refresh_mode) }}
{{ optional('initialize', dynamic_table.initialize) }}
as (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

{%- set dynamic_table = relation.from_config(config.model) -%}

{%- if dynamic_table.catalog.table_format == 'iceberg' -%}
{%- if dynamic_table.catalog is not none -%}
{{ _get_replace_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) }}
{%- else -%}
{{ _get_replace_dynamic_standard_table_as_sql(dynamic_table, relation, sql) }}
Expand Down Expand Up @@ -68,13 +68,16 @@
-- Returns:
-- A valid DDL statement which will result in a new dynamic iceberg table.
-#}
{% set catalog_integration = adapter.get_catalog_integration(dynamic_table.catalog) -%}

{% if not catalog_integration -%}
{{ raise('Catalog integration is required for iceberg tables') }}
{%- endif -%}

create or replace dynamic iceberg table {{ relation }}
target_lag = '{{ dynamic_table.target_lag }}'
warehouse = {{ dynamic_table.snowflake_warehouse }}
{{ optional('external_volume', dynamic_table.catalog.external_volume) }}
{{ optional('catalog', dynamic_table.catalog.name) }}
base_location = '{{ dynamic_table.catalog.base_location }}'
{{ catalog_integration.render_ddl_predicates(relation) }}
{{ optional('refresh_mode', dynamic_table.refresh_mode) }}
{{ optional('initialize', dynamic_table.initialize) }}
as (
Expand Down
23 changes: 23 additions & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# install latest changes in dbt-core
git+https://github.com/dbt-labs/dbt-core.git@catalogs-parsing#egg=dbt-core&subdirectory=core
git+https://github.com/dbt-labs/dbt-adapters.git
git+https://github.com/dbt-labs/dbt-adapters.git#subdirectory=dbt-tests-adapter
git+https://github.com/dbt-labs/dbt-common.git

# dev
ipdb~=0.13.13
pre-commit~=3.7.0

# test
ddtrace==2.3.0
pytest~=7.4
pytest-csv~=3.0
pytest-dotenv~=0.5.2
pytest-logbook~=1.2
pytest-xdist~=3.6
tox~=4.16

# build
bumpversion~=0.6.0
twine~=5.1
wheel~=0.43
8 changes: 4 additions & 4 deletions hatch.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ sources = ["src"]

[envs.default]
dependencies = [
"dbt-adapters @ git+https://github.com/dbt-labs/dbt-adapters.git",
"dbt-adapters @ git+https://github.com/dbt-labs/dbt-adapters.git@feature/externalCatalogConfig",
"dbt-common @ git+https://github.com/dbt-labs/dbt-common.git",
"dbt-tests-adapter @ git+https://github.com/dbt-labs/dbt-adapters.git#subdirectory=dbt-tests-adapter",
"dbt-core @ git+https://github.com/dbt-labs/dbt-core.git#subdirectory=core",
"dbt-tests-adapter @ git+https://github.com/dbt-labs/dbt-adapters.git@feature/externalCatalogConfig#subdirectory=dbt-tests-adapter",
"dbt-core @ git+https://github.com/dbt-labs/dbt-core.git@catalogs-parsing#subdirectory=core",
"ddtrace==2.3.0",
"ipdb~=0.13.13",
"pre-commit~=3.7.0",
Expand All @@ -30,7 +30,7 @@ dependencies = [
setup = "pre-commit install"
code-quality = "pre-commit run --all-files"
unit-tests = "python -m pytest {args:tests/unit}"
integration-tests = "- python -m pytest {args:tests/functional}"
integration-tests = "python -m pytest {args:tests/functional}"
docker-dev = [
"docker build -f docker/dev.Dockerfile -t dbt-snowflake-dev .",
"docker run --rm -it --name dbt-snowflake-dev -v $(pwd):/opt/code dbt-snowflake-dev",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def test_replace(self, project, scenario):
assert relation_type == scenario.final.relation_type, scenario.error_message
if relation_type == "dynamic_table":
dynamic_table = describe_dynamic_table(project, scenario.name)
assert dynamic_table.catalog.table_format == scenario.final.table_format
assert dynamic_table.catalog is not None
else:
pytest.skip()

Expand Down
Loading