Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release/snowplow-normalize/0.4.0 #47

Merged
merged 4 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 51 additions & 18 deletions .github/workflows/integration_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ on:
branches:
- main
- 'release/**'

concurrency: dbt_integration_tests

env:
Expand Down Expand Up @@ -41,14 +40,13 @@ env:
SNOWFLAKE_TEST_DATABASE: ${{ secrets.SNOWFLAKE_TEST_DATABASE }}
SNOWFLAKE_TEST_WAREHOUSE: ${{ secrets.SNOWFLAKE_TEST_WAREHOUSE }}

# # Postgres Connection
# POSTGRES_TEST_HOST: ${{ secrets.POSTGRES_TEST_HOST }}
# Postgres Connection
# POSTGRES_TEST_USER: ${{ secrets.POSTGRES_TEST_USER }}
# POSTGRES_TEST_PASS: ${{ secrets.POSTGRES_TEST_PASS }}
# POSTGRES_TEST_PORT: ${{ secrets.POSTGRES_TEST_PORT }}
# POSTGRES_TEST_DBNAME: ${{ secrets.POSTGRES_TEST_DBNAME }}
# POSTGRES_TEST_HOST: ${{ secrets.POSTGRES_TEST_HOST }}
# POSTGRES_TEST_PORT: ${{ secrets.POSTGRES_TEST_PORT }}

# Databricks Connection
DATABRICKS_TEST_HOST: ${{ secrets.DATABRICKS_TEST_HOST }}
DATABRICKS_TEST_HTTP_PATH: ${{ secrets.DATABRICKS_TEST_HTTP_PATH }}
DATABRICKS_TEST_TOKEN: ${{ secrets.DATABRICKS_TEST_TOKEN }}
Expand All @@ -63,14 +61,35 @@ jobs:
# Run tests from integration_tests sub dir
working-directory: ./integration_tests
strategy:
fail-fast: false
matrix:
dbt_version: ["1.*"]
warehouse: ["bigquery", "snowflake", "databricks"]
warehouse: ["bigquery", "snowflake", "databricks", "spark_iceberg"]

steps:
- name: Check out
uses: actions/checkout@v3
uses: actions/checkout@v4

- name: Configure Docker credentials
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKERHUB_SNOWPLOWCI_READ_USERNAME }}
password: ${{ secrets.DOCKERHUB_SNOWPLOWCI_READ_PASSWORD }}
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: eu-west-1
- name: Set warehouse variables
id: set_warehouse
run: |
WAREHOUSE_PLATFORM=$(echo ${{ matrix.warehouse }} | cut -d'_' -f1)
WAREHOUSE_SPECIFIC=$(echo ${{ matrix.warehouse }} | cut -s -d'_' -f2)
echo "WAREHOUSE_PLATFORM=${WAREHOUSE_PLATFORM}" >> $GITHUB_ENV
echo "WAREHOUSE_SPECIFIC=${WAREHOUSE_SPECIFIC}" >> $GITHUB_ENV
echo "warehouse_platform=${WAREHOUSE_PLATFORM}" >> $GITHUB_OUTPUT
echo "warehouse_specific=${WAREHOUSE_SPECIFIC}" >> $GITHUB_OUTPUT
# Remove '*' and replace '.' with '_' in DBT_VERSION & set as SCHEMA_SUFFIX.
# SCHEMA_SUFFIX allows us to run multiple versions of dbt in parallel without overwriting the output tables
- name: Set SCHEMA_SUFFIX env
Expand All @@ -80,7 +99,7 @@ jobs:

- name: Set DEFAULT_TARGET env
run: |
echo "DEFAULT_TARGET=${{ matrix.warehouse }}" >> $GITHUB_ENV
echo "DEFAULT_TARGET=${{matrix.warehouse}}" >> $GITHUB_ENV

- name: Python setup
uses: actions/setup-python@v4
Expand All @@ -91,32 +110,46 @@ jobs:
uses: actions/cache@v3
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{ matrix.warehouse }}
key: ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{env.WAREHOUSE_PLATFORM}}
restore-keys: |
${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{ matrix.warehouse }}
${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{env.WAREHOUSE_PLATFORM}}

# Install latest patch version. Upgrade if cache contains old patch version.
- name: Install dependencies
run: |
pip install --upgrade pip wheel setuptools
pip install -Iv dbt-${{ matrix.warehouse }}==${{ matrix.dbt_version }} --upgrade
pip install wheel setuptools
pip install -Iv dbt-${{env.WAREHOUSE_PLATFORM}}==${{ matrix.dbt_version }} --upgrade
dbt deps
if: ${{matrix.warehouse != 'spark'}}
if: ${{env.WAREHOUSE_PLATFORM != 'spark'}}

- name: Install spark dependencies
run: |
pip install --upgrade pip wheel setuptools
pip install -Iv "dbt-${{ matrix.warehouse }}[ODBC]"==${{ matrix.dbt_version }} --upgrade
pip install -Iv "dbt-${{ env.WAREHOUSE_PLATFORM }}[PyHive]"==${{ matrix.dbt_version }} --upgrade
dbt deps
if: ${{matrix.warehouse == 'spark'}}
if: ${{env.WAREHOUSE_PLATFORM == 'spark'}}

- name: Install Docker Compose
run: |
sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose


- name: Build and start Spark cluster
working-directory: .github/workflows/spark_deployment
run: |
docker-compose up -d
echo "Waiting for Spark services to start..."
sleep 90
if: ${{env.WAREHOUSE_PLATFORM == 'spark'}}

- name: "Pre-test: Drop ci schemas"
run: |
dbt run-operation post_ci_cleanup --target ${{ matrix.warehouse }}
dbt run-operation post_ci_cleanup --target ${{matrix.warehouse}}

- name: Run tests
run: ./.scripts/integration_tests.sh -d ${{ matrix.warehouse }}
run: ./.scripts/integration_test.sh -d ${{matrix.warehouse}}

- name: "Post-test: Drop ci schemas"
run: |
dbt run-operation post_ci_cleanup --target ${{ matrix.warehouse }}
dbt run-operation post_ci_cleanup --target ${{matrix.warehouse}}
34 changes: 34 additions & 0 deletions .github/workflows/spark_deployment/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
FROM openjdk:11-jre-slim

# Set environment variables
ENV SPARK_VERSION=3.5.1
ENV HADOOP_VERSION=3.3.4
ENV ICEBERG_VERSION=1.4.2
ENV AWS_SDK_VERSION=1.12.581

# Install necessary tools
RUN apt-get update && apt-get install -y curl wget procps rsync ssh

# Download and install Spark
RUN wget https://downloads.apache.org/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz && \
tar -xvzf spark-${SPARK_VERSION}-bin-hadoop3.tgz && \
mv spark-${SPARK_VERSION}-bin-hadoop3 /spark && \
rm spark-${SPARK_VERSION}-bin-hadoop3.tgz

# Set Spark environment variables
ENV SPARK_HOME=/spark
ENV PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin

# Download necessary JARs
RUN mkdir -p /spark/jars && \
wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/${ICEBERG_VERSION}/iceberg-spark-runtime-3.5_2.12-${ICEBERG_VERSION}.jar -O /spark/jars/iceberg-spark-runtime.jar && \
wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar -O /spark/jars/iceberg-aws-bundle.jar && \
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar -O /spark/jars/hadoop-aws.jar && \
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/${AWS_SDK_VERSION}/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar -O /spark/jars/aws-java-sdk-bundle.jar

# Create directory for Spark events
RUN mkdir -p /tmp/spark-events

WORKDIR /spark

CMD ["bash"]
20 changes: 20 additions & 0 deletions .github/workflows/spark_deployment/build_and_push.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/bin/bash

# Set variables
DOCKER_HUB_ORG="snowplow"
IMAGE_NAME="spark-s3-iceberg"
TAG="latest"

# Build the image
echo "Building Docker image..."
docker build --platform linux/amd64 -t $DOCKER_HUB_ORG/$IMAGE_NAME:$TAG .

# Log in to Docker Hub
echo "Logging in to Docker Hub..."
docker login

# Push the image to Docker Hub
echo "Pushing image to Docker Hub..."
docker push $DOCKER_HUB_ORG/$IMAGE_NAME:$TAG

echo "Image successfully built and pushed to Docker Hub"
66 changes: 66 additions & 0 deletions .github/workflows/spark_deployment/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
version: '3'

networks:
spark-network:
driver: bridge

services:
spark-master:
image: snowplow/spark-s3-iceberg:latest
command: ["/bin/bash", "-c", "/spark/sbin/start-master.sh -h spark-master --properties-file /spark/conf/spark-defaults.conf && tail -f /spark/logs/spark--org.apache.spark.deploy.master.Master-1-*.out"]
hostname: spark-master
ports:
- '8080:8080'
- '7077:7077'
environment:
- SPARK_LOCAL_IP=spark-master
- SPARK_MASTER_HOST=spark-master
- SPARK_MASTER_PORT=7077
- SPARK_MASTER_OPTS="-Dspark.driver.memory=2g"
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
- AWS_REGION=eu-west-1
- AWS_DEFAULT_REGION=eu-west-1
volumes:
- ./spark-defaults.conf:/spark/conf/spark-defaults.conf
networks:
- spark-network

spark-worker:
image: snowplow/spark-s3-iceberg:latest
command: ["/bin/bash", "-c", "sleep 10 && /spark/sbin/start-worker.sh spark://spark-master:7077 --properties-file /spark/conf/spark-defaults.conf && tail -f /spark/logs/spark--org.apache.spark.deploy.worker.Worker-*.out"]
depends_on:
- spark-master
environment:
- SPARK_WORKER_CORES=2
- SPARK_WORKER_MEMORY=4G
- SPARK_EXECUTOR_MEMORY=3G
- SPARK_LOCAL_IP=spark-worker
- SPARK_MASTER=spark://spark-master:7077
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
- AWS_REGION=eu-west-1
- AWS_DEFAULT_REGION=eu-west-1
volumes:
- ./spark-defaults.conf:/spark/conf/spark-defaults.conf
networks:
- spark-network

thrift-server:
image: snowplow/spark-s3-iceberg:latest
command: ["/bin/bash", "-c", "sleep 30 && /spark/sbin/start-thriftserver.sh --master spark://spark-master:7077 --driver-memory 2g --executor-memory 3g --hiveconf hive.server2.thrift.port=10000 --hiveconf hive.server2.thrift.bind.host=0.0.0.0 --conf spark.sql.hive.thriftServer.async=true --conf spark.sql.hive.thriftServer.workerQueue.size=2000 --conf spark.sql.hive.thriftServer.maxWorkerThreads=100 --conf spark.sql.hive.thriftServer.minWorkerThreads=50 && tail -f /spark/logs/spark--org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-*.out"]
ports:
- '10000:10000'
depends_on:
- spark-master
- spark-worker
environment:
- SPARK_LOCAL_IP=thrift-server
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
- AWS_REGION=eu-west-1
- AWS_DEFAULT_REGION=eu-west-1
volumes:
- ./spark-defaults.conf:/spark/conf/spark-defaults.conf
networks:
- spark-network
44 changes: 44 additions & 0 deletions .github/workflows/spark_deployment/spark-defaults.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
spark.master spark://spark-master:7077

spark.sql.warehouse.dir s3a://dbt-spark-iceberg/github-integration-testing
spark.sql.catalog.glue org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.glue.catalog-impl org.apache.iceberg.aws.glue.GlueCatalog
spark.sql.catalog.glue.warehouse s3a://dbt-spark-iceberg/github-integration-testing
spark.sql.catalog.glue.io-impl org.apache.iceberg.aws.s3.S3FileIO
spark.sql.defaultCatalog glue
spark.sql.catalog.glue.database dbt-spark-iceberg

spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.access.key <AWS_ACCESS_KEY_ID>
spark.hadoop.fs.s3a.secret.key <AWS_SECRET_ACCESS_KEY>
spark.hadoop.fs.s3a.endpoint s3.eu-west-1.amazonaws.com
spark.hadoop.fs.s3a.path.style.access true
spark.hadoop.fs.s3a.region eu-west-1
spark.hadoop.fs.s3a.aws.region eu-west-1

# Enabling AWS SDK V4 signing (required for regions launched after January 2014)
spark.hadoop.com.amazonaws.services.s3.enableV4 true
spark.hadoop.fs.s3a.aws.credentials.provider org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider

# Hive Metastore Configuration (using AWS Glue)
spark.hadoop.hive.metastore.client.factory.class com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory

# Thrift Server Configuration for better performance in concurrent environments
spark.sql.hive.thriftServer.singleSession false
spark.sql.hive.thriftServer.async true
# spark.sql.hive.thriftServer.maxWorkerThreads 100
# spark.sql.hive.thriftServer.minWorkerThreads 50
# spark.sql.hive.thriftServer.workerQueue.size 2000

# Memory and Performance Tuning
# spark.driver.memory 2g
# spark.executor.memory 3g
# spark.worker.memory 4g
spark.network.timeout 600s
spark.sql.broadcastTimeout 600s
spark.sql.adaptive.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer

# Logging and Debugging
spark.eventLog.enabled true
spark.eventLog.dir /tmp/spark-events
21 changes: 21 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,24 @@
Snowplow Normalize 0.4.0 (2024-12-04)
---------------------------------------
## Summary
This version adds new features including Apache Spark with Iceberg support, flexible timestamp partitioning, and compatibility with the new BigQuery loader naming conventions.

## Features
- Added support for Apache Spark with Iceberg
- Introduced configurable partition timestamp column name for derived tables
- Added compatibility with new BigQuery loader naming conventions

## Upgrading
To upgrade simply bump the snowplow-normalize version in your `packages.yml` file.

If you want to start using the new partition timestamp feature, you can configure the column name in your project configuration:
```yaml
vars:
snowplow__partition_tstamp: your_custom_column_name
```

You have to be sure that the field you add is included in the table, and you do a full refresh.

Snowplow Normalize 0.3.5 (2024-03-18)
---------------------------------------
## Summary
Expand Down
5 changes: 3 additions & 2 deletions dbt_project.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

name: 'snowplow_normalize'
version: '0.3.5'
version: '0.4.0'
config-version: 2

require-dbt-version: [">=1.4.0", "<2.0.0"]
Expand Down Expand Up @@ -41,6 +41,7 @@ vars:
snowplow__dev_target_name: 'dev'
snowplow__allow_refresh: false
snowplow__session_timestamp: 'collector_tstamp'
snowplow__partition_tstamp: 'collector_tstamp' # This is the column that will be used to partition the data in the derived tables, it should be a timestamp column that is present in the data
# Variables - Databricks Only
# Add the following variable to your dbt project's dbt_project.yml file
# Depending on the use case it should either be the catalog (for Unity Catalog users from databricks connector 1.1.1 onwards) or the same value as your snowplow__atomic_schema (unless changed it should be 'atomic')
Expand All @@ -63,7 +64,7 @@ on-run-end:
models:
snowplow_normalize:
+materialized: table
+file_format: delta
+file_format: "{{ 'delta' if target.type not in ['spark'] else 'iceberg'}}"
+bind: false
base:
manifest:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ do
esac
done

declare -a SUPPORTED_DATABASES=("bigquery" "databricks" "snowflake")
declare -a SUPPORTED_DATABASES=("bigquery" "databricks" "snowflake", "spark_iceberg")

# set to lower case
DATABASE="$(echo $DATABASE | tr '[:upper:]' '[:lower:]')"
Expand All @@ -23,13 +23,11 @@ fi

for db in ${DATABASES[@]}; do

if [ $db == 'bigquery' ]; then
echo "Snowplow web integration tests: Seeding data and doing first run"

eval "dbt seed --target $db --full-refresh" || exit 1;

eval "dbt run --target $db --full-refresh" || exit 1;
if [[ "$db" == "bigquery" ]]; then
echo "Snowplow integration tests: Seeding data and doing first run"

eval "dbt seed --target $db --full-refresh" || exit 1
eval "dbt run --target $db --full-refresh" || exit 1
fi

echo "Snowplow normalize integration tests: snakeify case"
Expand Down
Loading
Loading