Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pe 6540 bq new loader #48

Merged
merged 10 commits into from
Dec 3, 2024
4 changes: 2 additions & 2 deletions integration_tests/data/source/snowplow_norm_dummy_events.csv
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
event_name,event_id,app_id,collector_tstamp,dvce_sent_tstamp,dvce_created_tstamp,unstruct_event_test_1_0_0,unstruct_event_test_1_0_1,unstruct_event_test2_1_0_0,unstruct_event_test2_1_0_1,contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5
'demo','demo','demo',2022-10-01 01:27:34,2022-10-01 01:27:34,2022-10-01 01:27:34,"{""test_id"":""demo"", ""test_class"":""demo""}","{""test_id"":""demo"", ""test_class"":""demo""}","{""test_word"":""demo"", ""test_idea"":""demo""}","{""test_word"":""demo"", ""test_idea"":""demo""}","[{""context_test_id"":""demo"", ""context_test_class"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]"
event_name,event_id,app_id,collector_tstamp,dvce_sent_tstamp,dvce_created_tstamp,unstruct_event_test_1_0_0,unstruct_event_test_1_0_1,unstruct_event_test2_1_0_0,unstruct_event_test2_1_0_1,contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5,contexts_test4_1,contexts_test4_2
'demo','demo','demo',2022-10-01 01:27:34,2022-10-01 01:27:34,2022-10-01 01:27:34,"{""test_id"":""demo"", ""test_class"":""demo""}","{""test_id"":""demo"", ""test_class"":""demo""}","{""test_word"":""demo"", ""test_idea"":""demo""}","{""test_word"":""demo"", ""test_idea"":""demo""}","[{""context_test_id"":""demo"", ""context_test_class"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]","[{""context_test_id2"":""demo"", ""context_test_class2"":""demo""}]"
1 change: 1 addition & 0 deletions integration_tests/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,4 @@ seeds:
contexts_test2_1_0_3: "{{ 'string' if target.type in ['bigquery', 'databricks', 'spark'] else 'varchar' }}"
contexts_test2_1_0_4: "{{ 'string' if target.type in ['bigquery', 'databricks', 'spark'] else 'varchar' }}"
contexts_test2_1_0_5: "{{ 'string' if target.type in ['bigquery', 'databricks', 'spark'] else 'varchar' }}"
contexts_test4_1: "{{ 'string' if target.type in ['bigquery', 'databricks', 'spark'] else 'varchar' }}"
5 changes: 3 additions & 2 deletions integration_tests/macros/test_normalize_events.sql
Original file line number Diff line number Diff line change
Expand Up @@ -31,27 +31,27 @@ It runs 9 tests:
{% macro bigquery__test_normalize_events() %}

{% set expected_dict = {
"flat_cols_only" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"sde_plus_cols" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as test_id -- context column(s) from the event table from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"sde_plus_cols_w_alias" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as my_alias_test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as my_alias_test_id -- context column(s) from the event table from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"sde_plus_1_context" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as test_id -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"sde_plus_2_context" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as test_id -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"sde_plus_2_context_w_alias" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as test_id -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as test1_context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as test1_context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as test2_context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as test2_context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"context_only" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"context_only_new_loader" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test4_2[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test4_2[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')",
"multiple_base_events" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name','page_ping')",
"multiple_sde_events" : "select event_id , collector_tstamp -- Flat columns from event table , app_id -- self describing events columns from event table , coalesce(unstruct_event_test_1_0_1.test_class, unstruct_event_test_1_0_0.test_class) as test1_test_class , coalesce(unstruct_event_test_1_0_1.test_id, unstruct_event_test_1_0_0.test_id) as test1_test_id , coalesce(unstruct_event_test2_1_0_1.test_word, unstruct_event_test2_1_0_0.test_word) as test2_test_word , coalesce(unstruct_event_test2_1_0_1.test_idea, unstruct_event_test2_1_0_0.test_idea) as test2_test_idea -- context column(s) from the event table , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_id) as context_test_id , coalesce(contexts_test_1_0_0[safe_offset(0)].context_test_class) as context_test_class , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_class2, contexts_test2_1_0_4[safe_offset(0)].context_test_class2, contexts_test2_1_0_3[safe_offset(0)].context_test_class2, contexts_test2_1_0_2[safe_offset(0)].context_test_class2, contexts_test2_1_0_1[safe_offset(0)].context_test_class2, contexts_test2_1_0_0[safe_offset(0)].context_test_class2) as context_test_class2 , coalesce(contexts_test2_1_0_5[safe_offset(0)].context_test_id2, contexts_test2_1_0_4[safe_offset(0)].context_test_id2, contexts_test2_1_0_3[safe_offset(0)].context_test_id2, contexts_test2_1_0_2[safe_offset(0)].context_test_id2, contexts_test2_1_0_1[safe_offset(0)].context_test_id2, contexts_test2_1_0_0[safe_offset(0)].context_test_id2) as context_test_id2 from `"~target.project~"`."~target.dataset~"_scratch.snowplow_normalize_base_events_this_run where event_name in ('event_name')"
} %}



{% set results_dict ={
"flat_cols_only" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], [], [], [], [], [], [], [], [], true).split()|join(' '),
"sde_plus_cols" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], [], [], [], [], true).split()|join(' '),
"sde_plus_cols_w_alias" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], ['my_alias'], [], [], [], [], true).split()|join(' '),
"sde_plus_1_context" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], ['CONTEXTS_TEST_1_0_0'], [['contextTestId', 'contextTestClass']], [['string', 'integer']], [], true).split()|join(' '),
"sde_plus_2_context" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'], ['contextTestId2', 'contextTestClass2']], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '),
"sde_plus_2_context_w_alias" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1'], [['testId', 'testClass']], [['string', 'boolean']], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], ['test1', 'test2'], true).split()|join(' '),
"context_only" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], [], [], [], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '),
"context_only_new_loader" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], [], [], [], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST4_2'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '),
"multiple_base_events" : snowplow_normalize.normalize_events(['event_name', 'page_ping'], ['app_id'], [], [], [], [], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' '),
"multiple_sde_events" : snowplow_normalize.normalize_events(['event_name'], ['app_id'], ['UNSTRUCT_EVENT_TEST_1_0_1', 'UNSTRUCT_EVENT_TEST2_1_0_1'], [['testId', 'testClass'], ['testWord', 'testIdea']], [['number', 'string']], ['test1', 'test2'], ['CONTEXTS_TEST_1_0_0', 'CONTEXTS_TEST2_1_0_5'], [['contextTestId', 'contextTestClass'],['contextTestId2', 'contextTestClass2'] ], [['boolean', 'string'], ['interger', 'string']], [], true).split()|join(' ')
}
Expand All @@ -65,6 +65,7 @@ It runs 9 tests:
{# {{ print(results_dict['sde_plus_2_context'])}} #}
{# {{ print(results_dict['sde_plus_2_context_w_alias'])}} #}
{# {{ print(results_dict['context_only'])}} #}
{# {{ print(results_dict['context_only_new_loader'])}} #}
{# {{ print(results_dict['multiple_base_events'])}} #}
{# {{ print(results_dict['multiple_sde_events'])}} #}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,23 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
with prep as (
select
*
except(contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5),
except(contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5, contexts_test4_1,contexts_test4_2),
JSON_EXTRACT_ARRAY(contexts_test_1_0_0) AS contexts_test_1_0_0,
JSON_EXTRACT_ARRAY(contexts_test2_1_0_0) AS contexts_test2_1_0_0,
JSON_EXTRACT_ARRAY(contexts_test2_1_0_1) AS contexts_test2_1_0_1,
JSON_EXTRACT_ARRAY(contexts_test2_1_0_2) AS contexts_test2_1_0_2,
JSON_EXTRACT_ARRAY(contexts_test2_1_0_3) AS contexts_test2_1_0_3,
JSON_EXTRACT_ARRAY(contexts_test2_1_0_4) AS contexts_test2_1_0_4,
JSON_EXTRACT_ARRAY(contexts_test2_1_0_5) AS contexts_test2_1_0_5
JSON_EXTRACT_ARRAY(contexts_test2_1_0_5) AS contexts_test2_1_0_5,
JSON_EXTRACT_ARRAY(contexts_test4_1) AS contexts_test4_1,
JSON_EXTRACT_ARRAY(contexts_test4_2) AS contexts_test4_2

from {{ ref('snowplow_norm_dummy_events') }}
)

-- recreate repeated record field i.e. array of structs as is originally in BQ events table
select
* except(unstruct_event_test_1_0_0,unstruct_event_test_1_0_1,unstruct_event_test2_1_0_0,unstruct_event_test2_1_0_1,contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5),
* except(unstruct_event_test_1_0_0,unstruct_event_test_1_0_1,unstruct_event_test2_1_0_0,unstruct_event_test2_1_0_1,contexts_test_1_0_0,contexts_test2_1_0_0,contexts_test2_1_0_1,contexts_test2_1_0_2,contexts_test2_1_0_3,contexts_test2_1_0_4,contexts_test2_1_0_5,contexts_test4_1,contexts_test4_2),
-- order is reversed to test the aliasing of the coalesced columns
struct(JSON_EXTRACT_scalar(unstruct_event_test_1_0_0, '$.test_class') as test_class,JSON_EXTRACT_scalar(unstruct_event_test_1_0_0, '$.test_id') as test_id) as unstruct_event_test_1_0_0,
struct(JSON_EXTRACT_scalar(unstruct_event_test_1_0_1, '$.test_class') as test_class,JSON_EXTRACT_scalar(unstruct_event_test_1_0_1, '$.test_id') as test_id) as unstruct_event_test_1_0_1,
Expand Down Expand Up @@ -56,7 +58,15 @@ select
array(
select struct(JSON_EXTRACT_scalar(json_array,'$.context_test_class2') as context_test_class2, JSON_EXTRACT_scalar(json_array,'$.context_test_id2') as context_test_id2)
from unnest(contexts_test2_1_0_5) as json_array
) as contexts_test2_1_0_5

) as contexts_test2_1_0_5,
array(
select struct(JSON_EXTRACT_scalar(json_array,'$.context_test_class2') as context_test_class2, JSON_EXTRACT_scalar(json_array,'$.context_test_id2') as context_test_id2)
from unnest(contexts_test4_1) as json_array
) as contexts_test4_1,

array(
select struct(JSON_EXTRACT_scalar(json_array,'$.context_test_class2') as context_test_class2, JSON_EXTRACT_scalar(json_array,'$.context_test_id2') as context_test_id2)
from unnest(contexts_test4_2) as json_array
) as contexts_test4_2

from prep
Loading