Skip to content

Commit

Permalink
ADAP-1166: Add table format telemetry reporting to Spark adapter (#517)
Browse files Browse the repository at this point in the history
  • Loading branch information
VanTudor authored Jan 16, 2025
1 parent 886f7da commit da4bf5f
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 0 deletions.
20 changes: 20 additions & 0 deletions dbt-spark/src/dbt/adapters/spark/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,26 @@ def debug_query(self) -> None:
"""Override for DebugTask method"""
self.execute("select 1 as id")

@classmethod
def _get_adapter_specific_run_info(cls, config: RelationConfig) -> Dict[str, Any]:
table_format: Optional[str] = None
# Full table_format support within this adapter is coming. Until then, for telemetry,
# we're relying on table_formats_within_file_formats - a subset of file_format values
table_formats_within_file_formats = ["delta", "iceberg", "hive", "hudi"]

if (
config
and hasattr(config, "_extra")
and (file_format := config._extra.get("file_format"))
):
if file_format in table_formats_within_file_formats:
table_format = file_format

return {
"adapter_type": "spark",
"table_format": table_format,
}


# spark does something interesting with joins when both tables have the same
# static values for the join condition and complains that the join condition is
Expand Down
51 changes: 51 additions & 0 deletions dbt-spark/tests/unit/test_adapter_telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from unittest import mock

import dbt.adapters.spark.__version__

from dbt.adapters.spark.impl import SparkAdapter
from dbt.adapters.base.relation import AdapterTrackingRelationInfo


def assert_telemetry_data(adapter_type: str, file_format: str) -> None:
    """Assert SparkAdapter.get_adapter_run_info produces correct telemetry.

    Builds a mocked model config carrying ``file_format`` in ``_extra`` and
    verifies the resulting AdapterTrackingRelationInfo: adapter name,
    adapter/base versions, and that ``table_format`` is reported only for
    file formats in the known table-format subset (None otherwise).
    """
    # Must mirror table_formats_within_file_formats in
    # SparkAdapter._get_adapter_specific_run_info.
    table_formats_within_file_formats = ["delta", "iceberg", "hive", "hudi"]
    expected_table_format = None
    if file_format in table_formats_within_file_formats:
        expected_table_format = file_format

    mock_model_config = mock.MagicMock()
    # Fix: the original assigned a throwaway MagicMock to _extra and then
    # immediately overwrote it; a single dict assignment suffices.
    mock_model_config._extra = {
        "adapter_type": adapter_type,
        "file_format": file_format,
    }

    res = SparkAdapter.get_adapter_run_info(mock_model_config)

    assert res.adapter_name == adapter_type
    assert res.base_adapter_version == dbt.adapters.__about__.version
    assert res.adapter_version == dbt.adapters.spark.__version__.version

    assert res.model_adapter_details == {
        "adapter_type": adapter_type,
        "table_format": expected_table_format,
    }

    # Exact-type check: telemetry consumers rely on this concrete record type.
    assert type(res) is AdapterTrackingRelationInfo


def test_telemetry_with_spark_details():
    """Exercise telemetry reporting across every Spark file format."""
    spark_file_formats = (
        "text",
        "csv",
        "json",
        "jdbc",
        "parquet",
        "orc",
        "hive",
        "delta",
        "iceberg",
        "libsvm",
        "hudi",
    )
    for fmt in spark_file_formats:
        assert_telemetry_data("spark", fmt)

0 comments on commit da4bf5f

Please sign in to comment.