From 363fea1bc2b05b2def269edf4a0b2417a3801ce8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Tue, 12 Nov 2024 10:51:22 -0800 Subject: [PATCH 01/19] Testing secret dag --- orchestrate/dags/tester.py | 39 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 orchestrate/dags/tester.py diff --git a/orchestrate/dags/tester.py b/orchestrate/dags/tester.py new file mode 100644 index 0000000..be776d6 --- /dev/null +++ b/orchestrate/dags/tester.py @@ -0,0 +1,39 @@ +import datetime +from airflow.decorators import dag, task +from operators.datacoves.dbt import DatacovesDbtOperator + +@dag( + default_args={ + "start_date": datetime.datetime(2023, 1, 1, 0, 0), + "owner": "Noel Gomez", + "email": "gomezn@example.com", + "email_on_failure": True, + }, + description="Sample DAG for dbt build", + schedule_interval="0 0 1 */12 *", + tags=["version_2"], + catchup=False, +) +def sample_dag(): + + @task + def get_variable(): + from airflow.models import Variable + # Fetch the variable from Airflow's Variables + my_var = Variable.get("datacoves_mayras_secret") + return my_var # Return the value for downstream tasks + + fetched_variable = get_variable() + + # Task to run dbt using the DatacovesDbtOperator and pass the fetched variable + @task + def run_dbt_task(fetched_variable: str): + # Use the fetched variable in the dbt command + DatacovesDbtOperator( + task_id="run_dbt", + bash_command=f"dbt run -s personal_loans --vars '{{my_var: \"{fetched_variable}\"}}'" + ) + + run_dbt_task(fetched_variable) + +dag = sample_dag() From a45143bd20219503bdc1cba708784c67e23922d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Tue, 12 Nov 2024 10:54:27 -0800 Subject: [PATCH 02/19] change name --- orchestrate/dags/tester.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/orchestrate/dags/tester.py b/orchestrate/dags/tester.py index be776d6..77836fb 100644 --- a/orchestrate/dags/tester.py +++ b/orchestrate/dags/tester.py @@ -14,7 +14,7 @@ tags=["version_2"], catchup=False, ) -def sample_dag(): +def tester_dag(): @task def get_variable(): @@ -36,4 +36,4 @@ def run_dbt_task(fetched_variable: str): run_dbt_task(fetched_variable) -dag = sample_dag() +dag = tester_dag() From 9e7868f7aec9955b5171ac07435aab6bfc2cb75c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Tue, 12 Nov 2024 11:18:10 -0800 Subject: [PATCH 03/19] Add Aws DAG --- orchestrate/dags/aws_dag.py | 46 +++++++++++++++++++++++++++++++++++++ orchestrate/dags/tester.py | 3 +-- 2 files changed, 47 insertions(+), 2 deletions(-) create mode 100644 orchestrate/dags/aws_dag.py diff --git a/orchestrate/dags/aws_dag.py b/orchestrate/dags/aws_dag.py new file mode 100644 index 0000000..af1c111 --- /dev/null +++ b/orchestrate/dags/aws_dag.py @@ -0,0 +1,46 @@ +import datetime +from airflow.decorators import dag, task +from operators.datacoves.dbt import DatacovesDbtOperator + +@dag( + default_args={ + "start_date": datetime.datetime(2023, 1, 1, 0, 0), + "owner": "Noel Gomez", + "email": "gomezn@example.com", + "email_on_failure": True, + }, + description="Sample DAG for dbt build", + schedule_interval="0 0 1 */12 *", + tags=["version_2"], + catchup=False, +) +def aws_dag(): + + @task + def get_aws_variable(): + from airflow.models import Variable + # Fetches the variable, potentially making an AWS Secrets Manager API call + aws_var = Variable.get("aws_mayras_secret") + return aws_var + + @task + def get_datacoves_variable(): + from airflow.models import Variable + # Fetches the variable without an AWS Secrets Manager API call + datacoves_var = Variable.get("datacoves_mayras_secret") + return local_var + + aws_variable = get_aws_variable() + my_variable = get_local_variable() + + # Task to run dbt using the DatacovesDbtOperator and pass the variables + @task + def run_dbt_task(aws_variable: str, my_variable: str): + # Use the fetched variables in the dbt command + DatacovesDbtOperator( + task_id="run_dbt", + bash_command=f"dbt run -s personal_loans --vars '{{my_aws_variable: \"{aws_variable}\", my_variable: \"{my_variable}\"}}'" + ) + + +dag = aws_dag() diff --git a/orchestrate/dags/tester.py b/orchestrate/dags/tester.py index 77836fb..05bd887 100644 --- a/orchestrate/dags/tester.py +++ b/orchestrate/dags/tester.py @@ -11,7 +11,7 @@ }, description="Sample DAG for dbt build", schedule_interval="0 0 1 */12 *", - tags=["version_2"], + tags=["version_3"], catchup=False, ) def tester_dag(): @@ -34,6 +34,5 @@ def run_dbt_task(fetched_variable: str): bash_command=f"dbt run -s personal_loans --vars '{{my_var: \"{fetched_variable}\"}}'" ) - run_dbt_task(fetched_variable) dag = tester_dag() From 3e3a162eb5684ad825d7f02b3b119496c0472cc4 Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Tue, 12 Nov 2024 16:22:40 -0800 Subject: [PATCH 04/19] add personal loans --- .../models/L1_staging/loans/_loans_noel.yml | 8 +++ .../L1_staging/loans/stg_noel_noel_source.sql | 71 +++++++++++++++++++ .../L1_staging/loans/stg_noel_noel_source.yml | 61 ++++++++++++++++ 3 files changed, 140 insertions(+) create mode 100644 transform/models/L1_staging/loans/_loans_noel.yml create mode 100644 transform/models/L1_staging/loans/stg_noel_noel_source.sql create mode 100644 transform/models/L1_staging/loans/stg_noel_noel_source.yml diff --git a/transform/models/L1_staging/loans/_loans_noel.yml b/transform/models/L1_staging/loans/_loans_noel.yml new file mode 100644 index 0000000..a7e4726 --- /dev/null +++ b/transform/models/L1_staging/loans/_loans_noel.yml @@ -0,0 +1,8 @@ +version: 2 + +sources: + - name: NOEL + database: RAW + tables: + - name: NOEL_SOURCE + description: '' diff --git a/transform/models/L1_staging/loans/stg_noel_noel_source.sql b/transform/models/L1_staging/loans/stg_noel_noel_source.sql new file mode 100644 index 0000000..6cf5b66 --- /dev/null +++ b/transform/models/L1_staging/loans/stg_noel_noel_source.sql @@ -0,0 +1,71 @@ +with raw_source as ( + + select * + from {{ source('NOEL', 'NOEL_SOURCE') }} + +), + +final as ( + + select + "_AIRBYTE_RAW_ID"::varchar as airbyte_raw_id, + "_AIRBYTE_EXTRACTED_AT"::timestamp_tz as airbyte_extracted_at, + "_AIRBYTE_META"::variant as airbyte_meta, + "TOTAL_ACC"::float as total_acc, + "ANNUAL_INC"::float as annual_inc, + "EMP_LENGTH"::varchar as emp_length, + "DESC"::varchar as desc, + "TOTAL_PYMNT"::float as total_pymnt, + "LAST_PYMNT_D"::varchar as last_pymnt_d, + "ADDR_STATE"::varchar as addr_state, + "NEXT_PYMNT_D"::varchar as next_pymnt_d, + "EMP_TITLE"::varchar as emp_title, + "COLLECTION_RECOVERY_FEE"::float as collection_recovery_fee, + "MTHS_SINCE_LAST_MAJOR_DEROG"::float as mths_since_last_major_derog, + "INQ_LAST_6MTHS"::float as inq_last_6mths, + "SUB_GRADE"::varchar as sub_grade, + "FUNDED_AMNT_INV"::float as funded_amnt_inv, + "DELINQ_2YRS"::float as delinq_2yrs, + "LOAN_ID"::varchar as loan_id, + "FUNDED_AMNT"::float as funded_amnt, + "VERIFICATION_STATUS"::varchar as verification_status, + "DTI"::float as dti, + "TOTAL_REC_PRNCP"::float as total_rec_prncp, + "GRADE"::varchar as grade, + "HOME_OWNERSHIP"::varchar as home_ownership, + "ISSUE_D"::varchar as issue_d, + "MTHS_SINCE_LAST_DELINQ"::float as mths_since_last_delinq, + "OUT_PRNCP"::float as out_prncp, + "PUB_REC"::float as pub_rec, + "INT_RATE"::float as int_rate, + "ZIP_CODE"::varchar as zip_code, + "OPEN_ACC"::float as open_acc, + "TERM"::varchar as term, + "PYMNT_PLAN"::varchar as pymnt_plan, + "URL"::varchar as url, + "REVOL_BAL"::float as revol_bal, + "RECOVERIES"::float as recoveries, + "LAST_PYMNT_AMNT"::float as last_pymnt_amnt, + "LOAN_AMNT"::float as loan_amnt, + "PURPOSE"::varchar as purpose, + "INITIAL_LIST_STATUS"::varchar as initial_list_status, + "TOTAL_REC_INT"::float as total_rec_int, + "TOTAL_PYMNT_INV"::float as total_pymnt_inv, + "MTHS_SINCE_LAST_RECORD"::float as mths_since_last_record, + "LAST_CREDIT_PULL_D"::varchar as last_credit_pull_d, + "TOTAL_REC_LATE_FEE"::float as total_rec_late_fee, + "MEMBER_ID"::float as member_id, + "POLICY_CODE"::float as policy_code, + "TITLE"::varchar as title, + "LOAN_STATUS"::varchar as loan_status, + "INSTALLMENT"::float as installment, + "EARLIEST_CR_LINE"::varchar as earliest_cr_line, + "REVOL_UTIL"::varchar as revol_util, + "OUT_PRNCP_INV"::float as out_prncp_inv, + "COLLECTIONS_12_MTHS_EX_MED"::float as collections_12_mths_ex_med + + from raw_source + +) + +select * from final diff --git a/transform/models/L1_staging/loans/stg_noel_noel_source.yml b/transform/models/L1_staging/loans/stg_noel_noel_source.yml new file mode 100644 index 0000000..88c1c84 --- /dev/null +++ b/transform/models/L1_staging/loans/stg_noel_noel_source.yml @@ -0,0 +1,61 @@ +version: 2 + +models: + - name: NOEL_SOURCE + description: '' + columns: + - name: airbyte_raw_id + - name: airbyte_extracted_at + - name: airbyte_meta + - name: total_acc + - name: annual_inc + - name: emp_length + - name: desc + - name: total_pymnt + - name: last_pymnt_d + - name: addr_state + - name: next_pymnt_d + - name: emp_title + - name: collection_recovery_fee + - name: mths_since_last_major_derog + - name: inq_last_6mths + - name: sub_grade + - name: funded_amnt_inv + - name: delinq_2yrs + - name: loan_id + - name: funded_amnt + - name: verification_status + - name: dti + - name: total_rec_prncp + - name: grade + - name: home_ownership + - name: issue_d + - name: mths_since_last_delinq + - name: out_prncp + - name: pub_rec + - name: int_rate + - name: zip_code + - name: open_acc + - name: term + - name: pymnt_plan + - name: url + - name: revol_bal + - name: recoveries + - name: last_pymnt_amnt + - name: loan_amnt + - name: purpose + - name: initial_list_status + - name: total_rec_int + - name: total_pymnt_inv + - name: mths_since_last_record + - name: last_credit_pull_d + - name: total_rec_late_fee + - name: member_id + - name: policy_code + - name: title + - name: loan_status + - name: installment + - name: earliest_cr_line + - name: revol_util + - name: out_prncp_inv + - name: collections_12_mths_ex_med From 545c6c6886acd167dc1bd7a3facb21a29b9df771 Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Wed, 13 Nov 2024 15:29:45 -0800 Subject: [PATCH 05/19] add avg model --- .../L1_staging/loans/stg_noel_noel_source.yml | 2 +- .../models/L2_core/noel_avg_by_grade.sql | 20 +++++++++++++++++++ .../models/L2_core/noel_avg_by_grade.yml | 9 +++++++++ 3 files changed, 30 insertions(+), 1 deletion(-) create mode 100644 transform/models/L2_core/noel_avg_by_grade.sql create mode 100644 transform/models/L2_core/noel_avg_by_grade.yml diff --git a/transform/models/L1_staging/loans/stg_noel_noel_source.yml b/transform/models/L1_staging/loans/stg_noel_noel_source.yml index 88c1c84..b2a71df 100644 --- a/transform/models/L1_staging/loans/stg_noel_noel_source.yml +++ b/transform/models/L1_staging/loans/stg_noel_noel_source.yml @@ -1,7 +1,7 @@ version: 2 models: - - name: NOEL_SOURCE + - name: stg_noel_noel_source description: '' columns: - name: airbyte_raw_id diff --git a/transform/models/L2_core/noel_avg_by_grade.sql b/transform/models/L2_core/noel_avg_by_grade.sql new file mode 100644 index 0000000..78c9273 --- /dev/null +++ b/transform/models/L2_core/noel_avg_by_grade.sql @@ -0,0 +1,20 @@ +with raw_source as ( + + select * from {{ ref('stg_noel_noel_source') }} + +), + +final as ( + + select + grade, + avg(loan_amnt) as avg_loan_amount, + count(*) as total_loans + from raw_source + where loan_status = 'Fully Paid' + group by grade + order by grade + +) + +select * from final diff --git a/transform/models/L2_core/noel_avg_by_grade.yml b/transform/models/L2_core/noel_avg_by_grade.yml new file mode 100644 index 0000000..4a6cdb3 --- /dev/null +++ b/transform/models/L2_core/noel_avg_by_grade.yml @@ -0,0 +1,9 @@ +version: 2 + +models: + - name: noel_avg_by_grade + description: 'my model' + columns: + - name: grade + - name: avg_loan_amount + - name: total_loans From 855d07f3cdb0b91563d8aba412ebd612dab72309 Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Wed, 13 Nov 2024 15:38:35 -0800 Subject: [PATCH 06/19] update description --- transform/models/L1_staging/country_data/_country_data.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transform/models/L1_staging/country_data/_country_data.yml b/transform/models/L1_staging/country_data/_country_data.yml index e0f3613..9fae7f1 100644 --- a/transform/models/L1_staging/country_data/_country_data.yml +++ b/transform/models/L1_staging/country_data/_country_data.yml @@ -7,4 +7,4 @@ sources: - daily_run_airbyte tables: - name: COUNTRY_POPULATIONS - description: 'Raw population information from Github Datasets repository' + description: 'country pupolations' From bcdbde60acb25c78056ffb1efd3d0ea0a979066b Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Thu, 14 Nov 2024 11:10:12 -0800 Subject: [PATCH 07/19] add missing desc --- transform/models/L1_staging/loans/_loans_noel.yml | 2 +- transform/models/L1_staging/loans/stg_noel_noel_source.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/transform/models/L1_staging/loans/_loans_noel.yml b/transform/models/L1_staging/loans/_loans_noel.yml index a7e4726..2dcf586 100644 --- a/transform/models/L1_staging/loans/_loans_noel.yml +++ b/transform/models/L1_staging/loans/_loans_noel.yml @@ -5,4 +5,4 @@ sources: database: RAW tables: - name: NOEL_SOURCE - description: '' + description: 'country data' diff --git a/transform/models/L1_staging/loans/stg_noel_noel_source.yml b/transform/models/L1_staging/loans/stg_noel_noel_source.yml index b2a71df..2892c6a 100644 --- a/transform/models/L1_staging/loans/stg_noel_noel_source.yml +++ b/transform/models/L1_staging/loans/stg_noel_noel_source.yml @@ -2,7 +2,7 @@ version: 2 models: - name: stg_noel_noel_source - description: '' + description: 'source description' columns: - name: airbyte_raw_id - name: airbyte_extracted_at From 4f49ad97d7c277b25c0ee148b30ee7587fb2a008 Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Thu, 14 Nov 2024 11:49:15 -0800 Subject: [PATCH 08/19] add dag --- orchestrate/dags/noel.py | 42 +++++++++++++++++++ orchestrate/dags_yml_definitions/noel.yml | 26 ++++++++++++ .../models/L1_staging/loans/_loans_noel.yml | 2 + 3 files changed, 70 insertions(+) create mode 100644 orchestrate/dags/noel.py create mode 100644 orchestrate/dags_yml_definitions/noel.yml diff --git a/orchestrate/dags/noel.py b/orchestrate/dags/noel.py new file mode 100644 index 0000000..025a9ad --- /dev/null +++ b/orchestrate/dags/noel.py @@ -0,0 +1,42 @@ +import datetime + +from airflow.decorators import dag, task_group +from airflow.providers.airbyte.operators.airbyte import \ + AirbyteTriggerSyncOperator +from operators.datacoves.dbt import DatacovesDbtOperator + + +@dag( + default_args={ + "start_date": datetime.datetime(2023, 1, 1, 0, 0), + "owner": "Noel Gomez", + "email": "noel@example.com", + "email_on_failure": True, + }, + description="Personal Loan Average", + schedule_interval="0 0 1 */12 *", + tags=["version_1"], + catchup=False, +) +def noel(): + @task_group(group_id="extract_and_load_airbyte", tooltip="Airbyte Extract and Load") + def extract_and_load_airbyte(): + country_populations_datacoves_train = AirbyteTriggerSyncOperator( + task_id="country_populations_datacoves_train", + connection_id="676575f7-22d7-41f4-ab78-52099d8cbccb", + airbyte_conn_id="airbyte_connection", + ) + noel_source_datacoves_train = AirbyteTriggerSyncOperator( + task_id="noel_source_datacoves_train", + connection_id="c94c67bd-63dd-47d5-a117-e7544eee50f5", + airbyte_conn_id="airbyte_connection", + ) + + tg_extract_and_load_airbyte = extract_and_load_airbyte() + transform = DatacovesDbtOperator( + task_id="transform", bash_command="dbt build -s 'tag:daily_run_airbyte+ -t prd'" + ) + transform.set_upstream([tg_extract_and_load_airbyte]) + + +dag = noel() diff --git a/orchestrate/dags_yml_definitions/noel.yml b/orchestrate/dags_yml_definitions/noel.yml new file mode 100644 index 0000000..49db42f --- /dev/null +++ b/orchestrate/dags_yml_definitions/noel.yml @@ -0,0 +1,26 @@ +description: "Personal Loan Average" +schedule_interval: "0 0 1 */12 *" +tags: + - version_1 +default_args: + start_date: 2023-01-01 + # Replace with the email of the recipient for failures + owner: Noel Gomez + email: noel@example.com + email_on_failure: true +catchup: false + +nodes: + extract_and_load_airbyte: + generator: AirbyteDbtGenerator + type: task_group + + tooltip: "Airbyte Extract and Load" + dbt_list_args: "--select tag:daily_run_airbyte" + + transform: + operator: operators.datacoves.dbt.DatacovesDbtOperator + type: task + + bash_command: "dbt build -s 'tag:daily_run_airbyte+ -t prd'" + dependencies: ["extract_and_load_airbyte"] diff --git a/transform/models/L1_staging/loans/_loans_noel.yml b/transform/models/L1_staging/loans/_loans_noel.yml index 2dcf586..b0bc0c0 100644 --- a/transform/models/L1_staging/loans/_loans_noel.yml +++ b/transform/models/L1_staging/loans/_loans_noel.yml @@ -3,6 +3,8 @@ version: 2 sources: - name: NOEL database: RAW + tags: + - daily_run_airbyte tables: - name: NOEL_SOURCE description: 'country data' From 13a92ac91c231c0f779bae9ebb0026d7871916b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Thu, 14 Nov 2024 11:53:01 -0800 Subject: [PATCH 09/19] Add -daily_run_airbyte --- transform/.dbt_coves/templates/source_props.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/transform/.dbt_coves/templates/source_props.yml b/transform/.dbt_coves/templates/source_props.yml index 5efd05b..e428c61 100644 --- a/transform/.dbt_coves/templates/source_props.yml +++ b/transform/.dbt_coves/templates/source_props.yml @@ -4,6 +4,8 @@ sources: - name: {{ relation.schema }} {%- if source_database %} database: {{ source_database }} + tags: + - daily_run_airbyte {%- endif %} tables: - name: {{ relation.name }} From 410edcfa2bd4b0111f953139e7a94eab09ed7274 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Thu, 14 Nov 2024 12:03:09 -0800 Subject: [PATCH 10/19] Fix airflow development --- orchestrate/dags/aws_dag.py | 46 ----------------------- orchestrate/dags/noel.py | 42 --------------------- orchestrate/dags/sample_dag.py | 9 +++-- orchestrate/dags/tester.py | 38 ------------------- orchestrate/dags_yml_definitions/noel.yml | 26 ------------- 5 files changed, 6 insertions(+), 155 deletions(-) delete mode 100644 orchestrate/dags/aws_dag.py delete mode 100644 orchestrate/dags/noel.py delete mode 100644 orchestrate/dags/tester.py delete mode 100644 orchestrate/dags_yml_definitions/noel.yml diff --git a/orchestrate/dags/aws_dag.py b/orchestrate/dags/aws_dag.py deleted file mode 100644 index af1c111..0000000 --- a/orchestrate/dags/aws_dag.py +++ /dev/null @@ -1,46 +0,0 @@ -import datetime -from airflow.decorators import dag, task -from operators.datacoves.dbt import DatacovesDbtOperator - -@dag( - default_args={ - "start_date": datetime.datetime(2023, 1, 1, 0, 0), - "owner": "Noel Gomez", - "email": "gomezn@example.com", - "email_on_failure": True, - }, - description="Sample DAG for dbt build", - schedule_interval="0 0 1 */12 *", - tags=["version_2"], - catchup=False, -) -def aws_dag(): - - @task - def get_aws_variable(): - from airflow.models import Variable - # Fetches the variable, potentially making an AWS Secrets Manager API call - aws_var = Variable.get("aws_mayras_secret") - return aws_var - - @task - def get_datacoves_variable(): - from airflow.models import Variable - # Fetches the variable without an AWS Secrets Manager API call - datacoves_var = Variable.get("datacoves_mayras_secret") - return local_var - - aws_variable = get_aws_variable() - my_variable = get_local_variable() - - # Task to run dbt using the DatacovesDbtOperator and pass the variables - @task - def run_dbt_task(aws_variable: str, my_variable: str): - # Use the fetched variables in the dbt command - DatacovesDbtOperator( - task_id="run_dbt", - bash_command=f"dbt run -s personal_loans --vars '{{my_aws_variable: \"{aws_variable}\", my_variable: \"{my_variable}\"}}'" - ) - - -dag = aws_dag() diff --git a/orchestrate/dags/noel.py b/orchestrate/dags/noel.py deleted file mode 100644 index 025a9ad..0000000 --- a/orchestrate/dags/noel.py +++ /dev/null @@ -1,42 +0,0 @@ -import datetime - -from airflow.decorators import dag, task_group -from airflow.providers.airbyte.operators.airbyte import \ - AirbyteTriggerSyncOperator -from operators.datacoves.dbt import DatacovesDbtOperator - - -@dag( - default_args={ - "start_date": datetime.datetime(2023, 1, 1, 0, 0), - "owner": "Noel Gomez", - "email": "noel@example.com", - "email_on_failure": True, - }, - description="Personal Loan Average", - schedule_interval="0 0 1 */12 *", - tags=["version_1"], - catchup=False, -) -def noel(): - @task_group(group_id="extract_and_load_airbyte", tooltip="Airbyte Extract and Load") - def extract_and_load_airbyte(): - country_populations_datacoves_train = AirbyteTriggerSyncOperator( - task_id="country_populations_datacoves_train", - connection_id="676575f7-22d7-41f4-ab78-52099d8cbccb", - airbyte_conn_id="airbyte_connection", - ) - noel_source_datacoves_train = AirbyteTriggerSyncOperator( - task_id="noel_source_datacoves_train", - connection_id="c94c67bd-63dd-47d5-a117-e7544eee50f5", - airbyte_conn_id="airbyte_connection", - ) - - tg_extract_and_load_airbyte = extract_and_load_airbyte() - transform = DatacovesDbtOperator( - task_id="transform", bash_command="dbt build -s 'tag:daily_run_airbyte+ -t prd'" - ) - transform.set_upstream([tg_extract_and_load_airbyte]) - - -dag = noel() diff --git a/orchestrate/dags/sample_dag.py b/orchestrate/dags/sample_dag.py index e87a4e2..ef4d898 100644 --- a/orchestrate/dags/sample_dag.py +++ b/orchestrate/dags/sample_dag.py @@ -8,17 +8,20 @@ default_args={ "start_date": datetime.datetime(2023, 1, 1, 0, 0), "owner": "Mayra Pena", - "email": "mayra@datacoves.com", + "email": "mayra@example.com", "email_on_failure": True, + "retries": 3, + }, description="Daily dbt run", - schedule_interval="0 12 * * *", + schedule="0 12 * * *", tags=["version_1"], catchup=False, + ) def sample_dag(): run_dbt = DatacovesDbtOperator( - task_id="run_dbt", bash_command="dbt debug" + task_id="run_dbt", bash_command="dbt run - s country_codes" ) diff --git a/orchestrate/dags/tester.py b/orchestrate/dags/tester.py deleted file mode 100644 index 05bd887..0000000 --- a/orchestrate/dags/tester.py +++ /dev/null @@ -1,38 +0,0 @@ -import datetime -from airflow.decorators import dag, task -from operators.datacoves.dbt import DatacovesDbtOperator - -@dag( - default_args={ - "start_date": datetime.datetime(2023, 1, 1, 0, 0), - "owner": "Noel Gomez", - "email": "gomezn@example.com", - "email_on_failure": True, - }, - description="Sample DAG for dbt build", - schedule_interval="0 0 1 */12 *", - tags=["version_3"], - catchup=False, -) -def tester_dag(): - - @task - def get_variable(): - from airflow.models import Variable - # Fetch the variable from Airflow's Variables - my_var = Variable.get("datacoves_mayras_secret") - return my_var # Return the value for downstream tasks - - fetched_variable = get_variable() - - # Task to run dbt using the DatacovesDbtOperator and pass the fetched variable - @task - def run_dbt_task(fetched_variable: str): - # Use the fetched variable in the dbt command - DatacovesDbtOperator( - task_id="run_dbt", - bash_command=f"dbt run -s personal_loans --vars '{{my_var: \"{fetched_variable}\"}}'" - ) - - -dag = tester_dag() diff --git a/orchestrate/dags_yml_definitions/noel.yml b/orchestrate/dags_yml_definitions/noel.yml deleted file mode 100644 index 49db42f..0000000 --- a/orchestrate/dags_yml_definitions/noel.yml +++ /dev/null @@ -1,26 +0,0 @@ -description: "Personal Loan Average" -schedule_interval: "0 0 1 */12 *" -tags: - - version_1 -default_args: - start_date: 2023-01-01 - # Replace with the email of the recipient for failures - owner: Noel Gomez - email: noel@example.com - email_on_failure: true -catchup: false - -nodes: - extract_and_load_airbyte: - generator: AirbyteDbtGenerator - type: task_group - - tooltip: "Airbyte Extract and Load" - dbt_list_args: "--select tag:daily_run_airbyte" - - transform: - operator: operators.datacoves.dbt.DatacovesDbtOperator - type: task - - bash_command: "dbt build -s 'tag:daily_run_airbyte+ -t prd'" - dependencies: ["extract_and_load_airbyte"] From 0c1be1ca490be975ce8491be20d8d868779e146f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Thu, 14 Nov 2024 12:08:11 -0800 Subject: [PATCH 11/19] Add retries --- orchestrate/dags_yml_definitions/sample_dag.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/orchestrate/dags_yml_definitions/sample_dag.yml b/orchestrate/dags_yml_definitions/sample_dag.yml index 3b2afe9..eb234dc 100644 --- a/orchestrate/dags_yml_definitions/sample_dag.yml +++ b/orchestrate/dags_yml_definitions/sample_dag.yml @@ -1,5 +1,5 @@ description: "Daily dbt run" -schedule_interval: "0 12 * * *" +schedule: "0 12 * * *" tags: - version_1 default_args: @@ -8,6 +8,7 @@ default_args: # Replace with the email of the recipient for failures email: john@example.com email_on_failure: true + retires: 3 catchup: false nodes: From b486c70c5c3a659108a75cd469a19fdf52f7648d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Thu, 14 Nov 2024 12:14:17 -0800 Subject: [PATCH 12/19] Cleaning up repo --- .../models/L1_staging/loans/_loans_noel.yml | 10 --- .../L1_staging/loans/stg_noel_noel_source.sql | 71 ------------------- .../L1_staging/loans/stg_noel_noel_source.yml | 61 ---------------- .../models/L2_core/noel_avg_by_grade.sql | 20 ------ .../models/L2_core/noel_avg_by_grade.yml | 9 --- 5 files changed, 171 deletions(-) delete mode 100644 transform/models/L1_staging/loans/_loans_noel.yml delete mode 100644 transform/models/L1_staging/loans/stg_noel_noel_source.sql delete mode 100644 transform/models/L1_staging/loans/stg_noel_noel_source.yml delete mode 100644 transform/models/L2_core/noel_avg_by_grade.sql delete mode 100644 transform/models/L2_core/noel_avg_by_grade.yml diff --git a/transform/models/L1_staging/loans/_loans_noel.yml b/transform/models/L1_staging/loans/_loans_noel.yml deleted file mode 100644 index b0bc0c0..0000000 --- a/transform/models/L1_staging/loans/_loans_noel.yml +++ /dev/null @@ -1,10 +0,0 @@ -version: 2 - -sources: - - name: NOEL - database: RAW - tags: - - daily_run_airbyte - tables: - - name: NOEL_SOURCE - description: 'country data' diff --git a/transform/models/L1_staging/loans/stg_noel_noel_source.sql b/transform/models/L1_staging/loans/stg_noel_noel_source.sql deleted file mode 100644 index 6cf5b66..0000000 --- a/transform/models/L1_staging/loans/stg_noel_noel_source.sql +++ /dev/null @@ -1,71 +0,0 @@ -with raw_source as ( - - select * - from {{ source('NOEL', 'NOEL_SOURCE') }} - -), - -final as ( - - select - "_AIRBYTE_RAW_ID"::varchar as airbyte_raw_id, - "_AIRBYTE_EXTRACTED_AT"::timestamp_tz as airbyte_extracted_at, - "_AIRBYTE_META"::variant as airbyte_meta, - "TOTAL_ACC"::float as total_acc, - "ANNUAL_INC"::float as annual_inc, - "EMP_LENGTH"::varchar as emp_length, - "DESC"::varchar as desc, - "TOTAL_PYMNT"::float as total_pymnt, - "LAST_PYMNT_D"::varchar as last_pymnt_d, - "ADDR_STATE"::varchar as addr_state, - "NEXT_PYMNT_D"::varchar as next_pymnt_d, - "EMP_TITLE"::varchar as emp_title, - "COLLECTION_RECOVERY_FEE"::float as collection_recovery_fee, - "MTHS_SINCE_LAST_MAJOR_DEROG"::float as mths_since_last_major_derog, - "INQ_LAST_6MTHS"::float as inq_last_6mths, - "SUB_GRADE"::varchar as sub_grade, - "FUNDED_AMNT_INV"::float as funded_amnt_inv, - "DELINQ_2YRS"::float as delinq_2yrs, - "LOAN_ID"::varchar as loan_id, - "FUNDED_AMNT"::float as funded_amnt, - "VERIFICATION_STATUS"::varchar as verification_status, - "DTI"::float as dti, - "TOTAL_REC_PRNCP"::float as total_rec_prncp, - "GRADE"::varchar as grade, - "HOME_OWNERSHIP"::varchar as home_ownership, - "ISSUE_D"::varchar as issue_d, - "MTHS_SINCE_LAST_DELINQ"::float as mths_since_last_delinq, - "OUT_PRNCP"::float as out_prncp, - "PUB_REC"::float as pub_rec, - "INT_RATE"::float as int_rate, - "ZIP_CODE"::varchar as zip_code, - "OPEN_ACC"::float as open_acc, - "TERM"::varchar as term, - "PYMNT_PLAN"::varchar as pymnt_plan, - "URL"::varchar as url, - "REVOL_BAL"::float as revol_bal, - "RECOVERIES"::float as recoveries, - "LAST_PYMNT_AMNT"::float as last_pymnt_amnt, - "LOAN_AMNT"::float as loan_amnt, - "PURPOSE"::varchar as purpose, - "INITIAL_LIST_STATUS"::varchar as initial_list_status, - "TOTAL_REC_INT"::float as total_rec_int, - "TOTAL_PYMNT_INV"::float as total_pymnt_inv, - "MTHS_SINCE_LAST_RECORD"::float as mths_since_last_record, - "LAST_CREDIT_PULL_D"::varchar as last_credit_pull_d, - "TOTAL_REC_LATE_FEE"::float as total_rec_late_fee, - "MEMBER_ID"::float as member_id, - "POLICY_CODE"::float as policy_code, - "TITLE"::varchar as title, - "LOAN_STATUS"::varchar as loan_status, - "INSTALLMENT"::float as installment, - "EARLIEST_CR_LINE"::varchar as earliest_cr_line, - "REVOL_UTIL"::varchar as revol_util, - "OUT_PRNCP_INV"::float as out_prncp_inv, - "COLLECTIONS_12_MTHS_EX_MED"::float as collections_12_mths_ex_med - - from raw_source - -) - -select * from final diff --git a/transform/models/L1_staging/loans/stg_noel_noel_source.yml b/transform/models/L1_staging/loans/stg_noel_noel_source.yml deleted file mode 100644 index 2892c6a..0000000 --- a/transform/models/L1_staging/loans/stg_noel_noel_source.yml +++ /dev/null @@ -1,61 +0,0 @@ -version: 2 - -models: - - name: stg_noel_noel_source - description: 'source description' - columns: - - name: airbyte_raw_id - - name: airbyte_extracted_at - - name: airbyte_meta - - name: total_acc - - name: annual_inc - - name: emp_length - - name: desc - - name: total_pymnt - - name: last_pymnt_d - - name: addr_state - - name: next_pymnt_d - - name: emp_title - - name: collection_recovery_fee - - name: mths_since_last_major_derog - - name: inq_last_6mths - - name: sub_grade - - name: funded_amnt_inv - - name: delinq_2yrs - - name: loan_id - - name: funded_amnt - - name: verification_status - - name: dti - - name: total_rec_prncp - - name: grade - - name: home_ownership - - name: issue_d - - name: mths_since_last_delinq - - name: out_prncp - - name: pub_rec - - name: int_rate - - name: zip_code - - name: open_acc - - name: term - - name: pymnt_plan - - name: url - - name: revol_bal - - name: recoveries - - name: last_pymnt_amnt - - name: loan_amnt - - name: purpose - - name: initial_list_status - - name: total_rec_int - - name: total_pymnt_inv - - name: mths_since_last_record - - name: last_credit_pull_d - - name: total_rec_late_fee - - name: member_id - - name: policy_code - - name: title - - name: loan_status - - name: installment - - name: earliest_cr_line - - name: revol_util - - name: out_prncp_inv - - name: collections_12_mths_ex_med diff --git a/transform/models/L2_core/noel_avg_by_grade.sql b/transform/models/L2_core/noel_avg_by_grade.sql deleted file mode 100644 index 78c9273..0000000 --- a/transform/models/L2_core/noel_avg_by_grade.sql +++ /dev/null @@ -1,20 +0,0 @@ -with raw_source as ( - - select * from {{ ref('stg_noel_noel_source') }} - -), - -final as ( - - select - grade, - avg(loan_amnt) as avg_loan_amount, - count(*) as total_loans - from raw_source - where loan_status = 'Fully Paid' - group by grade - order by grade - -) - -select * from final diff --git a/transform/models/L2_core/noel_avg_by_grade.yml b/transform/models/L2_core/noel_avg_by_grade.yml deleted file mode 100644 index 4a6cdb3..0000000 --- a/transform/models/L2_core/noel_avg_by_grade.yml +++ /dev/null @@ -1,9 +0,0 @@ -version: 2 - -models: - - name: noel_avg_by_grade - description: 'my model' - columns: - - name: grade - - name: avg_loan_amount - - name: total_loans From 338f7f86535e80f1d4227058504b23ff037bd89b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Thu, 14 Nov 2024 12:36:02 -0800 Subject: [PATCH 13/19] Fix staging model props template --- transform/.dbt_coves/templates/staging_model_props.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transform/.dbt_coves/templates/staging_model_props.yml b/transform/.dbt_coves/templates/staging_model_props.yml index 0ca65e8..fed7949 100644 --- a/transform/.dbt_coves/templates/staging_model_props.yml +++ b/transform/.dbt_coves/templates/staging_model_props.yml @@ -2,7 +2,7 @@ version: 2 models: - name: stg_{{relation.schema | lower }}_{{model | lower }} - description: 'TESTING' + description: '' columns: {%- for cols in nested.values() %} {%- for col in cols %} From 2078f4b57355b7478dafc062f4ada4313c636d26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Thu, 14 Nov 2024 14:11:44 -0800 Subject: [PATCH 14/19] Remove country populations desc --- transform/models/L1_staging/country_data/_country_data.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transform/models/L1_staging/country_data/_country_data.yml b/transform/models/L1_staging/country_data/_country_data.yml index 11ec525..b81bdb1 100644 --- a/transform/models/L1_staging/country_data/_country_data.yml +++ b/transform/models/L1_staging/country_data/_country_data.yml @@ -7,4 +7,4 @@ sources: - daily_run_airbyte tables: - name: COUNTRY_POPULATIONS - description: 'Raw population information from Github Datasets repository.' + description: '' From a3aa12dd43c6fd15ad1521e127c30b4c5cb0b139 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Wed, 20 Nov 2024 12:51:56 -0800 Subject: [PATCH 15/19] Test workflow on push to main --- .github/workflows/push-to-main.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/push-to-main.yml b/.github/workflows/push-to-main.yml index 23a1120..5b58326 100644 --- a/.github/workflows/push-to-main.yml +++ b/.github/workflows/push-to-main.yml @@ -4,6 +4,7 @@ on: # yamllint disable-line rule:truthy pull_request: types: - closed + push: branches: - main paths: From 637f4adcf1f17a88f91da742b4a5d509319e5079 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Wed, 20 Nov 2024 12:53:39 -0800 Subject: [PATCH 16/19] Testing worklow --- .github/workflows/push-to-main.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/push-to-main.yml b/.github/workflows/push-to-main.yml index 5b58326..92a5cc6 100644 --- a/.github/workflows/push-to-main.yml +++ b/.github/workflows/push-to-main.yml @@ -31,7 +31,8 @@ jobs: # Perform the deployment to Prod build: # Need to make sure that when the PR was closed, it was actually merged. - if: github.event.pull_request.merged == true && github.event.pull_request.base.ref == 'main' + # if: github.event.pull_request.merged == true && github.event.pull_request.base.ref == 'main' + if: github.event.pull_request.base.ref == 'main' name: Deployment Script runs-on: ubuntu-latest From 6c4b885453c24bc0317cf8ecec3f53a7ba840c0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Wed, 20 Nov 2024 12:54:40 -0800 Subject: [PATCH 17/19] Testing workflow --- .github/workflows/push-to-main.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/push-to-main.yml b/.github/workflows/push-to-main.yml index 92a5cc6..6f795ad 100644 --- a/.github/workflows/push-to-main.yml +++ b/.github/workflows/push-to-main.yml @@ -32,7 +32,6 @@ jobs: build: # Need to make sure that when the PR was closed, it was actually merged. # if: github.event.pull_request.merged == true && github.event.pull_request.base.ref == 'main' - if: github.event.pull_request.base.ref == 'main' name: Deployment Script runs-on: ubuntu-latest From b49caff5aaaca0c2e89b57d6997a59f6fecfa5c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pena=28Pe=C3=B1a=29?= <97967943+mayrapena1324@users.noreply.github.com> Date: Thu, 21 Nov 2024 14:02:04 -0600 Subject: [PATCH 18/19] Update _country_data.yml --- transform/models/L1_staging/country_data/_country_data.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transform/models/L1_staging/country_data/_country_data.yml b/transform/models/L1_staging/country_data/_country_data.yml index b81bdb1..11ec525 100644 --- a/transform/models/L1_staging/country_data/_country_data.yml +++ b/transform/models/L1_staging/country_data/_country_data.yml @@ -7,4 +7,4 @@ sources: - daily_run_airbyte tables: - name: COUNTRY_POPULATIONS - description: '' + description: 'Raw population information from Github Datasets repository.' From a924a9c69d7d26df8d887c86416ca3e4cad2d5b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Thu, 21 Nov 2024 12:20:46 -0800 Subject: [PATCH 19/19] Add period --- transform/models/L1_staging/country_data/_country_data.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transform/models/L1_staging/country_data/_country_data.yml b/transform/models/L1_staging/country_data/_country_data.yml index 11ec525..dc69a01 100644 --- a/transform/models/L1_staging/country_data/_country_data.yml +++ b/transform/models/L1_staging/country_data/_country_data.yml @@ -7,4 +7,4 @@ sources: - daily_run_airbyte tables: - name: COUNTRY_POPULATIONS - description: 'Raw population information from Github Datasets repository.' + description: 'Raw population information from Github Datasets repository..'