Skip to content

Commit

Permalink
external table materialization
Browse files Browse the repository at this point in the history
  • Loading branch information
dataders committed May 15, 2024
1 parent 2e7d04c commit b06bd67
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 0 deletions.
12 changes: 12 additions & 0 deletions dbt/adapters/base/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,14 @@ def is_view(self) -> bool:
def is_materialized_view(self) -> bool:
return self.type == RelationType.MaterializedView

@property
def is_external(self) -> bool:
return self.type == RelationType.External

@property
def is_external_table(self) -> bool:
return self.type == RelationType.ExternalTable

@classproperty
def Table(cls) -> str:
return str(RelationType.Table)
Expand All @@ -367,6 +375,10 @@ def View(cls) -> str:
def External(cls) -> str:
return str(RelationType.External)

@classproperty
def ExternalTable(cls) -> str:
return str(RelationType.ExternalTable)

@classproperty
def MaterializedView(cls) -> str:
return str(RelationType.MaterializedView)
Expand Down
1 change: 1 addition & 0 deletions dbt/adapters/contracts/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class RelationType(StrEnum):
MaterializedView = "materialized_view"
External = "external"
Ephemeral = "ephemeral"
ExternalTable = "external_table"


class MaterializationContract(Protocol):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
{% materialization external_table, default %}

{%- set identifier = model['alias'] -%}
{%- set full_refresh_mode = (should_full_refresh()) -%}

{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}

{%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%}
{%- set exists_as_view = (old_relation is not none and old_relation.is_view) -%}
{%- set exists_as_external_table = (old_relation is not none and old_relation.is_external) -%}

{%- set grant_config = config.get('grants') -%}

{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

-- build model
{% set build_plan = "" %}

{% set create_or_replace = (old_relation is none or full_refresh_mode) %}

{% if exists_as_view %}
{{ exceptions.raise_compiler_error("Cannot seed to '{}', it is a view".format(old_relation)) }}
{% elif exists_as_table %}
{{ exceptions.raise_compiler_error("Cannot seed to '{}', it is a table".format(old_relation)) }}
{% elif exists_as_external_table %}
{% set build_plan = build_plan + refresh_external_table(source_node) %}
{% else %}
{% set build_plan = build_plan + [
dbt_external_tables.create_external_schema(source_node),
dbt_external_tables.create_external_table(source_node)
] %}
{% endif %}

{% set code = 'CREATE' if create_or_replace else 'REFRESH' %}

{% set sql = load_csv_rows(model, agate_table) %}
{% call noop_statement('main', code ~ ' ' ~ rows_affected, code, rows_affected) %}
{{ build_plan }};
{% endcall %}

{% set target_relation = this.incorporate(type='external_table') %}

{% set should_revoke = should_revoke(old_relation, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{% do persist_docs(target_relation, model) %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- `COMMIT` happens here
{{ adapter.commit() }}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}

{% endmaterialization %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{% macro create_external_table(source_node) %}
{{ adapter.dispatch('create_external_table', 'dbt_external_tables')(source_node) }}
{% endmacro %}

{% macro default__create_external_table(source_node) %}
{{ exceptions.raise_compiler_error("External table creation is not implemented for the default adapter") }}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
{% macro refresh_external_table(source_node) %}
{{ return(adapter.dispatch('refresh_external_table', 'dbt_external_tables')(source_node)) }}
{% endmacro %}

{% macro default__refresh_external_table(source_node) %}
{% do return([]) %}
{% endmacro %}

{% macro update_external_table_columns(source_node) %}
{{ return(adapter.dispatch('update_external_table_columns', 'dbt_external_tables')(source_node)) }}
{% endmacro %}

{% macro default__update_external_table_columns(source_node) %}

{% endmacro %}

{%- macro create_external_schema(source_node) -%}
{{ adapter.dispatch('create_external_schema', 'dbt_external_tables')(source_node) }}
{%- endmacro -%}

{%- macro default__create_external_schema(source_node) -%}
{%- set fqn -%}
{%- if source_node.database -%}
{{ source_node.database }}.{{ source_node.schema }}
{%- else -%}
{{ source_node.schema }}
{%- endif -%}
{%- endset -%}

{%- set ddl -%}
create schema if not exists {{ fqn }}
{%- endset -%}

{{ return(ddl) }}
{%- endmacro -%}


{% macro exit_transaction() %}
{{ return(adapter.dispatch('exit_transaction', 'dbt_external_tables')()) }}
{% endmacro %}

{% macro default__exit_transaction() %}
{{ return('') }}
{% endmacro %}

{% macro dropif(node) %}
{{ adapter.dispatch('dropif', 'dbt_external_tables')(node) }}
{% endmacro %}

{% macro default__dropif() %}
{{ exceptions.raise_compiler_error(
"Dropping external tables is not implemented for the default adapter"
) }}
{% endmacro %}

0 comments on commit b06bd67

Please sign in to comment.