diff --git a/.github/workflows/integration_tests.yml b/.github/workflows/integration_tests.yml index f13a222..adccb8f 100644 --- a/.github/workflows/integration_tests.yml +++ b/.github/workflows/integration_tests.yml @@ -5,7 +5,6 @@ on: branches: - main - 'release/**' - concurrency: dbt_integration_tests env: @@ -41,14 +40,13 @@ env: SNOWFLAKE_TEST_DATABASE: ${{ secrets.SNOWFLAKE_TEST_DATABASE }} SNOWFLAKE_TEST_WAREHOUSE: ${{ secrets.SNOWFLAKE_TEST_WAREHOUSE }} - # # Postgres Connection - # POSTGRES_TEST_HOST: ${{ secrets.POSTGRES_TEST_HOST }} + # Postgres Connection # POSTGRES_TEST_USER: ${{ secrets.POSTGRES_TEST_USER }} # POSTGRES_TEST_PASS: ${{ secrets.POSTGRES_TEST_PASS }} - # POSTGRES_TEST_PORT: ${{ secrets.POSTGRES_TEST_PORT }} # POSTGRES_TEST_DBNAME: ${{ secrets.POSTGRES_TEST_DBNAME }} + # POSTGRES_TEST_HOST: ${{ secrets.POSTGRES_TEST_HOST }} + # POSTGRES_TEST_PORT: ${{ secrets.POSTGRES_TEST_PORT }} - # Databricks Connection DATABRICKS_TEST_HOST: ${{ secrets.DATABRICKS_TEST_HOST }} DATABRICKS_TEST_HTTP_PATH: ${{ secrets.DATABRICKS_TEST_HTTP_PATH }} DATABRICKS_TEST_TOKEN: ${{ secrets.DATABRICKS_TEST_TOKEN }} @@ -63,14 +61,35 @@ jobs: # Run tests from integration_tests sub dir working-directory: ./integration_tests strategy: + fail-fast: false matrix: dbt_version: ["1.*"] - warehouse: ["bigquery", "snowflake", "databricks"] + warehouse: ["bigquery", "snowflake", "databricks", "spark_iceberg"] steps: - name: Check out - uses: actions/checkout@v3 + uses: actions/checkout@v4 + - name: Configure Docker credentials + uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKERHUB_SNOWPLOWCI_READ_USERNAME }} + password: ${{ secrets.DOCKERHUB_SNOWPLOWCI_READ_PASSWORD }} + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v1 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: eu-west-1 + - name: Set warehouse variables + id: set_warehouse + run: | + WAREHOUSE_PLATFORM=$(echo ${{ matrix.warehouse }} | cut -d'_' -f1) + WAREHOUSE_SPECIFIC=$(echo ${{ matrix.warehouse }} | cut -s -d'_' -f2) + echo "WAREHOUSE_PLATFORM=${WAREHOUSE_PLATFORM}" >> $GITHUB_ENV + echo "WAREHOUSE_SPECIFIC=${WAREHOUSE_SPECIFIC}" >> $GITHUB_ENV + echo "warehouse_platform=${WAREHOUSE_PLATFORM}" >> $GITHUB_OUTPUT + echo "warehouse_specific=${WAREHOUSE_SPECIFIC}" >> $GITHUB_OUTPUT # Remove '*' and replace '.' with '_' in DBT_VERSION & set as SCHEMA_SUFFIX. # SCHEMA_SUFFIX allows us to run multiple versions of dbt in parallel without overwriting the output tables - name: Set SCHEMA_SUFFIX env @@ -80,7 +99,7 @@ jobs: - name: Set DEFAULT_TARGET env run: | - echo "DEFAULT_TARGET=${{ matrix.warehouse }}" >> $GITHUB_ENV + echo "DEFAULT_TARGET=${{matrix.warehouse}}" >> $GITHUB_ENV - name: Python setup uses: actions/setup-python@v4 @@ -91,32 +110,46 @@ jobs: uses: actions/cache@v3 with: path: ~/.cache/pip - key: ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{ matrix.warehouse }} + key: ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{env.WAREHOUSE_PLATFORM}} restore-keys: | - ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{ matrix.warehouse }} + ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{env.WAREHOUSE_PLATFORM}} # Install latest patch version. Upgrade if cache contains old patch version. 
- name: Install dependencies run: | - pip install --upgrade pip wheel setuptools - pip install -Iv dbt-${{ matrix.warehouse }}==${{ matrix.dbt_version }} --upgrade + pip install wheel setuptools + pip install -Iv dbt-${{env.WAREHOUSE_PLATFORM}}==${{ matrix.dbt_version }} --upgrade dbt deps - if: ${{matrix.warehouse != 'spark'}} + if: ${{env.WAREHOUSE_PLATFORM != 'spark'}} - name: Install spark dependencies run: | pip install --upgrade pip wheel setuptools - pip install -Iv "dbt-${{ matrix.warehouse }}[ODBC]"==${{ matrix.dbt_version }} --upgrade + pip install -Iv "dbt-${{ env.WAREHOUSE_PLATFORM }}[PyHive]"==${{ matrix.dbt_version }} --upgrade dbt deps - if: ${{matrix.warehouse == 'spark'}} + if: ${{env.WAREHOUSE_PLATFORM == 'spark'}} + + - name: Install Docker Compose + run: | + sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose + sudo chmod +x /usr/local/bin/docker-compose + + - name: Build and start Spark cluster + working-directory: .github/workflows/spark_deployment + run: | + docker-compose up -d + echo "Waiting for Spark services to start..." + sleep 90 + if: ${{env.WAREHOUSE_PLATFORM == 'spark'}} + - name: "Pre-test: Drop ci schemas" run: | - dbt run-operation post_ci_cleanup --target ${{ matrix.warehouse }} + dbt run-operation post_ci_cleanup --target ${{matrix.warehouse}} - name: Run tests - run: ./.scripts/integration_tests.sh -d ${{ matrix.warehouse }} + run: ./.scripts/integration_test.sh -d ${{matrix.warehouse}} - name: "Post-test: Drop ci schemas" run: | - dbt run-operation post_ci_cleanup --target ${{ matrix.warehouse }} + dbt run-operation post_ci_cleanup --target ${{matrix.warehouse}} \ No newline at end of file diff --git a/.github/workflows/spark_deployment/Dockerfile b/.github/workflows/spark_deployment/Dockerfile new file mode 100644 index 0000000..dab5720 --- /dev/null +++ b/.github/workflows/spark_deployment/Dockerfile @@ -0,0 +1,34 @@ +FROM openjdk:11-jre-slim + +# Set environment variables +ENV SPARK_VERSION=3.5.1 +ENV HADOOP_VERSION=3.3.4 +ENV ICEBERG_VERSION=1.4.2 +ENV AWS_SDK_VERSION=1.12.581 + +# Install necessary tools +RUN apt-get update && apt-get install -y curl wget procps rsync ssh + +# Download and install Spark +RUN wget https://downloads.apache.org/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz && \ + tar -xvzf spark-${SPARK_VERSION}-bin-hadoop3.tgz && \ + mv spark-${SPARK_VERSION}-bin-hadoop3 /spark && \ + rm spark-${SPARK_VERSION}-bin-hadoop3.tgz + +# Set Spark environment variables +ENV SPARK_HOME=/spark +ENV PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin + +# Download necessary JARs +RUN mkdir -p /spark/jars && \ + wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/${ICEBERG_VERSION}/iceberg-spark-runtime-3.5_2.12-${ICEBERG_VERSION}.jar -O /spark/jars/iceberg-spark-runtime.jar && \ + wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar -O /spark/jars/iceberg-aws-bundle.jar && \ + wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar -O /spark/jars/hadoop-aws.jar && \ + wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/${AWS_SDK_VERSION}/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar -O /spark/jars/aws-java-sdk-bundle.jar + +# Create directory for Spark events +RUN mkdir -p /tmp/spark-events + +WORKDIR /spark + +CMD ["bash"] \ No newline at end of 
file diff --git a/.github/workflows/spark_deployment/build_and_push.sh b/.github/workflows/spark_deployment/build_and_push.sh new file mode 100755 index 0000000..1be2b6d --- /dev/null +++ b/.github/workflows/spark_deployment/build_and_push.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +# Set variables +DOCKER_HUB_ORG="snowplow" +IMAGE_NAME="spark-s3-iceberg" +TAG="latest" + +# Build the image +echo "Building Docker image..." +docker build --platform linux/amd64 -t $DOCKER_HUB_ORG/$IMAGE_NAME:$TAG . + +# Log in to Docker Hub +echo "Logging in to Docker Hub..." +docker login + +# Push the image to Docker Hub +echo "Pushing image to Docker Hub..." +docker push $DOCKER_HUB_ORG/$IMAGE_NAME:$TAG + +echo "Image successfully built and pushed to Docker Hub" \ No newline at end of file diff --git a/.github/workflows/spark_deployment/docker-compose.yml b/.github/workflows/spark_deployment/docker-compose.yml new file mode 100644 index 0000000..2e8077b --- /dev/null +++ b/.github/workflows/spark_deployment/docker-compose.yml @@ -0,0 +1,66 @@ +version: '3' + +networks: + spark-network: + driver: bridge + +services: + spark-master: + image: snowplow/spark-s3-iceberg:latest + command: ["/bin/bash", "-c", "/spark/sbin/start-master.sh -h spark-master --properties-file /spark/conf/spark-defaults.conf && tail -f /spark/logs/spark--org.apache.spark.deploy.master.Master-1-*.out"] + hostname: spark-master + ports: + - '8080:8080' + - '7077:7077' + environment: + - SPARK_LOCAL_IP=spark-master + - SPARK_MASTER_HOST=spark-master + - SPARK_MASTER_PORT=7077 + - SPARK_MASTER_OPTS="-Dspark.driver.memory=2g" + - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} + - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} + - AWS_REGION=eu-west-1 + - AWS_DEFAULT_REGION=eu-west-1 + volumes: + - ./spark-defaults.conf:/spark/conf/spark-defaults.conf + networks: + - spark-network + + spark-worker: + image: snowplow/spark-s3-iceberg:latest + command: ["/bin/bash", "-c", "sleep 10 && /spark/sbin/start-worker.sh spark://spark-master:7077 --properties-file /spark/conf/spark-defaults.conf && tail -f /spark/logs/spark--org.apache.spark.deploy.worker.Worker-*.out"] + depends_on: + - spark-master + environment: + - SPARK_WORKER_CORES=2 + - SPARK_WORKER_MEMORY=4G + - SPARK_EXECUTOR_MEMORY=3G + - SPARK_LOCAL_IP=spark-worker + - SPARK_MASTER=spark://spark-master:7077 + - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} + - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} + - AWS_REGION=eu-west-1 + - AWS_DEFAULT_REGION=eu-west-1 + volumes: + - ./spark-defaults.conf:/spark/conf/spark-defaults.conf + networks: + - spark-network + + thrift-server: + image: snowplow/spark-s3-iceberg:latest + command: ["/bin/bash", "-c", "sleep 30 && /spark/sbin/start-thriftserver.sh --master spark://spark-master:7077 --driver-memory 2g --executor-memory 3g --hiveconf hive.server2.thrift.port=10000 --hiveconf hive.server2.thrift.bind.host=0.0.0.0 --conf spark.sql.hive.thriftServer.async=true --conf spark.sql.hive.thriftServer.workerQueue.size=2000 --conf spark.sql.hive.thriftServer.maxWorkerThreads=100 --conf spark.sql.hive.thriftServer.minWorkerThreads=50 && tail -f /spark/logs/spark--org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-*.out"] + ports: + - '10000:10000' + depends_on: + - spark-master + - spark-worker + environment: + - SPARK_LOCAL_IP=thrift-server + - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} + - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} + - AWS_REGION=eu-west-1 + - AWS_DEFAULT_REGION=eu-west-1 + volumes: + - ./spark-defaults.conf:/spark/conf/spark-defaults.conf + networks: + 
- spark-network \ No newline at end of file diff --git a/.github/workflows/spark_deployment/spark-defaults.conf b/.github/workflows/spark_deployment/spark-defaults.conf new file mode 100644 index 0000000..9052a05 --- /dev/null +++ b/.github/workflows/spark_deployment/spark-defaults.conf @@ -0,0 +1,44 @@ +spark.master spark://spark-master:7077 + +spark.sql.warehouse.dir s3a://dbt-spark-iceberg/github-integration-testing +spark.sql.catalog.glue org.apache.iceberg.spark.SparkCatalog +spark.sql.catalog.glue.catalog-impl org.apache.iceberg.aws.glue.GlueCatalog +spark.sql.catalog.glue.warehouse s3a://dbt-spark-iceberg/github-integration-testing +spark.sql.catalog.glue.io-impl org.apache.iceberg.aws.s3.S3FileIO +spark.sql.defaultCatalog glue +spark.sql.catalog.glue.database dbt-spark-iceberg + +spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem +spark.hadoop.fs.s3a.access.key +spark.hadoop.fs.s3a.secret.key +spark.hadoop.fs.s3a.endpoint s3.eu-west-1.amazonaws.com +spark.hadoop.fs.s3a.path.style.access true +spark.hadoop.fs.s3a.region eu-west-1 +spark.hadoop.fs.s3a.aws.region eu-west-1 + +# Enabling AWS SDK V4 signing (required for regions launched after January 2014) +spark.hadoop.com.amazonaws.services.s3.enableV4 true +spark.hadoop.fs.s3a.aws.credentials.provider org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider + +# Hive Metastore Configuration (using AWS Glue) +spark.hadoop.hive.metastore.client.factory.class com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory + +# Thrift Server Configuration for better performance in concurrent environments +spark.sql.hive.thriftServer.singleSession false +spark.sql.hive.thriftServer.async true +# spark.sql.hive.thriftServer.maxWorkerThreads 100 +# spark.sql.hive.thriftServer.minWorkerThreads 50 +# spark.sql.hive.thriftServer.workerQueue.size 2000 + +# Memory and Performance Tuning +# spark.driver.memory 2g +# spark.executor.memory 3g +# spark.worker.memory 4g +spark.network.timeout 600s +spark.sql.broadcastTimeout 600s +spark.sql.adaptive.enabled true +spark.serializer org.apache.spark.serializer.KryoSerializer + +# Logging and Debugging +spark.eventLog.enabled true +spark.eventLog.dir /tmp/spark-events diff --git a/CHANGELOG b/CHANGELOG index be643f9..a09708b 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,24 @@ +Snowplow Normalize 0.4.0 (2024-12-04) +--------------------------------------- +## Summary +This version adds new features including Apache Spark with Iceberg support, flexible timestamp partitioning, and compatibility with the new BigQuery loader naming conventions. + +## Features +- Added support for Apache Spark with Iceberg +- Introduced configurable partition timestamp column name for derived tables +- Added compatibility with new BigQuery loader naming conventions + +## Upgrading +To upgrade, simply bump the snowplow-normalize version in your `packages.yml` file. + +If you want to start using the new partition timestamp feature, you can configure the column name in your project configuration: +```yaml +vars: + snowplow__partition_tstamp: your_custom_column_name +``` + +Make sure the column you configure is present in your events table, and run a full refresh after changing it so the derived tables are rebuilt with the new partition column.
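+
+A minimal sketch of that full refresh (the package-name selector here is an assumption; adjust it to however you run the normalize models in your project):
+```bash
+# Rebuild the derived tables so the new partition column takes effect
+dbt run --select snowplow_normalize --full-refresh
+```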
+ Snowplow Normalize 0.3.5 (2024-03-18) --------------------------------------- ## Summary diff --git a/dbt_project.yml b/dbt_project.yml index 8904d0e..e95c592 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -1,6 +1,6 @@ name: 'snowplow_normalize' -version: '0.3.5' +version: '0.4.0' config-version: 2 require-dbt-version: [">=1.4.0", "<2.0.0"] @@ -41,6 +41,7 @@ vars: snowplow__dev_target_name: 'dev' snowplow__allow_refresh: false snowplow__session_timestamp: 'collector_tstamp' + snowplow__partition_tstamp: 'collector_tstamp' # This is the column that will be used to partition the data in the derived tables; it should be a timestamp column that is present in the data # Variables - Databricks Only # Add the following variable to your dbt project's dbt_project.yml file # Depending on the use case it should either be the catalog (for Unity Catalog users from databricks connector 1.1.1 onwards) or the same value as your snowplow__atomic_schema (unless changed it should be 'atomic') @@ -63,7 +64,7 @@ on-run-end: models: snowplow_normalize: +materialized: table - +file_format: delta + +file_format: "{{ 'delta' if target.type not in ['spark'] else 'iceberg'}}" +bind: false base: manifest: diff --git a/integration_tests/.scripts/integration_tests.sh b/integration_tests/.scripts/integration_test.sh similarity index 78% rename from integration_tests/.scripts/integration_tests.sh rename to integration_tests/.scripts/integration_test.sh index 096d8cb..11d722b 100755 --- a/integration_tests/.scripts/integration_tests.sh +++ b/integration_tests/.scripts/integration_test.sh @@ -10,7 +10,7 @@ do esac done -declare -a SUPPORTED_DATABASES=("bigquery" "databricks" "snowflake") +declare -a SUPPORTED_DATABASES=("bigquery" "databricks" "snowflake" "spark_iceberg") # set to lower case DATABASE="$(echo $DATABASE | tr '[:upper:]' '[:lower:]')" @@ -23,13 +23,11 @@ fi for db in ${DATABASES[@]}; do - if [ $db == 'bigquery' ]; then - echo "Snowplow web integration tests: Seeding data and doing first run" - - eval "dbt seed --target $db --full-refresh" || exit 1; - - eval "dbt run --target $db --full-refresh" || exit 1; + if [[ "$db" == "bigquery" ]]; then + echo "Snowplow integration tests: Seeding data and doing first run" + eval "dbt seed --target $db --full-refresh" || exit 1 + eval "dbt run --target $db --full-refresh" || exit 1 fi echo "Snowplow normalize integration tests: snakeify case" diff --git a/integration_tests/ci/profiles.yml b/integration_tests/ci/profiles.yml index 5476748..76fe5ef 100644 --- a/integration_tests/ci/profiles.yml +++ b/integration_tests/ci/profiles.yml @@ -67,12 +67,13 @@ integration_tests: token: "{{ env_var('DATABRICKS_TEST_TOKEN') }}" threads: 4 - # spark: - # type: spark - # method: odbc - # driver: "{{ env_var('DATABRICKS_TEST_HTTP_PATH') }}" - # schema: "github_snwplow_web_dbt_{{ env_var('SCHEMA_SUFFIX') }}" - # host: "{{ env_var('DATABRICKS_TEST_HOST') }}" - # token: "{{ env_var('DATABRICKS_TEST_TOKEN') }}" - # endpoint: "{{ env_var('DATABRICKS_TEST_ENDPOINT') }}" - # threads: 4 + spark_iceberg: + type: spark + method: thrift + host: "{{ env_var('SPARK_MASTER_HOST', 'localhost') }}" + port: 10000 + user: "{{ env_var('SPARK_USER', 'spark') }}" + schema: "{{ env_var('SPARK_SCHEMA', 'default') }}" + connect_retries: 5 + connect_timeout: 60 + threads: 4 \ No newline at end of file diff --git a/integration_tests/data/source/snowplow_norm_dummy_events.csv b/integration_tests/data/source/snowplow_norm_dummy_events.csv index b80f0be..3840de2 100644 ---
a/integration_tests/data/source/snowplow_norm_dummy_events.csv +++ b/integration_tests/data/source/snowplow_norm_dummy_events.csv @@ -1,2 +1,2 @@ -event_name,event_id,app_id,collector_tstamp,dvce_sent_tstamp,dvce_created_tstamp,unstruct_event_test_1_0_0,unstruct_event_test_1_0_1,unstruct_event_test2_1_0_0,unstruct_event_test2_1_0_1,contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5 -'demo','demo','demo',2022-10-01 01:27:34,2022-10-01 01:27:34,2022-10-01 01:27:34,"{""test_id"":""demo"", ""test_class"":""demo""}","{""test_id"":""demo"", ""test_class"":""demo""}","{""test_word"":""demo"", ""test_idea"":""demo""}","{""test_word"":""demo"", ""test_idea"":""demo""}","[{""context_test_id"":""demo"", ""context_test_class"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]" +event_name,event_id,app_id,collector_tstamp,dvce_sent_tstamp,dvce_created_tstamp,unstruct_event_test_1_0_0,unstruct_event_test_1_0_1,unstruct_event_test2_1_0_0,unstruct_event_test2_1_0_1,contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5,contexts_test4_1,contexts_test4_2 +'demo','demo','demo',2022-10-01 01:27:34,2022-10-01 01:27:34,2022-10-01 01:27:34,"{""test_id"":""demo"", ""test_class"":""demo""}","{""test_id"":""demo"", ""test_class"":""demo""}","{""test_word"":""demo"", ""test_idea"":""demo""}","{""test_word"":""demo"", ""test_idea"":""demo""}","[{""context_test_id"":""demo"", ""context_test_class"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]" diff --git a/integration_tests/dbt_project.yml b/integration_tests/dbt_project.yml index 3663550..f0972ae 100644 --- a/integration_tests/dbt_project.yml +++ b/integration_tests/dbt_project.yml @@ -1,5 +1,5 @@ name: 'snowplow_normalize_integration_tests' -version: '0.3.5' +version: '0.4.0' config-version: 2 profile: 'integration_tests' @@ -23,6 +23,9 @@ clean-targets: quoting: identifier: false schema: false + database: "{{ true if target.type in ['bigquery','databricks'] else false }}" + + vars: snowplow_normalize: @@ -31,18 +34,22 @@ vars: snowplow__backfill_limit_days: 2 snowplow__derived_tstamp_partitioned: false snowplow__atomic_schema: "{{ target.schema ~ 'sp_normalize_int_test' }}" + snowplow__partition_tstamp: "load_tstamp" models: snowplow_normalize_integration_tests: + +materialized: table bind: false +schema: "sp_normalize_int_test" dummy_model: bigquery: +enabled: "{{ target.type == 'bigquery' | as_bool() }}" databricks: - +enabled: "{{ target.type in ['databricks', 'spark'] | as_bool() }}" + 
+enabled: "{{ target.type == 'databricks' | as_bool() }}" snowflake: +enabled: "{{ target.type == 'snowflake' | as_bool() }}" + spark: + +enabled: "{{ target.type == 'spark' | as_bool() }}" seeds: quote_columns: false @@ -68,3 +75,4 @@ seeds: contexts_test2_1_0_3: "{{ 'string' if target.type in ['bigquery', 'databricks', 'spark'] else 'varchar' }}" contexts_test2_1_0_4: "{{ 'string' if target.type in ['bigquery', 'databricks', 'spark'] else 'varchar' }}" contexts_test2_1_0_5: "{{ 'string' if target.type in ['bigquery', 'databricks', 'spark'] else 'varchar' }}" + contexts_test4_1: "{{ 'string' if target.type in ['bigquery', 'databricks', 'spark'] else 'varchar' }}" diff --git a/integration_tests/macros/test_normalize_events.sql b/integration_tests/macros/test_normalize_events.sql index b063d74..e00e8fa 100644 --- a/integration_tests/macros/test_normalize_events.sql +++ b/integration_tests/macros/test_normalize_events.sql @@ -31,13 +31,13 @@ It runs 9 tests: {% macro bigquery__test_normalize_events() %} {% set expected_dict = { - "flat_cols_only" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "sde_plus_cols" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as test_id -- context column(s) from the event table from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "sde_plus_cols_w_alias" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as my_alias_test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as my_alias_test_id -- context column(s) from the event table from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "sde_plus_1_context" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as test_id -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "sde_plus_2_context" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as test_id -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) 
as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "sde_plus_2_context_w_alias" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as test_id -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as test1_context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as test1_context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as test2_context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as test2_context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "context_only" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as context_test_id2 from 
`"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", + "context_only_new_loader" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test4_2[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test4_2[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "multiple_base_events" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name','page_ping')", "multiple_sde_events" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as test1_test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as test1_test_id , coalesce(unstruct_event_test2_1_0_1.test_word, unstruct_event_test2_1_0_0.test_word) as test2_test_word , coalesce(unstruct_event_test2_1_0_1.test_idea, unstruct_event_test2_1_0_0.test_idea) as test2_test_idea -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, 
contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')" } %} @@ -45,13 +45,13 @@ It runs 9 tests: {% set results_dict ={ - "flat_cols_only" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], [], [], [], [], [], [], [], [], true).split()|join(' '), "sde_plus_cols" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], [], [], [], [], true).split()|join(' '), "sde_plus_cols_w_alias" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], ['my_alias'], [], [], [], [], true).split()|join(' '), "sde_plus_1_context" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], ['CONTEXTS_TEST_1_0_0'], [['contextTestId', 'contextTestClass']], [['string', 'integer']], [], true).split()|join(' '), "sde_plus_2_context" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'], ['contextTestId2', 'contextTestClass2']], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '), "sde_plus_2_context_w_alias" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], ['test1', 'test2'], true).split()|join(' '), "context_only" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], [], [], [], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '), + "context_only_new_loader" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], [], [], [], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST4_2'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '), "multiple_base_events" : snowplow_normalize.normalize_events(['event_name', 'page_ping'], ['app_id'], [], [], [], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '), "multiple_sde_events" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1', 'UNSTRUCT_EVENT_TEST2_1_0_1'], [['testId', 'testClass'], ['testWord', 'testIdea']], [['number', 'string']], ['test1', 'test2'], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' ') } @@ -65,6 +65,7 @@ It runs 9 tests: {# {{ print(results_dict['sde_plus_2_context'])}} #} {# {{ print(results_dict['sde_plus_2_context_w_alias'])}} #} {# {{ print(results_dict['context_only'])}} #} + {# {{ 
print(results_dict['context_only_new_loader'])}} #} {# {{ print(results_dict['multiple_base_events'])}} #} {# {{ print(results_dict['multiple_sde_events'])}} #} @@ -78,15 +79,62 @@ It runs 9 tests: {% macro databricks__test_normalize_events() %} {% set expected_dict = { - "flat_cols_only" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", - "sde_plus_cols" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", - "sde_plus_cols_w_alias" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as my_alias_test_id , UNSTRUCT_EVENT_TEST_1.test_class as my_alias_test_class -- context column(s) from the event table from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", - "sde_plus_1_context" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", - "sde_plus_2_context" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", - "sde_plus_2_context_w_alias" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as test1_context_test_id , CONTEXTS_TEST_1[0].context_test_class as test1_context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as test2_context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as test2_context_test_class2 from 
`"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", - "context_only" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", - "multiple_base_events" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name','page_ping')", - "multiple_sde_events" : "select event_id , collector_tstamp , DATE(collector_tstamp) as collector_tstamp_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test1_test_id , UNSTRUCT_EVENT_TEST_1.test_class as test1_test_class , UNSTRUCT_EVENT_TEST2_1.test_word as test2_test_word , UNSTRUCT_EVENT_TEST2_1.test_idea as test2_test_idea -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')" + "flat_cols_only" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", + "sde_plus_cols" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", + "sde_plus_cols_w_alias" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as my_alias_test_id , UNSTRUCT_EVENT_TEST_1.test_class as my_alias_test_class -- context column(s) from the event table from 
`"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", + "sde_plus_1_context" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", + "sde_plus_2_context" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", + "sde_plus_2_context_w_alias" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as test1_context_test_id , CONTEXTS_TEST_1[0].context_test_class as test1_context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as test2_context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as test2_context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", + "context_only" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", + "multiple_base_events" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from 
`"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name','page_ping')", + "multiple_sde_events" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test1_test_id , UNSTRUCT_EVENT_TEST_1.test_class as test1_test_class , UNSTRUCT_EVENT_TEST2_1.test_word as test2_test_word , UNSTRUCT_EVENT_TEST2_1.test_idea as test2_test_idea -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from `"~target.catalog~"`."~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')" + } %} + + {% set results_dict ={ + "flat_cols_only" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], [], [], [], [], [], [], [], [], true).split()|join(' '), + "sde_plus_cols" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], [], [], [], [], true).split()|join(' '), + "sde_plus_cols_w_alias" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], ['my_alias'], [], [], [], [], true).split()|join(' '), + "sde_plus_1_context" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], ['CONTEXTS_TEST_1_0_0'], [['contextTestId', 'contextTestClass']], [['string', 'integer']], [], true).split()|join(' '), + "sde_plus_2_context" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'], ['contextTestId2', 'contextTestClass2']], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '), + "sde_plus_2_context_w_alias" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], ['test1', 'test2'], true).split()|join(' '), + "context_only" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], [], [], [], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '), + "multiple_base_events" : snowplow_normalize.normalize_events(['event_name', 'page_ping'], ['app_id'], [], [], [], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '), + "multiple_sde_events" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1', 'UNSTRUCT_EVENT_TEST2_1_0_1'], [['testId', 'testClass'], ['testWord', 
'testIdea']], [['number', 'string']], ['test1', 'test2'], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' ') + } + %} + + + {# {{ print(results_dict['flat_cols_only'])}} #} + {# {{ print(results_dict['sde_plus_cols'])}} #} + {# {{ print(results_dict['sde_plus_cols_w_alias'])}} #} + {# {{ print(results_dict['sde_plus_1_context'])}} #} + {# {{ print(results_dict['sde_plus_2_context'])}} #} + {# {{ print(results_dict['sde_plus_2_context_w_alias'])}} #} + {# {{ print(results_dict['context_only'])}} #} + {# {{ print(results_dict['multiple_base_events'])}} #} + {# {{ print(results_dict['multiple_sde_events'])}} #} + + + {{ dbt_unittest.assert_dict_equals(expected_dict, results_dict) }} + + +{% endmacro %} + + +{% macro spark__test_normalize_events() %} + + -- Main difference here is that spark doesnt need the catalog in the from clause + + {% set expected_dict = { + "flat_cols_only" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", + "sde_plus_cols" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", + "sde_plus_cols_w_alias" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as my_alias_test_id , UNSTRUCT_EVENT_TEST_1.test_class as my_alias_test_class -- context column(s) from the event table from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", + "sde_plus_1_context" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", + "sde_plus_2_context" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as 
context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", + "sde_plus_2_context_w_alias" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test_id , UNSTRUCT_EVENT_TEST_1.test_class as test_class -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as test1_context_test_id , CONTEXTS_TEST_1[0].context_test_class as test1_context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as test2_context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as test2_context_test_class2 from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", + "context_only" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", + "multiple_base_events" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name','page_ping')", + "multiple_sde_events" : "select event_id , collector_tstamp , DATE(" ~ var('snowplow__partition_tstamp') ~ ") as " ~ var('snowplow__partition_tstamp') ~ "_date -- Flat columns from event table , app_id -- self describing events columns from event table , UNSTRUCT_EVENT_TEST_1.test_id as test1_test_id , UNSTRUCT_EVENT_TEST_1.test_class as test1_test_class , UNSTRUCT_EVENT_TEST2_1.test_word as test2_test_word , UNSTRUCT_EVENT_TEST2_1.test_idea as test2_test_idea -- context column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXTS_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXTS_TEST2_1[0].context_test_class2 as context_test_class2 from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')" } %} {% set results_dict ={ diff --git a/integration_tests/macros/test_users_table.sql b/integration_tests/macros/test_users_table.sql index 3be634f..519692f 100644 --- a/integration_tests/macros/test_users_table.sql +++ b/integration_tests/macros/test_users_table.sql @@ -71,6 +71,49 @@ It runs 6 tests: {% endmacro %} +{% macro spark__test_users_table() %} + -- Main 
difference here is that spark doesnt need the catalog in the from clause + {% set expected_dict = { + "1_context" : "with defined_user_id as ( select user_id as user_id , collector_tstamp as latest_collector_tstamp , DATE(collector_tstamp) as latest_collector_tstamp_date -- Flat columns from event table -- user column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where 1 = 1 ), users_ordering as ( select a.* , row_number() over (partition by user_id order by latest_collector_tstamp desc) as rn from defined_user_id a where user_id is not null ) select * except (rn) from users_ordering where rn = 1", + "2_context" : "with defined_user_id as ( select user_id as user_id , collector_tstamp as latest_collector_tstamp , DATE(collector_tstamp) as latest_collector_tstamp_date -- Flat columns from event table -- user column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXT_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXT_TEST2_1[0].context_test_class2 as context_test_class2 from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where 1 = 1 ), users_ordering as ( select a.* , row_number() over (partition by user_id order by latest_collector_tstamp desc) as rn from defined_user_id a where user_id is not null ) select * except (rn) from users_ordering where rn = 1", + "custom_user_field" : "with defined_user_id as ( select test_id as user_id , collector_tstamp as latest_collector_tstamp , DATE(collector_tstamp) as latest_collector_tstamp_date -- Flat columns from event table -- user column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXT_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXT_TEST2_1[0].context_test_class2 as context_test_class2 from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where 1 = 1 ), users_ordering as ( select a.* , row_number() over (partition by user_id order by latest_collector_tstamp desc) as rn from defined_user_id a where user_id is not null ) select * except (rn) from users_ordering where rn = 1", + "custom_user_field_sde" : "with defined_user_id as ( select UNSTRUCT_EVENT_COM_GOOGLE_ANALYTICS_MEASUREMENT_PROTOCOL_USER_1.test_id as user_id , collector_tstamp as latest_collector_tstamp , DATE(collector_tstamp) as latest_collector_tstamp_date -- Flat columns from event table -- user column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXT_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXT_TEST2_1[0].context_test_class2 as context_test_class2 from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where 1 = 1 ), users_ordering as ( select a.* , row_number() over (partition by user_id order by latest_collector_tstamp desc) as rn from defined_user_id a where user_id is not null ) select * except (rn) from users_ordering where rn = 1", + "custom_user_field_context" : "with defined_user_id as ( select CONTEXTS_COM_ZENDESK_SNOWPLOW_USER_1[0].test_id as user_id , collector_tstamp as latest_collector_tstamp , DATE(collector_tstamp) as latest_collector_tstamp_date -- Flat columns from event table -- user column(s) from the event table , 
CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXT_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXT_TEST2_1[0].context_test_class2 as context_test_class2 from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where 1 = 1 ), users_ordering as ( select a.* , row_number() over (partition by user_id order by latest_collector_tstamp desc) as rn from defined_user_id a where user_id is not null ) select * except (rn) from users_ordering where rn = 1", + "custom_user_field_both" : "with defined_user_id as ( select UNSTRUCT_EVENT_COM_GOOGLE_ANALYTICS_MEASUREMENT_PROTOCOL_USER_1.test_id as user_id , collector_tstamp as latest_collector_tstamp , DATE(collector_tstamp) as latest_collector_tstamp_date -- Flat columns from event table -- user column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXT_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXT_TEST2_1[0].context_test_class2 as context_test_class2 from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where 1 = 1 ), users_ordering as ( select a.* , row_number() over (partition by user_id order by latest_collector_tstamp desc) as rn from defined_user_id a where user_id is not null ) select * except (rn) from users_ordering where rn = 1", + "custom_user_field_both_w_alias" : "with defined_user_id as ( select UNSTRUCT_EVENT_COM_GOOGLE_ANALYTICS_MEASUREMENT_PROTOCOL_USER_1.test_id as my_user_id , collector_tstamp as latest_collector_tstamp , DATE(collector_tstamp) as latest_collector_tstamp_date -- Flat columns from event table -- user column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXT_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXT_TEST2_1[0].context_test_class2 as context_test_class2 from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where 1 = 1 ), users_ordering as ( select a.* , row_number() over (partition by my_user_id order by latest_collector_tstamp desc) as rn from defined_user_id a where my_user_id is not null ) select * except (rn) from users_ordering where rn = 1", + "custom_user_field_both_w_alias_and_flat" : "with defined_user_id as ( select UNSTRUCT_EVENT_COM_GOOGLE_ANALYTICS_MEASUREMENT_PROTOCOL_USER_1.test_id as my_user_id , collector_tstamp as latest_collector_tstamp , DATE(collector_tstamp) as latest_collector_tstamp_date -- Flat columns from event table , app_id , network_user_id -- user column(s) from the event table , CONTEXTS_TEST_1[0].context_test_id as context_test_id , CONTEXTS_TEST_1[0].context_test_class as context_test_class , CONTEXT_TEST2_1[0].context_test_id2 as context_test_id2 , CONTEXT_TEST2_1[0].context_test_class2 as context_test_class2 from "~target.schema~"_scratch.snowplow_normalize_base_events_this_run where 1 = 1 ), users_ordering as ( select a.* , row_number() over (partition by my_user_id order by latest_collector_tstamp desc) as rn from defined_user_id a where my_user_id is not null ) select * except (rn) from users_ordering where rn = 1" + } %} + + + + {% set results_dict ={ + "1_context" : snowplow_normalize.users_table('user_id', '', '', ['CONTEXTS_TEST_1_0_0'], [['contextTestId', 'contextTestClass']], [['string', 'integer']], remove_new_event_check = true).split()|join(' '), + "2_context" : snowplow_normalize.users_table('user_id', '', 
'',['CONTEXTS_TEST_1_0_0', 'CONTEXT_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'], ['contextTestId2', 'contextTestClass2']], [['boolean', 'string'], ['interger', 'string']], remove_new_event_check = true).split()|join(' '), + "custom_user_field" : snowplow_normalize.users_table('testId', '', '',['CONTEXTS_TEST_1_0_0', 'CONTEXT_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'], ['contextTestId2', 'contextTestClass2']], [['boolean', 'string'], ['interger', 'string']], remove_new_event_check = true).split()|join(' '), + "custom_user_field_sde" : snowplow_normalize.users_table('testId', 'UNSTRUCT_EVENT_COM_GOOGLE_ANALYTICS_MEASUREMENT_PROTOCOL_USER_1_0_0', '',['CONTEXTS_TEST_1_0_0', 'CONTEXT_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'], ['contextTestId2', 'contextTestClass2']], [['boolean', 'string'], ['interger', 'string']], remove_new_event_check = true).split()|join(' '), + "custom_user_field_context" : snowplow_normalize.users_table('testId', '', 'CONTEXTS_COM_ZENDESK_SNOWPLOW_USER_1_0_0',['CONTEXTS_TEST_1_0_0', 'CONTEXT_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'], ['contextTestId2', 'contextTestClass2']], [['boolean', 'string'], ['interger', 'string']], remove_new_event_check = true).split()|join(' '), + "custom_user_field_both" : snowplow_normalize.users_table('testId', 'UNSTRUCT_EVENT_COM_GOOGLE_ANALYTICS_MEASUREMENT_PROTOCOL_USER_1_0_0', 'CONTEXTS_COM_ZENDESK_SNOWPLOW_USER_1_0_0',['CONTEXTS_TEST_1_0_0', 'CONTEXT_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'], ['contextTestId2', 'contextTestClass2']], [['boolean', 'string'], ['interger', 'string']], remove_new_event_check = true).split()|join(' '), + "custom_user_field_both_w_alias" : snowplow_normalize.users_table('testId', 'UNSTRUCT_EVENT_COM_GOOGLE_ANALYTICS_MEASUREMENT_PROTOCOL_USER_1_0_0', 'CONTEXTS_COM_ZENDESK_SNOWPLOW_USER_1_0_0',['CONTEXTS_TEST_1_0_0', 'CONTEXT_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'], ['contextTestId2', 'contextTestClass2']], [['boolean', 'string'], ['interger', 'string']], 'my_user_id', remove_new_event_check = true).split()|join(' '), + "custom_user_field_both_w_alias_and_flat" : snowplow_normalize.users_table('testId', 'UNSTRUCT_EVENT_COM_GOOGLE_ANALYTICS_MEASUREMENT_PROTOCOL_USER_1_0_0', 'CONTEXTS_COM_ZENDESK_SNOWPLOW_USER_1_0_0',['CONTEXTS_TEST_1_0_0', 'CONTEXT_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'], ['contextTestId2', 'contextTestClass2']], [['boolean', 'string'], ['interger', 'string']], 'my_user_id', ['app_id', 'network_user_id'], remove_new_event_check = true).split()|join(' '), + } + %} + + + {# {{ print(results_dict['1_context'])}} #} + {# {{ print(results_dict['2_context'])}} #} + {# {{ print(results_dict['custom_user_field'])}} #} + {# {{ print(results_dict['custom_user_field_sde'])}} #} + {# {{ print(results_dict['custom_user_field_context'])}} #} + {# {{ print(results_dict['custom_user_field_both'])}} #} + {# {{ print(results_dict['custom_user_field_both_w_alias'])}} #} + {# {{ print(results_dict['custom_user_field_both_w_alias_and_flat'])}} #} + + + {{ dbt_unittest.assert_equals(expected_dict, results_dict) }} + + +{% endmacro %} + {% macro databricks__test_users_table() %} {% set expected_dict = { diff --git a/integration_tests/models/dummy_model/bigquery/snowplow_normalize_stg.sql b/integration_tests/models/dummy_model/bigquery/snowplow_normalize_stg.sql index 1f79d0c..03b4f8c 100644 --- a/integration_tests/models/dummy_model/bigquery/snowplow_normalize_stg.sql +++ 
b/integration_tests/models/dummy_model/bigquery/snowplow_normalize_stg.sql @@ -8,21 +8,23 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 with prep as ( select * - except(contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5), + except(contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5, contexts_test4_1,contexts_test4_2), JSON_EXTRACT_ARRAY(contexts_test_1_0_0) AS contexts_test_1_0_0, JSON_EXTRACT_ARRAY(contexts_test2_1_0_0) AS contexts_test2_1_0_0, JSON_EXTRACT_ARRAY(contexts_test2_1_0_1) AS contexts_test2_1_0_1, JSON_EXTRACT_ARRAY(contexts_test2_1_0_2) AS contexts_test2_1_0_2, JSON_EXTRACT_ARRAY(contexts_test2_1_0_3) AS contexts_test2_1_0_3, JSON_EXTRACT_ARRAY(contexts_test2_1_0_4) AS contexts_test2_1_0_4, - JSON_EXTRACT_ARRAY(contexts_test2_1_0_5) AS contexts_test2_1_0_5 + JSON_EXTRACT_ARRAY(contexts_test2_1_0_5) AS contexts_test2_1_0_5, + JSON_EXTRACT_ARRAY(contexts_test4_1) AS contexts_test4_1, + JSON_EXTRACT_ARRAY(contexts_test4_2) AS contexts_test4_2 from {{ ref('snowplow_norm_dummy_events') }} ) -- recreate repeated record field i.e. array of structs as is originally in BQ events table select - * except(unstruct_event_test_1_0_0,unstruct_event_test_1_0_1,unstruct_event_test2_1_0_0,unstruct_event_test2_1_0_1,contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5), + * except(unstruct_event_test_1_0_0,unstruct_event_test_1_0_1,unstruct_event_test2_1_0_0,unstruct_event_test2_1_0_1,contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5,contexts_test4_1,contexts_test4_2), -- order is reversed to test the aliasing of the coalesced columns struct(JSON_EXTRACT_scalar(unstruct_event_test_1_0_0, '$.test_class') as test_class,JSON_EXTRACT_scalar(unstruct_event_test_1_0_0, '$.test_id') as test_id) as unstruct_event_test_1_0_0, struct(JSON_EXTRACT_scalar(unstruct_event_test_1_0_1, '$.test_class') as test_class,JSON_EXTRACT_scalar(unstruct_event_test_1_0_1, '$.test_id') as test_id) as unstruct_event_test_1_0_1, @@ -56,7 +58,15 @@ select array( select struct(JSON_EXTRACT_scalar(json_array,'$.context_test_class2') as context_test_class2, JSON_EXTRACT_scalar(json_array,'$.context_test_id2') as context_test_id2) from unnest(contexts_test2_1_0_5) as json_array - ) as contexts_test2_1_0_5 - + ) as contexts_test2_1_0_5, + array( + select struct(JSON_EXTRACT_scalar(json_array,'$.context_test_class2') as context_test_class2, JSON_EXTRACT_scalar(json_array,'$.context_test_id2') as context_test_id2) + from unnest(contexts_test4_1) as json_array + ) as contexts_test4_1, + array( + select struct(JSON_EXTRACT_scalar(json_array,'$.context_test_class2') as context_test_class2, JSON_EXTRACT_scalar(json_array,'$.context_test_id2') as context_test_id2) + from unnest(contexts_test4_2) as json_array + ) as contexts_test4_2 + from prep diff --git a/integration_tests/models/dummy_model/spark/int_test_dummy_model.sql b/integration_tests/models/dummy_model/spark/int_test_dummy_model.sql new file mode 100644 index 0000000..39d0df8 --- /dev/null +++ b/integration_tests/models/dummy_model/spark/int_test_dummy_model.sql @@ -0,0 +1,12 @@ +{# +Copyright (c) 2022-present Snowplow Analytics Ltd. All rights reserved. 
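Note on the spark__test_users_table macro a few hunks above: like the existing warehouse variants, it compares whitespace-normalized strings, since every rendered macro call is passed through .split()|join(' '); that is why the expected_dict values are written as single-spaced one-liners. A minimal Python sketch of that normalization, with purely illustrative strings rather than package code:

    def normalize_sql(sql: str) -> str:
        # str.split() with no arguments collapses any run of whitespace,
        # mirroring Jinja's .split()|join(' ')
        return " ".join(sql.split())

    rendered = """
    select
        user_id as user_id
        , collector_tstamp as latest_collector_tstamp
    """
    expected = "select user_id as user_id , collector_tstamp as latest_collector_tstamp"
    assert normalize_sql(rendered) == expected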
+This program is licensed to you under the Snowplow Personal and Academic License Version 1.0, +and you may not use this file except in compliance with the Snowplow Personal and Academic License Version 1.0. +You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 at https://docs.snowplow.io/personal-and-academic-license-1.0/ +#} + +{{ config( + tags = "snowplow_normalize_incremental", +) }} + +select 1 as dummy from {{ ref ('snowplow_normalize_base_events_this_run')}} diff --git a/integration_tests/models/dummy_model/spark/snowplow_normalize_stg.sql b/integration_tests/models/dummy_model/spark/snowplow_normalize_stg.sql new file mode 100644 index 0000000..6923b1a --- /dev/null +++ b/integration_tests/models/dummy_model/spark/snowplow_normalize_stg.sql @@ -0,0 +1,10 @@ +{# +Copyright (c) 2022-present Snowplow Analytics Ltd. All rights reserved. +This program is licensed to you under the Snowplow Personal and Academic License Version 1.0, +and you may not use this file except in compliance with the Snowplow Personal and Academic License Version 1.0. +You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 at https://docs.snowplow.io/personal-and-academic-license-1.0/ +#} + +-- we don't actually use this data at all, we just need the model so the graph can build +select * +from {{ ref('snowplow_norm_dummy_events') }} diff --git a/macros/normalize_events.sql b/macros/normalize_events.sql index ee0f784..ca07fd9 100644 --- a/macros/normalize_events.sql +++ b/macros/normalize_events.sql @@ -65,16 +65,42 @@ where {% macro bigquery__normalize_events(event_names, flat_cols = [], sde_cols = [], sde_keys = [], sde_types = [], sde_aliases = [], context_cols = [], context_keys = [], context_types = [], context_aliases = [], remove_new_event_check = false) %} -{# Remove down to major version for bigquery combine columns macro, drop 2 last _X values #} -{%- set sde_cols_clean = [] -%} -{%- for ind in range(sde_cols|length) -%} - {% do sde_cols_clean.append('_'.join(sde_cols[ind].split('_')[:-2])) -%} -{%- endfor -%} -{%- set context_cols_clean = [] -%} -{%- for ind in range(context_cols|length) -%} - {% do context_cols_clean.append('_'.join(context_cols[ind].split('_')[:-2])) -%} -{%- endfor -%} + {# Handle both versioned and unversioned column names #} + {%- set re = modules.re -%} + + {# + This regex pattern handles column versioning in Snowplow contexts and self-describing events. + It specifically targets three-part semantic versions (e.g., field_1_2_3) while preserving + one-part (field_1) and two-part (field_1_2) versions. + + Pattern breakdown: '(_\\d+)_\\d+_\\d+$' + - (_\\d+) : Capture group that matches an underscore followed by one or more digits + This captures the major version number (e.g., "_1" in "field_1_2_3") + - _\\d+ : Matches an underscore and one or more digits (minor version) + - _\\d+ : Matches an underscore and one or more digits (patch version) + - $ : Ensures the pattern only matches at the end of the string + The replacement pattern '\\1' keeps only the captured major version. 
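A quick way to sanity-check the pattern described above is to run the same substitution in plain Python (dbt exposes the standard re module as modules.re); the column names below are illustrative only:

    import re

    version_pattern = r'(_\d+)_\d+_\d+$'  # same pattern as above, written as a raw string

    for col in ['contexts_test_1', 'contexts_test_1_0', 'contexts_test_1_0_0']:
        print(re.sub(version_pattern, r'\1', col))
    # contexts_test_1      (major version only: unchanged)
    # contexts_test_1_0    (major and minor version: unchanged)
    # contexts_test_1      (three-part version: trimmed back to the major version)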
+ + Examples: + - field_1 -> field_1 (no change - only has major version) + - field_1_2 -> field_1_2 (no change - has major and minor versions) + - field_1_2_3 -> field_1 (transforms - removes minor and patch versions) + #} + {%- set version_pattern = '(_\\d+)_\\d+_\\d+$' -%} + + {%- set sde_cols_clean = [] -%} + {%- for col in sde_cols -%} + {# Get the base name for combine_column_versions to work with #} + {%- set clean_name = re.sub(version_pattern, '\\1', col) -%} + {% do sde_cols_clean.append(clean_name) -%} + {%- endfor -%} + + {%- set context_cols_clean = [] -%} + {%- for col in context_cols -%} + {%- set clean_name = re.sub(version_pattern, '\\1', col) -%} + {% do context_cols_clean.append(clean_name) -%} + {%- endfor -%} {# Replace keys with snake_case where needed #} {%- set sde_keys_clean = [] -%} {%- set context_keys_clean = [] -%} @@ -86,7 +112,6 @@ where {%- endfor -%} {% do sde_keys_clean.append(sde_key_clean) -%} {%- endfor -%} - {%- for ind1 in range(context_keys|length) -%} {%- set context_key_clean = [] -%} {%- for ind2 in range(context_keys[ind1]|length) -%} @@ -119,10 +144,10 @@ select {%- set required_aliases = sde_keys_clean[col_ind] -%} {%- endif -%} {%- set sde_col_list = snowplow_utils.combine_column_versions( - relation=ref('snowplow_normalize_base_events_this_run'), - column_prefix=col.lower(), - required_fields = zip(sde_keys_clean[col_ind], required_aliases) - ) -%} + relation=ref('snowplow_normalize_base_events_this_run'), + column_prefix=col.lower(), + required_fields = zip(sde_keys_clean[col_ind], required_aliases) + ) -%} {%- for field, key_ind in zip(sde_col_list, range(sde_col_list|length)) -%} {# Loop over each key within the column, appling the bespoke alias as needed #} , {{field}} {% endfor -%} @@ -159,7 +184,7 @@ where {%- endif -%} {% endmacro %} -{% macro databricks__normalize_events(event_names, flat_cols = [], sde_cols = [], sde_keys = [], sde_types = [], sde_aliases = [], context_cols = [], context_keys = [], context_types = [], context_aliases = [], remove_new_event_check = false) %} +{% macro spark__normalize_events(event_names, flat_cols = [], sde_cols = [], sde_keys = [], sde_types = [], sde_aliases = [], context_cols = [], context_keys = [], context_types = [], context_aliases = [], remove_new_event_check = false) %} {# Remove down to major version for Databricks columns, drop 2 last _X values #} {%- set sde_cols_clean = [] -%} {%- for ind in range(sde_cols|length) -%} @@ -195,7 +220,7 @@ select event_id , collector_tstamp {% if target.type in ['databricks', 'spark'] -%} - , DATE(collector_tstamp) as collector_tstamp_date + , DATE({{var("snowplow__partition_tstamp")}}) as {{var("snowplow__partition_tstamp")}}_date {%- endif %} -- Flat columns from event table {% if flat_cols|length > 0 %} diff --git a/macros/rename_partition_tstamp_date.sql b/macros/rename_partition_tstamp_date.sql new file mode 100644 index 0000000..408a260 --- /dev/null +++ b/macros/rename_partition_tstamp_date.sql @@ -0,0 +1,14 @@ +{% macro rename_partition_tstamp_date() %} + {{ return(adapter.dispatch('rename_partition_tstamp_date', 'snowplow_normalize')()) }} +{% endmacro %} + +{% macro default__rename_partition_tstamp_date() %} + + + {% set rename_partition_tstamp_date = var('snowplow__partition_tstamp')~"_date" %} + + {{ log("Rename partition to: " ~ rename_partition_tstamp_date)}} + + {{ return(rename_partition_tstamp_date) }} + +{% endmacro %} diff --git a/macros/schema.yml b/macros/schema.yml index 3fc4402..8b2f2cb 100644 --- a/macros/schema.yml +++ 
b/macros/schema.yml @@ -73,3 +73,5 @@ macros: - name: text type: string description: the text to convert to snakecase + - name: rename_partition_tstamp_date + description: Takes the partition_tstamp column and renames it to add the date suffix, currently only used in Databricks \ No newline at end of file diff --git a/macros/users_table.sql b/macros/users_table.sql index b389c7b..d0d0194 100644 --- a/macros/users_table.sql +++ b/macros/users_table.sql @@ -167,7 +167,7 @@ where rn = 1 {% endmacro %} -{% macro databricks__users_table(user_id_field = 'user_id', user_id_sde = '', user_id_context = '', user_cols = [], user_keys = [], user_types = [], user_id_alias = 'user_id', flat_cols = [], remove_new_event_check = false) %} +{% macro spark__users_table(user_id_field = 'user_id', user_id_sde = '', user_id_context = '', user_cols = [], user_keys = [], user_types = [], user_id_alias = 'user_id', flat_cols = [], remove_new_event_check = false) %} {# Remove down to major version for Databricks columns, drop 2 last _X values #} {%- set user_cols_clean = [] -%} {%- for ind in range(user_cols|length) -%} diff --git a/models/base/scratch/snowplow_normalize_base_events_this_run.sql b/models/base/scratch/snowplow_normalize_base_events_this_run.sql index 46acd94..966ebd5 100644 --- a/models/base/scratch/snowplow_normalize_base_events_this_run.sql +++ b/models/base/scratch/snowplow_normalize_base_events_this_run.sql @@ -14,24 +14,38 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {%- set lower_limit, upper_limit, session_start_limit = snowplow_utils.return_base_new_event_limits(ref('snowplow_normalize_base_new_event_limits')) %} -select - a.* +with prep AS ( -from {{ var('snowplow__events') }} as a + select + a.* -where - {# dvce_sent_tstamp is an optional field and not all trackers/webhooks populate it, this means this filter needs to be optional #} - {% if var("snowplow__days_late_allowed") == -1 %} - 1 = 1 - {% else %} - a.dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', var("snowplow__days_late_allowed", 3), 'a.dvce_created_tstamp') }} + {% if target.type not in ['databricks','snowflake','bigquery'] %} + , row_number() over (partition by a.event_id order by a.collector_tstamp{% if target.type in ['databricks', 'spark'] -%}, a.dvce_sent_tstamp {%- endif %}) as rn {% endif %} - and a.{{ var('snowplow__session_timestamp', 'collector_tstamp') }} >= {{ lower_limit }} - and a.{{ var('snowplow__session_timestamp', 'collector_tstamp') }} <= {{ upper_limit }} - {% if var('snowplow__derived_tstamp_partitioned', true) and target.type == 'bigquery' | as_bool() %} - and a.derived_tstamp >= {{ snowplow_utils.timestamp_add('hour', -1, lower_limit) }} - and a.derived_tstamp <= {{ upper_limit }} + + from {{ var('snowplow__events') }} as a + where + {# dvce_sent_tstamp is an optional field and not all trackers/webhooks populate it, this means this filter needs to be optional #} + {% if var("snowplow__days_late_allowed") == -1 %} + 1 = 1 + {% else %} + a.dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', var("snowplow__days_late_allowed", 3), 'a.dvce_created_tstamp') }} + {% endif %} + and a.{{ var('snowplow__session_timestamp', 'collector_tstamp') }} >= {{ lower_limit }} + and a.{{ var('snowplow__session_timestamp', 'collector_tstamp') }} <= {{ upper_limit }} + {% if var('snowplow__derived_tstamp_partitioned', true) and target.type == 'bigquery' | as_bool() %} + and a.derived_tstamp >= {{ snowplow_utils.timestamp_add('hour', -1, lower_limit) }} + and 
a.derived_tstamp <= {{ upper_limit }} + {% endif %} + and {{ snowplow_utils.app_id_filter(var("snowplow__app_id",[])) }} + {# We are doing the branching in order not to do the qualify in the case of spark, as it does not support it #} + {% if target.type in ['databricks','snowflake','bigquery'] %} + qualify row_number() over (partition by a.event_id order by a.collector_tstamp{% if target.type in ['databricks', 'spark'] -%}, a.etl_tstamp {%- endif %}) = 1 {% endif %} - and {{ snowplow_utils.app_id_filter(var("snowplow__app_id",[])) }} +) -qualify row_number() over (partition by a.event_id order by a.collector_tstamp{% if target.type in ['databricks', 'spark'] -%}, a.etl_tstamp {%- endif %}) = 1 +SELECT * +FROM prep +{% if target.type not in ['databricks','snowflake','bigquery'] %} +WHERE rn = 1 +{% endif %} \ No newline at end of file diff --git a/packages.yml b/packages.yml index 6da1d8c..bf44732 100644 --- a/packages.yml +++ b/packages.yml @@ -1,3 +1,3 @@ packages: - package: snowplow/snowplow_utils - version: [">=0.16.2", "<0.17.0"] + version: [">=0.17.0", "<0.18.0"] diff --git a/utils/snowplow_normalize_model_gen.py b/utils/snowplow_normalize_model_gen.py index fdbd102..a31e9ad 100644 --- a/utils/snowplow_normalize_model_gen.py +++ b/utils/snowplow_normalize_model_gen.py @@ -195,11 +195,11 @@ tags = "snowplow_normalize_incremental", materialized = "incremental", unique_key = "event_id", - upsert_date_key = "collector_tstamp", + upsert_date_key = var("snowplow__partition_tstamp"), partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={{ - "field": "collector_tstamp", + "field": var("snowplow__partition_tstamp"), "data_type": "timestamp" - }}, databricks_val='collector_tstamp_date'), + }}, databricks_val=rename_partition_tstamp_date()), sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')), tblproperties={{ 'delta.autoOptimize.optimizeWrite' : 'true', @@ -251,11 +251,11 @@ tags = "snowplow_normalize_incremental", materialized = "incremental", unique_key = "unique_id", - upsert_date_key = "collector_tstamp", + upsert_date_key = var("snowplow__partition_tstamp"), partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={{ - "field": "collector_tstamp", + "field": var("snowplow__partition_tstamp"), "data_type": "timestamp" - }}, databricks_val='collector_tstamp_date'), + }}, databricks_val=rename_partition_tstamp_date()), sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')), tblproperties={{ 'delta.autoOptimize.optimizeWrite' : 'true', @@ -270,9 +270,9 @@ filtered_model_content += f""" select event_id - , collector_tstamp + , {{{{var("snowplow__partition_tstamp")}}}} {{% if target.type in ['databricks', 'spark'] -%}} - , DATE(collector_tstamp) as collector_tstamp_date + , DATE({{{{var("snowplow__partition_tstamp")}}}}) as {{{{var("snowplow__partition_tstamp")}}}}_date {{%- endif %}} , event_name , '{model}' as event_table_name diff --git a/utils/tests/expected/custom_table_name2_1.sql b/utils/tests/expected/custom_table_name2_1.sql index bf3b770..83ffd6c 100644 --- a/utils/tests/expected/custom_table_name2_1.sql +++ b/utils/tests/expected/custom_table_name2_1.sql @@ -2,11 +2,11 @@ tags = "snowplow_normalize_incremental", materialized = "incremental", unique_key = "event_id", - upsert_date_key = "collector_tstamp", + upsert_date_key = var("snowplow__partition_tstamp"), partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={ - "field": "collector_tstamp", + "field": 
var("snowplow__partition_tstamp"), "data_type": "timestamp" - }, databricks_val='collector_tstamp_date'), + }, databricks_val=rename_partition_tstamp_date()), sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')), tblproperties={ 'delta.autoOptimize.optimizeWrite' : 'true', diff --git a/utils/tests/expected/custom_table_name3_2.sql b/utils/tests/expected/custom_table_name3_2.sql index b81c68a..c4b3365 100644 --- a/utils/tests/expected/custom_table_name3_2.sql +++ b/utils/tests/expected/custom_table_name3_2.sql @@ -2,11 +2,11 @@ tags = "snowplow_normalize_incremental", materialized = "incremental", unique_key = "event_id", - upsert_date_key = "collector_tstamp", + upsert_date_key = var("snowplow__partition_tstamp"), partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={ - "field": "collector_tstamp", + "field": var("snowplow__partition_tstamp"), "data_type": "timestamp" - }, databricks_val='collector_tstamp_date'), + }, databricks_val=rename_partition_tstamp_date()), sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')), tblproperties={ 'delta.autoOptimize.optimizeWrite' : 'true', diff --git a/utils/tests/expected/custom_table_name4_1.sql b/utils/tests/expected/custom_table_name4_1.sql index 8d12527..e520cb7 100644 --- a/utils/tests/expected/custom_table_name4_1.sql +++ b/utils/tests/expected/custom_table_name4_1.sql @@ -2,11 +2,11 @@ tags = "snowplow_normalize_incremental", materialized = "incremental", unique_key = "event_id", - upsert_date_key = "collector_tstamp", + upsert_date_key = var("snowplow__partition_tstamp"), partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={ - "field": "collector_tstamp", + "field": var("snowplow__partition_tstamp"), "data_type": "timestamp" - }, databricks_val='collector_tstamp_date'), + }, databricks_val=rename_partition_tstamp_date()), sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')), tblproperties={ 'delta.autoOptimize.optimizeWrite' : 'true', diff --git a/utils/tests/expected/custom_table_name5_9.sql b/utils/tests/expected/custom_table_name5_9.sql index 66d66d6..6553244 100644 --- a/utils/tests/expected/custom_table_name5_9.sql +++ b/utils/tests/expected/custom_table_name5_9.sql @@ -2,11 +2,11 @@ tags = "snowplow_normalize_incremental", materialized = "incremental", unique_key = "event_id", - upsert_date_key = "collector_tstamp", + upsert_date_key = var("snowplow__partition_tstamp"), partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={ - "field": "collector_tstamp", + "field": var("snowplow__partition_tstamp"), "data_type": "timestamp" - }, databricks_val='collector_tstamp_date'), + }, databricks_val=rename_partition_tstamp_date()), sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')), tblproperties={ 'delta.autoOptimize.optimizeWrite' : 'true', diff --git a/utils/tests/expected/custom_table_name6_6.sql b/utils/tests/expected/custom_table_name6_6.sql index b1bde78..dafb91b 100644 --- a/utils/tests/expected/custom_table_name6_6.sql +++ b/utils/tests/expected/custom_table_name6_6.sql @@ -2,11 +2,11 @@ tags = "snowplow_normalize_incremental", materialized = "incremental", unique_key = "event_id", - upsert_date_key = "collector_tstamp", + upsert_date_key = var("snowplow__partition_tstamp"), partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={ - "field": "collector_tstamp", + "field": var("snowplow__partition_tstamp"), "data_type": "timestamp" - }, 
databricks_val='collector_tstamp_date'), + }, databricks_val=rename_partition_tstamp_date()), sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')), tblproperties={ 'delta.autoOptimize.optimizeWrite' : 'true', diff --git a/utils/tests/expected/custom_table_name7_6.sql b/utils/tests/expected/custom_table_name7_6.sql index 608287f..6003c42 100644 --- a/utils/tests/expected/custom_table_name7_6.sql +++ b/utils/tests/expected/custom_table_name7_6.sql @@ -2,11 +2,11 @@ tags = "snowplow_normalize_incremental", materialized = "incremental", unique_key = "event_id", - upsert_date_key = "collector_tstamp", + upsert_date_key = var("snowplow__partition_tstamp"), partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={ - "field": "collector_tstamp", + "field": var("snowplow__partition_tstamp"), "data_type": "timestamp" - }, databricks_val='collector_tstamp_date'), + }, databricks_val=rename_partition_tstamp_date()), sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')), tblproperties={ 'delta.autoOptimize.optimizeWrite' : 'true', diff --git a/utils/tests/expected/event_name1_1.sql b/utils/tests/expected/event_name1_1.sql index 1f2f83c..fb6040e 100644 --- a/utils/tests/expected/event_name1_1.sql +++ b/utils/tests/expected/event_name1_1.sql @@ -2,11 +2,11 @@ tags = "snowplow_normalize_incremental", materialized = "incremental", unique_key = "event_id", - upsert_date_key = "collector_tstamp", + upsert_date_key = var("snowplow__partition_tstamp"), partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={ - "field": "collector_tstamp", + "field": var("snowplow__partition_tstamp"), "data_type": "timestamp" - }, databricks_val='collector_tstamp_date'), + }, databricks_val=rename_partition_tstamp_date()), sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')), tblproperties={ 'delta.autoOptimize.optimizeWrite' : 'true', diff --git a/utils/tests/expected/test_normalized_events.sql b/utils/tests/expected/test_normalized_events.sql index d74e64d..04dd4c4 100644 --- a/utils/tests/expected/test_normalized_events.sql +++ b/utils/tests/expected/test_normalized_events.sql @@ -2,11 +2,11 @@ tags = "snowplow_normalize_incremental", materialized = "incremental", unique_key = "unique_id", - upsert_date_key = "collector_tstamp", + upsert_date_key = var("snowplow__partition_tstamp"), partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={ - "field": "collector_tstamp", + "field": var("snowplow__partition_tstamp"), "data_type": "timestamp" - }, databricks_val='collector_tstamp_date'), + }, databricks_val=rename_partition_tstamp_date()), sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')), tblproperties={ 'delta.autoOptimize.optimizeWrite' : 'true', @@ -17,9 +17,9 @@ select event_id - , collector_tstamp + , {{var("snowplow__partition_tstamp")}} {% if target.type in ['databricks', 'spark'] -%} - , DATE(collector_tstamp) as collector_tstamp_date + , DATE({{var("snowplow__partition_tstamp")}}) as {{var("snowplow__partition_tstamp")}}_date {%- endif %} , event_name , 'itsaprefix_event_name1_1' as event_table_name @@ -34,9 +34,9 @@ UNION ALL select event_id - , collector_tstamp + , {{var("snowplow__partition_tstamp")}} {% if target.type in ['databricks', 'spark'] -%} - , DATE(collector_tstamp) as collector_tstamp_date + , DATE({{var("snowplow__partition_tstamp")}}) as {{var("snowplow__partition_tstamp")}}_date {%- endif %} , event_name , 'custom_table_name2_1' as 
event_table_name @@ -51,9 +51,9 @@ UNION ALL select event_id - , collector_tstamp + , {{var("snowplow__partition_tstamp")}} {% if target.type in ['databricks', 'spark'] -%} - , DATE(collector_tstamp) as collector_tstamp_date + , DATE({{var("snowplow__partition_tstamp")}}) as {{var("snowplow__partition_tstamp")}}_date {%- endif %} , event_name , 'custom_table_name3_2' as event_table_name @@ -68,9 +68,9 @@ UNION ALL select event_id - , collector_tstamp + , {{var("snowplow__partition_tstamp")}} {% if target.type in ['databricks', 'spark'] -%} - , DATE(collector_tstamp) as collector_tstamp_date + , DATE({{var("snowplow__partition_tstamp")}}) as {{var("snowplow__partition_tstamp")}}_date {%- endif %} , event_name , 'custom_table_name4_1' as event_table_name @@ -85,9 +85,9 @@ UNION ALL select event_id - , collector_tstamp + , {{var("snowplow__partition_tstamp")}} {% if target.type in ['databricks', 'spark'] -%} - , DATE(collector_tstamp) as collector_tstamp_date + , DATE({{var("snowplow__partition_tstamp")}}) as {{var("snowplow__partition_tstamp")}}_date {%- endif %} , event_name , 'custom_table_name5_9' as event_table_name @@ -102,9 +102,9 @@ UNION ALL select event_id - , collector_tstamp + , {{var("snowplow__partition_tstamp")}} {% if target.type in ['databricks', 'spark'] -%} - , DATE(collector_tstamp) as collector_tstamp_date + , DATE({{var("snowplow__partition_tstamp")}}) as {{var("snowplow__partition_tstamp")}}_date {%- endif %} , event_name , 'custom_table_name6_6' as event_table_name @@ -119,9 +119,9 @@ UNION ALL select event_id - , collector_tstamp + , {{var("snowplow__partition_tstamp")}} {% if target.type in ['databricks', 'spark'] -%} - , DATE(collector_tstamp) as collector_tstamp_date + , DATE({{var("snowplow__partition_tstamp")}}) as {{var("snowplow__partition_tstamp")}}_date {%- endif %} , event_name , 'custom_table_name7_6' as event_table_name
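When comparing the snowplow_normalize_model_gen.py hunks above with these expected model fixtures, note that the quadruple braces in the generator's f-strings are plain Python brace escaping: each doubled brace is emitted as a single literal brace, so the generated models contain the double-brace Jinja expressions shown here. A minimal standalone sketch (not package code) of that behaviour:

    snippet = f"""
    select
        event_id
        , {{{{var("snowplow__partition_tstamp")}}}}
    """
    print(snippet)
    # select
    #     event_id
    #     , {{var("snowplow__partition_tstamp")}}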