diff --git a/dbt/adapters/snowflake/catalog.py b/dbt/adapters/snowflake/catalog.py new file mode 100644 index 000000000..b342d5bf0 --- /dev/null +++ b/dbt/adapters/snowflake/catalog.py @@ -0,0 +1,57 @@ +from typing import Dict, Optional + +import textwrap + +from dbt.adapters.base import BaseRelation +from dbt.adapters.contracts.catalog import CatalogIntegration, CatalogIntegrationType +from dbt.adapters.contracts.relation import RelationConfig + + +class SnowflakeManagedIcebergCatalogIntegration(CatalogIntegration): + catalog_type = CatalogIntegrationType.managed + + def render_ddl_predicates(self, relation: RelationConfig) -> str: + """ + {{ optional('external_volume', dynamic_table.catalog.external_volume) }} + {{ optional('catalog', dynamic_table.catalog.name) }} + base_location = '{{ dynamic_table.catalog.base_location }}' + :param relation: + :return: + """ + base_location: str = f"_dbt/{relation.schema}/{relation.name}" + + if sub_path := relation.config.get("base_location_subpath"): + base_location += f"/{sub_path}" + + iceberg_ddl_predicates: str = f""" + external_volume = '{self.external_volume}' + catalog = 'snowflake' + base_location = '{base_location}' + """ + return textwrap.indent(textwrap.dedent(iceberg_ddl_predicates), " " * 10) + + +class SnowflakeGlueCatalogIntegration(CatalogIntegration): + catalog_type = CatalogIntegrationType.glue + auto_refresh: str = "FALSE" + replace_invalid_characters: str = "FALSE" + + def _handle_adapter_configs(self, adapter_configs: Optional[Dict]) -> None: + if adapter_configs: + if "auto_refresh" in adapter_configs: + self.auto_refresh = adapter_configs["auto_refresh"] + if "replace_invalid_characters" in adapter_configs: + self.replace_invalid_characters = adapter_configs["replace_invalid_characters"] + + def render_ddl_predicates(self, relation: BaseRelation) -> str: + ddl_predicate = f"""create or replace iceberg table {relation.render()} + external_volume = '{self.external_volume} + catalog = '{self.name}' + """ + if self.namespace: + ddl_predicate += "CATALOG_NAMESPACE = '{self.namespace}'" + if self.auto_refresh: + ddl_predicate += f"REPLACE_INVALID_CHARACTERS = {self.auto_refresh}" + if self.replace_invalid_characters: + ddl_predicate += f"AUTO_REFRESH = {self.replace_invalid_characters}" + return ddl_predicate diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py index 7ccff9f8a..b4e058cd9 100644 --- a/dbt/adapters/snowflake/impl.py +++ b/dbt/adapters/snowflake/impl.py @@ -4,7 +4,12 @@ from dbt.adapters.base.impl import AdapterConfig, ConstraintSupport from dbt.adapters.base.meta import available from dbt.adapters.capability import CapabilityDict, CapabilitySupport, Support, Capability +from dbt.adapters.contracts.catalog import CatalogIntegrationType from dbt.adapters.contracts.relation import RelationConfig +from dbt.adapters.snowflake.catalog import ( + SnowflakeManagedIcebergCatalogIntegration, + SnowflakeGlueCatalogIntegration, +) from dbt.adapters.sql import SQLAdapter from dbt.adapters.sql.impl import ( LIST_SCHEMAS_MACRO_NAME, @@ -63,6 +68,10 @@ class SnowflakeAdapter(SQLAdapter): ConnectionManager = SnowflakeConnectionManager AdapterSpecificConfigs = SnowflakeConfig + CatalogIntegrations = { + CatalogIntegrationType.managed: SnowflakeManagedIcebergCatalogIntegration, + CatalogIntegrationType.glue: SnowflakeGlueCatalogIntegration, + } CONSTRAINT_SUPPORT = { ConstraintType.check: ConstraintSupport.NOT_SUPPORTED, diff --git a/dbt/adapters/snowflake/relation_configs/catalog.py b/dbt/adapters/snowflake/relation_configs/catalog.py index c8d7de40f..309dac579 100644 --- a/dbt/adapters/snowflake/relation_configs/catalog.py +++ b/dbt/adapters/snowflake/relation_configs/catalog.py @@ -89,6 +89,10 @@ def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any return config_dict + @classmethod + def from_relation_config(cls, relation_config: RelationConfig) -> Self: + return cls.from_dict(cls.parse_relation_config(relation_config)) + @classmethod def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]: # this try block can be removed once enable_iceberg_materializations is retired diff --git a/dbt/adapters/snowflake/relation_configs/dynamic_table.py b/dbt/adapters/snowflake/relation_configs/dynamic_table.py index 7361df80a..a54e20ef7 100644 --- a/dbt/adapters/snowflake/relation_configs/dynamic_table.py +++ b/dbt/adapters/snowflake/relation_configs/dynamic_table.py @@ -1,19 +1,22 @@ from dataclasses import dataclass -from typing import Optional, Dict, Any, TYPE_CHECKING +from typing import Optional, Dict, Any, TYPE_CHECKING, Union +from dbt.adapters.contracts.catalog import CatalogIntegrationConfig, CatalogIntegrationType from dbt.adapters.relation_configs import RelationConfigChange, RelationResults +from dbt.adapters.clients import catalogs as catalogs_client from dbt.adapters.contracts.relation import RelationConfig from dbt.adapters.contracts.relation import ComponentName from dbt_common.dataclass_schema import StrEnum # doesn't exist in standard library until py3.11 from typing_extensions import Self +from dbt.adapters.relation_configs.formats import TableFormat +from dbt.adapters.snowflake.catalog import SnowflakeManagedIcebergCatalogIntegration from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase from dbt.adapters.snowflake.relation_configs.catalog import ( SnowflakeCatalogConfig, SnowflakeCatalogConfigChange, ) - if TYPE_CHECKING: import agate @@ -37,6 +40,33 @@ def default(cls) -> Self: return cls("ON_CREATE") +def _setup_catalog_integration(catalog_info: Union[Dict, RelationConfig]) -> Optional[str]: + breakpoint() + if not catalog_info: + return None + elif isinstance(catalog_info, dict): + catalog_config = SnowflakeCatalogConfig.from_dict(catalog_info) + else: + catalog_config = SnowflakeCatalogConfig.from_relation_config(catalog_info) + + if catalog_config.table_format != TableFormat.default(): + catalog_name = "snowflake_managed" + integration_config = CatalogIntegrationConfig( + catalog_name=catalog_name, + integration_name=catalog_config.name, + table_format=catalog_config.table_format, + catalog_type=CatalogIntegrationType.managed.value, + external_volume=catalog_config.external_volume, + ) + catalogs_client.add_catalog( + SnowflakeManagedIcebergCatalogIntegration(integration_config), + catalog_name=catalog_name, + ) + return catalog_name + else: + return None + + @dataclass(frozen=True, eq=True, unsafe_hash=True) class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase): """ @@ -60,12 +90,13 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase): query: str target_lag: str snowflake_warehouse: str - catalog: SnowflakeCatalogConfig + catalog: Optional[str] = None refresh_mode: Optional[RefreshMode] = RefreshMode.default() initialize: Optional[Initialize] = Initialize.default() @classmethod def from_dict(cls, config_dict: Dict[str, Any]) -> Self: + catalog = _setup_catalog_integration(config_dict["catalog"]) kwargs_dict = { "name": cls._render_part(ComponentName.Identifier, config_dict.get("name")), "schema_name": cls._render_part(ComponentName.Schema, config_dict.get("schema_name")), @@ -75,7 +106,7 @@ def from_dict(cls, config_dict: Dict[str, Any]) -> Self: "query": config_dict.get("query"), "target_lag": config_dict.get("target_lag"), "snowflake_warehouse": config_dict.get("snowflake_warehouse"), - "catalog": SnowflakeCatalogConfig.from_dict(config_dict["catalog"]), + "catalog": catalog, "refresh_mode": config_dict.get("refresh_mode"), "initialize": config_dict.get("initialize"), } @@ -84,6 +115,7 @@ def from_dict(cls, config_dict: Dict[str, Any]) -> Self: @classmethod def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any]: + catalog = _setup_catalog_integration(relation_config) config_dict = { "name": relation_config.identifier, "schema_name": relation_config.schema, @@ -91,7 +123,7 @@ def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any "query": relation_config.compiled_code, "target_lag": relation_config.config.extra.get("target_lag"), "snowflake_warehouse": relation_config.config.extra.get("snowflake_warehouse"), - "catalog": SnowflakeCatalogConfig.parse_relation_config(relation_config), + "catalog": catalog, } if refresh_mode := relation_config.config.extra.get("refresh_mode"): diff --git a/dbt/include/snowflake/macros/relations/dynamic_table/create.sql b/dbt/include/snowflake/macros/relations/dynamic_table/create.sql index 4ebcf145b..ac77bf556 100644 --- a/dbt/include/snowflake/macros/relations/dynamic_table/create.sql +++ b/dbt/include/snowflake/macros/relations/dynamic_table/create.sql @@ -15,7 +15,7 @@ {%- set dynamic_table = relation.from_config(config.model) -%} - {%- if dynamic_table.catalog.table_format == 'iceberg' -%} + {%- if dynamic_table.catalog is not none -%} {{ _get_create_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) }} {%- else -%} {{ _get_create_dynamic_standard_table_as_sql(dynamic_table, relation, sql) }} @@ -70,12 +70,16 @@ -- A valid DDL statement which will result in a new dynamic iceberg table. -#} + {% set catalog_integration = adapter.get_catalog_integration(dynamic_table.catalog) -%} + + {% if not catalog_integration -%} + {{ raise('Catalog integration is required for iceberg tables') }} + {%- endif -%} + create dynamic iceberg table {{ relation }} target_lag = '{{ dynamic_table.target_lag }}' warehouse = {{ dynamic_table.snowflake_warehouse }} - {{ optional('external_volume', dynamic_table.catalog.external_volume) }} - {{ optional('catalog', dynamic_table.catalog.name) }} - base_location = '{{ dynamic_table.catalog.base_location }}' + {{ catalog_integration.render_ddl_predicates(relation) }} {{ optional('refresh_mode', dynamic_table.refresh_mode) }} {{ optional('initialize', dynamic_table.initialize) }} as ( diff --git a/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql b/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql index 2e7b4566a..4c9b966e7 100644 --- a/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql +++ b/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql @@ -15,7 +15,7 @@ {%- set dynamic_table = relation.from_config(config.model) -%} - {%- if dynamic_table.catalog.table_format == 'iceberg' -%} + {%- if dynamic_table.catalog is not none -%} {{ _get_replace_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) }} {%- else -%} {{ _get_replace_dynamic_standard_table_as_sql(dynamic_table, relation, sql) }} @@ -68,13 +68,16 @@ -- Returns: -- A valid DDL statement which will result in a new dynamic iceberg table. -#} + {% set catalog_integration = adapter.get_catalog_integration(dynamic_table.catalog) -%} + + {% if not catalog_integration -%} + {{ raise('Catalog integration is required for iceberg tables') }} + {%- endif -%} create or replace dynamic iceberg table {{ relation }} target_lag = '{{ dynamic_table.target_lag }}' warehouse = {{ dynamic_table.snowflake_warehouse }} - {{ optional('external_volume', dynamic_table.catalog.external_volume) }} - {{ optional('catalog', dynamic_table.catalog.name) }} - base_location = '{{ dynamic_table.catalog.base_location }}' + {{ catalog_integration.render_ddl_predicates(relation) }} {{ optional('refresh_mode', dynamic_table.refresh_mode) }} {{ optional('initialize', dynamic_table.initialize) }} as ( diff --git a/dev-requirements.txt b/dev-requirements.txt new file mode 100644 index 000000000..6580efdba --- /dev/null +++ b/dev-requirements.txt @@ -0,0 +1,23 @@ +# install latest changes in dbt-core +git+https://github.com/dbt-labs/dbt-core.git@catalogs-parsing#egg=dbt-core&subdirectory=core +git+https://github.com/dbt-labs/dbt-adapters.git +git+https://github.com/dbt-labs/dbt-adapters.git#subdirectory=dbt-tests-adapter +git+https://github.com/dbt-labs/dbt-common.git + +# dev +ipdb~=0.13.13 +pre-commit~=3.7.0 + +# test +ddtrace==2.3.0 +pytest~=7.4 +pytest-csv~=3.0 +pytest-dotenv~=0.5.2 +pytest-logbook~=1.2 +pytest-xdist~=3.6 +tox~=4.16 + +# build +bumpversion~=0.6.0 +twine~=5.1 +wheel~=0.43 diff --git a/hatch.toml b/hatch.toml index 2377e5d6c..b22b571f0 100644 --- a/hatch.toml +++ b/hatch.toml @@ -11,10 +11,10 @@ sources = ["src"] [envs.default] dependencies = [ - "dbt-adapters @ git+https://github.com/dbt-labs/dbt-adapters.git", + "dbt-adapters @ git+https://github.com/dbt-labs/dbt-adapters.git@feature/externalCatalogConfig", "dbt-common @ git+https://github.com/dbt-labs/dbt-common.git", - "dbt-tests-adapter @ git+https://github.com/dbt-labs/dbt-adapters.git#subdirectory=dbt-tests-adapter", - "dbt-core @ git+https://github.com/dbt-labs/dbt-core.git#subdirectory=core", + "dbt-tests-adapter @ git+https://github.com/dbt-labs/dbt-adapters.git@feature/externalCatalogConfig#subdirectory=dbt-tests-adapter", + "dbt-core @ git+https://github.com/dbt-labs/dbt-core.git@catalogs-parsing#subdirectory=core", "ddtrace==2.3.0", "ipdb~=0.13.13", "pre-commit~=3.7.0", @@ -30,7 +30,7 @@ dependencies = [ setup = "pre-commit install" code-quality = "pre-commit run --all-files" unit-tests = "python -m pytest {args:tests/unit}" -integration-tests = "- python -m pytest {args:tests/functional}" +integration-tests = "python -m pytest {args:tests/functional}" docker-dev = [ "docker build -f docker/dev.Dockerfile -t dbt-snowflake-dev .", "docker run --rm -it --name dbt-snowflake-dev -v $(pwd):/opt/code dbt-snowflake-dev", diff --git a/tests/functional/relation_tests/test_relation_type_change.py b/tests/functional/relation_tests/test_relation_type_change.py index 1024a92ca..244c5cb81 100644 --- a/tests/functional/relation_tests/test_relation_type_change.py +++ b/tests/functional/relation_tests/test_relation_type_change.py @@ -120,7 +120,7 @@ def test_replace(self, project, scenario): assert relation_type == scenario.final.relation_type, scenario.error_message if relation_type == "dynamic_table": dynamic_table = describe_dynamic_table(project, scenario.name) - assert dynamic_table.catalog.table_format == scenario.final.table_format + assert dynamic_table.catalog is not None else: pytest.skip()