diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index 44f17157754..5983efb1121 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -24,7 +24,7 @@ import java.util.Objects; import java.util.Set; import java.util.UUID; -import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.Predicate; import java.util.stream.Stream; @@ -213,7 +213,7 @@ default TabletsMutator mutateTablets() { * An entry point for updating tablets metadata using a conditional writer. The returned mutator * will buffer everything in memory until {@link ConditionalTabletsMutator#process()} is called. * If buffering everything in memory is undesirable, then consider using - * {@link #conditionallyMutateTablets(BiConsumer)} + * {@link #conditionallyMutateTablets(Consumer)} * * @see ConditionalTabletMutator#submit(RejectionHandler) */ @@ -237,7 +237,7 @@ default ConditionalTabletsMutator conditionallyMutateTablets() { * @see ConditionalTabletMutator#submit(RejectionHandler) */ default AsyncConditionalTabletsMutator - conditionallyMutateTablets(BiConsumer resultsConsumer) { + conditionallyMutateTablets(Consumer resultsConsumer) { throw new UnsupportedOperationException(); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java index 0d31f71eef7..d7666361f7b 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java @@ -24,7 +24,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.function.BiConsumer; +import java.util.function.Consumer; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.schema.Ample; @@ -32,7 +32,7 @@ import org.apache.accumulo.server.ServerContext; public class AsyncConditionalTabletsMutatorImpl implements Ample.AsyncConditionalTabletsMutator { - private final BiConsumer resultsConsumer; + private final Consumer resultsConsumer; private final ExecutorService executor; private Future> backgroundProcessing = null; private ConditionalTabletsMutatorImpl bufferingMutator; @@ -41,7 +41,7 @@ public class AsyncConditionalTabletsMutatorImpl implements Ample.AsyncConditiona public static final int BATCH_SIZE = 1000; AsyncConditionalTabletsMutatorImpl(ServerContext context, - BiConsumer resultsConsumer) { + Consumer resultsConsumer) { this.resultsConsumer = Objects.requireNonNull(resultsConsumer); this.bufferingMutator = new ConditionalTabletsMutatorImpl(context); this.context = context; @@ -58,7 +58,7 @@ public Ample.OperationRequirements mutateTablet(KeyExtent extent) { if (backgroundProcessing != null) { // a previous batch of mutations was submitted for processing so wait on it. try { - backgroundProcessing.get().forEach(resultsConsumer); + backgroundProcessing.get().values().forEach(resultsConsumer); } catch (InterruptedException | ExecutionException e) { throw new IllegalStateException(e); } @@ -85,13 +85,13 @@ public void close() { if (backgroundProcessing != null) { // a previous batch of mutations was submitted for processing so wait on it. try { - backgroundProcessing.get().forEach(resultsConsumer); + backgroundProcessing.get().values().forEach(resultsConsumer); } catch (InterruptedException | ExecutionException e) { throw new IllegalStateException(e); } } // process anything not processed so far - bufferingMutator.process().forEach(resultsConsumer); + bufferingMutator.process().values().forEach(resultsConsumer); bufferingMutator.close(); executor.shutdownNow(); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java index a42bc5f4885..d0c1dd71e33 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java @@ -28,7 +28,6 @@ import java.util.Objects; import java.util.Set; import java.util.UUID; -import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -96,7 +95,7 @@ public ConditionalTabletsMutator conditionallyMutateTablets() { @Override public AsyncConditionalTabletsMutator - conditionallyMutateTablets(BiConsumer resultsConsumer) { + conditionallyMutateTablets(Consumer resultsConsumer) { return new AsyncConditionalTabletsMutatorImpl(context, resultsConsumer); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java index 9b864e3f3fd..e1c6e1f504e 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java @@ -22,11 +22,14 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; import java.time.Duration; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.manager.Manager; @@ -59,32 +62,46 @@ public long isReady(long tid, Manager manager) throws Exception { var ample = manager.getContext().getAmple(); + var fateStr = FateTxId.formatTid(tid); + + AtomicLong rejectedCount = new AtomicLong(0); + Consumer resultConsumer = result -> { + if (result.getStatus() == Status.REJECTED) { + log.debug("{} update for {} was rejected ", fateStr, result.getExtent()); + rejectedCount.incrementAndGet(); + } + }; + + long t1, t2, submitted = 0, total = 0; + try ( var tablets = ample.readTablets().forTable(tableId).overlapping(startRow, endRow) .fetch(PREV_ROW, COMPACTED).checkConsistency().build(); - var tabletsMutator = ample.conditionallyMutateTablets()) { + var tabletsMutator = ample.conditionallyMutateTablets(resultConsumer)) { - long t1 = System.nanoTime(); + t1 = System.nanoTime(); for (TabletMetadata tablet : tablets) { + total++; if (tablet.getCompacted().contains(tid)) { tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() .requireSame(tablet, COMPACTED).deleteCompacted(tid) .submit(tabletMetadata -> !tabletMetadata.getCompacted().contains(tid)); + submitted++; } } - long rejected = tabletsMutator.process().values().stream() - .filter(result -> result.getStatus() == Status.REJECTED).peek(result -> log - .debug("{} update for {} was rejected ", FateTxId.formatTid(tid), result.getExtent())) - .count(); + t2 = System.nanoTime(); + } - long t2 = System.nanoTime(); + long scanTime = Duration.ofNanos(t2 - t1).toMillis(); - if (rejected > 0) { - long sleepTime = Duration.ofNanos(t2 - t1).toMillis(); - sleepTime = Math.max(100, Math.min(30000, sleepTime * 2)); - return sleepTime; - } + log.debug("{} removed {} of {} compacted markers for {} tablets in {}ms", fateStr, + submitted - rejectedCount.get(), submitted, total, scanTime); + + if (rejectedCount.get() > 0) { + long sleepTime = scanTime; + sleepTime = Math.max(100, Math.min(30000, sleepTime * 2)); + return sleepTime; } return 0; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java index 21d0f059025..6fd1a969aa4 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java @@ -29,6 +29,8 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -46,6 +48,7 @@ import org.apache.accumulo.core.metadata.AbstractTabletFile; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; import org.apache.accumulo.core.metadata.schema.SelectedFiles; import org.apache.accumulo.core.metadata.schema.TabletMetadata; @@ -137,18 +140,33 @@ public int updateAndCheckTablets(Manager manager, long tid) // ELASTICITY_TODO use existing compaction logging - try ( - var tablets = ample.readTablets().forTable(tableId).overlapping(startRow, endRow) - .fetch(PREV_ROW, COMPACTED, FILES, SELECTED, ECOMP, OPID).checkConsistency().build(); - var tabletsMutator = ample.conditionallyMutateTablets()) { + var fateStr = FateTxId.formatTid(tid); + + Consumer resultConsumer = result -> { + if (result.getStatus() == Status.REJECTED) { + log.debug("{} update for {} was rejected ", fateStr, result.getExtent()); + } + }; - int complete = 0; - int total = 0; + long t1 = System.currentTimeMillis(); - int selected = 0; + int complete = 0; + int total = 0; + int opidsSeen = 0; + int noFiles = 0; + int noneSelected = 0; + int alreadySelected = 0; + int otherSelected = 0; + int otherCompaction = 0; + int selected = 0; - KeyExtent minSelected = null; - KeyExtent maxSelected = null; + KeyExtent minSelected = null; + KeyExtent maxSelected = null; + + try ( + var tablets = ample.readTablets().forTable(tableId).overlapping(startRow, endRow) + .fetch(PREV_ROW, COMPACTED, FILES, SELECTED, ECOMP, OPID).checkConsistency().build(); + var tabletsMutator = ample.conditionallyMutateTablets(resultConsumer)) { CompactionConfig config = CompactionConfigStorage.getConfig(manager.getContext(), tid); @@ -156,53 +174,54 @@ public int updateAndCheckTablets(Manager manager, long tid) total++; - // TODO change all logging to trace - if (tablet.getCompacted().contains(tid)) { // this tablet is already considered done - log.debug("{} compaction for {} is complete", FateTxId.formatTid(tid), - tablet.getExtent()); + log.trace("{} compaction for {} is complete", fateStr, tablet.getExtent()); complete++; } else if (tablet.getOperationId() != null) { - log.debug("{} ignoring tablet {} with active operation {} ", FateTxId.formatTid(tid), - tablet.getExtent(), tablet.getOperationId()); + log.trace("{} ignoring tablet {} with active operation {} ", fateStr, tablet.getExtent(), + tablet.getOperationId()); + opidsSeen++; } else if (tablet.getFiles().isEmpty()) { - log.debug("{} tablet {} has no files, attempting to mark as compacted ", - FateTxId.formatTid(tid), tablet.getExtent()); + log.trace("{} tablet {} has no files, attempting to mark as compacted ", fateStr, + tablet.getExtent()); // this tablet has no files try to mark it as done tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() .requireSame(tablet, FILES, COMPACTED).putCompacted(tid) .submit(tabletMetadata -> tabletMetadata.getCompacted().contains(tid)); + noFiles++; } else if (tablet.getSelectedFiles() == null && tablet.getExternalCompactions().isEmpty()) { // there are no selected files - log.debug("{} selecting {} files compaction for {}", FateTxId.formatTid(tid), - tablet.getFiles().size(), tablet.getExtent()); + log.trace("{} selecting {} files compaction for {}", fateStr, tablet.getFiles().size(), + tablet.getExtent()); Set filesToCompact; try { filesToCompact = CompactionPluginUtils.selectFiles(manager.getContext(), tablet.getExtent(), config, tablet.getFilesMap()); } catch (Exception e) { - log.warn("{} failed to select files for {} using {}", FateTxId.formatTid(tid), - tablet.getExtent(), config.getSelector(), e); + log.warn("{} failed to select files for {} using {}", fateStr, tablet.getExtent(), + config.getSelector(), e); throw new AcceptableThriftTableOperationException(tableId.canonical(), null, TableOperation.COMPACT, TableOperationExceptionType.OTHER, "Failed to select files"); } - // TODO expensive logging - log.debug("{} selected {} of {} files for {}", FateTxId.formatTid(tid), - filesToCompact.stream().map(AbstractTabletFile::getFileName) - .collect(Collectors.toList()), - tablet.getFiles().stream().map(AbstractTabletFile::getFileName) - .collect(Collectors.toList()), - tablet.getExtent()); - + if (log.isTraceEnabled()) { + log.trace("{} selected {} of {} files for {}", fateStr, + filesToCompact.stream().map(AbstractTabletFile::getFileName) + .collect(Collectors.toList()), + tablet.getFiles().stream().map(AbstractTabletFile::getFileName) + .collect(Collectors.toList()), + tablet.getExtent()); + } if (filesToCompact.isEmpty()) { // no files were selected so mark the tablet as compacted tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() .requireSame(tablet, FILES, SELECTED, ECOMP, COMPACTED).putCompacted(tid) .submit(tabletMetadata -> tabletMetadata.getCompacted().contains(tid)); + + noneSelected++; } else { var mutator = tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() .requireSame(tablet, FILES, SELECTED, ECOMP, COMPACTED); @@ -228,39 +247,42 @@ public int updateAndCheckTablets(Manager manager, long tid) } else if (tablet.getSelectedFiles() != null) { if (tablet.getSelectedFiles().getFateTxId() == tid) { - log.debug( + log.trace( "{} tablet {} already has {} selected files for this compaction, waiting for them be processed", - FateTxId.formatTid(tid), tablet.getExtent(), - tablet.getSelectedFiles().getFiles().size()); + fateStr, tablet.getExtent(), tablet.getSelectedFiles().getFiles().size()); + alreadySelected++; } else { - log.debug( + log.trace( "{} tablet {} already has {} selected files by another compaction {}, waiting for them be processed", - FateTxId.formatTid(tid), tablet.getExtent(), - tablet.getSelectedFiles().getFiles().size(), + fateStr, tablet.getExtent(), tablet.getSelectedFiles().getFiles().size(), FateTxId.formatTid(tablet.getSelectedFiles().getFateTxId())); + otherSelected++; } } else { // ELASTICITY_TODO if there are compactions preventing selection of files, then add // selecting marker that prevents new compactions from starting + otherCompaction++; } } + } catch (InterruptedException | KeeperException e) { + throw new RuntimeException(e); + } - tabletsMutator.process().values().stream() - .filter(result -> result.getStatus() == Status.REJECTED) - .forEach(result -> log.debug("{} update for {} was rejected ", FateTxId.formatTid(tid), - result.getExtent())); + long t2 = System.currentTimeMillis(); - if (selected > 0) { - manager.getEventCoordinator().event( - new KeyExtent(tableId, maxSelected.endRow(), minSelected.prevEndRow()), - "%s selected files for compaction for %d tablets", FateTxId.formatTid(tid), selected); - } + log.debug("{} tablet stats, total:{} complete:{} selected_now:{} selected_prev:{}" + + " selected_by_other:{} no_files:{} none_selected:{} other_compaction:{} opids:{} scan_update_time:{}ms", + fateStr, total, complete, selected, alreadySelected, otherSelected, noFiles, noneSelected, + otherCompaction, opidsSeen, t2 - t1); - return total - complete; - } catch (InterruptedException | KeeperException e) { - throw new RuntimeException(e); + if (selected > 0) { + manager.getEventCoordinator().event( + new KeyExtent(tableId, maxSelected.endRow(), minSelected.prevEndRow()), + "%s selected files for compaction for %d tablets", fateStr, selected); } + return total - complete; + // ELASTICITIY_TODO need to handle seeing zero tablets } @@ -293,12 +315,22 @@ private void cleanupTabletMetadata(long tid, Manager manager) throws Exception { .incrementBy(100, MILLISECONDS).maxWait(1, SECONDS).backOffFactor(1.5) .logInterval(3, MINUTES).createRetry(); + var fateStr = FateTxId.formatTid(tid); + while (!allCleanedUp) { + AtomicLong rejectedCount = new AtomicLong(0); + Consumer resultConsumer = result -> { + if (result.getStatus() == Status.REJECTED) { + log.debug("{} update for {} was rejected ", fateStr, result.getExtent()); + rejectedCount.incrementAndGet(); + } + }; + try ( var tablets = ample.readTablets().forTable(tableId).overlapping(startRow, endRow) .fetch(PREV_ROW, COMPACTED, SELECTED).checkConsistency().build(); - var tabletsMutator = ample.conditionallyMutateTablets()) { + var tabletsMutator = ample.conditionallyMutateTablets(resultConsumer)) { Predicate needsUpdate = tabletMetadata -> (tabletMetadata.getSelectedFiles() != null && tabletMetadata.getSelectedFiles().getFateTxId() == tid) @@ -322,14 +354,12 @@ private void cleanupTabletMetadata(long tid, Manager manager) throws Exception { mutator.submit(needsNoUpdate::test); } } - - allCleanedUp = tabletsMutator.process().values().stream() - .allMatch(result -> result.getStatus() == Status.ACCEPTED); } + allCleanedUp = rejectedCount.get() == 0; + if (!allCleanedUp) { - retry.waitForNextAttempt(log, - "Cleanup metadata for failed compaction " + FateTxId.formatTid(tid)); + retry.waitForNextAttempt(log, "Cleanup metadata for failed compaction " + fateStr); } } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java index afdd5fc155b..79ffc5e5422 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java @@ -23,11 +23,10 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; +import java.util.function.Consumer; import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.TabletOperationId; @@ -58,11 +57,11 @@ public long isReady(long tid, Manager manager) throws Exception { // The consumer may be called in another thread so use an AtomicLong AtomicLong accepted = new AtomicLong(0); - BiConsumer resultsConsumer = (extent, result) -> { + Consumer resultsConsumer = result -> { if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { accepted.incrementAndGet(); } else { - log.debug("Failed to set operation id {} {}", opid, extent); + log.debug("Failed to set operation id {} {}", opid, result.getExtent()); } }; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java index f34708681a4..d34a8f83554 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java @@ -20,7 +20,7 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; +import java.util.function.Consumer; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateTxId; @@ -66,11 +66,11 @@ public Repo call(long tid, Manager manager) throws Exception { AtomicLong acceptedCount = new AtomicLong(); AtomicLong rejectedCount = new AtomicLong(); // delete tablets - BiConsumer resultConsumer = (extent, result) -> { + Consumer resultConsumer = result -> { if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { acceptedCount.incrementAndGet(); } else { - log.error("{} failed to update {}", fateStr, extent); + log.error("{} failed to update {}", fateStr, result.getExtent()); rejectedCount.incrementAndGet(); } }; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java index 9ae16c1ac44..c58efca8852 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java @@ -23,7 +23,7 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; +import java.util.function.Consumer; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateTxId; @@ -68,11 +68,11 @@ static void removeOperationIds(Logger log, MergeInfo data, long tid, Manager man AtomicLong acceptedCount = new AtomicLong(); AtomicLong rejectedCount = new AtomicLong(); // delete tablets - BiConsumer resultConsumer = (extent, result) -> { + Consumer resultConsumer = result -> { if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { acceptedCount.incrementAndGet(); } else { - log.error("{} failed to update {}", fateStr, extent); + log.error("{} failed to update {}", fateStr, result.getExtent()); rejectedCount.incrementAndGet(); } }; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java index 3a4e742d7cc..e2fd71e65b3 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java @@ -24,9 +24,8 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; +import java.util.function.Consumer; -import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.metadata.schema.Ample; @@ -59,7 +58,7 @@ public long isReady(long tid, Manager env) throws Exception { var opid = TabletOperationId.from(TabletOperationType.MERGING, tid); AtomicLong opsAccepted = new AtomicLong(0); - BiConsumer resultConsumer = (extent, result) -> { + Consumer resultConsumer = result -> { if (result.getStatus() == Status.ACCEPTED) { opsAccepted.incrementAndGet(); } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java index 3a7e2d3c9dd..4934a4ae4a6 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java @@ -46,7 +46,7 @@ import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -823,7 +823,7 @@ public void testAsyncMutator() throws Exception { AtomicLong accepted = new AtomicLong(0); AtomicLong total = new AtomicLong(0); - BiConsumer resultsConsumer = (extent, result) -> { + Consumer resultsConsumer = result -> { if (result.getStatus() == Status.ACCEPTED) { accepted.incrementAndGet(); } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java index 3bd25e47338..3ed25d330fd 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java @@ -20,6 +20,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.SortedSet; @@ -29,9 +31,14 @@ import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.admin.CloneConfiguration; +import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.Filter; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.Test; @@ -42,11 +49,20 @@ public class SplitMillionIT extends AccumuloClusterHarness { + private static final Logger log = LoggerFactory.getLogger(SplitMillionIT.class); + + public static class XFilter extends Filter { + + @Override + public boolean accept(Key k, Value v) { + return !k.getColumnQualifierData().toString().equals("x"); + } + } + @SuppressFBWarnings(value = {"PREDICTABLE_RANDOM", "DMI_RANDOM_USED_ONLY_ONCE"}, justification = "predictable random is ok for testing") @Test public void testOneMillionTablets() throws Exception { - Logger log = LoggerFactory.getLogger(SplitIT.class); try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { String tableName = getUniqueNames(1)[0]; @@ -96,7 +112,7 @@ public void testOneMillionTablets() throws Exception { } long t3 = System.currentTimeMillis(); - verifyRow(c, tableName, row); + verifyRow(c, tableName, row, Map.of("x", "200", "y", "900", "z", "300")); long t4 = System.currentTimeMillis(); log.info("Row: {} scan1: {}ms write: {}ms scan2: {}ms", row, t2 - t1, t3 - t2, t4 - t3); } @@ -105,7 +121,18 @@ public void testOneMillionTablets() throws Exception { long count = c.tableOperations().getTabletInformation(tableName, new Range()).count(); long t2 = System.currentTimeMillis(); assertEquals(1_000_000, count); - log.info("Time to scan all tablets : {}ms", t2 - t1); + log.info("Time to scan all tablets information : {}ms", t2 - t1); + + t1 = System.currentTimeMillis(); + var iterSetting = new IteratorSetting(100, XFilter.class); + c.tableOperations().compact(tableName, + new CompactionConfig().setIterators(List.of(iterSetting)).setWait(true).setFlush(true)); + t2 = System.currentTimeMillis(); + assertEquals(1_000_000, count); + log.info("Time to compact all tablets : {}ms", t2 - t1); + + var expected = Map.of("y", "900", "z", "300"); + vefifyData(rows, c, tableName, expected); // clone the table to test cloning with lots of tablets and also to give merge its own table // to work on @@ -114,6 +141,7 @@ public void testOneMillionTablets() throws Exception { c.tableOperations().clone(tableName, cloneName, CloneConfiguration.builder().build()); t2 = System.currentTimeMillis(); log.info("Time to clone table : {}ms", t2 - t1); + vefifyData(rows, c, cloneName, expected); // merge the clone, so that delete table can run later on tablet with lots and lots of tablets t1 = System.currentTimeMillis(); @@ -121,11 +149,7 @@ public void testOneMillionTablets() throws Exception { t2 = System.currentTimeMillis(); log.info("Time to merge all tablets : {}ms", t2 - t1); - // verify data after merge - for (var rowInt : rows) { - var row = String.format("%010d", rowInt); - verifyRow(c, cloneName, row); - } + vefifyData(rows, c, cloneName, expected); t1 = System.currentTimeMillis(); c.tableOperations().delete(tableName); @@ -134,12 +158,37 @@ public void testOneMillionTablets() throws Exception { } } - private void verifyRow(AccumuloClient c, String tableName, String row) throws Exception { + private void vefifyData(int[] rows, AccumuloClient c, String tableName, + Map expected) throws Exception { + // use a batch scanner so that many hosting request can be submitted at the same time + long t1 = System.currentTimeMillis(); + try (var scanner = c.createBatchScanner(tableName)) { + var ranges = IntStream.of(rows).mapToObj(row -> String.format("%010d", row)).map(Range::new) + .collect(Collectors.toList()); + scanner.setRanges(ranges); + Map> allCoords = new HashMap<>(); + scanner.forEach((k, v) -> { + var row = k.getRowData().toString(); + var qual = k.getColumnQualifierData().toString(); + var val = v.toString(); + allCoords.computeIfAbsent(row, r -> new HashMap<>()).put(qual, val); + }); + + assertEquals(IntStream.of(rows).mapToObj(row -> String.format("%010d", row)) + .collect(Collectors.toSet()), allCoords.keySet()); + allCoords.values().forEach(coords -> assertEquals(expected, coords)); + } + long t2 = System.currentTimeMillis(); + log.info("Time to verify {} rows was {}ms", rows.length, t2 - t1); + } + + private void verifyRow(AccumuloClient c, String tableName, String row, + Map expected) throws Exception { try (var scanner = c.createScanner(tableName)) { scanner.setRange(new Range(row)); Map coords = scanner.stream().collect(Collectors .toMap(e -> e.getKey().getColumnQualifier().toString(), e -> e.getValue().toString())); - assertEquals(Map.of("x", "200", "y", "900", "z", "300"), coords); + assertEquals(expected, coords); } }