From c9b2c6dc9dbcd5649dd71783800a87cc7be105fc Mon Sep 17 00:00:00 2001 From: Bhargav Dodla Date: Thu, 16 Jan 2025 13:09:49 +0530 Subject: [PATCH] fix: Implement shutdown method for CassandraOnlineStore and ensure proper shutdown in SparkKafkaProcessor --- sdk/python/feast/infra/contrib/spark_kafka_processor.py | 2 ++ .../cassandra_online_store/cassandra_online_store.py | 9 +++++++++ 2 files changed, 11 insertions(+) diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index b19137a452..d283160bd1 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -313,6 +313,8 @@ def batch_write_pandas_df(iterator, spark_serialized_artifacts, join_keys): lambda x: None, ) + online_store.shutdown() + yield pd.DataFrame([pd.Series(range(1, 2))]) # dummy result def batch_write( diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py index 9334bf49fe..e84afefe35 100644 --- a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py @@ -209,6 +209,15 @@ class CassandraOnlineStore(OnlineStore): _keyspace: str = "feast_keyspace" _prepared_statements: Dict[str, PreparedStatement] = {} + def shutdown(self): + """ + Shutdown the Cassandra cluster and session. + """ + if not self._session.is_shutdown: + self._session.shutdown() + if not self._cluster.is_shutdown: + self._cluster.shutdown() + def _get_cluster(self, config: RepoConfig): """ Establish the database connection, if not yet created,