Skip to content

Commit

Permalink
fix: Improve session management in CassandraOnlineStore
Browse files Browse the repository at this point in the history
  • Loading branch information
Bhargav Dodla committed Jan 10, 2025
1 parent 6eebb75 commit b7541b2
Showing 1 changed file with 16 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,12 @@ def _get_session(self, config: RepoConfig):
raise CassandraInvalidConfig(E_CASSANDRA_UNEXPECTED_CONFIGURATION_CLASS)

if self._session:
print("Reusing existing session..")
return self._session
else:
print("Creating a new session..")
if not self._session.is_shutdown:
print("Reusing existing session..")
return self._session
else:
self._session = None
print("Creating a new session..")
if not self._session:
# configuration consistency checks
hosts = online_store_config.hosts
Expand Down Expand Up @@ -348,7 +350,9 @@ def online_write_batch(
session: Session = self._get_session(config)
keyspace: str = self._keyspace
fqtable = CassandraOnlineStore._fq_table_name(keyspace, project, table)
insert_cql = self._get_cql_statement(config, "insert4", fqtable=fqtable)
insert_cql = self._get_cql_statement(
config, "insert4", fqtable=fqtable, session=session
)

futures = []
for entity_key, values, timestamp, created_ts in data:
Expand Down Expand Up @@ -390,6 +394,7 @@ def online_write_batch(
logger.error(f"Error writing a batch: {exc}")
print(f"Error writing a batch: {exc}")
raise Exception("Error writing a batch") from exc
session.shutdown()
# correction for the last missing call to `progress`:
if progress:
progress(1)
Expand Down Expand Up @@ -582,7 +587,12 @@ def _get_cql_statement(
This additional layer makes it easy to control whether to use prepared
statements and, if so, on which database operations.
"""
session: Session = self._get_session(config)
session: Session = None
if "session" in kwargs:
session = kwargs["session"]
else:
session = self._get_session(config)

template, prepare = CQL_TEMPLATE_MAP[op_name]
statement = template.format(
fqtable=fqtable,
Expand Down

0 comments on commit b7541b2

Please sign in to comment.