Skip to content

Commit

Permalink
Refactor expire_windows - simplify code
Browse files Browse the repository at this point in the history
  • Loading branch information
gwaramadze committed Jan 17, 2025
1 parent b25a0cd commit 02e8fc7
Showing 1 changed file with 5 additions and 15 deletions.
20 changes: 5 additions & 15 deletions quixstreams/state/rocksdb/windowed/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,34 +183,24 @@ def expire_windows(
Relevant only together with `collect=True`.
:return: A sorted list of tuples in the format `((start, end), value)`.
"""
start_from = -1
cache = self._last_expired_timestamps

# Find the latest start timestamp of the expired windows for the given key
last_expired = self._get_timestamp(
cache=self._last_expired_timestamps, prefix=prefix
)
if last_expired is not None:
start_from = max(start_from, last_expired)
start_from_ms = self._get_timestamp(cache=cache, prefix=prefix) or -1

# Use the latest expired timestamp to limit the iteration over
# only those windows that have not been expired before
expired_windows = self.get_windows(
start_from_ms=start_from,
start_from_ms=start_from_ms,
start_to_ms=max_start_time,
prefix=prefix,
)
if not expired_windows:
return []

# Save the start of the latest expired window to the expiration index
latest_window = expired_windows[-1]
last_expired__gt = latest_window[0][0]

self._set_timestamp(
cache=self._last_expired_timestamps,
prefix=prefix,
timestamp_ms=last_expired__gt,
)
timestamp_ms = expired_windows[-1][0][0] # [..., [(start, end), value]]
self._set_timestamp(cache=cache, prefix=prefix, timestamp_ms=timestamp_ms)

if not collect:
return expired_windows
Expand Down

0 comments on commit 02e8fc7

Please sign in to comment.