data_aggregator.py
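
"""Downsample and expire old rows in the PostgreSQL `metrics` table.

Assumed context (a sketch of the surrounding project, not confirmed by this
module alone): `metrics` has columns (host_id, metric_name, timestamp, value)
with `timestamp` stored as a Unix epoch, and the local `database.get_db()`
helper returns an object whose `conn` attribute is a psycopg2-style
connection.
"""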
import logging
from datetime import datetime, timedelta

from database import get_db

logger = logging.getLogger(__name__)

def aggregate_data():
    logger.info("Starting data aggregation...")
    db = get_db()
    try:
        with db.conn.cursor() as cursor:
            # Ensure the unique constraint required by ON CONFLICT exists.
            cursor.execute("""
                DO $$
                BEGIN
                    IF NOT EXISTS (
                        SELECT 1 FROM pg_constraint
                        WHERE conname = 'unique_metric_timestamp'
                    ) THEN
                        ALTER TABLE metrics
                        ADD CONSTRAINT unique_metric_timestamp
                        UNIQUE (host_id, metric_name, timestamp);
                    END IF;
                END $$;
            """)

            # Each entry is (max_age, min_age, bucket): rows older than
            # min_age but newer than max_age are averaged into one row
            # per bucket.
            aggregation_periods = [
                # Older than 7 days but newer than 30 days:
                # aggregate to hourly intervals
                (timedelta(days=30), timedelta(days=7), 'hour'),
                # Older than 30 days but newer than 90 days:
                # aggregate to daily intervals
                (timedelta(days=90), timedelta(days=30), 'day'),
                # Older than 90 days but newer than 365 days:
                # aggregate to weekly intervals
                (timedelta(days=365), timedelta(days=90), 'week'),
            ]
            for max_age, min_age, interval in aggregation_periods:
                aggregate_period(cursor, max_age, min_age, interval)

            # Delete data older than 1 year.
            delete_old_data(cursor, timedelta(days=365))
        db.conn.commit()
        logger.info("Data aggregation complete.")
    except Exception:
        db.conn.rollback()
        logger.exception("An error occurred during data aggregation")

def aggregate_period(cursor, max_age, min_age, interval):
    now = datetime.now()
    start_date = now - max_age
    end_date = now - min_age
    # One statement so everything shares a single snapshot: `aggregated_data`
    # averages the raw rows, the data-modifying `deleted_data` CTE removes
    # them (it executes even though it is never referenced), and the INSERT
    # upserts one row per bucket.
    cursor.execute("""
        WITH aggregated_data AS (
            SELECT
                host_id,
                metric_name,
                date_trunc(%s, to_timestamp(timestamp)) AS bucket,
                AVG(value) AS avg_value
            FROM metrics
            WHERE timestamp >= %s AND timestamp < %s
            GROUP BY host_id, metric_name, bucket
        ),
        deleted_data AS (
            DELETE FROM metrics
            WHERE timestamp >= %s AND timestamp < %s
            RETURNING host_id, metric_name, timestamp, value
        )
        INSERT INTO metrics (host_id, metric_name, timestamp, value)
        SELECT
            host_id,
            metric_name,
            EXTRACT(EPOCH FROM bucket),
            avg_value
        FROM aggregated_data
        ON CONFLICT (host_id, metric_name, timestamp)
        DO UPDATE SET value = EXCLUDED.value
    """, (interval, start_date.timestamp(), end_date.timestamp(),
          start_date.timestamp(), end_date.timestamp()))
    logger.info(
        f"Aggregated data to {interval} intervals "
        f"for period {start_date} to {end_date}"
    )

def delete_old_data(cursor, max_age):
    cutoff_date = datetime.now() - max_age
    cursor.execute("DELETE FROM metrics WHERE timestamp < %s",
                   (cutoff_date.timestamp(),))
    logger.info(f"Deleted data older than {cutoff_date}")