From 3adf613ba44297e4dae1ccef12659b14b8335e98 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Wed, 17 Jan 2024 15:57:00 -0500 Subject: [PATCH 1/2] Exposes status when listing fate operations (#4169) The Accumulo store can very efficiently gather the status at the same time as the id for a FATE transaction. This is already being gathered internally in the implementation of the fate storage layer. This commit exposes this information further as it will be useful in refactoring the ageoff store. --- .../accumulo/core/fate/AbstractFateStore.java | 8 +++---- .../apache/accumulo/core/fate/AdminUtil.java | 3 ++- .../accumulo/core/fate/AgeOffStore.java | 4 ++-- .../accumulo/core/fate/ReadOnlyFateStore.java | 8 ++++++- .../apache/accumulo/core/fate/ZooStore.java | 2 +- .../core/fate/accumulo/AccumuloStore.java | 2 +- .../accumulo/core/logging/FateLogger.java | 2 +- .../accumulo/core/fate/AgeOffStoreTest.java | 21 ++++++++++++------- .../apache/accumulo/core/fate/TestStore.java | 14 +++++++++++-- 9 files changed, 43 insertions(+), 21 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java index 7125f692fef..d6cbf2780f8 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java @@ -171,8 +171,8 @@ public void runnable(AtomicBoolean keepWaiting, LongConsumer idConsumer) { } @Override - public Stream list() { - return getTransactions().map(fateIdStatus -> fateIdStatus.txid); + public Stream list() { + return getTransactions(); } @Override @@ -189,10 +189,10 @@ protected long parseTid(String txdir) { return Long.parseLong(txdir.split("_")[1], 16); } - public static abstract class FateIdStatus { + public static abstract class FateIdStatusBase implements FateIdStatus { private final long txid; - public FateIdStatus(long txid) { + public FateIdStatusBase(long 
txid) { this.txid = txid; } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java index bbb7f425720..85bc34141ce 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java @@ -36,6 +36,7 @@ import java.util.stream.Stream; import org.apache.accumulo.core.fate.FateStore.FateTxStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.FateIdStatus; import org.apache.accumulo.core.fate.ReadOnlyFateStore.ReadOnlyFateTxStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.zookeeper.FateLock; @@ -368,7 +369,7 @@ private FateStatus getTransactionStatus(Map statuses = new ArrayList<>(); fateStores.forEach((type, store) -> { - try (Stream tids = store.list()) { + try (Stream tids = store.list().map(FateIdStatus::getTxid)) { tids.forEach(tid -> { ReadOnlyFateTxStore txStore = store.read(tid); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java index c18b13f5832..9470de9e187 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java @@ -129,7 +129,7 @@ public AgeOffStore(FateStore store, long ageOffTime, TimeSource timeSource) { // ELASTICITY_TODO need to rework how this class works so that it does not buffer everything in // memory. 
- List txids = store.list().collect(Collectors.toList()); + List txids = store.list().map(FateIdStatus::getTxid).collect(Collectors.toList()); for (Long txid : txids) { FateTxStore txStore = store.reserve(txid); try { @@ -203,7 +203,7 @@ public ReadOnlyFateTxStore read(long tid) { } @Override - public Stream list() { + public Stream list() { return store.list(); } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java index e525c89ff9c..c5f7a9027cc 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java @@ -118,12 +118,18 @@ interface ReadOnlyFateTxStore { long getID(); } + interface FateIdStatus { + long getTxid(); + + TStatus getStatus(); + } + /** * list all transaction ids in store. * * @return all outstanding transactions, including those reserved by others. */ - Stream list(); + Stream list(); /** * Finds all fate ops that are (IN_PROGRESS, SUBMITTED, or FAILED_IN_PROGRESS) and unreserved. Ids diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java index b4ad365dfa3..31c299cf684 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java @@ -308,7 +308,7 @@ protected Stream getTransactions() { // Memoizing for two reasons. First the status may never be requested, so in that case avoid // the lookup. Second, if its requested multiple times the result will always be consistent. 
Supplier statusSupplier = Suppliers.memoize(() -> _getStatus(parseTid(strTxid))); - return new FateIdStatus(parseTid(strTxid)) { + return new FateIdStatusBase(parseTid(strTxid)) { @Override public TStatus getStatus() { return statusSupplier.get(); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java index 9b870537faa..4e81065f0c4 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java @@ -82,7 +82,7 @@ protected Stream getTransactions() { scanner.setRange(new Range()); TxColumnFamily.STATUS_COLUMN.fetch(scanner); return scanner.stream().onClose(scanner::close).map(e -> { - return new FateIdStatus(parseTid(e.getKey().getRow().toString())) { + return new FateIdStatusBase(parseTid(e.getKey().getRow().toString())) { @Override public TStatus getStatus() { return TStatus.valueOf(e.getValue().toString()); diff --git a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java index bac25c321bb..ffd854bad4b 100644 --- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java +++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java @@ -115,7 +115,7 @@ public ReadOnlyFateTxStore read(long tid) { } @Override - public Stream list() { + public Stream list() { return store.list(); } diff --git a/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java b/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java index 88447771ec9..5cc26716155 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.fate.AgeOffStore.TimeSource; +import 
org.apache.accumulo.core.fate.ReadOnlyFateStore.FateIdStatus; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.zookeeper.KeeperException; import org.junit.jupiter.api.Test; @@ -75,19 +76,21 @@ public void testBasic() throws InterruptedException, KeeperException { aoStore.ageOff(); - assertEquals(Set.of(txid1, txid2, txid3, txid4), aoStore.list().collect(toSet())); + assertEquals(Set.of(txid1, txid2, txid3, txid4), + aoStore.list().map(FateIdStatus::getTxid).collect(toSet())); tts.time = 15; aoStore.ageOff(); - assertEquals(Set.of(txid1, txid3, txid4), aoStore.list().collect(toSet())); + assertEquals(Set.of(txid1, txid3, txid4), + aoStore.list().map(FateIdStatus::getTxid).collect(toSet())); tts.time = 30; aoStore.ageOff(); - assertEquals(Set.of(txid1), aoStore.list().collect(toSet())); + assertEquals(Set.of(txid1), aoStore.list().map(FateIdStatus::getTxid).collect(toSet())); } @Test @@ -117,17 +120,19 @@ public void testNonEmpty() throws InterruptedException, KeeperException { AgeOffStore aoStore = new AgeOffStore<>(testStore, 10, tts); - assertEquals(Set.of(txid1, txid2, txid3, txid4), aoStore.list().collect(toSet())); + assertEquals(Set.of(txid1, txid2, txid3, txid4), + aoStore.list().map(FateIdStatus::getTxid).collect(toSet())); aoStore.ageOff(); - assertEquals(Set.of(txid1, txid2, txid3, txid4), aoStore.list().collect(toSet())); + assertEquals(Set.of(txid1, txid2, txid3, txid4), + aoStore.list().map(FateIdStatus::getTxid).collect(toSet())); tts.time = 15; aoStore.ageOff(); - assertEquals(Set.of(txid1), aoStore.list().collect(toSet())); + assertEquals(Set.of(txid1), aoStore.list().map(FateIdStatus::getTxid).collect(toSet())); txStore1 = aoStore.reserve(txid1); txStore1.setStatus(TStatus.FAILED_IN_PROGRESS); @@ -137,7 +142,7 @@ public void testNonEmpty() throws InterruptedException, KeeperException { aoStore.ageOff(); - assertEquals(Set.of(txid1), aoStore.list().collect(toSet())); + assertEquals(Set.of(txid1), 
aoStore.list().map(FateIdStatus::getTxid).collect(toSet())); txStore1 = aoStore.reserve(txid1); txStore1.setStatus(TStatus.FAILED); @@ -145,7 +150,7 @@ public void testNonEmpty() throws InterruptedException, KeeperException { aoStore.ageOff(); - assertEquals(Set.of(txid1), aoStore.list().collect(toSet())); + assertEquals(Set.of(txid1), aoStore.list().map(FateIdStatus::getTxid).collect(toSet())); tts.time = 42; diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java index 6dd6368d529..df1d711baef 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java @@ -167,8 +167,18 @@ public ReadOnlyFateTxStore read(long tid) { } @Override - public Stream list() { - return new ArrayList<>(statuses.keySet()).stream(); + public Stream list() { + return new ArrayList<>(statuses.entrySet()).stream().map(e -> new FateIdStatus() { + @Override + public long getTxid() { + return e.getKey(); + } + + @Override + public TStatus getStatus() { + return e.getValue(); + } + }); } @Override From be36cde2ad4ad7ce6b52a4ba5bbb25ee4aa498ff Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Wed, 17 Jan 2024 17:58:45 -0500 Subject: [PATCH 2/2] Stopped tracking fate age off data in memory (#4169) Replaced Fate AgeOffStore with FateCleaner. FateCleaner tracks fate operations that are candidates for age off in the fate storage layer instead of in memory. Also FateCleaner does not wrap a fate storage layer, it just takes one to track. 
fixes #4130 --- .../accumulo/core/fate/AbstractFateStore.java | 3 +- .../accumulo/core/fate/AgeOffStore.java | 214 -------------- .../org/apache/accumulo/core/fate/Fate.java | 2 +- .../accumulo/core/fate/FateCleaner.java | 136 +++++++++ .../accumulo/core/fate/AgeOffStoreTest.java | 161 ---------- .../accumulo/core/fate/FateCleanerTest.java | 274 ++++++++++++++++++ .../apache/accumulo/core/fate/TestStore.java | 14 +- .../org/apache/accumulo/manager/Manager.java | 11 +- .../test/fate/zookeeper/ZookeeperFateIT.java | 4 +- 9 files changed, 430 insertions(+), 389 deletions(-) delete mode 100644 core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java create mode 100644 core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java delete mode 100644 core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java create mode 100644 core/src/test/java/org/apache/accumulo/core/fate/FateCleanerTest.java diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java index d6cbf2780f8..396f30b474b 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java @@ -196,11 +196,10 @@ public FateIdStatusBase(long txid) { this.txid = txid; } + @Override public long getTxid() { return txid; } - - public abstract TStatus getStatus(); } protected abstract Stream getTransactions(); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java deleted file mode 100644 index 9470de9e187..00000000000 --- a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.core.fate; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Optional; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.LongConsumer; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This store removes Repos, in the store it wraps, that are in a finished or new state for more - * than a configurable time period. - * - * No external time source is used. It starts tracking idle time when its created. 
- */ -public class AgeOffStore implements FateStore { - - public interface TimeSource { - long currentTimeMillis(); - } - - private static final Logger log = LoggerFactory.getLogger(AgeOffStore.class); - - private final FateStore store; - private Map candidates; - private long ageOffTime; - private long minTime; - private TimeSource timeSource; - - private synchronized void updateMinTime() { - minTime = Long.MAX_VALUE; - - for (Long time : candidates.values()) { - if (time < minTime) { - minTime = time; - } - } - } - - private synchronized void addCandidate(long txid) { - long time = timeSource.currentTimeMillis(); - candidates.put(txid, time); - if (time < minTime) { - minTime = time; - } - } - - private synchronized void removeCandidate(long txid) { - Long time = candidates.remove(txid); - if (time != null && time <= minTime) { - updateMinTime(); - } - } - - public void ageOff() { - HashSet oldTxs = new HashSet<>(); - - synchronized (this) { - long time = timeSource.currentTimeMillis(); - if (minTime < time && time - minTime >= ageOffTime) { - for (Entry entry : candidates.entrySet()) { - if (time - entry.getValue() >= ageOffTime) { - oldTxs.add(entry.getKey()); - } - } - - candidates.keySet().removeAll(oldTxs); - updateMinTime(); - } - } - - for (Long txid : oldTxs) { - try { - FateTxStore txStore = store.reserve(txid); - try { - switch (txStore.getStatus()) { - case NEW: - case FAILED: - case SUCCESSFUL: - txStore.delete(); - log.debug("Aged off FATE tx {}", FateTxId.formatTid(txid)); - break; - default: - break; - } - - } finally { - txStore.unreserve(0, TimeUnit.MILLISECONDS); - } - } catch (Exception e) { - log.warn("Failed to age off FATE tx " + FateTxId.formatTid(txid), e); - } - } - } - - public AgeOffStore(FateStore store, long ageOffTime, TimeSource timeSource) { - this.store = store; - this.ageOffTime = ageOffTime; - this.timeSource = timeSource; - candidates = new HashMap<>(); - - minTime = Long.MAX_VALUE; - - // ELASTICITY_TODO need to rework how 
this class works so that it does not buffer everything in - // memory. - List txids = store.list().map(FateIdStatus::getTxid).collect(Collectors.toList()); - for (Long txid : txids) { - FateTxStore txStore = store.reserve(txid); - try { - switch (txStore.getStatus()) { - case NEW: - case FAILED: - case SUCCESSFUL: - addCandidate(txid); - break; - default: - break; - } - } finally { - txStore.unreserve(0, TimeUnit.MILLISECONDS); - } - } - } - - @Override - public long create() { - long txid = store.create(); - addCandidate(txid); - return txid; - } - - @Override - public FateTxStore reserve(long tid) { - return new AgeOffFateTxStore(store.reserve(tid)); - } - - @Override - public Optional> tryReserve(long tid) { - return store.tryReserve(tid).map(AgeOffFateTxStore::new); - } - - private class AgeOffFateTxStore extends WrappedFateTxStore { - - private AgeOffFateTxStore(FateTxStore wrapped) { - super(wrapped); - } - - @Override - public void setStatus(FateStore.TStatus status) { - super.setStatus(status); - - switch (status) { - case SUBMITTED: - case IN_PROGRESS: - case FAILED_IN_PROGRESS: - removeCandidate(getID()); - break; - case FAILED: - case SUCCESSFUL: - addCandidate(getID()); - break; - default: - break; - } - } - - @Override - public void delete() { - super.delete(); - removeCandidate(getID()); - } - } - - @Override - public ReadOnlyFateTxStore read(long tid) { - return store.read(tid); - } - - @Override - public Stream list() { - return store.list(); - } - - @Override - public void runnable(AtomicBoolean keepWaiting, LongConsumer idConsumer) { - store.runnable(keepWaiting, idConsumer); - } -} diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index 1a2c190e536..55845078475 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -77,7 +77,7 @@ public class Fate { private final Thread workFinder; 
public enum TxInfo { - TX_NAME, AUTO_CLEAN, EXCEPTION, RETURN_VALUE + TX_NAME, AUTO_CLEAN, EXCEPTION, TX_AGEOFF, RETURN_VALUE } /** diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java b/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java new file mode 100644 index 00000000000..54b349858d1 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate; + +import java.time.Duration; +import java.util.EnumSet; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.fate.FateStore.FateTxStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * Removes Repos, in the Fate store it tracks, that are in a finished or new state for more than a + * configurable time period. This class stores data in the Fate store under the + * {@link org.apache.accumulo.core.fate.Fate.TxInfo#TX_AGEOFF} field. 
The data stored under this + * field is used to track fate transactions that are candidates for cleanup. + * + *

+ * No external time source is used. It starts tracking idle time when it is created. + * + *

+ * The {@link #ageOff()} method on this class must be called periodically in order for cleanup to + * happen. + */ +public class FateCleaner { + + public interface TimeSource { + long currentTimeNanos(); + } + + // Statuses that can be aged off if idle for a prolonged period. + private static final EnumSet AGE_OFF_STATUSES = + EnumSet.of(TStatus.NEW, TStatus.FAILED, TStatus.SUCCESSFUL); + + // This is used to determine if age off data was persisted by another instance of this object. + private final UUID instanceId = UUID.randomUUID(); + + private static final Logger log = LoggerFactory.getLogger(FateCleaner.class); + + private final FateStore store; + + private final long ageOffTime; + private final TimeSource timeSource; + + private static class AgeOffInfo { + final UUID instanceId; + final long setTime; + final TStatus status; + + public AgeOffInfo(String ageOffStr) { + var tokens = ageOffStr.split(":"); + Preconditions.checkArgument(tokens.length == 3, "Malformed input %s", ageOffStr); + instanceId = UUID.fromString(tokens[0]); + setTime = Long.parseLong(tokens[1]); + status = TStatus.valueOf(tokens[2]); + } + + public AgeOffInfo(UUID instanceId, long time, TStatus status) { + this.instanceId = instanceId; + this.setTime = time; + this.status = status; + } + + @Override + public String toString() { + return instanceId + ":" + setTime + ":" + status; + } + } + + private AgeOffInfo readAgeOffInfo(FateTxStore txStore) { + String ageOffStr = (String) txStore.getTransactionInfo(Fate.TxInfo.TX_AGEOFF); + if (ageOffStr == null) { + return null; + } + + return new AgeOffInfo(ageOffStr); + } + + private boolean shouldAgeOff(TStatus currStatus, AgeOffInfo ageOffInfo) { + return AGE_OFF_STATUSES.contains(currStatus) && currStatus == ageOffInfo.status + && ageOffInfo.instanceId.equals(instanceId) + && timeSource.currentTimeNanos() - ageOffInfo.setTime >= ageOffTime; + } + + public void ageOff() { + store.list().filter(ids -> AGE_OFF_STATUSES.contains(ids.getStatus())) + 
.forEach(idStatus -> store.tryReserve(idStatus.getTxid()).ifPresent(txStore -> { + try { + AgeOffInfo ageOffInfo = readAgeOffInfo(txStore); + TStatus currStatus = txStore.getStatus(); + if (ageOffInfo == null || !ageOffInfo.instanceId.equals(instanceId) + || currStatus != ageOffInfo.status) { + // set or reset the age off info because it does not exist or it exists but is no + // longer valid + var newAgeOffInfo = + new AgeOffInfo(instanceId, timeSource.currentTimeNanos(), currStatus); + txStore.setTransactionInfo(Fate.TxInfo.TX_AGEOFF, newAgeOffInfo.toString()); + log.trace("Set age off data {} {}", FateTxId.formatTid(idStatus.getTxid()), + newAgeOffInfo); + } else if (shouldAgeOff(currStatus, ageOffInfo)) { + txStore.delete(); + log.debug("Aged off FATE tx {}", FateTxId.formatTid(idStatus.getTxid())); + } + } finally { + txStore.unreserve(0, TimeUnit.MILLISECONDS); + } + })); + } + + public FateCleaner(FateStore store, Duration duration, TimeSource timeSource) { + this.store = store; + this.ageOffTime = duration.toNanos(); + this.timeSource = timeSource; + } +} diff --git a/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java b/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java deleted file mode 100644 index 5cc26716155..00000000000 --- a/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.core.fate; - -import static java.util.stream.Collectors.toSet; -import static org.junit.jupiter.api.Assertions.assertEquals; - -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import org.apache.accumulo.core.fate.AgeOffStore.TimeSource; -import org.apache.accumulo.core.fate.ReadOnlyFateStore.FateIdStatus; -import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; -import org.apache.zookeeper.KeeperException; -import org.junit.jupiter.api.Test; - -public class AgeOffStoreTest { - - private static class TestTimeSource implements TimeSource { - long time = 0; - - @Override - public long currentTimeMillis() { - return time; - } - - } - - @Test - public void testBasic() throws InterruptedException, KeeperException { - - TestTimeSource tts = new TestTimeSource(); - TestStore testStore = new TestStore(); - AgeOffStore aoStore = new AgeOffStore<>(testStore, 10, tts); - - aoStore.ageOff(); - - long txid1 = aoStore.create(); - var txStore1 = aoStore.reserve(txid1); - txStore1.setStatus(TStatus.IN_PROGRESS); - txStore1.unreserve(0, TimeUnit.MILLISECONDS); - - aoStore.ageOff(); - - long txid2 = aoStore.create(); - var txStore2 = aoStore.reserve(txid2); - txStore2.setStatus(TStatus.IN_PROGRESS); - txStore2.setStatus(TStatus.FAILED); - txStore2.unreserve(0, TimeUnit.MILLISECONDS); - - tts.time = 6; - - long txid3 = aoStore.create(); - var txStore3 = aoStore.reserve(txid3); - txStore3.setStatus(TStatus.IN_PROGRESS); - txStore3.setStatus(TStatus.SUCCESSFUL); - txStore3.unreserve(0, TimeUnit.MILLISECONDS); - 
- Long txid4 = aoStore.create(); - - aoStore.ageOff(); - - assertEquals(Set.of(txid1, txid2, txid3, txid4), - aoStore.list().map(FateIdStatus::getTxid).collect(toSet())); - - tts.time = 15; - - aoStore.ageOff(); - - assertEquals(Set.of(txid1, txid3, txid4), - aoStore.list().map(FateIdStatus::getTxid).collect(toSet())); - - tts.time = 30; - - aoStore.ageOff(); - - assertEquals(Set.of(txid1), aoStore.list().map(FateIdStatus::getTxid).collect(toSet())); - } - - @Test - public void testNonEmpty() throws InterruptedException, KeeperException { - // test age off when source store starts off non empty - - TestTimeSource tts = new TestTimeSource(); - TestStore testStore = new TestStore(); - long txid1 = testStore.create(); - var txStore1 = testStore.reserve(txid1); - txStore1.setStatus(TStatus.IN_PROGRESS); - txStore1.unreserve(0, TimeUnit.MILLISECONDS); - - long txid2 = testStore.create(); - var txStore2 = testStore.reserve(txid2); - txStore2.setStatus(TStatus.IN_PROGRESS); - txStore2.setStatus(TStatus.FAILED); - txStore2.unreserve(0, TimeUnit.MILLISECONDS); - - long txid3 = testStore.create(); - var txStore3 = testStore.reserve(txid3); - txStore3.setStatus(TStatus.IN_PROGRESS); - txStore3.setStatus(TStatus.SUCCESSFUL); - txStore3.unreserve(0, TimeUnit.MILLISECONDS); - - Long txid4 = testStore.create(); - - AgeOffStore aoStore = new AgeOffStore<>(testStore, 10, tts); - - assertEquals(Set.of(txid1, txid2, txid3, txid4), - aoStore.list().map(FateIdStatus::getTxid).collect(toSet())); - - aoStore.ageOff(); - - assertEquals(Set.of(txid1, txid2, txid3, txid4), - aoStore.list().map(FateIdStatus::getTxid).collect(toSet())); - - tts.time = 15; - - aoStore.ageOff(); - - assertEquals(Set.of(txid1), aoStore.list().map(FateIdStatus::getTxid).collect(toSet())); - - txStore1 = aoStore.reserve(txid1); - txStore1.setStatus(TStatus.FAILED_IN_PROGRESS); - txStore1.unreserve(0, TimeUnit.MILLISECONDS); - - tts.time = 30; - - aoStore.ageOff(); - - assertEquals(Set.of(txid1), 
aoStore.list().map(FateIdStatus::getTxid).collect(toSet())); - - txStore1 = aoStore.reserve(txid1); - txStore1.setStatus(TStatus.FAILED); - txStore1.unreserve(0, TimeUnit.MILLISECONDS); - - aoStore.ageOff(); - - assertEquals(Set.of(txid1), aoStore.list().map(FateIdStatus::getTxid).collect(toSet())); - - tts.time = 42; - - aoStore.ageOff(); - - assertEquals(0, aoStore.list().count()); - } -} diff --git a/core/src/test/java/org/apache/accumulo/core/fate/FateCleanerTest.java b/core/src/test/java/org/apache/accumulo/core/fate/FateCleanerTest.java new file mode 100644 index 00000000000..ed851917e70 --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/fate/FateCleanerTest.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.core.fate; + +import static java.util.stream.Collectors.toSet; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.time.Duration; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.fate.FateCleaner.TimeSource; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.FateIdStatus; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.apache.zookeeper.KeeperException; +import org.junit.jupiter.api.Test; + +public class FateCleanerTest { + + private static class TestTimeSource implements TimeSource { + long time = 0; + + @Override + public long currentTimeNanos() { + return time; + } + + } + + @Test + public void testBasic() throws InterruptedException, KeeperException { + + TestTimeSource tts = new TestTimeSource(); + TestStore testStore = new TestStore(); + FateCleaner cleaner = new FateCleaner<>(testStore, Duration.ofNanos(10), tts); + + cleaner.ageOff(); + + long txid1 = testStore.create(); + var txStore1 = testStore.reserve(txid1); + txStore1.setStatus(TStatus.IN_PROGRESS); + txStore1.unreserve(0, TimeUnit.MILLISECONDS); + + cleaner.ageOff(); + + long txid2 = testStore.create(); + var txStore2 = testStore.reserve(txid2); + txStore2.setStatus(TStatus.IN_PROGRESS); + txStore2.setStatus(TStatus.FAILED); + txStore2.unreserve(0, TimeUnit.MILLISECONDS); + + cleaner.ageOff(); + + tts.time = 6; + + long txid3 = testStore.create(); + var txStore3 = testStore.reserve(txid3); + txStore3.setStatus(TStatus.IN_PROGRESS); + txStore3.setStatus(TStatus.SUCCESSFUL); + txStore3.unreserve(0, TimeUnit.MILLISECONDS); + + cleaner.ageOff(); + + Long txid4 = testStore.create(); + + cleaner.ageOff(); + + assertEquals(Set.of(txid1, txid2, txid3, txid4), + testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + + tts.time = 15; + + cleaner.ageOff(); + + assertEquals(Set.of(txid1, txid3, txid4), + 
testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + + tts.time = 30; + + cleaner.ageOff(); + + assertEquals(Set.of(txid1), testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + } + + @Test + public void testNonEmpty() { + // test age off when source store starts off non empty + + TestTimeSource tts = new TestTimeSource(); + TestStore testStore = new TestStore(); + long txid1 = testStore.create(); + var txStore1 = testStore.reserve(txid1); + txStore1.setStatus(TStatus.IN_PROGRESS); + txStore1.unreserve(0, TimeUnit.MILLISECONDS); + + long txid2 = testStore.create(); + var txStore2 = testStore.reserve(txid2); + txStore2.setStatus(TStatus.IN_PROGRESS); + txStore2.setStatus(TStatus.FAILED); + txStore2.unreserve(0, TimeUnit.MILLISECONDS); + + long txid3 = testStore.create(); + var txStore3 = testStore.reserve(txid3); + txStore3.setStatus(TStatus.IN_PROGRESS); + txStore3.setStatus(TStatus.SUCCESSFUL); + txStore3.unreserve(0, TimeUnit.MILLISECONDS); + + Long txid4 = testStore.create(); + + FateCleaner cleaner = new FateCleaner<>(testStore, Duration.ofNanos(10), tts); + cleaner.ageOff(); + + assertEquals(Set.of(txid1, txid2, txid3, txid4), + testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + + cleaner.ageOff(); + + assertEquals(Set.of(txid1, txid2, txid3, txid4), + testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + + tts.time = 15; + + cleaner.ageOff(); + + assertEquals(Set.of(txid1), testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + + txStore1 = testStore.reserve(txid1); + txStore1.setStatus(TStatus.FAILED_IN_PROGRESS); + txStore1.unreserve(0, TimeUnit.MILLISECONDS); + + tts.time = 30; + + cleaner.ageOff(); + + assertEquals(Set.of(txid1), testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + + txStore1 = testStore.reserve(txid1); + txStore1.setStatus(TStatus.FAILED); + txStore1.unreserve(0, TimeUnit.MILLISECONDS); + + cleaner.ageOff(); + + assertEquals(Set.of(txid1), 
testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + + tts.time = 42; + + cleaner.ageOff(); + + assertEquals(0, testStore.list().count()); + } + + @Test + public void testStatusChange() { + // test ensure that if something is eligible for ageoff and its status changes it will no longer + // be eligible + + TestTimeSource tts = new TestTimeSource(); + TestStore testStore = new TestStore(); + FateCleaner cleaner = new FateCleaner<>(testStore, Duration.ofHours(10), tts); + + cleaner.ageOff(); + + // create a something in the NEW state + long txid1 = testStore.create(); + + // create another that is complete + long txid2 = testStore.create(); + var txStore2 = testStore.reserve(txid2); + txStore2.setStatus(TStatus.IN_PROGRESS); + txStore2.setStatus(TStatus.FAILED); + txStore2.unreserve(0, TimeUnit.MILLISECONDS); + + // create another in the NEW state + long txid3 = testStore.create(); + + // start tracking what can age off, both should be candidates + cleaner.ageOff(); + assertEquals(Set.of(txid1, txid2, txid3), + testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + + // advance time by 9 hours, nothing should age off + tts.time += Duration.ofHours(9).toNanos(); + cleaner.ageOff(); + + assertEquals(Set.of(txid1, txid2, txid3), + testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + + var txStore1 = testStore.reserve(txid1); + txStore1.setStatus(TStatus.IN_PROGRESS); + txStore1.setStatus(TStatus.FAILED); + txStore1.unreserve(0, TimeUnit.MILLISECONDS); + + // advance time by 2 hours, both should be able to age off.. 
however the status changed on txid1 + // so it should not age off + tts.time += Duration.ofHours(2).toNanos(); + cleaner.ageOff(); + + assertEquals(Set.of(txid1), testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + + // advance time by 9 hours, nothing should age off + tts.time += Duration.ofHours(9).toNanos(); + cleaner.ageOff(); + assertEquals(Set.of(txid1), testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + + // advance time by 2 hours, should age off everything + tts.time += Duration.ofHours(2).toNanos(); + cleaner.ageOff(); + assertEquals(Set.of(), testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + } + + @Test + public void testNewCleaner() { + // this test ensures that a new cleaner instance ignores data from another cleaner instance + + TestTimeSource tts = new TestTimeSource(); + TestStore testStore = new TestStore(); + FateCleaner cleaner1 = new FateCleaner<>(testStore, Duration.ofHours(10), tts); + + long txid1 = testStore.create(); + + cleaner1.ageOff(); + assertEquals(Set.of(txid1), testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + + tts.time += Duration.ofHours(5).toNanos(); + long txid2 = testStore.create(); + + cleaner1.ageOff(); + assertEquals(Set.of(txid1, txid2), + testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + + tts.time += Duration.ofHours(6).toNanos(); + long txid3 = testStore.create(); + + cleaner1.ageOff(); + assertEquals(Set.of(txid2, txid3), + testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + + // create a new cleaner, it should ignore any data stored by previous cleaner + FateCleaner cleaner2 = new FateCleaner<>(testStore, Duration.ofHours(10), tts); + + tts.time += Duration.ofHours(5).toNanos(); + // since this is a new cleaner instance, it should reset the clock + cleaner2.ageOff(); + assertEquals(Set.of(txid2, txid3), + testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + + // since the clock was reset, advancing time should not age 
anything off + tts.time += Duration.ofHours(9).toNanos(); + cleaner2.ageOff(); + assertEquals(Set.of(txid2, txid3), + testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + + // this should advance time enough to age everything off + tts.time += Duration.ofHours(2).toNanos(); + cleaner2.ageOff(); + assertEquals(Set.of(), testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + } +} diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java index df1d711baef..c07f51662d4 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java @@ -39,6 +39,7 @@ public class TestStore implements FateStore { private long nextId = 1; private Map statuses = new HashMap<>(); + private Map> txInfos = new HashMap<>(); private Set reserved = new HashSet<>(); @Override @@ -106,7 +107,12 @@ public TStatus waitForStatusChange(EnumSet expected) { @Override public Serializable getTransactionInfo(Fate.TxInfo txInfo) { - throw new UnsupportedOperationException(); + var submap = txInfos.get(tid); + if (submap == null) { + return null; + } + + return submap.get(txInfo); } @Override @@ -142,7 +148,11 @@ public void setStatus(TStatus status) { @Override public void setTransactionInfo(Fate.TxInfo txInfo, Serializable val) { - throw new UnsupportedOperationException(); + if (!reserved.contains(tid)) { + throw new IllegalStateException(); + } + + txInfos.computeIfAbsent(tid, t -> new HashMap<>()).put(txInfo, val); } @Override diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 03aa217ed1a..d35a323cd6b 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -22,7 +22,6 @@ import static 
java.nio.charset.StandardCharsets.UTF_8; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySortedMap; -import static java.util.concurrent.TimeUnit.HOURS; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -31,6 +30,7 @@ import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.net.UnknownHostException; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -71,8 +71,8 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.AgeOffStore; import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateCleaner; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.ZooStore; @@ -1187,14 +1187,13 @@ boolean canSuspendTablets() { private Fate initializeFateInstance(ServerContext context, FateInstanceType type, FateStore store) { - final AgeOffStore ageOffStore = - new AgeOffStore<>(store, HOURS.toMillis(8), System::currentTimeMillis); final Fate fateInstance = - new Fate<>(this, ageOffStore, TraceRepo::toLogString, getConfiguration()); + new Fate<>(this, store, TraceRepo::toLogString, getConfiguration()); + var fateCleaner = new FateCleaner<>(store, Duration.ofHours(8), System::nanoTime); ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor() - .scheduleWithFixedDelay(ageOffStore::ageOff, 63000, 63000, MILLISECONDS)); + .scheduleWithFixedDelay(fateCleaner::ageOff, 10, 4 * 60, MINUTES)); return fateInstance; } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java 
b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java index 3810030bb4a..c1948089b7c 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java @@ -28,7 +28,6 @@ import java.util.UUID; import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.fate.AgeOffStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.ZooStore; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; @@ -67,14 +66,13 @@ public static void teardown() throws Exception { @Override protected void executeTest(FateTestExecutor testMethod) throws Exception { final ZooStore zooStore = new ZooStore<>(ZK_ROOT + Constants.ZFATE, zk); - final AgeOffStore store = new AgeOffStore<>(zooStore, 3000, System::currentTimeMillis); ServerContext sctx = createMock(ServerContext.class); expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes(); expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes(); replay(sctx); - testMethod.execute(store, sctx); + testMethod.execute(zooStore, sctx); } @Override