
Added Type-II Dimensional Models for the Soroban Contract tables #110

Open

wants to merge 4 commits into master from feature/518-Contract-Dimension-Models

Conversation

harsha-stellar-data (Contributor)

PR Checklist

PR Structure

  • [] This PR has reasonably narrow scope (if not, break it down into smaller PRs).
  • [] This PR avoids mixing refactoring changes with feature changes (split into two PRs otherwise).
  • [] This PR's title starts with the Jira ticket associated with the PR.

Thoroughness

  • [] This PR adds tests for the most critical parts of the new functionality or fixes.
  • [] I've updated the docs and README with the added features, breaking changes, new instructions on how to use the repository.

Release planning

  • [] I've decided if this PR requires a new major/minor/patch version according to semver, and I've changed the name of the BRANCH to major/*, minor/*, or patch/*.

What

This PR introduces several models related to the contract dimensions, implementing Slowly Changing Dimension (SCD) Type 2 logic to track historical changes in contract data. The key type 2 tables include:

  • dim_contract_data_hist
  • dim_contract_code_hist
  • dim_ttl_hist

Why

These changes are being made to enhance the tracking and management of contract-related data within the project. Implementing SCD Type 2 logic ensures that we maintain accurate historical records while accommodating updates and changes seamlessly.

Known limitations

N/A

@harsha-stellar-data harsha-stellar-data requested a review from a team as a code owner October 31, 2024 06:18
@harsha-stellar-data harsha-stellar-data force-pushed the feature/518-Contract-Dimension-Models branch from fbf611d to 529e586 on November 1, 2024 16:17
materialized = 'table',
unique_key = ['ledger_key_hash', 'closed_at'],
cluster_by = ["ledger_key_hash", "closed_at", "row_hash"],
tags = ["soroban_analytics", "intermediate", "daily"]
Contributor

What are the intermediate and daily tags for?

Contributor Author

Added these tags to segregate models by processing layer (intermediate) and by frequency, since we have Airflow jobs scheduled at varying frequencies. Also, since all these Type 2 models will go into their own DAG, the additional tags let us control run behavior in case of issues with the Type 2 data.

Contributor

If that's the case, I think the names need to be more descriptive. intermediate and daily without any qualifiers would be hard to understand from a user perspective.

Contributor Author

Sure, we can edit them as needed, but my intent was that I have three models in this set that are intermediate.
If I want to run them all, I can do something like:
dbt run --select tag:soroban_analytics tag:intermediate
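
(As a side note, and assuming standard dbt selector behavior: space-separated selectors are a union, while a comma makes an intersection, so the two forms below select different sets.)

dbt run --select tag:soroban_analytics tag:intermediate    # union: models with either tag
dbt run --select tag:soroban_analytics,tag:intermediate    # intersection: models with both tags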

*/

-- Set the execution date (use Airflow value, fallback to dbt default if absent)
{% set execution_date = var('execution_date', dbt_airflow_macros.ts()) %}
Contributor

You can just use the dbt_airflow_macros instead of making a new var.

Contributor Author

  • Added the variable to make the filter logic look cleaner.
  • Also, by passing a manual date to execution_date via an Airflow variable, we can implement point-in-time recovery, etc., which will be added as an enhancement in the future.

Contributor

Every dbt run in Airflow, as it stands right now, already sets the var execution_date. dbt_airflow_macros already uses the execution_date variable and offers built-in date/timestamp casting, so it's better to use the built-ins instead of defining your own variables.

Contributor Author

Sure, I will look at the macro. If it allows the flexibility to manually pass a date of my choice if and when needed, instead of always setting it to the current timestamp, I will use it.
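
For example, a manual date could be passed through dbt's standard --vars mechanism (a sketch; the model selection and date are illustrative only):

dbt run --select dim_contract_code_hist --vars '{"execution_date": "2024-10-28T00:00:00+00:00"}'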

Comment on lines +73 to +74
-- Process only completed days up to execution_date
date(closed_at) < date('{{ execution_date }}')
@chowbao chowbao (Contributor) Nov 1, 2024

What's the reason to query from the beginning of time to the execution date? Doesn't it make more sense to make this incremental?

Contributor Author

  • Originally I had an incremental layer with incremental materialization and then a truncate-and-reload transformation layer, but I had to combine them into one model to prevent the "Rejoining of Upstream Concepts" issue.

  • After combining them, if I make this model incremental, intra-batch dupes can come in, since an incremental model only looks at newer versions of data based on closed_at.
    Ex: if I have ledger 123 with $100 and closed_at 10/28 loaded today, and another record comes through with the same data on 10/29, the incremental model lets this dupe in, and the de-duplication logic I have here to prevent consecutive row dupes will not detect it.

  • Considering all this and the volumes, when I merged the two models, I made this model a full truncate-and-reload every day.
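
For context, the consecutive-row de-duplication mentioned above typically looks something like the sketch below (CTE and column names are assumptions, not the exact model code):

, ranked as (
    select
        *
        , lag(row_hash) over (partition by ledger_key_hash order by closed_at) as prev_row_hash
    from transformed_data
)

select *
from ranked
where prev_row_hash is null or row_hash != prev_row_hash  -- keep a row only when it differs from the immediately preceding version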

Contributor

I think my final comment applies to this question.

I think the main issue is that the current models do full table scans on every run: int_* is essentially a full_refresh on every run --> dim_* are incremental. I think the int_* logic should also be incremental.

There should be a way to process SCD2 tables without doing a full table scan of the stg_* data.

@harsha-stellar-data harsha-stellar-data (Contributor Author) Nov 1, 2024

When you say process, does it mean just reprocessing existing data in the table because some bad data came in and disrupted the chaining? If yes, I am working on a stored proc for now to fix data within the table. I didn't include such things in the MVP.

The long-term plan, though, is to have a framework covering all these scenarios:

  • Initial Load
  • Daily Incremental Run - Insert (Future Dated Transaction)
  • Daily Incremental Run - No Change (Future Dated Transaction)
  • Daily Incremental Run - Update/NoChange (Future Dated Transaction as well as past dated transaction)
  • Multi-Day Catchup (All Future Dated Transactions with or without change in data)
  • Single-Day Recovery (past dated transaction with or without data change)
  • Multi-Day Recovery (past dated transactions with or without data change)
  • Full Recovery (Full Wipe)
  • Late Arriving records

Comment on lines +83 to +90
, derived_data as (
select
ledger_key_hash
, min(case when ledger_entry_change = 0 then closed_at end) as contract_create_ts -- Calculate contract created timestamp
-- Calculate contract deletion timestamp
, max(case when ledger_entry_change = 2 and deleted = true then closed_at end) as contract_delete_ts
from source_data
group by ledger_key_hash
Contributor

I think the derived data can be moved to its own incremental table instead of regenerating the whole table every run.

Contributor Author

  • Thought about this too, but since there are only a couple of fields that need tracking, I kept it in here.

  • This CTE is also the very reason I had to combine an incremental pull model and a transformation model into one.

  • We can definitely discuss the pros/cons of maintaining a table of its own.

Contributor

So if this is the reason for removing the incremental mode, doesn't that warrant moving this to its own table so everything can be incremental?

Contributor Author

Depends on how many models we want to build for these, I guess.
If we create another model/table for maintaining this information, there will be four models (a sketch of the second one is shown after this list):

  • Incremental model to bring in staging data
  • Incremental model to calculate the contract created/deleted timestamps
  • Full truncate-and-reload model to perform transformations
  • SCD2 model

vs. (current design)

  • Truncate-and-reload model that also takes care of the transformation work (step 3 above)
  • SCD2 model
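
For illustration only, a rough sketch of what the standalone incremental derived-data model could look like; the model name, staging ref, and merge handling are assumptions, not the PR's implementation:

-- hypothetical model: int_contract_code_derived.sql
{{ config(materialized='incremental', unique_key='ledger_key_hash') }}

with new_data as (
    select
        ledger_key_hash
        , min(case when ledger_entry_change = 0 then closed_at end) as contract_create_ts
        , max(case when ledger_entry_change = 2 and deleted = true then closed_at end) as contract_delete_ts
        , max(closed_at) as last_closed_at
    from {{ ref('stg_contract_code') }}  -- staging ref name assumed
    {% if is_incremental() %}
    where closed_at > (select max(last_closed_at) from {{ this }})
    {% endif %}
    group by ledger_key_hash
)

select
    n.ledger_key_hash
    -- keep the earliest create and the latest delete across old and new batches
    , coalesce(e.contract_create_ts, n.contract_create_ts) as contract_create_ts
    , coalesce(n.contract_delete_ts, e.contract_delete_ts) as contract_delete_ts
    , n.last_closed_at
from new_data as n
{% if is_incremental() %}
left join {{ this }} as e on n.ledger_key_hash = e.ledger_key_hash
{% else %}
left join new_data as e on false  -- first run: nothing to merge with
{% endif %}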

Comment on lines +117 to +131
, sha256(concat(
coalesce(f.ledger_key_hash, '')
, coalesce(cast(f.n_instructions as string), '')
, coalesce(cast(f.n_functions as string), '')
, coalesce(cast(f.n_globals as string), '')
, coalesce(cast(f.n_table_entries as string), '')
, coalesce(cast(f.n_types as string), '')
, coalesce(cast(f.n_data_segments as string), '')
, coalesce(cast(f.n_elem_segments as string), '')
, coalesce(cast(f.n_imports as string), '')
, coalesce(cast(f.n_exports as string), '')
, coalesce(cast(f.n_data_segment_bytes as string), '')
, coalesce(cast(d.contract_create_ts as string), '')
, coalesce(cast(d.contract_delete_ts as string), '')
)) as row_hash
@chowbao chowbao (Contributor) Nov 1, 2024

Is row_hash needed for unique_id? Isn't ledger_key_hash effectively the unique_id for an entry and then ledger_key_hash + closed_at the unique row?

Edit: I see you use the row_hash to determine a diff

-- Rank records to identify the latest entry
, row_number() over (
partition by ledger_key_hash, cast(closed_at as date)
order by closed_at desc, ledger_sequence desc
Contributor

nit: I don't think you need both closed_at and ledger_sequence. They should both provide the same ordering

@harsha-stellar-data harsha-stellar-data (Contributor Author) Nov 1, 2024

Yeah, I didn't notice any data with the same closed_at, but kept ledger_sequence as a fail-safe in case anything weird happens.

}}

/*
Model: dim_contract_code_hist
Contributor

nit: name scd2 tables with _scd2 instead of _hist

Contributor Author

  • Opted for _hist to follow the _current naming format we already have. I am open to suggestions/feedback here.

, row_hash
from {{ ref('int_transform_contract_code') }}
{% if is_incremental() %}
where closed_at > (select coalesce(max(closed_at), '2000-01-01T00:00:00+00:00') from {{ this }})
Contributor

nit: I don't think the coalesce is necessary. On the first run of the model, is_incremental() will be false anyway, and on subsequent runs the table will have data.

Contributor Author

  • This will help if we ever perform a manual truncate and don't want to modify anything in the Airflow vars to trigger a full refresh.

@chowbao chowbao (Contributor) Nov 1, 2024

When would you do a manual truncate without just running --full_refresh?

Edit: I think it does create the empty table with the correct schema

Contributor Author

If we ever want to do a cleanup by truncating the table, I guess.

For a full refresh, we have to make changes to Airflow vars as it stands (my understanding based on the existing setup?).
If we allow this code, we can simulate a refresh (assuming no schema changes) without having to do anything in Airflow.

I am open to either option, I guess, as truncating the table is also a manual step, though a more straightforward one compared to Airflow changes. We can discuss in office hours.
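
For reference, the two options under discussion would roughly be the following (a sketch; the fully qualified table name is a placeholder):

truncate table `<project>.<dataset>.dim_contract_code_hist`;   -- manual truncate; the coalesce fallback then reloads from 2000-01-01
dbt run --select dim_contract_code_hist --full-refresh         # dbt-native alternative: drop and rebuild the table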

, CAST(NULL AS TIMESTAMP) AS dw_load_ts
, CAST(NULL AS TIMESTAMP) AS dw_update_ts
from source_data
where 1 = 0
@chowbao chowbao (Contributor) Nov 1, 2024

Doesn't this always evaluate to false? I guess if you're making an empty table you don't even need the from source_data in the statement.

Also, doesn't this add a null row into the final table?

Contributor Author

  • You are right that this always evaluates to false. That is intentional and will always produce a zero-row result set with the schema.

  • It doesn't insert any null rows and will always produce zero rows.

  • The primary purpose is to allow the same downstream code to work during the initial load / full refresh as well as the incremental load. Without this, we would have to write customized code to execute within is_incremental() blocks.
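
For context, the pattern being described is roughly the following (a minimal sketch with assumed CTE names, not the actual model code):

, target_data as (
    {% if is_incremental() %}
    select * from {{ this }}
    {% else %}
    -- initial load / full refresh: zero rows but the full schema,
    -- so the downstream comparison logic compiles and runs unchanged
    select
        ledger_key_hash
        , closed_at
        , cast(null as timestamp) as dw_load_ts
        , cast(null as timestamp) as dw_update_ts
    from source_data
    where 1 = 0
    {% endif %}
)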

Contributor

Does

select * from {{ this }}

evaluate as an empty table with the correct schema on a full_refresh?

Contributor Author

I tried using that, but during the first run or whenever we use --full-refresh, it drops the table, has no reference for what {{ this }} means, and fails.

Comment on lines +155 to +163
, case
when cdc.change_type = 'Insert' then 'INSERT_NEW_KEY'
when cdc.change_type = 'Update' then 'START_NEW_VERSION'
when
cdc.change_type = 'NoChange'
and lead(cdc.start_date) over (partition by cdc.ledger_key_hash order by cdc.start_date) is not null
then 'END_CURRENT_VERSION'
else 'KEEP_CURRENT'
end as operation_type
Contributor

Do you need to create/track the change_type? It looks like any change is processed the same way

Contributor Author

  • Can you please elaborate on what you mean? I didn't get it.

Contributor

You name mark rows with change_type and operation_type but never use this downstream. At the end of the scd2 table you don't include change_type nor operation_type so is there a point in identifying a change with such detail if all you need is a boolean is_changed

Contributor Author

Understood. I agree that everything is either an insert or an upsert, but having the rows explicitly marked helps identify things during debugging/testing.

Comment on lines +167 to +193
-- Final data processing with all transformations
, final_data as (
select
dc.ledger_key_hash
, coalesce(dc.source_contract_id, dc.target_contract_id) as contract_id
, coalesce(dc.source_contract_durability, dc.target_contract_durability) as contract_durability
, coalesce(dc.source_asset_code, dc.target_asset_code) as asset_code
, coalesce(dc.source_asset_issuer, dc.target_asset_issuer) as asset_issuer
, coalesce(dc.source_asset_type, dc.target_asset_type) as asset_type
, coalesce(dc.source_balance, dc.target_balance) as balance
, coalesce(dc.source_balance_holder, dc.target_balance_holder) as balance_holder
, coalesce(dc.source_contract_create_ts, dc.target_contract_create_ts) as contract_create_ts
, coalesce(dc.source_contract_delete_ts, dc.target_contract_delete_ts) as contract_delete_ts
, coalesce(dc.source_closed_at, dc.target_closed_at) as closed_at
, dc.start_date
, coalesce(date_sub(dc.next_start_date, interval 1 day), date('9999-12-31')) as end_date
, coalesce(row_number() over (partition by dc.ledger_key_hash order by dc.start_date desc) = 1, false) as is_current
, coalesce(dc.source_airflow_start_ts, dc.target_airflow_start_ts) as airflow_start_ts
, coalesce(dc.source_batch_id, dc.target_batch_id) as batch_id
, coalesce(dc.source_batch_run_date, dc.target_batch_run_date) as batch_run_date
, coalesce(dc.source_row_hash, dc.target_row_hash) as row_hash
, coalesce(dc.target_dw_load_ts, current_timestamp()) as dw_load_ts
, current_timestamp() as dw_update_ts
, dc.operation_type
from date_chained as dc
where dc.operation_type in ('INSERT_NEW_KEY', 'START_NEW_VERSION', 'END_CURRENT_VERSION')
)
Contributor

Is there a need to actually do the coalescing between the source and target? I would say that any new ledger_key_hash + closed_at combination should be inserted directly as-is, even if the only diff is the closed_at column.

Contributor Author

  • Since we are not tracking all field/JSON changes, a change in closed_at doesn't necessarily result in a new row. Only a change in the fields considered for CDC (the row_hash calculation) can trigger a change; closed_at is not part of the row_hash calculation.

  • This block essentially combines fetching the target record (for closing the previous version) and fetching the source records (new inserts or updates to existing keys) into one CTE.

@chowbao chowbao (Contributor) Nov 1, 2024

I would argue that closed_at should trigger a new scd2 row even if other columns don't change, because a different closed_at means that a ledger_key_hash had a new ledger entry. If we don't include that change, we wouldn't be able to use the scd table to find all the instances of a ledger_key_hash entry in time.

Plus, this simplifies the logic. You no longer need to compare whether any field is different; you just insert the new row, mark it as current, and mark the previous row of the same ledger_key_hash as not current.
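
For illustration, the simplified approach described here might look roughly like the sketch below (CTE and column names are assumptions; this is not the PR's implementation):

select
    *
    , date(closed_at) as start_date
    -- close each version the day before the next one starts; leave it open-ended otherwise
    , coalesce(
        date_sub(lead(date(closed_at)) over (partition by ledger_key_hash order by closed_at), interval 1 day)
        , date('9999-12-31')
    ) as end_date
    , row_number() over (partition by ledger_key_hash order by closed_at desc) = 1 as is_current
from new_and_existing_rows  -- every ledger_key_hash + closed_at version, no row_hash comparison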

Contributor Author

Sure. In the current state, I didn't find any use for it because of the fields we are using for CDC tracking, and tracking every record will look like a duplicate in the SCD2 table if the CDC fields don't change. If we properly track the actual changes in the JSON fields as well, then I totally agree with the suggestion. As it stands, it would look like a temporal chain of records that get closed and opened without any material changes in the data.

I will table this for a deeper discussion during office hours as well.

@chowbao chowbao (Contributor) commented Nov 1, 2024

I didn't review contract_data or ttl, but I think the same comments from contract_code would apply. I think the main issue is that the current models do full table scans on every run: int_* is essentially a full_refresh on every run --> dim_* are incremental. I think the int_* logic should also be incremental.
