Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ enum OperationType {
// don't want to block a gax/grpc executor while running user error and success callbacks.
private final ScheduledExecutorService bulkWriterExecutor;

/**
* The BulkWriter will shutdown executor when closed and all writes are done. This is important to
* prevent leaking threads.
*/
boolean autoShutdownBulkWriterExecutor;

/** The maximum number of writes that can be in a single batch. */
private int maxBatchSize = MAX_BATCH_SIZE;

Expand Down Expand Up @@ -230,7 +236,7 @@ enum OperationType {
@GuardedBy("lock")
private Executor errorExecutor;

Context traceContext;
private final Context traceContext;

/**
* Used to track when writes are enqueued. The user handler executors cannot be changed after a
Expand All @@ -241,10 +247,13 @@ enum OperationType {

BulkWriter(FirestoreImpl firestore, BulkWriterOptions options) {
this.firestore = firestore;
this.bulkWriterExecutor =
options.getExecutor() != null
? options.getExecutor()
: Executors.newSingleThreadScheduledExecutor();
if (options.getExecutor() != null) {
this.bulkWriterExecutor = options.getExecutor();
this.autoShutdownBulkWriterExecutor = false;
} else {
this.bulkWriterExecutor = Executors.newSingleThreadScheduledExecutor();
this.autoShutdownBulkWriterExecutor = true;
}
this.successExecutor = MoreExecutors.directExecutor();
this.errorExecutor = MoreExecutors.directExecutor();
this.bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor, maxBatchSize);
Expand Down Expand Up @@ -707,7 +716,7 @@ private ApiFuture<Void> flushLocked() {
lastFlushOperation = lastOperation;
scheduleCurrentBatchLocked();
}
return lastOperation;
return lastFlushOperation;
}

/**
Expand All @@ -722,10 +731,16 @@ private ApiFuture<Void> flushLocked() {
public void close() throws InterruptedException, ExecutionException {
ApiFuture<Void> flushFuture;
synchronized (lock) {
flushFuture = flushLocked();
closed = true;
if (!closed) {
flushLocked();
closed = true;
}
flushFuture = lastFlushOperation;
}
flushFuture.get();
if (autoShutdownBulkWriterExecutor) {
bulkWriterExecutor.shutdown();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static com.google.cloud.firestore.LocalFirestoreHelper.update;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.*;
Expand Down Expand Up @@ -97,24 +98,13 @@ public class BulkWriterTest {

@Rule public Timeout timeout = new Timeout(2, TimeUnit.SECONDS);

@Spy private final FirestoreRpc firestoreRpc = Mockito.mock(FirestoreRpc.class);

private ScheduledExecutorService testExecutor;

/** Executor that executes delayed tasks without delay. */
private final ScheduledExecutorService immediateExecutor =
new ScheduledThreadPoolExecutor(1) {
@Override
@Nonnull
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return super.schedule(command, 0, TimeUnit.MILLISECONDS);
}
};

@Spy
private final FirestoreImpl firestoreMock =
new FirestoreImpl(
FirestoreOptions.newBuilder().setProjectId("test-project").build(), firestoreRpc);
FirestoreOptions.newBuilder().setProjectId("test-project").build(),
Mockito.mock(FirestoreRpc.class));

@Captor private ArgumentCaptor<BatchWriteRequest> batchWriteCapture;

Expand Down Expand Up @@ -155,7 +145,6 @@ public static ApiFuture<BatchWriteResponse> mergeResponses(

@Before
public void before() {
lenient().doReturn(immediateExecutor).when(firestoreRpc).getExecutor();
testExecutor = Executors.newSingleThreadScheduledExecutor();

timeoutExecutor =
Expand All @@ -169,23 +158,27 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)

bulkWriter =
firestoreMock.bulkWriter(BulkWriterOptions.builder().setExecutor(timeoutExecutor).build());
bulkWriter.autoShutdownBulkWriterExecutor = true;
doc1 = firestoreMock.document("coll/doc1");
doc2 = firestoreMock.document("coll/doc2");
}

@After
public void after() throws InterruptedException {
shutdownScheduledExecutorService(testExecutor);
shutdownScheduledExecutorService(timeoutExecutor);
}

void shutdownScheduledExecutorService(ScheduledExecutorService executorService)
throws InterruptedException {
executorService.shutdown();
// Wait for the executor to finish after each test.
//
// This ensures the executor service is shut down properly within the given timeout, and thereby
// avoids potential hangs caused by lingering threads. Note that if a given thread is terminated
// because of the timeout, the associated test will fail, which is what we want.
executorService.awaitTermination(100, TimeUnit.MILLISECONDS);
assertTrue(executorService.isTerminated());
}

@Test
Expand Down Expand Up @@ -369,12 +362,19 @@ public void cannotCallMethodsAfterClose() throws Exception {
} catch (Exception e) {
assertEquals(expected, e.getMessage());
}
try {
bulkWriter.close();
fail("close() should have failed");
} catch (Exception e) {
assertEquals(expected, e.getMessage());
}
// Close is idempotent and can be called multiple time.
bulkWriter.close();
}

@Test
public void closeWillShutdownExecutor() throws Exception {
// We ONLY shutdown executor when the executor was created within the BulkWriter.
// To simulate this, we set the autoShutdownBulkWriterExecutor field to true.
bulkWriter.autoShutdownBulkWriterExecutor = true;

assertFalse(timeoutExecutor.isShutdown());
bulkWriter.close();
assertTrue(timeoutExecutor.isShutdown());
}

@Test
Expand Down