From 67bf14881c203777b78a19cff70466cce961a52d Mon Sep 17 00:00:00 2001 From: Ilias Xenogiannis Date: Mon, 25 Nov 2024 13:38:45 +0200 Subject: [PATCH 1/8] Update normalize_events.sql --- macros/normalize_events.sql | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/macros/normalize_events.sql b/macros/normalize_events.sql index ee0f784..5f1c730 100644 --- a/macros/normalize_events.sql +++ b/macros/normalize_events.sql @@ -65,15 +65,26 @@ where {% macro bigquery__normalize_events(event_names, flat_cols = [], sde_cols = [], sde_keys = [], sde_types = [], sde_aliases = [], context_cols = [], context_keys = [], context_types = [], context_aliases = [], remove_new_event_check = false) %} -{# Remove down to major version for bigquery combine columns macro, drop 2 last _X values #} -{%- set sde_cols_clean = [] -%} -{%- for ind in range(sde_cols|length) -%} - {% do sde_cols_clean.append('_'.join(sde_cols[ind].split('_')[:-2])) -%} -{%- endfor -%} -{%- set context_cols_clean = [] -%} -{%- for ind in range(context_cols|length) -%} - {% do context_cols_clean.append('_'.join(context_cols[ind].split('_')[:-2])) -%} -{%- endfor -%} +{# Handle both versioned and unversioned column names #} +{# Handle both versioned and unversioned column names #} + {%- set version_pattern = '_(([0-9]+_)?[0-9]+)$' -%} + {%- set sde_cols_clean = [] -%} + {%- for col in sde_cols -%} + {%- if col is regex_match(version_pattern) -%} + {% do sde_cols_clean.append(col|regex_replace(version_pattern, '')) -%} + {%- else -%} + {% do sde_cols_clean.append(col) -%} + {%- endif -%} + {%- endfor -%} + + {%- set context_cols_clean = [] -%} + {%- for col in context_cols -%} + {%- if col is regex_match(version_pattern) -%} + {% do context_cols_clean.append(col|regex_replace(version_pattern, '')) -%} + {%- else -%} + {% do context_cols_clean.append(col) -%} + {%- endif -%} + {%- endfor -%} {# Replace keys with snake_case where needed #} {%- set sde_keys_clean = [] -%} From f27e9b5ca053032607a1ed38ff4351602caba571 Mon Sep 17 00:00:00 2001 From: Ilias Xenogiannis Date: Mon, 25 Nov 2024 15:40:59 +0200 Subject: [PATCH 2/8] Add testing and use regex --- .../source/snowplow_norm_dummy_events.csv | 4 +-- integration_tests/dbt_project.yml | 1 + .../macros/test_normalize_events.sql | 2 ++ .../bigquery/snowplow_normalize_stg.sql | 13 +++++--- macros/normalize_events.sql | 32 ++++++++----------- 5 files changed, 27 insertions(+), 25 deletions(-) diff --git a/integration_tests/data/source/snowplow_norm_dummy_events.csv b/integration_tests/data/source/snowplow_norm_dummy_events.csv index b80f0be..c911db1 100644 --- a/integration_tests/data/source/snowplow_norm_dummy_events.csv +++ b/integration_tests/data/source/snowplow_norm_dummy_events.csv @@ -1,2 +1,2 @@ -event_name,event_id,app_id,collector_tstamp,dvce_sent_tstamp,dvce_created_tstamp,unstruct_event_test_1_0_0,unstruct_event_test_1_0_1,unstruct_event_test2_1_0_0,unstruct_event_test2_1_0_1,contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5 -'demo','demo','demo',2022-10-01 01:27:34,2022-10-01 01:27:34,2022-10-01 01:27:34,"{""test_id"":""demo"", ""test_class"":""demo""}","{""test_id"":""demo"", ""test_class"":""demo""}","{""test_word"":""demo"", ""test_idea"":""demo""}","{""test_word"":""demo"", ""test_idea"":""demo""}","[{""context_test_id"":""demo"", ""context_test_class"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]" +event_name,event_id,app_id,collector_tstamp,dvce_sent_tstamp,dvce_created_tstamp,unstruct_event_test_1_0_0,unstruct_event_test_1_0_1,unstruct_event_test2_1_0_0,unstruct_event_test2_1_0_1,contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5,contexts_test4_1 +'demo','demo','demo',2022-10-01 01:27:34,2022-10-01 01:27:34,2022-10-01 01:27:34,"{""test_id"":""demo"", ""test_class"":""demo""}","{""test_id"":""demo"", ""test_class"":""demo""}","{""test_word"":""demo"", ""test_idea"":""demo""}","{""test_word"":""demo"", ""test_idea"":""demo""}","[{""context_test_id"":""demo"", ""context_test_class"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]" diff --git a/integration_tests/dbt_project.yml b/integration_tests/dbt_project.yml index 3663550..2966a2e 100644 --- a/integration_tests/dbt_project.yml +++ b/integration_tests/dbt_project.yml @@ -68,3 +68,4 @@ seeds: contexts_test2_1_0_3: "{{ 'string' if target.type in ['bigquery', 'databricks', 'spark'] else 'varchar' }}" contexts_test2_1_0_4: "{{ 'string' if target.type in ['bigquery', 'databricks', 'spark'] else 'varchar' }}" contexts_test2_1_0_5: "{{ 'string' if target.type in ['bigquery', 'databricks', 'spark'] else 'varchar' }}" + contexts_test4_1: "{{ 'string' if target.type in ['bigquery', 'databricks', 'spark'] else 'varchar' }}" diff --git a/integration_tests/macros/test_normalize_events.sql b/integration_tests/macros/test_normalize_events.sql index b063d74..be9f28a 100644 --- a/integration_tests/macros/test_normalize_events.sql +++ b/integration_tests/macros/test_normalize_events.sql @@ -38,6 +38,7 @@ It runs 9 tests: "sde_plus_2_context" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as test_id -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "sde_plus_2_context_w_alias" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as test_id -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as test1_context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as test1_context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as test2_context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as test2_context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "context_only" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", + "context_only_new_loader" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test4_1[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test4_1[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "multiple_base_events" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name','page_ping')", "multiple_sde_events" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as test1_test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as test1_test_id , coalesce(unstruct_event_test2_1_0_1.test_word, unstruct_event_test2_1_0_0.test_word) as test2_test_word , coalesce(unstruct_event_test2_1_0_1.test_idea, unstruct_event_test2_1_0_0.test_idea) as test2_test_idea -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')" } %} @@ -52,6 +53,7 @@ It runs 9 tests: "sde_plus_2_context" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'], ['contextTestId2', 'contextTestClass2']], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '), "sde_plus_2_context_w_alias" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], ['test1', 'test2'], true).split()|join(' '), "context_only" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], [], [], [], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '), + "context_only_new_loader" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], [], [], [], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST4_1'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '), "multiple_base_events" : snowplow_normalize.normalize_events(['event_name', 'page_ping'], ['app_id'], [], [], [], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '), "multiple_sde_events" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1', 'UNSTRUCT_EVENT_TEST2_1_0_1'], [['testId', 'testClass'], ['testWord', 'testIdea']], [['number', 'string']], ['test1', 'test2'], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' ') } diff --git a/integration_tests/models/dummy_model/bigquery/snowplow_normalize_stg.sql b/integration_tests/models/dummy_model/bigquery/snowplow_normalize_stg.sql index 1f79d0c..cabe2f8 100644 --- a/integration_tests/models/dummy_model/bigquery/snowplow_normalize_stg.sql +++ b/integration_tests/models/dummy_model/bigquery/snowplow_normalize_stg.sql @@ -8,21 +8,22 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 with prep as ( select * - except(contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5), + except(contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5, contexts_test4_1), JSON_EXTRACT_ARRAY(contexts_test_1_0_0) AS contexts_test_1_0_0, JSON_EXTRACT_ARRAY(contexts_test2_1_0_0) AS contexts_test2_1_0_0, JSON_EXTRACT_ARRAY(contexts_test2_1_0_1) AS contexts_test2_1_0_1, JSON_EXTRACT_ARRAY(contexts_test2_1_0_2) AS contexts_test2_1_0_2, JSON_EXTRACT_ARRAY(contexts_test2_1_0_3) AS contexts_test2_1_0_3, JSON_EXTRACT_ARRAY(contexts_test2_1_0_4) AS contexts_test2_1_0_4, - JSON_EXTRACT_ARRAY(contexts_test2_1_0_5) AS contexts_test2_1_0_5 + JSON_EXTRACT_ARRAY(contexts_test2_1_0_5) AS contexts_test2_1_0_5, + JSON_EXTRACT_ARRAY(contexts_test4_1) AS contexts_test4_1 from {{ ref('snowplow_norm_dummy_events') }} ) -- recreate repeated record field i.e. array of structs as is originally in BQ events table select - * except(unstruct_event_test_1_0_0,unstruct_event_test_1_0_1,unstruct_event_test2_1_0_0,unstruct_event_test2_1_0_1,contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5), + * except(unstruct_event_test_1_0_0,unstruct_event_test_1_0_1,unstruct_event_test2_1_0_0,unstruct_event_test2_1_0_1,contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5,contexts_test4_1), -- order is reversed to test the aliasing of the coalesced columns struct(JSON_EXTRACT_scalar(unstruct_event_test_1_0_0, '$.test_class') as test_class,JSON_EXTRACT_scalar(unstruct_event_test_1_0_0, '$.test_id') as test_id) as unstruct_event_test_1_0_0, struct(JSON_EXTRACT_scalar(unstruct_event_test_1_0_1, '$.test_class') as test_class,JSON_EXTRACT_scalar(unstruct_event_test_1_0_1, '$.test_id') as test_id) as unstruct_event_test_1_0_1, @@ -56,7 +57,11 @@ select array( select struct(JSON_EXTRACT_scalar(json_array,'$.context_test_class2') as context_test_class2, JSON_EXTRACT_scalar(json_array,'$.context_test_id2') as context_test_id2) from unnest(contexts_test2_1_0_5) as json_array - ) as contexts_test2_1_0_5 + ) as contexts_test2_1_0_5, + array( + select struct(JSON_EXTRACT_scalar(json_array,'$.context_test_class2') as context_test_class2, JSON_EXTRACT_scalar(json_array,'$.context_test_id2') as context_test_id2) + from unnest(contexts_test4_1) as json_array + ) as contexts_test4_1 from prep diff --git a/macros/normalize_events.sql b/macros/normalize_events.sql index 5f1c730..28a786d 100644 --- a/macros/normalize_events.sql +++ b/macros/normalize_events.sql @@ -65,27 +65,22 @@ where {% macro bigquery__normalize_events(event_names, flat_cols = [], sde_cols = [], sde_keys = [], sde_types = [], sde_aliases = [], context_cols = [], context_keys = [], context_types = [], context_aliases = [], remove_new_event_check = false) %} -{# Handle both versioned and unversioned column names #} -{# Handle both versioned and unversioned column names #} - {%- set version_pattern = '_(([0-9]+_)?[0-9]+)$' -%} + {# Handle both versioned and unversioned column names #} + {%- set re = modules.re -%} + {%- set version_pattern = '_[0-9]+(_[0-9]+)?$' -%} + {%- set sde_cols_clean = [] -%} {%- for col in sde_cols -%} - {%- if col is regex_match(version_pattern) -%} - {% do sde_cols_clean.append(col|regex_replace(version_pattern, '')) -%} - {%- else -%} - {% do sde_cols_clean.append(col) -%} - {%- endif -%} + {# Get the base name for combine_column_versions to work with #} + {%- set clean_name = re.sub(version_pattern, '', col) -%} + {% do sde_cols_clean.append(clean_name) -%} {%- endfor -%} {%- set context_cols_clean = [] -%} {%- for col in context_cols -%} - {%- if col is regex_match(version_pattern) -%} - {% do context_cols_clean.append(col|regex_replace(version_pattern, '')) -%} - {%- else -%} - {% do context_cols_clean.append(col) -%} - {%- endif -%} + {%- set clean_name = re.sub(version_pattern, '', col) -%} + {% do context_cols_clean.append(clean_name) -%} {%- endfor -%} - {# Replace keys with snake_case where needed #} {%- set sde_keys_clean = [] -%} {%- set context_keys_clean = [] -%} @@ -97,7 +92,6 @@ where {%- endfor -%} {% do sde_keys_clean.append(sde_key_clean) -%} {%- endfor -%} - {%- for ind1 in range(context_keys|length) -%} {%- set context_key_clean = [] -%} {%- for ind2 in range(context_keys[ind1]|length) -%} @@ -130,10 +124,10 @@ select {%- set required_aliases = sde_keys_clean[col_ind] -%} {%- endif -%} {%- set sde_col_list = snowplow_utils.combine_column_versions( - relation=ref('snowplow_normalize_base_events_this_run'), - column_prefix=col.lower(), - required_fields = zip(sde_keys_clean[col_ind], required_aliases) - ) -%} + relation=ref('snowplow_normalize_base_events_this_run'), + column_prefix=col.lower(), + required_fields = zip(sde_keys_clean[col_ind], required_aliases) + ) -%} {%- for field, key_ind in zip(sde_col_list, range(sde_col_list|length)) -%} {# Loop over each key within the column, appling the bespoke alias as needed #} , {{field}} {% endfor -%} From e7aedc2014d84665ac9df5f5ec0ac910de15e1a4 Mon Sep 17 00:00:00 2001 From: Ilias Xenogiannis Date: Wed, 27 Nov 2024 12:38:00 +0200 Subject: [PATCH 3/8] Updates --- .../source/snowplow_norm_dummy_events.csv | 4 +-- .../macros/test_normalize_events.sql | 4 +-- .../bigquery/snowplow_normalize_stg.sql | 15 +++++++---- macros/normalize_events.sql | 26 ++++++++++++++++--- 4 files changed, 37 insertions(+), 12 deletions(-) diff --git a/integration_tests/data/source/snowplow_norm_dummy_events.csv b/integration_tests/data/source/snowplow_norm_dummy_events.csv index c911db1..3840de2 100644 --- a/integration_tests/data/source/snowplow_norm_dummy_events.csv +++ b/integration_tests/data/source/snowplow_norm_dummy_events.csv @@ -1,2 +1,2 @@ -event_name,event_id,app_id,collector_tstamp,dvce_sent_tstamp,dvce_created_tstamp,unstruct_event_test_1_0_0,unstruct_event_test_1_0_1,unstruct_event_test2_1_0_0,unstruct_event_test2_1_0_1,contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5,contexts_test4_1 -'demo','demo','demo',2022-10-01 01:27:34,2022-10-01 01:27:34,2022-10-01 01:27:34,"{""test_id"":""demo"", ""test_class"":""demo""}","{""test_id"":""demo"", ""test_class"":""demo""}","{""test_word"":""demo"", ""test_idea"":""demo""}","{""test_word"":""demo"", ""test_idea"":""demo""}","[{""context_test_id"":""demo"", ""context_test_class"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]" +event_name,event_id,app_id,collector_tstamp,dvce_sent_tstamp,dvce_created_tstamp,unstruct_event_test_1_0_0,unstruct_event_test_1_0_1,unstruct_event_test2_1_0_0,unstruct_event_test2_1_0_1,contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5,contexts_test4_1,contexts_test4_2 +'demo','demo','demo',2022-10-01 01:27:34,2022-10-01 01:27:34,2022-10-01 01:27:34,"{""test_id"":""demo"", ""test_class"":""demo""}","{""test_id"":""demo"", ""test_class"":""demo""}","{""test_word"":""demo"", ""test_idea"":""demo""}","{""test_word"":""demo"", ""test_idea"":""demo""}","[{""context_test_id"":""demo"", ""context_test_class"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]" diff --git a/integration_tests/macros/test_normalize_events.sql b/integration_tests/macros/test_normalize_events.sql index be9f28a..bec29f3 100644 --- a/integration_tests/macros/test_normalize_events.sql +++ b/integration_tests/macros/test_normalize_events.sql @@ -31,7 +31,6 @@ It runs 9 tests: {% macro bigquery__test_normalize_events() %} {% set expected_dict = { - "flat_cols_only" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "sde_plus_cols" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as test_id -- context column(s) from the event table from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "sde_plus_cols_w_alias" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as my_alias_test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as my_alias_test_id -- context column(s) from the event table from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "sde_plus_1_context" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as test_id -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", @@ -46,7 +45,6 @@ It runs 9 tests: {% set results_dict ={ - "flat_cols_only" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], [], [], [], [], [], [], [], [], true).split()|join(' '), "sde_plus_cols" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], [], [], [], [], true).split()|join(' '), "sde_plus_cols_w_alias" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], ['my_alias'], [], [], [], [], true).split()|join(' '), "sde_plus_1_context" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], ['CONTEXTS_TEST_1_0_0'], [['contextTestId', 'contextTestClass']], [['string', 'integer']], [], true).split()|join(' '), @@ -56,6 +54,7 @@ It runs 9 tests: "context_only_new_loader" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], [], [], [], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST4_1'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '), "multiple_base_events" : snowplow_normalize.normalize_events(['event_name', 'page_ping'], ['app_id'], [], [], [], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '), "multiple_sde_events" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1', 'UNSTRUCT_EVENT_TEST2_1_0_1'], [['testId', 'testClass'], ['testWord', 'testIdea']], [['number', 'string']], ['test1', 'test2'], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' ') + "context_only_new_loader" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], [], [], [], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST4_2'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' ') } %} @@ -67,6 +66,7 @@ It runs 9 tests: {# {{ print(results_dict['sde_plus_2_context'])}} #} {# {{ print(results_dict['sde_plus_2_context_w_alias'])}} #} {# {{ print(results_dict['context_only'])}} #} + {# {{ print(results_dict['context_only_new_loader'])}} #} {# {{ print(results_dict['multiple_base_events'])}} #} {# {{ print(results_dict['multiple_sde_events'])}} #} diff --git a/integration_tests/models/dummy_model/bigquery/snowplow_normalize_stg.sql b/integration_tests/models/dummy_model/bigquery/snowplow_normalize_stg.sql index cabe2f8..03b4f8c 100644 --- a/integration_tests/models/dummy_model/bigquery/snowplow_normalize_stg.sql +++ b/integration_tests/models/dummy_model/bigquery/snowplow_normalize_stg.sql @@ -8,7 +8,7 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 with prep as ( select * - except(contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5, contexts_test4_1), + except(contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5, contexts_test4_1,contexts_test4_2), JSON_EXTRACT_ARRAY(contexts_test_1_0_0) AS contexts_test_1_0_0, JSON_EXTRACT_ARRAY(contexts_test2_1_0_0) AS contexts_test2_1_0_0, JSON_EXTRACT_ARRAY(contexts_test2_1_0_1) AS contexts_test2_1_0_1, @@ -16,14 +16,15 @@ select JSON_EXTRACT_ARRAY(contexts_test2_1_0_3) AS contexts_test2_1_0_3, JSON_EXTRACT_ARRAY(contexts_test2_1_0_4) AS contexts_test2_1_0_4, JSON_EXTRACT_ARRAY(contexts_test2_1_0_5) AS contexts_test2_1_0_5, - JSON_EXTRACT_ARRAY(contexts_test4_1) AS contexts_test4_1 + JSON_EXTRACT_ARRAY(contexts_test4_1) AS contexts_test4_1, + JSON_EXTRACT_ARRAY(contexts_test4_2) AS contexts_test4_2 from {{ ref('snowplow_norm_dummy_events') }} ) -- recreate repeated record field i.e. array of structs as is originally in BQ events table select - * except(unstruct_event_test_1_0_0,unstruct_event_test_1_0_1,unstruct_event_test2_1_0_0,unstruct_event_test2_1_0_1,contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5,contexts_test4_1), + * except(unstruct_event_test_1_0_0,unstruct_event_test_1_0_1,unstruct_event_test2_1_0_0,unstruct_event_test2_1_0_1,contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5,contexts_test4_1,contexts_test4_2), -- order is reversed to test the aliasing of the coalesced columns struct(JSON_EXTRACT_scalar(unstruct_event_test_1_0_0, '$.test_class') as test_class,JSON_EXTRACT_scalar(unstruct_event_test_1_0_0, '$.test_id') as test_id) as unstruct_event_test_1_0_0, struct(JSON_EXTRACT_scalar(unstruct_event_test_1_0_1, '$.test_class') as test_class,JSON_EXTRACT_scalar(unstruct_event_test_1_0_1, '$.test_id') as test_id) as unstruct_event_test_1_0_1, @@ -61,7 +62,11 @@ select array( select struct(JSON_EXTRACT_scalar(json_array,'$.context_test_class2') as context_test_class2, JSON_EXTRACT_scalar(json_array,'$.context_test_id2') as context_test_id2) from unnest(contexts_test4_1) as json_array - ) as contexts_test4_1 - + ) as contexts_test4_1, + array( + select struct(JSON_EXTRACT_scalar(json_array,'$.context_test_class2') as context_test_class2, JSON_EXTRACT_scalar(json_array,'$.context_test_id2') as context_test_id2) + from unnest(contexts_test4_2) as json_array + ) as contexts_test4_2 + from prep diff --git a/macros/normalize_events.sql b/macros/normalize_events.sql index 28a786d..fd53eab 100644 --- a/macros/normalize_events.sql +++ b/macros/normalize_events.sql @@ -67,18 +67,38 @@ where {% macro bigquery__normalize_events(event_names, flat_cols = [], sde_cols = [], sde_keys = [], sde_types = [], sde_aliases = [], context_cols = [], context_keys = [], context_types = [], context_aliases = [], remove_new_event_check = false) %} {# Handle both versioned and unversioned column names #} {%- set re = modules.re -%} - {%- set version_pattern = '_[0-9]+(_[0-9]+)?$' -%} + + {# + This regex pattern handles column versioning in Snowplow contexts and self-describing events. + It specifically targets three-part semantic versions (e.g., field_1_2_3) while preserving + one-part (field_1) and two-part (field_1_2) versions. + + Pattern breakdown: '(_\\d+)_\\d+_\\d+$' + - (_\\d+) : Capture group that matches an underscore followed by one or more digits + This captures the major version number (e.g., "_1" in "field_1_2_3") + - _\\d+ : Matches an underscore and one or more digits (minor version) + - _\\d+ : Matches an underscore and one or more digits (patch version) + - $ : Ensures the pattern only matches at the end of the string + + The replacement pattern '\\1' keeps only the captured major version. + + Examples: + - field_1 -> field_1 (no change - only has major version) + - field_1_2 -> field_1_2 (no change - has major and minor versions) + - field_1_2_3 -> field_1 (transforms - removes minor and patch versions) + #} + {%- set version_pattern = '(_\\d+)_\\d+_\\d+$' -%} {%- set sde_cols_clean = [] -%} {%- for col in sde_cols -%} {# Get the base name for combine_column_versions to work with #} - {%- set clean_name = re.sub(version_pattern, '', col) -%} + {%- set clean_name = re.sub(version_pattern, '\\1', col) -%} {% do sde_cols_clean.append(clean_name) -%} {%- endfor -%} {%- set context_cols_clean = [] -%} {%- for col in context_cols -%} - {%- set clean_name = re.sub(version_pattern, '', col) -%} + {%- set clean_name = re.sub(version_pattern, '\\1', col) -%} {% do context_cols_clean.append(clean_name) -%} {%- endfor -%} {# Replace keys with snake_case where needed #} From e967c666a8f98583fa552fc964d55b3ec17d9936 Mon Sep 17 00:00:00 2001 From: Ilias Xenogiannis Date: Wed, 27 Nov 2024 12:38:18 +0200 Subject: [PATCH 4/8] Update test_normalize_events.sql --- integration_tests/macros/test_normalize_events.sql | 1 + 1 file changed, 1 insertion(+) diff --git a/integration_tests/macros/test_normalize_events.sql b/integration_tests/macros/test_normalize_events.sql index bec29f3..f599dd1 100644 --- a/integration_tests/macros/test_normalize_events.sql +++ b/integration_tests/macros/test_normalize_events.sql @@ -40,6 +40,7 @@ It runs 9 tests: "context_only_new_loader" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test4_1[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test4_1[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "multiple_base_events" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name','page_ping')", "multiple_sde_events" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as test1_test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as test1_test_id , coalesce(unstruct_event_test2_1_0_1.test_word, unstruct_event_test2_1_0_0.test_word) as test2_test_word , coalesce(unstruct_event_test2_1_0_1.test_idea, unstruct_event_test2_1_0_0.test_idea) as test2_test_idea -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')" + "context_only_new_loader" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test4_2[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test4_2[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')" } %} From 46d9db58e7a1fbe368c4118d28ccd317283bd6e8 Mon Sep 17 00:00:00 2001 From: Ilias Xenogiannis Date: Wed, 27 Nov 2024 12:39:55 +0200 Subject: [PATCH 5/8] Update test_normalize_events.sql --- integration_tests/macros/test_normalize_events.sql | 1 - 1 file changed, 1 deletion(-) diff --git a/integration_tests/macros/test_normalize_events.sql b/integration_tests/macros/test_normalize_events.sql index f599dd1..bec29f3 100644 --- a/integration_tests/macros/test_normalize_events.sql +++ b/integration_tests/macros/test_normalize_events.sql @@ -40,7 +40,6 @@ It runs 9 tests: "context_only_new_loader" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test4_1[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test4_1[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "multiple_base_events" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name','page_ping')", "multiple_sde_events" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as test1_test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as test1_test_id , coalesce(unstruct_event_test2_1_0_1.test_word, unstruct_event_test2_1_0_0.test_word) as test2_test_word , coalesce(unstruct_event_test2_1_0_1.test_idea, unstruct_event_test2_1_0_0.test_idea) as test2_test_idea -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')" - "context_only_new_loader" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test4_2[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test4_2[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')" } %} From 299668a1791b2f27fba15c1ed0985921ed5229f5 Mon Sep 17 00:00:00 2001 From: Ilias Xenogiannis Date: Wed, 27 Nov 2024 12:41:04 +0200 Subject: [PATCH 6/8] Update test_normalize_events.sql --- integration_tests/macros/test_normalize_events.sql | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/integration_tests/macros/test_normalize_events.sql b/integration_tests/macros/test_normalize_events.sql index bec29f3..c790f53 100644 --- a/integration_tests/macros/test_normalize_events.sql +++ b/integration_tests/macros/test_normalize_events.sql @@ -51,10 +51,9 @@ It runs 9 tests: "sde_plus_2_context" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'], ['contextTestId2', 'contextTestClass2']], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '), "sde_plus_2_context_w_alias" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], ['test1', 'test2'], true).split()|join(' '), "context_only" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], [], [], [], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '), - "context_only_new_loader" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], [], [], [], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST4_1'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '), + "context_only_new_loader" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], [], [], [], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST4_2'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '), "multiple_base_events" : snowplow_normalize.normalize_events(['event_name', 'page_ping'], ['app_id'], [], [], [], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '), "multiple_sde_events" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1', 'UNSTRUCT_EVENT_TEST2_1_0_1'], [['testId', 'testClass'], ['testWord', 'testIdea']], [['number', 'string']], ['test1', 'test2'], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' ') - "context_only_new_loader" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], [], [], [], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST4_2'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' ') } %} From 0d856fcd7b20ce949f596c9407b1c8731e6f6a61 Mon Sep 17 00:00:00 2001 From: Ilias Xenogiannis Date: Wed, 27 Nov 2024 12:49:36 +0200 Subject: [PATCH 7/8] Update test_normalize_events.sql --- integration_tests/macros/test_normalize_events.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration_tests/macros/test_normalize_events.sql b/integration_tests/macros/test_normalize_events.sql index c790f53..937e70a 100644 --- a/integration_tests/macros/test_normalize_events.sql +++ b/integration_tests/macros/test_normalize_events.sql @@ -37,7 +37,7 @@ It runs 9 tests: "sde_plus_2_context" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as test_id -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "sde_plus_2_context_w_alias" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as test_id -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as test1_context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as test1_context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as test2_context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as test2_context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "context_only" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", - "context_only_new_loader" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test4_1[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test4_1[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", + "context_only_new_loader" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test4_2[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test4_2[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "multiple_base_events" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name','page_ping')", "multiple_sde_events" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as test1_test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as test1_test_id , coalesce(unstruct_event_test2_1_0_1.test_word, unstruct_event_test2_1_0_0.test_word) as test2_test_word , coalesce(unstruct_event_test2_1_0_1.test_idea, unstruct_event_test2_1_0_0.test_idea) as test2_test_idea -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')" } %} From 5e184809e87f6c339743865059654935f01a3559 Mon Sep 17 00:00:00 2001 From: Ilias Xenogiannis Date: Wed, 27 Nov 2024 12:38:00 +0200 Subject: [PATCH 8/8] Updates Update test_normalize_events.sql Update test_normalize_events.sql Update test_normalize_events.sql Update test_normalize_events.sql --- .../source/snowplow_norm_dummy_events.csv | 4 +-- .../macros/test_normalize_events.sql | 7 +++-- .../bigquery/snowplow_normalize_stg.sql | 15 +++++++---- macros/normalize_events.sql | 26 ++++++++++++++++--- 4 files changed, 38 insertions(+), 14 deletions(-) diff --git a/integration_tests/data/source/snowplow_norm_dummy_events.csv b/integration_tests/data/source/snowplow_norm_dummy_events.csv index c911db1..3840de2 100644 --- a/integration_tests/data/source/snowplow_norm_dummy_events.csv +++ b/integration_tests/data/source/snowplow_norm_dummy_events.csv @@ -1,2 +1,2 @@ -event_name,event_id,app_id,collector_tstamp,dvce_sent_tstamp,dvce_created_tstamp,unstruct_event_test_1_0_0,unstruct_event_test_1_0_1,unstruct_event_test2_1_0_0,unstruct_event_test2_1_0_1,contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5,contexts_test4_1 -'demo','demo','demo',2022-10-01 01:27:34,2022-10-01 01:27:34,2022-10-01 01:27:34,"{""test_id"":""demo"", ""test_class"":""demo""}","{""test_id"":""demo"", ""test_class"":""demo""}","{""test_word"":""demo"", ""test_idea"":""demo""}","{""test_word"":""demo"", ""test_idea"":""demo""}","[{""context_test_id"":""demo"", ""context_test_class"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]" +event_name,event_id,app_id,collector_tstamp,dvce_sent_tstamp,dvce_created_tstamp,unstruct_event_test_1_0_0,unstruct_event_test_1_0_1,unstruct_event_test2_1_0_0,unstruct_event_test2_1_0_1,contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5,contexts_test4_1,contexts_test4_2 +'demo','demo','demo',2022-10-01 01:27:34,2022-10-01 01:27:34,2022-10-01 01:27:34,"{""test_id"":""demo"", ""test_class"":""demo""}","{""test_id"":""demo"", ""test_class"":""demo""}","{""test_word"":""demo"", ""test_idea"":""demo""}","{""test_word"":""demo"", ""test_idea"":""demo""}","[{""context_test_id"":""demo"", ""context_test_class"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]" diff --git a/integration_tests/macros/test_normalize_events.sql b/integration_tests/macros/test_normalize_events.sql index be9f28a..937e70a 100644 --- a/integration_tests/macros/test_normalize_events.sql +++ b/integration_tests/macros/test_normalize_events.sql @@ -31,14 +31,13 @@ It runs 9 tests: {% macro bigquery__test_normalize_events() %} {% set expected_dict = { - "flat_cols_only" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "sde_plus_cols" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as test_id -- context column(s) from the event table from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "sde_plus_cols_w_alias" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as my_alias_test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as my_alias_test_id -- context column(s) from the event table from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "sde_plus_1_context" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as test_id -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "sde_plus_2_context" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as test_id -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "sde_plus_2_context_w_alias" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as test_id -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as test1_context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as test1_context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as test2_context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as test2_context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "context_only" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", - "context_only_new_loader" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test4_1[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test4_1[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", + "context_only_new_loader" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test4_2[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test4_2[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')", "multiple_base_events" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name','page_ping')", "multiple_sde_events" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as test1_test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as test1_test_id , coalesce(unstruct_event_test2_1_0_1.test_word, unstruct_event_test2_1_0_0.test_word) as test2_test_word , coalesce(unstruct_event_test2_1_0_1.test_idea, unstruct_event_test2_1_0_0.test_idea) as test2_test_idea -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')" } %} @@ -46,14 +45,13 @@ It runs 9 tests: {% set results_dict ={ - "flat_cols_only" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], [], [], [], [], [], [], [], [], true).split()|join(' '), "sde_plus_cols" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], [], [], [], [], true).split()|join(' '), "sde_plus_cols_w_alias" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], ['my_alias'], [], [], [], [], true).split()|join(' '), "sde_plus_1_context" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], ['CONTEXTS_TEST_1_0_0'], [['contextTestId', 'contextTestClass']], [['string', 'integer']], [], true).split()|join(' '), "sde_plus_2_context" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'], ['contextTestId2', 'contextTestClass2']], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '), "sde_plus_2_context_w_alias" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], ['test1', 'test2'], true).split()|join(' '), "context_only" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], [], [], [], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '), - "context_only_new_loader" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], [], [], [], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST4_1'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '), + "context_only_new_loader" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], [], [], [], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST4_2'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '), "multiple_base_events" : snowplow_normalize.normalize_events(['event_name', 'page_ping'], ['app_id'], [], [], [], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '), "multiple_sde_events" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1', 'UNSTRUCT_EVENT_TEST2_1_0_1'], [['testId', 'testClass'], ['testWord', 'testIdea']], [['number', 'string']], ['test1', 'test2'], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' ') } @@ -67,6 +65,7 @@ It runs 9 tests: {# {{ print(results_dict['sde_plus_2_context'])}} #} {# {{ print(results_dict['sde_plus_2_context_w_alias'])}} #} {# {{ print(results_dict['context_only'])}} #} + {# {{ print(results_dict['context_only_new_loader'])}} #} {# {{ print(results_dict['multiple_base_events'])}} #} {# {{ print(results_dict['multiple_sde_events'])}} #} diff --git a/integration_tests/models/dummy_model/bigquery/snowplow_normalize_stg.sql b/integration_tests/models/dummy_model/bigquery/snowplow_normalize_stg.sql index cabe2f8..03b4f8c 100644 --- a/integration_tests/models/dummy_model/bigquery/snowplow_normalize_stg.sql +++ b/integration_tests/models/dummy_model/bigquery/snowplow_normalize_stg.sql @@ -8,7 +8,7 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 with prep as ( select * - except(contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5, contexts_test4_1), + except(contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5, contexts_test4_1,contexts_test4_2), JSON_EXTRACT_ARRAY(contexts_test_1_0_0) AS contexts_test_1_0_0, JSON_EXTRACT_ARRAY(contexts_test2_1_0_0) AS contexts_test2_1_0_0, JSON_EXTRACT_ARRAY(contexts_test2_1_0_1) AS contexts_test2_1_0_1, @@ -16,14 +16,15 @@ select JSON_EXTRACT_ARRAY(contexts_test2_1_0_3) AS contexts_test2_1_0_3, JSON_EXTRACT_ARRAY(contexts_test2_1_0_4) AS contexts_test2_1_0_4, JSON_EXTRACT_ARRAY(contexts_test2_1_0_5) AS contexts_test2_1_0_5, - JSON_EXTRACT_ARRAY(contexts_test4_1) AS contexts_test4_1 + JSON_EXTRACT_ARRAY(contexts_test4_1) AS contexts_test4_1, + JSON_EXTRACT_ARRAY(contexts_test4_2) AS contexts_test4_2 from {{ ref('snowplow_norm_dummy_events') }} ) -- recreate repeated record field i.e. array of structs as is originally in BQ events table select - * except(unstruct_event_test_1_0_0,unstruct_event_test_1_0_1,unstruct_event_test2_1_0_0,unstruct_event_test2_1_0_1,contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5,contexts_test4_1), + * except(unstruct_event_test_1_0_0,unstruct_event_test_1_0_1,unstruct_event_test2_1_0_0,unstruct_event_test2_1_0_1,contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5,contexts_test4_1,contexts_test4_2), -- order is reversed to test the aliasing of the coalesced columns struct(JSON_EXTRACT_scalar(unstruct_event_test_1_0_0, '$.test_class') as test_class,JSON_EXTRACT_scalar(unstruct_event_test_1_0_0, '$.test_id') as test_id) as unstruct_event_test_1_0_0, struct(JSON_EXTRACT_scalar(unstruct_event_test_1_0_1, '$.test_class') as test_class,JSON_EXTRACT_scalar(unstruct_event_test_1_0_1, '$.test_id') as test_id) as unstruct_event_test_1_0_1, @@ -61,7 +62,11 @@ select array( select struct(JSON_EXTRACT_scalar(json_array,'$.context_test_class2') as context_test_class2, JSON_EXTRACT_scalar(json_array,'$.context_test_id2') as context_test_id2) from unnest(contexts_test4_1) as json_array - ) as contexts_test4_1 - + ) as contexts_test4_1, + array( + select struct(JSON_EXTRACT_scalar(json_array,'$.context_test_class2') as context_test_class2, JSON_EXTRACT_scalar(json_array,'$.context_test_id2') as context_test_id2) + from unnest(contexts_test4_2) as json_array + ) as contexts_test4_2 + from prep diff --git a/macros/normalize_events.sql b/macros/normalize_events.sql index 28a786d..fd53eab 100644 --- a/macros/normalize_events.sql +++ b/macros/normalize_events.sql @@ -67,18 +67,38 @@ where {% macro bigquery__normalize_events(event_names, flat_cols = [], sde_cols = [], sde_keys = [], sde_types = [], sde_aliases = [], context_cols = [], context_keys = [], context_types = [], context_aliases = [], remove_new_event_check = false) %} {# Handle both versioned and unversioned column names #} {%- set re = modules.re -%} - {%- set version_pattern = '_[0-9]+(_[0-9]+)?$' -%} + + {# + This regex pattern handles column versioning in Snowplow contexts and self-describing events. + It specifically targets three-part semantic versions (e.g., field_1_2_3) while preserving + one-part (field_1) and two-part (field_1_2) versions. + + Pattern breakdown: '(_\\d+)_\\d+_\\d+$' + - (_\\d+) : Capture group that matches an underscore followed by one or more digits + This captures the major version number (e.g., "_1" in "field_1_2_3") + - _\\d+ : Matches an underscore and one or more digits (minor version) + - _\\d+ : Matches an underscore and one or more digits (patch version) + - $ : Ensures the pattern only matches at the end of the string + + The replacement pattern '\\1' keeps only the captured major version. + + Examples: + - field_1 -> field_1 (no change - only has major version) + - field_1_2 -> field_1_2 (no change - has major and minor versions) + - field_1_2_3 -> field_1 (transforms - removes minor and patch versions) + #} + {%- set version_pattern = '(_\\d+)_\\d+_\\d+$' -%} {%- set sde_cols_clean = [] -%} {%- for col in sde_cols -%} {# Get the base name for combine_column_versions to work with #} - {%- set clean_name = re.sub(version_pattern, '', col) -%} + {%- set clean_name = re.sub(version_pattern, '\\1', col) -%} {% do sde_cols_clean.append(clean_name) -%} {%- endfor -%} {%- set context_cols_clean = [] -%} {%- for col in context_cols -%} - {%- set clean_name = re.sub(version_pattern, '', col) -%} + {%- set clean_name = re.sub(version_pattern, '\\1', col) -%} {% do context_cols_clean.append(clean_name) -%} {%- endfor -%} {# Replace keys with snake_case where needed #}