Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ public class Constants {
public static final String ZTABLE_DELETE_MARKER = "/deleting";
public static final String ZTABLE_STATE = "/state";
public static final String ZTABLE_FLUSH_ID = "/flush-id";

// ELASTICITY_TODO delete from code and remove from ZK in upgrade
public static final String ZTABLE_COMPACT_ID = "/compact-id";

// ELASTICITY_TODO delete from code and remove from ZK in upgrade
public static final String ZTABLE_COMPACT_CANCEL_ID = "/compact-cancel-id";
public static final String ZTABLE_NAMESPACE = "/namespace";

Expand All @@ -70,6 +74,9 @@ public class Constants {

public static final String ZSSERVERS = "/sservers";

// tracks config for running compactions
public static final String ZCOMPACTIONS = "/compactions";

public static final String ZCOMPACTORS = "/compactors";

public static final String ZDEAD = "/dead";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,23 @@ void compact(String tableName, Text start, Text end, List<IteratorSetting> itera
* </ul>
*
* <p>
* If two threads call this method concurrently for the same table and set one or more of the
* above then one thread will fail.
* Starting with Accumulo, 4.0 concurrent compactions can be initiated on a table with different
* configuration. Prior to 4.0, if this were done, then only one compaction would work and the
* others would throw an exception. When concurrent compactions with different configuration run,
* each tablet will be compacted once for each user initiated compaction in some arbitrary order.
* For example consider the following situation.
*
* <ol>
* <li>Table A has three tablets Tab1, Tab2, Tab3</li>
* <li>This method is called to initiate a compaction on Tablets Tab1 and Tab2 with iterator
* I1</li>
* <li>This method is called to initiate a compaction on Tablets Tab2 and Tab3 with iterator
* I2</li>
* <li>Tablet Tab1 will compact with iterator I1</li>
* <li>Two compactions will happen for tablet Tab2. It will either compact with iterator I1 and
* then I2 OR it will compact with iterator I2 and then I1.</li>
* <li>Tablet Tab3 will compact with iterator I2</li>
* </ol>
*
* @param tableName the table to compact
* @param config the configuration to use
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public static void encode(DataOutput dout, int magic, int version, String classN
}
}

public static interface Encoder<T> {
public interface Encoder<T> {
public void encode(DataOutput dout, T p);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,10 @@ interface TabletUpdates<T> {

T deleteExternalCompaction(ExternalCompactionId ecid);

T putCompacted(long fateTxid);

T deleteCompacted(long fateTxid);

T putHostingGoal(TabletHostingGoal goal);

T setHostingRequested();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,19 @@ public class ExternalCompactionMetadata {
private final short priority;
private final CompactionExecutorId ceid;
private final boolean propagateDeletes;
private final Long compactionId;
private final Long fateTxId;

public ExternalCompactionMetadata(Set<StoredTabletFile> jobFiles,
ReferencedTabletFile compactTmpName, String compactorId, CompactionKind kind, short priority,
CompactionExecutorId ceid, boolean propagateDeletes, Long compactionId) {
CompactionExecutorId ceid, boolean propagateDeletes, Long fateTxId) {
this.jobFiles = Objects.requireNonNull(jobFiles);
this.compactTmpName = Objects.requireNonNull(compactTmpName);
this.compactorId = Objects.requireNonNull(compactorId);
this.kind = Objects.requireNonNull(kind);
this.priority = priority;
this.ceid = Objects.requireNonNull(ceid);
this.propagateDeletes = propagateDeletes;
this.compactionId = compactionId;
this.fateTxId = fateTxId;
}

public Set<StoredTabletFile> getJobFiles() {
Expand Down Expand Up @@ -85,8 +85,8 @@ public boolean getPropagateDeletes() {
return propagateDeletes;
}

public Long getCompactionId() {
return compactionId;
public Long getFateTxId() {
return fateTxId;
}

// ELASTICITY_TODO remove this code when removing compaction code from tserver
Expand All @@ -99,6 +99,11 @@ public boolean getInitiallySelecteAll() {
throw new UnsupportedOperationException();
}

// ELASTICITY_TODO remove this code when removing compaction code from tserver
public Long getCompactionId() {
throw new UnsupportedOperationException();
}

// This class is used to serialize and deserialize this class using GSon. Any changes to this
// class must consider persisted data.
private static class GSonData {
Expand All @@ -109,7 +114,7 @@ private static class GSonData {
String executorId;
short priority;
boolean propDels;
Long compactionId;
Long fateTxId;
}

public String toJson() {
Expand All @@ -122,7 +127,7 @@ public String toJson() {
jData.executorId = ((CompactionExecutorIdImpl) ceid).getExternalName();
jData.priority = priority;
jData.propDels = propagateDeletes;
jData.compactionId = compactionId;
jData.fateTxId = fateTxId;
return GSON.get().toJson(jData);
}

Expand All @@ -133,7 +138,7 @@ public static ExternalCompactionMetadata fromJson(String json) {
jData.inputs.stream().map(StoredTabletFile::new).collect(toSet()),
new ReferencedTabletFile(new Path(jData.tmp)), jData.compactor,
CompactionKind.valueOf(jData.kind), jData.priority,
CompactionExecutorIdImpl.externalId(jData.executorId), jData.propDels, jData.compactionId);
CompactionExecutorIdImpl.externalId(jData.executorId), jData.propDels, jData.fateTxId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,8 @@ public static void validateDirCol(String dirName) {
*/
public static final String FLUSH_QUAL = "flush";
public static final ColumnFQ FLUSH_COLUMN = new ColumnFQ(NAME, new Text(FLUSH_QUAL));
/**
* Holds compact IDs to enable waiting on a compaction to complete
*/

// ELASTICITY_TODO remove this from code and remove it from metadata in upgrade
public static final String COMPACT_QUAL = "compact";
public static final ColumnFQ COMPACT_COLUMN = new ColumnFQ(NAME, new Text(COMPACT_QUAL));
/**
Expand Down Expand Up @@ -378,6 +377,15 @@ public static class ExternalCompactionColumnFamily {
public static final Text NAME = new Text(STR_NAME);
}

/**
* This family is used to track which tablets were compacted by a user compaction. The column
* qualifier is expected to contain the fate transaction id that is executing the compaction.
*/
public static class CompactedColumnFamily {
public static final String STR_NAME = "compacted";
public static final Text NAME = new Text(STR_NAME);
}

public static class HostingColumnFamily {
public static final String STR_NAME = "hosting";
public static final Text NAME = new Text(STR_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.TabletIdImpl;
import org.apache.accumulo.core.fate.FateTxId;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLockData;
Expand All @@ -66,6 +67,7 @@
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CompactedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
Expand All @@ -90,6 +92,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.net.HostAndPort;

Expand Down Expand Up @@ -127,6 +130,7 @@ public class TabletMetadata {
private TabletOperationId operationId;
private boolean futureAndCurrentLocationSet = false;
private boolean operationIdAndCurrentLocationSet = false;
private Set<Long> compacted;

public static TabletMetadataBuilder builder(KeyExtent extent) {
return new TabletMetadataBuilder(extent);
Expand Down Expand Up @@ -157,7 +161,8 @@ public enum ColumnType {
HOSTING_GOAL,
HOSTING_REQUESTED,
OPID,
SELECTED
SELECTED,
COMPACTED
}

public static class Location {
Expand Down Expand Up @@ -470,6 +475,11 @@ public Map<ExternalCompactionId,ExternalCompactionMetadata> getExternalCompactio
return extCompactions;
}

public Set<Long> getCompacted() {
ensureFetched(ColumnType.COMPACTED);
return compacted;
}

/**
* @return the operation id if it exist, null otherwise
* @see MetadataSchema.TabletsSection.ServerColumnFamily#OPID_COLUMN
Expand Down Expand Up @@ -502,6 +512,7 @@ public static <E extends Entry<Key,Value>> TabletMetadata convertRow(Iterator<E>
final var extCompBuilder =
ImmutableMap.<ExternalCompactionId,ExternalCompactionMetadata>builder();
final var loadedFilesBuilder = ImmutableMap.<StoredTabletFile,Long>builder();
final var compactedBuilder = ImmutableSet.<Long>builder();
ByteSequence row = null;

while (rowIter.hasNext()) {
Expand Down Expand Up @@ -597,6 +608,9 @@ public static <E extends Entry<Key,Value>> TabletMetadata convertRow(Iterator<E>
extCompBuilder.put(ExternalCompactionId.of(qual),
ExternalCompactionMetadata.fromJson(val));
break;
case CompactedColumnFamily.STR_NAME:
compactedBuilder.add(FateTxId.fromString(qual));
break;
case ChoppedColumnFamily.STR_NAME:
te.chopped = true;
break;
Expand Down Expand Up @@ -626,6 +640,7 @@ public static <E extends Entry<Key,Value>> TabletMetadata convertRow(Iterator<E>
te.scans = scansBuilder.build();
te.logs = logsBuilder.build();
te.extCompactions = extCompBuilder.build();
te.compacted = compactedBuilder.build();
if (buildKeyValueMap) {
te.keyValues = kvBuilder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.accumulo.core.metadata.schema;

import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.CHOPPED;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACT_ID;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.DIR;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP;
Expand Down Expand Up @@ -219,6 +220,18 @@ public TabletMetadataBuilder deleteExternalCompaction(ExternalCompactionId ecid)
throw new UnsupportedOperationException();
}

@Override
public TabletMetadataBuilder putCompacted(long fateTxId) {
fetched.add(COMPACTED);
internalBuilder.putCompacted(fateTxId);
return this;
}

@Override
public TabletMetadataBuilder deleteCompacted(long fateTxId) {
throw new UnsupportedOperationException();
}

@Override
public TabletMetadataBuilder putHostingGoal(TabletHostingGoal goal) {
fetched.add(HOSTING_GOAL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CompactedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
Expand Down Expand Up @@ -268,6 +269,18 @@ public T deleteExternalCompaction(ExternalCompactionId ecid) {
return getThis();
}

@Override
public T putCompacted(long fateTxId) {
mutation.put(CompactedColumnFamily.STR_NAME, FateTxId.formatTid(fateTxId), "");
return getThis();
}

@Override
public T deleteCompacted(long fateTxId) {
mutation.putDelete(CompactedColumnFamily.STR_NAME, FateTxId.formatTid(fateTxId));
return getThis();
}

@Override
public T putOperation(TabletOperationId opId) {
ServerColumnFamily.OPID_COLUMN.put(mutation, new Value(opId.canonical()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CompactedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
Expand Down Expand Up @@ -351,6 +352,9 @@ public Options fetch(ColumnType... colsToFetch) {
case SELECTED:
qualifiers.add(SELECTED_COLUMN);
break;
case COMPACTED:
families.add(CompactedColumnFamily.NAME);
break;
default:
throw new IllegalArgumentException("Unknown col type " + colToFetch);
}
Expand Down
Loading