From 209c4778d019139d743899049a582b7010b0bf79 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Sat, 11 Nov 2023 14:56:22 -0500 Subject: [PATCH 1/2] adds support for compacting lots of tablets Removes buffering of all conditional results in memory in the compaction code. This allows compacting more tablets than would fit in memory. Changed the conditional result BiConsumer to a Consumer because it was passing the extents twice, so a BiConsumer was not needed and made the code more verbose. Updated the compaction code to collect stats and to log per-tablet details at trace. It was logging a lot of per-tablet information at debug. When compacting one million tablets, this resulted in a lot of information in the manager logs. Moved the per-tablet information to trace logging. Added collection of stats for the different per-tablet information and logged the stats once for all tablets scanned. Added compaction to the SplitMillionIT. Without the other changes in this PR, adding compaction to the SplitMillionIT would cause the Manager to die with an out of memory error because the conditional write was buffering all 1 million tablets. 
--- .../accumulo/core/metadata/schema/Ample.java | 6 +- .../AsyncConditionalTabletsMutatorImpl.java | 12 +- .../server/metadata/ServerAmpleImpl.java | 3 +- .../manager/tableOps/compact/CleanUp.java | 41 +++++-- .../tableOps/compact/CompactionDriver.java | 116 +++++++++++------- .../tableOps/delete/ReserveTablets.java | 7 +- .../manager/tableOps/merge/DeleteTablets.java | 6 +- .../tableOps/merge/FinishTableRangeOp.java | 6 +- .../tableOps/merge/ReserveTablets.java | 5 +- .../functional/AmpleConditionalWriterIT.java | 4 +- .../test/functional/SplitMillionIT.java | 69 +++++++++-- 11 files changed, 186 insertions(+), 89 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index 44f17157754..5983efb1121 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -24,7 +24,7 @@ import java.util.Objects; import java.util.Set; import java.util.UUID; -import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.Predicate; import java.util.stream.Stream; @@ -213,7 +213,7 @@ default TabletsMutator mutateTablets() { * An entry point for updating tablets metadata using a conditional writer. The returned mutator * will buffer everything in memory until {@link ConditionalTabletsMutator#process()} is called. 
* If buffering everything in memory is undesirable, then consider using - * {@link #conditionallyMutateTablets(BiConsumer)} + * {@link #conditionallyMutateTablets(Consumer)} * * @see ConditionalTabletMutator#submit(RejectionHandler) */ @@ -237,7 +237,7 @@ default ConditionalTabletsMutator conditionallyMutateTablets() { * @see ConditionalTabletMutator#submit(RejectionHandler) */ default AsyncConditionalTabletsMutator - conditionallyMutateTablets(BiConsumer resultsConsumer) { + conditionallyMutateTablets(Consumer resultsConsumer) { throw new UnsupportedOperationException(); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java index 0d31f71eef7..d7666361f7b 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java @@ -24,7 +24,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.function.BiConsumer; +import java.util.function.Consumer; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.schema.Ample; @@ -32,7 +32,7 @@ import org.apache.accumulo.server.ServerContext; public class AsyncConditionalTabletsMutatorImpl implements Ample.AsyncConditionalTabletsMutator { - private final BiConsumer resultsConsumer; + private final Consumer resultsConsumer; private final ExecutorService executor; private Future> backgroundProcessing = null; private ConditionalTabletsMutatorImpl bufferingMutator; @@ -41,7 +41,7 @@ public class AsyncConditionalTabletsMutatorImpl implements Ample.AsyncConditiona public static final int BATCH_SIZE = 1000; AsyncConditionalTabletsMutatorImpl(ServerContext context, - BiConsumer resultsConsumer) { 
+ Consumer resultsConsumer) { this.resultsConsumer = Objects.requireNonNull(resultsConsumer); this.bufferingMutator = new ConditionalTabletsMutatorImpl(context); this.context = context; @@ -58,7 +58,7 @@ public Ample.OperationRequirements mutateTablet(KeyExtent extent) { if (backgroundProcessing != null) { // a previous batch of mutations was submitted for processing so wait on it. try { - backgroundProcessing.get().forEach(resultsConsumer); + backgroundProcessing.get().values().forEach(resultsConsumer); } catch (InterruptedException | ExecutionException e) { throw new IllegalStateException(e); } @@ -85,13 +85,13 @@ public void close() { if (backgroundProcessing != null) { // a previous batch of mutations was submitted for processing so wait on it. try { - backgroundProcessing.get().forEach(resultsConsumer); + backgroundProcessing.get().values().forEach(resultsConsumer); } catch (InterruptedException | ExecutionException e) { throw new IllegalStateException(e); } } // process anything not processed so far - bufferingMutator.process().forEach(resultsConsumer); + bufferingMutator.process().values().forEach(resultsConsumer); bufferingMutator.close(); executor.shutdownNow(); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java index a42bc5f4885..d0c1dd71e33 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java @@ -28,7 +28,6 @@ import java.util.Objects; import java.util.Set; import java.util.UUID; -import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -96,7 +95,7 @@ public ConditionalTabletsMutator conditionallyMutateTablets() { @Override public AsyncConditionalTabletsMutator - conditionallyMutateTablets(BiConsumer 
resultsConsumer) { + conditionallyMutateTablets(Consumer resultsConsumer) { return new AsyncConditionalTabletsMutatorImpl(context, resultsConsumer); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java index 9b864e3f3fd..e1c6e1f504e 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java @@ -22,11 +22,14 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; import java.time.Duration; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.manager.Manager; @@ -59,32 +62,46 @@ public long isReady(long tid, Manager manager) throws Exception { var ample = manager.getContext().getAmple(); + var fateStr = FateTxId.formatTid(tid); + + AtomicLong rejectedCount = new AtomicLong(0); + Consumer resultConsumer = result -> { + if (result.getStatus() == Status.REJECTED) { + log.debug("{} update for {} was rejected ", fateStr, result.getExtent()); + rejectedCount.incrementAndGet(); + } + }; + + long t1, t2, submitted = 0, total = 0; + try ( var tablets = ample.readTablets().forTable(tableId).overlapping(startRow, endRow) .fetch(PREV_ROW, COMPACTED).checkConsistency().build(); - var tabletsMutator = ample.conditionallyMutateTablets()) { + var tabletsMutator = ample.conditionallyMutateTablets(resultConsumer)) { - long t1 = 
System.nanoTime(); + t1 = System.nanoTime(); for (TabletMetadata tablet : tablets) { + total++; if (tablet.getCompacted().contains(tid)) { tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() .requireSame(tablet, COMPACTED).deleteCompacted(tid) .submit(tabletMetadata -> !tabletMetadata.getCompacted().contains(tid)); + submitted++; } } - long rejected = tabletsMutator.process().values().stream() - .filter(result -> result.getStatus() == Status.REJECTED).peek(result -> log - .debug("{} update for {} was rejected ", FateTxId.formatTid(tid), result.getExtent())) - .count(); + t2 = System.nanoTime(); + } - long t2 = System.nanoTime(); + long scanTime = Duration.ofNanos(t2 - t1).toMillis(); - if (rejected > 0) { - long sleepTime = Duration.ofNanos(t2 - t1).toMillis(); - sleepTime = Math.max(100, Math.min(30000, sleepTime * 2)); - return sleepTime; - } + log.debug("{} removed {} of {} compacted markers for {} tablets in {}ms", fateStr, + submitted - rejectedCount.get(), submitted, total, scanTime); + + if (rejectedCount.get() > 0) { + long sleepTime = scanTime; + sleepTime = Math.max(100, Math.min(30000, sleepTime * 2)); + return sleepTime; } return 0; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java index 21d0f059025..630f2e831bf 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java @@ -29,6 +29,8 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -46,6 +48,7 @@ import 
org.apache.accumulo.core.metadata.AbstractTabletFile; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; import org.apache.accumulo.core.metadata.schema.SelectedFiles; import org.apache.accumulo.core.metadata.schema.TabletMetadata; @@ -137,18 +140,33 @@ public int updateAndCheckTablets(Manager manager, long tid) // ELASTICITY_TODO use existing compaction logging - try ( - var tablets = ample.readTablets().forTable(tableId).overlapping(startRow, endRow) - .fetch(PREV_ROW, COMPACTED, FILES, SELECTED, ECOMP, OPID).checkConsistency().build(); - var tabletsMutator = ample.conditionallyMutateTablets()) { + var fateStr = FateTxId.formatTid(tid); + + Consumer resultConsumer = result -> { + if (result.getStatus() == Status.REJECTED) { + log.debug("{} update for {} was rejected ", fateStr, result.getExtent()); + } + }; + + long t1 = System.currentTimeMillis(); - int complete = 0; - int total = 0; + int complete = 0; + int total = 0; + int opidsSeen = 0; + int noFiles = 0; + int noneSelected = 0; + int alreadySelected = 0; + int otherSelected = 0; + int otherCompaction = 0; + int selected = 0; - int selected = 0; + KeyExtent minSelected = null; + KeyExtent maxSelected = null; - KeyExtent minSelected = null; - KeyExtent maxSelected = null; + try ( + var tablets = ample.readTablets().forTable(tableId).overlapping(startRow, endRow) + .fetch(PREV_ROW, COMPACTED, FILES, SELECTED, ECOMP, OPID).checkConsistency().build(); + var tabletsMutator = ample.conditionallyMutateTablets(resultConsumer)) { CompactionConfig config = CompactionConfigStorage.getConfig(manager.getContext(), tid); @@ -156,26 +174,26 @@ public int updateAndCheckTablets(Manager manager, long tid) total++; - // TODO change all logging to trace - if (tablet.getCompacted().contains(tid)) { // this tablet is already considered 
done - log.debug("{} compaction for {} is complete", FateTxId.formatTid(tid), + log.trace("{} compaction for {} is complete", FateTxId.formatTid(tid), tablet.getExtent()); complete++; } else if (tablet.getOperationId() != null) { - log.debug("{} ignoring tablet {} with active operation {} ", FateTxId.formatTid(tid), + log.trace("{} ignoring tablet {} with active operation {} ", FateTxId.formatTid(tid), tablet.getExtent(), tablet.getOperationId()); + opidsSeen++; } else if (tablet.getFiles().isEmpty()) { - log.debug("{} tablet {} has no files, attempting to mark as compacted ", + log.trace("{} tablet {} has no files, attempting to mark as compacted ", FateTxId.formatTid(tid), tablet.getExtent()); // this tablet has no files try to mark it as done tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() .requireSame(tablet, FILES, COMPACTED).putCompacted(tid) .submit(tabletMetadata -> tabletMetadata.getCompacted().contains(tid)); + noFiles++; } else if (tablet.getSelectedFiles() == null && tablet.getExternalCompactions().isEmpty()) { // there are no selected files - log.debug("{} selecting {} files compaction for {}", FateTxId.formatTid(tid), + log.trace("{} selecting {} files compaction for {}", FateTxId.formatTid(tid), tablet.getFiles().size(), tablet.getExtent()); Set filesToCompact; @@ -190,19 +208,21 @@ public int updateAndCheckTablets(Manager manager, long tid) "Failed to select files"); } - // TODO expensive logging - log.debug("{} selected {} of {} files for {}", FateTxId.formatTid(tid), - filesToCompact.stream().map(AbstractTabletFile::getFileName) - .collect(Collectors.toList()), - tablet.getFiles().stream().map(AbstractTabletFile::getFileName) - .collect(Collectors.toList()), - tablet.getExtent()); - + if (log.isTraceEnabled()) { + log.trace("{} selected {} of {} files for {}", FateTxId.formatTid(tid), + filesToCompact.stream().map(AbstractTabletFile::getFileName) + .collect(Collectors.toList()), + 
tablet.getFiles().stream().map(AbstractTabletFile::getFileName) + .collect(Collectors.toList()), + tablet.getExtent()); + } if (filesToCompact.isEmpty()) { // no files were selected so mark the tablet as compacted tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() .requireSame(tablet, FILES, SELECTED, ECOMP, COMPACTED).putCompacted(tid) .submit(tabletMetadata -> tabletMetadata.getCompacted().contains(tid)); + + noneSelected++; } else { var mutator = tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() .requireSame(tablet, FILES, SELECTED, ECOMP, COMPACTED); @@ -228,39 +248,44 @@ public int updateAndCheckTablets(Manager manager, long tid) } else if (tablet.getSelectedFiles() != null) { if (tablet.getSelectedFiles().getFateTxId() == tid) { - log.debug( + log.trace( "{} tablet {} already has {} selected files for this compaction, waiting for them be processed", FateTxId.formatTid(tid), tablet.getExtent(), tablet.getSelectedFiles().getFiles().size()); + alreadySelected++; } else { - log.debug( + log.trace( "{} tablet {} already has {} selected files by another compaction {}, waiting for them be processed", FateTxId.formatTid(tid), tablet.getExtent(), tablet.getSelectedFiles().getFiles().size(), FateTxId.formatTid(tablet.getSelectedFiles().getFateTxId())); + otherSelected++; } } else { // ELASTICITY_TODO if there are compactions preventing selection of files, then add // selecting marker that prevents new compactions from starting + otherCompaction++; } } + } catch (InterruptedException | KeeperException e) { + throw new RuntimeException(e); + } - tabletsMutator.process().values().stream() - .filter(result -> result.getStatus() == Status.REJECTED) - .forEach(result -> log.debug("{} update for {} was rejected ", FateTxId.formatTid(tid), - result.getExtent())); + long t2 = System.currentTimeMillis(); - if (selected > 0) { - manager.getEventCoordinator().event( - new KeyExtent(tableId, maxSelected.endRow(), 
minSelected.prevEndRow()), - "%s selected files for compaction for %d tablets", FateTxId.formatTid(tid), selected); - } + log.debug("{} tablet stats, total:{} complete:{} selected_now:{} selected_prev:{}" + + " selected_by_other:{} no_files:{} none_selected:{} other_compaction:{} opids:{} scan_update_time:{}ms", + fateStr, total, complete, selected, alreadySelected, otherSelected, noFiles, noneSelected, + otherCompaction, opidsSeen, t2 - t1); - return total - complete; - } catch (InterruptedException | KeeperException e) { - throw new RuntimeException(e); + if (selected > 0) { + manager.getEventCoordinator().event( + new KeyExtent(tableId, maxSelected.endRow(), minSelected.prevEndRow()), + "%s selected files for compaction for %d tablets", FateTxId.formatTid(tid), selected); } + return total - complete; + // ELASTICITIY_TODO need to handle seeing zero tablets } @@ -293,12 +318,22 @@ private void cleanupTabletMetadata(long tid, Manager manager) throws Exception { .incrementBy(100, MILLISECONDS).maxWait(1, SECONDS).backOffFactor(1.5) .logInterval(3, MINUTES).createRetry(); + var fateStr = FateTxId.formatTid(tid); + while (!allCleanedUp) { + AtomicLong rejectedCount = new AtomicLong(0); + Consumer resultConsumer = result -> { + if (result.getStatus() == Status.REJECTED) { + log.debug("{} update for {} was rejected ", fateStr, result.getExtent()); + rejectedCount.incrementAndGet(); + } + }; + try ( var tablets = ample.readTablets().forTable(tableId).overlapping(startRow, endRow) .fetch(PREV_ROW, COMPACTED, SELECTED).checkConsistency().build(); - var tabletsMutator = ample.conditionallyMutateTablets()) { + var tabletsMutator = ample.conditionallyMutateTablets(resultConsumer)) { Predicate needsUpdate = tabletMetadata -> (tabletMetadata.getSelectedFiles() != null && tabletMetadata.getSelectedFiles().getFateTxId() == tid) @@ -322,11 +357,10 @@ private void cleanupTabletMetadata(long tid, Manager manager) throws Exception { mutator.submit(needsNoUpdate::test); } } - - 
allCleanedUp = tabletsMutator.process().values().stream() - .allMatch(result -> result.getStatus() == Status.ACCEPTED); } + allCleanedUp = rejectedCount.get() == 0; + if (!allCleanedUp) { retry.waitForNextAttempt(log, "Cleanup metadata for failed compaction " + FateTxId.formatTid(tid)); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java index afdd5fc155b..79ffc5e5422 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java @@ -23,11 +23,10 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; +import java.util.function.Consumer; import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.TabletOperationId; @@ -58,11 +57,11 @@ public long isReady(long tid, Manager manager) throws Exception { // The consumer may be called in another thread so use an AtomicLong AtomicLong accepted = new AtomicLong(0); - BiConsumer resultsConsumer = (extent, result) -> { + Consumer resultsConsumer = result -> { if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { accepted.incrementAndGet(); } else { - log.debug("Failed to set operation id {} {}", opid, extent); + log.debug("Failed to set operation id {} {}", opid, result.getExtent()); } }; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java index f34708681a4..d34a8f83554 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java @@ -20,7 +20,7 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; +import java.util.function.Consumer; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateTxId; @@ -66,11 +66,11 @@ public Repo call(long tid, Manager manager) throws Exception { AtomicLong acceptedCount = new AtomicLong(); AtomicLong rejectedCount = new AtomicLong(); // delete tablets - BiConsumer resultConsumer = (extent, result) -> { + Consumer resultConsumer = result -> { if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { acceptedCount.incrementAndGet(); } else { - log.error("{} failed to update {}", fateStr, extent); + log.error("{} failed to update {}", fateStr, result.getExtent()); rejectedCount.incrementAndGet(); } }; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java index 9ae16c1ac44..c58efca8852 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java @@ -23,7 +23,7 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; +import java.util.function.Consumer; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateTxId; @@ -68,11 +68,11 @@ static void removeOperationIds(Logger log, MergeInfo 
data, long tid, Manager man AtomicLong acceptedCount = new AtomicLong(); AtomicLong rejectedCount = new AtomicLong(); // delete tablets - BiConsumer resultConsumer = (extent, result) -> { + Consumer resultConsumer = result -> { if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { acceptedCount.incrementAndGet(); } else { - log.error("{} failed to update {}", fateStr, extent); + log.error("{} failed to update {}", fateStr, result.getExtent()); rejectedCount.incrementAndGet(); } }; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java index 3a4e742d7cc..e2fd71e65b3 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java @@ -24,9 +24,8 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; +import java.util.function.Consumer; -import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.metadata.schema.Ample; @@ -59,7 +58,7 @@ public long isReady(long tid, Manager env) throws Exception { var opid = TabletOperationId.from(TabletOperationType.MERGING, tid); AtomicLong opsAccepted = new AtomicLong(0); - BiConsumer resultConsumer = (extent, result) -> { + Consumer resultConsumer = result -> { if (result.getStatus() == Status.ACCEPTED) { opsAccepted.incrementAndGet(); } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java index 3a7e2d3c9dd..4934a4ae4a6 100644 --- 
a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java @@ -46,7 +46,7 @@ import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -823,7 +823,7 @@ public void testAsyncMutator() throws Exception { AtomicLong accepted = new AtomicLong(0); AtomicLong total = new AtomicLong(0); - BiConsumer resultsConsumer = (extent, result) -> { + Consumer resultsConsumer = result -> { if (result.getStatus() == Status.ACCEPTED) { accepted.incrementAndGet(); } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java index 3bd25e47338..cc1030ad7e8 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java @@ -20,6 +20,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.SortedSet; @@ -29,9 +31,14 @@ import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.admin.CloneConfiguration; +import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.Filter; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.hadoop.io.Text; 
import org.junit.jupiter.api.Test; @@ -42,11 +49,20 @@ public class SplitMillionIT extends AccumuloClusterHarness { + private static final Logger log = LoggerFactory.getLogger(SplitIT.class); + + public static class XFilter extends Filter { + + @Override + public boolean accept(Key k, Value v) { + return !k.getColumnQualifierData().toString().equals("x"); + } + } + @SuppressFBWarnings(value = {"PREDICTABLE_RANDOM", "DMI_RANDOM_USED_ONLY_ONCE"}, justification = "predictable random is ok for testing") @Test public void testOneMillionTablets() throws Exception { - Logger log = LoggerFactory.getLogger(SplitIT.class); try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { String tableName = getUniqueNames(1)[0]; @@ -96,7 +112,7 @@ public void testOneMillionTablets() throws Exception { } long t3 = System.currentTimeMillis(); - verifyRow(c, tableName, row); + verifyRow(c, tableName, row, Map.of("x", "200", "y", "900", "z", "300")); long t4 = System.currentTimeMillis(); log.info("Row: {} scan1: {}ms write: {}ms scan2: {}ms", row, t2 - t1, t3 - t2, t4 - t3); } @@ -105,7 +121,18 @@ public void testOneMillionTablets() throws Exception { long count = c.tableOperations().getTabletInformation(tableName, new Range()).count(); long t2 = System.currentTimeMillis(); assertEquals(1_000_000, count); - log.info("Time to scan all tablets : {}ms", t2 - t1); + log.info("Time to scan all tablets information : {}ms", t2 - t1); + + t1 = System.currentTimeMillis(); + var iterSetting = new IteratorSetting(100, XFilter.class); + c.tableOperations().compact(tableName, + new CompactionConfig().setIterators(List.of(iterSetting)).setWait(true).setFlush(true)); + t2 = System.currentTimeMillis(); + assertEquals(1_000_000, count); + log.info("Time to compact all tablets : {}ms", t2 - t1); + + var expected = Map.of("y", "900", "z", "300"); + vefifyData(rows, c, tableName, expected); // clone the table to test cloning with lots of tablets and also to give merge its own table // 
to work on @@ -114,6 +141,7 @@ public void testOneMillionTablets() throws Exception { c.tableOperations().clone(tableName, cloneName, CloneConfiguration.builder().build()); t2 = System.currentTimeMillis(); log.info("Time to clone table : {}ms", t2 - t1); + vefifyData(rows, c, cloneName, expected); // merge the clone, so that delete table can run later on tablet with lots and lots of tablets t1 = System.currentTimeMillis(); @@ -121,11 +149,7 @@ public void testOneMillionTablets() throws Exception { t2 = System.currentTimeMillis(); log.info("Time to merge all tablets : {}ms", t2 - t1); - // verify data after merge - for (var rowInt : rows) { - var row = String.format("%010d", rowInt); - verifyRow(c, cloneName, row); - } + vefifyData(rows, c, cloneName, expected); t1 = System.currentTimeMillis(); c.tableOperations().delete(tableName); @@ -134,12 +158,37 @@ public void testOneMillionTablets() throws Exception { } } - private void verifyRow(AccumuloClient c, String tableName, String row) throws Exception { + private void vefifyData(int[] rows, AccumuloClient c, String tableName, + Map expected) throws Exception { + // use a batch scanner so that many hosting request can be submitted at the same time + long t1 = System.currentTimeMillis(); + try (var scanner = c.createBatchScanner(tableName)) { + var ranges = IntStream.of(rows).mapToObj(row -> String.format("%010d", row)).map(Range::new) + .collect(Collectors.toList()); + scanner.setRanges(ranges); + Map> allCoords = new HashMap<>(); + scanner.forEach((k, v) -> { + var row = k.getRowData().toString(); + var qual = k.getColumnQualifierData().toString(); + var val = v.toString(); + allCoords.computeIfAbsent(row, r -> new HashMap<>()).put(qual, val); + }); + + assertEquals(IntStream.of(rows).mapToObj(row -> String.format("%010d", row)) + .collect(Collectors.toSet()), allCoords.keySet()); + allCoords.values().forEach(coords -> assertEquals(expected, coords)); + } + long t2 = System.currentTimeMillis(); + log.info("Time to 
verify {} rows was {}ms", rows.length, t2 - t1); + } + + private void verifyRow(AccumuloClient c, String tableName, String row, + Map expected) throws Exception { try (var scanner = c.createScanner(tableName)) { scanner.setRange(new Range(row)); Map coords = scanner.stream().collect(Collectors .toMap(e -> e.getKey().getColumnQualifier().toString(), e -> e.getValue().toString())); - assertEquals(Map.of("x", "200", "y", "900", "z", "300"), coords); + assertEquals(expected, coords); } } From b573f095a8fd1da80471208b42986396388853ff Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Wed, 15 Nov 2023 10:26:50 -0500 Subject: [PATCH 2/2] code review updates --- .../tableOps/compact/CompactionDriver.java | 32 ++++++++----------- .../test/functional/SplitMillionIT.java | 2 +- 2 files changed, 15 insertions(+), 19 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java index 630f2e831bf..6fd1a969aa4 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java @@ -176,16 +176,15 @@ public int updateAndCheckTablets(Manager manager, long tid) if (tablet.getCompacted().contains(tid)) { // this tablet is already considered done - log.trace("{} compaction for {} is complete", FateTxId.formatTid(tid), - tablet.getExtent()); + log.trace("{} compaction for {} is complete", fateStr, tablet.getExtent()); complete++; } else if (tablet.getOperationId() != null) { - log.trace("{} ignoring tablet {} with active operation {} ", FateTxId.formatTid(tid), - tablet.getExtent(), tablet.getOperationId()); + log.trace("{} ignoring tablet {} with active operation {} ", fateStr, tablet.getExtent(), + tablet.getOperationId()); opidsSeen++; } else if (tablet.getFiles().isEmpty()) { - 
log.trace("{} tablet {} has no files, attempting to mark as compacted ", - FateTxId.formatTid(tid), tablet.getExtent()); + log.trace("{} tablet {} has no files, attempting to mark as compacted ", fateStr, + tablet.getExtent()); // this tablet has no files try to mark it as done tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() .requireSame(tablet, FILES, COMPACTED).putCompacted(tid) @@ -193,23 +192,23 @@ public int updateAndCheckTablets(Manager manager, long tid) noFiles++; } else if (tablet.getSelectedFiles() == null && tablet.getExternalCompactions().isEmpty()) { // there are no selected files - log.trace("{} selecting {} files compaction for {}", FateTxId.formatTid(tid), - tablet.getFiles().size(), tablet.getExtent()); + log.trace("{} selecting {} files compaction for {}", fateStr, tablet.getFiles().size(), + tablet.getExtent()); Set filesToCompact; try { filesToCompact = CompactionPluginUtils.selectFiles(manager.getContext(), tablet.getExtent(), config, tablet.getFilesMap()); } catch (Exception e) { - log.warn("{} failed to select files for {} using {}", FateTxId.formatTid(tid), - tablet.getExtent(), config.getSelector(), e); + log.warn("{} failed to select files for {} using {}", fateStr, tablet.getExtent(), + config.getSelector(), e); throw new AcceptableThriftTableOperationException(tableId.canonical(), null, TableOperation.COMPACT, TableOperationExceptionType.OTHER, "Failed to select files"); } if (log.isTraceEnabled()) { - log.trace("{} selected {} of {} files for {}", FateTxId.formatTid(tid), + log.trace("{} selected {} of {} files for {}", fateStr, filesToCompact.stream().map(AbstractTabletFile::getFileName) .collect(Collectors.toList()), tablet.getFiles().stream().map(AbstractTabletFile::getFileName) @@ -250,14 +249,12 @@ public int updateAndCheckTablets(Manager manager, long tid) if (tablet.getSelectedFiles().getFateTxId() == tid) { log.trace( "{} tablet {} already has {} selected files for this compaction, waiting for them be 
processed", - FateTxId.formatTid(tid), tablet.getExtent(), - tablet.getSelectedFiles().getFiles().size()); + fateStr, tablet.getExtent(), tablet.getSelectedFiles().getFiles().size()); alreadySelected++; } else { log.trace( "{} tablet {} already has {} selected files by another compaction {}, waiting for them be processed", - FateTxId.formatTid(tid), tablet.getExtent(), - tablet.getSelectedFiles().getFiles().size(), + fateStr, tablet.getExtent(), tablet.getSelectedFiles().getFiles().size(), FateTxId.formatTid(tablet.getSelectedFiles().getFateTxId())); otherSelected++; } @@ -281,7 +278,7 @@ public int updateAndCheckTablets(Manager manager, long tid) if (selected > 0) { manager.getEventCoordinator().event( new KeyExtent(tableId, maxSelected.endRow(), minSelected.prevEndRow()), - "%s selected files for compaction for %d tablets", FateTxId.formatTid(tid), selected); + "%s selected files for compaction for %d tablets", fateStr, selected); } return total - complete; @@ -362,8 +359,7 @@ private void cleanupTabletMetadata(long tid, Manager manager) throws Exception { allCleanedUp = rejectedCount.get() == 0; if (!allCleanedUp) { - retry.waitForNextAttempt(log, - "Cleanup metadata for failed compaction " + FateTxId.formatTid(tid)); + retry.waitForNextAttempt(log, "Cleanup metadata for failed compaction " + fateStr); } } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java index cc1030ad7e8..3ed25d330fd 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java @@ -49,7 +49,7 @@ public class SplitMillionIT extends AccumuloClusterHarness { - private static final Logger log = LoggerFactory.getLogger(SplitIT.class); + private static final Logger log = LoggerFactory.getLogger(SplitMillionIT.class); public static class XFilter extends Filter {