Skip to content

Commit

Permalink
Fix handling of rejected setup errors (#1117)
Browse files Browse the repository at this point in the history
Closes gh-1092
  • Loading branch information
rstoyanchev authored Jan 24, 2025
1 parent 6e59179 commit 7abe35e
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 4 deletions.
12 changes: 9 additions & 3 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 the original author or authors.
* Copyright 2015-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -463,8 +463,14 @@ private Mono<Void> acceptSetup(
return interceptors
.initSocketAcceptor(acceptor)
.accept(setupPayload, wrappedRSocketRequester)
.doOnError(
err -> serverSetup.sendError(wrappedDuplexConnection, rejectedSetupError(err)))
.onErrorResume(
err ->
Mono.fromRunnable(
() ->
serverSetup.sendError(
wrappedDuplexConnection, rejectedSetupError(err)))
.then(wrappedDuplexConnection.onClose())
.then(Mono.error(err)))
.doOnNext(
rSocketHandler -> {
RSocket wrappedRSocketHandler = interceptors.initResponder(rSocketHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,9 @@ public void setKeepAliveSupport(KeepAliveSupport keepAliveSupport) {

@Override
public void dispose() {
if (logger.isDebugEnabled()) {
logger.debug("Side[server]|Session[{}]. Disposing session", session);
}
Operators.terminate(S, this);
resumableConnection.dispose();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2021 the original author or authors.
* Copyright 2015-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,11 +23,14 @@
import io.rsocket.Closeable;
import io.rsocket.FrameAssert;
import io.rsocket.RSocket;
import io.rsocket.exceptions.RejectedSetupException;
import io.rsocket.frame.FrameType;
import io.rsocket.frame.KeepAliveFrameCodec;
import io.rsocket.frame.RequestResponseFrameCodec;
import io.rsocket.frame.SetupFrameCodec;
import io.rsocket.test.util.TestDuplexConnection;
import io.rsocket.test.util.TestServerTransport;
import io.rsocket.util.EmptyPayload;
import java.time.Duration;
import java.util.Random;
import org.assertj.core.api.Assertions;
Expand Down Expand Up @@ -164,4 +167,35 @@ public void unexpectedFramesBeforeSetup() {
server.dispose();
transport.alloc().assertHasNoLeaks();
}

@Test
public void ensuresErrorFrameDeliveredPriorConnectionDisposal() {
TestServerTransport transport = new TestServerTransport();
Closeable server =
RSocketServer.create()
.acceptor(
(setup, sendingSocket) -> Mono.error(new RejectedSetupException("ACCESS_DENIED")))
.bind(transport)
.block();

TestDuplexConnection connection = transport.connect();
connection.addToReceivedBuffer(
SetupFrameCodec.encode(
ByteBufAllocator.DEFAULT,
false,
0,
1,
Unpooled.EMPTY_BUFFER,
"metadata_type",
"data_type",
EmptyPayload.INSTANCE));

StepVerifier.create(connection.onClose()).expectComplete().verify(Duration.ofSeconds(30));
FrameAssert.assertThat(connection.pollFrame())
.hasStreamIdZero()
.hasData("ACCESS_DENIED")
.hasNoLeaks();
server.dispose();
transport.alloc().assertHasNoLeaks();
}
}

0 comments on commit 7abe35e

Please sign in to comment.