Skip to content

Commit 3288c73

Browse files
olavloitekolea2
authored andcommitted
add specific timeout for partitioned dml (#5709)
1 parent 1f2a8c7 commit 3288c73

6 files changed

Lines changed: 137 additions & 5 deletions

File tree

google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDMLTransaction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.google.spanner.v1.TransactionSelector;
3030
import java.util.Map;
3131
import java.util.concurrent.Callable;
32+
import org.threeten.bp.Duration;
3233

3334
/** Partitioned DML transaction for bulk updates and deletes. */
3435
class PartitionedDMLTransaction implements SessionTransaction {
@@ -62,7 +63,7 @@ private ByteString initTransaction() {
6263
* Executes the {@link Statement} using a partitioned dml transaction with automatic retry if the
6364
* transaction was aborted.
6465
*/
65-
long executePartitionedUpdate(final Statement statement) {
66+
long executePartitionedUpdate(final Statement statement, final Duration timeout) {
6667
checkState(isValid, "Partitioned DML has been invalidated by a new operation on the session");
6768
Callable<com.google.spanner.v1.ResultSet> callable =
6869
new Callable<com.google.spanner.v1.ResultSet>() {
@@ -83,7 +84,7 @@ public com.google.spanner.v1.ResultSet call() throws Exception {
8384
builder.putParamTypes(param.getKey(), param.getValue().getType().toProto());
8485
}
8586
}
86-
return rpc.executeQuery(builder.build(), session.getOptions());
87+
return rpc.executePartitionedDml(builder.build(), session.getOptions(), timeout);
8788
}
8889
};
8990
com.google.spanner.v1.ResultSet resultSet =

google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public String getName() {
100100
public long executePartitionedUpdate(Statement stmt) {
101101
setActive(null);
102102
PartitionedDMLTransaction txn = new PartitionedDMLTransaction(this, spanner.getRpc());
103-
return txn.executePartitionedUpdate(stmt);
103+
return txn.executePartitionedUpdate(stmt, spanner.getOptions().getPartitionedDmlTimeout());
104104
}
105105

106106
@Override

google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import java.net.URL;
4444
import java.util.Map;
4545
import java.util.Set;
46+
import org.threeten.bp.Duration;
4647

4748
/** Options for the Cloud Spanner service. */
4849
public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
@@ -68,6 +69,7 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
6869
private final SpannerStubSettings spannerStubSettings;
6970
private final InstanceAdminStubSettings instanceAdminStubSettings;
7071
private final DatabaseAdminStubSettings databaseAdminStubSettings;
72+
private final Duration partitionedDmlTimeout;
7173

7274
/** Default implementation of {@code SpannerFactory}. */
7375
private static class DefaultSpannerFactory implements SpannerFactory {
@@ -114,6 +116,7 @@ private SpannerOptions(Builder builder) {
114116
} catch (IOException e) {
115117
throw SpannerExceptionFactory.newSpannerException(e);
116118
}
119+
partitionedDmlTimeout = builder.partitionedDmlTimeout;
117120
}
118121

119122
/** Builder for {@link SpannerOptions} instances. */
@@ -139,6 +142,7 @@ public static class Builder
139142
InstanceAdminStubSettings.newBuilder();
140143
private DatabaseAdminStubSettings.Builder databaseAdminStubSettingsBuilder =
141144
DatabaseAdminStubSettings.newBuilder();
145+
private Duration partitionedDmlTimeout = Duration.ofHours(2L);
142146

143147
private Builder() {}
144148

@@ -151,6 +155,7 @@ private Builder() {}
151155
this.spannerStubSettingsBuilder = options.spannerStubSettings.toBuilder();
152156
this.instanceAdminStubSettingsBuilder = options.instanceAdminStubSettings.toBuilder();
153157
this.databaseAdminStubSettingsBuilder = options.databaseAdminStubSettings.toBuilder();
158+
this.partitionedDmlTimeout = options.partitionedDmlTimeout;
154159
this.channelProvider = options.channelProvider;
155160
this.channelConfigurator = options.channelConfigurator;
156161
this.interceptorProvider = options.interceptorProvider;
@@ -328,6 +333,15 @@ public DatabaseAdminStubSettings.Builder getDatabaseAdminStubSettingsBuilder() {
328333
return databaseAdminStubSettingsBuilder;
329334
}
330335

336+
/**
337+
* Sets a timeout specifically for Partitioned DML statements executed through {@link
338+
* DatabaseClient#executePartitionedUpdate(Statement)}. The default is 2 hours.
339+
*/
340+
public Builder setPartitionedDmlTimeout(Duration timeout) {
341+
this.partitionedDmlTimeout = timeout;
342+
return this;
343+
}
344+
331345
/**
332346
* Specifying this will allow the client to prefetch up to {@code prefetchChunks} {@code
333347
* PartialResultSet} chunks for each read and query. The data size of each chunk depends on the
@@ -396,6 +410,10 @@ public DatabaseAdminStubSettings getDatabaseAdminStubSettings() {
396410
return databaseAdminStubSettings;
397411
}
398412

413+
public Duration getPartitionedDmlTimeout() {
414+
return partitionedDmlTimeout;
415+
}
416+
399417
public int getPrefetchChunks() {
400418
return prefetchChunks;
401419
}

google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,13 @@ public ResultSet executeQuery(ExecuteSqlRequest request, @Nullable Map<Option, ?
511511
return get(spannerStub.executeSqlCallable().futureCall(request, context));
512512
}
513513

514+
@Override
515+
public ResultSet executePartitionedDml(
516+
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, Duration timeout) {
517+
GrpcCallContext context = newCallContext(options, request.getSession(), timeout);
518+
return get(spannerStub.executeSqlCallable().futureCall(request, context));
519+
}
520+
514521
@Override
515522
public StreamingCall executeQuery(
516523
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
@@ -591,11 +598,19 @@ private static <T> T get(final Future<T> future) throws SpannerException {
591598
}
592599

593600
private GrpcCallContext newCallContext(@Nullable Map<Option, ?> options, String resource) {
601+
return newCallContext(options, resource, null);
602+
}
603+
604+
private GrpcCallContext newCallContext(
605+
@Nullable Map<Option, ?> options, String resource, Duration timeout) {
594606
GrpcCallContext context = GrpcCallContext.createDefault();
595607
if (options != null) {
596608
context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue());
597609
}
598610
context = context.withExtraHeaders(metadataProvider.newExtraHeaders(resource, projectName));
611+
if (timeout != null) {
612+
context = context.withTimeout(timeout);
613+
}
599614
return context.withStreamWaitTimeout(waitTimeout).withStreamIdleTimeout(idleTimeout);
600615
}
601616

google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.List;
5050
import java.util.Map;
5151
import javax.annotation.Nullable;
52+
import org.threeten.bp.Duration;
5253

5354
/**
5455
* Abstracts remote calls to the Cloud Spanner service. Typically end-consumer code will never use
@@ -213,6 +214,9 @@ StreamingCall read(
213214

214215
ResultSet executeQuery(ExecuteSqlRequest request, @Nullable Map<Option, ?> options);
215216

217+
ResultSet executePartitionedDml(
218+
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, Duration timeout);
219+
216220
StreamingCall executeQuery(
217221
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options);
218222

google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java

Lines changed: 96 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,14 @@
1919
import static org.hamcrest.CoreMatchers.equalTo;
2020
import static org.hamcrest.CoreMatchers.is;
2121
import static org.junit.Assert.assertThat;
22+
import static org.junit.Assert.fail;
2223

2324
import com.google.api.gax.grpc.testing.LocalChannelProvider;
25+
import com.google.api.gax.retrying.RetrySettings;
2426
import com.google.cloud.NoCredentials;
27+
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
2528
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
29+
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
2630
import com.google.protobuf.ListValue;
2731
import com.google.spanner.v1.ResultSetMetadata;
2832
import com.google.spanner.v1.StructType;
@@ -32,13 +36,15 @@
3236
import io.grpc.Status;
3337
import io.grpc.inprocess.InProcessServerBuilder;
3438
import java.io.IOException;
39+
import java.util.concurrent.ScheduledThreadPoolExecutor;
3540
import org.junit.After;
3641
import org.junit.AfterClass;
3742
import org.junit.Before;
3843
import org.junit.BeforeClass;
3944
import org.junit.Test;
4045
import org.junit.runner.RunWith;
4146
import org.junit.runners.JUnit4;
47+
import org.threeten.bp.Duration;
4248

4349
@RunWith(JUnit4.class)
4450
public class DatabaseClientImplTest {
@@ -89,21 +95,24 @@ public static void startStaticServer() throws IOException {
8995
String uniqueName = InProcessServerBuilder.generateName();
9096
server =
9197
InProcessServerBuilder.forName(uniqueName)
92-
.directExecutor()
98+
// We need to use a real executor for timeouts to occur.
99+
.scheduledExecutorService(new ScheduledThreadPoolExecutor(1))
93100
.addService(mockSpanner)
94101
.build()
95102
.start();
96103
channelProvider = LocalChannelProvider.create(uniqueName);
97104
}
98105

99106
@AfterClass
100-
public static void stopServer() {
107+
public static void stopServer() throws InterruptedException {
101108
server.shutdown();
109+
server.awaitTermination();
102110
}
103111

104112
@Before
105113
public void setUp() throws IOException {
106114
mockSpanner.reset();
115+
mockSpanner.removeAllExecutionTimes();
107116
spanner =
108117
SpannerOptions.newBuilder()
109118
.setProjectId("[PROJECT]")
@@ -158,4 +167,89 @@ public void testExecutePartitionedDmlWithException() {
158167
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
159168
client.executePartitionedUpdate(INVALID_UPDATE_STATEMENT);
160169
}
170+
171+
@Test
172+
public void testPartitionedDmlDoesNotTimeout() throws Exception {
173+
mockSpanner.setExecuteSqlExecutionTime(SimulatedExecutionTime.ofMinimumAndRandomTime(10, 0));
174+
final RetrySettings retrySettings =
175+
RetrySettings.newBuilder()
176+
.setInitialRpcTimeout(Duration.ofMillis(1L))
177+
.setMaxRpcTimeout(Duration.ofMillis(1L))
178+
.setMaxAttempts(1)
179+
.setTotalTimeout(Duration.ofMillis(1L))
180+
.build();
181+
SpannerOptions.Builder builder =
182+
SpannerOptions.newBuilder()
183+
.setProjectId("[PROJECT]")
184+
.setChannelProvider(channelProvider)
185+
.setCredentials(NoCredentials.getInstance());
186+
// Set normal DML timeout value.
187+
builder.getSpannerStubSettingsBuilder().executeSqlSettings().setRetrySettings(retrySettings);
188+
try (Spanner spanner = builder.build().getService()) {
189+
DatabaseClient client =
190+
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
191+
192+
// PDML should not timeout with these settings.
193+
long updateCount = client.executePartitionedUpdate(UPDATE_STATEMENT);
194+
assertThat(updateCount, is(equalTo(UPDATE_COUNT)));
195+
196+
// Normal DML should timeout.
197+
try {
198+
client
199+
.readWriteTransaction()
200+
.run(
201+
new TransactionCallable<Void>() {
202+
@Override
203+
public Void run(TransactionContext transaction) throws Exception {
204+
transaction.executeUpdate(UPDATE_STATEMENT);
205+
return null;
206+
}
207+
});
208+
fail("expected DEADLINE_EXCEEDED");
209+
} catch (SpannerException e) {
210+
if (e.getErrorCode() != ErrorCode.DEADLINE_EXCEEDED) {
211+
fail("expected DEADLINE_EXCEEDED");
212+
}
213+
}
214+
}
215+
}
216+
217+
@Test
218+
public void testPartitionedDmlWithTimeout() throws Exception {
219+
mockSpanner.setExecuteSqlExecutionTime(SimulatedExecutionTime.ofMinimumAndRandomTime(10, 0));
220+
SpannerOptions.Builder builder =
221+
SpannerOptions.newBuilder()
222+
.setProjectId("[PROJECT]")
223+
.setChannelProvider(channelProvider)
224+
.setCredentials(NoCredentials.getInstance());
225+
// Set PDML timeout value.
226+
builder.setPartitionedDmlTimeout(Duration.ofMillis(1L));
227+
try (Spanner spanner = builder.build().getService()) {
228+
DatabaseClient client =
229+
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
230+
231+
// PDML should timeout with these settings.
232+
try {
233+
client.executePartitionedUpdate(UPDATE_STATEMENT);
234+
fail("expected DEADLINE_EXCEEDED");
235+
} catch (SpannerException e) {
236+
if (e.getErrorCode() != ErrorCode.DEADLINE_EXCEEDED) {
237+
fail("expected DEADLINE_EXCEEDED");
238+
}
239+
}
240+
241+
// Normal DML should not timeout.
242+
long updateCount =
243+
client
244+
.readWriteTransaction()
245+
.run(
246+
new TransactionCallable<Long>() {
247+
@Override
248+
public Long run(TransactionContext transaction) throws Exception {
249+
return transaction.executeUpdate(UPDATE_STATEMENT);
250+
}
251+
});
252+
assertThat(updateCount, is(equalTo(UPDATE_COUNT)));
253+
}
254+
}
161255
}

0 commit comments

Comments
 (0)