From e67e3e013259733a7ff2ddbd62f84da4eaf4f873 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Thu, 4 Jan 2024 17:46:25 -0500 Subject: [PATCH 1/3] Avoids reading all fate ids into memory. As we change Accumulo to use FATE for per tablet operations its important to avoid reading all FATEs persisted data into memory. This commit modifies FATE to use Streams internally instead of Collections. For the Accumulo implemention of FATE storage this makes it possible to have java stream backed by a scanner which avoids reading all of the FATE ids into memory. The Zookeeper storage implementation will still read everything into memory. Another change that was made in the PR was optimizing the Accumulo storage layer to read the status while reading the id. Before this change ids were read from scanner, then for each id a scanner was created to read the status. Now the status and id are read in stream from the same scanner which should be much faster. This change was not possible for Zookeeper, it will still make an RPC to get each status. Its ok that Zookeeper store is less efficient as the Accumulo store will likely store orders of magnitude more data. Its probably not possible to make the same optimizations for speed and memory in the zookeeper store. A bug in the Fate integration test was fixed by using the Unknown status which represents the status for transaction that does not exists in the persisted store. Ran into this bug while testing these changes. --- .../accumulo/core/fate/AbstractFateStore.java | 83 ++++++++++--------- .../apache/accumulo/core/fate/AdminUtil.java | 54 ++++++------ .../accumulo/core/fate/AgeOffStore.java | 14 ++-- .../org/apache/accumulo/core/fate/Fate.java | 17 ++-- .../accumulo/core/fate/ReadOnlyFateStore.java | 14 ++-- .../apache/accumulo/core/fate/ZooStore.java | 29 ++++++- .../core/fate/accumulo/AccumuloStore.java | 30 ++++++- .../accumulo/core/logging/FateLogger.java | 10 +-- .../accumulo/core/fate/AgeOffStoreTest.java | 28 +++---- .../apache/accumulo/core/fate/TestStore.java | 9 +- .../manager/upgrade/UpgradeCoordinator.java | 16 ++-- .../org/apache/accumulo/test/fate/FateIT.java | 32 +------ .../test/fate/accumulo/AccumuloFateIT.java | 13 +-- .../accumulo/AccumuloStoreReadWriteIT.java | 10 +-- .../test/fate/zookeeper/ZookeeperFateIT.java | 28 ++----- 15 files changed, 191 insertions(+), 196 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java index 874b58d8c6d..6fd231e3388 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java @@ -27,17 +27,17 @@ import java.io.ObjectOutputStream; import java.io.Serializable; import java.io.UncheckedIOException; -import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.LongConsumer; +import java.util.stream.Stream; import org.apache.accumulo.core.fate.Fate.TxInfo; import org.slf4j.Logger; @@ -124,37 +124,34 @@ public FateTxStore reserve(long tid) { } @Override - public Iterator runnable(AtomicBoolean keepWaiting) { + public void runnable(AtomicBoolean keepWaiting, LongConsumer idConsumer) { - while (keepWaiting.get()) { - ArrayList runnableTids = new ArrayList<>(); + AtomicLong seen = new AtomicLong(0); + while (keepWaiting.get() && seen.get() == 0) { final long beforeCount = unreservedRunnableCount.getCount(); - List transactions = getTransactions(); - for (String txidStr : transactions) { - long txid = parseTid(txidStr); - if (isRunnable(_getStatus(txid))) { - runnableTids.add(txid); - } - } - - synchronized (this) { - runnableTids.removeIf(txid -> { - var deferredTime = deferred.get(txid); - if (deferredTime != null) { - if (deferredTime >= System.currentTimeMillis()) { - return true; - } else { - deferred.remove(txid); - } - } - - return reserved.contains(txid); - }); + try (Stream transactions = getTransactions()) { + transactions.filter(fateIdStatus -> isRunnable(fateIdStatus.getStatus())) + .mapToLong(FateIdStatus::getTxid).filter(txid -> { + synchronized (AbstractFateStore.this) { + var deferredTime = deferred.get(txid); + if (deferredTime != null) { + if (deferredTime >= System.currentTimeMillis()) { + return false; + } else { + deferred.remove(txid); + } + } + return !reserved.contains(txid); + } + }).forEach(txid -> { + seen.incrementAndGet(); + idConsumer.accept(txid); + }); } - if (runnableTids.isEmpty()) { + if (seen.get() == 0) { if (beforeCount == unreservedRunnableCount.getCount()) { long waitTime = 5000; if (!deferred.isEmpty()) { @@ -167,23 +164,13 @@ public Iterator runnable(AtomicBoolean keepWaiting) { keepWaiting::get); } } - } else { - return runnableTids.iterator(); } - } - - return Collections.emptyIterator(); } @Override - public List list() { - ArrayList l = new ArrayList<>(); - List transactions = getTransactions(); - for (String txid : transactions) { - l.add(parseTid(txid)); - } - return l; + public Stream list() { + return getTransactions().map(fateIdStatus -> fateIdStatus.txid); } @Override @@ -200,7 +187,21 @@ protected long parseTid(String txdir) { return Long.parseLong(txdir.split("_")[1], 16); } - protected abstract List getTransactions(); + public static abstract class FateIdStatus { + private final long txid; + + public FateIdStatus(long txid) { + this.txid = txid; + } + + public long getTxid() { + return txid; + } + + public abstract TStatus getStatus(); + } + + protected abstract Stream getTransactions(); protected abstract TStatus _getStatus(long tid); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java index 95ef99448f5..06974119f00 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.stream.Stream; import org.apache.accumulo.core.fate.FateStore.FateTxStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.ReadOnlyFateTxStore; @@ -338,44 +339,45 @@ private FateStatus getTransactionStatus(ReadOnlyFateStore zs, Set filte EnumSet filterStatus, Map> heldLocks, Map> waitingLocks) { - List transactions = zs.list(); - List statuses = new ArrayList<>(transactions.size()); + try (Stream tids = zs.list()) { + List statuses = new ArrayList<>(); - for (Long tid : transactions) { + tids.forEach(tid -> { - ReadOnlyFateTxStore txStore = zs.read(tid); + ReadOnlyFateTxStore txStore = zs.read(tid); - String txName = (String) txStore.getTransactionInfo(Fate.TxInfo.TX_NAME); + String txName = (String) txStore.getTransactionInfo(Fate.TxInfo.TX_NAME); - List hlocks = heldLocks.remove(tid); + List hlocks = heldLocks.remove(tid); - if (hlocks == null) { - hlocks = Collections.emptyList(); - } - - List wlocks = waitingLocks.remove(tid); + if (hlocks == null) { + hlocks = Collections.emptyList(); + } - if (wlocks == null) { - wlocks = Collections.emptyList(); - } + List wlocks = waitingLocks.remove(tid); - String top = null; - ReadOnlyRepo repo = txStore.top(); - if (repo != null) { - top = repo.getName(); - } + if (wlocks == null) { + wlocks = Collections.emptyList(); + } - TStatus status = txStore.getStatus(); + String top = null; + ReadOnlyRepo repo = txStore.top(); + if (repo != null) { + top = repo.getName(); + } - long timeCreated = txStore.timeCreated(); + TStatus status = txStore.getStatus(); - if (includeByStatus(status, filterStatus) && includeByTxid(tid, filterTxid)) { - statuses.add(new TransactionStatus(tid, status, txName, hlocks, wlocks, top, timeCreated)); - } - } + long timeCreated = txStore.timeCreated(); - return new FateStatus(statuses, heldLocks, waitingLocks); + if (includeByStatus(status, filterStatus) && includeByTxid(tid, filterTxid)) { + statuses + .add(new TransactionStatus(tid, status, txName, hlocks, wlocks, top, timeCreated)); + } + }); + return new FateStatus(statuses, heldLocks, waitingLocks); + } } private boolean includeByStatus(TStatus status, EnumSet filterStatus) { diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java index f61c06028ca..2443619c5ff 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java @@ -20,12 +20,14 @@ import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.LongConsumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -124,7 +126,9 @@ public AgeOffStore(FateStore store, long ageOffTime, TimeSource timeSource) { minTime = Long.MAX_VALUE; - List txids = store.list(); + // ELASTICITY_TODO need to rework how this class works so that it does not buffer everything in + // memory. + List txids = store.list().collect(Collectors.toList()); for (Long txid : txids) { FateTxStore txStore = store.reserve(txid); try { @@ -198,12 +202,12 @@ public ReadOnlyFateTxStore read(long tid) { } @Override - public List list() { + public Stream list() { return store.list(); } @Override - public Iterator runnable(AtomicBoolean keepWaiting) { - return store.runnable(keepWaiting); + public void runnable(AtomicBoolean keepWaiting, LongConsumer idConsumer) { + store.runnable(keepWaiting, idConsumer); } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index a54ad734ee7..6d7df5d32b5 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -89,12 +89,9 @@ private class WorkFinder implements Runnable { public void run() { while (keepRunning.get()) { try { - var iter = store.runnable(keepRunning); - - while (iter.hasNext() && keepRunning.get()) { - Long txid = iter.next(); - try { - while (keepRunning.get()) { + store.runnable(keepRunning, txid -> { + while (keepRunning.get()) { + try { // The reason for calling transfer instead of queueing is avoid rescanning the // storage layer and adding the same thing over and over. For example if all threads // were busy, the queue size was 100, and there are three runnable things in the @@ -103,12 +100,12 @@ public void run() { if (workQueue.tryTransfer(txid, 100, MILLISECONDS)) { break; } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException(e); } - } + }); } catch (Exception e) { if (keepRunning.get()) { log.warn("Failure while attempting to find work for fate", e); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java index f0140de3674..e525c89ff9c 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java @@ -20,9 +20,10 @@ import java.io.Serializable; import java.util.EnumSet; -import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.LongConsumer; +import java.util.stream.Stream; /** * Read only access to a Transaction Store. @@ -122,12 +123,13 @@ interface ReadOnlyFateTxStore { * * @return all outstanding transactions, including those reserved by others. */ - List list(); + Stream list(); /** - * @return an iterator over fate op ids that are (IN_PROGRESS or FAILED_IN_PROGRESS) and - * unreserved. This method will block until it finds something that is runnable or until - * the keepWaiting parameter is false. + * Finds all fate ops that are (IN_PROGRESS, SUBMITTED, or FAILED_IN_PROGRESS) and unreserved. Ids + * that are found are passed to the consumer. This method will block until at least one runnable + * is found or until the keepWaiting parameter is false. It will return once all runnable ids + * found were passed to the consumer. */ - Iterator runnable(AtomicBoolean keepWaiting); + void runnable(AtomicBoolean keepWaiting, LongConsumer idConsumer); } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java index 969aed07178..e5194b7287c 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java @@ -27,6 +27,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Stream; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; @@ -39,6 +41,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Suppliers; + //TODO use zoocache? - ACCUMULO-1297 //TODO handle zookeeper being down gracefully - ACCUMULO-1297 @@ -297,10 +301,31 @@ protected FateTxStore newFateTxStore(long tid, boolean isReserved) { return new FateTxStoreImpl(tid, isReserved); } + private static class ZooFateIdStatus extends FateIdStatus { + + private final Supplier statusSupplier; + + public ZooFateIdStatus(long txid, Supplier statusSupplier) { + super(txid); + this.statusSupplier = statusSupplier; + } + + @Override + public TStatus getStatus() { + return statusSupplier.get(); + } + } + @Override - protected List getTransactions() { + protected Stream getTransactions() { try { - return zk.getChildren(path); + return zk.getChildren(path).stream().map(strTxid -> { + long txid = parseTid(strTxid); + // Memoizing for two reasons. First the status may never be requested, so in the case avoid + // the lookup. Second, if its requested multiple times the result will always be consistent. + Supplier statusSupplier = Suppliers.memoize(() -> _getStatus(txid)); + return new ZooFateIdStatus(txid, statusSupplier); + }); } catch (KeeperException | InterruptedException e) { throw new IllegalStateException(e); } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java index 1c1284696e6..4a6712c03d2 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java @@ -26,6 +26,7 @@ import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; @@ -69,13 +70,34 @@ public long create() { return tid; } + private static class AccumuoFateIdStatus extends FateIdStatus { + private final TStatus status; + + public AccumuoFateIdStatus(long txid, TStatus status) { + super(txid); + this.status = status; + } + + @Override + public TStatus getStatus() { + return status; + } + } + @Override - protected List getTransactions() { - return scanTx(scanner -> { + protected Stream getTransactions() { + try { + Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY); scanner.setRange(new Range()); TxColumnFamily.STATUS_COLUMN.fetch(scanner); - return scanner.stream().map(e -> e.getKey().getRow().toString()).collect(Collectors.toList()); - }); + return scanner.stream().onClose(scanner::close).map(e -> { + long txid = parseTid(e.getKey().getRow().toString()); + TStatus status = TStatus.valueOf(e.getValue().toString()); + return new AccumuoFateIdStatus(txid, status); + }); + } catch (TableNotFoundException e) { + throw new IllegalStateException(tableName + " not found!", e); + } } @Override diff --git a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java index d85e417650a..bac25c321bb 100644 --- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java +++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java @@ -21,11 +21,11 @@ import static org.apache.accumulo.core.fate.FateTxId.formatTid; import java.io.Serializable; -import java.util.Iterator; -import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; +import java.util.function.LongConsumer; +import java.util.stream.Stream; import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.FateStore; @@ -115,13 +115,13 @@ public ReadOnlyFateTxStore read(long tid) { } @Override - public List list() { + public Stream list() { return store.list(); } @Override - public Iterator runnable(AtomicBoolean keepWaiting) { - return store.runnable(keepWaiting); + public void runnable(AtomicBoolean keepWaiting, LongConsumer idConsumer) { + store.runnable(keepWaiting, idConsumer); } @Override diff --git a/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java b/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java index d2530ce1f3a..49290ec7fed 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java @@ -18,9 +18,9 @@ */ package org.apache.accumulo.core.fate; +import static java.util.stream.Collectors.toSet; import static org.junit.jupiter.api.Assertions.assertEquals; -import java.util.HashSet; import java.util.Set; import org.apache.accumulo.core.fate.AgeOffStore.TimeSource; @@ -74,22 +74,19 @@ public void testBasic() throws InterruptedException, KeeperException { aoStore.ageOff(); - assertEquals(Set.of(txid1, txid2, txid3, txid4), new HashSet<>(aoStore.list())); - assertEquals(4, new HashSet<>(aoStore.list()).size()); + assertEquals(Set.of(txid1, txid2, txid3, txid4), aoStore.list().collect(toSet())); tts.time = 15; aoStore.ageOff(); - assertEquals(Set.of(txid1, txid3, txid4), new HashSet<>(aoStore.list())); - assertEquals(3, new HashSet<>(aoStore.list()).size()); + assertEquals(Set.of(txid1, txid3, txid4), aoStore.list().collect(toSet())); tts.time = 30; aoStore.ageOff(); - assertEquals(Set.of(txid1), new HashSet<>(aoStore.list())); - assertEquals(1, Set.of(aoStore.list()).size()); + assertEquals(Set.of(txid1), aoStore.list().collect(toSet())); } @Test @@ -119,20 +116,17 @@ public void testNonEmpty() throws InterruptedException, KeeperException { AgeOffStore aoStore = new AgeOffStore<>(testStore, 10, tts); - assertEquals(Set.of(txid1, txid2, txid3, txid4), new HashSet<>(aoStore.list())); - assertEquals(4, new HashSet<>(aoStore.list()).size()); + assertEquals(Set.of(txid1, txid2, txid3, txid4), aoStore.list().collect(toSet())); aoStore.ageOff(); - assertEquals(Set.of(txid1, txid2, txid3, txid4), new HashSet<>(aoStore.list())); - assertEquals(4, new HashSet<>(aoStore.list()).size()); + assertEquals(Set.of(txid1, txid2, txid3, txid4), aoStore.list().collect(toSet())); tts.time = 15; aoStore.ageOff(); - assertEquals(Set.of(txid1), new HashSet<>(aoStore.list())); - assertEquals(1, new HashSet<>(aoStore.list()).size()); + assertEquals(Set.of(txid1), aoStore.list().collect(toSet())); txStore1 = aoStore.reserve(txid1); txStore1.setStatus(TStatus.FAILED_IN_PROGRESS); @@ -142,8 +136,7 @@ public void testNonEmpty() throws InterruptedException, KeeperException { aoStore.ageOff(); - assertEquals(Set.of(txid1), new HashSet<>(aoStore.list())); - assertEquals(1, new HashSet<>(aoStore.list()).size()); + assertEquals(Set.of(txid1), aoStore.list().collect(toSet())); txStore1 = aoStore.reserve(txid1); txStore1.setStatus(TStatus.FAILED); @@ -151,13 +144,12 @@ public void testNonEmpty() throws InterruptedException, KeeperException { aoStore.ageOff(); - assertEquals(Set.of(txid1), new HashSet<>(aoStore.list())); - assertEquals(1, new HashSet<>(aoStore.list()).size()); + assertEquals(Set.of(txid1), aoStore.list().collect(toSet())); tts.time = 42; aoStore.ageOff(); - assertEquals(0, new HashSet<>(aoStore.list()).size()); + assertEquals(0, aoStore.list().count()); } } diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java index 058b0c50a4b..a0c82cc6992 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java @@ -23,12 +23,13 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.LongConsumer; +import java.util.stream.Stream; /** * Transient in memory store for transactions. @@ -165,12 +166,12 @@ public ReadOnlyFateTxStore read(long tid) { } @Override - public List list() { - return new ArrayList<>(statuses.keySet()); + public Stream list() { + return new ArrayList<>(statuses.keySet()).stream(); } @Override - public Iterator runnable(AtomicBoolean keepWaiting) { + public void runnable(AtomicBoolean keepWaiting, LongConsumer idConsumer) { throw new UnsupportedOperationException(); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java index 2bcaad0e1bb..8436a187377 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java @@ -274,13 +274,15 @@ private void abortIfFateTransactions(ServerContext context) { try { final ReadOnlyFateStore fate = new ZooStore<>( context.getZooKeeperRoot() + Constants.ZFATE, context.getZooReaderWriter()); - if (!fate.list().isEmpty()) { - throw new AccumuloException("Aborting upgrade because there are" - + " outstanding FATE transactions from a previous Accumulo version." - + " You can start the tservers and then use the shell to delete completed " - + " transactions. If there are incomplete transactions, you will need to roll" - + " back and fix those issues. Please see the following page for more information: " - + " https://accumulo.apache.org/docs/2.x/troubleshooting/advanced#upgrade-issues"); + try (var idStream = fate.list()) { + if (idStream.findFirst().isPresent()) { + throw new AccumuloException("Aborting upgrade because there are" + + " outstanding FATE transactions from a previous Accumulo version." + + " You can start the tservers and then use the shell to delete completed " + + " transactions. If there are incomplete transactions, you will need to roll" + + " back and fix those issues. Please see the following page for more information: " + + " https://accumulo.apache.org/docs/2.x/troubleshooting/advanced#upgrade-issues"); + } } } catch (Exception exception) { log.error("Problem verifying Fate readiness", exception); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java index 217f68e5c76..e424be4e873 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java @@ -23,13 +23,11 @@ import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.IN_PROGRESS; import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.NEW; import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED; -import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUCCESSFUL; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.UNKNOWN; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import java.util.NoSuchElementException; import java.util.concurrent.CountDownLatch; import org.apache.accumulo.core.conf.ConfigurationCopy; @@ -131,22 +129,7 @@ protected void testTransactionStatus(FateStore store, ServerContext sct // tell the op to exit the method finishCall.countDown(); - // TODO: This check seems like a race condition that might - // need to be fixed as occasionally the test fails because it was - // already removed so that seems to indicate things are removed - // before can check it was SUCCESSFUL - TStatus s = getTxStatus(sctx, txid); - while (s != SUCCESSFUL) { - s = getTxStatus(sctx, txid); - Thread.sleep(10); - } - // Check that it gets removed - boolean removed = false; - while (!removed) { - removed = verifyRemoved(sctx, txid); - Thread.sleep(10); - } - + Wait.waitFor(() -> getTxStatus(sctx, txid) == UNKNOWN); } finally { fate.shutdown(); } @@ -184,7 +167,7 @@ protected void testCancelWhileNew(FateStore store, ServerContext sctx) // nothing should have run assertEquals(1, callStarted.getCount()); fate.delete(txid); - assertThrows(getNoTxExistsException(), () -> getTxStatus(sctx, txid)); + assertEquals(UNKNOWN, getTxStatus(sctx, txid)); } finally { fate.shutdown(); } @@ -223,7 +206,7 @@ protected void testCancelWhileSubmittedAndRunning(FateStore store, Serv finishCall.countDown(); Wait.waitFor(() -> IN_PROGRESS != getTxStatus(sctx, txid)); fate.delete(txid); - assertThrows(getNoTxExistsException(), () -> getTxStatus(sctx, txid)); + assertEquals(UNKNOWN, getTxStatus(sctx, txid)); } finally { fate.shutdown(); } @@ -267,8 +250,6 @@ protected void testCancelWhileInCall(FateStore store, ServerContext sct protected abstract TStatus getTxStatus(ServerContext sctx, long txid) throws Exception; - protected abstract boolean verifyRemoved(ServerContext sctx, long txid); - protected abstract void executeTest(FateTestExecutor testMethod) throws Exception; protected interface FateTestExecutor { @@ -281,9 +262,4 @@ private static void inCall() throws InterruptedException { // wait for the signal to exit the method finishCall.await(); } - - protected Class getNoTxExistsException() { - return NoSuchElementException.class; - } - } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateIT.java index fe33cb92c11..8e494829d61 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateIT.java @@ -18,7 +18,6 @@ */ package org.apache.accumulo.test.fate.accumulo; -import java.util.NoSuchElementException; import java.util.stream.StreamSupport; import org.apache.accumulo.core.client.Accumulo; @@ -69,22 +68,12 @@ protected TStatus getTxStatus(ServerContext context, long txid) { scanner.setRange(getRow(txid)); TxColumnFamily.STATUS_COLUMN.fetch(scanner); return StreamSupport.stream(scanner.spliterator(), false) - .map(e -> TStatus.valueOf(e.getValue().toString())).findFirst().orElseThrow(); + .map(e -> TStatus.valueOf(e.getValue().toString())).findFirst().orElse(TStatus.UNKNOWN); } catch (TableNotFoundException e) { throw new IllegalStateException(table + " not found!", e); } } - @Override - protected boolean verifyRemoved(ServerContext sctx, long txid) { - try { - getTxStatus(sctx, txid); - } catch (NoSuchElementException e) { - return true; - } - return false; - } - private static Range getRow(long tid) { return new Range("tx_" + FastFormat.toHexString(tid)); } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreReadWriteIT.java b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreReadWriteIT.java index 16294585884..587299a0db2 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreReadWriteIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreReadWriteIT.java @@ -66,13 +66,13 @@ public void testReadWrite() throws Exception { AccumuloStore store = new AccumuloStore<>(client, table); // Verify no transactions - assertEquals(0, store.list().size()); + assertEquals(0, store.list().count()); // Create a new transaction and get the store for it long tid = store.create(); FateTxStore txStore = store.reserve(tid); assertTrue(txStore.timeCreated() > 0); - assertEquals(1, store.list().size()); + assertEquals(1, store.list().count()); // Push a test FATE op and verify we can read it back txStore.push(new TestRepo("testOp")); @@ -109,13 +109,13 @@ public void testReadWrite() throws Exception { // create second FateTxStore txStore2 = store.reserve(store.create()); - assertEquals(2, store.list().size()); + assertEquals(2, store.list().count()); // test delete txStore.delete(); - assertEquals(1, store.list().size()); + assertEquals(1, store.list().count()); txStore2.delete(); - assertEquals(0, store.list().size()); + assertEquals(0, store.list().count()); } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java index b9c29e85bb4..3810030bb4a 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java @@ -23,7 +23,6 @@ import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; -import static org.junit.jupiter.api.Assertions.fail; import java.io.File; import java.util.UUID; @@ -78,33 +77,12 @@ protected void executeTest(FateTestExecutor testMethod) throws Exception { testMethod.execute(store, sctx); } - @Override - protected Class getNoTxExistsException() { - return KeeperException.NoNodeException.class; - } - @Override protected TStatus getTxStatus(ServerContext sctx, long txid) throws InterruptedException, KeeperException { return getTxStatus(sctx.getZooReaderWriter(), txid); } - @Override - protected boolean verifyRemoved(ServerContext sctx, long txid) { - try { - getTxStatus(sctx, txid); - } catch (KeeperException e) { - if (e.code() == KeeperException.Code.NONODE) { - return true; - } else { - fail("Unexpected error thrown: " + e.getMessage()); - } - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } - return false; - } - /* * Get the status of the TX from ZK directly. Unable to call ZooStore.getStatus because this test * thread does not have the reservation (the FaTE thread does) @@ -113,7 +91,11 @@ private static TStatus getTxStatus(ZooReaderWriter zrw, long txid) throws KeeperException, InterruptedException { zrw.sync(ZK_ROOT); String txdir = String.format("%s%s/tx_%016x", ZK_ROOT, Constants.ZFATE, txid); - return TStatus.valueOf(new String(zrw.getData(txdir), UTF_8)); + try { + return TStatus.valueOf(new String(zrw.getData(txdir), UTF_8)); + } catch (KeeperException.NoNodeException e) { + return TStatus.UNKNOWN; + } } } From 3170c5847d83699c5c477ad4b787ba91367865e1 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Thu, 4 Jan 2024 18:28:52 -0500 Subject: [PATCH 2/3] replace innner classes w/ anonymous classes --- .../apache/accumulo/core/fate/ZooStore.java | 27 ++++++------------- .../core/fate/accumulo/AccumuloStore.java | 23 +++++----------- 2 files changed, 14 insertions(+), 36 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java index e5194b7287c..b4ad365dfa3 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java @@ -301,30 +301,19 @@ protected FateTxStore newFateTxStore(long tid, boolean isReserved) { return new FateTxStoreImpl(tid, isReserved); } - private static class ZooFateIdStatus extends FateIdStatus { - - private final Supplier statusSupplier; - - public ZooFateIdStatus(long txid, Supplier statusSupplier) { - super(txid); - this.statusSupplier = statusSupplier; - } - - @Override - public TStatus getStatus() { - return statusSupplier.get(); - } - } - @Override protected Stream getTransactions() { try { return zk.getChildren(path).stream().map(strTxid -> { - long txid = parseTid(strTxid); - // Memoizing for two reasons. First the status may never be requested, so in the case avoid + // Memoizing for two reasons. First the status may never be requested, so in that case avoid // the lookup. Second, if its requested multiple times the result will always be consistent. - Supplier statusSupplier = Suppliers.memoize(() -> _getStatus(txid)); - return new ZooFateIdStatus(txid, statusSupplier); + Supplier statusSupplier = Suppliers.memoize(() -> _getStatus(parseTid(strTxid))); + return new FateIdStatus(parseTid(strTxid)) { + @Override + public TStatus getStatus() { + return statusSupplier.get(); + } + }; }); } catch (KeeperException | InterruptedException e) { throw new IllegalStateException(e); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java index 4a6712c03d2..26a66632e8a 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java @@ -70,20 +70,6 @@ public long create() { return tid; } - private static class AccumuoFateIdStatus extends FateIdStatus { - private final TStatus status; - - public AccumuoFateIdStatus(long txid, TStatus status) { - super(txid); - this.status = status; - } - - @Override - public TStatus getStatus() { - return status; - } - } - @Override protected Stream getTransactions() { try { @@ -91,9 +77,12 @@ protected Stream getTransactions() { scanner.setRange(new Range()); TxColumnFamily.STATUS_COLUMN.fetch(scanner); return scanner.stream().onClose(scanner::close).map(e -> { - long txid = parseTid(e.getKey().getRow().toString()); - TStatus status = TStatus.valueOf(e.getValue().toString()); - return new AccumuoFateIdStatus(txid, status); + return new FateIdStatus(parseTid(e.getKey().getRow().toString())) { + @Override + public TStatus getStatus() { + return TStatus.valueOf(e.getValue().toString()); + } + }; }); } catch (TableNotFoundException e) { throw new IllegalStateException(tableName + " not found!", e); From 138b78275937de29f47e0ccedab0d242f096c0e9 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Thu, 4 Jan 2024 18:35:37 -0500 Subject: [PATCH 3/3] fix javadoc link --- .../src/main/java/org/apache/accumulo/core/fate/FateStore.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java index 7db5766e81b..b170b15a0da 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java @@ -85,7 +85,8 @@ interface FateTxStore extends ReadOnlyFateTxStore { * longer interact with it. * * @param deferTime time in millis to keep this transaction from being returned by - * {@link #runnable(java.util.concurrent.atomic.AtomicBoolean)}. Must be non-negative. + * {@link #runnable(java.util.concurrent.atomic.AtomicBoolean, java.util.function.LongConsumer)}. + * Must be non-negative. */ void unreserve(long deferTime); }