Skip to content

Commit

Permalink
chore: adding logging for testing
Browse files Browse the repository at this point in the history
  • Loading branch information
omiranda committed Jan 17, 2025
1 parent 761a503 commit e880a5f
Showing 1 changed file with 14 additions and 6 deletions.
20 changes: 14 additions & 6 deletions sdk/python/feast/infra/online_stores/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ def _get_client(self, online_store_config: RedisOnlineStoreConfig):
Creates the Redis client RedisCluster or Redis depending on configuration
"""
if not self._client:
print(f"Creating Redis client for {online_store_config.redis_type}")
startup_nodes, kwargs = self._parse_connection_string(
online_store_config.connection_string
)
Expand All @@ -248,6 +249,8 @@ def _get_client(self, online_store_config: RedisOnlineStoreConfig):
kwargs["host"] = startup_nodes[0]["host"]
kwargs["port"] = startup_nodes[0]["port"]
self._client = Redis(**kwargs)
else:
print("Returning existing Redis client")
return self._client

async def _get_client_async(self, online_store_config: RedisOnlineStoreConfig):
Expand Down Expand Up @@ -284,13 +287,16 @@ def _do_online_write_batch(
progress: Optional[Callable[[int], Any]],
force_overwrite: bool,
) -> None:
overall_start = time.perf_counter()

online_store_config = config.online_store
assert isinstance(online_store_config, RedisOnlineStoreConfig)

start_time = time.time()
t0 = time.perf_counter()
client = self._get_client(online_store_config)
client_time = time.time() - start_time
print(f"Client initialization time: {client_time:.4f} seconds")
t1 = time.perf_counter()
print(f"Time to get/create Redis client: {t1 - t0:.4f} seconds")

project = config.project

feature_view = table.name
Expand All @@ -299,7 +305,7 @@ def _do_online_write_batch(
# If force_overwrite is True, skip retrieving timestamps
if force_overwrite:
print("forcing online data to be overwritten")
start_time = time.time()
overwrite_start = time.perf_counter()
with client.pipeline(transaction=False) as pipe:
for entity_key, values_dict, timestamp, _ in data:
redis_key_bin = _redis_key(
Expand Down Expand Up @@ -332,8 +338,8 @@ def _do_online_write_batch(
results = pipe.execute()
if progress:
progress(len(results))
overwrite_time = time.time() - start_time
print(f"Force overwrite time: {overwrite_time:.4f} seconds")
overwrite_end = time.perf_counter()
print(f"Force-overwrite pipeline execution time: {overwrite_end - overwrite_start:.4f} seconds")

else:
keys = []
Expand Down Expand Up @@ -392,6 +398,8 @@ def _do_online_write_batch(
results = pipe.execute()
if progress:
progress(len(results))
overall_end = time.perf_counter()
print(f"_do_online_write_batch total time: {overall_end - overall_start:.4f} seconds")

def _generate_redis_keys_for_entities(
self, config: RepoConfig, entity_keys: List[EntityKeyProto]
Expand Down

0 comments on commit e880a5f

Please sign in to comment.