Skip to content

Commit

Permalink
Fix NPE for transaction id handling
Browse files Browse the repository at this point in the history
  • Loading branch information
alex268 committed Nov 5, 2024
1 parent e6bc656 commit 1c2cb00
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 10 deletions.
11 changes: 5 additions & 6 deletions query/src/main/java/tech/ydb/query/impl/SessionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,7 @@ public QueryStream createQuery(String query, TxMode tx, Params prms, ExecuteQuer
YdbQuery.TransactionControl tc = TxControl.txModeCtrl(tx, true);
return new StreamImpl(createGrpcStream(query, tc, prms, settings)) {
@Override
void handleTxMeta(YdbQuery.TransactionMeta meta) {
String txID = meta == null ? null : meta.getId();
void handleTxMeta(String txID) {
if (txID != null && !txID.isEmpty()) {
logger.warn("{} got unexpected transaction id {}", SessionImpl.this, txID);
}
Expand Down Expand Up @@ -253,7 +252,7 @@ abstract class StreamImpl implements QueryStream {
this.grpcStream = grpcStream;
}

abstract void handleTxMeta(YdbQuery.TransactionMeta meta);
abstract void handleTxMeta(String txId);
void handleCompletion(Status status, Throwable th) { }

@Override
Expand All @@ -276,7 +275,7 @@ public CompletableFuture<Result<QueryInfo>> execute(PartsHandler handler) {
}

if (msg.hasTxMeta()) {
handleTxMeta(msg.getTxMeta());
handleTxMeta(msg.getTxMeta().getId());
}
if (issues.length > 0) {
if (handler != null) {
Expand Down Expand Up @@ -352,8 +351,8 @@ public QueryStream createQuery(String query, boolean commitAtEnd, Params prms, E

return new StreamImpl(createGrpcStream(query, tc, prms, settings)) {
@Override
void handleTxMeta(YdbQuery.TransactionMeta meta) {
String newId = meta == null || meta.getId() == null || meta.getId().isEmpty() ? null : meta.getId();
void handleTxMeta(String txID) {
String newId = txID == null || txID.isEmpty() ? null : txID;
if (!txId.compareAndSet(currentId, newId)) {
logger.warn("{} lost transaction meta id {}", SessionImpl.this, newId);
}
Expand Down
8 changes: 4 additions & 4 deletions query/src/main/java/tech/ydb/query/impl/TableClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,17 +156,17 @@ public CompletableFuture<Result<DataQueryResult>> executeDataQueryInternal(
.withRequestTimeout(settings.getTimeoutDuration())
.build();

final AtomicReference<String> txID = new AtomicReference<>("");
final AtomicReference<String> txRef = new AtomicReference<>("");
QueryStream stream = querySession.new StreamImpl(querySession.createGrpcStream(query, tc, prms, qs)) {
@Override
void handleTxMeta(YdbQuery.TransactionMeta meta) {
txID.set(meta.getId());
void handleTxMeta(String txID) {
txRef.set(txID);
}
};

return QueryReader.readFrom(stream)
.thenApply(r -> r.map(
reader -> new ProxedDataQueryResult(txID.get(), reader)
reader -> new ProxedDataQueryResult(txRef.get(), reader)
));
}

Expand Down

0 comments on commit 1c2cb00

Please sign in to comment.