Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: retry in-flight transactions on regular sessions when multiplexed sessions encounter UNIMPLEMENTED errors #3708

Merged
Original file line number Diff line number Diff line change
@@ -44,14 +44,17 @@ class DelayedMultiplexedSessionTransaction extends AbstractMultiplexedSessionDat
private final ISpan span;

private final ApiFuture<SessionReference> sessionFuture;
private final SessionPool sessionPool;

DelayedMultiplexedSessionTransaction(
MultiplexedSessionDatabaseClient client,
ISpan span,
ApiFuture<SessionReference> sessionFuture) {
ApiFuture<SessionReference> sessionFuture,
SessionPool sessionPool) {
this.client = client;
this.span = span;
this.sessionFuture = sessionFuture;
this.sessionPool = sessionPool;
}

@Override
@@ -189,7 +192,12 @@ public TransactionRunner readWriteTransaction(TransactionOption... options) {
this.sessionFuture,
sessionReference ->
new MultiplexedSessionTransaction(
client, span, sessionReference, NO_CHANNEL_HINT, /* singleUse = */ false)
client,
span,
sessionReference,
NO_CHANNEL_HINT,
/* singleUse = */ false,
this.sessionPool)
.readWriteTransaction(options),
MoreExecutors.directExecutor()));
}
Original file line number Diff line number Diff line change
@@ -27,6 +27,7 @@
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.cloud.spanner.SessionPool.SessionPoolTransactionRunner;
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -52,6 +53,89 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
* {@link TransactionRunner} that automatically handles "UNIMPLEMENTED" errors with the message
* "Transaction type read_write not supported with multiplexed sessions" by switching from a
* multiplexed session to a regular session and then restarts the transaction.
*/
class MultiplexedSessionTransactionRunner implements TransactionRunner {
private final SessionPool sessionPool;
private final TransactionRunnerImpl transactionRunnerForMultiplexedSession;
private SessionPoolTransactionRunner transactionRunnerForRegularSession;
private final TransactionOption[] options;
private boolean isUsingMultiplexedSession = true;

public MultiplexedSessionTransactionRunner(
SessionImpl multiplexedSession, SessionPool sessionPool, TransactionOption... options) {
this.sessionPool = sessionPool;
this.transactionRunnerForMultiplexedSession =
new TransactionRunnerImpl(
multiplexedSession, options); // Uses multiplexed session initially
multiplexedSession.setActive(this.transactionRunnerForMultiplexedSession);
this.options = options;
}

private TransactionRunner getRunner() {
if (this.isUsingMultiplexedSession) {
return this.transactionRunnerForMultiplexedSession;
} else {
if (this.transactionRunnerForRegularSession == null) {
this.transactionRunnerForRegularSession =
new SessionPoolTransactionRunner<>(
sessionPool.getSession(),
sessionPool.getPooledSessionReplacementHandler(),
options);
}
return this.transactionRunnerForRegularSession;
}
}

@Override
public <T> T run(TransactionCallable<T> callable) {
while (true) {
try {
return getRunner().run(callable);
} catch (SpannerException e) {
if (e.getErrorCode() == ErrorCode.UNIMPLEMENTED
&& verifyUnimplementedErrorMessageForRWMux(e)) {
this.isUsingMultiplexedSession = false; // Fallback to regular session
} else {
throw e; // Other errors propagate
}
}
}
}

@Override
public Timestamp getCommitTimestamp() {
return getRunner().getCommitTimestamp();
}

@Override
public CommitResponse getCommitResponse() {
return getRunner().getCommitResponse();
}

@Override
public TransactionRunner allowNestedTransaction() {
getRunner().allowNestedTransaction();
return this;
}

private boolean verifyUnimplementedErrorMessageForRWMux(SpannerException spannerException) {
if (spannerException.getCause() == null) {
return false;
}
if (spannerException.getCause().getMessage() == null) {
return false;
}
return spannerException
.getCause()
.getMessage()
.contains("Transaction type read_write not supported with multiplexed sessions");
}
}

/**
* {@link DatabaseClient} implementation that uses a single multiplexed session to execute
* transactions.
@@ -75,18 +159,30 @@ static class MultiplexedSessionTransaction extends SessionImpl {
private final int singleUseChannelHint;

private boolean done;
private final SessionPool pool;

MultiplexedSessionTransaction(
MultiplexedSessionDatabaseClient client,
ISpan span,
SessionReference sessionReference,
int singleUseChannelHint,
boolean singleUse) {
this(client, span, sessionReference, singleUseChannelHint, singleUse, null);
}

MultiplexedSessionTransaction(
MultiplexedSessionDatabaseClient client,
ISpan span,
SessionReference sessionReference,
int singleUseChannelHint,
boolean singleUse,
SessionPool pool) {
super(client.sessionClient.getSpanner(), sessionReference, singleUseChannelHint);
this.client = client;
this.singleUse = singleUse;
this.singleUseChannelHint = singleUseChannelHint;
this.client.numSessionsAcquired.incrementAndGet();
this.pool = pool;
setCurrentSpan(span);
}

@@ -134,6 +230,11 @@ public CommitResponse writeAtLeastOnceWithOptions(
return response;
}

@Override
public TransactionRunner readWriteTransaction(TransactionOption... options) {
return new MultiplexedSessionTransactionRunner(this, pool, options);
}

@Override
void onTransactionDone() {
boolean markedDone = false;
@@ -225,6 +326,8 @@ public void close() {
*/
@VisibleForTesting final AtomicBoolean unimplementedForPartitionedOps = new AtomicBoolean(false);

private SessionPool pool;

MultiplexedSessionDatabaseClient(SessionClient sessionClient) {
this(sessionClient, Clock.systemUTC());
}
@@ -299,6 +402,10 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount
initialSessionReferenceFuture);
}

void setPool(SessionPool pool) {
this.pool = pool;
}

private static void maybeWaitForSessionCreation(
SessionPoolOptions sessionPoolOptions, ApiFuture<SessionReference> future) {
Duration waitDuration = sessionPoolOptions.getWaitForMinSessions();
@@ -489,7 +596,8 @@ private MultiplexedSessionTransaction createDirectMultiplexedSessionTransaction(
// any special handling of such errors.
multiplexedSessionReference.get().get(),
singleUse ? getSingleUseChannelHint() : NO_CHANNEL_HINT,
singleUse);
singleUse,
this.pool);
} catch (ExecutionException executionException) {
throw SpannerExceptionFactory.asSpannerException(executionException.getCause());
} catch (InterruptedException interruptedException) {
@@ -499,7 +607,7 @@ private MultiplexedSessionTransaction createDirectMultiplexedSessionTransaction(

private DelayedMultiplexedSessionTransaction createDelayedMultiplexSessionTransaction() {
return new DelayedMultiplexedSessionTransaction(
this, tracer.getCurrentSpan(), multiplexedSessionReference.get());
this, tracer.getCurrentSpan(), multiplexedSessionReference.get(), this.pool);
}

private int getSingleUseChannelHint() {
Original file line number Diff line number Diff line change
@@ -1004,15 +1004,15 @@ public TransactionState getState() {
* {@link TransactionRunner} that automatically handles {@link SessionNotFoundException}s by
* replacing the underlying session and then restarts the transaction.
*/
private static final class SessionPoolTransactionRunner<I extends SessionFuture>
static final class SessionPoolTransactionRunner<I extends SessionFuture>
implements TransactionRunner {

private I session;
private final SessionReplacementHandler<I> sessionReplacementHandler;
private final TransactionOption[] options;
private TransactionRunner runner;

private SessionPoolTransactionRunner(
SessionPoolTransactionRunner(
I session,
SessionReplacementHandler<I> sessionReplacementHandler,
TransactionOption... options) {
Original file line number Diff line number Diff line change
@@ -332,6 +332,12 @@ DatabaseClientImpl createDatabaseClient(
boolean useMultiplexedSessionPartitionedOps,
boolean useMultiplexedSessionForRW,
Attributes commonAttributes) {
if (multiplexedSessionClient != null) {
// Set the session pool in the multiplexed session client.
// This is required to handle fallback to regular sessions for in-progress transactions that
// use multiplexed sessions but fail with UNIMPLEMENTED errors.
multiplexedSessionClient.setPool(pool);
}
return new DatabaseClientImpl(
clientId,
pool,
Loading