Skip to content

Commit

Permalink
feat: add two more tables
Browse files Browse the repository at this point in the history
  • Loading branch information
hellpe authored and JeanLouisLamezec committed Jan 6, 2025
1 parent daadae7 commit 68ee2d9
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 14 deletions.
16 changes: 15 additions & 1 deletion dags/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

from data_utils.metabase_aggregation.aggregation import perform_and_insert_aggregated_data
from data_utils.metabase_aggregation.aggregation import perform_and_insert_aggregated_data, aggregate_by_date_task, aggregate_by_participation_type_task

# Default arguments for the DAG tasks
default_args = {
Expand All @@ -28,5 +28,19 @@
python_callable=perform_and_insert_aggregated_data,
op_kwargs={},
)
aggregate_by_date_task = PythonOperator(
task_id='aggregate_by_date_task',
python_callable=aggregate_by_date_task,
op_kwargs={},
)
aggregate_by_participation_type_task = PythonOperator(
task_id='aggregate_by_participation_type_task',
python_callable=aggregate_by_participation_type_task,
op_kwargs={},
)


aggregate_and_insert_task
aggregate_by_date_task
aggregate_by_participation_type_task

66 changes: 53 additions & 13 deletions dags/data_utils/metabase_aggregation/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
from ..postgres.get_postgres_connection import get_postgres_connection
import pandas as pd

db_cluster = 'db_cluster_name_data'

db_cluster_preprod = 'db_cluster_name_data'
db_cluster_prod = 'db_cluster_name_data_prod'

def execute_queries(connection, queries):
"""
Expand All @@ -22,7 +22,7 @@ def execute_queries(connection, queries):
return results


def aggregate_data_for_clients(client_databases, queries):
def aggregate_data_for_clients(db_cluster, client_databases, queries):
"""
Performs data aggregation for each client and returns a DataFrame.
"""
Expand All @@ -39,25 +39,25 @@ def aggregate_data_for_clients(client_databases, queries):
df = pd.DataFrame(data)
return df

def clean_data_in_postgres(connection):
"""Deletes rows in the table where the 'date' is between the start_date and end_date."""
def clean_data_in_postgres(connection, table):
"""Deletes rows in the table."""
try:
delete_query = text(
f"DELETE FROM aggregated_data;"
f"DELETE FROM {table};"
)

connection.execute(delete_query)
print(f"Cleaned data in aggregated_data")
print(f"Cleaned data in table {table}")
except Exception as e:
print(f"Failed to clean data in aggregated_data: {e}")
print(f"Failed to clean data in table {table}: {e}")

def insert_data_to_aggregated_db(dataframe, target_database, target_table):
def insert_data_to_aggregated_db(db_cluster, dataframe, target_database, target_table):
"""
Inserts the aggregated data into a target PostgreSQL table.
"""
try:
connection = get_postgres_connection(db_cluster, target_database)
clean_data_in_postgres(connection)
clean_data_in_postgres(connection, target_table)
dataframe.to_sql(target_table, con=connection, if_exists='replace', index=False)
print(f"Data successfully inserted into {target_table} in {target_database}.")
connection.close()
Expand All @@ -67,7 +67,7 @@ def insert_data_to_aggregated_db(dataframe, target_database, target_table):


def perform_and_insert_aggregated_data():
client_databases = ["lyon", "marseille", "toulouse", "grand_nancy", "tours"]
client_databases = ["angers", "cdc", "cea", "cese", "grand_nancy", "lyon", "marseille", "meyzieu", "sytral", "thionville", "toulouse", "tours", "valbonne"]

# List of aggregation queries
queries = {
Expand All @@ -78,7 +78,7 @@ def perform_and_insert_aggregated_data():
}

# Perform data aggregation for all clients
aggregated_data = aggregate_data_for_clients(client_databases, queries)
aggregated_data = aggregate_data_for_clients(db_cluster_prod, client_databases, queries)

# Display the aggregated data (optional)
print(aggregated_data.head(5))
Expand All @@ -87,4 +87,44 @@ def perform_and_insert_aggregated_data():
target_database = "aggregated_client_data"
target_table = "aggregated_data"

insert_data_to_aggregated_db(aggregated_data, target_database, target_table)
insert_data_to_aggregated_db(db_cluster_preprod, aggregated_data, target_database, target_table)

def aggregate_by_date_task():
    """
    Build one table per client with the number of users created on each date.

    For every client database, the rows of ``prod.users`` are counted per
    ``DATE(created_at)`` using a connection opened through ``db_cluster_prod``.
    The result is written to the ``aggregate_by_date_<client>`` table of the
    ``aggregated_client_data`` database through ``db_cluster_preprod``.
    """
    client_databases = [
        "angers", "cdc", "cea", "cese", "grand_nancy", "lyon", "marseille",
        "meyzieu", "sytral", "thionville", "toulouse", "tours", "valbonne",
    ]

    query = """
        SELECT
            COUNT(*) as users_count,
            DATE(created_at) AS date_of_creation
        FROM prod.users
        GROUP BY date_of_creation
    """

    target_database = "aggregated_client_data"

    for client in client_databases:
        target_table = f"aggregate_by_date_{client}"
        connection = get_postgres_connection(db_cluster_prod, client)
        try:
            result = connection.execute(text(query))
            # Name the columns explicitly so the target table keeps its
            # schema even when the client has no rows to aggregate.
            df = pd.DataFrame(result.fetchall(), columns=list(result.keys()))
        finally:
            # The source connection is only needed for the read; release it
            # before moving on so a run does not leak one connection per client.
            connection.close()
        insert_data_to_aggregated_db(db_cluster_preprod, df, target_database, target_table)

def aggregate_by_participation_type_task():
    """
    Build one table per client with the number of participations per type.

    For every client database, the rows of ``prod.participations`` are counted
    per ``participation_type`` using a connection opened through
    ``db_cluster_prod``. The result is written to the
    ``aggregate_by_participation_type_<client>`` table of the
    ``aggregated_client_data`` database through ``db_cluster_preprod``.
    """
    client_databases = [
        "angers", "cdc", "cea", "cese", "grand_nancy", "lyon", "marseille",
        "meyzieu", "sytral", "thionville", "toulouse", "tours", "valbonne",
    ]

    query = """
        SELECT
            COUNT (*),
            participation_type
        FROM prod.participations
        GROUP BY participation_type
    """

    target_database = "aggregated_client_data"

    for client in client_databases:
        target_table = f"aggregate_by_participation_type_{client}"
        connection = get_postgres_connection(db_cluster_prod, client)
        try:
            result = connection.execute(text(query))
            # Name the columns explicitly so the target table keeps its
            # schema even when the client has no rows to aggregate.
            df = pd.DataFrame(result.fetchall(), columns=list(result.keys()))
        finally:
            # The source connection is only needed for the read; release it
            # before moving on so a run does not leak one connection per client.
            connection.close()
        insert_data_to_aggregated_db(db_cluster_preprod, df, target_database, target_table)

0 comments on commit 68ee2d9

Please sign in to comment.