diff --git a/dags/data_utils/metabase_aggregation/aggregation.py b/dags/data_utils/metabase_aggregation/aggregation.py
index 90e390b..5e8b5c6 100644
--- a/dags/data_utils/metabase_aggregation/aggregation.py
+++ b/dags/data_utils/metabase_aggregation/aggregation.py
@@ -39,6 +39,34 @@ def aggregate_data_for_clients(db_cluster, client_databases, queries):
     df = pd.DataFrame(data)
     return df
 
+def aggregate_data_for_clients_for_unique_query(db_cluster, client_databases, query):
+    """
+    Runs a single query on each client database and returns the combined rows.
+
+    Each client's result set gets a 'client' column holding the database name.
+    The per-client row index is kept as-is, so index labels repeat across clients.
+    Returns an empty DataFrame when client_databases is empty.
+    """
+    data = []
+
+    for client_db in client_databases:
+        connection = get_postgres_connection(db_cluster, client_db)
+        try:
+            result = connection.execute(text(query))
+            # Materialize the rows while the connection is still open
+            df = pd.DataFrame(result)
+        finally:
+            connection.close()  # Close the connection even if the query fails
+        df['client'] = client_db  # Add client name to results
+        data.append(df)
+
+    if not data:
+        # pd.concat raises ValueError when given no objects
+        return pd.DataFrame()
+
+    # Stack the per-client results into one DataFrame
+    return pd.concat(data)
+
 def clean_data_in_postgres(connection, table):
     """Deletes rows in the table."""
     try:
@@ -102,12 +130,9 @@ def aggregate_by_date_task():
 
     target_database = "aggregated_client_data"
 
-    for client in client_databases:
-        target_table = f"aggregate_by_date_{client}"
-        connection = get_postgres_connection(db_cluster_prod, client)
-        result = connection.execute(text(query))
-        df = pd.DataFrame(result)
-        insert_data_to_aggregated_db(db_cluster_preprod, df, target_database, target_table)
+    target_table = "aggregate_by_date"
+    df = aggregate_data_for_clients_for_unique_query(db_cluster_prod, client_databases, query)
+    insert_data_to_aggregated_db(db_cluster_preprod, df, target_database, target_table)
 
 def aggregate_by_participation_type_task():
     client_databases = ["angers", "cdc", "cea", "cese", "grand_nancy", "lyon", "marseille", "meyzieu", "sytral", "thionville", "toulouse", "tours", "valbonne"]
@@ -121,10 +146,6 @@ def aggregate_by_participation_type_task():
     """
 
     target_database = "aggregated_client_data"
-
-    for client in client_databases:
-        target_table = f"aggregate_by_participation_type_{client}"
-        connection = get_postgres_connection(db_cluster_prod, client)
-        result = connection.execute(text(query))
-        df = pd.DataFrame(result)
-        insert_data_to_aggregated_db(db_cluster_preprod, df, target_database, target_table)
\ No newline at end of file
+    target_table = "aggregate_by_participation_type"
+    df = aggregate_data_for_clients_for_unique_query(db_cluster_prod, client_databases, query)
+    insert_data_to_aggregated_db(db_cluster_preprod, df, target_database, target_table)
\ No newline at end of file