diff --git a/dags/aggregation.py b/dags/aggregation.py
new file mode 100644
index 0000000..81ca805
--- /dev/null
+++ b/dags/aggregation.py
@@ -0,0 +1,47 @@
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from datetime import datetime
+
+from data_utils.metabase_aggregation.aggregation import (
+    perform_and_insert_aggregated_data,
+    aggregate_by_date_task,
+    aggregate_by_participation_type_task,
+    aggregate_by_budgets_task,
+)
+
+# Default arguments for the DAG tasks
+default_args = {
+    'owner': 'airflow',
+    'depends_on_past': False,
+    'email_on_failure': False,
+    'retries': 0,
+}
+
+# Define the DAG
+with DAG(
+    'daily_aggregated_client_data_dag',
+    default_args=default_args,
+    description='A DAG to aggregate data from multiple clients and insert it into a database',
+    schedule='@daily',
+    start_date=datetime(2024, 10, 21),  # DAG start date
+    catchup=False,
+) as dag:
+
+    # One task per aggregation function; the operator variables are named
+    # differently from the imported callables so they do not shadow them.
+    aggregate_and_insert = PythonOperator(
+        task_id='perform_and_insert_aggregated_data_task',
+        python_callable=perform_and_insert_aggregated_data,
+    )
+    aggregate_by_date = PythonOperator(
+        task_id='aggregate_by_date_task',
+        python_callable=aggregate_by_date_task,
+    )
+    aggregate_by_participation_type = PythonOperator(
+        task_id='aggregate_by_participation_type_task',
+        python_callable=aggregate_by_participation_type_task,
+    )
+    aggregate_by_budgets = PythonOperator(
+        task_id='aggregate_by_budgets_task',
+        python_callable=aggregate_by_budgets_task,
+    )
diff --git a/dags/client_list.py b/dags/client_list.py
index d3fe8be..cc1c0c0 100644
--- a/dags/client_list.py
+++ b/dags/client_list.py
@@ -1,19 +1,26 @@
 clients = [
-  "angers",
-  "casa",
-  "cdc",
-  "cea",
-  "cese",
-  "colombes",
-  "grand_nancy",
-  "lyon",
-  "marseille",
-  "montpellier",
-  "meyzieu",
-  "real_deal",
-  "sytral",
-  "thionville",
-  "toulouse",
-  "tours",
-  "valbonne"
+    "angers",
+    "arcueil",
+    "bagneux",
+    "casa",
+    "cdc",
+    "cea",
+    "cese",
+    "chambery",
+    "colombes",
+    "cultuur_connect",
+    "grand_nancy",
+    "lyon",
+    "malakoff",
+    "marseille",
+    "montpellier",
+    "meyzieu",
+    "nanterre",
+    "real_deal",
+    "sytral",
+    "thionville",
+    "toulouse",
+    "tours",
+    "valbonne",
+    "villeurbanne"
 ]
\ No newline at end of file
diff --git a/dags/data_utils/metabase_aggregation/aggregation.py b/dags/data_utils/metabase_aggregation/aggregation.py
new file mode 100644
index 0000000..fa5a7bd
--- /dev/null
+++ b/dags/data_utils/metabase_aggregation/aggregation.py
@@ -0,0 +1,196 @@
+from sqlalchemy import text
+from client_list import clients
+from ..postgres.get_postgres_connection import get_postgres_connection
+import pandas as pd
+
+db_cluster_preprod = 'db_cluster_name_data'
+db_cluster_prod = 'db_cluster_name_data_prod'
+
+
+def execute_queries(connection, queries):
+    """
+    Executes a dict of named queries on the given connection and returns a
+    dict mapping each query name to its single scalar result.
+    """
+    results = {}
+    for query_name, query in queries.items():
+        try:
+            result = connection.execute(text(query))
+            count = result.fetchone()[0]
+            results[query_name] = count
+        except Exception as e:
+            print(f"Failed to execute query {query_name}: {e}")
+            results[query_name] = None  # Return None if the query fails
+    return results
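+
+# Illustrative usage (a sketch, not called by the DAG): run one scalar query
+# against a single client database. The database name 'lyon' is only an
+# example taken from client_list; any entry of `clients` works.
+#
+#   conn = get_postgres_connection(db_cluster_preprod, 'lyon')
+#   counts = execute_queries(conn, {'user_count': 'SELECT COUNT(*) FROM prod.users;'})
+#   conn.close()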
+ """ + results = {} + for query_name, query in queries.items(): + try: + result = connection.execute(text(query)) + count = result.fetchone()[0] + results[query_name] = count + except Exception as e: + print(f"Failed to execute query {query_name}: {e}") + results[query_name] = None # Return None if the query fails + return results + + +def aggregate_data_for_clients(db_cluster, client_databases, queries): + """ + Performs data aggregation for each client and returns a DataFrame. + """ + data = [] + + for client_db in client_databases: + connection = get_postgres_connection(db_cluster, client_db) + client_results = execute_queries(connection, queries) + client_results['client'] = client_db # Add client name to results + data.append(client_results) + connection.close() # Close the connection after query execution + + # Convert results to DataFrame + df = pd.DataFrame(data) + return df + +def aggregate_data_for_clients_for_unique_query(db_cluster, client_databases, query): + """ + Performs data aggregation for each client and returns a DataFrame. + """ + data = [] + + for client_db in client_databases: + connection = get_postgres_connection(db_cluster, client_db) + result = connection.execute(text(query)) + df = pd.DataFrame(result) + df['client'] = client_db # Add client name to results + data.append(df) + connection.close() # Close the connection after query execution + + # Convert results to DataFrame + df = pd.concat(data) + return df + +def clean_data_in_postgres(connection, table): + """Deletes rows in the table.""" + try: + delete_query = text( + f"DELETE FROM {table};" + ) + + connection.execute(delete_query) + print(f"Cleaned data in table {table}") + except Exception as e: + print(f"Failed to clean data in table {table}: {e}") + +def insert_data_to_aggregated_db(db_cluster, dataframe, target_database, target_table): + """ + Inserts the aggregated data into a target PostgreSQL table. 
+ """ + try: + connection = get_postgres_connection(db_cluster, target_database) + clean_data_in_postgres(connection, target_table) + dataframe.to_sql(target_table, con=connection, if_exists='replace', index=False) + print(f"Data successfully inserted into {target_table} in {target_database}.") + connection.close() + except Exception as e: + print(f"Failed to insert data into {target_table}: {e}") + raise + + +def perform_and_insert_aggregated_data(): + client_databases = clients + + # List of aggregation queries + queries = { + 'user_count': "SELECT COUNT(*) AS user_count FROM prod.users;", + 'participating_user_count': "SELECT COUNT(*) AS participating_user_count FROM prod.users WHERE has_answered_survey OR is_endorsing OR is_following OR has_authored_comment OR has_authored_proposal OR has_voted_on_project OR has_voted_on_proposal;", + 'participatory_process_count': "SELECT COUNT(*) AS participatory_process_count FROM prod.stg_decidim_participatory_processes;", + 'participations_count': "SELECT COUNT(*) AS participations_count FROM prod.participations WHERE participation_type IS NOT NULL;", + } + + # Perform data aggregation for all clients + aggregated_data = aggregate_data_for_clients(db_cluster_preprod, client_databases, queries) + + # Display the aggregated data (optional) + print(aggregated_data.head(5)) + + # Insert the aggregated data into a new database and table + target_database = "aggregated_client_data" + target_table = "aggregated_data" + + insert_data_to_aggregated_db(db_cluster_preprod, aggregated_data, target_database, target_table) + +def aggregate_by_date_task(): + client_databases = clients + + query = """ + SELECT + COUNT(*) as users_count, + DATE(created_at) AS date_of_creation + FROM prod.users + GROUP BY date_of_creation + """ + + target_database = "aggregated_client_data" + + target_table = "aggregate_by_date" + df = aggregate_data_for_clients_for_unique_query(db_cluster_preprod, client_databases, query) + insert_data_to_aggregated_db(db_cluster_preprod, df, target_database, target_table) + +def aggregate_by_participation_type_task(): + client_databases = clients + + query = """ + SELECT + COUNT (*), + participation_type + FROM prod.participations + GROUP BY participation_type + """ + + target_database = "aggregated_client_data" + target_table = "aggregate_by_participation_type" + df = aggregate_data_for_clients_for_unique_query(db_cluster_preprod, client_databases, query) + insert_data_to_aggregated_db(db_cluster_preprod, df, target_database, target_table) + +def aggregate_by_budgets_task(): + client_databases = clients + + query = """ + SELECT + budget_id AS id, + title, + project_amount AS amount, + is_selected, + categories, + components.ps_title + FROM prod.budgets_projects + LEFT JOIN prod.components ON budgets_projects.decidim_component_id = components.id + GROUP BY budget_id, title, is_selected, resource_type, categories, project_amount, ps_title + ORDER BY project_amount ASC + """ + + target_database = "aggregated_client_data" + target_table = "aggregate_by_budgets" + df = aggregate_data_for_clients_for_unique_query(db_cluster_preprod, client_databases, query) + insert_data_to_aggregated_db(db_cluster_preprod, df, target_database, target_table) \ No newline at end of file diff --git a/dags/data_utils/postgres/get_postgres_connection.py b/dags/data_utils/postgres/get_postgres_connection.py new file mode 100644 index 0000000..2f0e451 --- /dev/null +++ b/dags/data_utils/postgres/get_postgres_connection.py @@ -0,0 +1,26 @@ +from airflow.hooks.base import 
diff --git a/dags/data_utils/postgres/get_postgres_connection.py b/dags/data_utils/postgres/get_postgres_connection.py
new file mode 100644
index 0000000..2f0e451
--- /dev/null
+++ b/dags/data_utils/postgres/get_postgres_connection.py
@@ -0,0 +1,45 @@
+from airflow.hooks.base import BaseHook
+from sqlalchemy import create_engine
+from sqlalchemy.engine import URL
+
+
+def get_postgres_connection(db_cluster, database):
+    """
+    Extracts PostgreSQL connection details from Airflow and establishes a
+    connection using SQLAlchemy.
+    """
+    try:
+        # Retrieve the connection object from Airflow
+        connection = BaseHook.get_connection(db_cluster)
+
+        # Build the URL with URL.create so credentials containing special
+        # characters (e.g. '@' or '/') are escaped correctly
+        url = URL.create(
+            "postgresql",
+            username=connection.login,
+            password=connection.password,
+            host=connection.host,
+            port=connection.port,
+            database=database,
+        )
+
+        # Create the SQLAlchemy engine and open a connection
+        engine = create_engine(url)
+        conn = engine.connect()
+        print("Successfully connected to the PostgreSQL database via Airflow.")
+        return conn
+
+    except Exception as e:
+        print(f"Failed to connect to PostgreSQL via Airflow: {e}")
+        raise  # Raise the exception so the DAG fails if the connection cannot be established
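+
+# Illustrative use (a sketch): the helper returns a live SQLAlchemy
+# connection, so callers are responsible for closing it, as the aggregation
+# helpers do.
+#
+#   from sqlalchemy import text
+#   conn = get_postgres_connection('db_cluster_name_data', 'lyon')
+#   try:
+#       print(conn.execute(text('SELECT 1')).scalar())
+#   finally:
+#       conn.close()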