Skip to content

Commit

Permalink
detailed logging of the redis batching
Browse files Browse the repository at this point in the history
  • Loading branch information
omiranda committed Dec 4, 2024
1 parent 1f2a7e8 commit 865969d
Showing 1 changed file with 9 additions and 0 deletions.
9 changes: 9 additions & 0 deletions sdk/python/feast/infra/online_stores/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ def online_write_batch(
# check if a previous record under the key bin exists
# TODO: investigate if check and set is a better approach rather than pulling all entity ts and then setting
# it may be significantly slower but avoids potential (rare) race conditions
hmget_start_time = time.time()
for entity_key, _, _, _ in data:
redis_key_bin = _redis_key(
project,
Expand All @@ -309,9 +310,13 @@ def online_write_batch(
keys.append(redis_key_bin)
pipe.hmget(redis_key_bin, ts_key)
prev_event_timestamps = pipe.execute()
hmget_end_time = time.time()
print(
f"INFO!!! hmget took {int((hmget_end_time - hmget_start_time) * 1000)} milliseconds")
# flattening the list of lists. `hmget` does the lookup assuming a list of keys in the key bin
prev_event_timestamps = [i[0] for i in prev_event_timestamps]

hset_start_time = time.time()
for redis_key_bin, prev_event_time, (_, values, timestamp, _) in zip(
keys, prev_event_timestamps, data
):
Expand Down Expand Up @@ -346,6 +351,10 @@ def online_write_batch(
if ttl:
pipe.expire(name=redis_key_bin, time=ttl)
results = pipe.execute()
hset_end_time = time.time()
print(
f"INFO!!! hset took {int((hset_end_time - hset_start_time) * 1000)} milliseconds")

if progress:
progress(len(results))
write_end_time = time.time()
Expand Down

0 comments on commit 865969d

Please sign in to comment.