feat: add Freshdesk ETL job
Add the ETL job that will process Freshdesk tickets.
patheard committed Jan 17, 2025
1 parent a0d8a1e commit 32c5423
Showing 5 changed files with 251 additions and 1 deletion.
46 changes: 45 additions & 1 deletion terragrunt/aws/glue/etl.tf
@@ -1,5 +1,5 @@
#
# ETL jobs
# Platform / GC Forms / Generate test data
#
data "local_file" "forms_generate_test_data" {
filename = "${path.module}/etl/platform/gc-forms/scripts/generate_test_data.py"
@@ -35,3 +35,47 @@ resource "aws_glue_job" "forms_generate_test_data" {
"--num_partitions" = "1"
}
}

#
# Platform / Support / Freshdesk
#
data "local_file" "platform_support_freshdesk" {
filename = "${path.module}/etl/platform/support/freshdesk/scripts/process_tickets.py"
}

resource "aws_s3_object" "platform_support_freshdesk" {
bucket = var.glue_bucket_name
key = "platform/support/freshdesk/process_tickets.py"
source = data.local_file.platform_support_freshdesk.filename
etag = filemd5(data.local_file.platform_support_freshdesk.filename)
}

resource "aws_glue_job" "platform_support_freshdesk" {
name = "Platform / Support / Freshdesk"

timeout = 15 # minutes
role_arn = aws_iam_role.glue_etl.arn
security_configuration = aws_glue_security_configuration.encryption_at_rest.name

command {
script_location = "s3://${var.glue_bucket_name}/${aws_s3_object.platform_support_freshdesk.key}"
python_version = "3"
}

default_arguments = {
"--continuous-log-logGroup" = "/aws-glue/jobs/${aws_glue_security_configuration.encryption_at_rest.name}/service-role/${aws_iam_role.glue_etl.name}/output"
"--continuous-log-logStreamPrefix" = "platform_support_freshdesk"
"--enable-continuous-cloudwatch-log" = "true"
"--enable-continuous-log-filter" = "true"
"--enable-metrics" = "true"
"--enable-observability-metrics" = "true"
"--additional-python-modules" = "awswrangler==3.11.0"
"--source_bucket" = var.raw_bucket_name
"--source_prefix" = "platform/support/freshdesk/"
"--transformed_bucket" = var.transformed_bucket_name
"--transformed_prefix" = "platform/support/freshdesk/"
"--database_name_raw" = aws_glue_catalog_database.platform_support_production_raw.name
"--database_name_transformed" = aws_glue_catalog_database.platform_support_production.name
"--table_name" = "platform_support_freshdesk"
}
}
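
Note (not part of the commit): a minimal sketch of starting the deployed job manually with boto3, assuming the Terraform above has been applied and AWS credentials are configured. Arguments passed at run time override the default_arguments with the same key.

import boto3

# Start a one-off run of the Glue job; the name matches the aws_glue_job
# resource above. Overriding an argument here is optional.
glue = boto3.client("glue")
run = glue.start_job_run(
    JobName="Platform / Support / Freshdesk",
    Arguments={"--source_prefix": "platform/support/freshdesk/"},
)
print(run["JobRunId"])
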
24 changes: 24 additions & 0 deletions Makefile
@@ -0,0 +1,24 @@
default:
	python process_tickets.py

fmt:
black . $(ARGS)

install:
pip3 install --user -r requirements.txt

install_dev:
pip3 install --user -r requirements_dev.txt

lint:
flake8 --ignore=E501 *.py

test:
python -m pytest -s -vv .

.PHONY: \
fmt \
install \
install_dev \
lint \
test
177 changes: 177 additions & 0 deletions terragrunt/aws/glue/etl/platform/support/freshdesk/scripts/process_tickets.py
@@ -0,0 +1,177 @@
import logging
import sys

from datetime import datetime, UTC
from dateutil.relativedelta import relativedelta

import awswrangler as wr
import pandas as pd

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

logger = logging.getLogger()
logger.setLevel("INFO")

args = getResolvedOptions(
sys.argv,
[
"JOB_NAME",
"source_bucket",
"source_prefix",
"transformed_bucket",
"transformed_prefix",
"database_name_raw",
"database_name_transformed",
"table_name",
],
)

JOB_NAME = args["JOB_NAME"]
SOURCE_BUCKET = args["source_bucket"]
SOURCE_PREFIX = args["source_prefix"]
TRANSFORMED_BUCKET = args["transformed_bucket"]
TRANSFORMED_PREFIX = args["transformed_prefix"]
TRANSFORMED_PATH = f"s3://{TRANSFORMED_BUCKET}/{TRANSFORMED_PREFIX}"
PARTITION_KEY = "month"
DATABASE_NAME_RAW = args["database_name_raw"]
DATABASE_NAME_TRANSFORMED = args["database_name_transformed"]
TABLE_NAME = args["table_name"]


def validate_schema(dataframe: pd.DataFrame, glue_table_schema: pd.DataFrame) -> bool:
"""
Validate that the DataFrame conforms to the Glue table schema.
"""
for _, row in glue_table_schema.iterrows():
column_name = row["Column Name"]
column_type = row["Type"]
if column_name not in dataframe.columns:
logger.error(f"Validation failed: Missing column '{column_name}'")
return False

if not is_type_compatible(dataframe[column_name], column_type):
logger.error(
f"Validation failed: Column '{column_name}' type mismatch. Expected {column_type}"
)
return False

return True


def is_type_compatible(series: pd.Series, glue_type: str) -> bool:
"""
Check if a pandas Series is compatible with a Glue type.
"""
glue_to_pandas = {
"string": pd.StringDtype(),
"int": pd.Int64Dtype(),
"bigint": pd.Int64Dtype(),
"double": float,
"float": float,
"boolean": bool,
"date": "datetime64[ns]",
"timestamp": "datetime64[ns]",
"array<string>": pd.StringDtype(),
}
expected_type = glue_to_pandas.get(glue_type.lower())
if expected_type is None:
logger.error(f"Unknown Glue type '{glue_type}' for validation.")
return False
try:
series.astype(expected_type)
except (ValueError, TypeError):
return False
return True


def get_days_tickets(day: datetime) -> pd.DataFrame:
"""
Load the JSON file containing the tickets for a specific day.
"""
    day_formatted = day.strftime("%Y-%m-%d")
    source_file_path = f"s3://{SOURCE_BUCKET}/{SOURCE_PREFIX}{PARTITION_KEY}={day_formatted[:7]}/{day_formatted}.json"
logger.info(f"Loading source JSON file: {source_file_path}")
return wr.s3.read_json(source_file_path)


def get_existing_tickets(start_date: datetime) -> pd.DataFrame:
"""
Load the existing transformed data from the S3 bucket.
"""
start_date_formatted = start_date.strftime("%Y-%m")
logger.info(f"Loading transformed data from {start_date_formatted} onwards...")
existing_tickets = pd.DataFrame()
try:
existing_tickets = wr.s3.read_parquet(
path=TRANSFORMED_PATH,
dataset=True,
partition_filter=(
lambda partition: partition[PARTITION_KEY] >= start_date_formatted
),
)
except wr.exceptions.NoFilesFound:
logger.warning("No existing data found. Starting fresh.")

return existing_tickets


def merge_tickets(
existing_tickets: pd.DataFrame, new_tickets: pd.DataFrame
) -> pd.DataFrame:
"""
Merge the existing and new tickets DataFrames.
"""
if existing_tickets.empty:
return new_tickets

existing_tickets["id"] = existing_tickets["id"].astype(str)
new_tickets["id"] = new_tickets["id"].astype(str)

combined_tickets = pd.concat([existing_tickets, new_tickets], ignore_index=True)
combined_tickets = combined_tickets.sort_values(
by=["id", "updated_at"], ascending=[True, False]
)
combined_tickets = combined_tickets.drop_duplicates(subset=["id"], keep="first")
return combined_tickets


def process_tickets():
# Get yesterday's tickets
yesterday = datetime.now(UTC) - relativedelta(days=1)
new_tickets = get_days_tickets(yesterday)

# Check that the new tickets schema matches the expected schema
glue_table_schema = wr.catalog.table(database=DATABASE_NAME_RAW, table=TABLE_NAME)
if not validate_schema(new_tickets, glue_table_schema):
raise ValueError("Schema validation failed. Aborting ETL process.")

# Load 4 months of existing ticket data
start_date = datetime.now(UTC) - relativedelta(months=4)
existing_tickets = get_existing_tickets(start_date)

# Merge the existing and new tickets and save
combined_tickets = merge_tickets(existing_tickets, new_tickets)
logger.info("Saving updated DataFrame to S3...")
wr.s3.to_parquet(
df=combined_tickets,
path=TRANSFORMED_PATH,
dataset=True,
mode="overwrite_partitions",
database=DATABASE_NAME_TRANSFORMED,
table=TABLE_NAME,
partition_cols=[PARTITION_KEY],
)
logger.info("ETL process completed successfully.")


sparkContext = SparkContext()
glueContext = GlueContext(sparkContext)
job = Job(glueContext)
job.init(JOB_NAME, args)

process_tickets()

job.commit()
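
Note (not part of the commit): the Makefile above defines a test target, but no tests are included in this diff. As a standalone sketch with hypothetical ticket data, the deduplication that merge_tickets relies on works like this: after sorting by updated_at descending, keeping the first row per id retains the most recent version of each ticket.

import pandas as pd

existing = pd.DataFrame(
    {"id": ["1", "2"], "updated_at": ["2025-01-10", "2025-01-11"], "status": ["open", "open"]}
)
new = pd.DataFrame(
    {"id": ["2", "3"], "updated_at": ["2025-01-16", "2025-01-16"], "status": ["closed", "open"]}
)

combined = pd.concat([existing, new], ignore_index=True)
combined = combined.sort_values(by=["id", "updated_at"], ascending=[True, False])
combined = combined.drop_duplicates(subset=["id"], keep="first")
print(combined)  # ticket "2" appears once, with the newer "closed" status
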
2 changes: 2 additions & 0 deletions requirements.txt
@@ -0,0 +1,2 @@
awswrangler==3.11.0
pandas==2.2.3
3 changes: 3 additions & 0 deletions requirements_dev.txt
@@ -0,0 +1,3 @@
black==24.10.0
flake8==7.1.1
pytest==8.3.4
