Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;
import java.util.function.Consumer;
import java.util.stream.Stream;

import org.apache.accumulo.core.fate.Fate.TxInfo;
Expand All @@ -55,8 +55,8 @@ public abstract class AbstractFateStore<T> implements FateStore<T> {
// all existing transactions are processed immediately again
protected static final int DEFAULT_MAX_DEFERRED = 100_000;

protected final Set<Long> reserved;
protected final Map<Long,Long> deferred;
protected final Set<FateId> reserved;
protected final Map<FateId,Long> deferred;
private final int maxDeferred;
private final AtomicBoolean deferredOverflow = new AtomicBoolean();

Expand Down Expand Up @@ -101,26 +101,26 @@ public static Object deserialize(byte[] ser) {
}

/**
* Attempt to reserve transaction
* Attempt to reserve the fate transaction.
*
* @param tid transaction id
* @param fateId The FateId
* @return An Optional containing the FateTxStore if the transaction was successfully reserved, or
* an empty Optional if the transaction was already reserved.
*/
@Override
public Optional<FateTxStore<T>> tryReserve(long tid) {
synchronized (AbstractFateStore.this) {
if (!reserved.contains(tid)) {
return Optional.of(reserve(tid));
public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
synchronized (this) {
if (!reserved.contains(fateId)) {
return Optional.of(reserve(fateId));
}
return Optional.empty();
}
}

@Override
public FateTxStore<T> reserve(long tid) {
public FateTxStore<T> reserve(FateId fateId) {
synchronized (AbstractFateStore.this) {
while (reserved.contains(tid)) {
while (reserved.contains(fateId)) {
try {
AbstractFateStore.this.wait(100);
} catch (InterruptedException e) {
Expand All @@ -129,13 +129,13 @@ public FateTxStore<T> reserve(long tid) {
}
}

reserved.add(tid);
return newFateTxStore(tid, true);
reserved.add(fateId);
return newFateTxStore(fateId, true);
}
}

@Override
public void runnable(AtomicBoolean keepWaiting, LongConsumer idConsumer) {
public void runnable(AtomicBoolean keepWaiting, Consumer<FateId> idConsumer) {

AtomicLong seen = new AtomicLong(0);

Expand All @@ -145,21 +145,21 @@ public void runnable(AtomicBoolean keepWaiting, LongConsumer idConsumer) {

try (Stream<FateIdStatus> transactions = getTransactions()) {
transactions.filter(fateIdStatus -> isRunnable(fateIdStatus.getStatus()))
.mapToLong(FateIdStatus::getTxid).filter(txid -> {
.map(FateIdStatus::getFateId).filter(fateId -> {
synchronized (AbstractFateStore.this) {
var deferredTime = deferred.get(txid);
var deferredTime = deferred.get(fateId);
if (deferredTime != null) {
if ((deferredTime - System.nanoTime()) >= 0) {
return false;
} else {
deferred.remove(txid);
deferred.remove(fateId);
}
}
return !reserved.contains(txid);
return !reserved.contains(fateId);
}
}).forEach(txid -> {
}).forEach(fateId -> {
seen.incrementAndGet();
idConsumer.accept(txid);
idConsumer.accept(fateId);
});
}

Expand Down Expand Up @@ -202,29 +202,25 @@ public Stream<FateIdStatus> list() {
}

@Override
public ReadOnlyFateTxStore<T> read(long tid) {
return newFateTxStore(tid, false);
public ReadOnlyFateTxStore<T> read(FateId fateId) {
return newFateTxStore(fateId, false);
}

protected boolean isRunnable(TStatus status) {
return status == TStatus.IN_PROGRESS || status == TStatus.FAILED_IN_PROGRESS
|| status == TStatus.SUBMITTED;
}

protected long parseTid(String txdir) {
return Long.parseLong(txdir.split("_")[1], 16);
}

public static abstract class FateIdStatusBase implements FateIdStatus {
private final long txid;
private final FateId fateId;

public FateIdStatusBase(long txid) {
this.txid = txid;
public FateIdStatusBase(FateId fateId) {
this.fateId = fateId;
}

@Override
public long getTxid() {
return txid;
public FateId getFateId() {
return fateId;
}
}

Expand All @@ -245,30 +241,30 @@ public int getDeferredCount() {

protected abstract Stream<FateIdStatus> getTransactions();

protected abstract TStatus _getStatus(long tid);
protected abstract TStatus _getStatus(FateId fateId);

protected abstract FateTxStore<T> newFateTxStore(long tid, boolean isReserved);
protected abstract FateTxStore<T> newFateTxStore(FateId fateId, boolean isReserved);

protected abstract class AbstractFateTxStoreImpl<T> implements FateTxStore<T> {
protected final long tid;
protected final FateId fateId;
protected final boolean isReserved;

protected TStatus observedStatus = null;

protected AbstractFateTxStoreImpl(long tid, boolean isReserved) {
this.tid = tid;
protected AbstractFateTxStoreImpl(FateId fateId, boolean isReserved) {
this.fateId = fateId;
this.isReserved = isReserved;
}

@Override
public TStatus waitForStatusChange(EnumSet<TStatus> expected) {
Preconditions.checkState(!isReserved,
"Attempted to wait for status change while reserved " + FateTxId.formatTid(getID()));
"Attempted to wait for status change while reserved " + fateId);
while (true) {

long countBefore = unreservedNonNewCount.getCount();

TStatus status = _getStatus(tid);
TStatus status = _getStatus(fateId);
if (expected.contains(status)) {
return status;
}
Expand All @@ -286,9 +282,8 @@ public void unreserve(long deferTime, TimeUnit timeUnit) {
}

synchronized (AbstractFateStore.this) {
if (!reserved.remove(tid)) {
throw new IllegalStateException(
"Tried to unreserve id that was not reserved " + FateTxId.formatTid(tid));
if (!reserved.remove(fateId)) {
throw new IllegalStateException("Tried to unreserve id that was not reserved " + fateId);
}

// notify any threads waiting to reserve
Expand All @@ -306,7 +301,7 @@ public void unreserve(long deferTime, TimeUnit timeUnit) {
deferredOverflow.set(true);
deferred.clear();
} else {
deferred.put(tid, System.nanoTime() + deferTime);
deferred.put(fateId, System.nanoTime() + deferTime);
}
}
}
Expand All @@ -327,9 +322,8 @@ protected void verifyReserved(boolean isWrite) {

if (isReserved) {
synchronized (AbstractFateStore.this) {
if (!reserved.contains(tid)) {
throw new IllegalStateException(
"Tried to operate on unreserved transaction " + FateTxId.formatTid(tid));
if (!reserved.contains(fateId)) {
throw new IllegalStateException("Tried to operate on unreserved transaction " + fateId);
}
}
}
Expand All @@ -338,14 +332,14 @@ protected void verifyReserved(boolean isWrite) {
@Override
public TStatus getStatus() {
verifyReserved(false);
var status = _getStatus(tid);
var status = _getStatus(fateId);
observedStatus = status;
return status;
}

@Override
public long getID() {
return tid;
public FateId getID() {
return fateId;
}

protected byte[] serializeTxInfo(Serializable so) {
Expand Down
27 changes: 17 additions & 10 deletions core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -369,20 +369,22 @@ private FateStatus getTransactionStatus(Map<FateInstanceType,ReadOnlyFateStore<T
final List<TransactionStatus> statuses = new ArrayList<>();

fateStores.forEach((type, store) -> {
try (Stream<Long> tids = store.list().map(FateIdStatus::getTxid)) {
tids.forEach(tid -> {
try (Stream<FateId> fateIds = store.list().map(FateIdStatus::getFateId)) {
fateIds.forEach(fateId -> {

ReadOnlyFateTxStore<T> txStore = store.read(tid);
ReadOnlyFateTxStore<T> txStore = store.read(fateId);

String txName = (String) txStore.getTransactionInfo(Fate.TxInfo.TX_NAME);

List<String> hlocks = heldLocks.remove(tid);
// ELASTICITY_TODO DEFERRED - ISSUE 4044
List<String> hlocks = heldLocks.remove(fateId.getTid());

if (hlocks == null) {
hlocks = Collections.emptyList();
}

List<String> wlocks = waitingLocks.remove(tid);
// ELASTICITY_TODO DEFERRED - ISSUE 4044
List<String> wlocks = waitingLocks.remove(fateId.getTid());

if (wlocks == null) {
wlocks = Collections.emptyList();
Expand All @@ -398,9 +400,10 @@ private FateStatus getTransactionStatus(Map<FateInstanceType,ReadOnlyFateStore<T

long timeCreated = txStore.timeCreated();

if (includeByStatus(status, filterStatus) && includeByTxid(tid, filterTxid)) {
statuses.add(
new TransactionStatus(tid, type, status, txName, hlocks, wlocks, top, timeCreated));
// ELASTICITY_TODO DEFERRED - ISSUE 4044
if (includeByStatus(status, filterStatus) && includeByTxid(fateId.getTid(), filterTxid)) {
statuses.add(new TransactionStatus(fateId.getTid(), type, status, txName, hlocks,
wlocks, top, timeCreated));
}
});
}
Expand Down Expand Up @@ -461,7 +464,9 @@ public boolean prepDelete(FateStore<T> zs, ZooReaderWriter zk, ServiceLockPath p
return false;
}
boolean state = false;
FateTxStore<T> txStore = zs.reserve(txid);
// ELASTICITY_TODO DEFERRED - ISSUE 4044
FateId fateId = FateId.from(FateInstanceType.META, txid);
FateTxStore<T> txStore = zs.reserve(fateId);
try {
TStatus ts = txStore.getStatus();
switch (ts) {
Expand Down Expand Up @@ -500,7 +505,9 @@ public boolean prepFail(FateStore<T> zs, ZooReaderWriter zk, ServiceLockPath zLo
return false;
}
boolean state = false;
FateTxStore<T> txStore = zs.reserve(txid);
// ELASTICITY_TODO DEFERRED - ISSUE 4044
FateId fateId = FateId.from(FateInstanceType.META, txid);
FateTxStore<T> txStore = zs.reserve(fateId);
try {
TStatus ts = txStore.getStatus();
switch (ts) {
Expand Down
Loading