From 02e8fc7dcd02be736dd84f6c2961949f6c3cde32 Mon Sep 17 00:00:00 2001 From: Remy Gwaramadze Date: Fri, 17 Jan 2025 11:30:00 +0100 Subject: [PATCH] Refactor expire_windows - simplify code --- .../state/rocksdb/windowed/transaction.py | 20 +++++-------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/quixstreams/state/rocksdb/windowed/transaction.py b/quixstreams/state/rocksdb/windowed/transaction.py index 58b854ee7..24267ba25 100644 --- a/quixstreams/state/rocksdb/windowed/transaction.py +++ b/quixstreams/state/rocksdb/windowed/transaction.py @@ -183,19 +183,15 @@ def expire_windows( Relevant only together with `collect=True`. :return: A sorted list of tuples in the format `((start, end), value)`. """ - start_from = -1 + cache = self._last_expired_timestamps # Find the latest start timestamp of the expired windows for the given key - last_expired = self._get_timestamp( - cache=self._last_expired_timestamps, prefix=prefix - ) - if last_expired is not None: - start_from = max(start_from, last_expired) + start_from_ms = self._get_timestamp(cache=cache, prefix=prefix) or -1 # Use the latest expired timestamp to limit the iteration over # only those windows that have not been expired before expired_windows = self.get_windows( - start_from_ms=start_from, + start_from_ms=start_from_ms, start_to_ms=max_start_time, prefix=prefix, ) @@ -203,14 +199,8 @@ def expire_windows( return [] # Save the start of the latest expired window to the expiration index - latest_window = expired_windows[-1] - last_expired__gt = latest_window[0][0] - - self._set_timestamp( - cache=self._last_expired_timestamps, - prefix=prefix, - timestamp_ms=last_expired__gt, - ) + timestamp_ms = expired_windows[-1][0][0] # [..., [(start, end), value]] + self._set_timestamp(cache=cache, prefix=prefix, timestamp_ms=timestamp_ms) if not collect: return expired_windows