From 59879afac4d5698b6c1ccc4bd1460bb18f831f8e Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Tue, 12 Nov 2024 14:53:45 +0000 Subject: [PATCH] chore(spanner): add multiplexed session support for batch write --- ...tractMultiplexedSessionDatabaseClient.java | 10 ----- .../cloud/spanner/DatabaseClientImpl.java | 3 ++ .../DelayedMultiplexedSessionTransaction.java | 16 ++++++++ .../MultiplexedSessionDatabaseClient.java | 10 +++++ .../com/google/cloud/spanner/SessionImpl.java | 1 + ...edSessionDatabaseClientMockServerTest.java | 41 +++++++++++++++++++ 6 files changed, 71 insertions(+), 10 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java index ebfb0e0a77..579dd42689 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java @@ -16,11 +16,8 @@ package com.google.cloud.spanner; -import com.google.api.gax.rpc.ServerStream; import com.google.cloud.Timestamp; -import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; -import com.google.spanner.v1.BatchWriteResponse; /** * Base class for the Multiplexed Session {@link DatabaseClient} implementation. Throws {@link @@ -45,13 +42,6 @@ public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerEx return writeAtLeastOnceWithOptions(mutations).getCommitTimestamp(); } - @Override - public ServerStream batchWriteAtLeastOnce( - Iterable mutationGroups, TransactionOption... options) - throws SpannerException { - throw new UnsupportedOperationException(); - } - @Override public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { throw new UnsupportedOperationException(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index d7f16f8952..529454621c 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -178,6 +178,9 @@ public ServerStream batchWriteAtLeastOnce( throws SpannerException { ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options); try (IScope s = tracer.withSpan(span)) { + if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) { + return getMultiplexedSessionDatabaseClient().batchWriteAtLeastOnce(mutationGroups, options); + } return runWithSessionRetry(session -> session.batchWriteAtLeastOnce(mutationGroups, options)); } catch (RuntimeException e) { span.setStatus(e); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java index ad3e6b0cf7..21f1b15b0e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java @@ -20,11 +20,13 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; +import com.google.api.gax.rpc.ServerStream; import com.google.cloud.Timestamp; import com.google.cloud.spanner.DelayedReadContext.DelayedReadOnlyTransaction; import com.google.cloud.spanner.MultiplexedSessionDatabaseClient.MultiplexedSessionTransaction; import com.google.cloud.spanner.Options.TransactionOption; import com.google.common.util.concurrent.MoreExecutors; +import com.google.spanner.v1.BatchWriteResponse; import java.util.concurrent.ExecutionException; /** @@ -163,6 +165,20 @@ public CommitResponse writeWithOptions(Iterable mutations, Transaction } } + // This is a blocking method, as the interface that it implements is also defined as a blocking + // method. + @Override + public ServerStream batchWriteAtLeastOnce( + Iterable mutationGroups, TransactionOption... options) + throws SpannerException { + SessionReference sessionReference = getSessionReference(); + try (MultiplexedSessionTransaction transaction = + new MultiplexedSessionTransaction( + client, span, sessionReference, NO_CHANNEL_HINT, /* singleUse = */ true)) { + return transaction.batchWriteAtLeastOnce(mutationGroups, options); + } + } + @Override public TransactionRunner readWriteTransaction(TransactionOption... options) { return new DelayedTransactionRunner( diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java index 71e364bde8..11b4fb1361 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java @@ -22,6 +22,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; +import com.google.api.gax.rpc.ServerStream; import com.google.cloud.Timestamp; import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.SessionClient.SessionConsumer; @@ -29,6 +30,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.MoreExecutors; +import com.google.spanner.v1.BatchWriteResponse; import com.google.spanner.v1.BeginTransactionRequest; import com.google.spanner.v1.RequestOptions; import com.google.spanner.v1.Transaction; @@ -499,6 +501,14 @@ public CommitResponse writeAtLeastOnceWithOptions( .writeAtLeastOnceWithOptions(mutations, options); } + @Override + public ServerStream batchWriteAtLeastOnce( + Iterable mutationGroups, TransactionOption... options) + throws SpannerException { + return createMultiplexedSessionTransaction(/* singleUse = */ true) + .batchWriteAtLeastOnce(mutationGroups, options); + } + @Override public ReadContext singleUse() { return createMultiplexedSessionTransaction(/* singleUse = */ true).singleUse(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index 5bd3160368..eafc0f5e24 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -321,6 +321,7 @@ public ServerStream batchWriteAtLeastOnce( throw SpannerExceptionFactory.newSpannerException(e); } finally { span.end(); + onTransactionDone(); } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index 4dc1da62e7..32e05a0559 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -30,6 +30,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; +import com.google.api.gax.rpc.ServerStream; import com.google.cloud.NoCredentials; import com.google.cloud.Timestamp; import com.google.cloud.spanner.AsyncTransactionManager.AsyncTransactionStep; @@ -45,6 +46,8 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; +import com.google.spanner.v1.BatchWriteRequest; +import com.google.spanner.v1.BatchWriteResponse; import com.google.spanner.v1.BeginTransactionRequest; import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.ExecuteSqlRequest; @@ -1635,6 +1638,44 @@ public void testReadWriteTransactionWithCommitRetryProtocolExtensionSet() { assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); } + @Test + public void testBatchWriteAtLeastOnce() { + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + Iterable MUTATION_GROUPS = + ImmutableList.of( + MutationGroup.of( + Mutation.newInsertBuilder("FOO1").set("ID").to(1L).set("NAME").to("Bar1").build(), + Mutation.newInsertBuilder("FOO2").set("ID").to(2L).set("NAME").to("Bar2").build()), + MutationGroup.of( + Mutation.newInsertBuilder("FOO3").set("ID").to(3L).set("NAME").to("Bar3").build(), + Mutation.newInsertBuilder("FOO4").set("ID").to(4L).set("NAME").to("Bar4").build())); + + ServerStream responseStream = client.batchWriteAtLeastOnce(MUTATION_GROUPS); + int idx = 0; + for (BatchWriteResponse response : responseStream) { + assertEquals( + response.getStatus(), + com.google.rpc.Status.newBuilder().setCode(com.google.rpc.Code.OK_VALUE).build()); + assertEquals(response.getIndexesList(), ImmutableList.of(idx, idx + 1)); + idx += 2; + } + + assertNotNull(responseStream); + List requests = mockSpanner.getRequestsOfType(BatchWriteRequest.class); + assertEquals(requests.size(), 1); + BatchWriteRequest request = requests.get(0); + assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed()); + assertEquals(request.getMutationGroupsCount(), 2); + assertEquals(request.getRequestOptions().getPriority(), Priority.PRIORITY_UNSPECIFIED); + assertFalse(request.getExcludeTxnFromChangeStreams()); + + assertNotNull(client.multiplexedSessionDatabaseClient); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); + } + private void waitForSessionToBeReplaced(DatabaseClientImpl client) { assertNotNull(client.multiplexedSessionDatabaseClient); SessionReference sessionReference =