Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;

Expand Down Expand Up @@ -213,7 +213,7 @@ default TabletsMutator mutateTablets() {
* An entry point for updating tablets metadata using a conditional writer. The returned mutator
* will buffer everything in memory until {@link ConditionalTabletsMutator#process()} is called.
* If buffering everything in memory is undesirable, then consider using
* {@link #conditionallyMutateTablets(BiConsumer)}
* {@link #conditionallyMutateTablets(Consumer)}
*
* @see ConditionalTabletMutator#submit(RejectionHandler)
*/
Expand All @@ -237,7 +237,7 @@ default ConditionalTabletsMutator conditionallyMutateTablets() {
* @see ConditionalTabletMutator#submit(RejectionHandler)
*/
default AsyncConditionalTabletsMutator
conditionallyMutateTablets(BiConsumer<KeyExtent,ConditionalResult> resultsConsumer) {
conditionallyMutateTablets(Consumer<ConditionalResult> resultsConsumer) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.server.ServerContext;

public class AsyncConditionalTabletsMutatorImpl implements Ample.AsyncConditionalTabletsMutator {
private final BiConsumer<KeyExtent,Ample.ConditionalResult> resultsConsumer;
private final Consumer<Ample.ConditionalResult> resultsConsumer;
private final ExecutorService executor;
private Future<Map<KeyExtent,Ample.ConditionalResult>> backgroundProcessing = null;
private ConditionalTabletsMutatorImpl bufferingMutator;
Expand All @@ -41,7 +41,7 @@ public class AsyncConditionalTabletsMutatorImpl implements Ample.AsyncConditiona
public static final int BATCH_SIZE = 1000;

AsyncConditionalTabletsMutatorImpl(ServerContext context,
BiConsumer<KeyExtent,Ample.ConditionalResult> resultsConsumer) {
Consumer<Ample.ConditionalResult> resultsConsumer) {
this.resultsConsumer = Objects.requireNonNull(resultsConsumer);
this.bufferingMutator = new ConditionalTabletsMutatorImpl(context);
this.context = context;
Expand All @@ -58,7 +58,7 @@ public Ample.OperationRequirements mutateTablet(KeyExtent extent) {
if (backgroundProcessing != null) {
// a previous batch of mutations was submitted for processing so wait on it.
try {
backgroundProcessing.get().forEach(resultsConsumer);
backgroundProcessing.get().values().forEach(resultsConsumer);
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException(e);
}
Expand All @@ -85,13 +85,13 @@ public void close() {
if (backgroundProcessing != null) {
// a previous batch of mutations was submitted for processing so wait on it.
try {
backgroundProcessing.get().forEach(resultsConsumer);
backgroundProcessing.get().values().forEach(resultsConsumer);
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException(e);
}
}
// process anything not processed so far
bufferingMutator.process().forEach(resultsConsumer);
bufferingMutator.process().values().forEach(resultsConsumer);
bufferingMutator.close();
executor.shutdownNow();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -96,7 +95,7 @@ public ConditionalTabletsMutator conditionallyMutateTablets() {

@Override
public AsyncConditionalTabletsMutator
conditionallyMutateTablets(BiConsumer<KeyExtent,ConditionalResult> resultsConsumer) {
conditionallyMutateTablets(Consumer<ConditionalResult> resultsConsumer) {
return new AsyncConditionalTabletsMutatorImpl(context, resultsConsumer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.FateTxId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.manager.Manager;
Expand Down Expand Up @@ -59,32 +62,46 @@ public long isReady(long tid, Manager manager) throws Exception {

var ample = manager.getContext().getAmple();

var fateStr = FateTxId.formatTid(tid);

AtomicLong rejectedCount = new AtomicLong(0);
Consumer<Ample.ConditionalResult> resultConsumer = result -> {
if (result.getStatus() == Status.REJECTED) {
log.debug("{} update for {} was rejected ", fateStr, result.getExtent());
rejectedCount.incrementAndGet();
}
};

long t1, t2, submitted = 0, total = 0;

try (
var tablets = ample.readTablets().forTable(tableId).overlapping(startRow, endRow)
.fetch(PREV_ROW, COMPACTED).checkConsistency().build();
var tabletsMutator = ample.conditionallyMutateTablets()) {
var tabletsMutator = ample.conditionallyMutateTablets(resultConsumer)) {

long t1 = System.nanoTime();
t1 = System.nanoTime();
for (TabletMetadata tablet : tablets) {
total++;
if (tablet.getCompacted().contains(tid)) {
tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation()
.requireSame(tablet, COMPACTED).deleteCompacted(tid)
.submit(tabletMetadata -> !tabletMetadata.getCompacted().contains(tid));
submitted++;
}
}

long rejected = tabletsMutator.process().values().stream()
.filter(result -> result.getStatus() == Status.REJECTED).peek(result -> log
.debug("{} update for {} was rejected ", FateTxId.formatTid(tid), result.getExtent()))
.count();
t2 = System.nanoTime();
}

long t2 = System.nanoTime();
long scanTime = Duration.ofNanos(t2 - t1).toMillis();

if (rejected > 0) {
long sleepTime = Duration.ofNanos(t2 - t1).toMillis();
sleepTime = Math.max(100, Math.min(30000, sleepTime * 2));
return sleepTime;
}
log.debug("{} removed {} of {} compacted markers for {} tablets in {}ms", fateStr,
submitted - rejectedCount.get(), submitted, total, scanTime);

if (rejectedCount.get() > 0) {
long sleepTime = scanTime;
sleepTime = Math.max(100, Math.min(30000, sleepTime * 2));
return sleepTime;
}

return 0;
Expand Down
Loading