Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 28 additions & 12 deletions core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,11 @@ public static class TransactionStatus {
private final String top;
private final long timeCreated;
private final LockRange lockRange;
private final String details;

private TransactionStatus(FateId fateId, FateInstanceType instanceType, TStatus status,
Fate.FateOperation fateOp, List<String> hlocks, List<String> wlocks, String top,
Long timeCreated, LockRange lockRange) {
Long timeCreated, LockRange lockRange, String details) {

this.fateId = fateId;
this.instanceType = instanceType;
Expand All @@ -88,7 +89,7 @@ private TransactionStatus(FateId fateId, FateInstanceType instanceType, TStatus
this.top = top;
this.timeCreated = timeCreated;
this.lockRange = lockRange;

this.details = details;
}

/**
Expand Down Expand Up @@ -153,6 +154,13 @@ public long getTimeCreated() {
public LockRange getLockRange() {
return lockRange;
}

/**
* @return details of transaction, may contain protected information
*/
public String getDetails() {
return details;
}
}

public static class FateStatus {
Expand Down Expand Up @@ -214,7 +222,7 @@ public List<TransactionStatus> getTransactionStatus(

FateStatus status = getTransactionStatus(readOnlyFateStores, fateIdFilter, statusFilter,
typesFilter, Collections.<FateId,List<String>>emptyMap(),
Collections.<FateId,List<String>>emptyMap(), Map.of());
Collections.<FateId,List<String>>emptyMap(), Map.of(), false);

return status.getTransactions();
}
Expand Down Expand Up @@ -243,20 +251,20 @@ public FateStatus getStatus(ReadOnlyFateStore<T> readOnlyMFS, ZooSession zk,
findLocks(zk, lockPath, heldLocks, waitingLocks, lockRanges);

return getTransactionStatus(Map.of(FateInstanceType.META, readOnlyMFS), fateIdFilter,
statusFilter, typesFilter, heldLocks, waitingLocks, lockRanges);
statusFilter, typesFilter, heldLocks, waitingLocks, lockRanges, false);
}

public FateStatus getStatus(ReadOnlyFateStore<T> readOnlyUFS, Set<FateId> fateIdFilter,
EnumSet<TStatus> statusFilter, EnumSet<FateInstanceType> typesFilter)
throws KeeperException, InterruptedException {

return getTransactionStatus(Map.of(FateInstanceType.USER, readOnlyUFS), fateIdFilter,
statusFilter, typesFilter, new HashMap<>(), new HashMap<>(), Map.of());
statusFilter, typesFilter, new HashMap<>(), new HashMap<>(), Map.of(), false);
}

public FateStatus getStatus(Map<FateInstanceType,ReadOnlyFateStore<T>> readOnlyFateStores,
ZooSession zk, ServiceLockPath lockPath, Set<FateId> fateIdFilter,
EnumSet<TStatus> statusFilter, EnumSet<FateInstanceType> typesFilter)
EnumSet<TStatus> statusFilter, EnumSet<FateInstanceType> typesFilter, boolean includeDetails)
throws KeeperException, InterruptedException {
Map<FateId,List<String>> heldLocks = new HashMap<>();
Map<FateId,List<String>> waitingLocks = new HashMap<>();
Expand All @@ -265,7 +273,7 @@ public FateStatus getStatus(Map<FateInstanceType,ReadOnlyFateStore<T>> readOnlyF
findLocks(zk, lockPath, heldLocks, waitingLocks, lockRanges);

return getTransactionStatus(readOnlyFateStores, fateIdFilter, statusFilter, typesFilter,
heldLocks, waitingLocks, lockRanges);
heldLocks, waitingLocks, lockRanges, includeDetails);
}

/**
Expand Down Expand Up @@ -363,7 +371,7 @@ public static <T> FateStatus getTransactionStatus(
Map<FateInstanceType,ReadOnlyFateStore<T>> readOnlyFateStores, Set<FateId> fateIdFilter,
EnumSet<TStatus> statusFilter, EnumSet<FateInstanceType> typesFilter,
Map<FateId,List<String>> heldLocks, Map<FateId,List<String>> waitingLocks,
Map<FateId,LockRange> lockRanges) {
Map<FateId,LockRange> lockRanges, boolean includeDetails) {
final List<TransactionStatus> statuses = new ArrayList<>();

readOnlyFateStores.forEach((type, store) -> {
Expand All @@ -388,9 +396,13 @@ public static <T> FateStatus getTransactionStatus(
}

String top = null;
String details = null;
ReadOnlyRepo<T> repo = txStore.top();
if (repo != null) {
top = repo.getName();
if (includeDetails) {
details = repo.getDetails();
}
}

TStatus status = txStore.getStatus();
Expand All @@ -400,7 +412,7 @@ public static <T> FateStatus getTransactionStatus(
if (includeByStatus(status, statusFilter) && includeByFateId(fateId, fateIdFilter)
&& includeByInstanceType(fateId.getType(), typesFilter)) {
statuses.add(new TransactionStatus(fateId, type, status, fateOp, hlocks, wlocks, top,
timeCreated, lockRanges.getOrDefault(fateId, LockRange.infinite())));
timeCreated, lockRanges.getOrDefault(fateId, LockRange.infinite()), details));
}
});
}
Expand All @@ -423,17 +435,21 @@ private static boolean includeByInstanceType(FateInstanceType type,

public void print(Map<FateInstanceType,ReadOnlyFateStore<T>> readOnlyFateStores, ZooSession zk,
ServiceLockPath tableLocksPath, Formatter fmt, Set<FateId> fateIdFilter,
EnumSet<TStatus> statusFilter, EnumSet<FateInstanceType> typesFilter)
EnumSet<TStatus> statusFilter, EnumSet<FateInstanceType> typesFilter, boolean includeDetails)
throws KeeperException, InterruptedException {
FateStatus fateStatus =
getStatus(readOnlyFateStores, zk, tableLocksPath, fateIdFilter, statusFilter, typesFilter);
FateStatus fateStatus = getStatus(readOnlyFateStores, zk, tableLocksPath, fateIdFilter,
statusFilter, typesFilter, includeDetails);

for (TransactionStatus txStatus : fateStatus.getTransactions()) {
fmt.format(
"%-15s fateId: %s status: %-18s locked: %-15s locking: %-15s op: %-15s created: %s lock range: %s%n",
txStatus.getFateOp(), txStatus.getFateId(), txStatus.getStatus(), txStatus.getHeldLocks(),
txStatus.getWaitingLocks(), txStatus.getTop(), txStatus.getTimeCreatedFormatted(),
txStatus.getLockRange());
if (includeDetails) {
fmt.format("\t Transaction details: %s%n",
txStatus.getDetails() == null ? "" : txStatus.getDetails());
}
}
fmt.format(" %s transactions", fateStatus.getTransactions().size());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,12 @@ public interface ReadOnlyRepo<T> {

String getName();

/**
* Returns detailed information about the transaction. This information may include protected
* information and should only be used in server-side tools (not the Monitor).
*
* @return json details
*/
String getDetails();

}
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,10 @@ public static <T2> String toLogString(Repo<T2> repo) {

return repo.getClass() + " " + repo.getName();
}

@Override
public String getDetails() {
return repo.getDetails();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ static class FateOpts extends ServerOpts {
@Parameter(names = {"-t", "--type"},
description = "<type>... Print transactions of fate instance type(s) {USER, META}")
List<String> instanceTypes = new ArrayList<>();

@Parameter(names = {"-i", "--info"},
description = "Includes detailed transaction information when printing")
boolean printDetails;
}

private final CountDownLatch lockAcquiredLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -245,7 +249,7 @@ public void execute(JCommander cl, FateOpts options) throws Exception {
getCmdLineInstanceTypeFilters(options.instanceTypes);
readOnlyFateStores = createReadOnlyFateStores(context, zk);
admin.print(readOnlyFateStores, zk, zTableLocksPath, new Formatter(System.out),
fateIdFilter, statusFilter, typesFilter);
fateIdFilter, statusFilter, typesFilter, options.printDetails);
// print line break at the end
System.out.println();
}
Expand Down Expand Up @@ -354,7 +358,7 @@ private void summarizeFateTx(ServerContext context, FateOpts cmd, AdminUtil<Fate
NamespaceNotFoundException {

var zk = context.getZooSession();
var transactions = admin.getStatus(fateStores, zk, tableLocksPath, null, null, null);
var transactions = admin.getStatus(fateStores, zk, tableLocksPath, null, null, null, false);

// build id map - relies on unique ids for tables and namespaces
// used to look up the names of either table or namespace by id.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ private static boolean checkTableLocks(ServerContext context) throws Exception {
if (locksExist) {
final var fateStatus =
admin.getStatus(Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs), zk,
zTableLocksPath, null, null, null);
zTableLocksPath, null, null, null, false);
if (!fateStatus.getDanglingHeldLocks().isEmpty()
|| !fateStatus.getDanglingWaitingLocks().isEmpty()) {
status &= false;
Expand Down
4 changes: 4 additions & 0 deletions server/manager/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED;
import static org.apache.accumulo.core.util.LazySingletons.GSON;

import java.time.Duration;
import java.util.HashSet;
Expand Down Expand Up @@ -55,6 +56,8 @@
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import com.google.gson.JsonObject;

public class CommitCompaction extends AbstractFateOperation {
private static final long serialVersionUID = 1L;
Expand Down Expand Up @@ -280,4 +283,13 @@ public static boolean canCommitCompaction(ExternalCompactionId ecid,

return true;
}

@Override
public String getDetails() {
Gson gson = GSON.get();
JsonObject details = gson.toJsonTree(commitData).getAsJsonObject();
details.addProperty("NewDataFile", newDatafile);
return gson.toJson(details);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.accumulo.manager.compaction.coordinator.commit;

import java.io.Serializable;
import java.lang.reflect.Type;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -31,8 +32,39 @@
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.spi.compaction.CompactionKind;
import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
import org.apache.accumulo.manager.compaction.coordinator.commit.CompactionCommitData.CompactionCommitDataSerializer;

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import com.google.gson.annotations.JsonAdapter;

@JsonAdapter(CompactionCommitDataSerializer.class)
public class CompactionCommitData implements Serializable {

public static class CompactionCommitDataSerializer
implements JsonSerializer<CompactionCommitData> {

@Override
public JsonElement serialize(CompactionCommitData src, Type typeOfSrc,
JsonSerializationContext context) {
JsonObject obj = new JsonObject();
obj.addProperty("kind", src.kind.name());
obj.addProperty("ecid", src.ecid);
obj.addProperty("extent", src.textent.toString());
obj.addProperty("outputTmpPath", src.outputTmpPath);
obj.addProperty("entriesRead", src.stats.entriesRead);
obj.addProperty("entriesWritten", src.stats.entriesWritten);
obj.addProperty("fileSize", src.stats.fileSize);
JsonArray arr = new JsonArray();
src.inputPaths.forEach(arr::add);
obj.add("inputs", arr);
return obj;
}
}

private static final long serialVersionUID = 1L;
final CompactionKind kind;
final HashSet<String> inputPaths; // type must be serializable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@
*/
package org.apache.accumulo.manager.compaction.coordinator.commit;

import static org.apache.accumulo.core.util.LazySingletons.GSON;

import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.manager.tableOps.AbstractFateOperation;
import org.apache.accumulo.manager.tableOps.FateEnv;

import com.google.gson.Gson;
import com.google.gson.JsonObject;

public class PutGcCandidates extends AbstractFateOperation {
private static final long serialVersionUID = 1L;
private final CompactionCommitData commitData;
Expand All @@ -47,4 +52,13 @@ public Repo<FateEnv> call(FateId fateId, FateEnv env) throws Exception {
// refresh as part of this compaction commit as it may run sooner.
return new RefreshTablet(commitData.textent, refreshLocation);
}

@Override
public String getDetails() {
Gson gson = GSON.get();
JsonObject details = gson.toJsonTree(commitData).getAsJsonObject();
details.addProperty("refreshLocation", refreshLocation);
return gson.toJson(details);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.accumulo.manager.compaction.coordinator.commit;

import static org.apache.accumulo.core.util.LazySingletons.GSON;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
Expand All @@ -33,6 +35,7 @@
import org.apache.accumulo.manager.tableOps.bulkVer2.TabletRefresher;

import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.JsonObject;

public class RefreshTablet extends AbstractFateOperation {
private static final long serialVersionUID = 1L;
Expand Down Expand Up @@ -62,4 +65,13 @@ public Repo<FateEnv> call(FateId fateId, FateEnv env) throws Exception {

return null;
}

@Override
public String getDetails() {
JsonObject details = new JsonObject();
details.addProperty("extent", extent.toString());
details.addProperty("TServerInstance", tserverInstance);
return GSON.get().toJson(details);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.accumulo.manager.compaction.coordinator.commit;

import static org.apache.accumulo.core.util.LazySingletons.GSON;

import java.io.IOException;

import org.apache.accumulo.core.dataImpl.KeyExtent;
Expand Down Expand Up @@ -96,4 +98,10 @@ public Repo<FateEnv> call(FateId fateId, FateEnv env) throws Exception {
return new CommitCompaction(commitData,
newDatafile == null ? null : newDatafile.getNormalizedPathStr());
}

@Override
public String getDetails() {
return GSON.get().toJson(commitData);
}

}
Loading