From e880a5f123b098112c03dd76d3eca7007b6d5ca0 Mon Sep 17 00:00:00 2001 From: omiranda Date: Fri, 17 Jan 2025 17:26:13 -0600 Subject: [PATCH] chore: adding logging for testing --- sdk/python/feast/infra/online_stores/redis.py | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index c844133c76..37620b7342 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -224,6 +224,7 @@ def _get_client(self, online_store_config: RedisOnlineStoreConfig): Creates the Redis client RedisCluster or Redis depending on configuration """ if not self._client: + print(f"Creating Redis client for {online_store_config.redis_type}") startup_nodes, kwargs = self._parse_connection_string( online_store_config.connection_string ) @@ -248,6 +249,8 @@ def _get_client(self, online_store_config: RedisOnlineStoreConfig): kwargs["host"] = startup_nodes[0]["host"] kwargs["port"] = startup_nodes[0]["port"] self._client = Redis(**kwargs) + else: + print("Returning existing Redis client") return self._client async def _get_client_async(self, online_store_config: RedisOnlineStoreConfig): @@ -284,13 +287,16 @@ def _do_online_write_batch( progress: Optional[Callable[[int], Any]], force_overwrite: bool, ) -> None: + overall_start = time.perf_counter() + online_store_config = config.online_store assert isinstance(online_store_config, RedisOnlineStoreConfig) - start_time = time.time() + t0 = time.perf_counter() client = self._get_client(online_store_config) - client_time = time.time() - start_time - print(f"Client initialization time: {client_time:.4f} seconds") + t1 = time.perf_counter() + print(f"Time to get/create Redis client: {t1 - t0:.4f} seconds") + project = config.project feature_view = table.name @@ -299,7 +305,7 @@ def _do_online_write_batch( # If force_overwrite is True, skip retrieving timestamps if force_overwrite: print("forcing online data to be overwritten") - start_time = time.time() + overwrite_start = time.perf_counter() with client.pipeline(transaction=False) as pipe: for entity_key, values_dict, timestamp, _ in data: redis_key_bin = _redis_key( @@ -332,8 +338,8 @@ def _do_online_write_batch( results = pipe.execute() if progress: progress(len(results)) - overwrite_time = time.time() - start_time - print(f"Force overwrite time: {overwrite_time:.4f} seconds") + overwrite_end = time.perf_counter() + print(f"Force-overwrite pipeline execution time: {overwrite_end - overwrite_start:.4f} seconds") else: keys = [] @@ -392,6 +398,8 @@ def _do_online_write_batch( results = pipe.execute() if progress: progress(len(results)) + overall_end = time.perf_counter() + print(f"_do_online_write_batch total time: {overall_end - overall_start:.4f} seconds") def _generate_redis_keys_for_entities( self, config: RepoConfig, entity_keys: List[EntityKeyProto]