Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,17 @@
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;
import java.util.stream.Stream;

import org.apache.accumulo.core.fate.Fate.TxInfo;
import org.slf4j.Logger;
Expand Down Expand Up @@ -125,37 +124,34 @@ public FateTxStore<T> reserve(long tid) {
}

@Override
public Iterator<Long> runnable(AtomicBoolean keepWaiting) {
public void runnable(AtomicBoolean keepWaiting, LongConsumer idConsumer) {

while (keepWaiting.get()) {
ArrayList<Long> runnableTids = new ArrayList<>();
AtomicLong seen = new AtomicLong(0);

while (keepWaiting.get() && seen.get() == 0) {
final long beforeCount = unreservedRunnableCount.getCount();

List<String> transactions = getTransactions();
for (String txidStr : transactions) {
long txid = parseTid(txidStr);
if (isRunnable(_getStatus(txid))) {
runnableTids.add(txid);
}
}

synchronized (this) {
runnableTids.removeIf(txid -> {
var deferredTime = deferred.get(txid);
if (deferredTime != null) {
if ((deferredTime - System.nanoTime()) > 0) {
return true;
} else {
deferred.remove(txid);
}
}

return reserved.contains(txid);
});
try (Stream<FateIdStatus> transactions = getTransactions()) {
transactions.filter(fateIdStatus -> isRunnable(fateIdStatus.getStatus()))
.mapToLong(FateIdStatus::getTxid).filter(txid -> {
synchronized (AbstractFateStore.this) {
var deferredTime = deferred.get(txid);
if (deferredTime != null) {
if ((deferredTime - System.nanoTime()) >= 0) {
return false;
} else {
deferred.remove(txid);
}
}
return !reserved.contains(txid);
}
}).forEach(txid -> {
seen.incrementAndGet();
idConsumer.accept(txid);
});
}

if (runnableTids.isEmpty()) {
if (seen.get() == 0) {
if (beforeCount == unreservedRunnableCount.getCount()) {
long waitTime = 5000;
if (!deferred.isEmpty()) {
Expand All @@ -170,23 +166,13 @@ public Iterator<Long> runnable(AtomicBoolean keepWaiting) {
keepWaiting::get);
}
}
} else {
return runnableTids.iterator();
}

}

return Collections.emptyIterator();
}

@Override
public List<Long> list() {
ArrayList<Long> l = new ArrayList<>();
List<String> transactions = getTransactions();
for (String txid : transactions) {
l.add(parseTid(txid));
}
return l;
public Stream<Long> list() {
return getTransactions().map(fateIdStatus -> fateIdStatus.txid);
}

@Override
Expand All @@ -203,7 +189,21 @@ protected long parseTid(String txdir) {
return Long.parseLong(txdir.split("_")[1], 16);
}

protected abstract List<String> getTransactions();
public static abstract class FateIdStatus {
private final long txid;

public FateIdStatus(long txid) {
this.txid = txid;
}

public long getTxid() {
return txid;
}

public abstract TStatus getStatus();
}

protected abstract Stream<FateIdStatus> getTransactions();

protected abstract TStatus _getStatus(long tid);

Expand Down
54 changes: 28 additions & 26 deletions core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import org.apache.accumulo.core.fate.FateStore.FateTxStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.ReadOnlyFateTxStore;
Expand Down Expand Up @@ -339,44 +340,45 @@ private FateStatus getTransactionStatus(ReadOnlyFateStore<T> zs, Set<Long> filte
EnumSet<TStatus> filterStatus, Map<Long,List<String>> heldLocks,
Map<Long,List<String>> waitingLocks) {

List<Long> transactions = zs.list();
List<TransactionStatus> statuses = new ArrayList<>(transactions.size());
try (Stream<Long> tids = zs.list()) {
List<TransactionStatus> statuses = new ArrayList<>();

for (Long tid : transactions) {
tids.forEach(tid -> {

ReadOnlyFateTxStore<T> txStore = zs.read(tid);
ReadOnlyFateTxStore<T> txStore = zs.read(tid);

String txName = (String) txStore.getTransactionInfo(Fate.TxInfo.TX_NAME);
String txName = (String) txStore.getTransactionInfo(Fate.TxInfo.TX_NAME);

List<String> hlocks = heldLocks.remove(tid);
List<String> hlocks = heldLocks.remove(tid);

if (hlocks == null) {
hlocks = Collections.emptyList();
}

List<String> wlocks = waitingLocks.remove(tid);
if (hlocks == null) {
hlocks = Collections.emptyList();
}

if (wlocks == null) {
wlocks = Collections.emptyList();
}
List<String> wlocks = waitingLocks.remove(tid);

String top = null;
ReadOnlyRepo<T> repo = txStore.top();
if (repo != null) {
top = repo.getName();
}
if (wlocks == null) {
wlocks = Collections.emptyList();
}

TStatus status = txStore.getStatus();
String top = null;
ReadOnlyRepo<T> repo = txStore.top();
if (repo != null) {
top = repo.getName();
}

long timeCreated = txStore.timeCreated();
TStatus status = txStore.getStatus();

if (includeByStatus(status, filterStatus) && includeByTxid(tid, filterTxid)) {
statuses.add(new TransactionStatus(tid, status, txName, hlocks, wlocks, top, timeCreated));
}
}
long timeCreated = txStore.timeCreated();

return new FateStatus(statuses, heldLocks, waitingLocks);
if (includeByStatus(status, filterStatus) && includeByTxid(tid, filterTxid)) {
statuses
.add(new TransactionStatus(tid, status, txName, hlocks, wlocks, top, timeCreated));
}
});

return new FateStatus(statuses, heldLocks, waitingLocks);
}
}

private boolean includeByStatus(TStatus status, EnumSet<TStatus> filterStatus) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -125,7 +127,9 @@ public AgeOffStore(FateStore<T> store, long ageOffTime, TimeSource timeSource) {

minTime = Long.MAX_VALUE;

List<Long> txids = store.list();
// ELASTICITY_TODO need to rework how this class works so that it does not buffer everything in
// memory.
List<Long> txids = store.list().collect(Collectors.toList());
for (Long txid : txids) {
FateTxStore<T> txStore = store.reserve(txid);
try {
Expand Down Expand Up @@ -199,12 +203,12 @@ public ReadOnlyFateTxStore<T> read(long tid) {
}

@Override
public List<Long> list() {
public Stream<Long> list() {
return store.list();
}

@Override
public Iterator<Long> runnable(AtomicBoolean keepWaiting) {
return store.runnable(keepWaiting);
public void runnable(AtomicBoolean keepWaiting, LongConsumer idConsumer) {
store.runnable(keepWaiting, idConsumer);
}
}
17 changes: 7 additions & 10 deletions core/src/main/java/org/apache/accumulo/core/fate/Fate.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,9 @@ private class WorkFinder implements Runnable {
public void run() {
while (keepRunning.get()) {
try {
var iter = store.runnable(keepRunning);

while (iter.hasNext() && keepRunning.get()) {
Long txid = iter.next();
try {
while (keepRunning.get()) {
store.runnable(keepRunning, txid -> {
while (keepRunning.get()) {
try {
// The reason for calling transfer instead of queueing is avoid rescanning the
// storage layer and adding the same thing over and over. For example if all threads
// were busy, the queue size was 100, and there are three runnable things in the
Expand All @@ -104,12 +101,12 @@ public void run() {
if (workQueue.tryTransfer(txid, 100, MILLISECONDS)) {
break;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}
});
} catch (Exception e) {
if (keepRunning.get()) {
log.warn("Failure while attempting to find work for fate", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ interface FateTxStore<T> extends ReadOnlyFateTxStore<T> {
* longer interact with it.
*
* @param deferTime time in millis to keep this transaction from being returned by
* {@link #runnable(java.util.concurrent.atomic.AtomicBoolean)}. Must be non-negative.
* {@link #runnable(java.util.concurrent.atomic.AtomicBoolean, java.util.function.LongConsumer)}.
* Must be non-negative.
*/
void unreserve(long deferTime, TimeUnit timeUnit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@

import java.io.Serializable;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;
import java.util.stream.Stream;

/**
* Read only access to a Transaction Store.
Expand Down Expand Up @@ -122,12 +123,13 @@ interface ReadOnlyFateTxStore<T> {
*
* @return all outstanding transactions, including those reserved by others.
*/
List<Long> list();
Stream<Long> list();

/**
* @return an iterator over fate op ids that are (IN_PROGRESS or FAILED_IN_PROGRESS) and
* unreserved. This method will block until it finds something that is runnable or until
* the keepWaiting parameter is false.
* Finds all fate ops that are (IN_PROGRESS, SUBMITTED, or FAILED_IN_PROGRESS) and unreserved. Ids
* that are found are passed to the consumer. This method will block until at least one runnable
* is found or until the keepWaiting parameter is false. It will return once all runnable ids
* found were passed to the consumer.
*/
Iterator<Long> runnable(AtomicBoolean keepWaiting);
void runnable(AtomicBoolean keepWaiting, LongConsumer idConsumer);
}
18 changes: 16 additions & 2 deletions core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
Expand All @@ -39,6 +41,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Suppliers;

//TODO use zoocache? - ACCUMULO-1297
//TODO handle zookeeper being down gracefully - ACCUMULO-1297

Expand Down Expand Up @@ -298,9 +302,19 @@ protected FateTxStore<T> newFateTxStore(long tid, boolean isReserved) {
}

@Override
protected List<String> getTransactions() {
protected Stream<FateIdStatus> getTransactions() {
try {
return zk.getChildren(path);
return zk.getChildren(path).stream().map(strTxid -> {
// Memoizing for two reasons. First the status may never be requested, so in that case avoid
// the lookup. Second, if its requested multiple times the result will always be consistent.
Supplier<TStatus> statusSupplier = Suppliers.memoize(() -> _getStatus(parseTid(strTxid)));
return new FateIdStatus(parseTid(strTxid)) {
@Override
public TStatus getStatus() {
return statusSupplier.get();
}
};
});
} catch (KeeperException | InterruptedException e) {
throw new IllegalStateException(e);
}
Expand Down
Loading