Skip to content

Commit 50c51c4

Browse files
committed
Fix handling of rejected setup errors (#1117)
Closes gh-1092
1 parent f591f9d commit 50c51c4

File tree

3 files changed

+47
-4
lines changed

3 files changed

+47
-4
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2020 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -463,8 +463,14 @@ private Mono<Void> acceptSetup(
463463
return interceptors
464464
.initSocketAcceptor(acceptor)
465465
.accept(setupPayload, wrappedRSocketRequester)
466-
.doOnError(
467-
err -> serverSetup.sendError(wrappedDuplexConnection, rejectedSetupError(err)))
466+
.onErrorResume(
467+
err ->
468+
Mono.fromRunnable(
469+
() ->
470+
serverSetup.sendError(
471+
wrappedDuplexConnection, rejectedSetupError(err)))
472+
.then(wrappedDuplexConnection.onClose())
473+
.then(Mono.error(err)))
468474
.doOnNext(
469475
rSocketHandler -> {
470476
RSocket wrappedRSocketHandler = interceptors.initResponder(rSocketHandler);

rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java

+3
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,9 @@ public void setKeepAliveSupport(KeepAliveSupport keepAliveSupport) {
287287

288288
@Override
289289
public void dispose() {
290+
if (logger.isDebugEnabled()) {
291+
logger.debug("Side[server]|Session[{}]. Disposing session", session);
292+
}
290293
Operators.terminate(S, this);
291294
resumableConnection.dispose();
292295
}

rsocket-core/src/test/java/io/rsocket/core/RSocketServerTest.java

+35-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2021 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,11 +23,14 @@
2323
import io.rsocket.Closeable;
2424
import io.rsocket.FrameAssert;
2525
import io.rsocket.RSocket;
26+
import io.rsocket.exceptions.RejectedSetupException;
2627
import io.rsocket.frame.FrameType;
2728
import io.rsocket.frame.KeepAliveFrameCodec;
2829
import io.rsocket.frame.RequestResponseFrameCodec;
30+
import io.rsocket.frame.SetupFrameCodec;
2931
import io.rsocket.test.util.TestDuplexConnection;
3032
import io.rsocket.test.util.TestServerTransport;
33+
import io.rsocket.util.EmptyPayload;
3134
import java.time.Duration;
3235
import java.util.Random;
3336
import org.assertj.core.api.Assertions;
@@ -164,4 +167,35 @@ public void unexpectedFramesBeforeSetup() {
164167
server.dispose();
165168
transport.alloc().assertHasNoLeaks();
166169
}
170+
171+
@Test
172+
public void ensuresErrorFrameDeliveredPriorConnectionDisposal() {
173+
TestServerTransport transport = new TestServerTransport();
174+
Closeable server =
175+
RSocketServer.create()
176+
.acceptor(
177+
(setup, sendingSocket) -> Mono.error(new RejectedSetupException("ACCESS_DENIED")))
178+
.bind(transport)
179+
.block();
180+
181+
TestDuplexConnection connection = transport.connect();
182+
connection.addToReceivedBuffer(
183+
SetupFrameCodec.encode(
184+
ByteBufAllocator.DEFAULT,
185+
false,
186+
0,
187+
1,
188+
Unpooled.EMPTY_BUFFER,
189+
"metadata_type",
190+
"data_type",
191+
EmptyPayload.INSTANCE));
192+
193+
StepVerifier.create(connection.onClose()).expectComplete().verify(Duration.ofSeconds(30));
194+
FrameAssert.assertThat(connection.pollFrame())
195+
.hasStreamIdZero()
196+
.hasData("ACCESS_DENIED")
197+
.hasNoLeaks();
198+
server.dispose();
199+
transport.alloc().assertHasNoLeaks();
200+
}
167201
}

0 commit comments

Comments
 (0)