From 64925c562781e445c75ec9d5f5a834aa9c7232dd Mon Sep 17 00:00:00 2001 From: Kevin Rathbun Date: Wed, 24 Jan 2024 15:44:42 -0500 Subject: [PATCH 1/8] Initial commit for issue 4044 A WIP for #4044. The end goal is to have the stronger type FateId replace the current representation of a transaction id (which is just a long). This was brought about from the addition of the AccumuloStore class - there are now two fate instance types associated with a transaction - META (for ZooStore) or USER (for AccumuloStore). FateId is a new class which includes the FateInstanceType and the transaction id. Current changes: - FateTxId replaced with new class FateId - Started with changes in ReadOnlyFateStore and resolved the issues in other classes extending from changing this class. Still left TODO: - There are still related problems from the classes I have changed: I have not yet finished going down the entire chain of issues - Need to change Fate and the associated issues with changing this class - Need to change Repo and the associated issues with changing this class - More changes TBD --- .../accumulo/core/fate/AbstractFateStore.java | 88 +++++++------- .../accumulo/core/fate/AgeOffStore.java | 52 ++++----- .../org/apache/accumulo/core/fate/FateId.java | 107 ++++++++++++++++++ .../apache/accumulo/core/fate/FateStore.java | 24 ++-- .../apache/accumulo/core/fate/FateTxId.java | 70 ------------ .../accumulo/core/fate/ReadOnlyFateStore.java | 6 +- .../core/fate/WrappedFateTxStore.java | 2 +- .../apache/accumulo/core/fate/ZooStore.java | 50 ++++---- .../core/fate/accumulo/AccumuloStore.java | 55 +++++---- .../core/fate/accumulo/FateMutatorImpl.java | 12 +- .../accumulo/core/logging/FateLogger.java | 35 +++--- .../apache/accumulo/core/fate/TestStore.java | 57 +++++----- 12 files changed, 298 insertions(+), 260 deletions(-) create mode 100644 core/src/main/java/org/apache/accumulo/core/fate/FateId.java delete mode 100644 core/src/main/java/org/apache/accumulo/core/fate/FateTxId.java diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java index 7125f692fef..bc576d6e874 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java @@ -51,8 +51,8 @@ public abstract class AbstractFateStore implements FateStore { private static final Logger log = LoggerFactory.getLogger(AbstractFateStore.class); - protected final Set reserved; - protected final Map deferred; + protected final Set reserved; + protected final Map deferred; // This is incremented each time a transaction was unreserved that was non new protected final SignalCount unreservedNonNewCount = new SignalCount(); @@ -90,26 +90,26 @@ public static Object deserialize(byte[] ser) { } /** - * Attempt to reserve transaction + * Attempt to reserve the fate transaction. * - * @param tid transaction id + * @param fateId The FateId * @return An Optional containing the FateTxStore if the transaction was successfully reserved, or * an empty Optional if the transaction was already reserved. */ @Override - public Optional> tryReserve(long tid) { + public Optional> tryReserve(FateId fateId) { synchronized (this) { - if (!reserved.contains(tid)) { - return Optional.of(reserve(tid)); + if (!reserved.contains(fateId)) { + return Optional.of(reserve(fateId)); } return Optional.empty(); } } @Override - public FateTxStore reserve(long tid) { + public FateTxStore reserve(FateId fateId) { synchronized (AbstractFateStore.this) { - while (reserved.contains(tid)) { + while (reserved.contains(fateId)) { try { AbstractFateStore.this.wait(100); } catch (InterruptedException e) { @@ -118,8 +118,8 @@ public FateTxStore reserve(long tid) { } } - reserved.add(tid); - return newFateTxStore(tid, true); + reserved.add(fateId); + return newFateTxStore(fateId, true); } } @@ -133,21 +133,21 @@ public void runnable(AtomicBoolean keepWaiting, LongConsumer idConsumer) { try (Stream transactions = getTransactions()) { transactions.filter(fateIdStatus -> isRunnable(fateIdStatus.getStatus())) - .mapToLong(FateIdStatus::getTxid).filter(txid -> { + .map(FateIdStatus::getFateId).filter(fateId -> { synchronized (AbstractFateStore.this) { - var deferredTime = deferred.get(txid); + var deferredTime = deferred.get(fateId); if (deferredTime != null) { if ((deferredTime - System.nanoTime()) >= 0) { return false; } else { - deferred.remove(txid); + deferred.remove(fateId); } } - return !reserved.contains(txid); + return !reserved.contains(fateId); } - }).forEach(txid -> { + }).forEach(fateId -> { seen.incrementAndGet(); - idConsumer.accept(txid); + idConsumer.accept(fateId.getTid()); }); } @@ -171,13 +171,13 @@ public void runnable(AtomicBoolean keepWaiting, LongConsumer idConsumer) { } @Override - public Stream list() { - return getTransactions().map(fateIdStatus -> fateIdStatus.txid); + public Stream list() { + return getTransactions().map(FateIdStatus::getFateId); } @Override - public ReadOnlyFateTxStore read(long tid) { - return newFateTxStore(tid, false); + public ReadOnlyFateTxStore read(FateId fateId) { + return newFateTxStore(fateId, false); } protected boolean isRunnable(TStatus status) { @@ -185,19 +185,15 @@ protected boolean isRunnable(TStatus status) { || status == TStatus.SUBMITTED; } - protected long parseTid(String txdir) { - return Long.parseLong(txdir.split("_")[1], 16); - } - public static abstract class FateIdStatus { - private final long txid; + private final FateId fateId; - public FateIdStatus(long txid) { - this.txid = txid; + public FateIdStatus(FateId fateId) { + this.fateId = fateId; } - public long getTxid() { - return txid; + public FateId getFateId() { + return fateId; } public abstract TStatus getStatus(); @@ -205,30 +201,30 @@ public long getTxid() { protected abstract Stream getTransactions(); - protected abstract TStatus _getStatus(long tid); + protected abstract TStatus _getStatus(FateId fateId); - protected abstract FateTxStore newFateTxStore(long tid, boolean isReserved); + protected abstract FateTxStore newFateTxStore(FateId fateId, boolean isReserved); protected abstract class AbstractFateTxStoreImpl implements FateTxStore { - protected final long tid; + protected final FateId fateId; protected final boolean isReserved; protected TStatus observedStatus = null; - protected AbstractFateTxStoreImpl(long tid, boolean isReserved) { - this.tid = tid; + protected AbstractFateTxStoreImpl(FateId fateId, boolean isReserved) { + this.fateId = fateId; this.isReserved = isReserved; } @Override public TStatus waitForStatusChange(EnumSet expected) { Preconditions.checkState(!isReserved, - "Attempted to wait for status change while reserved " + FateTxId.formatTid(getID())); + "Attempted to wait for status change while reserved " + fateId); while (true) { long countBefore = unreservedNonNewCount.getCount(); - TStatus status = _getStatus(tid); + TStatus status = _getStatus(fateId); if (expected.contains(status)) { return status; } @@ -246,16 +242,15 @@ public void unreserve(long deferTime, TimeUnit timeUnit) { } synchronized (AbstractFateStore.this) { - if (!reserved.remove(tid)) { - throw new IllegalStateException( - "Tried to unreserve id that was not reserved " + FateTxId.formatTid(tid)); + if (!reserved.remove(fateId)) { + throw new IllegalStateException("Tried to unreserve id that was not reserved " + fateId); } // notify any threads waiting to reserve AbstractFateStore.this.notifyAll(); if (deferTime > 0) { - deferred.put(tid, System.nanoTime() + deferTime); + deferred.put(fateId, System.nanoTime() + deferTime); } } @@ -275,9 +270,8 @@ protected void verifyReserved(boolean isWrite) { if (isReserved) { synchronized (AbstractFateStore.this) { - if (!reserved.contains(tid)) { - throw new IllegalStateException( - "Tried to operate on unreserved transaction " + FateTxId.formatTid(tid)); + if (!reserved.contains(fateId)) { + throw new IllegalStateException("Tried to operate on unreserved transaction " + fateId); } } } @@ -286,14 +280,14 @@ protected void verifyReserved(boolean isWrite) { @Override public TStatus getStatus() { verifyReserved(false); - var status = _getStatus(tid); + var status = _getStatus(fateId); observedStatus = status; return status; } @Override - public long getID() { - return tid; + public FateId getID() { + return fateId; } protected byte[] serializeTxInfo(Serializable so) { diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java index c18b13f5832..34832d4ed44 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java @@ -48,7 +48,7 @@ public interface TimeSource { private static final Logger log = LoggerFactory.getLogger(AgeOffStore.class); private final FateStore store; - private Map candidates; + private Map candidates; private long ageOffTime; private long minTime; private TimeSource timeSource; @@ -63,28 +63,28 @@ private synchronized void updateMinTime() { } } - private synchronized void addCandidate(long txid) { + private synchronized void addCandidate(FateId fateId) { long time = timeSource.currentTimeMillis(); - candidates.put(txid, time); + candidates.put(fateId, time); if (time < minTime) { minTime = time; } } - private synchronized void removeCandidate(long txid) { - Long time = candidates.remove(txid); + private synchronized void removeCandidate(FateId fateId) { + Long time = candidates.remove(fateId); if (time != null && time <= minTime) { updateMinTime(); } } public void ageOff() { - HashSet oldTxs = new HashSet<>(); + HashSet oldTxs = new HashSet<>(); synchronized (this) { long time = timeSource.currentTimeMillis(); if (minTime < time && time - minTime >= ageOffTime) { - for (Entry entry : candidates.entrySet()) { + for (Entry entry : candidates.entrySet()) { if (time - entry.getValue() >= ageOffTime) { oldTxs.add(entry.getKey()); } @@ -95,16 +95,16 @@ public void ageOff() { } } - for (Long txid : oldTxs) { + for (FateId oldTx : oldTxs) { try { - FateTxStore txStore = store.reserve(txid); + FateTxStore txStore = store.reserve(oldTx); try { switch (txStore.getStatus()) { case NEW: case FAILED: case SUCCESSFUL: txStore.delete(); - log.debug("Aged off FATE tx {}", FateTxId.formatTid(txid)); + log.debug("Aged off FATE tx {}", oldTx); break; default: break; @@ -114,7 +114,7 @@ public void ageOff() { txStore.unreserve(0, TimeUnit.MILLISECONDS); } } catch (Exception e) { - log.warn("Failed to age off FATE tx " + FateTxId.formatTid(txid), e); + log.warn("Failed to age off FATE tx " + oldTx, e); } } } @@ -129,15 +129,15 @@ public AgeOffStore(FateStore store, long ageOffTime, TimeSource timeSource) { // ELASTICITY_TODO need to rework how this class works so that it does not buffer everything in // memory. - List txids = store.list().collect(Collectors.toList()); - for (Long txid : txids) { - FateTxStore txStore = store.reserve(txid); + List fateIds = store.list().collect(Collectors.toList()); + for (FateId fateId : fateIds) { + FateTxStore txStore = store.reserve(fateId); try { switch (txStore.getStatus()) { case NEW: case FAILED: case SUCCESSFUL: - addCandidate(txid); + addCandidate(fateId); break; default: break; @@ -149,20 +149,20 @@ public AgeOffStore(FateStore store, long ageOffTime, TimeSource timeSource) { } @Override - public long create() { - long txid = store.create(); - addCandidate(txid); - return txid; + public FateId create() { + FateId fateId = store.create(); + addCandidate(fateId); + return fateId; } @Override - public FateTxStore reserve(long tid) { - return new AgeOffFateTxStore(store.reserve(tid)); + public FateTxStore reserve(FateId fateId) { + return new AgeOffFateTxStore(store.reserve(fateId)); } @Override - public Optional> tryReserve(long tid) { - return store.tryReserve(tid).map(AgeOffFateTxStore::new); + public Optional> tryReserve(FateId fateId) { + return store.tryReserve(fateId).map(AgeOffFateTxStore::new); } private class AgeOffFateTxStore extends WrappedFateTxStore { @@ -198,12 +198,12 @@ public void delete() { } @Override - public ReadOnlyFateTxStore read(long tid) { - return store.read(tid); + public ReadOnlyFateTxStore read(FateId fateId) { + return store.read(fateId); } @Override - public Stream list() { + public Stream list() { return store.list(); } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateId.java b/core/src/main/java/org/apache/accumulo/core/fate/FateId.java new file mode 100644 index 00000000000..a45e5e6202d --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateId.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate; + +import java.util.regex.Pattern; + +import org.apache.accumulo.core.data.AbstractId; +import org.apache.accumulo.core.util.FastFormat; + +/** + * A strongly typed FATE Transaction ID. This is used to uniquely identify a FATE transaction. + * Consists of its {@link FateInstanceType} and its transaction id (long). The canonical string is + * of the form "FATE:[FateInstanceType]:[hex long tid]" (without the brackets). + */ +public class FateId extends AbstractId { + + private static final String PREFIX = "FATE:"; + private static final Pattern PATTERN = + Pattern.compile("^" + PREFIX + "(USER|META)" + ":" + "[0-9a-fA-F]+$"); + + private FateId(String canonical) { + super(canonical); + } + + /** + * @return the {@link FateInstanceType} + */ + public FateInstanceType getType() { + return FateInstanceType.valueOf(canonical().split(":")[1]); + } + + /** + * @return the decimal value of the transaction id + */ + public long getTid() { + return Long.parseLong(getHexTid(), 16); + } + + /** + * @return the hexadecimal value of the transaction id + */ + public String getHexTid() { + return canonical().split(":")[2]; + } + + /** + * Creates a new FateId object from the given parameters + * + * @param type the {@link FateInstanceType} + * @param tid the decimal transaction id + * @return a new FateId object + */ + public static FateId from(FateInstanceType type, long tid) { + return new FateId(PREFIX + type + ":" + formatTid(tid)); + } + + /** + * Creates a new FateId object from the given string + * + * @param fateIdStr Should be of the form "FATE:[{@link FateInstanceType}]:[hex long tid]" + * (without the brackets) + * @return a new FateId object + */ + public static FateId from(String fateIdStr) { + return new FateId(validate(fateIdStr)); + } + + /** + * Validates that a fateIdStr is of the form "FATE:[{@link FateInstanceType}]:[hex long tid]" + * (without the brackets). + * + * @param fateIdStr The string to validate + * @return the given string, if the string is valid. An {@link IllegalArgumentException} is thrown + * otherwise. + */ + public static String validate(String fateIdStr) { + if (PATTERN.matcher(fateIdStr).matches()) { + return fateIdStr; + } else { + throw new IllegalArgumentException("Invalid FATE ID: " + fateIdStr); + } + } + + /** + * Formats transaction ids in a consistent way that is useful for logging and persisting. + */ + public static String formatTid(long tid) { + // do not change how this formats without considering implications for persistence + return FastFormat.toHexString(tid); + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java index b5ccae52684..ef12316a611 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java @@ -25,19 +25,19 @@ /** * Transaction Store: a place to save transactions * - * A transaction consists of a number of operations. To use, first create a transaction id, and then - * seed the transaction with an initial operation. An executor service can then execute the + * A transaction consists of a number of operations. To use, first create a fate transaction id, and + * then seed the transaction with an initial operation. An executor service can then execute the * transaction's operation, possibly pushing more operations onto the transaction as each step * successfully completes. If a step fails, the stack can be unwound, undoing each operation. */ public interface FateStore extends ReadOnlyFateStore { /** - * Create a new transaction id + * Create a new fate transaction id * - * @return a transaction id + * @return a new FateId */ - long create(); + FateId create(); /** * An interface that allows read/write access to the data related to a single fate operation. @@ -93,20 +93,20 @@ interface FateTxStore extends ReadOnlyFateTxStore { } /** - * Attempt to reserve transaction + * Attempt to reserve the fate transaction. * - * @param tid transaction id + * @param fateId The FateId * @return true if reserved by this call, false if already reserved */ - Optional> tryReserve(long tid); + Optional> tryReserve(FateId fateId); /** - * Reserve the specific tid. + * Reserve the fate transaction. * - * Reserving a transaction id ensures that nothing else in-process interacting via the same - * instance will be operating on that transaction id. + * Reserving a fate transaction ensures that nothing else in-process interacting via the same + * instance will be operating on that fate transaction. * */ - FateTxStore reserve(long tid); + FateTxStore reserve(FateId fateId); } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateTxId.java b/core/src/main/java/org/apache/accumulo/core/fate/FateTxId.java deleted file mode 100644 index 8468eda3558..00000000000 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateTxId.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.core.fate; - -import java.util.regex.Pattern; - -import org.apache.accumulo.core.util.FastFormat; - -import com.google.common.base.Preconditions; - -public class FateTxId { - - private static final String PREFIX = "FATE["; - private static final String SUFFIX = "]"; - - private final static Pattern PATTERN = - Pattern.compile(Pattern.quote(PREFIX) + "[0-9a-fA-F]+" + Pattern.quote(SUFFIX)); - - private static String getHex(String fmtTid) { - return fmtTid.substring(PREFIX.length(), fmtTid.length() - SUFFIX.length()); - } - - /** - * @return true if string was created by {@link #formatTid(long)} and false otherwise. - */ - public static boolean isFormatedTid(String fmtTid) { - return PATTERN.matcher(fmtTid).matches(); - } - - /** - * Reverses {@link #formatTid(long)} - */ - public static long fromString(String fmtTid) { - Preconditions.checkArgument(fmtTid.startsWith(PREFIX) && fmtTid.endsWith(SUFFIX)); - return Long.parseLong(getHex(fmtTid), 16); - } - - /** - * Formats transaction ids in a consistent way that is useful for logging and persisting. - */ - public static String formatTid(long tid) { - // do not change how this formats without considering implications for persistence - return FastFormat.toHexString(PREFIX, tid, SUFFIX); - } - - public static long parseTidFromUserInput(String s) { - if (isFormatedTid(s)) { - return fromString(s); - } else { - return Long.parseLong(s, 16); - } - } - -} diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java index e525c89ff9c..fa733297133 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java @@ -56,7 +56,7 @@ enum TStatus { /** * Reads the data related to fate transaction without reserving it. */ - ReadOnlyFateTxStore read(long tid); + ReadOnlyFateTxStore read(FateId fateId); /** * Storage for an individual fate transaction @@ -115,7 +115,7 @@ interface ReadOnlyFateTxStore { /** * @return the id of the FATE transaction */ - long getID(); + FateId getID(); } /** @@ -123,7 +123,7 @@ interface ReadOnlyFateTxStore { * * @return all outstanding transactions, including those reserved by others. */ - Stream list(); + Stream list(); /** * Finds all fate ops that are (IN_PROGRESS, SUBMITTED, or FAILED_IN_PROGRESS) and unreserved. Ids diff --git a/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java b/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java index 1d8c7126c22..de103c7902b 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java @@ -86,7 +86,7 @@ public long timeCreated() { } @Override - public long getID() { + public FateId getID() { return wrapped.getID(); } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java index b4ad365dfa3..485c4462a96 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java @@ -33,7 +33,6 @@ import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; -import org.apache.accumulo.core.util.FastFormat; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.KeeperException.NodeExistsException; @@ -49,11 +48,12 @@ public class ZooStore extends AbstractFateStore { private static final Logger log = LoggerFactory.getLogger(ZooStore.class); + private static final FateInstanceType fateInstanceType = FateInstanceType.META; private String path; private ZooReaderWriter zk; - private String getTXPath(long tid) { - return FastFormat.toHexString(path + "/tx_", tid, ""); + private String getTXPath(FateId fateId) { + return path + "/tx_" + fateId.getHexTid(); } public ZooStore(String path, ZooReaderWriter zk) throws KeeperException, InterruptedException { @@ -70,14 +70,15 @@ public ZooStore(String path, ZooReaderWriter zk) throws KeeperException, Interru ZooStore() {} @Override - public long create() { + public FateId create() { while (true) { try { // looking at the code for SecureRandom, it appears to be thread safe long tid = RANDOM.get().nextLong() & 0x7fffffffffffffffL; - zk.putPersistentData(getTXPath(tid), TStatus.NEW.name().getBytes(UTF_8), + FateId fateId = FateId.from(fateInstanceType, tid); + zk.putPersistentData(getTXPath(fateId), TStatus.NEW.name().getBytes(UTF_8), NodeExistsPolicy.FAIL); - return tid; + return fateId; } catch (NodeExistsException nee) { // exist, so just try another random # } catch (KeeperException | InterruptedException e) { @@ -88,8 +89,8 @@ public long create() { private class FateTxStoreImpl extends AbstractFateTxStoreImpl { - private FateTxStoreImpl(long tid, boolean isReserved) { - super(tid, isReserved); + private FateTxStoreImpl(FateId fateId, boolean isReserved) { + super(fateId, isReserved); } private static final int RETRIES = 10; @@ -99,7 +100,7 @@ public Repo top() { verifyReserved(false); for (int i = 0; i < RETRIES; i++) { - String txpath = getTXPath(tid); + String txpath = getTXPath(fateId); try { String top; try { @@ -150,7 +151,7 @@ private String findTop(String txpath) throws KeeperException, InterruptedExcepti public void push(Repo repo) throws StackOverflowException { verifyReserved(true); - String txpath = getTXPath(tid); + String txpath = getTXPath(fateId); try { String top = findTop(txpath); if (top != null && Long.parseLong(top.split("_")[1]) > 100) { @@ -170,10 +171,10 @@ public void pop() { verifyReserved(true); try { - String txpath = getTXPath(tid); + String txpath = getTXPath(fateId); String top = findTop(txpath); if (top == null) { - throw new IllegalStateException("Tried to pop when empty " + FateTxId.formatTid(tid)); + throw new IllegalStateException("Tried to pop when empty " + fateId); } zk.recursiveDelete(txpath + "/" + top, NodeMissingPolicy.SKIP); } catch (KeeperException | InterruptedException e) { @@ -186,7 +187,7 @@ public void setStatus(TStatus status) { verifyReserved(true); try { - zk.putPersistentData(getTXPath(tid), status.name().getBytes(UTF_8), + zk.putPersistentData(getTXPath(fateId), status.name().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE); } catch (KeeperException | InterruptedException e) { throw new IllegalStateException(e); @@ -200,7 +201,7 @@ public void delete() { verifyReserved(true); try { - zk.recursiveDelete(getTXPath(tid), NodeMissingPolicy.SKIP); + zk.recursiveDelete(getTXPath(fateId), NodeMissingPolicy.SKIP); } catch (KeeperException | InterruptedException e) { throw new IllegalStateException(e); } @@ -211,7 +212,7 @@ public void setTransactionInfo(Fate.TxInfo txInfo, Serializable so) { verifyReserved(true); try { - zk.putPersistentData(getTXPath(tid) + "/" + txInfo, serializeTxInfo(so), + zk.putPersistentData(getTXPath(fateId) + "/" + txInfo, serializeTxInfo(so), NodeExistsPolicy.OVERWRITE); } catch (KeeperException | InterruptedException e2) { throw new IllegalStateException(e2); @@ -223,7 +224,7 @@ public Serializable getTransactionInfo(Fate.TxInfo txInfo) { verifyReserved(false); try { - return deserializeTxInfo(txInfo, zk.getData(getTXPath(tid) + "/" + txInfo)); + return deserializeTxInfo(txInfo, zk.getData(getTXPath(fateId) + "/" + txInfo)); } catch (NoNodeException nne) { return null; } catch (KeeperException | InterruptedException e) { @@ -236,7 +237,7 @@ public long timeCreated() { verifyReserved(false); try { - Stat stat = zk.getZooKeeper().exists(getTXPath(tid), false); + Stat stat = zk.getZooKeeper().exists(getTXPath(fateId), false); return stat.getCtime(); } catch (Exception e) { return 0; @@ -246,7 +247,7 @@ public long timeCreated() { @Override public List> getStack() { verifyReserved(false); - String txpath = getTXPath(tid); + String txpath = getTXPath(fateId); outer: while (true) { List ops; @@ -286,9 +287,9 @@ public List> getStack() { } @Override - protected TStatus _getStatus(long tid) { + protected TStatus _getStatus(FateId fateId) { try { - return TStatus.valueOf(new String(zk.getData(getTXPath(tid)), UTF_8)); + return TStatus.valueOf(new String(zk.getData(getTXPath(fateId)), UTF_8)); } catch (NoNodeException nne) { return TStatus.UNKNOWN; } catch (KeeperException | InterruptedException e) { @@ -297,18 +298,19 @@ protected TStatus _getStatus(long tid) { } @Override - protected FateTxStore newFateTxStore(long tid, boolean isReserved) { - return new FateTxStoreImpl(tid, isReserved); + protected FateTxStore newFateTxStore(FateId fateId, boolean isReserved) { + return new FateTxStoreImpl(fateId, isReserved); } @Override protected Stream getTransactions() { try { return zk.getChildren(path).stream().map(strTxid -> { + FateId fateId = FateId.from("FATE:" + fateInstanceType + ":" + strTxid); // Memoizing for two reasons. First the status may never be requested, so in that case avoid // the lookup. Second, if its requested multiple times the result will always be consistent. - Supplier statusSupplier = Suppliers.memoize(() -> _getStatus(parseTid(strTxid))); - return new FateIdStatus(parseTid(strTxid)) { + Supplier statusSupplier = Suppliers.memoize(() -> _getStatus(fateId)); + return new FateIdStatus(fateId) { @Override public TStatus getStatus() { return statusSupplier.get(); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java index 9b870537faa..643af274802 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java @@ -34,6 +34,8 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.fate.AbstractFateStore; import org.apache.accumulo.core.fate.Fate.TxInfo; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.ReadOnlyRepo; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.StackOverflowException; @@ -43,7 +45,6 @@ import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.ColumnFQ; -import org.apache.accumulo.core.util.FastFormat; import org.apache.hadoop.io.Text; import com.google.common.base.Preconditions; @@ -53,6 +54,7 @@ public class AccumuloStore extends AbstractFateStore { private final ClientContext context; private final String tableName; + private static final FateInstanceType fateInstanceType = FateInstanceType.USER; private static final int maxRepos = 100; private static final com.google.common.collect.Range REPO_RANGE = com.google.common.collect.Range.closed(1, maxRepos); @@ -67,12 +69,13 @@ public AccumuloStore(ClientContext context) { } @Override - public long create() { + public FateId create() { long tid = RANDOM.get().nextLong() & 0x7fffffffffffffffL; + FateId fateId = FateId.from(fateInstanceType, tid); - newMutator(tid).putStatus(TStatus.NEW).putCreateTime(System.currentTimeMillis()).mutate(); + newMutator(fateId).putStatus(TStatus.NEW).putCreateTime(System.currentTimeMillis()).mutate(); - return tid; + return fateId; } @Override @@ -82,7 +85,9 @@ protected Stream getTransactions() { scanner.setRange(new Range()); TxColumnFamily.STATUS_COLUMN.fetch(scanner); return scanner.stream().onClose(scanner::close).map(e -> { - return new FateIdStatus(parseTid(e.getKey().getRow().toString())) { + FateId fateId = + FateId.from("FATE:" + fateInstanceType + ":" + e.getKey().getRow().toString()); + return new FateIdStatus(fateId) { @Override public TStatus getStatus() { return TStatus.valueOf(e.getValue().toString()); @@ -95,9 +100,9 @@ public TStatus getStatus() { } @Override - protected TStatus _getStatus(long tid) { + protected TStatus _getStatus(FateId fateId) { return scanTx(scanner -> { - scanner.setRange(getRow(tid)); + scanner.setRange(getRow(fateId)); TxColumnFamily.STATUS_COLUMN.fetch(scanner); return scanner.stream().map(e -> TStatus.valueOf(e.getValue().toString())).findFirst() .orElse(TStatus.UNKNOWN); @@ -105,16 +110,16 @@ protected TStatus _getStatus(long tid) { } @Override - protected FateTxStore newFateTxStore(long tid, boolean isReserved) { - return new FateTxStoreImpl(tid, isReserved); + protected FateTxStore newFateTxStore(FateId fateId, boolean isReserved) { + return new FateTxStoreImpl(fateId, isReserved); } - static Range getRow(long tid) { - return new Range("tx_" + FastFormat.toHexString(tid)); + static Range getRow(FateId fateId) { + return new Range("tx_" + fateId.getHexTid()); } - private FateMutatorImpl newMutator(long tid) { - return new FateMutatorImpl<>(context, tableName, tid); + private FateMutatorImpl newMutator(FateId fateId) { + return new FateMutatorImpl<>(context, tableName, fateId); } private R scanTx(Function func) { @@ -127,8 +132,8 @@ private R scanTx(Function func) { private class FateTxStoreImpl extends AbstractFateTxStoreImpl { - private FateTxStoreImpl(long tid, boolean isReserved) { - super(tid, isReserved); + private FateTxStoreImpl(FateId fateId, boolean isReserved) { + super(fateId, isReserved); } @Override @@ -136,7 +141,7 @@ public Repo top() { verifyReserved(false); return scanTx(scanner -> { - scanner.setRange(getRow(tid)); + scanner.setRange(getRow(fateId)); scanner.setBatchSize(1); scanner.fetchColumnFamily(RepoColumnFamily.NAME); return scanner.stream().map(e -> { @@ -152,7 +157,7 @@ public List> getStack() { verifyReserved(false); return scanTx(scanner -> { - scanner.setRange(getRow(tid)); + scanner.setRange(getRow(fateId)); scanner.fetchColumnFamily(RepoColumnFamily.NAME); return scanner.stream().map(e -> { @SuppressWarnings("unchecked") @@ -167,7 +172,7 @@ public Serializable getTransactionInfo(TxInfo txInfo) { verifyReserved(false); try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) { - scanner.setRange(getRow(tid)); + scanner.setRange(getRow(fateId)); final ColumnFQ cq; switch (txInfo) { @@ -200,7 +205,7 @@ public long timeCreated() { verifyReserved(false); return scanTx(scanner -> { - scanner.setRange(getRow(tid)); + scanner.setRange(getRow(fateId)); TxColumnFamily.CREATE_TIME_COLUMN.fetch(scanner); return scanner.stream().map(e -> Long.parseLong(e.getValue().toString())).findFirst() .orElse(0L); @@ -217,7 +222,7 @@ public void push(Repo repo) throws StackOverflowException { throw new StackOverflowException("Repo stack size too large"); } - FateMutator fateMutator = newMutator(tid); + FateMutator fateMutator = newMutator(fateId); fateMutator.putRepo(top.map(t -> t + 1).orElse(1), repo).mutate(); } @@ -226,14 +231,14 @@ public void pop() { verifyReserved(true); Optional top = findTop(); - top.ifPresent(t -> newMutator(tid).deleteRepo(t).mutate()); + top.ifPresent(t -> newMutator(fateId).deleteRepo(t).mutate()); } @Override public void setStatus(TStatus status) { verifyReserved(true); - newMutator(tid).putStatus(status).mutate(); + newMutator(fateId).putStatus(status).mutate(); observedStatus = status; } @@ -241,7 +246,7 @@ public void setStatus(TStatus status) { public void setTransactionInfo(TxInfo txInfo, Serializable so) { verifyReserved(true); - FateMutator fateMutator = newMutator(tid); + FateMutator fateMutator = newMutator(fateId); final byte[] serialized = serializeTxInfo(so); switch (txInfo) { @@ -268,12 +273,12 @@ public void setTransactionInfo(TxInfo txInfo, Serializable so) { public void delete() { verifyReserved(true); - newMutator(tid).delete().mutate(); + newMutator(fateId).delete().mutate(); } private Optional findTop() { return scanTx(scanner -> { - scanner.setRange(getRow(tid)); + scanner.setRange(getRow(fateId)); scanner.setBatchSize(1); scanner.fetchColumnFamily(RepoColumnFamily.NAME); return scanner.stream().map(e -> restoreRepo(e.getKey().getColumnQualifier())).findFirst(); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutatorImpl.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutatorImpl.java index b605b910972..a4a257a936a 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutatorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutatorImpl.java @@ -31,27 +31,27 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.fate.Fate.TxInfo; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.RepoColumnFamily; import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxColumnFamily; import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxInfoColumnFamily; import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.util.FastFormat; import org.apache.hadoop.io.Text; public class FateMutatorImpl implements FateMutator { private final ClientContext context; private final String tableName; - private final long tid; + private final FateId fateId; private final Mutation mutation; - FateMutatorImpl(ClientContext context, String tableName, long tid) { + FateMutatorImpl(ClientContext context, String tableName, FateId fateId) { this.context = Objects.requireNonNull(context); this.tableName = Objects.requireNonNull(tableName); - this.tid = tid; - this.mutation = new Mutation(new Text("tx_" + FastFormat.toHexString(tid))); + this.fateId = fateId; + this.mutation = new Mutation(new Text("tx_" + fateId.getHexTid())); } @Override @@ -123,7 +123,7 @@ public FateMutator deleteRepo(int position) { public FateMutator delete() { try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) { - scanner.setRange(getRow(tid)); + scanner.setRange(getRow(fateId)); scanner.forEach( (key, value) -> mutation.putDelete(key.getColumnFamily(), key.getColumnQualifier())); } catch (TableNotFoundException e) { diff --git a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java index bac25c321bb..f6f03207d86 100644 --- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java +++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java @@ -18,8 +18,6 @@ */ package org.apache.accumulo.core.logging; -import static org.apache.accumulo.core.fate.FateTxId.formatTid; - import java.io.Serializable; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; @@ -28,6 +26,7 @@ import java.util.stream.Stream; import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.FateStore.FateTxStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore; @@ -57,7 +56,7 @@ private LoggingFateTxStore(FateTxStore wrapped, Function,String> toLo public void push(Repo repo) throws StackOverflowException { super.push(repo); if (storeLog.isTraceEnabled()) { - storeLog.trace("{} pushed {}", formatTid(getID()), toLogString.apply(repo)); + storeLog.trace("{} pushed {}", getID(), toLogString.apply(repo)); } } @@ -65,7 +64,7 @@ public void push(Repo repo) throws StackOverflowException { public void pop() { super.pop(); if (storeLog.isTraceEnabled()) { - storeLog.trace("{} popped", formatTid(getID())); + storeLog.trace("{} popped", getID()); } } @@ -73,7 +72,7 @@ public void pop() { public void setStatus(ReadOnlyFateStore.TStatus status) { super.setStatus(status); if (storeLog.isTraceEnabled()) { - storeLog.trace("{} setStatus to {}", formatTid(getID()), status); + storeLog.trace("{} setStatus to {}", getID(), status); } } @@ -81,7 +80,7 @@ public void setStatus(ReadOnlyFateStore.TStatus status) { public void setTransactionInfo(Fate.TxInfo txInfo, Serializable val) { super.setTransactionInfo(txInfo, val); if (storeLog.isTraceEnabled()) { - storeLog.trace("{} setting {} to {}", formatTid(getID()), txInfo, val); + storeLog.trace("{} setting {} to {}", getID(), txInfo, val); } } @@ -89,7 +88,7 @@ public void setTransactionInfo(Fate.TxInfo txInfo, Serializable val) { public void delete() { super.delete(); if (storeLog.isTraceEnabled()) { - storeLog.trace("{} deleted fate transaction", formatTid(getID())); + storeLog.trace("{} deleted fate transaction", getID()); } } } @@ -100,22 +99,22 @@ public static FateStore wrap(FateStore store, Function,String> return new FateStore<>() { @Override - public FateTxStore reserve(long tid) { - return new LoggingFateTxStore<>(store.reserve(tid), toLogString); + public FateTxStore reserve(FateId fateId) { + return new LoggingFateTxStore<>(store.reserve(fateId), toLogString); } @Override - public Optional> tryReserve(long tid) { - return store.tryReserve(tid).map(ftxs -> new LoggingFateTxStore<>(ftxs, toLogString)); + public Optional> tryReserve(FateId fateId) { + return store.tryReserve(fateId).map(ftxs -> new LoggingFateTxStore<>(ftxs, toLogString)); } @Override - public ReadOnlyFateTxStore read(long tid) { - return store.read(tid); + public ReadOnlyFateTxStore read(FateId fateId) { + return store.read(fateId); } @Override - public Stream list() { + public Stream list() { return store.list(); } @@ -125,12 +124,12 @@ public void runnable(AtomicBoolean keepWaiting, LongConsumer idConsumer) { } @Override - public long create() { - long tid = store.create(); + public FateId create() { + FateId fateId = store.create(); if (storeLog.isTraceEnabled()) { - storeLog.trace("{} created fate transaction", formatTid(tid)); + storeLog.trace("{} created fate transaction", fateId); } - return tid; + return fateId; } }; } diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java index 6dd6368d529..aba82713117 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java @@ -38,31 +38,32 @@ public class TestStore implements FateStore { private long nextId = 1; - private Map statuses = new HashMap<>(); - private Set reserved = new HashSet<>(); + private Map statuses = new HashMap<>(); + private Set reserved = new HashSet<>(); @Override - public long create() { - statuses.put(nextId, TStatus.NEW); - return nextId++; + public FateId create() { + FateId fateId = FateId.from(FateInstanceType.USER, nextId++); + statuses.put(fateId, TStatus.NEW); + return fateId; } @Override - public FateTxStore reserve(long tid) { - if (reserved.contains(tid)) { + public FateTxStore reserve(FateId fateId) { + if (reserved.contains(fateId)) { throw new IllegalStateException(); // zoo store would wait, but do not expect test to reserve } // twice... if test change, then change this - reserved.add(tid); - return new TestFateTxStore(tid); + reserved.add(fateId); + return new TestFateTxStore(fateId); } @Override - public Optional> tryReserve(long tid) { + public Optional> tryReserve(FateId fateId) { synchronized (this) { - if (!reserved.contains(tid)) { - reserve(tid); - return Optional.of(new TestFateTxStore(tid)); + if (!reserved.contains(fateId)) { + reserve(fateId); + return Optional.of(new TestFateTxStore(fateId)); } return Optional.empty(); } @@ -70,10 +71,10 @@ public Optional> tryReserve(long tid) { private class TestFateTxStore implements FateTxStore { - private final long tid; + private final FateId fateId; - TestFateTxStore(long tid) { - this.tid = tid; + TestFateTxStore(FateId fateId) { + this.fateId = fateId; } @Override @@ -88,11 +89,11 @@ public List> getStack() { @Override public TStatus getStatus() { - if (!reserved.contains(tid)) { + if (!reserved.contains(fateId)) { throw new IllegalStateException(); } - TStatus status = statuses.get(tid); + TStatus status = statuses.get(fateId); if (status == null) { return TStatus.UNKNOWN; } @@ -115,8 +116,8 @@ public long timeCreated() { } @Override - public long getID() { - return tid; + public FateId getID() { + return fateId; } @Override @@ -131,13 +132,13 @@ public void pop() { @Override public void setStatus(TStatus status) { - if (!reserved.contains(tid)) { + if (!reserved.contains(fateId)) { throw new IllegalStateException(); } - if (!statuses.containsKey(tid)) { + if (!statuses.containsKey(fateId)) { throw new IllegalStateException(); } - statuses.put(tid, status); + statuses.put(fateId, status); } @Override @@ -147,27 +148,27 @@ public void setTransactionInfo(Fate.TxInfo txInfo, Serializable val) { @Override public void delete() { - if (!reserved.contains(tid)) { + if (!reserved.contains(fateId)) { throw new IllegalStateException(); } - statuses.remove(tid); + statuses.remove(fateId); } @Override public void unreserve(long deferTime, TimeUnit timeUnit) { - if (!reserved.remove(tid)) { + if (!reserved.remove(fateId)) { throw new IllegalStateException(); } } } @Override - public ReadOnlyFateTxStore read(long tid) { + public ReadOnlyFateTxStore read(FateId fateId) { throw new UnsupportedOperationException(); } @Override - public Stream list() { + public Stream list() { return new ArrayList<>(statuses.keySet()).stream(); } From 25e17ab7c3862d1aab2820b5f4bbc889f52177d0 Mon Sep 17 00:00:00 2001 From: Kevin Rathbun Date: Thu, 25 Jan 2024 10:41:52 -0500 Subject: [PATCH 2/8] runnable() sig changed, FateId.from() sig changed --- .../accumulo/core/fate/AbstractFateStore.java | 6 ++-- .../accumulo/core/fate/AgeOffStore.java | 4 +-- .../org/apache/accumulo/core/fate/FateId.java | 29 +++++-------------- .../apache/accumulo/core/fate/FateStore.java | 2 +- .../accumulo/core/fate/ReadOnlyFateStore.java | 4 +-- .../apache/accumulo/core/fate/ZooStore.java | 2 +- .../core/fate/accumulo/AccumuloStore.java | 3 +- .../accumulo/core/logging/FateLogger.java | 4 +-- .../apache/accumulo/core/fate/TestStore.java | 8 +++-- 9 files changed, 25 insertions(+), 37 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java index bc576d6e874..8f0c062504b 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java @@ -36,7 +36,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.LongConsumer; +import java.util.function.Consumer; import java.util.stream.Stream; import org.apache.accumulo.core.fate.Fate.TxInfo; @@ -124,7 +124,7 @@ public FateTxStore reserve(FateId fateId) { } @Override - public void runnable(AtomicBoolean keepWaiting, LongConsumer idConsumer) { + public void runnable(AtomicBoolean keepWaiting, Consumer idConsumer) { AtomicLong seen = new AtomicLong(0); @@ -147,7 +147,7 @@ public void runnable(AtomicBoolean keepWaiting, LongConsumer idConsumer) { } }).forEach(fateId -> { seen.incrementAndGet(); - idConsumer.accept(fateId.getTid()); + idConsumer.accept(fateId); }); } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java index 34832d4ed44..da36838d47a 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java @@ -26,7 +26,7 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.LongConsumer; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -208,7 +208,7 @@ public Stream list() { } @Override - public void runnable(AtomicBoolean keepWaiting, LongConsumer idConsumer) { + public void runnable(AtomicBoolean keepWaiting, Consumer idConsumer) { store.runnable(keepWaiting, idConsumer); } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateId.java b/core/src/main/java/org/apache/accumulo/core/fate/FateId.java index a45e5e6202d..9e3ccf5080f 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateId.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateId.java @@ -31,8 +31,6 @@ public class FateId extends AbstractId { private static final String PREFIX = "FATE:"; - private static final Pattern PATTERN = - Pattern.compile("^" + PREFIX + "(USER|META)" + ":" + "[0-9a-fA-F]+$"); private FateId(String canonical) { super(canonical); @@ -71,29 +69,18 @@ public static FateId from(FateInstanceType type, long tid) { } /** - * Creates a new FateId object from the given string + * Creates a new FateId object from the given parameters * - * @param fateIdStr Should be of the form "FATE:[{@link FateInstanceType}]:[hex long tid]" - * (without the brackets) + * @param type the {@link FateInstanceType} + * @param hexTid the hexadecimal transaction id * @return a new FateId object */ - public static FateId from(String fateIdStr) { - return new FateId(validate(fateIdStr)); - } - - /** - * Validates that a fateIdStr is of the form "FATE:[{@link FateInstanceType}]:[hex long tid]" - * (without the brackets). - * - * @param fateIdStr The string to validate - * @return the given string, if the string is valid. An {@link IllegalArgumentException} is thrown - * otherwise. - */ - public static String validate(String fateIdStr) { - if (PATTERN.matcher(fateIdStr).matches()) { - return fateIdStr; + public static FateId from(FateInstanceType type, String hexTid) { + Pattern hexPattern = Pattern.compile("^[0-9a-fA-F]+$"); + if (hexPattern.matcher(hexTid).matches()) { + return new FateId(PREFIX + type + ":" + hexTid); } else { - throw new IllegalArgumentException("Invalid FATE ID: " + fateIdStr); + throw new IllegalArgumentException("Invalid Hex Transaction ID: " + hexTid); } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java index ef12316a611..d8495906e3a 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java @@ -86,7 +86,7 @@ interface FateTxStore extends ReadOnlyFateTxStore { * longer interact with it. * * @param deferTime time in millis to keep this transaction from being returned by - * {@link #runnable(java.util.concurrent.atomic.AtomicBoolean, java.util.function.LongConsumer)}. + * {@link #runnable(java.util.concurrent.atomic.AtomicBoolean, java.util.function.Consumer)}. * Must be non-negative. */ void unreserve(long deferTime, TimeUnit timeUnit); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java index fa733297133..0616a4e5b15 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java @@ -22,7 +22,7 @@ import java.util.EnumSet; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.LongConsumer; +import java.util.function.Consumer; import java.util.stream.Stream; /** @@ -131,5 +131,5 @@ interface ReadOnlyFateTxStore { * is found or until the keepWaiting parameter is false. It will return once all runnable ids * found were passed to the consumer. */ - void runnable(AtomicBoolean keepWaiting, LongConsumer idConsumer); + void runnable(AtomicBoolean keepWaiting, Consumer idConsumer); } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java index 485c4462a96..eccc0bd0830 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java @@ -306,7 +306,7 @@ protected FateTxStore newFateTxStore(FateId fateId, boolean isReserved) { protected Stream getTransactions() { try { return zk.getChildren(path).stream().map(strTxid -> { - FateId fateId = FateId.from("FATE:" + fateInstanceType + ":" + strTxid); + FateId fateId = FateId.from(fateInstanceType, strTxid); // Memoizing for two reasons. First the status may never be requested, so in that case avoid // the lookup. Second, if its requested multiple times the result will always be consistent. Supplier statusSupplier = Suppliers.memoize(() -> _getStatus(fateId)); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java index 643af274802..f96e4c10a94 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java @@ -85,8 +85,7 @@ protected Stream getTransactions() { scanner.setRange(new Range()); TxColumnFamily.STATUS_COLUMN.fetch(scanner); return scanner.stream().onClose(scanner::close).map(e -> { - FateId fateId = - FateId.from("FATE:" + fateInstanceType + ":" + e.getKey().getRow().toString()); + FateId fateId = FateId.from(fateInstanceType, e.getKey().getRow().toString()); return new FateIdStatus(fateId) { @Override public TStatus getStatus() { diff --git a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java index f6f03207d86..f018c2bacb1 100644 --- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java +++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java @@ -21,8 +21,8 @@ import java.io.Serializable; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Function; -import java.util.function.LongConsumer; import java.util.stream.Stream; import org.apache.accumulo.core.fate.Fate; @@ -119,7 +119,7 @@ public Stream list() { } @Override - public void runnable(AtomicBoolean keepWaiting, LongConsumer idConsumer) { + public void runnable(AtomicBoolean keepWaiting, Consumer idConsumer) { store.runnable(keepWaiting, idConsumer); } diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java index aba82713117..79eaddeda91 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java @@ -29,7 +29,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.LongConsumer; +import java.util.function.Consumer; import java.util.stream.Stream; /** @@ -41,9 +41,11 @@ public class TestStore implements FateStore { private Map statuses = new HashMap<>(); private Set reserved = new HashSet<>(); + private static final FateInstanceType fateInstanceType = FateInstanceType.USER; + @Override public FateId create() { - FateId fateId = FateId.from(FateInstanceType.USER, nextId++); + FateId fateId = FateId.from(fateInstanceType, nextId++); statuses.put(fateId, TStatus.NEW); return fateId; } @@ -173,7 +175,7 @@ public Stream list() { } @Override - public void runnable(AtomicBoolean keepWaiting, LongConsumer idConsumer) { + public void runnable(AtomicBoolean keepWaiting, Consumer idConsumer) { throw new UnsupportedOperationException(); } From 117149d1045545842094b25b746a800341d495f7 Mon Sep 17 00:00:00 2001 From: Kevin Rathbun Date: Thu, 25 Jan 2024 12:43:35 -0500 Subject: [PATCH 3/8] Added back FateTxId.java Will be deleted later on when it can be safely deleted (after all uses of FateTxId have been replaced by FateId) --- .../apache/accumulo/core/fate/FateTxId.java | 70 +++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 core/src/main/java/org/apache/accumulo/core/fate/FateTxId.java diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateTxId.java b/core/src/main/java/org/apache/accumulo/core/fate/FateTxId.java new file mode 100644 index 00000000000..8468eda3558 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateTxId.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate; + +import java.util.regex.Pattern; + +import org.apache.accumulo.core.util.FastFormat; + +import com.google.common.base.Preconditions; + +public class FateTxId { + + private static final String PREFIX = "FATE["; + private static final String SUFFIX = "]"; + + private final static Pattern PATTERN = + Pattern.compile(Pattern.quote(PREFIX) + "[0-9a-fA-F]+" + Pattern.quote(SUFFIX)); + + private static String getHex(String fmtTid) { + return fmtTid.substring(PREFIX.length(), fmtTid.length() - SUFFIX.length()); + } + + /** + * @return true if string was created by {@link #formatTid(long)} and false otherwise. + */ + public static boolean isFormatedTid(String fmtTid) { + return PATTERN.matcher(fmtTid).matches(); + } + + /** + * Reverses {@link #formatTid(long)} + */ + public static long fromString(String fmtTid) { + Preconditions.checkArgument(fmtTid.startsWith(PREFIX) && fmtTid.endsWith(SUFFIX)); + return Long.parseLong(getHex(fmtTid), 16); + } + + /** + * Formats transaction ids in a consistent way that is useful for logging and persisting. + */ + public static String formatTid(long tid) { + // do not change how this formats without considering implications for persistence + return FastFormat.toHexString(PREFIX, tid, SUFFIX); + } + + public static long parseTidFromUserInput(String s) { + if (isFormatedTid(s)) { + return fromString(s); + } else { + return Long.parseLong(s, 16); + } + } + +} From 704fd68185955e4f66d1222ff14468c347473fea Mon Sep 17 00:00:00 2001 From: Kevin Rathbun Date: Fri, 26 Jan 2024 09:54:02 -0500 Subject: [PATCH 4/8] pattern to private static final field --- core/src/main/java/org/apache/accumulo/core/fate/FateId.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateId.java b/core/src/main/java/org/apache/accumulo/core/fate/FateId.java index 9e3ccf5080f..3d04bc41961 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateId.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateId.java @@ -31,6 +31,7 @@ public class FateId extends AbstractId { private static final String PREFIX = "FATE:"; + private static final Pattern HEX_PATTERN = Pattern.compile("^[0-9a-fA-F]+$"); private FateId(String canonical) { super(canonical); @@ -76,8 +77,7 @@ public static FateId from(FateInstanceType type, long tid) { * @return a new FateId object */ public static FateId from(FateInstanceType type, String hexTid) { - Pattern hexPattern = Pattern.compile("^[0-9a-fA-F]+$"); - if (hexPattern.matcher(hexTid).matches()) { + if (HEX_PATTERN.matcher(hexTid).matches()) { return new FateId(PREFIX + type + ":" + hexTid); } else { throw new IllegalArgumentException("Invalid Hex Transaction ID: " + hexTid); From 00d7ef23b351760b70770ad8cba3acade2227202 Mon Sep 17 00:00:00 2001 From: Kevin Rathbun Date: Tue, 30 Jan 2024 10:29:25 -0500 Subject: [PATCH 5/8] Changes: - New fromThrift() method in FateId. - Fate now uses FateId and resolved issues stemming from these changes. - AdminUtil and Admin do not have FateId fully incorporated yet. Deferred for now. Marked "TODO DEFERRED - ISSUE 4044" in code. Need issue #4168 completed first. - Resolved all compile errors. --- .../apache/accumulo/core/fate/AdminUtil.java | 27 +++--- .../org/apache/accumulo/core/fate/Fate.java | 85 +++++++++---------- .../accumulo/core/fate/FateCleaner.java | 3 +- .../org/apache/accumulo/core/fate/FateId.java | 20 +++++ .../accumulo/core/logging/FateLogger.java | 1 + .../accumulo/manager/FateServiceHandler.java | 68 ++++++++------- .../manager/ManagerClientServiceHandler.java | 9 +- .../accumulo/manager/split/SplitTask.java | 5 +- .../org/apache/accumulo/test/fate/FateIT.java | 81 +++++++++--------- .../test/fate/accumulo/AccumuloFateIT.java | 10 +-- .../accumulo/AccumuloStoreReadWriteIT.java | 19 +++-- .../test/fate/zookeeper/ZookeeperFateIT.java | 9 +- 12 files changed, 185 insertions(+), 152 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java index 85bc34141ce..d9be4bd4b31 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java @@ -369,20 +369,22 @@ private FateStatus getTransactionStatus(Map statuses = new ArrayList<>(); fateStores.forEach((type, store) -> { - try (Stream tids = store.list().map(FateIdStatus::getTxid)) { - tids.forEach(tid -> { + try (Stream fateIds = store.list().map(FateIdStatus::getFateId)) { + fateIds.forEach(fateId -> { - ReadOnlyFateTxStore txStore = store.read(tid); + ReadOnlyFateTxStore txStore = store.read(fateId); String txName = (String) txStore.getTransactionInfo(Fate.TxInfo.TX_NAME); - List hlocks = heldLocks.remove(tid); + // TODO DEFERRED - ISSUE 4044 + List hlocks = heldLocks.remove(fateId.getTid()); if (hlocks == null) { hlocks = Collections.emptyList(); } - List wlocks = waitingLocks.remove(tid); + // TODO DEFERRED - ISSUE 4044 + List wlocks = waitingLocks.remove(fateId.getTid()); if (wlocks == null) { wlocks = Collections.emptyList(); @@ -398,9 +400,10 @@ private FateStatus getTransactionStatus(Map zs, ZooReaderWriter zk, ServiceLockPath p return false; } boolean state = false; - FateTxStore txStore = zs.reserve(txid); + // TODO DEFERRED - ISSUE 4044 + FateId fateId = FateId.from(FateInstanceType.META, txid); + FateTxStore txStore = zs.reserve(fateId); try { TStatus ts = txStore.getStatus(); switch (ts) { @@ -500,7 +505,9 @@ public boolean prepFail(FateStore zs, ZooReaderWriter zk, ServiceLockPath zLo return false; } boolean state = false; - FateTxStore txStore = zs.reserve(txid); + // TODO DEFERRED - ISSUE 4044 + FateId fateId = FateId.from(FateInstanceType.META, txid); + FateTxStore txStore = zs.reserve(fateId); try { TStatus ts = txStore.getStatus(); switch (ts) { diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index 55845078475..675113d62ce 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -73,7 +73,7 @@ public class Fate { private static final EnumSet FINISHED_STATES = EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN); private final AtomicBoolean keepRunning = new AtomicBoolean(true); - private final TransferQueue workQueue; + private final TransferQueue workQueue; private final Thread workFinder; public enum TxInfo { @@ -90,7 +90,7 @@ private class WorkFinder implements Runnable { public void run() { while (keepRunning.get()) { try { - store.runnable(keepRunning, txid -> { + store.runnable(keepRunning, fateId -> { while (keepRunning.get()) { try { // The reason for calling transfer instead of queueing is avoid rescanning the @@ -98,7 +98,7 @@ public void run() { // were busy, the queue size was 100, and there are three runnable things in the // store. Do not want to keep scanning the store adding those same 3 runnable things // until the queue is full. - if (workQueue.tryTransfer(txid, 100, MILLISECONDS)) { + if (workQueue.tryTransfer(fateId, 100, MILLISECONDS)) { break; } } catch (InterruptedException e) { @@ -124,12 +124,12 @@ private class TransactionRunner implements Runnable { private Optional> reserveFateTx() throws InterruptedException { while (keepRunning.get()) { - var unreservedTid = workQueue.poll(100, MILLISECONDS); + FateId unreservedFateId = workQueue.poll(100, MILLISECONDS); - if (unreservedTid == null) { + if (unreservedFateId == null) { continue; } - var optionalopStore = store.tryReserve(unreservedTid); + var optionalopStore = store.tryReserve(unreservedFateId); if (optionalopStore.isPresent()) { return optionalopStore; } @@ -157,7 +157,7 @@ public void run() { } else if (status == SUBMITTED || status == IN_PROGRESS) { Repo prevOp = null; try { - deferTime = op.isReady(txStore.getID(), environment); + deferTime = op.isReady(txStore.getID().getTid(), environment); // Here, deferTime is only used to determine success (zero) or failure (non-zero), // proceeding on success and returning to the while loop on failure. @@ -167,7 +167,7 @@ public void run() { if (status == SUBMITTED) { txStore.setStatus(IN_PROGRESS); } - op = op.call(txStore.getID(), environment); + op = op.call(txStore.getID().getTid(), environment); } else { continue; } @@ -214,18 +214,17 @@ public void run() { * transaction just wait for process to die. When the manager start elsewhere the FATE * transaction can resume. */ - private void blockIfHadoopShutdown(long tid, Exception e) { + private void blockIfHadoopShutdown(FateId fateId, Exception e) { if (ShutdownUtil.isShutdownInProgress()) { - String tidStr = FateTxId.formatTid(tid); if (e instanceof AcceptableException) { - log.debug("Ignoring exception possibly caused by Hadoop Shutdown hook. {} ", tidStr, e); + log.debug("Ignoring exception possibly caused by Hadoop Shutdown hook. {} ", fateId, e); } else if (isIOException(e)) { - log.info("Ignoring exception likely caused by Hadoop Shutdown hook. {} ", tidStr, e); + log.info("Ignoring exception likely caused by Hadoop Shutdown hook. {} ", fateId, e); } else { // sometimes code will catch an IOException caused by the hadoop shutdown hook and throw // another exception without setting the cause. - log.warn("Ignoring exception possibly caused by Hadoop Shutdown hook. {} ", tidStr, e); + log.warn("Ignoring exception possibly caused by Hadoop Shutdown hook. {} ", fateId, e); } while (true) { @@ -237,8 +236,8 @@ private void blockIfHadoopShutdown(long tid, Exception e) { } private void transitionToFailed(FateTxStore txStore, Exception e) { - String tidStr = FateTxId.formatTid(txStore.getID()); - final String msg = "Failed to execute Repo " + tidStr; + FateId fateId = txStore.getID(); + final String msg = "Failed to execute Repo " + fateId; // Certain FATE ops that throw exceptions don't need to be propagated up to the Monitor // as a warning. They're a normal, handled failure condition. if (e instanceof AcceptableException) { @@ -250,7 +249,7 @@ private void transitionToFailed(FateTxStore txStore, Exception e) { } txStore.setTransactionInfo(TxInfo.EXCEPTION, e); txStore.setStatus(FAILED_IN_PROGRESS); - log.info("Updated status for Repo with {} to FAILED_IN_PROGRESS", tidStr); + log.info("Updated status for Repo with {} to FAILED_IN_PROGRESS", fateId); } private void processFailed(FateTxStore txStore, Repo op) { @@ -278,11 +277,11 @@ private void doCleanUp(FateTxStore txStore) { } } - private void undo(long tid, Repo op) { + private void undo(FateId fateId, Repo op) { try { - op.undo(tid, environment); + op.undo(fateId.getTid(), environment); } catch (Exception e) { - log.warn("Failed to undo Repo, " + FateTxId.formatTid(tid), e); + log.warn("Failed to undo Repo, " + fateId, e); } } @@ -332,20 +331,20 @@ public Fate(T environment, FateStore store, Function,String> toLogStr } // get a transaction id back to the requester before doing any work - public long startTransaction() { + public FateId startTransaction() { return store.create(); } // start work in the transaction.. it is safe to call this // multiple times for a transaction... but it will only seed once - public void seedTransaction(String txName, long tid, Repo repo, boolean autoCleanUp, + public void seedTransaction(String txName, FateId fateId, Repo repo, boolean autoCleanUp, String goalMessage) { - FateTxStore txStore = store.reserve(tid); + FateTxStore txStore = store.reserve(fateId); try { if (txStore.getStatus() == NEW) { if (txStore.top() == null) { try { - log.info("Seeding {} {}", FateTxId.formatTid(tid), goalMessage); + log.info("Seeding {} {}", fateId, goalMessage); txStore.push(repo); } catch (StackOverflowException e) { // this should not happen @@ -368,21 +367,20 @@ public void seedTransaction(String txName, long tid, Repo repo, boolean autoC } // check on the transaction - public TStatus waitForCompletion(long tid) { - return store.read(tid).waitForStatusChange(FINISHED_STATES); + public TStatus waitForCompletion(FateId fateId) { + return store.read(fateId).waitForStatusChange(FINISHED_STATES); } /** * Attempts to cancel a running Fate transaction * - * @param tid transaction id + * @param fateId fate transaction id * @return true if transaction transitioned to a failed state or already in a completed state, * false otherwise */ - public boolean cancel(long tid) { - String tidStr = FateTxId.formatTid(tid); + public boolean cancel(FateId fateId) { for (int retries = 0; retries < 5; retries++) { - Optional> optionalTxStore = store.tryReserve(tid); + Optional> optionalTxStore = store.tryReserve(fateId); if (optionalTxStore.isPresent()) { var txStore = optionalTxStore.orElseThrow(); try { @@ -393,10 +391,10 @@ public boolean cancel(long tid) { TApplicationException.INTERNAL_ERROR, "Fate transaction cancelled by user")); txStore.setStatus(FAILED_IN_PROGRESS); log.info("Updated status for {} to FAILED_IN_PROGRESS because it was cancelled by user", - tidStr); + fateId); return true; } else { - log.info("{} cancelled by user but already in progress or finished state", tidStr); + log.info("{} cancelled by user but already in progress or finished state", fateId); return false; } } finally { @@ -407,13 +405,13 @@ public boolean cancel(long tid) { UtilWaitThread.sleep(500); } } - log.info("Unable to reserve transaction {} to cancel it", tid); + log.info("Unable to reserve transaction {} to cancel it", fateId); return false; } // resource cleanup - public void delete(long tid) { - FateTxStore txStore = store.reserve(tid); + public void delete(FateId fateId) { + FateTxStore txStore = store.reserve(fateId); try { switch (txStore.getStatus()) { case NEW: @@ -424,8 +422,7 @@ public void delete(long tid) { break; case FAILED_IN_PROGRESS: case IN_PROGRESS: - throw new IllegalStateException( - "Can not delete in progress transaction " + FateTxId.formatTid(tid)); + throw new IllegalStateException("Can not delete in progress transaction " + fateId); case UNKNOWN: // nothing to do, it does not exist break; @@ -435,12 +432,12 @@ public void delete(long tid) { } } - public String getReturn(long tid) { - FateTxStore txStore = store.reserve(tid); + public String getReturn(FateId fateId) { + FateTxStore txStore = store.reserve(fateId); try { if (txStore.getStatus() != SUCCESSFUL) { - throw new IllegalStateException("Tried to get exception when transaction " - + FateTxId.formatTid(tid) + " not in successful state"); + throw new IllegalStateException( + "Tried to get exception when transaction " + fateId + " not in successful state"); } return (String) txStore.getTransactionInfo(TxInfo.RETURN_VALUE); } finally { @@ -449,12 +446,12 @@ public String getReturn(long tid) { } // get reportable failures - public Exception getException(long tid) { - FateTxStore txStore = store.reserve(tid); + public Exception getException(FateId fateId) { + FateTxStore txStore = store.reserve(fateId); try { if (txStore.getStatus() != FAILED) { - throw new IllegalStateException("Tried to get exception when transaction " - + FateTxId.formatTid(tid) + " not in failed state"); + throw new IllegalStateException( + "Tried to get exception when transaction " + fateId + " not in failed state"); } return (Exception) txStore.getTransactionInfo(TxInfo.EXCEPTION); } finally { diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java b/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java index 9563f8b7d12..4e1beb1b9b2 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java @@ -116,8 +116,7 @@ public void ageOff() { var newAgeOffInfo = new AgeOffInfo(instanceId, timeSource.currentTimeNanos(), currStatus); txStore.setTransactionInfo(Fate.TxInfo.TX_AGEOFF, newAgeOffInfo.toString()); - log.trace("Set age off data {} {}", idStatus.getFateId(), - newAgeOffInfo); + log.trace("Set age off data {} {}", idStatus.getFateId(), newAgeOffInfo); } else if (shouldAgeOff(currStatus, ageOffInfo)) { txStore.delete(); log.debug("Aged off FATE tx {}", idStatus.getFateId()); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateId.java b/core/src/main/java/org/apache/accumulo/core/fate/FateId.java index 3d04bc41961..90e87c67d56 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateId.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateId.java @@ -21,6 +21,7 @@ import java.util.regex.Pattern; import org.apache.accumulo.core.data.AbstractId; +import org.apache.accumulo.core.manager.thrift.TFateId; import org.apache.accumulo.core.util.FastFormat; /** @@ -30,6 +31,7 @@ */ public class FateId extends AbstractId { + private static final long serialVersionUID = 1L; private static final String PREFIX = "FATE:"; private static final Pattern HEX_PATTERN = Pattern.compile("^[0-9a-fA-F]+$"); @@ -84,6 +86,24 @@ public static FateId from(FateInstanceType type, String hexTid) { } } + public static FateId fromThrift(TFateId tFateId) { + FateInstanceType type; + long tid = tFateId.getTid(); + + switch (tFateId.getType()) { + case USER: + type = FateInstanceType.USER; + break; + case META: + type = FateInstanceType.META; + break; + default: + throw new IllegalArgumentException("Invalid TFateInstanceType: " + tFateId.getType()); + } + + return new FateId(PREFIX + type + ":" + formatTid(tid)); + } + /** * Formats transaction ids in a consistent way that is useful for logging and persisting. */ diff --git a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java index 4acd6209874..fe525bf37d3 100644 --- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java +++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java @@ -132,6 +132,7 @@ public FateId create() { return fateId; } + @Override public int getDeferredCount() { return store.getDeferredCount(); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java index abef59d7132..e35088bd10f 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java @@ -69,6 +69,7 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TRange; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.manager.state.tables.TableState; @@ -80,7 +81,6 @@ import org.apache.accumulo.core.manager.thrift.ThriftPropertyException; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.core.util.ByteBufferUtil; -import org.apache.accumulo.core.util.FastFormat; import org.apache.accumulo.core.util.TextUtil; import org.apache.accumulo.core.util.Validator; import org.apache.accumulo.core.util.tables.TableNameUtil; @@ -126,7 +126,8 @@ public FateServiceHandler(Manager manager) { public TFateId beginFateOperation(TInfo tinfo, TCredentials credentials, TFateInstanceType type) throws ThriftSecurityException { authenticate(credentials); - return new TFateId(type, manager.fate(FateInstanceType.fromThrift(type)).startTransaction()); + return new TFateId(type, + manager.fate(FateInstanceType.fromThrift(type)).startTransaction().getTid()); } @Override @@ -137,6 +138,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate String goalMessage = op.toString() + " "; long tid = opid.getTid(); FateInstanceType type = FateInstanceType.fromThrift(opid.getType()); + FateId fateId = FateId.from(type, tid); switch (op) { case NAMESPACE_CREATE: { @@ -149,7 +151,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate } goalMessage += "Create " + namespace + " namespace."; - manager.fate(type).seedTransaction(op.toString(), tid, + manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>(new CreateNamespace(c.getPrincipal(), namespace, options)), autoCleanup, goalMessage); break; @@ -168,7 +170,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate } goalMessage += "Rename " + oldName + " namespace to " + newName; - manager.fate(type).seedTransaction(op.toString(), tid, + manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>(new RenameNamespace(namespaceId, oldName, newName)), autoCleanup, goalMessage); break; @@ -186,7 +188,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate } goalMessage += "Delete namespace Id: " + namespaceId; - manager.fate(type).seedTransaction(op.toString(), tid, + manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>(new DeleteNamespace(namespaceId)), autoCleanup, goalMessage); break; } @@ -248,8 +250,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate goalMessage += "Create table " + tableName + " " + initialTableState + " with " + splitCount + " splits and initial hosting goal of " + initialHostingGoal; - - manager.fate(type).seedTransaction(op.toString(), tid, + manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>(new CreateTable(c.getPrincipal(), tableName, timeType, options, splitsPath, splitCount, splitsDirsPath, initialTableState, initialHostingGoal, namespaceId)), @@ -285,7 +286,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate goalMessage += "Rename table " + oldTableName + "(" + tableId + ") to " + oldTableName; try { - manager.fate(type).seedTransaction(op.toString(), tid, + manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>(new RenameTable(namespaceId, tableId, oldTableName, newTableName)), autoCleanup, goalMessage); } catch (NamespaceNotFoundException e) { @@ -357,7 +358,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate } manager.fate(type).seedTransaction( - op.toString(), tid, new TraceRepo<>(new CloneTable(c.getPrincipal(), namespaceId, + op.toString(), fateId, new TraceRepo<>(new CloneTable(c.getPrincipal(), namespaceId, srcTableId, tableName, propertiesToSet, propertiesToExclude, keepOffline)), autoCleanup, goalMessage); @@ -386,7 +387,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate } goalMessage += "Delete table " + tableName + "(" + tableId + ")"; - manager.fate(type).seedTransaction(op.toString(), tid, + manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>(new PreDeleteTable(namespaceId, tableId)), autoCleanup, goalMessage); break; } @@ -411,7 +412,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate goalMessage += "Online table " + tableId; final EnumSet expectedCurrStates = EnumSet.of(TableState.ONLINE, TableState.OFFLINE); - manager.fate(type).seedTransaction(op.toString(), tid, + manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>( new ChangeTableState(namespaceId, tableId, tableOp, expectedCurrStates)), autoCleanup, goalMessage); @@ -439,7 +440,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate goalMessage += "Offline table " + tableId; final EnumSet expectedCurrStates = EnumSet.of(TableState.ONLINE, TableState.OFFLINE); - manager.fate(type).seedTransaction(op.toString(), tid, + manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>( new ChangeTableState(namespaceId, tableId, tableOp, expectedCurrStates)), autoCleanup, goalMessage); @@ -475,7 +476,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate startRowStr, endRowStr); goalMessage += "Merge table " + tableName + "(" + tableId + ") splits from " + startRowStr + " to " + endRowStr; - manager.fate(type).seedTransaction(op.toString(), tid, new TraceRepo<>( + manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>( new TableRangeOp(MergeInfo.Operation.MERGE, namespaceId, tableId, startRow, endRow)), autoCleanup, goalMessage); break; @@ -507,7 +508,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate goalMessage += "Delete table " + tableName + "(" + tableId + ") range " + startRow + " to " + endRow; - manager.fate(type).seedTransaction(op.toString(), tid, new TraceRepo<>( + manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>( new TableRangeOp(MergeInfo.Operation.DELETE, namespaceId, tableId, startRow, endRow)), autoCleanup, goalMessage); break; @@ -533,7 +534,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate } goalMessage += "Compact table (" + tableId + ") with config " + compactionConfig; - manager.fate(type).seedTransaction(op.toString(), tid, + manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>(new CompactRange(namespaceId, tableId, compactionConfig)), autoCleanup, goalMessage); break; @@ -557,7 +558,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate } goalMessage += "Cancel compaction of table (" + tableId + ")"; - manager.fate(type).seedTransaction(op.toString(), tid, + manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>(new CancelCompactions(namespaceId, tableId)), autoCleanup, goalMessage); break; } @@ -598,10 +599,10 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate } goalMessage += "Import table with new name: " + tableName + " from " + exportDirs; - manager.fate(type) - .seedTransaction(op.toString(), tid, new TraceRepo<>(new ImportTable(c.getPrincipal(), - tableName, exportDirs, namespaceId, keepMappings, keepOffline)), autoCleanup, - goalMessage); + manager.fate(type).seedTransaction(op.toString(), fateId, + new TraceRepo<>(new ImportTable(c.getPrincipal(), tableName, exportDirs, namespaceId, + keepMappings, keepOffline)), + autoCleanup, goalMessage); break; } case TABLE_EXPORT: { @@ -628,7 +629,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate } goalMessage += "Export table " + tableName + "(" + tableId + ") to " + exportDir; - manager.fate(type).seedTransaction(op.toString(), tid, + manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>(new ExportTable(namespaceId, tableName, tableId, exportDir)), autoCleanup, goalMessage); break; @@ -665,7 +666,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate manager.updateBulkImportStatus(dir, BulkImportState.INITIAL); goalMessage += "Bulk import (v2) " + dir + " to " + tableName + "(" + tableId + ")"; - manager.fate(type).seedTransaction(op.toString(), tid, + manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>(new PrepBulkImport(tableId, dir, setTime)), autoCleanup, goalMessage); break; } @@ -707,7 +708,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate goalMessage += "Set Hosting Goal for table: " + tableName + "(" + tableId + ") range: " + tRange + " to: " + goal.name(); - manager.fate(type).seedTransaction(op.toString(), tid, + manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>(new SetHostingGoal(tableId, namespaceId, tRange, goal)), autoCleanup, goalMessage); break; @@ -784,7 +785,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate } goalMessage = "Splitting " + extent + " for user into " + (splits.size() + 1) + " tablets"; - manager.fate(type).seedTransaction(op.toString(), tid, new PreSplit(extent, splits), + manager.fate(type).seedTransaction(op.toString(), fateId, new PreSplit(extent, splits), autoCleanup, goalMessage); break; } @@ -835,10 +836,11 @@ public String waitForFateOperation(TInfo tinfo, TCredentials credentials, TFateI throws ThriftSecurityException, ThriftTableOperationException { authenticate(credentials); - FateInstanceType type = FateInstanceType.fromThrift(opid.getType()); - TStatus status = manager.fate(type).waitForCompletion(opid.getTid()); + FateId fateId = FateId.fromThrift(opid); + FateInstanceType type = fateId.getType(); + TStatus status = manager.fate(type).waitForCompletion(fateId); if (status == TStatus.FAILED) { - Exception e = manager.fate(type).getException(opid.getTid()); + Exception e = manager.fate(type).getException(fateId); if (e instanceof ThriftTableOperationException) { throw (ThriftTableOperationException) e; } else if (e instanceof ThriftSecurityException) { @@ -850,7 +852,7 @@ public String waitForFateOperation(TInfo tinfo, TCredentials credentials, TFateI } } - String ret = manager.fate(type).getReturn(opid.getTid()); + String ret = manager.fate(type).getReturn(fateId); if (ret == null) { ret = ""; // thrift does not like returning null } @@ -861,7 +863,8 @@ public String waitForFateOperation(TInfo tinfo, TCredentials credentials, TFateI public void finishFateOperation(TInfo tinfo, TCredentials credentials, TFateId opid) throws ThriftSecurityException { authenticate(credentials); - manager.fate(FateInstanceType.fromThrift(opid.getType())).delete(opid.getTid()); + FateId fateId = FateId.fromThrift(opid); + manager.fate(fateId.getType()).delete(fateId); } protected void authenticate(TCredentials credentials) throws ThriftSecurityException { @@ -942,8 +945,8 @@ private void writeSplitsToFile(Path splitsPath, final List arguments */ public Path mkTempDir(TFateId opid) throws IOException { Volume vol = manager.getVolumeManager().getFirst(); - Path p = vol - .prefixChild("/tmp/fate-" + opid.getType() + "-" + FastFormat.toHexString(opid.getTid())); + FateId fateId = FateId.fromThrift(opid); + Path p = vol.prefixChild("/tmp/fate-" + fateId.getType() + "-" + fateId.getHexTid()); FileSystem fs = vol.getFileSystem(); if (fs.exists(p)) { fs.delete(p, true); @@ -955,12 +958,13 @@ public Path mkTempDir(TFateId opid) throws IOException { @Override public boolean cancelFateOperation(TInfo tinfo, TCredentials credentials, TFateId opid) throws ThriftSecurityException, ThriftNotActiveServiceException { + FateId fateId = FateId.fromThrift(opid); if (!manager.security.canPerformSystemActions(credentials)) { throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); } - return manager.fate(FateInstanceType.fromThrift(opid.getType())).cancel(opid.getTid()); + return manager.fate(fateId.getType()).cancel(fateId); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java index de1328571c4..0c568862e84 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java @@ -57,6 +57,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.manager.thrift.ManagerClientService; @@ -330,14 +331,14 @@ public void shutdownTabletServer(TInfo info, TCredentials c, String tabletServer } Fate fate = manager.fate(FateInstanceType.META); - long tid = fate.startTransaction(); + FateId fateId = fate.startTransaction(); String msg = "Shutdown tserver " + tabletServer; - fate.seedTransaction("ShutdownTServer", tid, + fate.seedTransaction("ShutdownTServer", fateId, new TraceRepo<>(new ShutdownTServer(doomed, force)), false, msg); - fate.waitForCompletion(tid); - fate.delete(tid); + fate.waitForCompletion(fateId); + fate.delete(fateId); log.debug("FATE op shutting down " + tabletServer + " finished"); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitTask.java b/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitTask.java index 18958c92be0..999de0f7e99 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitTask.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitTask.java @@ -21,6 +21,7 @@ import java.time.Duration; import java.util.SortedSet; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.manager.Manager; @@ -85,9 +86,9 @@ public void run() { } var fateInstanceType = FateInstanceType.fromTableId((tablet.getTableId())); - long fateTxId = manager.fate(fateInstanceType).startTransaction(); + FateId fateId = manager.fate(fateInstanceType).startTransaction(); - manager.fate(fateInstanceType).seedTransaction("SYSTEM_SPLIT", fateTxId, + manager.fate(fateInstanceType).seedTransaction("SYSTEM_SPLIT", fateId, new PreSplit(extent, splits), true, "System initiated split of tablet " + extent + " into " + splits.size() + " splits"); } catch (Exception e) { diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java index d1797a42f91..87e4dca1928 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java @@ -37,6 +37,7 @@ import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; @@ -166,18 +167,18 @@ protected void testTransactionStatus(FateStore store, ServerContext sct callStarted = new CountDownLatch(1); finishCall = new CountDownLatch(1); - long txid = fate.startTransaction(); - assertEquals(TStatus.NEW, getTxStatus(sctx, txid)); - fate.seedTransaction("TestOperation", txid, new TestRepo("testTransactionStatus"), true, + FateId fateId = fate.startTransaction(); + assertEquals(TStatus.NEW, getTxStatus(sctx, fateId)); + fate.seedTransaction("TestOperation", fateId, new TestRepo("testTransactionStatus"), true, "Test Op"); - assertEquals(TStatus.SUBMITTED, getTxStatus(sctx, txid)); + assertEquals(TStatus.SUBMITTED, getTxStatus(sctx, fateId)); // wait for call() to be called callStarted.await(); - assertEquals(IN_PROGRESS, getTxStatus(sctx, txid)); + assertEquals(IN_PROGRESS, getTxStatus(sctx, fateId)); // tell the op to exit the method finishCall.countDown(); - Wait.waitFor(() -> getTxStatus(sctx, txid) == UNKNOWN); + Wait.waitFor(() -> getTxStatus(sctx, fateId) == UNKNOWN); } finally { fate.shutdown(); } @@ -198,20 +199,20 @@ protected void testCancelWhileNew(FateStore store, ServerContext sctx) callStarted = new CountDownLatch(1); finishCall = new CountDownLatch(1); - long txid = fate.startTransaction(); - LOG.debug("Starting test testCancelWhileNew with {}", FateTxId.formatTid(txid)); - assertEquals(NEW, getTxStatus(sctx, txid)); + FateId fateId = fate.startTransaction(); + LOG.debug("Starting test testCancelWhileNew with {}", fateId); + assertEquals(NEW, getTxStatus(sctx, fateId)); // cancel the transaction - assertTrue(fate.cancel(txid)); + assertTrue(fate.cancel(fateId)); assertTrue( - FAILED_IN_PROGRESS == getTxStatus(sctx, txid) || FAILED == getTxStatus(sctx, txid)); - fate.seedTransaction("TestOperation", txid, new TestRepo("testCancelWhileNew"), true, + FAILED_IN_PROGRESS == getTxStatus(sctx, fateId) || FAILED == getTxStatus(sctx, fateId)); + fate.seedTransaction("TestOperation", fateId, new TestRepo("testCancelWhileNew"), true, "Test Op"); - Wait.waitFor(() -> FAILED == getTxStatus(sctx, txid)); + Wait.waitFor(() -> FAILED == getTxStatus(sctx, fateId)); // nothing should have run assertEquals(1, callStarted.getCount()); - fate.delete(txid); - assertEquals(UNKNOWN, getTxStatus(sctx, txid)); + fate.delete(fateId); + assertEquals(UNKNOWN, getTxStatus(sctx, fateId)); } finally { fate.shutdown(); } @@ -233,20 +234,20 @@ protected void testCancelWhileSubmittedAndRunning(FateStore store, Serv callStarted = new CountDownLatch(1); finishCall = new CountDownLatch(1); - long txid = fate.startTransaction(); - LOG.debug("Starting test testCancelWhileSubmitted with {}", FateTxId.formatTid(txid)); - assertEquals(NEW, getTxStatus(sctx, txid)); - fate.seedTransaction("TestOperation", txid, + FateId fateId = fate.startTransaction(); + LOG.debug("Starting test testCancelWhileSubmitted with {}", fateId); + assertEquals(NEW, getTxStatus(sctx, fateId)); + fate.seedTransaction("TestOperation", fateId, new TestRepo("testCancelWhileSubmittedAndRunning"), false, "Test Op"); - Wait.waitFor(() -> IN_PROGRESS == getTxStatus(sctx, txid)); + Wait.waitFor(() -> IN_PROGRESS == getTxStatus(sctx, fateId)); // This is false because the transaction runner has reserved the FaTe // transaction. - assertFalse(fate.cancel(txid)); + assertFalse(fate.cancel(fateId)); callStarted.await(); finishCall.countDown(); - Wait.waitFor(() -> IN_PROGRESS != getTxStatus(sctx, txid)); - fate.delete(txid); - assertEquals(UNKNOWN, getTxStatus(sctx, txid)); + Wait.waitFor(() -> IN_PROGRESS != getTxStatus(sctx, fateId)); + fate.delete(fateId); + assertEquals(UNKNOWN, getTxStatus(sctx, fateId)); } finally { fate.shutdown(); } @@ -268,16 +269,16 @@ protected void testCancelWhileInCall(FateStore store, ServerContext sct callStarted = new CountDownLatch(1); finishCall = new CountDownLatch(1); - long txid = fate.startTransaction(); - LOG.debug("Starting test testCancelWhileInCall with {}", FateTxId.formatTid(txid)); - assertEquals(NEW, getTxStatus(sctx, txid)); - fate.seedTransaction("TestOperation", txid, new TestRepo("testCancelWhileInCall"), true, + FateId fateId = fate.startTransaction(); + LOG.debug("Starting test testCancelWhileInCall with {}", fateId); + assertEquals(NEW, getTxStatus(sctx, fateId)); + fate.seedTransaction("TestOperation", fateId, new TestRepo("testCancelWhileInCall"), true, "Test Op"); - assertEquals(SUBMITTED, getTxStatus(sctx, txid)); + assertEquals(SUBMITTED, getTxStatus(sctx, fateId)); // wait for call() to be called callStarted.await(); // cancel the transaction - assertFalse(fate.cancel(txid)); + assertFalse(fate.cancel(fateId)); } finally { fate.shutdown(); } @@ -306,7 +307,7 @@ protected void testDeferredOverflow(FateStore store, ServerContext sctx // so it will be deferred when submitted DeferredTestRepo.delay.set(30000); - Set transactions = new HashSet<>(); + Set transactions = new HashSet<>(); // Start by creating 10 transactions that are all deferred which should // fill up the deferred map with all 10 as we set the max deferred limit @@ -346,7 +347,7 @@ protected void testDeferredOverflow(FateStore store, ServerContext sctx // Verify all 20 unique transactions finished Wait.waitFor(() -> { - transactions.removeIf(txid -> getTxStatus(sctx, txid) == UNKNOWN); + transactions.removeIf(fateId -> getTxStatus(sctx, fateId) == UNKNOWN); return transactions.isEmpty(); }); @@ -355,13 +356,13 @@ protected void testDeferredOverflow(FateStore store, ServerContext sctx } } - private void submitDeferred(Fate fate, ServerContext sctx, Set transactions) { - long txid = fate.startTransaction(); - transactions.add(txid); - assertEquals(TStatus.NEW, getTxStatus(sctx, txid)); - fate.seedTransaction("TestOperation", txid, new DeferredTestRepo("testDeferredOverflow"), true, - "Test Op"); - assertEquals(TStatus.SUBMITTED, getTxStatus(sctx, txid)); + private void submitDeferred(Fate fate, ServerContext sctx, Set transactions) { + FateId fateId = fate.startTransaction(); + transactions.add(fateId); + assertEquals(TStatus.NEW, getTxStatus(sctx, fateId)); + fate.seedTransaction("TestOperation", fateId, new DeferredTestRepo("testDeferredOverflow"), + true, "Test Op"); + assertEquals(TStatus.SUBMITTED, getTxStatus(sctx, fateId)); } protected Fate initializeFate(FateStore store) { @@ -371,7 +372,7 @@ protected Fate initializeFate(FateStore store) { return new Fate<>(new TestEnv(), store, r -> r + "", config); } - protected abstract TStatus getTxStatus(ServerContext sctx, long txid); + protected abstract TStatus getTxStatus(ServerContext sctx, FateId fateId); protected abstract void executeTest(FateTestExecutor testMethod) throws Exception; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateIT.java index c71c1d1229e..90368875500 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateIT.java @@ -25,11 +25,11 @@ import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.accumulo.AccumuloStore; import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxColumnFamily; import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.util.FastFormat; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.test.fate.FateIT; @@ -68,9 +68,9 @@ protected void executeTest(FateTestExecutor testMethod, int maxDeferred) throws } @Override - protected TStatus getTxStatus(ServerContext context, long txid) { + protected TStatus getTxStatus(ServerContext context, FateId fateId) { try (Scanner scanner = context.createScanner(table, Authorizations.EMPTY)) { - scanner.setRange(getRow(txid)); + scanner.setRange(getRow(fateId)); TxColumnFamily.STATUS_COLUMN.fetch(scanner); return StreamSupport.stream(scanner.spliterator(), false) .map(e -> TStatus.valueOf(e.getValue().toString())).findFirst().orElse(TStatus.UNKNOWN); @@ -79,7 +79,7 @@ protected TStatus getTxStatus(ServerContext context, long txid) { } } - private static Range getRow(long tid) { - return new Range("tx_" + FastFormat.toHexString(tid)); + private static Range getRow(FateId fateId) { + return new Range("tx_" + fateId.getHexTid()); } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreReadWriteIT.java b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreReadWriteIT.java index 0d36d30a95a..2953245e039 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreReadWriteIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreReadWriteIT.java @@ -36,6 +36,7 @@ import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.fate.Fate.TxInfo; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateStore.FateTxStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.ReadOnlyRepo; @@ -77,8 +78,8 @@ public void testReadWrite() throws Exception { assertEquals(0, store.list().count()); // Create a new transaction and get the store for it - long tid = store.create(); - FateTxStore txStore = store.reserve(tid); + FateId fateId = store.create(); + FateTxStore txStore = store.reserve(fateId); assertTrue(txStore.timeCreated() > 0); assertEquals(1, store.list().count()); @@ -140,11 +141,11 @@ public void testDeferredOverflow() throws Exception { assertFalse(store.isDeferredOverflow()); // Store 10 transactions that are all deferred - final Set transactions = new HashSet<>(); + final Set transactions = new HashSet<>(); for (int i = 0; i < 10; i++) { - long tid = store.create(); - transactions.add(tid); - FateTxStore txStore = store.reserve(tid); + FateId fateId = store.create(); + transactions.add(fateId); + FateTxStore txStore = store.reserve(fateId); txStore.setStatus(TStatus.SUBMITTED); assertTrue(txStore.timeCreated() > 0); txStore.unreserve(10, TimeUnit.SECONDS); @@ -173,9 +174,9 @@ public void testDeferredOverflow() throws Exception { // Store one more that should go over the max deferred of 10 // and should clear the map and set the overflow flag - long tid = store.create(); - transactions.add(tid); - FateTxStore txStore = store.reserve(tid); + FateId fateId = store.create(); + transactions.add(fateId); + FateTxStore txStore = store.reserve(fateId); txStore.setStatus(TStatus.SUBMITTED); txStore.unreserve(30, TimeUnit.SECONDS); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java index 175785270d1..2508f7a7e6c 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java @@ -28,6 +28,7 @@ import java.util.UUID; import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.ZooStore; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; @@ -81,9 +82,9 @@ protected void executeTest(FateTestExecutor testMethod, int maxDeferred) throws } @Override - protected TStatus getTxStatus(ServerContext sctx, long txid) { + protected TStatus getTxStatus(ServerContext sctx, FateId fateId) { try { - return getTxStatus(sctx.getZooReaderWriter(), txid); + return getTxStatus(sctx.getZooReaderWriter(), fateId); } catch (KeeperException | InterruptedException e) { throw new IllegalStateException(e); } @@ -93,10 +94,10 @@ protected TStatus getTxStatus(ServerContext sctx, long txid) { * Get the status of the TX from ZK directly. Unable to call ZooStore.getStatus because this test * thread does not have the reservation (the FaTE thread does) */ - private static TStatus getTxStatus(ZooReaderWriter zrw, long txid) + private static TStatus getTxStatus(ZooReaderWriter zrw, FateId fateId) throws KeeperException, InterruptedException { zrw.sync(ZK_ROOT); - String txdir = String.format("%s%s/tx_%016x", ZK_ROOT, Constants.ZFATE, txid); + String txdir = String.format("%s%s/tx_%s", ZK_ROOT, Constants.ZFATE, fateId.getHexTid()); try { return TStatus.valueOf(new String(zrw.getData(txdir), UTF_8)); } catch (KeeperException.NoNodeException e) { From 2e7cf34128c2e2d3a90e985abc89e4917380c9e3 Mon Sep 17 00:00:00 2001 From: Kevin Rathbun Date: Tue, 30 Jan 2024 14:54:32 -0500 Subject: [PATCH 6/8] Fixed Test Failures --- core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java | 3 ++- .../org/apache/accumulo/core/fate/accumulo/AccumuloStore.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java index e4d1ad4a79d..99c47a5624d 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java @@ -311,7 +311,8 @@ protected FateTxStore newFateTxStore(FateId fateId, boolean isReserved) { protected Stream getTransactions() { try { return zk.getChildren(path).stream().map(strTxid -> { - FateId fateId = FateId.from(fateInstanceType, strTxid); + String hexTid = strTxid.split("_")[1]; + FateId fateId = FateId.from(fateInstanceType, hexTid); // Memoizing for two reasons. First the status may never be requested, so in that case avoid // the lookup. Second, if its requested multiple times the result will always be consistent. Supplier statusSupplier = Suppliers.memoize(() -> _getStatus(fateId)); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java index f17d41278da..3c9a1c95a6b 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java @@ -90,7 +90,8 @@ protected Stream getTransactions() { scanner.setRange(new Range()); TxColumnFamily.STATUS_COLUMN.fetch(scanner); return scanner.stream().onClose(scanner::close).map(e -> { - FateId fateId = FateId.from(fateInstanceType, e.getKey().getRow().toString()); + String hexTid = e.getKey().getRow().toString().split("_")[1]; + FateId fateId = FateId.from(fateInstanceType, hexTid); return new FateIdStatusBase(fateId) { @Override public TStatus getStatus() { From 9e4710c611602ed5cb019d31aba3ab9ad61f8ade Mon Sep 17 00:00:00 2001 From: Kevin Rathbun Date: Wed, 31 Jan 2024 11:17:35 -0500 Subject: [PATCH 7/8] Added requested changes: - Inline statement in Fate - Changed comment about deferral in AdminUtil --- .../java/org/apache/accumulo/core/fate/AdminUtil.java | 10 +++++----- .../main/java/org/apache/accumulo/core/fate/Fate.java | 5 ++--- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java index d9be4bd4b31..2a436b34445 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java @@ -376,14 +376,14 @@ private FateStatus getTransactionStatus(Map hlocks = heldLocks.remove(fateId.getTid()); if (hlocks == null) { hlocks = Collections.emptyList(); } - // TODO DEFERRED - ISSUE 4044 + // ELASTICITY_TODO DEFERRED - ISSUE 4044 List wlocks = waitingLocks.remove(fateId.getTid()); if (wlocks == null) { @@ -400,7 +400,7 @@ private FateStatus getTransactionStatus(Map zs, ZooReaderWriter zk, ServiceLockPath p return false; } boolean state = false; - // TODO DEFERRED - ISSUE 4044 + // ELASTICITY_TODO DEFERRED - ISSUE 4044 FateId fateId = FateId.from(FateInstanceType.META, txid); FateTxStore txStore = zs.reserve(fateId); try { @@ -505,7 +505,7 @@ public boolean prepFail(FateStore zs, ZooReaderWriter zk, ServiceLockPath zLo return false; } boolean state = false; - // TODO DEFERRED - ISSUE 4044 + // ELASTICITY_TODO DEFERRED - ISSUE 4044 FateId fateId = FateId.from(FateInstanceType.META, txid); FateTxStore txStore = zs.reserve(fateId); try { diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index 675113d62ce..938b76ef4c2 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -236,8 +236,7 @@ private void blockIfHadoopShutdown(FateId fateId, Exception e) { } private void transitionToFailed(FateTxStore txStore, Exception e) { - FateId fateId = txStore.getID(); - final String msg = "Failed to execute Repo " + fateId; + final String msg = "Failed to execute Repo " + txStore.getID(); // Certain FATE ops that throw exceptions don't need to be propagated up to the Monitor // as a warning. They're a normal, handled failure condition. if (e instanceof AcceptableException) { @@ -249,7 +248,7 @@ private void transitionToFailed(FateTxStore txStore, Exception e) { } txStore.setTransactionInfo(TxInfo.EXCEPTION, e); txStore.setStatus(FAILED_IN_PROGRESS); - log.info("Updated status for Repo with {} to FAILED_IN_PROGRESS", fateId); + log.info("Updated status for Repo with {} to FAILED_IN_PROGRESS", txStore.getID()); } private void processFailed(FateTxStore txStore, Repo op) { From 45eeaed19d2176612a3113d270268d1a0aaafc5b Mon Sep 17 00:00:00 2001 From: Kevin Rathbun Date: Wed, 31 Jan 2024 13:48:03 -0500 Subject: [PATCH 8/8] Trivial change to AccumuloStore.create() --- .../org/apache/accumulo/core/fate/accumulo/AccumuloStore.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java index c211d9235e9..9ae596bb83e 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java @@ -81,14 +81,14 @@ public AccumuloStore(ClientContext context) { @Override public FateId create() { final int maxAttempts = 5; - FateId fateId = FateId.from(fateInstanceType, 0L); for (int attempt = 0; attempt < maxAttempts; attempt++) { + FateId fateId = getFateId(); + if (attempt >= 1) { log.debug("Failed to create new id: {}, trying again", fateId); UtilWaitThread.sleep(100); } - fateId = getFateId(); var status = newMutator(fateId).requireStatus().putStatus(TStatus.NEW) .putCreateTime(System.currentTimeMillis()).tryMutate();