diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java index a06c41ea5c7..69c552949f5 100644 --- a/core/src/main/java/org/apache/accumulo/core/Constants.java +++ b/core/src/main/java/org/apache/accumulo/core/Constants.java @@ -43,7 +43,11 @@ public class Constants { public static final String ZTABLE_DELETE_MARKER = "/deleting"; public static final String ZTABLE_STATE = "/state"; public static final String ZTABLE_FLUSH_ID = "/flush-id"; + + // ELASTICITY_TODO delete from code and remove from ZK in upgrade public static final String ZTABLE_COMPACT_ID = "/compact-id"; + + // ELASTICITY_TODO delete from code and remove from ZK in upgrade public static final String ZTABLE_COMPACT_CANCEL_ID = "/compact-cancel-id"; public static final String ZTABLE_NAMESPACE = "/namespace"; @@ -70,6 +74,9 @@ public class Constants { public static final String ZSSERVERS = "/sservers"; + // tracks config for running compactions + public static final String ZCOMPACTIONS = "/compactions"; + public static final String ZCOMPACTORS = "/compactors"; public static final String ZDEAD = "/dead"; diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java index 0eb7504d156..6b05f527e29 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java @@ -305,8 +305,23 @@ void compact(String tableName, Text start, Text end, List itera * * *

- * If two threads call this method concurrently for the same table and set one or more of the - * above then one thread will fail. + * Starting with Accumulo 4.0, concurrent compactions can be initiated on a table with different + * configuration. Prior to 4.0, if this were done, then only one compaction would work and the + * others would throw an exception. When concurrent compactions with different configuration run, + * each tablet will be compacted once for each user initiated compaction in some arbitrary order. + * For example, consider the following situation. + * + * <ol>
+ * <li>Table A has three tablets Tab1, Tab2, Tab3</li>
+ * <li>This method is called to initiate a compaction on Tablets Tab1 and Tab2 with iterator
+ * I1</li>
+ * <li>This method is called to initiate a compaction on Tablets Tab2 and Tab3 with iterator
+ * I2</li>
+ * <li>Tablet Tab1 will compact with iterator I1</li>
+ * <li>Two compactions will happen for tablet Tab2. It will either compact with iterator I1 and
+ * then I2 OR it will compact with iterator I2 and then I1.</li>
+ * <li>Tablet Tab3 will compact with iterator I2</li>
+ * </ol>
* * @param tableName the table to compact * @param config the configuration to use diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/UserCompactionUtils.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/UserCompactionUtils.java index e64ae669dc3..03cff23f6d6 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/UserCompactionUtils.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/UserCompactionUtils.java @@ -75,7 +75,7 @@ public static void encode(DataOutput dout, int magic, int version, String classN } } - public static interface Encoder { + public interface Encoder { public void encode(DataOutput dout, T p); } diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index b54068f7a4f..bf289f82e47 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -337,6 +337,10 @@ interface TabletUpdates { T deleteExternalCompaction(ExternalCompactionId ecid); + T putCompacted(long fateTxid); + + T deleteCompacted(long fateTxid); + T putHostingGoal(TabletHostingGoal goal); T setHostingRequested(); diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionMetadata.java index a8b90969afc..1fe9e2b4d00 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionMetadata.java @@ -42,11 +42,11 @@ public class ExternalCompactionMetadata { private final short priority; private final CompactionExecutorId ceid; private final boolean propagateDeletes; - private final Long compactionId; + private final Long fateTxId; public ExternalCompactionMetadata(Set jobFiles, 
ReferencedTabletFile compactTmpName, String compactorId, CompactionKind kind, short priority, - CompactionExecutorId ceid, boolean propagateDeletes, Long compactionId) { + CompactionExecutorId ceid, boolean propagateDeletes, Long fateTxId) { this.jobFiles = Objects.requireNonNull(jobFiles); this.compactTmpName = Objects.requireNonNull(compactTmpName); this.compactorId = Objects.requireNonNull(compactorId); @@ -54,7 +54,7 @@ public ExternalCompactionMetadata(Set jobFiles, this.priority = priority; this.ceid = Objects.requireNonNull(ceid); this.propagateDeletes = propagateDeletes; - this.compactionId = compactionId; + this.fateTxId = fateTxId; } public Set getJobFiles() { @@ -85,8 +85,8 @@ public boolean getPropagateDeletes() { return propagateDeletes; } - public Long getCompactionId() { - return compactionId; + public Long getFateTxId() { + return fateTxId; } // ELASTICITY_TODO remove this code when removing compaction code from tserver @@ -99,6 +99,11 @@ public boolean getInitiallySelecteAll() { throw new UnsupportedOperationException(); } + // ELASTICITY_TODO remove this code when removing compaction code from tserver + public Long getCompactionId() { + throw new UnsupportedOperationException(); + } + // This class is used to serialize and deserialize this class using GSon. Any changes to this // class must consider persisted data. 
private static class GSonData { @@ -109,7 +114,7 @@ private static class GSonData { String executorId; short priority; boolean propDels; - Long compactionId; + Long fateTxId; } public String toJson() { @@ -122,7 +127,7 @@ public String toJson() { jData.executorId = ((CompactionExecutorIdImpl) ceid).getExternalName(); jData.priority = priority; jData.propDels = propagateDeletes; - jData.compactionId = compactionId; + jData.fateTxId = fateTxId; return GSON.get().toJson(jData); } @@ -133,7 +138,7 @@ public static ExternalCompactionMetadata fromJson(String json) { jData.inputs.stream().map(StoredTabletFile::new).collect(toSet()), new ReferencedTabletFile(new Path(jData.tmp)), jData.compactor, CompactionKind.valueOf(jData.kind), jData.priority, - CompactionExecutorIdImpl.externalId(jData.executorId), jData.propDels, jData.compactionId); + CompactionExecutorIdImpl.externalId(jData.executorId), jData.propDels, jData.fateTxId); } @Override diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java index 388df53c1ba..26672a167eb 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java @@ -241,9 +241,8 @@ public static void validateDirCol(String dirName) { */ public static final String FLUSH_QUAL = "flush"; public static final ColumnFQ FLUSH_COLUMN = new ColumnFQ(NAME, new Text(FLUSH_QUAL)); - /** - * Holds compact IDs to enable waiting on a compaction to complete - */ + + // ELASTICITY_TODO remove this from code and remove it from metadata in upgrade public static final String COMPACT_QUAL = "compact"; public static final ColumnFQ COMPACT_COLUMN = new ColumnFQ(NAME, new Text(COMPACT_QUAL)); /** @@ -378,6 +377,15 @@ public static class ExternalCompactionColumnFamily { public static final Text NAME = new Text(STR_NAME); } + /** + * This 
family is used to track which tablets were compacted by a user compaction. The column + * qualifier is expected to contain the fate transaction id that is executing the compaction. + */ + public static class CompactedColumnFamily { + public static final String STR_NAME = "compacted"; + public static final Text NAME = new Text(STR_NAME); + } + public static class HostingColumnFamily { public static final String STR_NAME = "hosting"; public static final Text NAME = new Text(STR_NAME); diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java index fae8344503b..61a0bc6b8e9 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java @@ -53,6 +53,7 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.TabletIdImpl; +import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.lock.ServiceLockData; @@ -66,6 +67,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CompactedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily; @@ -90,6 +92,7 
@@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSortedMap; import com.google.common.net.HostAndPort; @@ -127,6 +130,7 @@ public class TabletMetadata { private TabletOperationId operationId; private boolean futureAndCurrentLocationSet = false; private boolean operationIdAndCurrentLocationSet = false; + private Set compacted; public static TabletMetadataBuilder builder(KeyExtent extent) { return new TabletMetadataBuilder(extent); @@ -157,7 +161,8 @@ public enum ColumnType { HOSTING_GOAL, HOSTING_REQUESTED, OPID, - SELECTED + SELECTED, + COMPACTED } public static class Location { @@ -470,6 +475,11 @@ public Map getExternalCompactio return extCompactions; } + public Set getCompacted() { + ensureFetched(ColumnType.COMPACTED); + return compacted; + } + /** * @return the operation id if it exist, null otherwise * @see MetadataSchema.TabletsSection.ServerColumnFamily#OPID_COLUMN @@ -502,6 +512,7 @@ public static > TabletMetadata convertRow(Iterator final var extCompBuilder = ImmutableMap.builder(); final var loadedFilesBuilder = ImmutableMap.builder(); + final var compactedBuilder = ImmutableSet.builder(); ByteSequence row = null; while (rowIter.hasNext()) { @@ -597,6 +608,9 @@ public static > TabletMetadata convertRow(Iterator extCompBuilder.put(ExternalCompactionId.of(qual), ExternalCompactionMetadata.fromJson(val)); break; + case CompactedColumnFamily.STR_NAME: + compactedBuilder.add(FateTxId.fromString(qual)); + break; case ChoppedColumnFamily.STR_NAME: te.chopped = true; break; @@ -626,6 +640,7 @@ public static > TabletMetadata convertRow(Iterator te.scans = scansBuilder.build(); te.logs = logsBuilder.build(); te.extCompactions = extCompBuilder.build(); + te.compacted = compactedBuilder.build(); if (buildKeyValueMap) { te.keyValues = kvBuilder.build(); } diff --git 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java index a7f2837a39c..305b5861fd5 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java @@ -19,6 +19,7 @@ package org.apache.accumulo.core.metadata.schema; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.CHOPPED; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACT_ID; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.DIR; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP; @@ -219,6 +220,18 @@ public TabletMetadataBuilder deleteExternalCompaction(ExternalCompactionId ecid) throw new UnsupportedOperationException(); } + @Override + public TabletMetadataBuilder putCompacted(long fateTxId) { + fetched.add(COMPACTED); + internalBuilder.putCompacted(fateTxId); + return this; + } + + @Override + public TabletMetadataBuilder deleteCompacted(long fateTxId) { + throw new UnsupportedOperationException(); + } + @Override public TabletMetadataBuilder putHostingGoal(TabletHostingGoal goal) { fetched.add(HOSTING_GOAL); diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java index 45f341443a0..59696325556 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java @@ -31,6 +31,7 @@ import org.apache.accumulo.core.metadata.TServerInstance; import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CompactedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily; @@ -268,6 +269,18 @@ public T deleteExternalCompaction(ExternalCompactionId ecid) { return getThis(); } + @Override + public T putCompacted(long fateTxId) { + mutation.put(CompactedColumnFamily.STR_NAME, FateTxId.formatTid(fateTxId), ""); + return getThis(); + } + + @Override + public T deleteCompacted(long fateTxId) { + mutation.putDelete(CompactedColumnFamily.STR_NAME, FateTxId.formatTid(fateTxId)); + return getThis(); + } + @Override public T putOperation(TabletOperationId opId) { ServerColumnFamily.OPID_COLUMN.put(mutation, new Value(opId.canonical())); diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java index 0cb605c3f0d..4053ab9d666 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java @@ -72,6 +72,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CompactedColumnFamily; import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily; @@ -351,6 +352,9 @@ public Options fetch(ColumnType... colsToFetch) { case SELECTED: qualifiers.add(SELECTED_COLUMN); break; + case COMPACTED: + families.add(CompactedColumnFamily.NAME); + break; default: throw new IllegalArgumentException("Unknown col type " + colToFetch); } diff --git a/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TExternalCompactionJob.java b/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TExternalCompactionJob.java index e440c9c2244..6a0523c51b2 100644 --- a/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TExternalCompactionJob.java +++ b/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TExternalCompactionJob.java @@ -35,7 +35,7 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase overrides; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. 
*/ @@ -60,7 +60,7 @@ public enum _Fields implements org.apache.thrift.TFieldIdEnum { OUTPUT_FILE((short)5, "outputFile"), PROPAGATE_DELETES((short)6, "propagateDeletes"), KIND((short)7, "kind"), - USER_COMPACTION_ID((short)8, "userCompactionId"), + FATE_TX_ID((short)8, "fateTxId"), OVERRIDES((short)9, "overrides"); private static final java.util.Map byName = new java.util.HashMap(); @@ -91,8 +91,8 @@ public static _Fields findByThriftId(int fieldId) { return PROPAGATE_DELETES; case 7: // KIND return KIND; - case 8: // USER_COMPACTION_ID - return USER_COMPACTION_ID; + case 8: // FATE_TX_ID + return FATE_TX_ID; case 9: // OVERRIDES return OVERRIDES; default: @@ -139,7 +139,7 @@ public java.lang.String getFieldName() { // isset id assignments private static final int __PROPAGATEDELETES_ISSET_ID = 0; - private static final int __USERCOMPACTIONID_ISSET_ID = 1; + private static final int __FATETXID_ISSET_ID = 1; private byte __isset_bitfield = 0; public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { @@ -159,7 +159,7 @@ public java.lang.String getFieldName() { new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); tmpMap.put(_Fields.KIND, new org.apache.thrift.meta_data.FieldMetaData("kind", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.ENUM , "TCompactionKind"))); - tmpMap.put(_Fields.USER_COMPACTION_ID, new org.apache.thrift.meta_data.FieldMetaData("userCompactionId", org.apache.thrift.TFieldRequirementType.DEFAULT, + tmpMap.put(_Fields.FATE_TX_ID, new org.apache.thrift.meta_data.FieldMetaData("fateTxId", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); tmpMap.put(_Fields.OVERRIDES, new org.apache.thrift.meta_data.FieldMetaData("overrides", org.apache.thrift.TFieldRequirementType.DEFAULT, new 
org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, @@ -180,7 +180,7 @@ public TExternalCompactionJob( java.lang.String outputFile, boolean propagateDeletes, TCompactionKind kind, - long userCompactionId, + long fateTxId, java.util.Map overrides) { this(); @@ -192,8 +192,8 @@ public TExternalCompactionJob( this.propagateDeletes = propagateDeletes; setPropagateDeletesIsSet(true); this.kind = kind; - this.userCompactionId = userCompactionId; - setUserCompactionIdIsSet(true); + this.fateTxId = fateTxId; + setFateTxIdIsSet(true); this.overrides = overrides; } @@ -225,7 +225,7 @@ public TExternalCompactionJob(TExternalCompactionJob other) { if (other.isSetKind()) { this.kind = other.kind; } - this.userCompactionId = other.userCompactionId; + this.fateTxId = other.fateTxId; if (other.isSetOverrides()) { java.util.Map __this__overrides = new java.util.HashMap(other.overrides); this.overrides = __this__overrides; @@ -247,8 +247,8 @@ public void clear() { setPropagateDeletesIsSet(false); this.propagateDeletes = false; this.kind = null; - setUserCompactionIdIsSet(false); - this.userCompactionId = 0; + setFateTxIdIsSet(false); + this.fateTxId = 0; this.overrides = null; } @@ -441,27 +441,27 @@ public void setKindIsSet(boolean value) { } } - public long getUserCompactionId() { - return this.userCompactionId; + public long getFateTxId() { + return this.fateTxId; } - public TExternalCompactionJob setUserCompactionId(long userCompactionId) { - this.userCompactionId = userCompactionId; - setUserCompactionIdIsSet(true); + public TExternalCompactionJob setFateTxId(long fateTxId) { + this.fateTxId = fateTxId; + setFateTxIdIsSet(true); return this; } - public void unsetUserCompactionId() { - __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __USERCOMPACTIONID_ISSET_ID); + public void unsetFateTxId() { + __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __FATETXID_ISSET_ID); } - /** Returns true if field 
userCompactionId is set (has been assigned a value) and false otherwise */ - public boolean isSetUserCompactionId() { - return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __USERCOMPACTIONID_ISSET_ID); + /** Returns true if field fateTxId is set (has been assigned a value) and false otherwise */ + public boolean isSetFateTxId() { + return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __FATETXID_ISSET_ID); } - public void setUserCompactionIdIsSet(boolean value) { - __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __USERCOMPACTIONID_ISSET_ID, value); + public void setFateTxIdIsSet(boolean value) { + __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __FATETXID_ISSET_ID, value); } public int getOverridesSize() { @@ -559,11 +559,11 @@ public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable } break; - case USER_COMPACTION_ID: + case FATE_TX_ID: if (value == null) { - unsetUserCompactionId(); + unsetFateTxId(); } else { - setUserCompactionId((java.lang.Long)value); + setFateTxId((java.lang.Long)value); } break; @@ -603,8 +603,8 @@ public java.lang.Object getFieldValue(_Fields field) { case KIND: return getKind(); - case USER_COMPACTION_ID: - return getUserCompactionId(); + case FATE_TX_ID: + return getFateTxId(); case OVERRIDES: return getOverrides(); @@ -635,8 +635,8 @@ public boolean isSet(_Fields field) { return isSetPropagateDeletes(); case KIND: return isSetKind(); - case USER_COMPACTION_ID: - return isSetUserCompactionId(); + case FATE_TX_ID: + return isSetFateTxId(); case OVERRIDES: return isSetOverrides(); } @@ -719,12 +719,12 @@ public boolean equals(TExternalCompactionJob that) { return false; } - boolean this_present_userCompactionId = true; - boolean that_present_userCompactionId = true; - if (this_present_userCompactionId || that_present_userCompactionId) { - if (!(this_present_userCompactionId && that_present_userCompactionId)) + boolean 
this_present_fateTxId = true; + boolean that_present_fateTxId = true; + if (this_present_fateTxId || that_present_fateTxId) { + if (!(this_present_fateTxId && that_present_fateTxId)) return false; - if (this.userCompactionId != that.userCompactionId) + if (this.fateTxId != that.fateTxId) return false; } @@ -770,7 +770,7 @@ public int hashCode() { if (isSetKind()) hashCode = hashCode * 8191 + kind.getValue(); - hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(userCompactionId); + hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(fateTxId); hashCode = hashCode * 8191 + ((isSetOverrides()) ? 131071 : 524287); if (isSetOverrides()) @@ -857,12 +857,12 @@ public int compareTo(TExternalCompactionJob other) { return lastComparison; } } - lastComparison = java.lang.Boolean.compare(isSetUserCompactionId(), other.isSetUserCompactionId()); + lastComparison = java.lang.Boolean.compare(isSetFateTxId(), other.isSetFateTxId()); if (lastComparison != 0) { return lastComparison; } - if (isSetUserCompactionId()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.userCompactionId, other.userCompactionId); + if (isSetFateTxId()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.fateTxId, other.fateTxId); if (lastComparison != 0) { return lastComparison; } @@ -953,8 +953,8 @@ public java.lang.String toString() { } first = false; if (!first) sb.append(", "); - sb.append("userCompactionId:"); - sb.append(this.userCompactionId); + sb.append("fateTxId:"); + sb.append(this.fateTxId); first = false; if (!first) sb.append(", "); sb.append("overrides:"); @@ -1086,10 +1086,10 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, TExternalCompaction org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; - case 8: // USER_COMPACTION_ID + case 8: // FATE_TX_ID if (schemeField.type == org.apache.thrift.protocol.TType.I64) { - struct.userCompactionId = iprot.readI64(); - 
struct.setUserCompactionIdIsSet(true); + struct.fateTxId = iprot.readI64(); + struct.setFateTxIdIsSet(true); } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -1170,8 +1170,8 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, TExternalCompactio oprot.writeI32(struct.kind.getValue()); oprot.writeFieldEnd(); } - oprot.writeFieldBegin(USER_COMPACTION_ID_FIELD_DESC); - oprot.writeI64(struct.userCompactionId); + oprot.writeFieldBegin(FATE_TX_ID_FIELD_DESC); + oprot.writeI64(struct.fateTxId); oprot.writeFieldEnd(); if (struct.overrides != null) { oprot.writeFieldBegin(OVERRIDES_FIELD_DESC); @@ -1226,7 +1226,7 @@ public void write(org.apache.thrift.protocol.TProtocol prot, TExternalCompaction if (struct.isSetKind()) { optionals.set(6); } - if (struct.isSetUserCompactionId()) { + if (struct.isSetFateTxId()) { optionals.set(7); } if (struct.isSetOverrides()) { @@ -1260,8 +1260,8 @@ public void write(org.apache.thrift.protocol.TProtocol prot, TExternalCompaction if (struct.isSetKind()) { oprot.writeI32(struct.kind.getValue()); } - if (struct.isSetUserCompactionId()) { - oprot.writeI64(struct.userCompactionId); + if (struct.isSetFateTxId()) { + oprot.writeI64(struct.fateTxId); } if (struct.isSetOverrides()) { { @@ -1320,8 +1320,8 @@ public void read(org.apache.thrift.protocol.TProtocol prot, TExternalCompactionJ struct.setKindIsSet(true); } if (incoming.get(7)) { - struct.userCompactionId = iprot.readI64(); - struct.setUserCompactionIdIsSet(true); + struct.fateTxId = iprot.readI64(); + struct.setFateTxIdIsSet(true); } if (incoming.get(8)) { { diff --git a/core/src/main/thrift/tabletserver.thrift b/core/src/main/thrift/tabletserver.thrift index 87c0e19acc8..43ba9574857 100644 --- a/core/src/main/thrift/tabletserver.thrift +++ b/core/src/main/thrift/tabletserver.thrift @@ -111,7 +111,7 @@ struct TExternalCompactionJob { 5:string outputFile 6:bool propagateDeletes 7:TCompactionKind kind - 8:i64 userCompactionId + 8:i64 fateTxId 
9:map overrides } diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java index 589c0385856..117440910c0 100644 --- a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java +++ b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java @@ -334,7 +334,8 @@ public void testBuilder() { TabletMetadata tm = TabletMetadata.builder(extent).putHostingGoal(TabletHostingGoal.NEVER) .putLocation(Location.future(ser1)).putFile(sf1, dfv1).putFile(sf2, dfv2) .putCompactionId(23).putBulkFile(rf1, 25).putBulkFile(rf2, 35).putFlushId(27) - .putDirName("dir1").putScan(sf3).putScan(sf4).build(ECOMP, HOSTING_REQUESTED); + .putDirName("dir1").putScan(sf3).putScan(sf4).putCompacted(17).putCompacted(23) + .build(ECOMP, HOSTING_REQUESTED); assertEquals(extent, tm.getExtent()); assertEquals(TabletHostingGoal.NEVER, tm.getHostingGoal()); @@ -346,6 +347,7 @@ public void testBuilder() { assertEquals("dir1", tm.getDirName()); assertEquals(Set.of(sf3, sf4), Set.copyOf(tm.getScans())); assertEquals(Set.of(), tm.getExternalCompactions().keySet()); + assertEquals(Set.of(17L, 23L), tm.getCompacted()); assertFalse(tm.getHostingRequested()); assertThrows(IllegalStateException.class, tm::getOperationId); assertThrows(IllegalStateException.class, tm::getSuspend); @@ -369,6 +371,7 @@ public void testBuilder() { assertThrows(IllegalStateException.class, tm2::getExternalCompactions); assertThrows(IllegalStateException.class, tm2::getHostingRequested); assertThrows(IllegalStateException.class, tm2::getSelectedFiles); + assertThrows(IllegalStateException.class, tm2::getCompacted); var ecid1 = ExternalCompactionId.generate(UUID.randomUUID()); ExternalCompactionMetadata ecm = new ExternalCompactionMetadata(Set.of(sf1, sf2), rf1, "cid1", diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionConfigStorage.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionConfigStorage.java index dabfd693616..7821b163b37 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionConfigStorage.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionConfigStorage.java @@ -18,63 +18,98 @@ */ package org.apache.accumulo.server.compaction; -import static java.nio.charset.StandardCharsets.UTF_8; - import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Predicate; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.clientImpl.UserCompactionUtils; -import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.core.util.FastFormat; import org.apache.accumulo.server.ServerContext; -import org.apache.commons.codec.DecoderException; -import org.apache.commons.codec.binary.Hex; import org.apache.zookeeper.KeeperException; public class CompactionConfigStorage { - public static Pair getCompactionID(ServerContext context, KeyExtent extent) - throws KeeperException.NoNodeException { - try { - String zTablePath = Constants.ZROOT + "/" + context.getInstanceID() + Constants.ZTABLES + "/" - + extent.tableId() + Constants.ZTABLE_COMPACT_ID; - - String[] tokens = - new String(context.getZooReaderWriter().getData(zTablePath), UTF_8).split(","); - long compactID = Long.parseLong(tokens[0]); - CompactionConfig overlappingConfig = 
null; - - if (tokens.length > 1) { - Hex hex = new Hex(); - ByteArrayInputStream bais = - new ByteArrayInputStream(hex.decode(tokens[1].split("=")[1].getBytes(UTF_8))); - DataInputStream dis = new DataInputStream(bais); + private static String createPath(ServerContext context, long fateTxId) { + String txidString = FastFormat.toHexString(fateTxId); + return context.getZooKeeperRoot() + Constants.ZCOMPACTIONS + "/" + txidString; + } - var compactionConfig = UserCompactionUtils.decodeCompactionConfig(dis); + public static byte[] encodeConfig(CompactionConfig config, TableId tableId) { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos)) { + dos.writeUTF(tableId.canonical()); + UserCompactionUtils.encode(dos, config); + dos.close(); + return baos.toByteArray(); + } catch (IOException ioe) { + throw new UncheckedIOException(ioe); + } + } - KeyExtent ke = new KeyExtent(extent.tableId(), compactionConfig.getEndRow(), - compactionConfig.getStartRow()); + public static CompactionConfig getConfig(ServerContext context, long fateTxId) + throws InterruptedException, KeeperException { + return getConfig(context, fateTxId, tableId -> true); + } - if (ke.overlaps(extent)) { - overlappingConfig = compactionConfig; + public static CompactionConfig getConfig(ServerContext context, long fateTxId, + Predicate tableIdPredicate) throws InterruptedException, KeeperException { + try { + byte[] data = context.getZooReaderWriter().getData(createPath(context, fateTxId)); + try (ByteArrayInputStream bais = new ByteArrayInputStream(data); + DataInputStream dis = new DataInputStream(bais)) { + var tableId = TableId.of(dis.readUTF()); + if (tableIdPredicate.test(tableId)) { + return UserCompactionUtils.decodeCompactionConfig(dis); + } else { + return null; } + } catch (IOException ioe) { + throw new UncheckedIOException(ioe); } - if (overlappingConfig == null) { - overlappingConfig = new CompactionConfig(); // no config present, 
set to default - } + } catch (KeeperException.NoNodeException e) { + return null; + } + } + + public static void setConfig(ServerContext context, long fateTxId, byte[] encConfig) + throws InterruptedException, KeeperException { + context.getZooReaderWriter().putPrivatePersistentData(createPath(context, fateTxId), encConfig, + ZooUtil.NodeExistsPolicy.SKIP); + } + + public static void deleteConfig(ServerContext context, long fateTxId) + throws InterruptedException, KeeperException { + context.getZooReaderWriter().delete(createPath(context, fateTxId)); + } + + public static Map getAllConfig(ServerContext context, + Predicate tableIdPredicate) throws InterruptedException, KeeperException { + + Map configs = new HashMap<>(); - return new Pair<>(compactID, overlappingConfig); - } catch (InterruptedException | DecoderException | NumberFormatException e) { - throw new RuntimeException("Exception on " + extent + " getting compaction ID", e); - } catch (KeeperException ke) { - if (ke instanceof KeeperException.NoNodeException) { - throw (KeeperException.NoNodeException) ke; - } else { - throw new RuntimeException("Exception on " + extent + " getting compaction ID", ke); + var children = context.getZooReaderWriter() + .getChildren(context.getZooKeeperRoot() + Constants.ZCOMPACTIONS); + for (var child : children) { + var fateTxid = Long.parseLong(child, 16); + var cconf = getConfig(context, fateTxid, tableIdPredicate); + if (cconf != null) { + configs.put(fateTxid, cconf); } } + + return Collections.unmodifiableMap(configs); } + } diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java index e30cb12c73d..bb35ba8c5e4 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java @@ -20,6 +20,7 @@ 
import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -56,14 +57,25 @@ public class CompactionJobGenerator { private final Cache dispatchers; private final Set serviceIds; private final PluginEnvironment env; + private final Map> allExecutionHints; - public CompactionJobGenerator(PluginEnvironment env) { + public CompactionJobGenerator(PluginEnvironment env, + Map> executionHints) { servicesConfig = new CompactionServicesConfig(env.getConfiguration()); serviceIds = servicesConfig.getPlanners().keySet().stream().map(CompactionServiceId::of) .collect(Collectors.toUnmodifiableSet()); dispatchers = Caffeine.newBuilder().maximumSize(10).build(); this.env = env; + if (executionHints.isEmpty()) { + this.allExecutionHints = executionHints; + } else { + this.allExecutionHints = new HashMap<>(); + // Make the maps that will be passed to plugins unmodifiable. Do this once, so it does not + // need to be done for each tablet. + executionHints.forEach((k, v) -> allExecutionHints.put(k, + v.isEmpty() ? 
Map.of() : Collections.unmodifiableMap(v))); + } } public Collection generateJobs(TabletMetadata tablet, Set kinds) { @@ -77,15 +89,18 @@ public Collection generateJobs(TabletMetadata tablet, Set systemJobs = Set.of(); if (kinds.contains(CompactionKind.SYSTEM)) { - CompactionServiceId serviceId = dispatch(CompactionKind.SYSTEM, tablet); - systemJobs = planCompactions(serviceId, CompactionKind.SYSTEM, tablet); + CompactionServiceId serviceId = dispatch(CompactionKind.SYSTEM, tablet, Map.of()); + systemJobs = planCompactions(serviceId, CompactionKind.SYSTEM, tablet, Map.of()); } Collection userJobs = Set.of(); if (kinds.contains(CompactionKind.USER) && tablet.getSelectedFiles() != null) { - CompactionServiceId serviceId = dispatch(CompactionKind.USER, tablet); - userJobs = planCompactions(serviceId, CompactionKind.USER, tablet); + var hints = allExecutionHints.get(tablet.getSelectedFiles().getFateTxId()); + if (hints != null) { + CompactionServiceId serviceId = dispatch(CompactionKind.USER, tablet, hints); + userJobs = planCompactions(serviceId, CompactionKind.USER, tablet, hints); + } } if (userJobs.isEmpty()) { @@ -100,7 +115,8 @@ public Collection generateJobs(TabletMetadata tablet, Set executionHints) { CompactionDispatcher dispatcher = dispatchers.get(tablet.getTableId(), tableId -> CompactionPluginUtils.createDispatcher((ServiceEnvironment) env, tableId)); @@ -124,9 +140,7 @@ public CompactionKind getCompactionKind() { @Override public Map getExecutionHints() { - // ELASTICITY_TODO do for user compactions. Best to do this after per user compaction - // config storage is changed in ZK so that it can be cached. 
- return Map.of(); + return executionHints; } }; @@ -134,7 +148,7 @@ public Map getExecutionHints() { } private Collection planCompactions(CompactionServiceId serviceId, - CompactionKind kind, TabletMetadata tablet) { + CompactionKind kind, TabletMetadata tablet, Map executionHints) { CompactionPlanner planner = planners.computeIfAbsent(serviceId, sid -> createPlanner(tablet.getTableId(), serviceId)); @@ -232,8 +246,7 @@ public Collection getRunningCompactions() { @Override public Map getExecutionHints() { - // ELASTICITY_TODO implement for user compactions - return Map.of(); + return executionHints; } @Override diff --git a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java index e4dd238c588..86c28ce9111 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java +++ b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java @@ -32,6 +32,7 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.data.constraints.Constraint; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.lock.ServiceLock; @@ -41,6 +42,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CompactedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily; @@ -101,7 +103,8 @@ public class MetadataConstraints implements Constraint { FutureLocationColumnFamily.NAME, ChoppedColumnFamily.NAME, ClonedColumnFamily.NAME, - ExternalCompactionColumnFamily.NAME); + ExternalCompactionColumnFamily.NAME, + CompactedColumnFamily.NAME); // @formatter:on private static boolean isValidColumn(ColumnUpdate cu) { @@ -216,7 +219,8 @@ public List check(Environment env, Mutation mutation) { } if (columnUpdate.getValue().length == 0 && !columnFamily.equals(ScanFileColumnFamily.NAME) - && !HostingColumnFamily.REQUESTED_COLUMN.equals(columnFamily, columnQualifier)) { + && !HostingColumnFamily.REQUESTED_COLUMN.equals(columnFamily, columnQualifier) + && !columnFamily.equals(CompactedColumnFamily.NAME)) { violations = addViolation(violations, 6); } @@ -254,6 +258,10 @@ public List check(Environment env, Mutation mutation) { } catch (RuntimeException e) { violations = addViolation(violations, 11); } + } else if (CompactedColumnFamily.NAME.equals(columnFamily)) { + if (!FateTxId.isFormatedTid(columnQualifier.toString())) { + violations = addViolation(violations, 13); + } } else if (columnFamily.equals(BulkFileColumnFamily.NAME)) { if (!columnUpdate.isDeleted() && !checkedBulk) { violations = validateDataFilePath(violations, @@ -391,6 +399,8 @@ public String getViolationDescription(short violationCode) { return "Malformed file selection value"; case 12: return "Invalid data file metadata format"; + case 13: + return "Invalid compacted column"; } return null; } diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java b/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java index 44f04294a71..ecb4d59a3dc 100644 --- 
a/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java @@ -163,6 +163,8 @@ void initialize(final ServerContext context, final boolean clearInstanceName, ZooUtil.NodeExistsPolicy.FAIL); zoo.putPersistentData(zkInstanceRoot + Constants.ZSSERVERS, EMPTY_BYTE_ARRAY, ZooUtil.NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZCOMPACTIONS, EMPTY_BYTE_ARRAY, + ZooUtil.NodeExistsPolicy.FAIL); } /** diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java index 86b6998d299..88b26220d5d 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java @@ -51,4 +51,10 @@ default Set getUnassignmentRequest() { } ManagerState getManagerState(); + + // ELASTICITY_TODO it would be nice if this method could take DataLevel as an argument and only + // retrieve information about compactions in that data level. Attempted this and a lot of + // refactoring was needed to get that small bit of information to this method. Would be best to + // address this after issue #3576. 
+ Map> getCompactionHints(); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java index 7f4b1e56bfb..f1b169d0066 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java @@ -18,7 +18,10 @@ */ package org.apache.accumulo.server.manager.state; +import static org.apache.accumulo.core.util.LazySingletons.GSON; + import java.io.IOException; +import java.lang.reflect.Type; import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; @@ -83,6 +86,7 @@ import com.google.common.base.Joiner; import com.google.common.collect.Sets; +import com.google.gson.reflect.TypeToken; /** * Iterator used by the TabletGroupWatcher threads in the Manager. This iterator returns @@ -90,7 +94,6 @@ * Manager. 
*/ public class TabletManagementIterator extends SkippingIterator { - private static final Logger LOG = LoggerFactory.getLogger(TabletManagementIterator.class); private static final String SERVERS_OPTION = "servers"; @@ -101,6 +104,7 @@ public class TabletManagementIterator extends SkippingIterator { private static final String SHUTTING_DOWN_OPTION = "shuttingDown"; private static final String RESOURCE_GROUPS = "resourceGroups"; private static final String TSERVER_GROUP_PREFIX = "serverGroups_"; + private static final String COMPACTION_HINTS_OPTIONS = "compactionHints"; private CompactionJobGenerator compactionGenerator; private TabletBalancer balancer; @@ -164,6 +168,11 @@ private static void setShuttingDown(final IteratorSetting cfg, } } + private static void setCompactionHints(final IteratorSetting cfg, + Map> allHints) { + cfg.addOption(COMPACTION_HINTS_OPTIONS, GSON.get().toJson(allHints)); + } + private static void setTServerResourceGroups(final IteratorSetting cfg, Map> tServerResourceGroups) { if (tServerResourceGroups == null) { @@ -257,6 +266,14 @@ private static Map parseMerges(final String merges) { return result; } + private static Map> parseCompactionHints(String json) { + if (json == null) { + return Map.of(); + } + Type tt = new TypeToken>>() {}.getType(); + return GSON.get().fromJson(json, tt); + } + private static boolean shouldReturnDueToSplit(final TabletMetadata tm, final long splitThreshold) { final long sumOfFileSizes = @@ -341,6 +358,7 @@ public static void configureScanner(final ScannerBase scanner, final CurrentStat TabletManagementIterator.setShuttingDown(tabletChange, state.shutdownServers()); TabletManagementIterator.setTServerResourceGroups(tabletChange, state.tServerResourceGroups()); + setCompactionHints(tabletChange, state.getCompactionHints()); } scanner.addScanIterator(tabletChange); } @@ -381,7 +399,8 @@ public void init(SortedKeyValueIterator source, Map op if (shuttingDown != null) { current.removeAll(shuttingDown); } - 
compactionGenerator = new CompactionJobGenerator(env.getPluginEnv()); + compactionGenerator = new CompactionJobGenerator(env.getPluginEnv(), + parseCompactionHints(options.get(COMPACTION_HINTS_OPTIONS))); final AccumuloConfiguration conf = new ConfigurationCopy(env.getPluginEnv().getConfiguration()); BalancerEnvironmentImpl benv = new BalancerEnvironmentImpl(((TabletIteratorEnvironment) env).getServerContext()); diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementScanner.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementScanner.java index bbd97fe6bf2..619189babb7 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementScanner.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementScanner.java @@ -82,6 +82,7 @@ public class TabletManagementScanner implements ClosableIterator TextUtil.getBytes(stf.getMetaUpdateDeleteText()), BulkFileColumnFamily.NAME); mutation.addCondition(c); } - + break; + case COMPACTED: { + Condition c = SetEqualityIterator.createCondition(tabletMetadata.getCompacted(), + ftid -> FateTxId.formatTid(ftid).getBytes(UTF_8), CompactedColumnFamily.NAME); + mutation.addCondition(c); + } break; default: throw new UnsupportedOperationException("Column type " + type + " is not supported."); @@ -237,5 +244,4 @@ public void submit(Ample.RejectionHandler rejectionCheck) { mutationConsumer.accept(mutation); rejectionHandlerConsumer.accept(extent, rejectionCheck); } - } diff --git a/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java b/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java index e51d2821785..a206bbb4a23 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java +++ 
b/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java @@ -27,10 +27,12 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CompactedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; @@ -350,4 +352,23 @@ public void testSelectedFiles() { violations = mc.check(createEnv(), m); assertNull(violations); } + + @Test + public void testCompacted() { + MetadataConstraints mc = new MetadataConstraints(); + Mutation m; + List violations; + + m = new Mutation(new Text("0;foo")); + m.put(CompactedColumnFamily.STR_NAME, FateTxId.formatTid(45), ""); + violations = mc.check(createEnv(), m); + assertNull(violations); + + m = new Mutation(new Text("0;foo")); + m.put(CompactedColumnFamily.STR_NAME, "incorrect data", ""); + violations = mc.check(createEnv(), m); + assertNotNull(violations); + assertEquals(1, violations.size()); + assertEquals(Short.valueOf((short) 13), violations.get(0)); + } } diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 52ae273aaaf..332da3dbda4 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -19,7 +19,6 @@ package org.apache.accumulo.compactor; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; -import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.concurrent.TimeUnit.MINUTES; import static org.apache.accumulo.core.util.LazySingletons.RANDOM; @@ -63,6 +62,7 @@ import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; @@ -93,6 +93,7 @@ import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.server.AbstractServer; +import org.apache.accumulo.server.compaction.CompactionConfigStorage; import org.apache.accumulo.server.compaction.CompactionInfo; import org.apache.accumulo.server.compaction.CompactionWatcher; import org.apache.accumulo.server.compaction.FileCompactor; @@ -205,25 +206,21 @@ protected void checkIfCanceled() { } if (job.getKind() == TCompactionKind.USER) { - String zTablePath = Constants.ZROOT + "/" + getContext().getInstanceID() - + Constants.ZTABLES + "/" + extent.tableId() + Constants.ZTABLE_COMPACT_CANCEL_ID; - byte[] id = getContext().getZooCache().get(zTablePath); - if (id == null) { - // table probably deleted - LOG.info("Cancelling compaction {} for table that no longer exists {}", ecid, extent); - JOB_HOLDER.cancel(job.getExternalCompactionId()); - } else { - var cancelId = Long.parseLong(new String(id, UTF_8)); - if (cancelId >= job.getUserCompactionId()) { - LOG.info("Cancelling compaction {} because user compaction was canceled", ecid); - 
JOB_HOLDER.cancel(job.getExternalCompactionId()); - } + var cconf = CompactionConfigStorage.getConfig(getContext(), job.getFateTxId()); + + if (cconf == null) { + LOG.info("Cancelling compaction {} for user compaction that no longer exists {} {}", + ecid, FateTxId.formatTid(job.getFateTxId()), extent); + JOB_HOLDER.cancel(job.getExternalCompactionId()); } } - } catch (RuntimeException e) { + } catch (RuntimeException | KeeperException e) { LOG.warn("Failed to check if compaction {} for {} was canceled.", job.getExternalCompactionId(), KeyExtent.fromThrift(job.getExtent()), e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); } } } diff --git a/server/manager/pom.xml b/server/manager/pom.xml index f08e01070a0..eafb953bd45 100644 --- a/server/manager/pom.xml +++ b/server/manager/pom.xml @@ -48,10 +48,6 @@ com.google.guava guava - - commons-codec - commons-codec - commons-io commons-io diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 11524752e8b..eb56144d748 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -59,6 +59,7 @@ import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.clientImpl.thrift.TableOperation; import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType; import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException; @@ -127,6 +128,7 @@ import org.apache.accumulo.server.AbstractServer; import org.apache.accumulo.server.HighlyAvailableService; import org.apache.accumulo.server.ServerContext; +import 
org.apache.accumulo.server.compaction.CompactionConfigStorage; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.manager.LiveTServerSet; import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection; @@ -163,6 +165,7 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.ImmutableSortedMap; +import com.google.common.collect.Maps; import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.Uninterruptibles; @@ -240,6 +243,17 @@ public synchronized ManagerState getManagerState() { return state; } + @Override + public Map> getCompactionHints() { + Map allConfig = null; + try { + allConfig = CompactionConfigStorage.getAllConfig(getContext(), tableId -> true); + } catch (InterruptedException | KeeperException e) { + throw new RuntimeException(e); + } + return Maps.transformValues(allConfig, CompactionConfig::getExecutionHints); + } + public boolean stillManager() { return getManagerState() != ManagerState.STOP; } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index 9b18301033b..997ef5e454c 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -251,8 +251,8 @@ public void run() { int[] counts = new int[TabletState.values().length]; stats.begin(); - CompactionJobGenerator compactionGenerator = - new CompactionJobGenerator(new ServiceEnvironmentImpl(manager.getContext())); + CompactionJobGenerator compactionGenerator = new CompactionJobGenerator( + new ServiceEnvironmentImpl(manager.getContext()), manager.getCompactionHints()); final Map> resourceGroups = new HashMap<>(); manager.tServerResourceGroups().forEach((k, v) -> { diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index fc576ae4e37..39149f97472 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -21,7 +21,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACT_ID; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; @@ -71,6 +71,7 @@ import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; +import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; import org.apache.accumulo.core.metadata.AbstractTabletFile; @@ -97,7 +98,6 @@ import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; import org.apache.accumulo.core.tabletserver.thrift.TTabletRefresh; import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService; -import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.compaction.CompactionExecutorIdImpl; 
@@ -120,7 +120,9 @@ import org.slf4j.LoggerFactory; import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.CacheLoader; import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.base.Preconditions; import com.google.common.collect.Iterators; import com.google.common.collect.Sets; @@ -165,6 +167,8 @@ public class CompactionCoordinator implements CompactionCoordinatorService.Iface private final ScheduledThreadPoolExecutor schedExecutor; + private LoadingCache compactionConfigCache; + public CompactionCoordinator(ServerContext ctx, LiveTServerSet tservers, SecurityOperation security, CompactionJobQueues jobQueues) { this.ctx = ctx; @@ -179,6 +183,16 @@ public CompactionCoordinator(ServerContext ctx, LiveTServerSet tservers, refreshLatches.put(Ample.DataLevel.USER, new CountDownLatch(1)); this.refreshLatches = Collections.unmodifiableMap(refreshLatches); + CacheLoader loader = + txid -> CompactionConfigStorage.getConfig(ctx, txid); + + // Keep a small short lived cache of compaction config. Compaction config never changes, however + // when a compaction is canceled it is deleted which is why there is a time limit. It does not + // hurt to let a job that was canceled start, it will be canceled later. Caching this immutable + // config will help avoid reading the same data over and over. 
+ compactionConfigCache = + Caffeine.newBuilder().expireAfterWrite(30, SECONDS).maximumSize(100).build(loader); + // At this point the manager does not have its lock so no actions should be taken yet } @@ -358,13 +372,24 @@ public TExternalCompactionJob getCompactionJob(TInfo tinfo, TCredentials credent jobQueues.poll(CompactionExecutorIdImpl.externalId(queueName)); while (metaJob != null) { + + Optional compactionConfig = getCompactionConfig(metaJob); + // this method may reread the metadata, do not use the metadata in metaJob for anything after // this method - ExternalCompactionMetadata ecm = - reserveCompaction(metaJob, compactorAddress, externalCompactionId); + ExternalCompactionMetadata ecm = null; + + var kind = metaJob.getJob().getKind(); + + // Only reserve user compactions when the config is present. When compactions are canceled the + // config is deleted. + if (kind == CompactionKind.SYSTEM + || (kind == CompactionKind.USER && compactionConfig.isPresent())) { + ecm = reserveCompaction(metaJob, compactorAddress, externalCompactionId); + } if (ecm != null) { - result = createThriftJob(externalCompactionId, ecm, metaJob); + result = createThriftJob(externalCompactionId, ecm, metaJob, compactionConfig); // It is possible that by the time this added that the the compactor that made this request // is dead. In this cases the compaction is not actually running. 
RUNNING_CACHE.put(ExternalCompactionId.of(result.getExternalCompactionId()), @@ -457,6 +482,8 @@ private ExternalCompactionMetadata createExternalCompactionMetadata(CompactionJo Set jobFiles, TabletMetadata tablet, String compactorAddress) { boolean propDels; + Long fateTxId = null; + switch (job.getKind()) { case SYSTEM: { boolean compactingAll = tablet.getFiles().equals(jobFiles); @@ -468,6 +495,7 @@ private ExternalCompactionMetadata createExternalCompactionMetadata(CompactionJo boolean compactingAll = tablet.getSelectedFiles().initiallySelectedAll() && tablet.getSelectedFiles().getFiles().equals(jobFiles); propDels = !compactingAll; + fateTxId = tablet.getSelectedFiles().getFateTxId(); } break; default: @@ -481,12 +509,8 @@ private ExternalCompactionMetadata createExternalCompactionMetadata(CompactionJo ReferencedTabletFile newFile = TabletNameGenerator.getNextDataFilenameForMajc(propDels, ctx, tablet, directorCreator); - // ELASTICITY_TODO this is not being set on purpose for now as how compactions store data and - // are canceled may be changed. 
- Long compactionId = null; - return new ExternalCompactionMetadata(jobFiles, newFile, compactorAddress, job.getKind(), - job.getPriority(), job.getExecutor(), propDels, compactionId); + job.getPriority(), job.getExecutor(), propDels, fateTxId); } @@ -547,9 +571,8 @@ private ExternalCompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob } TExternalCompactionJob createThriftJob(String externalCompactionId, - ExternalCompactionMetadata ecm, CompactionJobQueues.MetaJob metaJob) { - - Optional compactionConfig = getCompactionConfig(metaJob); + ExternalCompactionMetadata ecm, CompactionJobQueues.MetaJob metaJob, + Optional compactionConfig) { Map overrides = CompactionPluginUtils.computeOverrides(compactionConfig, ctx, metaJob.getTabletMetadata().getExtent(), metaJob.getJob().getFiles()); @@ -563,11 +586,15 @@ TExternalCompactionJob createThriftJob(String externalCompactionId, dfv.getNumEntries(), dfv.getTime()); }).collect(Collectors.toList()); + long fateTxid = 0; + if (metaJob.getJob().getKind() == CompactionKind.USER) { + fateTxid = metaJob.getTabletMetadata().getSelectedFiles().getFateTxId(); + } + return new TExternalCompactionJob(externalCompactionId, metaJob.getTabletMetadata().getExtent().toThrift(), files, iteratorSettings, ecm.getCompactTmpName().getNormalizedPathStr(), ecm.getPropagateDeletes(), - TCompactionKind.valueOf(ecm.getKind().name()), - ecm.getCompactionId() == null ? 
0 : ecm.getCompactionId(), overrides); + TCompactionKind.valueOf(ecm.getKind().name()), fateTxid, overrides); } class RefreshWriter { @@ -623,21 +650,13 @@ public void deleteRefresh() { } private Optional getCompactionConfig(CompactionJobQueues.MetaJob metaJob) { - Optional compactionConfig = Optional.empty(); - if (metaJob.getJob().getKind() == CompactionKind.USER - || metaJob.getJob().getKind() == CompactionKind.SELECTOR) { - try { - Pair cconf = - CompactionConfigStorage.getCompactionID(ctx, metaJob.getTabletMetadata().getExtent()); - if (cconf != null) { - compactionConfig = Optional.of(cconf.getSecond()); - } - } catch (KeeperException.NoNodeException e) { - throw new RuntimeException(e); - } + && metaJob.getTabletMetadata().getSelectedFiles() != null) { + var cconf = + compactionConfigCache.get(metaJob.getTabletMetadata().getSelectedFiles().getFateTxId()); + return Optional.ofNullable(cconf); } - return compactionConfig; + return Optional.empty(); } /** @@ -710,7 +729,7 @@ public void compactionCompleted(TInfo tinfo, TCredentials credentials, final var ecid = ExternalCompactionId.of(externalCompactionId); var tabletMeta = - ctx.getAmple().readTablet(extent, ECOMP, SELECTED, LOCATION, FILES, COMPACT_ID, OPID); + ctx.getAmple().readTablet(extent, ECOMP, SELECTED, LOCATION, FILES, COMPACTED, OPID); if (!canCommitCompaction(ecid, tabletMeta)) { return; @@ -794,14 +813,6 @@ private void refreshTablet(TabletMetadata metadata, Collection } } - private long getCompactionId(KeyExtent extent) { - try { - return CompactionConfigStorage.getCompactionID(ctx, extent).getFirst(); - } catch (KeeperException.NoNodeException e) { - throw new RuntimeException(e); - } - } - // ELASTICITY_TODO unit test this method private boolean canCommitCompaction(ExternalCompactionId ecid, TabletMetadata tabletMetadata) { @@ -827,10 +838,25 @@ private boolean canCommitCompaction(ExternalCompactionId ecid, TabletMetadata ta } if (ecm.getKind() == CompactionKind.USER || ecm.getKind() == 
CompactionKind.SELECTOR) { - if (tabletMetadata.getSelectedFiles() == null - || !tabletMetadata.getSelectedFiles().getFiles().containsAll(ecm.getJobFiles())) { + if (tabletMetadata.getSelectedFiles() == null) { + // when the compaction is canceled, selected files are deleted + LOG.debug( + "Received completion notification for user compaction and tablet has no selected files {} {}", + ecid, extent); + return false; + } + + if (ecm.getFateTxId() != tabletMetadata.getSelectedFiles().getFateTxId()) { + // maybe the compaction was canceled and another user compaction was started on the tablet. + LOG.debug( + "Received completion notification for user compaction where its fate txid did not match the tablets {} {} {} {}", + ecid, extent, FateTxId.formatTid(ecm.getFateTxId()), + FateTxId.formatTid(tabletMetadata.getSelectedFiles().getFateTxId())); + } + + if (!tabletMetadata.getSelectedFiles().getFiles().containsAll(ecm.getJobFiles())) { // this is not expected to happen - LOG.error("Compaction contained files not in the selected set {} {} {} {} {}", + LOG.error("User compaction contained files not in the selected set {} {} {} {} {}", tabletMetadata.getExtent(), ecid, ecm.getKind(), Optional.ofNullable(tabletMetadata.getSelectedFiles()).map(SelectedFiles::getFiles), ecm.getJobFiles()); @@ -875,7 +901,7 @@ private TabletMetadata commitCompaction(TCompactionStats stats, ExternalCompacti .requireCompaction(ecid).requireSame(tablet, PREV_ROW, FILES, LOCATION); if (ecm.getKind() == CompactionKind.USER || ecm.getKind() == CompactionKind.SELECTOR) { - tabletMutator.requireSame(tablet, SELECTED, COMPACT_ID); + tabletMutator.requireSame(tablet, SELECTED, COMPACTED); } // make the needed updates to the tablet @@ -918,22 +944,22 @@ private void updateTabletForCompaction(TCompactionStats stats, ExternalCompactio // all files selected for the user compactions are finished, so the tablet is finish and // its compaction id needs to be updated. 
- long compactionId = getCompactionId(extent); + long fateTxId = tablet.getSelectedFiles().getFateTxId(); + + Preconditions.checkArgument(!tablet.getCompacted().contains(fateTxId), + "Tablet %s unexpected has selected files and compacted columns for %s", + tablet.getExtent(), fateTxId); + // TODO set to trace - LOG.debug("All selected files compcated for {} setting compaction ID to {}", - tablet.getExtent(), compactionId); + LOG.debug("All selected files compcated for {} setting compacted for {}", + tablet.getExtent(), FateTxId.formatTid(tablet.getSelectedFiles().getFateTxId())); tabletMutator.deleteSelectedFiles(); - - if (tablet.getCompactId().orElse(-1) < compactionId) { - tabletMutator.putCompactionId(compactionId); - } + tabletMutator.putCompacted(fateTxId); } else { // not all of the selected files were finished, so need to add the new file to the // selected set - // TODO need a test for user compaction that takes multiple compactions on a single tablet - // to test this case Set newSelectedFileSet = new HashSet<>(tablet.getSelectedFiles().getFiles()); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java index 3555c82e358..4ad5ccee1ff 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java @@ -108,6 +108,9 @@ public static void refreshTablets(ExecutorService threadPool, String logId, Serv Supplier> onlineTserversSupplier, Map> refreshesNeeded) { + // make a copy as it will be mutated in this method + refreshesNeeded = new HashMap<>(refreshesNeeded); + Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS) .incrementBy(100, MILLISECONDS).maxWait(1, SECONDS).backOffFactor(1.5) .logInterval(3, MINUTES).createRetry(); diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java new file mode 100644 index 00000000000..cef3b05dca5 --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.manager.tableOps.compact; + +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; + +import java.time.Duration; + +import org.apache.accumulo.core.data.NamespaceId; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.ManagerRepo; +import org.apache.accumulo.manager.tableOps.Utils; +import org.apache.accumulo.server.compaction.CompactionConfigStorage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CleanUp extends ManagerRepo { + + private static final Logger log = LoggerFactory.getLogger(CleanUp.class); + + private static final long serialVersionUID = 1L; + + private final TableId tableId; + private final NamespaceId namespaceId; + private final byte[] startRow; + private final byte[] endRow; + + public CleanUp(TableId tableId, NamespaceId namespaceId, byte[] startRow, byte[] endRow) { + this.tableId = tableId; + this.namespaceId = namespaceId; + this.startRow = startRow; + this.endRow = endRow; + } + + @Override + public long isReady(long tid, Manager manager) throws Exception { + + var ample = manager.getContext().getAmple(); + + try ( + var tablets = ample.readTablets().forTable(tableId).overlapping(startRow, endRow) + .fetch(PREV_ROW, COMPACTED).checkConsistency().build(); + var tabletsMutator = ample.conditionallyMutateTablets()) { + + long t1 = System.nanoTime(); + for (TabletMetadata tablet : tablets) { + if (tablet.getCompacted().contains(tid)) { + tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() + .requireSame(tablet, PREV_ROW, 
COMPACTED).deleteCompacted(tid) + .submit(tabletMetadata -> !tabletMetadata.getCompacted().contains(tid)); + } + } + + long rejected = tabletsMutator.process().values().stream() + .filter(result -> result.getStatus() == Status.REJECTED).peek(result -> log + .debug("{} update for {} was rejected ", FateTxId.formatTid(tid), result.getExtent())) + .count(); + + long t2 = System.nanoTime(); + + if (rejected > 0) { + long sleepTime = Duration.ofNanos(t2 - t1).toMillis(); + sleepTime = Math.max(100, Math.min(30000, sleepTime * 2)); + return sleepTime; + } + } + + return 0; + } + + @Override + public Repo call(long tid, Manager manager) throws Exception { + CompactionConfigStorage.deleteConfig(manager.getContext(), tid); + Utils.getReadLock(manager, tableId, tid).unlock(); + Utils.getReadLock(manager, namespaceId, tid).unlock(); + return null; + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactRange.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactRange.java index be7244bf93a..5edeadab797 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactRange.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactRange.java @@ -18,29 +18,22 @@ */ package org.apache.accumulo.manager.tableOps.compact; -import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; -import static org.apache.accumulo.core.clientImpl.UserCompactionUtils.isDefault; import java.util.Optional; -import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; -import org.apache.accumulo.core.clientImpl.UserCompactionUtils; import org.apache.accumulo.core.clientImpl.thrift.TableOperation; import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType; import 
org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.fate.Repo; -import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; -import org.apache.accumulo.core.util.FastFormat; import org.apache.accumulo.core.util.TextUtil; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.ManagerRepo; import org.apache.accumulo.manager.tableOps.Utils; -import org.apache.commons.codec.binary.Hex; -import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.accumulo.server.compaction.CompactionConfigStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,16 +56,7 @@ public CompactRange(NamespaceId namespaceId, TableId tableId, CompactionConfig c this.tableId = tableId; this.namespaceId = namespaceId; - - if (!compactionConfig.getIterators().isEmpty() - || !compactionConfig.getExecutionHints().isEmpty() - || !isDefault(compactionConfig.getConfigurer()) - || !isDefault(compactionConfig.getSelector())) { - this.config = UserCompactionUtils.encode(compactionConfig); - } else { - log.debug( - "Using default compaction config. 
No user iterators or compaction config provided."); - } + this.config = CompactionConfigStorage.encodeConfig(compactionConfig, tableId); if (compactionConfig.getStartRow() != null && compactionConfig.getEndRow() != null && compactionConfig.getStartRow().compareTo(compactionConfig.getEndRow()) >= 0) { @@ -95,89 +79,14 @@ public long isReady(long tid, Manager env) throws Exception { @Override public Repo call(final long tid, Manager env) throws Exception { - String zTablePath = Constants.ZROOT + "/" + env.getInstanceID() + Constants.ZTABLES + "/" - + tableId + Constants.ZTABLE_COMPACT_ID; - - ZooReaderWriter zoo = env.getContext().getZooReaderWriter(); - byte[] cid; - try { - cid = zoo.mutateExisting(zTablePath, currentValue -> { - String cvs = new String(currentValue, UTF_8); - String[] tokens = cvs.split(","); - long flushID = Long.parseLong(tokens[0]) + 1; - - String txidString = FastFormat.toHexString(tid); - - for (int i = 1; i < tokens.length; i++) { - if (tokens[i].startsWith(txidString)) { - continue; // skip self - } - - log.debug("txidString : {}", txidString); - log.debug("tokens[{}] : {}", i, tokens[i]); - - throw new AcceptableThriftTableOperationException(tableId.canonical(), null, - TableOperation.COMPACT, TableOperationExceptionType.OTHER, - "Another compaction with iterators and/or a compaction strategy is running"); - } - - StringBuilder encodedIterators = new StringBuilder(); - - if (config != null) { - Hex hex = new Hex(); - encodedIterators.append(","); - encodedIterators.append(txidString); - encodedIterators.append("="); - encodedIterators.append(new String(hex.encode(config), UTF_8)); - } - - return (Long.toString(flushID) + encodedIterators).getBytes(UTF_8); - }); - - return new CompactionDriver(Long.parseLong(new String(cid, UTF_8).split(",")[0]), namespaceId, - tableId, startRow, endRow); - } catch (NoNodeException nne) { - throw new AcceptableThriftTableOperationException(tableId.canonical(), null, - TableOperation.COMPACT, 
TableOperationExceptionType.NOTFOUND, null); - } - - } - - static void removeIterators(Manager environment, final long txid, TableId tableId) - throws Exception { - String zTablePath = Constants.ZROOT + "/" + environment.getInstanceID() + Constants.ZTABLES - + "/" + tableId + Constants.ZTABLE_COMPACT_ID; - - ZooReaderWriter zoo = environment.getContext().getZooReaderWriter(); - - try { - zoo.mutateExisting(zTablePath, currentValue -> { - String cvs = new String(currentValue, UTF_8); - String[] tokens = cvs.split(","); - long flushID = Long.parseLong(tokens[0]); - - String txidString = FastFormat.toHexString(txid); - - StringBuilder encodedIterators = new StringBuilder(); - for (int i = 1; i < tokens.length; i++) { - if (tokens[i].startsWith(txidString)) { - continue; - } - encodedIterators.append(","); - encodedIterators.append(tokens[i]); - } - - return (Long.toString(flushID) + encodedIterators).getBytes(UTF_8); - }); - } catch (NoNodeException ke) { - log.debug("Node for {} no longer exists.", tableId, ke); - } + CompactionConfigStorage.setConfig(env.getContext(), tid, config); + return new CompactionDriver(namespaceId, tableId, startRow, endRow); } @Override public void undo(long tid, Manager env) throws Exception { try { - removeIterators(env, tid, tableId); + CompactionConfigStorage.deleteConfig(env.getContext(), tid); } finally { Utils.unreserveNamespace(env, namespaceId, tid, false); Utils.unreserveTable(env, tableId, tid, false); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java index 51e39f96974..0f8491fc5ee 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java @@ -18,7 +18,10 @@ */ package org.apache.accumulo.manager.tableOps.compact; -import 
static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACT_ID; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; @@ -26,15 +29,14 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; import java.util.Set; +import java.util.function.Predicate; import java.util.stream.Collectors; -import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; import org.apache.accumulo.core.clientImpl.TableOperationsImpl; import org.apache.accumulo.core.clientImpl.thrift.TableOperation; import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType; -import org.apache.accumulo.core.data.InstanceId; import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.fate.FateTxId; @@ -46,10 +48,12 @@ import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; import org.apache.accumulo.core.metadata.schema.SelectedFiles; import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.ManagerRepo; +import org.apache.accumulo.manager.tableOps.bulkVer2.TabletRefresher; import org.apache.accumulo.manager.tableOps.delete.PreDeleteTable; +import 
org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.compaction.CompactionConfigStorage; import org.apache.accumulo.server.compaction.CompactionPluginUtils; import org.apache.zookeeper.KeeperException; @@ -60,22 +64,15 @@ class CompactionDriver extends ManagerRepo { private static final Logger log = LoggerFactory.getLogger(CompactionDriver.class); - public static String createCompactionCancellationPath(InstanceId instanceId, TableId tableId) { - return Constants.ZROOT + "/" + instanceId + Constants.ZTABLES + "/" + tableId.canonical() - + Constants.ZTABLE_COMPACT_CANCEL_ID; - } - private static final long serialVersionUID = 1L; - private long compactId; private final TableId tableId; private final NamespaceId namespaceId; private byte[] startRow; private byte[] endRow; - public CompactionDriver(long compactId, NamespaceId namespaceId, TableId tableId, byte[] startRow, + public CompactionDriver(NamespaceId namespaceId, TableId tableId, byte[] startRow, byte[] endRow) { - this.compactId = compactId; this.tableId = tableId; this.namespaceId = namespaceId; this.startRow = startRow; @@ -90,10 +87,9 @@ public long isReady(long tid, Manager manager) throws Exception { return 0; } - String zCancelID = createCompactionCancellationPath(manager.getInstanceID(), tableId); ZooReaderWriter zoo = manager.getContext().getZooReaderWriter(); - if (Long.parseLong(new String(zoo.getData(zCancelID))) >= compactId) { + if (isCancelled(tid, manager.getContext())) { // compaction was canceled throw new AcceptableThriftTableOperationException(tableId.canonical(), null, TableOperation.COMPACT, TableOperationExceptionType.OTHER, @@ -111,7 +107,7 @@ public long isReady(long tid, Manager manager) throws Exception { long t1 = System.currentTimeMillis(); - int tabletsToWaitFor = updateAndCheckTablets(manager, tid, compactId); + int tabletsToWaitFor = updateAndCheckTablets(manager, tid); long scanTime = System.currentTimeMillis() - t1; @@ -128,7 +124,13 @@ public long 
isReady(long tid, Manager manager) throws Exception { return sleepTime; } - public int updateAndCheckTablets(Manager manager, long tid, long compactId) { + private boolean isCancelled(long tid, ServerContext context) + throws InterruptedException, KeeperException { + return CompactionConfigStorage.getConfig(context, tid) == null; + } + + public int updateAndCheckTablets(Manager manager, long tid) + throws AcceptableThriftTableOperationException { var ample = manager.getContext().getAmple(); @@ -136,7 +138,7 @@ public int updateAndCheckTablets(Manager manager, long tid, long compactId) { try ( var tablets = ample.readTablets().forTable(tableId).overlapping(startRow, endRow) - .fetch(PREV_ROW, COMPACT_ID, FILES, SELECTED, ECOMP, OPID).build(); + .fetch(PREV_ROW, COMPACTED, FILES, SELECTED, ECOMP, OPID).checkConsistency().build(); var tabletsMutator = ample.conditionallyMutateTablets()) { int complete = 0; @@ -144,13 +146,15 @@ public int updateAndCheckTablets(Manager manager, long tid, long compactId) { int selected = 0; + CompactionConfig config = CompactionConfigStorage.getConfig(manager.getContext(), tid); + for (TabletMetadata tablet : tablets) { total++; // TODO change all logging to trace - if (tablet.getCompactId().orElse(-1) >= compactId) { + if (tablet.getCompacted().contains(tid)) { // this tablet is already considered done log.debug("{} compaction for {} is complete", FateTxId.formatTid(tid), tablet.getExtent()); @@ -163,29 +167,25 @@ public int updateAndCheckTablets(Manager manager, long tid, long compactId) { FateTxId.formatTid(tid), tablet.getExtent()); // this tablet has no files try to mark it as done tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() - .requireSame(tablet, PREV_ROW, FILES, COMPACT_ID).putCompactionId(compactId) - .submit(tabletMetadata -> tabletMetadata.getCompactId().orElse(-1) >= compactId); + .requireSame(tablet, PREV_ROW, FILES, COMPACTED).putCompacted(tid) + .submit(tabletMetadata -> 
tabletMetadata.getCompacted().contains(tid)); } else if (tablet.getSelectedFiles() == null && tablet.getExternalCompactions().isEmpty()) { // there are no selected files log.debug("{} selecting {} files compaction for {}", FateTxId.formatTid(tid), tablet.getFiles().size(), tablet.getExtent()); - // ELASTICITY_TODO this is inefficient, going to zookeeper for each tablet... Having per - // fate - // transaction config lends itself to caching very well because the config related to the - // fate txid is fixed and is not changing - Pair comactionConfig = null; + Set filesToCompact; try { - comactionConfig = - CompactionConfigStorage.getCompactionID(manager.getContext(), tablet.getExtent()); - } catch (KeeperException.NoNodeException e) { - throw new RuntimeException(e); + filesToCompact = CompactionPluginUtils.selectFiles(manager.getContext(), + tablet.getExtent(), config, tablet.getFilesMap()); + } catch (Exception e) { + log.warn("{} failed to select files for {} using {}", FateTxId.formatTid(tid), + tablet.getExtent(), config.getSelector(), e); + throw new AcceptableThriftTableOperationException(tableId.canonical(), null, + TableOperation.COMPACT, TableOperationExceptionType.OTHER, + "Failed to select files"); } - Set filesToCompact = - CompactionPluginUtils.selectFiles(manager.getContext(), tablet.getExtent(), - comactionConfig.getSecond(), tablet.getFilesMap()); - // TODO expensive logging log.debug("{} selected {} of {} files for {}", FateTxId.formatTid(tid), filesToCompact.stream().map(AbstractTabletFile::getFileName) @@ -197,11 +197,11 @@ public int updateAndCheckTablets(Manager manager, long tid, long compactId) { if (filesToCompact.isEmpty()) { // no files were selected so mark the tablet as compacted tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() - .requireSame(tablet, PREV_ROW, FILES, COMPACT_ID).putCompactionId(compactId) - .submit(tabletMetadata -> tabletMetadata.getCompactId().orElse(-1) >= compactId); + .requireSame(tablet, 
PREV_ROW, FILES, SELECTED, ECOMP, COMPACTED).putCompacted(tid) + .submit(tabletMetadata -> tabletMetadata.getCompacted().contains(tid)); } else { var mutator = tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() - .requireSame(tablet, PREV_ROW, FILES, SELECTED, ECOMP, COMPACT_ID); + .requireSame(tablet, PREV_ROW, FILES, SELECTED, ECOMP, COMPACTED); var selectedFiles = new SelectedFiles(filesToCompact, tablet.getFiles().equals(filesToCompact), tid); @@ -214,12 +214,19 @@ public int updateAndCheckTablets(Manager manager, long tid, long compactId) { selected++; } - } else if (tablet.getSelectedFiles() != null - && tablet.getSelectedFiles().getFateTxId() == tid) { - log.debug( - "{} tablet {} already has {} selected files for this compaction, waiting for them be processed", - FateTxId.formatTid(tid), tablet.getExtent(), - tablet.getSelectedFiles().getFiles().size()); + } else if (tablet.getSelectedFiles() != null) { + if (tablet.getSelectedFiles().getFateTxId() == tid) { + log.debug( + "{} tablet {} already has {} selected files for this compaction, waiting for them be processed", + FateTxId.formatTid(tid), tablet.getExtent(), + tablet.getSelectedFiles().getFiles().size()); + } else { + log.debug( + "{} tablet {} already has {} selected files by another compaction {}, waiting for them be processed", + FateTxId.formatTid(tid), tablet.getExtent(), + tablet.getSelectedFiles().getFiles().size(), + FateTxId.formatTid(tablet.getSelectedFiles().getFateTxId())); + } } else { // ELASTICITY_TODO if there are compactions preventing selection of files, then add // selecting marker that prevents new compactions from starting @@ -239,6 +246,8 @@ public int updateAndCheckTablets(Manager manager, long tid, long compactId) { } return total - complete; + } catch (InterruptedException | KeeperException e) { + throw new RuntimeException(e); } // ELASTICITIY_TODO need to handle seeing zero tablets @@ -250,8 +259,68 @@ public Repo call(long tid, Manager env) throws 
Exception { } @Override - public void undo(long tid, Manager environment) { + public void undo(long tid, Manager env) throws Exception { + cleanupTabletMetadata(tid, env); + + // For any compactions that may have happened before this operation failed, attempt to refresh + // tablets. + TabletRefresher.refresh(env.getContext(), env::onlineTabletServers, tid, tableId, startRow, + endRow, tabletMetadata -> true); + } + + /** + * Cleans up any tablet metadata that may have been added as part of this compaction operation. + */ + private void cleanupTabletMetadata(long tid, Manager manager) throws Exception { + var ample = manager.getContext().getAmple(); + + // ELASTICITY_TODO use existing compaction logging + + boolean allCleanedUp = false; + Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS) + .incrementBy(100, MILLISECONDS).maxWait(1, SECONDS).backOffFactor(1.5) + .logInterval(3, MINUTES).createRetry(); + + while (!allCleanedUp) { + + try ( + var tablets = ample.readTablets().forTable(tableId).overlapping(startRow, endRow) + .fetch(PREV_ROW, COMPACTED, SELECTED).checkConsistency().build(); + var tabletsMutator = ample.conditionallyMutateTablets()) { + Predicate needsUpdate = + tabletMetadata -> (tabletMetadata.getSelectedFiles() != null + && tabletMetadata.getSelectedFiles().getFateTxId() == tid) + || tabletMetadata.getCompacted().contains(tid); + Predicate needsNoUpdate = needsUpdate.negate(); + + for (TabletMetadata tablet : tablets) { + + if (needsUpdate.test(tablet)) { + var mutator = tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() + .requireSame(tablet, PREV_ROW, COMPACTED, SELECTED); + if (tablet.getSelectedFiles() != null + && tablet.getSelectedFiles().getFateTxId() == tid) { + mutator.deleteSelectedFiles(); + } + + if (tablet.getCompacted().contains(tid)) { + mutator.deleteCompacted(tid); + } + + mutator.submit(needsNoUpdate::test); + } + } + + allCleanedUp = tabletsMutator.process().values().stream() + 
.allMatch(result -> result.getStatus() == Status.ACCEPTED); + } + + if (!allCleanedUp) { + retry.waitForNextAttempt(log, + "Cleanup metadata for failed compaction " + FateTxId.formatTid(tid)); + } + } } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java index a9b3f85e16b..cccfad771cc 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java @@ -24,7 +24,6 @@ import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.ManagerRepo; -import org.apache.accumulo.manager.tableOps.Utils; import org.apache.accumulo.manager.tableOps.bulkVer2.TabletRefresher; public class RefreshTablets extends ManagerRepo { @@ -48,10 +47,6 @@ public Repo call(long tid, Manager manager) throws Exception { TabletRefresher.refresh(manager.getContext(), manager::onlineTabletServers, tid, tableId, startRow, endRow, tabletMetadata -> true); - CompactRange.removeIterators(manager, tid, tableId); - Utils.getReadLock(manager, tableId, tid).unlock(); - Utils.getReadLock(manager, namespaceId, tid).unlock(); - - return null; + return new CleanUp(tableId, namespaceId, startRow, endRow); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/cancel/CancelCompactions.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/cancel/CancelCompactions.java index 7d25f2d7ac2..2249b9c511d 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/cancel/CancelCompactions.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/cancel/CancelCompactions.java @@ -18,18 +18,15 @@ */ package 
org.apache.accumulo.manager.tableOps.compact.cancel; -import static java.nio.charset.StandardCharsets.UTF_8; - -import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.clientImpl.thrift.TableOperation; import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.Repo; -import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.ManagerRepo; import org.apache.accumulo.manager.tableOps.Utils; +import org.apache.accumulo.server.compaction.CompactionConfigStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +51,15 @@ public long isReady(long tid, Manager env) throws Exception { @Override public Repo call(long tid, Manager environment) throws Exception { - mutateZooKeeper(tid, tableId, environment); + + var idsToCancel = + CompactionConfigStorage.getAllConfig(environment.getContext(), tableId::equals).keySet(); + + for (var idToCancel : idsToCancel) { + log.debug("{} deleting compaction config {}", FateTxId.formatTid(tid), + FateTxId.formatTid(idToCancel)); + CompactionConfigStorage.deleteConfig(environment.getContext(), idToCancel); + } return new FinishCancelCompaction(namespaceId, tableId); } @@ -64,33 +69,4 @@ public void undo(long tid, Manager env) { Utils.unreserveNamespace(env, namespaceId, tid, false); } - public static void mutateZooKeeper(long tid, TableId tableId, Manager environment) - throws Exception { - String zCompactID = Constants.ZROOT + "/" + environment.getInstanceID() + Constants.ZTABLES - + "/" + tableId + Constants.ZTABLE_COMPACT_ID; - String zCancelID = Constants.ZROOT + "/" + environment.getInstanceID() + Constants.ZTABLES + "/" - + tableId + Constants.ZTABLE_COMPACT_CANCEL_ID; - - ZooReaderWriter zoo = environment.getContext().getZooReaderWriter(); - - byte[] currentValue = 
zoo.getData(zCompactID); - - String cvs = new String(currentValue, UTF_8); - String[] tokens = cvs.split(","); - final long flushID = Long.parseLong(tokens[0]); - - zoo.mutateExisting(zCancelID, currentValue2 -> { - long cid = Long.parseLong(new String(currentValue2, UTF_8)); - - if (cid < flushID) { - log.debug("{} setting cancel compaction id to {} for {}", FateTxId.formatTid(tid), flushID, - tableId); - return Long.toString(flushID).getBytes(UTF_8); - } else { - log.debug("{} leaving cancel compaction id as {} for {}", FateTxId.formatTid(tid), cid, - tableId); - return Long.toString(cid).getBytes(UTF_8); - } - }); - } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/PreDeleteTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/PreDeleteTable.java index 7ba0700aed4..d9c31c13b83 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/PreDeleteTable.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/PreDeleteTable.java @@ -29,7 +29,7 @@ import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.ManagerRepo; import org.apache.accumulo.manager.tableOps.Utils; -import org.apache.accumulo.manager.tableOps.compact.cancel.CancelCompactions; +import org.apache.accumulo.server.compaction.CompactionConfigStorage; import org.apache.zookeeper.KeeperException; public class PreDeleteTable extends ManagerRepo { @@ -66,7 +66,13 @@ private void preventFutureCompactions(Manager environment) public Repo call(long tid, Manager environment) throws Exception { try { preventFutureCompactions(environment); - CancelCompactions.mutateZooKeeper(tid, tableId, environment); + + var idsToCancel = + CompactionConfigStorage.getAllConfig(environment.getContext(), tableId::equals).keySet(); + + for (var idToCancel : idsToCancel) { + CompactionConfigStorage.deleteConfig(environment.getContext(), idToCancel); + } return new 
DeleteTable(namespaceId, tableId); } finally { Utils.unreserveTable(environment, tableId, tid, false); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java index cdc4ac012ac..a804bedd736 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java @@ -96,7 +96,7 @@ public Repo call(long tid, Manager manager) throws Exception { // Only update the original tablet after successfully creating the new tablets, this is // important for failure cases where this operation partially runs a then runs again. - updateExistingTablet(manager, tabletMetadata, opid, newTablets, newTabletsFiles); + updateExistingTablet(tid, manager, tabletMetadata, opid, newTablets, newTabletsFiles); return new DeleteOperationIds(splitInfo); } @@ -180,7 +180,12 @@ private void addNewTablets(long tid, Manager manager, TabletMetadata tabletMetad mutator.putTime(tabletMetadata.getTime()); tabletMetadata.getFlushId().ifPresent(mutator::putFlushId); mutator.putPrevEndRow(newExtent.prevEndRow()); - tabletMetadata.getCompactId().ifPresent(mutator::putCompactionId); + tabletMetadata.getCompacted().forEach(mutator::putCompacted); + + tabletMetadata.getCompacted() + .forEach(ctid -> log.debug("{} copying compacted marker to new child tablet {}", + FateTxId.formatTid(tid), FateTxId.formatTid(ctid))); + mutator.putHostingGoal(tabletMetadata.getHostingGoal()); tabletMetadata.getLoaded().forEach((k, v) -> mutator.putBulkFile(k.getTabletFile(), v)); @@ -201,7 +206,7 @@ private void addNewTablets(long tid, Manager manager, TabletMetadata tabletMetad } } - private void updateExistingTablet(Manager manager, TabletMetadata tabletMetadata, + private void updateExistingTablet(long tid, Manager manager, TabletMetadata tabletMetadata, 
TabletOperationId opid, SortedSet newTablets, Map> newTabletsFiles) { try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) { @@ -221,11 +226,26 @@ private void updateExistingTablet(Manager manager, TabletMetadata tabletMetadata } }); + // remove any external compaction entries that are present + tabletMetadata.getExternalCompactions().keySet().forEach(mutator::deleteExternalCompaction); + + tabletMetadata.getExternalCompactions().keySet() + .forEach(ecid -> log.debug("{} deleting external compaction entry for split {}", + FateTxId.formatTid(tid), ecid)); + + // remove any selected file entries that are present, the compaction operation will need to + // reselect files + if (tabletMetadata.getSelectedFiles() != null) { + mutator.deleteSelectedFiles(); + log.debug("{} deleting selected files {} because of split", FateTxId.formatTid(tid), + FateTxId.formatTid(tabletMetadata.getSelectedFiles().getFateTxId())); + } + mutator.submit(tm -> false); var result = tabletsMutator.process().get(splitInfo.getOriginal()); - if (result.getStatus() == Status.REJECTED) { + if (result.getStatus() != Status.ACCEPTED) { // Can not use Ample's built in code for checking rejected because we are changing the prev // end row and Ample would try to read the old tablet, so must check it manually. 
@@ -234,11 +254,9 @@ private void updateExistingTablet(Manager manager, TabletMetadata tabletMetadata if (tabletMeta == null || !tabletMeta.getOperationId().equals(opid)) { throw new IllegalStateException("Failed to update existing tablet in split " + splitInfo.getOriginal() + " " + result.getStatus() + " " + result.getExtent()); + } else { + // ELASTICITY_TODO } - } else if (result.getStatus() != Status.ACCEPTED) { - // maybe this step is being run again and the update was already made - throw new IllegalStateException("Failed to update existing tablet in split " - + splitInfo.getOriginal() + " " + result.getStatus() + " " + result.getExtent()); } } } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java deleted file mode 100644 index a533fbb9ba0..00000000000 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.accumulo.manager.tableOps.compact; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; - -import java.util.UUID; - -import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; -import org.apache.accumulo.core.clientImpl.TableOperationsImpl; -import org.apache.accumulo.core.clientImpl.thrift.TableOperation; -import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType; -import org.apache.accumulo.core.data.InstanceId; -import org.apache.accumulo.core.data.NamespaceId; -import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; -import org.apache.accumulo.manager.Manager; -import org.apache.accumulo.manager.tableOps.delete.PreDeleteTable; -import org.apache.accumulo.server.ServerContext; -import org.easymock.EasyMock; -import org.junit.jupiter.api.Test; - -public class CompactionDriverTest { - - @Test - public void testCancelId() throws Exception { - - final InstanceId instance = InstanceId.of(UUID.randomUUID()); - final long compactId = 123; - final long cancelId = 124; - final NamespaceId namespaceId = NamespaceId.of("13"); - final TableId tableId = TableId.of("42"); - final byte[] startRow = new byte[0]; - final byte[] endRow = new byte[0]; - - Manager manager = EasyMock.createNiceMock(Manager.class); - ServerContext ctx = EasyMock.createNiceMock(ServerContext.class); - ZooReaderWriter zrw = EasyMock.createNiceMock(ZooReaderWriter.class); - EasyMock.expect(manager.getInstanceID()).andReturn(instance).anyTimes(); - EasyMock.expect(manager.getContext()).andReturn(ctx); - EasyMock.expect(ctx.getZooReaderWriter()).andReturn(zrw); - - final String zCancelID = CompactionDriver.createCompactionCancellationPath(instance, tableId); - EasyMock.expect(zrw.getData(zCancelID)).andReturn(Long.toString(cancelId).getBytes()); - - EasyMock.replay(manager, ctx, zrw); - - final CompactionDriver 
driver = - new CompactionDriver(compactId, namespaceId, tableId, startRow, endRow); - final long tableIdLong = Long.parseLong(tableId.toString()); - - var e = assertThrows(AcceptableThriftTableOperationException.class, - () -> driver.isReady(tableIdLong, manager)); - - assertEquals(e.getTableId(), tableId.toString()); - assertEquals(e.getOp(), TableOperation.COMPACT); - assertEquals(e.getType(), TableOperationExceptionType.OTHER); - assertEquals(TableOperationsImpl.COMPACTION_CANCELED_MSG, e.getDescription()); - - EasyMock.verify(manager, ctx, zrw); - } - - @Test - public void testTableBeingDeleted() throws Exception { - - final InstanceId instance = InstanceId.of(UUID.randomUUID()); - final long compactId = 123; - final long cancelId = 122; - final NamespaceId namespaceId = NamespaceId.of("14"); - final TableId tableId = TableId.of("43"); - final byte[] startRow = new byte[0]; - final byte[] endRow = new byte[0]; - - Manager manager = EasyMock.createNiceMock(Manager.class); - ServerContext ctx = EasyMock.createNiceMock(ServerContext.class); - ZooReaderWriter zrw = EasyMock.createNiceMock(ZooReaderWriter.class); - EasyMock.expect(manager.getInstanceID()).andReturn(instance).anyTimes(); - EasyMock.expect(manager.getContext()).andReturn(ctx); - EasyMock.expect(ctx.getZooReaderWriter()).andReturn(zrw); - - final String zCancelID = CompactionDriver.createCompactionCancellationPath(instance, tableId); - EasyMock.expect(zrw.getData(zCancelID)).andReturn(Long.toString(cancelId).getBytes()); - - String deleteMarkerPath = PreDeleteTable.createDeleteMarkerPath(instance, tableId); - EasyMock.expect(zrw.exists(deleteMarkerPath)).andReturn(true); - - EasyMock.replay(manager, ctx, zrw); - - final CompactionDriver driver = - new CompactionDriver(compactId, namespaceId, tableId, startRow, endRow); - final long tableIdLong = Long.parseLong(tableId.toString()); - - var e = assertThrows(AcceptableThriftTableOperationException.class, - () -> driver.isReady(tableIdLong, manager)); - - 
assertEquals(e.getTableId(), tableId.toString()); - assertEquals(e.getOp(), TableOperation.COMPACT); - assertEquals(e.getType(), TableOperationExceptionType.OTHER); - assertEquals(TableOperationsImpl.TABLE_DELETED_MSG, e.getDescription()); - - EasyMock.verify(manager, ctx, zrw); - } - -} diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index a5c441a69c0..14ece9b8c4d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -87,7 +87,6 @@ import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.volume.Volume; -import org.apache.accumulo.server.compaction.CompactionConfigStorage; import org.apache.accumulo.server.compaction.CompactionStats; import org.apache.accumulo.server.compaction.PausedCompactionMetrics; import org.apache.accumulo.server.fs.VolumeUtil; @@ -687,7 +686,9 @@ long getCompactionCancelID() { } public Pair getCompactionID() throws NoNodeException { - return CompactionConfigStorage.getCompactionID(getContext(), extent); + // ELASTICITY_TODO remove this code + throw new UnsupportedOperationException( + "This code no longer functions and needs to be removed"); } private synchronized CommitSession finishPreparingMutations(long time) { diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java index f128a80f74c..41244347c60 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.test.functional; +import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; @@ -481,6 +482,72 @@ public void testOperations() throws Exception { } } + @Test + public void testCompacted() { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + var context = cluster.getServerContext(); + + var ctmi = new ConditionalTabletsMutatorImpl(context); + + var tabletMeta1 = TabletMetadata.builder(e1).build(COMPACTED); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, COMPACTED) + .putCompacted(55L).submit(tabletMetadata -> tabletMetadata.getCompacted().contains(55L)); + var tabletMeta2 = TabletMetadata.builder(e2).putCompacted(45L).build(COMPACTED); + ctmi.mutateTablet(e2).requireAbsentOperation().requireSame(tabletMeta2, COMPACTED) + .putCompacted(56L).submit(tabletMetadata -> tabletMetadata.getCompacted().contains(56L)); + + var results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + assertEquals(Status.REJECTED, results.get(e2).getStatus()); + + tabletMeta1 = context.getAmple().readTablet(e1); + assertEquals(Set.of(55L), tabletMeta1.getCompacted()); + assertEquals(Set.of(), context.getAmple().readTablet(e2).getCompacted()); + + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, COMPACTED) + .putCompacted(65L).putCompacted(75L).submit(tabletMetadata -> false); + + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + + tabletMeta1 = context.getAmple().readTablet(e1); + assertEquals(Set.of(55L, 65L, 75L), tabletMeta1.getCompacted()); + + // test require same with a superset + ctmi = new 
ConditionalTabletsMutatorImpl(context); + tabletMeta1 = TabletMetadata.builder(e2).putCompacted(55L).putCompacted(65L).putCompacted(75L) + .putCompacted(45L).build(COMPACTED); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, COMPACTED) + .deleteCompacted(55L).deleteCompacted(65L).deleteCompacted(75L) + .submit(tabletMetadata -> false); + results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + assertEquals(Set.of(55L, 65L, 75L), context.getAmple().readTablet(e1).getCompacted()); + + // test require same with a subset + ctmi = new ConditionalTabletsMutatorImpl(context); + tabletMeta1 = TabletMetadata.builder(e2).putCompacted(55L).putCompacted(65L).build(COMPACTED); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, COMPACTED) + .deleteCompacted(55L).deleteCompacted(65L).deleteCompacted(75L) + .submit(tabletMetadata -> false); + results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + assertEquals(Set.of(55L, 65L, 75L), context.getAmple().readTablet(e1).getCompacted()); + + // now use the exact set the tablet has + ctmi = new ConditionalTabletsMutatorImpl(context); + tabletMeta1 = TabletMetadata.builder(e2).putCompacted(55L).putCompacted(65L).putCompacted(75L) + .build(COMPACTED); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, COMPACTED) + .deleteCompacted(55L).deleteCompacted(65L).deleteCompacted(75L) + .submit(tabletMetadata -> false); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + assertEquals(Set.of(), context.getAmple().readTablet(e1).getCompacted()); + } + } + @Test public void testRootTabletUpdate() throws Exception { var context = cluster.getServerContext(); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java index 298d321ccee..e03c48e38af 100644 --- 
a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java @@ -25,9 +25,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; import java.io.IOException; import java.time.Duration; @@ -39,13 +39,14 @@ import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; -import java.util.Objects; import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; +import java.util.function.Predicate; +import java.util.stream.IntStream; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; @@ -58,7 +59,6 @@ import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.PluginConfig; -import org.apache.accumulo.core.client.admin.compaction.CompactableFile; import org.apache.accumulo.core.client.admin.compaction.CompactionSelector; import org.apache.accumulo.core.client.admin.compaction.CompressionConfigurer; import org.apache.accumulo.core.clientImpl.ClientContext; @@ -80,6 +80,7 @@ import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; import org.apache.accumulo.core.metadata.schema.TabletMetadata; @@ -103,8 +104,8 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; public class CompactionIT extends AccumuloClusterHarness { @@ -138,34 +139,16 @@ public boolean accept(Key k, Value v) { } - public static class RandomErrorThrowingSelector implements CompactionSelector { - - public static final String FILE_LIST_PARAM = "filesToCompact"; - private static Boolean ERROR_THROWN = Boolean.FALSE; - - private List filesToCompact; + public static class ErrorThrowingSelector implements CompactionSelector { @Override public void init(InitParameters iparams) { - String files = iparams.getOptions().get(FILE_LIST_PARAM); - Objects.requireNonNull(files); - String[] f = files.split(","); - filesToCompact = Lists.newArrayList(f); + } @Override public Selection select(SelectionParameters sparams) { - if (!ERROR_THROWN) { - ERROR_THROWN = Boolean.TRUE; - throw new RuntimeException("Exception for test"); - } - List matches = new ArrayList<>(); - sparams.getAvailableFiles().forEach(cf -> { - if (filesToCompact.contains(cf.getFileName())) { - matches.add(cf); - } - }); - return new Selection(matches); + throw new RuntimeException("Exception for test"); } } @@ -223,28 +206,16 @@ public void testBadSelector() throws Exception { } } - List files = FunctionalTestUtils.getRFilePaths(c, tableName); - assertEquals(4, files.size()); - - String subset = files.get(0).substring(files.get(0).lastIndexOf('/') + 1) + "," - + files.get(3).substring(files.get(3).lastIndexOf('/') + 1); - CompactionConfig config = new CompactionConfig() - .setSelector(new 
PluginConfig(RandomErrorThrowingSelector.class.getName(), - Map.of(RandomErrorThrowingSelector.FILE_LIST_PARAM, subset))) + .setSelector(new PluginConfig(ErrorThrowingSelector.class.getName(), Map.of())) .setWait(true); - c.tableOperations().compact(tableName, config); - - // check that the subset of files selected are compacted, but the others remain untouched - List filesAfterCompact = FunctionalTestUtils.getRFilePaths(c, tableName); - assertFalse(filesAfterCompact.contains(files.get(0))); - assertTrue(filesAfterCompact.contains(files.get(1))); - assertTrue(filesAfterCompact.contains(files.get(2))); - assertFalse(filesAfterCompact.contains(files.get(3))); + assertThrows(AccumuloException.class, () -> c.tableOperations().compact(tableName, config)); List rows = new ArrayList<>(); c.createScanner(tableName).forEach((k, v) -> rows.add(k.getRow().toString())); assertEquals(List.of("1", "2", "3", "4"), rows); + + assertNoCompactionMetadata(tableName); } } @@ -271,6 +242,8 @@ public void testCompactionWithTableIterator() throws Exception { try (Scanner s = client.createScanner(table1)) { assertFalse(s.iterator().hasNext()); } + + assertNoCompactionMetadata(table1); } } @@ -291,21 +264,21 @@ public void testUserCompactionCancellation() throws Exception { } final AtomicReference error = new AtomicReference<>(); - final AtomicBoolean started = new AtomicBoolean(false); Thread t = new Thread(() -> { try { - started.set(true); IteratorSetting setting = new IteratorSetting(50, "sleepy", SlowIterator.class); setting.addOption("sleepTime", "3000"); setting.addOption("seekSleepTime", "3000"); - client.tableOperations().attachIterator(table1, setting, EnumSet.of(IteratorScope.majc)); - client.tableOperations().compact(table1, new CompactionConfig().setWait(true)); + var cconf = new CompactionConfig().setWait(true).setIterators(List.of(setting)); + client.tableOperations().compact(table1, cconf); } catch (AccumuloSecurityException | TableNotFoundException | AccumuloException e) { 
error.set(e); } }); t.start(); - while (!started.get()) { + // when the compaction starts it will create a selected files column in the tablet, wait for + // that to happen + while (countTablets(table1, tm -> tm.getSelectedFiles() != null) == 0) { Thread.sleep(1000); } client.tableOperations().cancelCompaction(table1); @@ -313,6 +286,19 @@ public void testUserCompactionCancellation() throws Exception { Exception e = error.get(); assertNotNull(e); assertEquals(TableOperationsImpl.COMPACTION_CANCELED_MSG, e.getMessage()); + // ensure the canceled compaction deletes any tablet metadata related to the compaction + while (countTablets(table1, + tm -> tm.getSelectedFiles() != null || !tm.getCompacted().isEmpty()) > 0) { + Thread.sleep(1000); + } + } + } + + private long countTablets(String tableName, Predicate tabletTest) { + var tableId = TableId.of(getServerContext().tableOperations().tableIdMap().get(tableName)); + try (var tabletsMetadata = + getServerContext().getAmple().readTablets().forTable(tableId).build()) { + return tabletsMetadata.stream().filter(tabletTest).count(); } } @@ -397,28 +383,41 @@ public void testTableDeletedDuringUserCompaction() throws Exception { } final AtomicReference error = new AtomicReference<>(); - final AtomicBoolean started = new AtomicBoolean(false); Thread t = new Thread(() -> { try { - started.set(true); IteratorSetting setting = new IteratorSetting(50, "sleepy", SlowIterator.class); setting.addOption("sleepTime", "3000"); setting.addOption("seekSleepTime", "3000"); - client.tableOperations().attachIterator(table1, setting, EnumSet.of(IteratorScope.majc)); - client.tableOperations().compact(table1, new CompactionConfig().setWait(true)); + var cconf = new CompactionConfig().setWait(true).setIterators(List.of(setting)); + client.tableOperations().compact(table1, cconf); } catch (AccumuloSecurityException | TableNotFoundException | AccumuloException e) { error.set(e); } }); t.start(); - while (!started.get()) { + // when the compaction 
starts it will create a selected files column in the tablet, wait for + // that to happen + while (countTablets(table1, tm -> tm.getSelectedFiles() != null) == 0) { Thread.sleep(1000); } + + // grab the table id before deleting the table as its needed for a check later and can not get + // it after delete + var tableId = TableId.of(getServerContext().tableOperations().tableIdMap().get(table1)); + client.tableOperations().delete(table1); t.join(); Exception e = error.get(); assertNotNull(e); assertEquals(TableOperationsImpl.COMPACTION_CANCELED_MSG, e.getMessage()); + + // ELASTICITY_TODO make delete table fate op get operation ids before deleting + // there should be no metadata for the table, check to see if the compaction wrote anything + // after table delete + try (var scanner = client.createScanner(MetadataTable.NAME)) { + scanner.setRange(MetadataSchema.TabletsSection.getRange(tableId)); + assertEquals(0, scanner.stream().count()); + } } } @@ -456,7 +455,7 @@ public void testPartialCompaction() throws Exception { // this should create an F file client.tableOperations().flush(tableName, null, null, true); - // TODO compactions flush tablets, needs to evaluate this behavior + // ELASTICITY_TODO compactions flush tablets, needs to evaluate this behavior // run a compaction that only compacts F files iterSetting = new IteratorSetting(100, TestFilter.class); @@ -465,6 +464,7 @@ public void testPartialCompaction() throws Exception { config = new CompactionConfig().setIterators(List.of(iterSetting)).setWait(true) .setSelector(new PluginConfig(FSelector.class.getName())); client.tableOperations().compact(tableName, config); + assertNoCompactionMetadata(tableName); try (Scanner scanner = client.createScanner(tableName)) { int count = 0; @@ -515,6 +515,7 @@ public void testConfigurer() throws Exception { .setConfigurer(new PluginConfig(CompressionConfigurer.class.getName(), Map.of(CompressionConfigurer.LARGE_FILE_COMPRESSION_TYPE, "gz", 
CompressionConfigurer.LARGE_FILE_COMPRESSION_THRESHOLD, data.length + "")))); + assertNoCompactionMetadata(tableName); // after compacting with compression, expect small file sizes = CompactionExecutorIT.getFileSizes(client, tableName); @@ -522,6 +523,7 @@ public void testConfigurer() throws Exception { "Unexpected files sizes: data: " + data.length + ", file:" + sizes); client.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + assertNoCompactionMetadata(tableName); // after compacting without compression, expect big files again sizes = CompactionExecutorIT.getFileSizes(client, tableName); @@ -564,6 +566,8 @@ public void testConfigurerSetOnTable() throws Exception { assertTrue(sizes < data.length, "Unexpected files sizes: data: " + data.length + ", file:" + sizes); + assertNoCompactionMetadata(tableName); + } } @@ -687,6 +691,8 @@ public void testMultiStepCompactionThatDeletesAll() throws Exception { var finalCount = countFiles(c); assertTrue(finalCount <= beforeCount); + + assertNoCompactionMetadata(tableName); } } @@ -709,46 +715,182 @@ public void testSelectNoFiles() throws Exception { c.tableOperations().compact(tableName, config); assertEquals(Set.of("a", "b"), getRows(c, tableName)); + + assertNoCompactionMetadata(tableName); } } @Test - public void testConcurrent() throws Exception { - // two compactions without iterators or strategy should be able to run concurrently + public void testConcurrentWithIterators() throws Exception { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + try (BatchWriter bw = c.createBatchWriter(tableName)) { + for (int i = 0; i < MAX_DATA; i++) { + Mutation m = new Mutation(String.format("r:%04d", i)); + m.put("", "", "" + i); + bw.addMutation(m); + } + } + + TreeSet splits = new TreeSet<>(); + for (int i = 10; i < MAX_DATA; i += 10) { + splits.add(new Text(String.format("r:%04d", i))); + 
} + + c.tableOperations().addSplits(tableName, splits); + + // Start three concurrent compactions with different iterators that filter different data. + // Expect all to run on each tablet in some order. + for (int modulus : List.of(2, 3, 5)) { + IteratorSetting iterSetting = new IteratorSetting(100, TestFilter.class); + // make sure iterator options make it to compactor process + iterSetting.addOption("modulus", modulus + ""); + CompactionConfig config = + new CompactionConfig().setIterators(List.of(iterSetting)).setWait(false); + c.tableOperations().compact(tableName, config); + } + // only expected to see numbers that are divisible by 2, 3, and 5 in the data after all + // compactions run + var expected = IntStream.range(0, MAX_DATA).filter(i -> i % (2 * 3 * 5) == 0) + .mapToObj(i -> String.format("r:%04d", i)).collect(toSet()); + + Supplier> actualSupplier = () -> { + try (var scanner = c.createScanner(tableName)) { + return scanner.stream().map(e -> e.getKey().getRowData().toString()).collect(toSet()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + + // wait until the filtering done by all three compactions is seen + while (!expected.equals(actualSupplier.get())) { + Thread.sleep(250); + } + + // eventually the compactions should clean up all of their metadata, wait for this to happen + while (countTablets(tableName, + tabletMetadata -> !tabletMetadata.getCompacted().isEmpty() + || tabletMetadata.getSelectedFiles() != null + || !tabletMetadata.getExternalCompactions().isEmpty()) + > 0) { + Thread.sleep(250); + } + } + } + + @Test + public void testConcurrentSplit() throws Exception { + // test compaction and split running concurrently try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { String tableName = getUniqueNames(1)[0]; c.tableOperations().create(tableName); + try (BatchWriter bw = c.createBatchWriter(tableName)) { + for (int i = 0; i < MAX_DATA; i++) { + Mutation m = new Mutation(String.format("r:%04d", 
i)); + m.put("", "", "" + i); + bw.addMutation(m); + } + } + + TreeSet splits = new TreeSet<>(); + for (int i = 100; i < MAX_DATA; i += 100) { + splits.add(new Text(String.format("r:%04d", i))); + } - // write random data because its very unlikely it will compress - writeRandomValue(c, tableName, 1 << 16); - writeRandomValue(c, tableName, 1 << 16); + // add 10 splits to the table + c.tableOperations().addSplits(tableName, splits); - c.tableOperations().compact(tableName, new CompactionConfig().setWait(false)); - c.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + for (int modulus : List.of(2, 3)) { + IteratorSetting iterSetting = new IteratorSetting(100, TestFilter.class); + // make sure iterator options make it to compactor process + iterSetting.addOption("modulus", modulus + ""); + CompactionConfig config = + new CompactionConfig().setIterators(List.of(iterSetting)).setWait(false); + c.tableOperations().compact(tableName, config); + } - assertEquals(1, FunctionalTestUtils.countRFiles(c, tableName)); + splits = new TreeSet<>(); + for (int i = 50; i < MAX_DATA; i += 50) { + splits.add(new Text(String.format("r:%04d", i))); + } - writeRandomValue(c, tableName, 1 << 16); + // wait a bit for some tablets to have files selected, it possible the compaction have + // completed before this so do not wait long + Wait.waitFor( + () -> countTablets(tableName, tabletMetadata -> tabletMetadata.getSelectedFiles() != null) + > 0, + 3000, 10); - IteratorSetting iterConfig = new IteratorSetting(30, SlowIterator.class); - SlowIterator.setSleepTime(iterConfig, 1000); + // add 10 more splits to the table + c.tableOperations().addSplits(tableName, splits); - long t1 = System.currentTimeMillis(); - c.tableOperations().compact(tableName, - new CompactionConfig().setWait(false).setIterators(java.util.Arrays.asList(iterConfig))); - try { - // this compaction should fail because previous one set iterators - c.tableOperations().compact(tableName, new 
CompactionConfig().setWait(true)); - if (System.currentTimeMillis() - t1 < 2000) { - fail("Expected compaction to fail because another concurrent compaction set iterators"); + splits = new TreeSet<>(); + for (int i = 10; i < MAX_DATA; i += 10) { + splits.add(new Text(String.format("r:%04d", i))); + } + + // wait a bit for some tablets to be compacted, it possible the compaction have completed + // before this so do not wait long + Wait.waitFor( + () -> countTablets(tableName, tabletMetadata -> !tabletMetadata.getCompacted().isEmpty()) + > 0, + 3000, 10); + + // add 80 more splits to the table + c.tableOperations().addSplits(tableName, splits); + + assertEquals(99, c.tableOperations().listSplits(tableName).size()); + + // only expect to see numbers that are divisible by 2 and 3 in the data after all + // compactions run + var expected = IntStream.range(0, MAX_DATA).filter(i -> i % (2 * 3) == 0) + .mapToObj(i -> String.format("r:%04d", i)).collect(toSet()); + + Supplier> actualSupplier = () -> { + try (var scanner = c.createScanner(tableName)) { + return scanner.stream().map(e -> e.getKey().getRowData().toString()).collect(toSet()); + } catch (Exception e) { + throw new RuntimeException(e); } - } catch (AccumuloException e) {} + }; + + // wait until the filtering done by all three compactions is seen + while (!expected.equals(actualSupplier.get())) { + Thread.sleep(250); + } + + // eventually the compactions should clean up all of their metadata, wait for this to happen + while (countTablets(tableName, + tabletMetadata -> !tabletMetadata.getCompacted().isEmpty() + || tabletMetadata.getSelectedFiles() != null + || !tabletMetadata.getExternalCompactions().isEmpty()) + > 0) { + Thread.sleep(250); + } } } + private void assertNoCompactionMetadata(String tableName) { + var tableId = TableId.of(getServerContext().tableOperations().tableIdMap().get(tableName)); + var tabletsMetadata = getServerContext().getAmple().readTablets().forTable(tableId).build(); + + int count = 0; 
+ + for (var tabletMetadata : tabletsMetadata) { + assertEquals(Set.of(), tabletMetadata.getCompacted()); + assertNull(tabletMetadata.getSelectedFiles()); + assertEquals(Set.of(), tabletMetadata.getExternalCompactions().keySet()); + count++; + } + + assertTrue(count > 0); + } + @Test public void testMetadataCompactions() throws Exception { // The metadata and root table have default config that causes them to compact down to one @@ -870,7 +1012,7 @@ public void testSystemCompactionsRefresh() throws Exception { var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent); var files = tabletMeta.getFiles(); log.debug("Current files {}", - files.stream().map(StoredTabletFile::getFileName).collect(Collectors.toList())); + files.stream().map(StoredTabletFile::getFileName).collect(toList())); if (files.size() == 1) { // Once only one file exists the tablet may still have not gotten the refresh message @@ -878,7 +1020,7 @@ public void testSystemCompactionsRefresh() throws Exception { // eventually see the tablet refresh its files. 
try (Scanner scanner = client.createScanner(tableName)) { var acutalData = scanner.stream().map(e -> Integer.parseInt(e.getValue().toString())) - .collect(Collectors.toSet()); + .collect(toSet()); return acutalData.equals(expectedData); } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java index 69af3e91ec0..6fe2a309bf7 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java @@ -424,6 +424,11 @@ public ManagerState getManagerState() { return ManagerState.NORMAL; } + @Override + public Map> getCompactionHints() { + return Map.of(); + } + @Override public String toString() { return "tservers: " + tservers + " onlineTables: " + onlineTables; diff --git a/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java b/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java index 3e82445aee4..546427694cd 100644 --- a/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java @@ -110,6 +110,11 @@ public ManagerState getManagerState() { return ManagerState.NORMAL; } + @Override + public Map> getCompactionHints() { + return Map.of(); + } + @Override public Set shutdownServers() { return Collections.emptySet();