diff --git a/terragrunt/aws/glue/etl.tf b/terragrunt/aws/glue/etl.tf
index 6c0ab20..c7d8dff 100644
--- a/terragrunt/aws/glue/etl.tf
+++ b/terragrunt/aws/glue/etl.tf
@@ -1,5 +1,5 @@
 #
-# ETL jobs
+# Platform / GC Forms / Generate test data
 #
 data "local_file" "forms_generate_test_data" {
   filename = "${path.module}/etl/platform/gc-forms/scripts/generate_test_data.py"
@@ -35,3 +35,47 @@ resource "aws_glue_job" "forms_generate_test_data" {
     "--num_partitions" = "1"
   }
 }
+
+#
+# Platform / Support / Freshdesk
+#
+data "local_file" "platform_support_freshdesk" {
+  filename = "${path.module}/etl/platform/support/freshdesk/scripts/process_tickets.py"
+}
+
+resource "aws_s3_object" "platform_support_freshdesk" {
+  bucket = var.glue_bucket_name
+  key    = "platform/support/freshdesk/process_tickets.py"
+  source = data.local_file.platform_support_freshdesk.filename
+  etag   = filemd5(data.local_file.platform_support_freshdesk.filename)
+}
+
+resource "aws_glue_job" "platform_support_freshdesk" {
+  name = "Platform / Support / Freshdesk"
+
+  timeout                = 15 # minutes
+  role_arn               = aws_iam_role.glue_etl.arn
+  security_configuration = aws_glue_security_configuration.encryption_at_rest.name
+
+  command {
+    script_location = "s3://${var.glue_bucket_name}/${aws_s3_object.platform_support_freshdesk.key}"
+    python_version  = "3"
+  }
+
+  default_arguments = {
+    "--continuous-log-logGroup"          = "/aws-glue/jobs/${aws_glue_security_configuration.encryption_at_rest.name}/service-role/${aws_iam_role.glue_etl.name}/output"
+    "--continuous-log-logStreamPrefix"   = "platform_support_freshdesk"
+    "--enable-continuous-cloudwatch-log" = "true"
+    "--enable-continuous-log-filter"     = "true"
+    "--enable-metrics"                   = "true"
+    "--enable-observability-metrics"     = "true"
+    "--additional-python-modules"        = "awswrangler==3.11.0"
+    "--source_bucket"                    = var.raw_bucket_name
+    "--source_prefix"                    = "platform/support/freshdesk/"
+    "--transformed_bucket"               = var.transformed_bucket_name
+    "--transformed_prefix"               = "platform/support/freshdesk/"
+    "--database_name_raw"                = aws_glue_catalog_database.platform_support_production_raw.name
+    "--database_name_transformed"        = aws_glue_catalog_database.platform_support_production.name
+    "--table_name"                       = "platform_support_freshdesk"
+  }
+}
diff --git a/terragrunt/aws/glue/etl/platform/support/freshdesk/scripts/Makefile b/terragrunt/aws/glue/etl/platform/support/freshdesk/scripts/Makefile
new file mode 100644
index 0000000..5a063e2
--- /dev/null
+++ b/terragrunt/aws/glue/etl/platform/support/freshdesk/scripts/Makefile
@@ -0,0 +1,24 @@
+default:
+	python process_tickets.py
+
+fmt:
+	black . $(ARGS)
+
+install:
+	pip3 install --user -r requirements.txt
+
+install_dev:
+	pip3 install --user -r requirements_dev.txt
+
+lint:
+	flake8 --ignore=E501 *.py
+
+test:
+	python -m pytest -s -vv .
+
+.PHONY: \
+	fmt \
+	install \
+	install_dev \
+	lint \
+	test
\ No newline at end of file
diff --git a/terragrunt/aws/glue/etl/platform/support/freshdesk/scripts/process_tickets.py b/terragrunt/aws/glue/etl/platform/support/freshdesk/scripts/process_tickets.py
new file mode 100644
index 0000000..1714d64
--- /dev/null
+++ b/terragrunt/aws/glue/etl/platform/support/freshdesk/scripts/process_tickets.py
@@ -0,0 +1,177 @@
+import logging
+import sys
+
+from datetime import datetime, UTC
+from dateutil.relativedelta import relativedelta
+
+import awswrangler as wr
+import pandas as pd
+
+from awsglue.context import GlueContext
+from awsglue.job import Job
+from awsglue.utils import getResolvedOptions
+from pyspark.context import SparkContext
+
+logger = logging.getLogger()
+logger.setLevel("INFO")
+
+args = getResolvedOptions(
+    sys.argv,
+    [
+        "JOB_NAME",
+        "source_bucket",
+        "source_prefix",
+        "transformed_bucket",
+        "transformed_prefix",
+        "database_name_raw",
+        "database_name_transformed",
+        "table_name",
+    ],
+)
+
+JOB_NAME = args["JOB_NAME"]
+SOURCE_BUCKET = args["source_bucket"]
+SOURCE_PREFIX = args["source_prefix"]
+TRANSFORMED_BUCKET = args["transformed_bucket"]
+TRANSFORMED_PREFIX = args["transformed_prefix"]
+TRANSFORMED_PATH = f"s3://{TRANSFORMED_BUCKET}/{TRANSFORMED_PREFIX}"
+PARTITION_KEY = "month"
+DATABASE_NAME_RAW = args["database_name_raw"]
+DATABASE_NAME_TRANSFORMED = args["database_name_transformed"]
+TABLE_NAME = args["table_name"]
+
+
+def validate_schema(dataframe: pd.DataFrame, glue_table_schema: pd.DataFrame) -> bool:
+    """
+    Validate that the DataFrame conforms to the Glue table schema.
+    """
+    for _, row in glue_table_schema.iterrows():
+        column_name = row["Column Name"]
+        column_type = row["Type"]
+        if column_name not in dataframe.columns:
+            logger.error(f"Validation failed: Missing column '{column_name}'")
+            return False
+
+        if not is_type_compatible(dataframe[column_name], column_type):
+            logger.error(
+                f"Validation failed: Column '{column_name}' type mismatch. Expected {column_type}"
+            )
+            return False
+
+    return True
+
+
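+# Note: the trial astype() cast below only probes whether the cast is
+# possible; it returns a new Series and never modifies the source DataFrame.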
+def is_type_compatible(series: pd.Series, glue_type: str) -> bool:
+    """
+    Check if a pandas Series is compatible with a Glue type.
+    """
+    glue_to_pandas = {
+        "string": pd.StringDtype(),
+        "int": pd.Int64Dtype(),
+        "bigint": pd.Int64Dtype(),
+        "double": float,
+        "float": float,
+        "boolean": bool,
+        "date": "datetime64[ns]",
+        "timestamp": "datetime64[ns]",
+        "array": pd.StringDtype(),
+    }
+    expected_type = glue_to_pandas.get(glue_type.lower())
+    if expected_type is None:
+        logger.error(f"Unknown Glue type '{glue_type}' for validation.")
+        return False
+    try:
+        series.astype(expected_type)
+    except (ValueError, TypeError):
+        return False
+    return True
+
+
+def get_days_tickets(day: datetime) -> pd.DataFrame:
+    """
+    Load the JSON file containing the tickets for a specific day.
+    """
+    day_formatted = day.strftime("%Y-%m-%d")
+    source_file_path = f"s3://{SOURCE_BUCKET}/{SOURCE_PREFIX}{PARTITION_KEY}={day_formatted[:7]}/{day_formatted}.json"
+    logger.info(f"Loading source JSON file: {source_file_path}")
+    return wr.s3.read_json(source_file_path)
+
+
+def get_existing_tickets(start_date: datetime) -> pd.DataFrame:
+    """
+    Load the existing transformed data from the S3 bucket.
+    """
+    start_date_formatted = start_date.strftime("%Y-%m")
+    logger.info(f"Loading transformed data from {start_date_formatted} onwards...")
+    existing_tickets = pd.DataFrame()
+    try:
+        existing_tickets = wr.s3.read_parquet(
+            path=TRANSFORMED_PATH,
+            dataset=True,
+            partition_filter=(
+                lambda partition: partition[PARTITION_KEY] >= start_date_formatted
+            ),
+        )
+    except wr.exceptions.NoFilesFound:
+        logger.warning("No existing data found. Starting fresh.")
+
+    return existing_tickets
+
+
+def merge_tickets(
+    existing_tickets: pd.DataFrame, new_tickets: pd.DataFrame
+) -> pd.DataFrame:
+    """
+    Merge the existing and new tickets DataFrames.
+    """
+    if existing_tickets.empty:
+        return new_tickets
+
+    existing_tickets["id"] = existing_tickets["id"].astype(str)
+    new_tickets["id"] = new_tickets["id"].astype(str)
+
+    combined_tickets = pd.concat([existing_tickets, new_tickets], ignore_index=True)
+    combined_tickets = combined_tickets.sort_values(
+        by=["id", "updated_at"], ascending=[True, False]
+    )
+    combined_tickets = combined_tickets.drop_duplicates(subset=["id"], keep="first")
+    return combined_tickets
+
+
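+# Orchestration: validate yesterday's raw tickets against the Glue catalog
+# schema, merge them into the last four months of transformed data (the most
+# recently updated copy of each ticket id wins), then overwrite the affected
+# month partitions in place.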
+ """ + start_date_formatted = start_date.strftime("%Y-%m") + logger.info(f"Loading transformed data from {start_date_formatted} onwards...") + existing_tickets = pd.DataFrame() + try: + existing_tickets = wr.s3.read_parquet( + path=TRANSFORMED_PATH, + dataset=True, + partition_filter=( + lambda partition: partition[PARTITION_KEY] >= start_date_formatted + ), + ) + except wr.exceptions.NoFilesFound: + logger.warning("No existing data found. Starting fresh.") + + return existing_tickets + + +def merge_tickets( + existing_tickets: pd.DataFrame, new_tickets: pd.DataFrame +) -> pd.DataFrame: + """ + Merge the existing and new tickets DataFrames. + """ + if existing_tickets.empty: + return new_tickets + + existing_tickets["id"] = existing_tickets["id"].astype(str) + new_tickets["id"] = new_tickets["id"].astype(str) + + combined_tickets = pd.concat([existing_tickets, new_tickets], ignore_index=True) + combined_tickets = combined_tickets.sort_values( + by=["id", "updated_at"], ascending=[True, False] + ) + combined_tickets = combined_tickets.drop_duplicates(subset=["id"], keep="first") + return combined_tickets + + +def process_tickets(): + # Get yesterday's tickets + yesterday = datetime.now(UTC) - relativedelta(days=1) + new_tickets = get_days_tickets(yesterday) + + # Check that the new tickets schema matches the expected schema + glue_table_schema = wr.catalog.table(database=DATABASE_NAME_RAW, table=TABLE_NAME) + if not validate_schema(new_tickets, glue_table_schema): + raise ValueError("Schema validation failed. Aborting ETL process.") + + # Load 4 months of existing ticket data + start_date = datetime.now(UTC) - relativedelta(months=4) + existing_tickets = get_existing_tickets(start_date) + + # Merge the existing and new tickets and save + combined_tickets = merge_tickets(existing_tickets, new_tickets) + logger.info("Saving updated DataFrame to S3...") + wr.s3.to_parquet( + df=combined_tickets, + path=TRANSFORMED_PATH, + dataset=True, + mode="overwrite_partitions", + database=DATABASE_NAME_TRANSFORMED, + table=TABLE_NAME, + partition_cols=[PARTITION_KEY], + ) + logger.info("ETL process completed successfully.") + + +sparkContext = SparkContext() +glueContext = GlueContext(sparkContext) +job = Job(glueContext) +job.init(JOB_NAME, args) + +process_tickets() + +job.commit() diff --git a/terragrunt/aws/glue/etl/platform/support/freshdesk/scripts/requirements.txt b/terragrunt/aws/glue/etl/platform/support/freshdesk/scripts/requirements.txt new file mode 100644 index 0000000..150e31b --- /dev/null +++ b/terragrunt/aws/glue/etl/platform/support/freshdesk/scripts/requirements.txt @@ -0,0 +1,2 @@ +awswrangler==3.11.0 +pandas==2.2.3 \ No newline at end of file diff --git a/terragrunt/aws/glue/etl/platform/support/freshdesk/scripts/requirements_dev.txt b/terragrunt/aws/glue/etl/platform/support/freshdesk/scripts/requirements_dev.txt new file mode 100644 index 0000000..d59fa86 --- /dev/null +++ b/terragrunt/aws/glue/etl/platform/support/freshdesk/scripts/requirements_dev.txt @@ -0,0 +1,3 @@ +black==24.10.0 +flake8==7.1.1 +pytest==8.3.4 \ No newline at end of file