Skip to content

Commit

Permalink
feat: last edits
Browse files Browse the repository at this point in the history
  • Loading branch information
hellpe authored and JeanLouisLamezec committed Jan 6, 2025
1 parent 68ee2d9 commit b29f0d9
Showing 1 changed file with 24 additions and 13 deletions.
37 changes: 24 additions & 13 deletions dags/data_utils/metabase_aggregation/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,24 @@ def aggregate_data_for_clients(db_cluster, client_databases, queries):
df = pd.DataFrame(data)
return df

def aggregate_data_for_clients_for_unique_query(db_cluster, client_databases, query):
"""
Performs data aggregation for each client and returns a DataFrame.
"""
data = []

for client_db in client_databases:
connection = get_postgres_connection(db_cluster, client_db)
result = connection.execute(text(query))
df = pd.DataFrame(result)
df['client'] = client_db # Add client name to results
data.append(df)
connection.close() # Close the connection after query execution

# Convert results to DataFrame
df = pd.concat(data)
return df

def clean_data_in_postgres(connection, table):
"""Deletes rows in the table."""
try:
Expand Down Expand Up @@ -102,12 +120,9 @@ def aggregate_by_date_task():

target_database = "aggregated_client_data"

for client in client_databases:
target_table = f"aggregate_by_date_{client}"
connection = get_postgres_connection(db_cluster_prod, client)
result = connection.execute(text(query))
df = pd.DataFrame(result)
insert_data_to_aggregated_db(db_cluster_preprod, df, target_database, target_table)
target_table = "aggregate_by_date"
df = aggregate_data_for_clients_for_unique_query(db_cluster_prod, client_databases, query)
insert_data_to_aggregated_db(db_cluster_preprod, df, target_database, target_table)

def aggregate_by_participation_type_task():
client_databases = ["angers", "cdc", "cea", "cese", "grand_nancy", "lyon", "marseille", "meyzieu", "sytral", "thionville", "toulouse", "tours", "valbonne"]
Expand All @@ -121,10 +136,6 @@ def aggregate_by_participation_type_task():
"""

target_database = "aggregated_client_data"

for client in client_databases:
target_table = f"aggregate_by_participation_type_{client}"
connection = get_postgres_connection(db_cluster_prod, client)
result = connection.execute(text(query))
df = pd.DataFrame(result)
insert_data_to_aggregated_db(db_cluster_preprod, df, target_database, target_table)
target_table = "aggregate_by_participation_type"
df = aggregate_data_for_clients_for_unique_query(db_cluster_prod, client_databases, query)
insert_data_to_aggregated_db(db_cluster_preprod, df, target_database, target_table)

0 comments on commit b29f0d9

Please sign in to comment.