Cancel signal does not interrupt cursored query fetching #536
Not necessarily. When canceling the frontend publisher (the one used by the code that calls the driver), we must first drain the protocol state to avoid lingering responses on the inbound channel. At the moment the frontend publisher is canceled we do not know what comes next on the wire; we can only reason about certain arrangements such as cursor consumption. The driver is able to pipeline requests, so canceling a frontend publisher could easily cancel the "next" command that was already sent to the server.
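To make the draining idea concrete, here is a minimal sketch (plain Project Reactor, not the driver's actual `FluxDiscardOnCancel` operator; the class name is made up): instead of forwarding a downstream cancel upstream in the middle of an exchange, the subscriber switches to unbounded demand and drops the remaining inbound messages until the exchange completes on its own.

```java
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;

// Minimal sketch of the draining idea, not the driver's actual operator.
// Instead of letting a downstream cancel travel upstream mid-exchange, the
// subscriber switches to unbounded demand and silently drops the remaining
// inbound messages until the exchange completes on its own.
final class DrainingSubscriber<T> extends BaseSubscriber<T> {

    private volatile boolean draining;

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        request(1); // regular one-by-one demand while the consumer is interested
    }

    @Override
    protected void hookOnNext(T message) {
        if (this.draining) {
            return; // drop: we only want to reach the end of the exchange
        }
        // ... hand the message to the downstream consumer ...
        request(1);
    }

    void cancelAndDrain() {
        this.draining = true;
        request(Long.MAX_VALUE); // drain instead of cancelling upstream
    }
}
```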
I'm actually wondering why this is. The mentioned …
I'll try to come up with a small example. Thank you for your response.
Sorry for the delay in responding.

```java
package io.r2dbc.postgresql;

import java.time.Duration;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.springframework.jdbc.core.JdbcOperations;
import reactor.test.StepVerifier;

final class CancelIntegrationTests extends AbstractIntegrationTests {

    @Override
    protected void customize(PostgresqlConnectionConfiguration.Builder builder) {
        builder.fetchSize(100);
    }

    @BeforeEach
    void setUp() {
        super.setUp();
        JdbcOperations jdbcOperations = SERVER.getJdbcOperations();
        jdbcOperations.execute("DROP TABLE IF EXISTS foo;");
        jdbcOperations.execute("CREATE TABLE foo AS \n"
            + " SELECT i FROM generate_series(1,200000) as i;");
    }

    void cancelExchange(boolean withBinding) {
        io.r2dbc.postgresql.api.PostgresqlStatement statement;
        if (withBinding) {
            statement = this.connection.createStatement("SELECT * FROM foo WHERE $1 = $1")
                .bind(0, 1);
        } else {
            statement = this.connection.createStatement("SELECT * FROM foo");
        }
        statement
            .execute()
            .flatMap(r -> r.map((row, meta) -> row.get(0, Integer.class)))
            .as(StepVerifier::create)
            .expectNextCount(5)
            .thenCancel()
            .verify(Duration.ofSeconds(2));
    }

    @Test
    @Timeout(10)
    void cancel_withBinding() {
        for (int i = 0; i < 10; i++) {
            try {
                cancelExchange(true);
            } catch (AssertionError e) {
                System.out.println("Ignore error " + e.getMessage());
            }
        }
    }

    @Test
    @Timeout(10)
    void cancel_withoutBinding() {
        for (int i = 0; i < 10; i++) {
            try {
                cancelExchange(false);
            } catch (AssertionError e) {
                System.out.println("Ignore error " + e.getMessage());
            }
        }
    }
}
```

In the above tests, I create a table with 200,000 rows and then request everything with a fetch size of 100. I hope this helps. Please note that our workaround is to add a fake binding to our requests.
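For reference, the fake-binding workaround mentioned above is what the `withBinding` branch of the test exercises; a small sketch of it in isolation (class name made up, assuming the `io.r2dbc.postgresql.api` types):

```java
import io.r2dbc.postgresql.api.PostgresqlConnection;
import io.r2dbc.postgresql.api.PostgresqlStatement;

// The workaround referenced above: a tautological predicate gives the statement a
// binding, so the query no longer goes through the simple query protocol path that
// shows the cancellation problem.
final class FakeBindingWorkaround {

    static PostgresqlStatement select(PostgresqlConnection connection) {
        return connection.createStatement("SELECT * FROM foo WHERE $1 = $1")
            .bind(0, 1);
    }
}
```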
Thanks a lot for providing more detail on this. The issue is that there is quite some stream mapping going on: there are outer and inner streams that are guarded by `discardOnCancel`. I fixed the issue now, and the inner loop no longer keeps fetching its results.
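For readers following along, a minimal sketch of the outer/inner stream shape being referred to, in the same form as the test above (class name made up; this is not the code of the fix):

```java
import io.r2dbc.postgresql.api.PostgresqlStatement;
import reactor.core.publisher.Flux;

// Sketch of the stream shape referred to above. The outer stream is the sequence of
// Results emitted by execute(); the inner streams are the mapped rows of each Result.
// With a fetch size configured, the driver keeps fetching further cursor portions
// while an inner stream is demanded, so cancelling the outer chain must also stop
// that inner fetch loop.
final class StreamShapeSketch {

    static Flux<Integer> rows(PostgresqlStatement statement) {
        return statement
            .execute()                                                                 // outer stream
            .flatMap(result -> result.map((row, meta) -> row.get(0, Integer.class)));  // inner streams
    }
}
```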
[resolves #536] Signed-off-by: Mark Paluch <[email protected]>
When going through the simple query protocol, the `Flux` operator `discardOnCancel` (https://github.com/pgjdbc/r2dbc-postgresql/blob/main/src/main/java/io/r2dbc/postgresql/PostgresqlStatement.java#L248) does not seem to be propagated to the `ReactorNettyClient.Conversation` sink.

When a query is canceled, the `Operators#discardOnCancel` operator is supposed to discard the cancel event and send a very large `Subscription#request` to the subscriptions (see https://github.com/pgjdbc/r2dbc-postgresql/blob/main/src/main/java/io/r2dbc/postgresql/util/FluxDiscardOnCancel.java#L125). But the `FluxSink#onRequest` is never called (see https://github.com/pgjdbc/r2dbc-postgresql/blob/main/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java#L710) when `FluxDiscardOnCancel.FluxDiscardOnCancelSubscriber#cancel` is called. This leads to a dead `ReactorNettyClient.Conversation`, since it is not canceled and its demand is exhausted.

Since this `ReactorNettyClient.Conversation` is able to handle the cancel state, I removed the `Flux` operator `discardOnCancel` from the `PostgresqlStatement`. Then everything worked, as the `ReactorNettyClient.Conversation` was able to go through until the end even if the sink was canceled.

Do we still need this `discardOnCancel` operator here? If so, why is the large request not propagated to the `ReactorNettyClient.Conversation`?
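To make the demand/cancel interplay concrete, here is a minimal, self-contained sketch (plain Reactor, not the driver's code; all names made up) of a sink-backed conversation: it only emits when demand arrives via `FluxSink#onRequest`, so if a downstream cancel is absorbed without either cancelling the sink or translating it into further demand, the remaining messages are never drained and the conversation never terminates.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

import reactor.core.publisher.Flux;

// Minimal sketch (not the driver's code) of a sink-backed "conversation":
// messages are only emitted when demand arrives through FluxSink#onRequest.
// If a downstream cancel is swallowed without cancelling the sink or issuing
// further demand, the remaining messages stay queued and the Flux never
// completes, which is the "dead conversation" described above.
public class ConversationSketch {

    public static void main(String[] args) {
        Queue<String> backendMessages =
            new ArrayDeque<>(List.of("DataRow", "DataRow", "CommandComplete"));

        Flux<String> conversation = Flux.create(sink ->
            sink.onRequest(n -> {
                for (long i = 0; i < n && !backendMessages.isEmpty(); i++) {
                    sink.next(backendMessages.poll());
                }
                if (backendMessages.isEmpty()) {
                    sink.complete();
                }
            }));

        conversation.subscribe(System.out::println); // unbounded demand drains and completes
    }
}
```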