From d2ff216abcd9f11726dba8ba17084928065c388b Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Thu, 8 Jun 2023 22:43:23 -0400 Subject: [PATCH 1/7] Partially moves compactions into the manager This commit moves system compactions into the manager. User compactions are not done yet. The TabletManagmentIterator now finds tablets that need compaction by calling the compaction dispatcher and compaction planner plugins. Any tablets that needs compaction is returned to the manager where it runs the plugins again to generate compaction jobs. The compaction jobs are placed in a new bounded priority queue. The queue is bounded in size to prevent exhausting manager memory, it keeps the Top N jobs with the highest priority. The CompactionCoordinator pulls jobs out of then new queue when compactors request work. A conditional mutation is used in the compaction coordinator to atomically reserve the files for compaction. When the compactor reports the job is done the coordinator atomically updates the tablet metadata using a conditional mutation and if the tablet is hosted its asked to refresh its metadata. User compactions in the manager still need to be implemented. Also many loose ends still need to be implemented for system compactions, these changes will be done in follow on commits. --- assemble/bin/accumulo-cluster | 13 +- .../apache/accumulo/core/conf/Property.java | 2 +- .../core/manager/state/TabletManagement.java | 2 +- .../core/metadata/CompactableFileImpl.java | 2 + .../accumulo/core/metadata/schema/Ample.java | 12 +- .../schema/ExternalCompactionFinalState.java | 1 + .../spi/compaction/CompactionExecutorId.java | 1 + .../compaction/CompactionServicesConfig.java | 17 +- .../compaction/CompactionJobGenerator.java | 304 +++++++++++ .../state/TabletManagementIterator.java | 35 +- .../ConditionalTabletMutatorImpl.java | 23 + .../iterators/CompactionsExistsIterator.java | 52 ++ .../iterators/LocationExistsIterator.java | 4 +- .../server/tablets/TabletNameGenerator.java | 97 ++++ .../server/tablets/UniqueNameAllocator.java | 11 - server/manager/pom.xml | 4 + .../org/apache/accumulo/manager/Manager.java | 20 +- .../accumulo/manager/TabletGroupWatcher.java | 19 + .../compaction/CompactionFinalizer.java | 229 -------- .../manager/compaction/QueueAndPriority.java | 90 --- .../manager/compaction/QueueSummaries.java | 220 -------- .../CompactionCoordinator.java | 359 +++++++----- .../DeadCompactionDetector.java | 13 +- .../queue/CompactionJobPriorityQueue.java | 188 +++++++ .../compaction/queue/CompactionJobQueues.java | 105 ++++ .../manager/tableOps/split/PreSplit.java | 4 +- .../compaction/CompactionCoordinatorTest.java | 515 +----------------- .../compaction/QueueSummariesTest.java | 194 ------- .../tserver/tablet/CompactableImpl.java | 5 + .../tserver/tablet/CompactableUtils.java | 12 - .../tserver/tablet/DatafileManager.java | 3 +- .../accumulo/tserver/tablet/Tablet.java | 55 +- .../compaction/CompactableUtilsTest.java | 4 +- .../accumulo/test/functional/BulkNewIT.java | 25 + 34 files changed, 1178 insertions(+), 1462 deletions(-) create mode 100644 server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java create mode 100644 server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/CompactionsExistsIterator.java create mode 100644 server/base/src/main/java/org/apache/accumulo/server/tablets/TabletNameGenerator.java delete mode 100644 server/manager/src/main/java/org/apache/accumulo/manager/compaction/CompactionFinalizer.java delete mode 100644 server/manager/src/main/java/org/apache/accumulo/manager/compaction/QueueAndPriority.java delete mode 100644 server/manager/src/main/java/org/apache/accumulo/manager/compaction/QueueSummaries.java rename server/manager/src/main/java/org/apache/accumulo/manager/compaction/{ => coordinator}/CompactionCoordinator.java (66%) rename server/manager/src/main/java/org/apache/accumulo/manager/compaction/{ => coordinator}/DeadCompactionDetector.java (91%) create mode 100644 server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java create mode 100644 server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java delete mode 100644 server/manager/src/test/java/org/apache/accumulo/manager/compaction/QueueSummariesTest.java diff --git a/assemble/bin/accumulo-cluster b/assemble/bin/accumulo-cluster index 54f0c6afecf..80229922dff 100755 --- a/assemble/bin/accumulo-cluster +++ b/assemble/bin/accumulo-cluster @@ -459,16 +459,17 @@ gc: tserver: - localhost +compaction: + compactor: + - user-small: + - localhost + - user-large: + - localhost + #sserver: # - default: # - localhost # -#compaction: -# compactor: -# - q1: -# - localhost -# - q2: -# - localhost # # diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index dfc896f13c5..bb1f2f427fa 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -606,7 +606,7 @@ public enum Property { "The maximum number of files a compaction will open", "2.1.0"), TSERV_COMPACTION_SERVICE_DEFAULT_EXECUTORS( "tserver.compaction.major.service.default.planner.opts.executors", - "[{'name':'small','type':'internal','maxSize':'32M','numThreads':2},{'name':'medium','type':'internal','maxSize':'128M','numThreads':2},{'name':'large','type':'internal','numThreads':2}]" + ("[{'name':'small','type':'external','maxSize':'128M','queue':'user-small'}, {'name':'large','type':'external','queue':'user-large'}]") .replaceAll("'", "\""), PropertyType.STRING, "See {% jlink -f org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner %} ", diff --git a/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java b/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java index d6aece245f0..17869791eec 100644 --- a/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java +++ b/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java @@ -45,7 +45,7 @@ public class TabletManagement { public static final EnumSet CONFIGURED_COLUMNS = EnumSet.of(ColumnType.PREV_ROW, ColumnType.LOCATION, ColumnType.SUSPEND, ColumnType.LOGS, ColumnType.CHOPPED, ColumnType.HOSTING_GOAL, ColumnType.HOSTING_REQUESTED, - ColumnType.FILES, ColumnType.LAST, ColumnType.OPID); + ColumnType.FILES, ColumnType.LAST, ColumnType.OPID, ColumnType.ECOMP, ColumnType.DIR); private static final Text REASONS_COLUMN_NAME = new Text("REASONS"); diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/CompactableFileImpl.java b/core/src/main/java/org/apache/accumulo/core/metadata/CompactableFileImpl.java index e8cf1090601..d09ad4e1755 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/CompactableFileImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/CompactableFileImpl.java @@ -30,6 +30,8 @@ public class CompactableFileImpl implements CompactableFile { private final DataFileValue dataFileValue; public CompactableFileImpl(URI uri, long size, long entries) { + // TODO this normalizes the path passing it through URI defeating the purpose of + // StoredTabletFile this.storedTabletFile = new StoredTabletFile(uri.toString()); this.dataFileValue = new DataFileValue(size, entries); } diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index bc7dcdc5594..1c2571a4959 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -338,7 +338,7 @@ interface TabletUpdates { T putBulkFile(ReferencedTabletFile bulkref, long tid); - T deleteBulkFile(ReferencedTabletFile bulkref); + T deleteBulkFile(ReferencedTabletFile bulkref); // TODO should probably be a stored tablet file T putChopped(); @@ -443,6 +443,16 @@ interface ConditionalTabletMutator extends TabletUpdates * Ample provides the following features on top of the conditional writer to help automate diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionFinalState.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionFinalState.java index 9212ccde9fd..88ada722af5 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionFinalState.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionFinalState.java @@ -28,6 +28,7 @@ import com.google.common.base.Preconditions; import com.google.gson.Gson; +// ELASTICITY_TODO remove this class, remove it from ample, add upgrade code to remove it from metadata table public class ExternalCompactionFinalState { private static final Gson GSON = new Gson(); diff --git a/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java b/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java index 9a40a560a60..5a456f6e997 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java @@ -28,6 +28,7 @@ * @see org.apache.accumulo.core.spi.compaction */ public class CompactionExecutorId extends AbstractId { + // ELASTICITY_TODO make this cache ids like TableId. This will help save manager memory. private static final long serialVersionUID = 1L; protected CompactionExecutorId(String canonical) { diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfig.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfig.java index c6e1cea55de..471cd28f8a2 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfig.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfig.java @@ -21,7 +21,9 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.function.Predicate; +import org.apache.accumulo.core.client.PluginEnvironment; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.ConfigurationTypeHelper; import org.apache.accumulo.core.conf.Property; @@ -49,13 +51,21 @@ private long getDefaultThroughput() { .getMemoryAsBytes(Property.TSERV_COMPACTION_SERVICE_DEFAULT_RATE_LIMIT.getDefaultValue()); } - private Map getConfiguration(AccumuloConfiguration aconf) { + private static Map getConfiguration(AccumuloConfiguration aconf) { return aconf.getAllPropertiesWithPrefix(Property.TSERV_COMPACTION_SERVICE_PREFIX); } + public CompactionServicesConfig(PluginEnvironment.Configuration conf) { + // TODO will probably not need rate limit eventually and the 2nd param predicate can go away + this(conf.getWithPrefix(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey()), + property -> conf.isSet(property.getKey())); + } + public CompactionServicesConfig(AccumuloConfiguration aconf) { - Map configs = getConfiguration(aconf); + this(getConfiguration(aconf), aconf::isPropertySet); + } + private CompactionServicesConfig(Map configs, Predicate isSetPredicate) { configs.forEach((prop, val) -> { var suffix = prop.substring(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey().length()); @@ -66,7 +76,7 @@ public CompactionServicesConfig(AccumuloConfiguration aconf) { planners.put(tokens[0], val); } else if (tokens.length == 3 && tokens[1].equals("rate") && tokens[2].equals("limit")) { var eprop = Property.getPropertyByKey(prop); - if (eprop == null || aconf.isPropertySet(eprop)) { + if (eprop == null || isSetPredicate.test(eprop)) { rateLimits.put(tokens[0], ConfigurationTypeHelper.getFixedMemoryAsBytes(val)); } } else { @@ -82,7 +92,6 @@ public CompactionServicesConfig(AccumuloConfiguration aconf) { throw new IllegalArgumentException( "Incomplete compaction service definitions, missing planner class " + diff); } - } public long getRateLimit(String serviceName) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java new file mode 100644 index 00000000000..373603d9bb7 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.compaction; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.client.PluginEnvironment; +import org.apache.accumulo.core.client.admin.compaction.CompactableFile; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.metadata.CompactableFileImpl; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.spi.common.ServiceEnvironment; +import org.apache.accumulo.core.spi.compaction.CompactionDispatcher; +import org.apache.accumulo.core.spi.compaction.CompactionExecutorId; +import org.apache.accumulo.core.spi.compaction.CompactionJob; +import org.apache.accumulo.core.spi.compaction.CompactionKind; +import org.apache.accumulo.core.spi.compaction.CompactionPlan; +import org.apache.accumulo.core.spi.compaction.CompactionPlanner; +import org.apache.accumulo.core.spi.compaction.CompactionServiceId; +import org.apache.accumulo.core.spi.compaction.CompactionServices; +import org.apache.accumulo.core.spi.compaction.ExecutorManager; +import org.apache.accumulo.core.util.compaction.CompactionExecutorIdImpl; +import org.apache.accumulo.core.util.compaction.CompactionJobImpl; +import org.apache.accumulo.core.util.compaction.CompactionPlanImpl; +import org.apache.accumulo.core.util.compaction.CompactionServicesConfig; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; + +public class CompactionJobGenerator { + + private final CompactionServicesConfig servicesConfig; + private final Map planners = new HashMap<>(); + private final Cache dispatchers; + private final Set serviceIds; + private final PluginEnvironment env; + + public CompactionJobGenerator(PluginEnvironment env) { + servicesConfig = new CompactionServicesConfig(env.getConfiguration()); + serviceIds = servicesConfig.getPlanners().keySet().stream().map(CompactionServiceId::of) + .collect(Collectors.toUnmodifiableSet()); + + dispatchers = Caffeine.newBuilder().maximumSize(10).build(); + this.env = env; + } + + public Collection generateJobs(TabletMetadata tablet, Set kinds) { + + // ELASTICITY_TODO do not want user configured plugins to cause exceptions that prevents tablets + // from being + // assigned. So probably want to catch exceptions and log, but not too spammily OR some how + // report something + // back to the manager so it can log. + + if (kinds.contains(CompactionKind.SYSTEM)) { + CompactionServiceId serviceId = dispatch(CompactionKind.SYSTEM, tablet); + + return planCompactions(serviceId, CompactionKind.SYSTEM, tablet); + } else { + return Set.of(); + } + } + + private CompactionServiceId dispatch(CompactionKind kind, TabletMetadata tablet) { + + CompactionDispatcher dispatcher = dispatchers.get(tablet.getTableId(), this::createDispatcher); + + CompactionDispatcher.DispatchParameters dispatchParams = + new CompactionDispatcher.DispatchParameters() { + @Override + public CompactionServices getCompactionServices() { + return () -> serviceIds; + } + + @Override + public ServiceEnvironment getServiceEnv() { + return (ServiceEnvironment) env; + } + + @Override + public CompactionKind getCompactionKind() { + return kind; + } + + @Override + public Map getExecutionHints() { + // ELASTICITY_TODO do for user compactions + return Map.of(); + } + }; + + return dispatcher.dispatch(dispatchParams).getService(); + } + + private CompactionDispatcher createDispatcher(TableId tableId) { + + var conf = env.getConfiguration(); + + var className = conf.get(Property.TABLE_COMPACTION_DISPATCHER.getKey()); + + Map opts = new HashMap<>(); + + conf.getWithPrefix(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey()).forEach((k, v) -> { + opts.put(k.substring(Property.TABLE_COMPACTION_DISPATCHER.getKey().length()), v); + }); + + var finalOpts = Collections.unmodifiableMap(opts); + + CompactionDispatcher.InitParameters initParameters = new CompactionDispatcher.InitParameters() { + @Override + public Map getOptions() { + return finalOpts; + } + + @Override + public TableId getTableId() { + return tableId; + } + + @Override + public ServiceEnvironment getServiceEnv() { + return (ServiceEnvironment) env; + } + }; + + CompactionDispatcher dispatcher = null; + try { + dispatcher = env.instantiate(className, CompactionDispatcher.class); + } catch (ReflectiveOperationException e) { + throw new RuntimeException(e); + } + + dispatcher.init(initParameters); + + return dispatcher; + } + + private Collection planCompactions(CompactionServiceId serviceId, + CompactionKind kind, TabletMetadata tablet) { + + CompactionPlanner planner = + planners.computeIfAbsent(serviceId, sid -> createPlanner(serviceId)); + + // selecting indicator + // selected files + + String ratioStr = + env.getConfiguration(tablet.getTableId()).get(Property.TABLE_MAJC_RATIO.getKey()); + if (ratioStr == null) { + ratioStr = Property.TABLE_MAJC_RATIO.getDefaultValue(); + } + + double ratio = Double.parseDouble(ratioStr); + + Set allFiles = tablet.getFilesMap().entrySet().stream() + .map(entry -> new CompactableFileImpl(entry.getKey(), entry.getValue())) + .collect(Collectors.toUnmodifiableSet()); + Set candidates; + + if (kind == CompactionKind.SYSTEM) { + if (tablet.getExternalCompactions().isEmpty()) { + candidates = allFiles; + } else { + var tmpFiles = new HashMap<>(tablet.getFilesMap()); + tablet.getExternalCompactions().values().stream().flatMap(ecm -> ecm.getJobFiles().stream()) + .forEach(tmpFiles::remove); + candidates = tmpFiles.entrySet().stream() + .map(entry -> new CompactableFileImpl(entry.getKey(), entry.getValue())) + .collect(Collectors.toUnmodifiableSet()); + } + } else { + throw new UnsupportedOperationException(); + } + + CompactionPlanner.PlanningParameters params = new CompactionPlanner.PlanningParameters() { + @Override + public TableId getTableId() { + return tablet.getTableId(); + } + + @Override + public ServiceEnvironment getServiceEnvironment() { + return (ServiceEnvironment) env; + } + + @Override + public CompactionKind getKind() { + return kind; + } + + @Override + public double getRatio() { + return ratio; + } + + @Override + public Collection getAll() { + return allFiles; + } + + @Override + public Collection getCandidates() { + return candidates; + } + + @Override + public Collection getRunningCompactions() { + var allFiles2 = tablet.getFilesMap(); + return tablet.getExternalCompactions().values().stream().map(ecMeta -> { + Collection files = ecMeta.getJobFiles().stream() + .map(f -> new CompactableFileImpl(f, allFiles2.get(f))).collect(Collectors.toList()); + CompactionJob job = new CompactionJobImpl(ecMeta.getPriority(), + ecMeta.getCompactionExecutorId(), files, ecMeta.getKind(), Optional.empty()); + return job; + }).collect(Collectors.toList()); + } + + @Override + public Map getExecutionHints() { + // ELASTICITY_TODO implement for user compactions + return Map.of(); + } + + @Override + public CompactionPlan.Builder createPlanBuilder() { + return new CompactionPlanImpl.BuilderImpl(kind, allFiles, candidates); + } + }; + + return planner.makePlan(params).getJobs(); + } + + private CompactionPlanner createPlanner(CompactionServiceId serviceId) { + + String plannerClassName = servicesConfig.getPlanners().get(serviceId.canonical()); + + CompactionPlanner planner = null; + try { + planner = env.instantiate(plannerClassName, CompactionPlanner.class); + } catch (ReflectiveOperationException e) { + throw new RuntimeException(e); + } + + CompactionPlanner.InitParameters initParameters = new CompactionPlanner.InitParameters() { + @Override + public ServiceEnvironment getServiceEnvironment() { + return (ServiceEnvironment) env; + } + + @Override + public Map getOptions() { + return servicesConfig.getOptions().get(serviceId.canonical()); + } + + @Override + public String getFullyQualifiedOption(String key) { + return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + serviceId + ".opts." + key; + } + + @Override + public ExecutorManager getExecutorManager() { + return new ExecutorManager() { + @Override + public CompactionExecutorId createExecutor(String name, int threads) { + // ELASTICITY_TODO need to deprecate + return CompactionExecutorIdImpl.internalId(serviceId, name); + } + + @Override + public CompactionExecutorId getExternalExecutor(String name) { + return CompactionExecutorIdImpl.externalId(name); + } + }; + } + }; + + planner.init(initParameters); + + return planner; + } +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java index 814c4ee2241..3af8cf9f76d 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java @@ -23,6 +23,8 @@ import java.util.Arrays; import java.util.Base64; import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -53,6 +55,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily; @@ -61,7 +64,9 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.spi.compaction.CompactionKind; import org.apache.accumulo.core.util.AddressUtil; +import org.apache.accumulo.server.compaction.CompactionJobGenerator; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.slf4j.Logger; @@ -86,6 +91,7 @@ public class TabletManagementIterator extends SkippingIterator { private static final String MIGRATIONS_OPTION = "migrations"; private static final String MANAGER_STATE_OPTION = "managerState"; private static final String SHUTTING_DOWN_OPTION = "shuttingDown"; + private CompactionJobGenerator compactionGenerator; private static void setCurrentServers(final IteratorSetting cfg, final Set goodServers) { @@ -265,7 +271,9 @@ private boolean shouldReturnDueToLocation(final TabletMetadata tm, } public static void configureScanner(final ScannerBase scanner, final CurrentState state) { + // TODO so many columns are being fetch it may not make sense to fetch columns TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner); + ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner); scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME); scanner.fetchColumnFamily(FutureLocationColumnFamily.NAME); scanner.fetchColumnFamily(LastLocationColumnFamily.NAME); @@ -274,6 +282,7 @@ public static void configureScanner(final ScannerBase scanner, final CurrentStat scanner.fetchColumnFamily(ChoppedColumnFamily.NAME); scanner.fetchColumnFamily(HostingColumnFamily.NAME); scanner.fetchColumnFamily(DataFileColumnFamily.NAME); + scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME); ServerColumnFamily.OPID_COLUMN.fetch(scanner); scanner.addScanIterator(new IteratorSetting(1000, "wholeRows", WholeRowIterator.class)); IteratorSetting tabletChange = @@ -329,6 +338,7 @@ public void init(SortedKeyValueIterator source, Map op if (shuttingDown != null) { current.removeAll(shuttingDown); } + compactionGenerator = new CompactionJobGenerator(env.getPluginEnv()); } @Override @@ -418,8 +428,31 @@ private void computeTabletManagementActions(final TabletMetadata tm, reasonsToReturnThisTablet.add(ManagementAction.NEEDS_SPLITTING); } - // TODO: Add compaction logic + // important to call this since reasonsToReturnThisTablet is passed to it + if (!compactionGenerator.generateJobs(tm, determineCompactionKinds(reasonsToReturnThisTablet)) + .isEmpty()) { + reasonsToReturnThisTablet.add(ManagementAction.NEEDS_COMPACTING); + } } } + + private static final Set ALL_COMPACTION_KINDS = + Collections.unmodifiableSet(EnumSet.allOf(CompactionKind.class)); + private static final Set SPLIT_COMPACTION_KINDS; + + static { + var tmp = EnumSet.allOf(CompactionKind.class); + tmp.remove(CompactionKind.SYSTEM); + SPLIT_COMPACTION_KINDS = Collections.unmodifiableSet(tmp); + } + + public static Set + determineCompactionKinds(Set reasonsToReturnThisTablet) { + if (reasonsToReturnThisTablet.contains(ManagementAction.NEEDS_SPLITTING)) { + return SPLIT_COMPACTION_KINDS; + } else { + return ALL_COMPACTION_KINDS; + } + } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java index 45e6664f465..b3ae08e1c08 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java @@ -36,12 +36,15 @@ import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; import org.apache.accumulo.core.metadata.schema.TabletOperationId; import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.metadata.iterators.CompactionsExistsIterator; import org.apache.accumulo.server.metadata.iterators.LocationExistsIterator; import org.apache.accumulo.server.metadata.iterators.PresentIterator; import org.apache.accumulo.server.metadata.iterators.TabletExistsIterator; @@ -132,6 +135,26 @@ public Ample.ConditionalTabletMutator requireHostingGoal(TabletHostingGoal goal) return this; } + @Override + public Ample.ConditionalTabletMutator requireAbsentCompactions() { + Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); + IteratorSetting is = + new IteratorSetting(INITIAL_ITERATOR_PRIO, CompactionsExistsIterator.class); + Condition c = new Condition(ExternalCompactionColumnFamily.STR_NAME, "").setIterators(is); + mutation.addCondition(c); + return this; + } + + @Override + public Ample.ConditionalTabletMutator requireCompaction(ExternalCompactionId ecid) { + Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); + IteratorSetting is = new IteratorSetting(INITIAL_ITERATOR_PRIO, PresentIterator.class); + Condition c = new Condition(ExternalCompactionColumnFamily.STR_NAME, ecid.canonical()) + .setValue(PresentIterator.VALUE).setIterators(is); + mutation.addCondition(c); + return this; + } + @Override public Ample.ConditionalTabletMutator requireAbsentTablet() { Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/CompactionsExistsIterator.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/CompactionsExistsIterator.java new file mode 100644 index 00000000000..1e7bc418cca --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/CompactionsExistsIterator.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.metadata.iterators; + +import java.io.IOException; +import java.util.Collection; +import java.util.Set; + +import org.apache.accumulo.core.data.ArrayByteSequence; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.iterators.WrappingIterator; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily; +import org.apache.hadoop.io.Text; + +public class CompactionsExistsIterator extends WrappingIterator { + private static final Collection FAMS = + Set.of(new ArrayByteSequence(ExternalCompactionColumnFamily.STR_NAME)); + + @Override + public void seek(Range range, Collection columnFamilies, boolean inclusive) + throws IOException { + + Text tabletRow = LocationExistsIterator.getTabletRow(range); + Key startKey = new Key(tabletRow, ExternalCompactionColumnFamily.NAME); + Key endKey = + new Key(tabletRow, ExternalCompactionColumnFamily.NAME).followingKey(PartialKey.ROW_COLFAM); + + Range r = new Range(startKey, true, endKey, false); + + super.seek(r, FAMS, true); + } + +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/LocationExistsIterator.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/LocationExistsIterator.java index bbe5d14a2e5..bd4396b33bb 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/LocationExistsIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/LocationExistsIterator.java @@ -20,7 +20,7 @@ import java.io.IOException; import java.util.Collection; -import java.util.List; +import java.util.Set; import org.apache.accumulo.core.data.ArrayByteSequence; import org.apache.accumulo.core.data.ByteSequence; @@ -41,7 +41,7 @@ */ public class LocationExistsIterator extends WrappingIterator { private static final Collection LOC_FAMS = - List.of(new ArrayByteSequence(FutureLocationColumnFamily.STR_NAME), + Set.of(new ArrayByteSequence(FutureLocationColumnFamily.STR_NAME), new ArrayByteSequence(CurrentLocationColumnFamily.STR_NAME)); @Override diff --git a/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletNameGenerator.java b/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletNameGenerator.java new file mode 100644 index 00000000000..4106c174085 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletNameGenerator.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.tablets; + +import java.util.function.Consumer; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.file.FileOperations; +import org.apache.accumulo.core.file.FilePrefix; +import org.apache.accumulo.core.metadata.ReferencedTabletFile; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.spi.fs.VolumeChooserEnvironment; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; + +/** + * contains code related to generating new file path for a tablet + */ +public class TabletNameGenerator { + + public static String createTabletDirectoryName(ServerContext context, Text endRow) { + if (endRow == null) { + return MetadataSchema.TabletsSection.ServerColumnFamily.DEFAULT_TABLET_DIR_NAME; + } else { + UniqueNameAllocator namer = context.getUniqueNameAllocator(); + return Constants.GENERATED_TABLET_DIRECTORY_PREFIX + namer.getNextName(); + } + } + + public static String chooseTabletDir(ServerContext context, KeyExtent extent, String dirName, + Consumer dirCreator) { + VolumeChooserEnvironment chooserEnv = + new VolumeChooserEnvironmentImpl(extent.tableId(), extent.endRow(), context); + String dirUri = context.getVolumeManager().choose(chooserEnv, context.getBaseUris()) + + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + extent.tableId() + Path.SEPARATOR + dirName; + dirCreator.accept(dirUri); + return dirUri; + } + + public static ReferencedTabletFile getNextDataFilename(FilePrefix prefix, ServerContext context, + KeyExtent extent, String dirName, Consumer dirCreator) { + String extension = + FileOperations.getNewFileExtension(context.getTableConfiguration(extent.tableId())); + return new ReferencedTabletFile( + new Path(chooseTabletDir(context, extent, dirName, dirCreator) + "/" + prefix.toPrefix() + + context.getUniqueNameAllocator().getNextName() + "." + extension)); + } + + public static ReferencedTabletFile getNextDataFilenameForMajc(boolean propagateDeletes, + ServerContext context, KeyExtent extent, String dirName, Consumer dirCreator) { + String tmpFileName = getNextDataFilename( + !propagateDeletes ? FilePrefix.MAJOR_COMPACTION_ALL_FILES : FilePrefix.MAJOR_COMPACTION, + context, extent, dirName, dirCreator).getMetaInsert() + "_tmp"; + return new ReferencedTabletFile(new Path(tmpFileName)); + } + + public static ReferencedTabletFile getNextDataFilenameForMajc(boolean propagateDeletes, + ServerContext context, TabletMetadata tabletMetadata, Consumer dirCreator) { + String tmpFileName = getNextDataFilename( + !propagateDeletes ? FilePrefix.MAJOR_COMPACTION_ALL_FILES : FilePrefix.MAJOR_COMPACTION, + context, tabletMetadata.getExtent(), tabletMetadata.getDirName(), dirName -> {}) + .getMetaInsert() + "_tmp"; + return new ReferencedTabletFile(new Path(tmpFileName)); + } + + public static ReferencedTabletFile computeCompactionFileDest(ReferencedTabletFile tmpFile) { + String newFilePath = tmpFile.getMetaInsert(); + int idx = newFilePath.indexOf("_tmp"); + if (idx > 0) { + newFilePath = newFilePath.substring(0, idx); + } else { + throw new IllegalArgumentException( + "Expected compaction tmp file " + tmpFile.getMetaInsert() + " to have suffix '_tmp'"); + } + return new ReferencedTabletFile(new Path(newFilePath)); + } +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java b/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java index 188b4f428c8..66775fa1176 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java @@ -23,10 +23,8 @@ import java.security.SecureRandom; import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.util.FastFormat; import org.apache.accumulo.server.ServerContext; -import org.apache.hadoop.io.Text; /** * Allocates unique names for an accumulo instance. The names are unique for the lifetime of the @@ -47,15 +45,6 @@ public UniqueNameAllocator(ServerContext context) { nextNamePath = Constants.ZROOT + "/" + context.getInstanceID() + Constants.ZNEXT_FILE; } - public static String createTabletDirectoryName(ServerContext context, Text endRow) { - if (endRow == null) { - return MetadataSchema.TabletsSection.ServerColumnFamily.DEFAULT_TABLET_DIR_NAME; - } else { - UniqueNameAllocator namer = context.getUniqueNameAllocator(); - return Constants.GENERATED_TABLET_DIRECTORY_PREFIX + namer.getNextName(); - } - } - public synchronized String getNextName() { while (next >= maxAllocated) { diff --git a/server/manager/pom.xml b/server/manager/pom.xml index f08e01070a0..f9e0e45234f 100644 --- a/server/manager/pom.xml +++ b/server/manager/pom.xml @@ -80,6 +80,10 @@ org.apache.accumulo accumulo-start + + org.apache.accumulo + accumulo-tserver + org.apache.commons commons-lang3 diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 4796a1aebc9..1d81fee284f 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -114,7 +114,8 @@ import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; -import org.apache.accumulo.manager.compaction.CompactionCoordinator; +import org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator; +import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues; import org.apache.accumulo.manager.metrics.ManagerMetrics; import org.apache.accumulo.manager.recovery.RecoveryManager; import org.apache.accumulo.manager.split.Splitter; @@ -621,6 +622,12 @@ public Splitter getSplitter() { return splitter; } + private CompactionJobQueues compactionJobQueues; + + public CompactionJobQueues getCompactionQueues() { + return compactionJobQueues; + } + enum TabletGoalState { HOSTED(TUnloadTabletGoal.UNKNOWN), UNASSIGNED(TUnloadTabletGoal.UNASSIGNED), @@ -1146,13 +1153,16 @@ public void run() { final ServerContext context = getContext(); final String zroot = getZooKeeperRoot(); + this.compactionJobQueues = new CompactionJobQueues(); + // ACCUMULO-4424 Put up the Thrift servers before getting the lock as a sign of process health // when a hot-standby // // Start the Manager's Fate Service fateServiceHandler = new FateServiceHandler(this); managerClientHandler = new ManagerClientServiceHandler(this); - compactionCoordinator = new CompactionCoordinator(context, tserverSet, security); + compactionCoordinator = + new CompactionCoordinator(context, tserverSet, security, compactionJobQueues); // Start the Manager's Client service // Ensure that calls before the manager gets the lock fail ManagerClientService.Iface haProxy = @@ -1242,6 +1252,9 @@ public void process(WatchedEvent event) { throw new IllegalStateException("Unable to read " + zroot + Constants.ZRECOVERY, e); } + this.splitter = new Splitter(context); + this.splitter.start(); + watchers.add(new TabletGroupWatcher(this, TabletStateStore.getStoreForLevel(DataLevel.USER, context, this), null) { @Override @@ -1348,9 +1361,6 @@ boolean canSuspendTablets() { throw new IllegalStateException("Exception updating manager lock", e); } - this.splitter = new Splitter(context); - this.splitter.start(); - while (!clientService.isServing()) { sleepUninterruptibly(100, MILLISECONDS); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index 44fb1ba38f5..8764d212ca9 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -90,6 +90,8 @@ import org.apache.accumulo.manager.state.TableCounts; import org.apache.accumulo.manager.state.TableStats; import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.ServiceEnvironmentImpl; +import org.apache.accumulo.server.compaction.CompactionJobGenerator; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.gc.AllVolumesDirectory; import org.apache.accumulo.server.log.WalStateManager; @@ -100,6 +102,7 @@ import org.apache.accumulo.server.manager.state.DistributedStoreException; import org.apache.accumulo.server.manager.state.MergeInfo; import org.apache.accumulo.server.manager.state.MergeState; +import org.apache.accumulo.server.manager.state.TabletManagementIterator; import org.apache.accumulo.server.manager.state.TabletStateStore; import org.apache.accumulo.server.manager.state.UnassignedTablet; import org.apache.accumulo.server.tablets.TabletTime; @@ -237,6 +240,10 @@ public void run() { ManagerState managerState = manager.getManagerState(); int[] counts = new int[TabletState.values().length]; stats.begin(); + + CompactionJobGenerator compactionGenerator = + new CompactionJobGenerator(new ServiceEnvironmentImpl(manager.getContext())); + // Walk through the tablets in our store, and work tablets // towards their goal iter = store.iterator(); @@ -322,6 +329,18 @@ public void run() { // sendSplitRequest(mergeStats.getMergeInfo(), state, tm); } + if (actions.contains(ManagementAction.NEEDS_COMPACTING)) { + var jobs = compactionGenerator.generateJobs(tm, + TabletManagementIterator.determineCompactionKinds(actions)); + LOG.debug("{} may need compacting.", tm.getExtent()); + manager.getCompactionQueues().add(tm, jobs); + } + + // ELASITICITY_TODO the case where a planner generates compactions at time T1 for tablet + // and later at time T2 generates nothing for the same tablet is not being handled. At + // time T1 something could have been queued. However at time T2 we will not clear those + // entries from the queue because we see nothing here for that case. + if (actions.contains(ManagementAction.NEEDS_LOCATION_UPDATE)) { if (goal == TabletGoalState.HOSTED) { if ((state != TabletState.HOSTED && !tm.getLogs().isEmpty()) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/CompactionFinalizer.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/CompactionFinalizer.java deleted file mode 100644 index 7279bc08a9a..00000000000 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/CompactionFinalizer.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.manager.compaction; - -import static java.util.function.Function.identity; -import static java.util.stream.Collectors.toList; -import static java.util.stream.Collectors.toMap; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState; -import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState.FinalState; -import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; -import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; -import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; -import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; -import org.apache.accumulo.core.metadata.schema.TabletsMetadata; -import org.apache.accumulo.core.rpc.ThriftUtil; -import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; -import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService.Client; -import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.util.threads.ThreadPools; -import org.apache.accumulo.server.ServerContext; -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class CompactionFinalizer { - - private static final Logger LOG = LoggerFactory.getLogger(CompactionFinalizer.class); - - protected final ServerContext context; - private final ExecutorService ntfyExecutor; - private final ExecutorService backgroundExecutor; - private final BlockingQueue pendingNotifications; - private final long tserverCheckInterval; - - protected CompactionFinalizer(ServerContext context, ScheduledThreadPoolExecutor schedExecutor) { - this.context = context; - this.pendingNotifications = new ArrayBlockingQueue<>(1000); - - tserverCheckInterval = this.context.getConfiguration() - .getTimeInMillis(Property.COMPACTION_COORDINATOR_FINALIZER_COMPLETION_CHECK_INTERVAL); - int max = this.context.getConfiguration() - .getCount(Property.COMPACTION_COORDINATOR_FINALIZER_TSERVER_NOTIFIER_MAXTHREADS); - - this.ntfyExecutor = ThreadPools.getServerThreadPools().createThreadPool(3, max, 1, - TimeUnit.MINUTES, "Compaction Finalizer Notifier", true); - - this.backgroundExecutor = ThreadPools.getServerThreadPools().createFixedThreadPool(1, - "Compaction Finalizer Background Task", true); - - backgroundExecutor.execute(() -> { - processPending(); - }); - - ThreadPools.watchCriticalScheduledTask(schedExecutor.scheduleWithFixedDelay( - this::notifyTservers, 0, tserverCheckInterval, TimeUnit.MILLISECONDS)); - } - - public void commitCompaction(ExternalCompactionId ecid, KeyExtent extent, long fileSize, - long fileEntries) { - - var ecfs = - new ExternalCompactionFinalState(ecid, extent, FinalState.FINISHED, fileSize, fileEntries); - - LOG.debug("Initiating commit for external compaction: {}", ecfs); - - // write metadata entry - context.getAmple().putExternalCompactionFinalStates(List.of(ecfs)); - - if (!pendingNotifications.offer(ecfs)) { - LOG.debug("Queue full, notification to tablet server will happen later {}.", ecfs); - } else { - LOG.debug("Queued tserver notification for completed external compaction: {}", ecfs); - } - } - - public void failCompactions(Map compactionsToFail) { - - var finalStates = compactionsToFail.entrySet().stream().map( - e -> new ExternalCompactionFinalState(e.getKey(), e.getValue(), FinalState.FAILED, 0L, 0L)) - .collect(Collectors.toList()); - - context.getAmple().putExternalCompactionFinalStates(finalStates); - - finalStates.forEach(pendingNotifications::offer); - } - - private void notifyTserver(Location loc, ExternalCompactionFinalState ecfs) { - - Client client = null; - try { - client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, loc.getHostAndPort(), context); - if (ecfs.getFinalState() == FinalState.FINISHED) { - LOG.debug("Notifying tserver {} that compaction {} has finished.", loc, ecfs); - client.compactionJobFinished(TraceUtil.traceInfo(), context.rpcCreds(), - ecfs.getExternalCompactionId().canonical(), ecfs.getExtent().toThrift(), - ecfs.getFileSize(), ecfs.getEntries()); - } else if (ecfs.getFinalState() == FinalState.FAILED) { - LOG.debug("Notifying tserver {} that compaction {} has failed.", loc, ecfs); - client.compactionJobFailed(TraceUtil.traceInfo(), context.rpcCreds(), - ecfs.getExternalCompactionId().canonical(), ecfs.getExtent().toThrift()); - } else { - throw new IllegalArgumentException(ecfs.getFinalState().name()); - } - } catch (TException e) { - LOG.warn("Failed to notify tserver {}", loc.getHostAndPort(), e); - } finally { - ThriftUtil.returnClient(client, context); - } - } - - private void processPending() { - - while (!Thread.interrupted()) { - try { - ArrayList batch = new ArrayList<>(); - batch.add(pendingNotifications.take()); - pendingNotifications.drainTo(batch); - - List> futures = new ArrayList<>(); - - List statusesToDelete = new ArrayList<>(); - - Map tabletsMetadata; - var extents = batch.stream().map(ExternalCompactionFinalState::getExtent).collect(toList()); - try (TabletsMetadata tablets = - context.getAmple().readTablets().forTablets(extents, Optional.empty()) - .fetch(ColumnType.LOCATION, ColumnType.PREV_ROW, ColumnType.ECOMP).build()) { - tabletsMetadata = tablets.stream().collect(toMap(TabletMetadata::getExtent, identity())); - } - - for (ExternalCompactionFinalState ecfs : batch) { - - TabletMetadata tabletMetadata = tabletsMetadata.get(ecfs.getExtent()); - - if (tabletMetadata == null || !tabletMetadata.getExternalCompactions().keySet() - .contains(ecfs.getExternalCompactionId())) { - // there is not per tablet external compaction entry, so delete its final state marker - // from metadata table - LOG.debug( - "Unable to find tablets external compaction entry, deleting completion entry {}", - ecfs); - statusesToDelete.add(ecfs.getExternalCompactionId()); - } else if (tabletMetadata.getLocation() != null - && tabletMetadata.getLocation().getType() == LocationType.CURRENT) { - futures - .add(ntfyExecutor.submit(() -> notifyTserver(tabletMetadata.getLocation(), ecfs))); - } else { - LOG.trace( - "External compaction {} is completed, but there is no location for tablet. Unable to notify tablet, will try again later.", - ecfs); - } - } - - if (!statusesToDelete.isEmpty()) { - LOG.info( - "Deleting unresolvable completed external compactions from metadata table, ids: {}", - statusesToDelete); - context.getAmple().deleteExternalCompactionFinalStates(statusesToDelete); - } - - for (Future future : futures) { - try { - future.get(); - } catch (ExecutionException e) { - LOG.debug("Failed to notify tserver", e); - } - } - - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException(e); - } catch (RuntimeException e) { - LOG.warn("Failed to process pending notifications", e); - } - } - } - - private void notifyTservers() { - try { - Iterator finalStates = - context.getAmple().getExternalCompactionFinalStates().iterator(); - while (finalStates.hasNext()) { - ExternalCompactionFinalState state = finalStates.next(); - LOG.debug("Found external compaction in final state: {}, queueing for tserver notification", - state); - pendingNotifications.put(state); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException(e); - } catch (RuntimeException e) { - LOG.warn("Failed to notify tservers", e); - } - } -} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/QueueAndPriority.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/QueueAndPriority.java deleted file mode 100644 index aa6c5f5e6f5..00000000000 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/QueueAndPriority.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.manager.compaction; - -import java.util.WeakHashMap; - -import org.apache.accumulo.core.util.Pair; - -public class QueueAndPriority implements Comparable { - - private static WeakHashMap,QueueAndPriority> CACHE = new WeakHashMap<>(); - - public static QueueAndPriority get(String queue, short priority) { - return CACHE.computeIfAbsent(new Pair<>(queue, priority), - k -> new QueueAndPriority(queue, priority)); - } - - private final String queue; - private final short priority; - - private QueueAndPriority(String queue, short priority) { - this.queue = queue; - this.priority = priority; - } - - public String getQueue() { - return queue; - } - - public short getPriority() { - return priority; - } - - @Override - public int hashCode() { - return queue.hashCode() + ((Short) priority).hashCode(); - } - - @Override - public String toString() { - StringBuilder buf = new StringBuilder(); - buf.append("queue: ").append(queue); - buf.append(", priority: ").append(priority); - return buf.toString(); - } - - @Override - public boolean equals(Object obj) { - if (null == obj) { - return false; - } - if (obj == this) { - return true; - } - if (!(obj instanceof QueueAndPriority)) { - return false; - } else { - QueueAndPriority other = (QueueAndPriority) obj; - return this.queue.equals(other.queue) && this.priority == other.priority; - } - } - - @Override - public int compareTo(QueueAndPriority other) { - int result = this.queue.compareTo(other.queue); - if (result == 0) { - // reversing order such that if other priority is lower, then this has a higher return value - return Long.compare(other.priority, this.priority); - } else { - return result; - } - } - -} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/QueueSummaries.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/QueueSummaries.java deleted file mode 100644 index eaaa255d647..00000000000 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/QueueSummaries.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.manager.compaction; - -import java.util.ArrayList; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Objects; -import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; - -import org.apache.accumulo.core.metadata.TServerInstance; -import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Sets; - -public class QueueSummaries { - - private static final Logger log = LoggerFactory.getLogger(QueueSummaries.class); - - // keep track of the last tserver returned for queue - final Map LAST = new HashMap<>(); - - /* Map of external queue name -> priority -> tservers */ - final Map>> QUEUES = new HashMap<>(); - /* index of tserver to queue and priority, exists to provide O(1) lookup into QUEUES */ - final Map> INDEX = new HashMap<>(); - - private Entry> getNextTserverEntry(String queue) { - TreeMap> m = QUEUES.get(queue); - if (m == null) { - return null; - } - - Iterator>> iter = m.entrySet().iterator(); - - if (iter.hasNext()) { - Entry> next = iter.next(); - if (next.getValue().isEmpty()) { - throw new IllegalStateException( - "Unexpected empty tserver set for queue " + queue + " and prio " + next.getKey()); - } - return next; - } - - throw new IllegalStateException("Unexpected empty map for queue " + queue); - } - - static class PrioTserver { - TServerInstance tserver; - final short prio; - - public PrioTserver(TServerInstance t, short p) { - this.tserver = t; - this.prio = p; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof PrioTserver) { - PrioTserver opt = (PrioTserver) obj; - return tserver.equals(opt.tserver) && prio == opt.prio; - } - - return false; - } - - @Override - public int hashCode() { - return Objects.hash(tserver, prio); - } - - @Override - public String toString() { - return tserver + " " + prio; - } - } - - synchronized PrioTserver getNextTserver(String queue) { - - Entry> entry = getNextTserverEntry(queue); - - if (entry == null) { - // no tserver, so remove any last entry if it exists - LAST.remove(queue); - return null; - } - - final Short priority = entry.getKey(); - final TreeSet tservers = entry.getValue(); - - PrioTserver last = LAST.get(queue); - - TServerInstance nextTserver = null; - - if (last != null && last.prio == priority) { - TServerInstance higher = tservers.higher(last.tserver); - if (higher == null) { - nextTserver = tservers.first(); - } else { - nextTserver = higher; - } - } else { - nextTserver = tservers.first(); - } - - PrioTserver result = new PrioTserver(nextTserver, priority); - - LAST.put(queue, result); - - return result; - } - - synchronized void update(TServerInstance tsi, List summaries) { - - if (log.isTraceEnabled()) { - Map> summariesToLog = new TreeMap<>(); - summaries.forEach(summary -> summariesToLog - .computeIfAbsent(summary.getQueue(), k -> new ArrayList<>()).add(summary.getPriority())); - log.trace("Adding summaries from {} : {}", tsi, summariesToLog); - } - - Set newQP = new HashSet<>(); - summaries.forEach(summary -> { - QueueAndPriority qp = - QueueAndPriority.get(summary.getQueue().intern(), summary.getPriority()); - newQP.add(qp); - }); - - Set currentQP = INDEX.getOrDefault(tsi, Set.of()); - - // remove anything the tserver did not report - for (QueueAndPriority qp : List.copyOf(Sets.difference(currentQP, newQP))) { - removeSummary(tsi, qp.getQueue(), qp.getPriority()); - } - - INDEX.put(tsi, newQP); - - newQP.forEach(qp -> { - QUEUES.computeIfAbsent(qp.getQueue(), k -> new TreeMap<>(Comparator.reverseOrder())) - .computeIfAbsent(qp.getPriority(), k -> new TreeSet<>()).add(tsi); - }); - } - - synchronized void removeSummary(TServerInstance tsi, String queue, short priority) { - - log.trace("Removing summary {} {} {}", tsi, queue, priority); - - TreeMap> m = QUEUES.get(queue); - if (m != null) { - TreeSet s = m.get(priority); - if (s != null) { - if (s.remove(tsi) && s.isEmpty()) { - m.remove(priority); - } - } - - if (m.isEmpty()) { - QUEUES.remove(queue); - } - } - - Set qaps = INDEX.get(tsi); - if (qaps != null) { - if (qaps.remove(QueueAndPriority.get(queue, priority)) && qaps.isEmpty()) { - INDEX.remove(tsi); - } - } - } - - synchronized void remove(Set deleted) { - - if (!deleted.isEmpty()) { - log.trace("Removing all summaries to tservers {}", deleted); - } - - deleted.forEach(tsi -> { - INDEX.getOrDefault(tsi, Set.of()).forEach(qp -> { - TreeMap> m = QUEUES.get(qp.getQueue()); - if (null != m) { - TreeSet tservers = m.get(qp.getPriority()); - if (null != tservers) { - if (tservers.remove(tsi) && tservers.isEmpty()) { - m.remove(qp.getPriority()); - } - - if (m.isEmpty()) { - QUEUES.remove(qp.getQueue()); - } - } - } - }); - INDEX.remove(tsi); - }); - } -} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java similarity index 66% rename from server/manager/src/main/java/org/apache/accumulo/manager/compaction/CompactionCoordinator.java rename to server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index a070ecfe816..3af90565562 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -16,20 +16,17 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.accumulo.manager.compaction; +package org.apache.accumulo.manager.compaction.coordinator; -import java.util.ArrayList; -import java.util.Iterator; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.stream.Collectors; import org.apache.accumulo.core.Constants; @@ -51,27 +48,37 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; +import org.apache.accumulo.core.metadata.CompactableFileImpl; +import org.apache.accumulo.core.metadata.ReferencedTabletFile; +import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; -import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary; +import org.apache.accumulo.core.spi.compaction.CompactionKind; +import org.apache.accumulo.core.tabletserver.thrift.InputFile; +import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig; +import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind; import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats; import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.util.compaction.CompactionExecutorIdImpl; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.compaction.RunningCompaction; import org.apache.accumulo.core.util.threads.ThreadPools; -import org.apache.accumulo.manager.compaction.QueueSummaries.PrioTserver; +import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.manager.LiveTServerSet; -import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection; import org.apache.accumulo.server.security.SecurityOperation; +import org.apache.accumulo.server.tablets.TabletNameGenerator; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; @@ -81,17 +88,15 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.base.Preconditions; import com.google.common.collect.Sets; import com.google.common.net.HostAndPort; -import com.google.common.util.concurrent.Uninterruptibles; public class CompactionCoordinator implements CompactionCoordinatorService.Iface, Runnable { private static final Logger LOG = LoggerFactory.getLogger(CompactionCoordinator.class); private static final long FIFTEEN_MINUTES = TimeUnit.MINUTES.toMillis(15); - protected static final QueueSummaries QUEUE_SUMMARIES = new QueueSummaries(); - /* * Map of compactionId to RunningCompactions. This is an informational cache of what external * compactions may be running. Its possible it may contain external compactions that are not @@ -106,28 +111,25 @@ public class CompactionCoordinator implements CompactionCoordinatorService.Iface Caffeine.newBuilder().maximumSize(200).expireAfterWrite(10, TimeUnit.MINUTES).build(); /* Map of queue name to last time compactor called to get a compaction job */ + // ELASTICITY_TODO need to clean out queues that are no longer configured.. private static final Map TIME_COMPACTOR_LAST_CHECKED = new ConcurrentHashMap<>(); private final ServerContext ctx; private final LiveTServerSet tserverSet; private final SecurityOperation security; - private CompactionFinalizer compactionFinalizer; - + private final CompactionJobQueues jobQueues; // Exposed for tests protected volatile Boolean shutdown = false; private final ScheduledThreadPoolExecutor schedExecutor; - private final ExecutorService summariesExecutor; public CompactionCoordinator(ServerContext ctx, LiveTServerSet tservers, - SecurityOperation security) { + SecurityOperation security, CompactionJobQueues jobQueues) { this.ctx = ctx; this.tserverSet = tservers; this.schedExecutor = this.ctx.getScheduledExecutor(); - summariesExecutor = ThreadPools.getServerThreadPools().createFixedThreadPool(10, - "Compaction Summary Gatherer", false); this.security = security; - createCompactionFinalizer(schedExecutor); + this.jobQueues = jobQueues; startCompactionCleaner(schedExecutor); startRunningCleaner(schedExecutor); } @@ -136,10 +138,6 @@ public void shutdown() { shutdown = true; } - protected void createCompactionFinalizer(ScheduledThreadPoolExecutor schedExecutor) { - this.compactionFinalizer = new CompactionFinalizer(this.ctx, schedExecutor); - } - protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) { ScheduledFuture future = schedExecutor.scheduleWithFixedDelay(this::cleanUpCompactors, 0, 5, TimeUnit.MINUTES); @@ -176,15 +174,19 @@ public void run() { startDeadCompactionDetector(); + // ELASTICITY_TODO the main function of the following loop was getting queue summaries from + // tservers. Its no longer doing that. May be best to remove the loop and make the remaining + // task a scheduled one. + LOG.info("Starting loop to check tservers for compaction summaries"); while (!shutdown) { long start = System.currentTimeMillis(); - updateSummaries(); - long now = System.currentTimeMillis(); TIME_COMPACTOR_LAST_CHECKED.forEach((k, v) -> { if ((now - v) > getMissingCompactorWarningTime()) { + // ELASTICITY_TODO may want to consider of the queue has any jobs queued OR if the queue + // still exist in configuration LOG.warn("No compactors have checked in with coordinator for queue {} in {}ms", k, getMissingCompactorWarningTime()); } @@ -193,69 +195,14 @@ public void run() { long checkInterval = getTServerCheckInterval(); long duration = (System.currentTimeMillis() - start); if (checkInterval - duration > 0) { - LOG.debug("Waiting {}ms for next tserver check", (checkInterval - duration)); + LOG.debug("Waiting {}ms for next queue check", (checkInterval - duration)); UtilWaitThread.sleep(checkInterval - duration); } } - summariesExecutor.shutdownNow(); LOG.info("Shutting down"); } - private void updateSummaries() { - - final ArrayList> tasks = new ArrayList<>(); - Set queuesSeen = new ConcurrentSkipListSet<>(); - - tserverSet.getCurrentServers().forEach(tsi -> { - tasks.add(summariesExecutor.submit(() -> updateSummaries(tsi, queuesSeen))); - }); - - // Wait for all tasks to complete - while (!tasks.isEmpty()) { - Iterator> iter = tasks.iterator(); - while (iter.hasNext()) { - Future f = iter.next(); - if (f.isDone()) { - iter.remove(); - } - } - Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - } - - // remove any queues that were seen in the past, but were not seen in the latest gathering of - // summaries - TIME_COMPACTOR_LAST_CHECKED.keySet().retainAll(queuesSeen); - - // add any queues that were never seen before - queuesSeen.forEach(q -> { - TIME_COMPACTOR_LAST_CHECKED.computeIfAbsent(q, k -> System.currentTimeMillis()); - }); - } - - private void updateSummaries(TServerInstance tsi, Set queuesSeen) { - try { - TabletServerClientService.Client client = null; - try { - LOG.debug("Contacting tablet server {} to get external compaction summaries", - tsi.getHostPort()); - client = getTabletServerConnection(tsi); - List summaries = - client.getCompactionQueueInfo(TraceUtil.traceInfo(), this.ctx.rpcCreds()); - QUEUE_SUMMARIES.update(tsi, summaries); - summaries.forEach(summary -> { - queuesSeen.add(summary.getQueue()); - }); - } finally { - returnTServerClient(client); - } - } catch (TException e) { - LOG.warn("Error getting external compaction summaries from tablet server: {}", - tsi.getHostAndPort(), e); - QUEUE_SUMMARIES.remove(Set.of(tsi)); - } - } - protected void startDeadCompactionDetector() { new DeadCompactionDetector(this.ctx, this, schedExecutor).start(); } @@ -280,10 +227,6 @@ protected long getTServerCheckInterval() { public void updateTServerSet(LiveTServerSet current, Set deleted, Set added) { - // run() will iterate over the current and added tservers and add them to the internal - // data structures. For tservers that are deleted, we need to remove them from QUEUES - // and INDEX - QUEUE_SUMMARIES.remove(deleted); } /** @@ -310,42 +253,27 @@ public TExternalCompactionJob getCompactionJob(TInfo tinfo, TCredentials credent TExternalCompactionJob result = null; - PrioTserver prioTserver = QUEUE_SUMMARIES.getNextTserver(queue); - - while (prioTserver != null) { - TServerInstance tserver = prioTserver.tserver; - - LOG.trace("Getting compaction for queue {} from tserver {}", queue, tserver.getHostAndPort()); - // Get a compaction from the tserver - TabletServerClientService.Client client = null; - try { - client = getTabletServerConnection(tserver); - TExternalCompactionJob job = client.reserveCompactionJob(TraceUtil.traceInfo(), - this.ctx.rpcCreds(), queue, prioTserver.prio, compactorAddress, externalCompactionId); - if (null == job.getExternalCompactionId()) { - LOG.trace("No compactions found for queue {} on tserver {}, trying next tserver", queue, - tserver.getHostAndPort()); - - QUEUE_SUMMARIES.removeSummary(tserver, queue, prioTserver.prio); - prioTserver = QUEUE_SUMMARIES.getNextTserver(queue); - continue; - } - // It is possible that by the time this added that the tablet has already canceled the - // compaction or the compactor that made this request is dead. In these cases the compaction - // is not actually running. - RUNNING_CACHE.put(ExternalCompactionId.of(job.getExternalCompactionId()), - new RunningCompaction(job, compactorAddress, queue)); - LOG.debug("Returning external job {} to {}", job.externalCompactionId, compactorAddress); - result = job; - break; - } catch (TException e) { - LOG.warn("Error from tserver {} while trying to reserve compaction, trying next tserver", - getTServerAddressString(tserver.getHostAndPort()), e); - QUEUE_SUMMARIES.removeSummary(tserver, queue, prioTserver.prio); - prioTserver = QUEUE_SUMMARIES.getNextTserver(queue); - } finally { - returnTServerClient(client); + CompactionJobQueues.MetaJob metaJob = + jobQueues.poll(CompactionExecutorIdImpl.externalId(queueName)); + + if (metaJob != null) { + ExternalCompactionMetadata ecm = + reserveCompaction(metaJob, compactorAddress, externalCompactionId); + + if (ecm != null) { + result = createThriftJob(externalCompactionId, ecm, metaJob); + // It is possible that by the time this added that the the compactor that made this request + // is dead. In this cases the compaction is not actually running. + RUNNING_CACHE.put(ExternalCompactionId.of(result.getExternalCompactionId()), + new RunningCompaction(result, compactorAddress, queue)); + LOG.debug("Returning external job {} to {} with {} files", result.externalCompactionId, + compactorAddress, ecm.getJobFiles().size()); + } else { + LOG.debug("Unable to reserve compaction for {} ", metaJob.getTabletMetadata().getExtent()); } + // create TExternalCompactionJob if above is successful and return it + } else { + LOG.debug("No jobs found in queue {} ", queue); } if (result == null) { @@ -367,12 +295,95 @@ public TExternalCompactionJob getCompactionJob(TInfo tinfo, TCredentials credent */ protected TabletServerClientService.Client getTabletServerConnection(TServerInstance tserver) throws TTransportException { - TServerConnection connection = tserverSet.getConnection(tserver); + LiveTServerSet.TServerConnection connection = tserverSet.getConnection(tserver); TTransport transport = this.ctx.getTransportPool().getTransport(connection.getAddress(), 0, this.ctx); return ThriftUtil.createClient(ThriftClientTypes.TABLET_SERVER, transport); } + private ExternalCompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob metaJob, + String compactorAddress, String externalCompactionId) { + + // only handle system ATM + Preconditions.checkArgument(metaJob.getJob().getKind() == CompactionKind.SYSTEM); + + var jobFiles = metaJob.getJob().getFiles().stream().map(CompactableFileImpl::toStoredTabletFile) + .collect(Collectors.toSet()); + + // ELASTICITY_TODO can probably remove this when selected files are stored in metadata + Set nextFiles = Set.of(); + + // ELASTICITY_TODO maybe structure code to where this can be unit tested + boolean compactingAll = metaJob.getTabletMetadata().getFiles().equals(jobFiles); + + boolean propDels = !compactingAll; + + // ELASTICITY_TODO need to create dir if it does not exists.. look at tablet code, it has cache, + // but its unbounded in size which is ok for a single tablet... in the manager we need a cache + // of dirs that were created that is bounded in size + Consumer directorCreator = dirName -> {}; + ReferencedTabletFile newFile = TabletNameGenerator.getNextDataFilenameForMajc(propDels, ctx, + metaJob.getTabletMetadata(), directorCreator); + + // ELASTICITY_TODO this determine what to set this for user compactions, may be able to remove + // it + boolean initiallSelAll = false; + + Long compactionId = null; + + ExternalCompactionMetadata ecm = new ExternalCompactionMetadata(jobFiles, nextFiles, newFile, + compactorAddress, metaJob.getJob().getKind(), metaJob.getJob().getPriority(), + metaJob.getJob().getExecutor(), propDels, initiallSelAll, compactionId); + + try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { + var extent = metaJob.getTabletMetadata().getExtent(); + + // ELASTICITY_TODO need a more complex conditional check that allows multiple concurrenct + // compactions... + // need to check that this new compaction has disjoint files with any existing compactions + var tabletMutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation() + .requireAbsentCompactions().requirePrevEndRow(extent.prevEndRow()); + jobFiles.forEach(tabletMutator::requireFile); + + var ecid = ExternalCompactionId.of(externalCompactionId); + tabletMutator.putExternalCompaction(ecid, ecm); + + tabletMutator + .submit(tabletMetadata -> tabletMetadata.getExternalCompactions().containsKey(ecid)); + + if (tabletsMutator.process().get(extent).getStatus() + == Ample.ConditionalResult.Status.ACCEPTED) { + return ecm; + } else { + return null; + } + } + + } + + TExternalCompactionJob createThriftJob(String externalCompactionId, + ExternalCompactionMetadata ecm, CompactionJobQueues.MetaJob metaJob) { + + // ELASTICITY_TODO get iterator config.. is this only needed for user compactions that pass + // iters? + IteratorConfig iteratorSettings = SystemIteratorUtil.toIteratorConfig(List.of()); + + var files = ecm.getJobFiles().stream().map(storedTabletFile -> { + var dfv = metaJob.getTabletMetadata().getFilesMap().get(storedTabletFile); + return new InputFile(storedTabletFile.getNormalizedPathStr(), dfv.getSize(), + dfv.getNumEntries(), dfv.getTime()); + }).collect(Collectors.toList()); + + // ELASTICITY_TODO will need to compute this + Map overrides = Map.of(); + + return new TExternalCompactionJob(externalCompactionId, + metaJob.getTabletMetadata().getExtent().toThrift(), files, iteratorSettings, + ecm.getCompactTmpName().getNormalizedPathStr(), ecm.getPropagateDeletes(), + TCompactionKind.valueOf(ecm.getKind().name()), + ecm.getCompactionId() == null ? 0 : ecm.getCompactionId(), overrides); + }; + /** * Compactor calls compactionCompleted passing in the CompactionStats * @@ -397,13 +408,87 @@ public void compactionCompleted(TInfo tinfo, TCredentials credentials, LOG.info("Compaction completed, id: {}, stats: {}, extent: {}", externalCompactionId, stats, extent); final var ecid = ExternalCompactionId.of(externalCompactionId); - compactionFinalizer.commitCompaction(ecid, extent, stats.fileSize, stats.entriesWritten); + + var tabletMeta = ctx.getAmple().readTablet(extent, TabletMetadata.ColumnType.ECOMP, + TabletMetadata.ColumnType.LOCATION); + + if (tabletMeta == null) { + LOG.debug("Received completion notification for nonexistent tablet {} {}", ecid, extent); + return; + } + + ExternalCompactionMetadata ecm = tabletMeta.getExternalCompactions().get(ecid); + + if (ecm == null) { + LOG.debug("Received completion notification for unknown compaction {} {}", ecid, extent); + return; + } + + // ELASTICITY_TODO this code does not handle race conditions or faults. Need to ensure refresh + // happens in the case of manager process death between commit and refresh. + ReferencedTabletFile newDatafile = + TabletNameGenerator.computeCompactionFileDest(ecm.getCompactTmpName()); + + try { + if (!ctx.getVolumeManager().rename(ecm.getCompactTmpName().getPath(), + newDatafile.getPath())) { + throw new IOException("rename returned false"); + } + } catch (IOException e) { + LOG.warn("Can not commit complete compaction {} because unable to rename {} to {} ", ecid, + ecm.getCompactTmpName(), newDatafile, e); + compactionFailed(Map.of(ecid, extent)); + return; + } + + commitCompaction(stats, extent, ecid, ecm, newDatafile); + + // compactionFinalizer.commitCompaction(ecid, extent, stats.fileSize, stats.entriesWritten); + + refreshTablet(tabletMeta); + // It's possible that RUNNING might not have an entry for this ecid in the case // of a coordinator restart when the Coordinator can't find the TServer for the // corresponding external compaction. recordCompletion(ecid); } + private void refreshTablet(TabletMetadata metadata) { + var location = metadata.getLocation(); + if (location != null) { + TabletServerClientService.Client client = null; + try { + client = getTabletServerConnection(location.getServerInstance()); + client.refreshTablets(TraceUtil.traceInfo(), ctx.rpcCreds(), + List.of(metadata.getExtent().toThrift())); + } catch (TException e) { + throw new RuntimeException(e); + } finally { + returnTServerClient(client); + } + } + } + + private void commitCompaction(TCompactionStats stats, KeyExtent extent, ExternalCompactionId ecid, + ExternalCompactionMetadata ecm, ReferencedTabletFile newDatafile) { + try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { + var tabletMutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation() + .requirePrevEndRow(extent.prevEndRow()).requireCompaction(ecid); + + ecm.getJobFiles().forEach(tabletMutator::requireFile); + ecm.getJobFiles().forEach(tabletMutator::deleteFile); + tabletMutator.deleteExternalCompaction(ecid); + tabletMutator.putFile(newDatafile, + new DataFileValue(stats.getFileSize(), stats.getEntriesWritten())); + + tabletMutator + .submit(tabletMetadata -> !tabletMetadata.getExternalCompactions().containsKey(ecid)); + + // ELASTICITY_TODO check return value + tabletsMutator.process(); + } + } + @Override public void compactionFailed(TInfo tinfo, TCredentials credentials, String externalCompactionId, TKeyExtent extent) throws ThriftSecurityException { @@ -415,10 +500,36 @@ public void compactionFailed(TInfo tinfo, TCredentials credentials, String exter LOG.info("Compaction failed, id: {}", externalCompactionId); final var ecid = ExternalCompactionId.of(externalCompactionId); compactionFailed(Map.of(ecid, KeyExtent.fromThrift(extent))); + + // ELASTICITIY_TODO need to open an issue about making the GC clean up tmp files. The tablet + // currently + // cleans up tmp files on tablet load. With tablets never loading possibly but still compacting + // dying compactors may still leave tmp files behind. } void compactionFailed(Map compactions) { - compactionFinalizer.failCompactions(compactions); + + try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { + compactions.forEach((ecid, extent) -> { + tabletsMutator.mutateTablet(extent).requireAbsentOperation().requireCompaction(ecid) + .requirePrevEndRow(extent.prevEndRow()).deleteExternalCompaction(ecid) + .submit(tabletMetadata -> !tabletMetadata.getExternalCompactions().containsKey(ecid)); + }); + + tabletsMutator.process().forEach((extent, result) -> { + if (result.getStatus() != Ample.ConditionalResult.Status.ACCEPTED) { + // this should try again later when the dead compaction detector runs, lets log it in case + // its a persistent problem + if (LOG.isDebugEnabled()) { + var ecid = + compactions.entrySet().stream().filter(entry -> entry.getValue().equals(extent)) + .findFirst().map(Map.Entry::getKey).orElse(null); + LOG.debug("Unable to remove failed compaction {} {}", extent, ecid); + } + } + }); + } + compactions.forEach((k, v) -> recordCompletion(k)); } @@ -479,7 +590,7 @@ protected void cleanUpRunning() { var idsToRemove = Sets.difference(idsSnapshot, idsInMetadata); // remove ids that are in the running set but not in the metadata table - idsToRemove.forEach(ecid -> recordCompletion(ecid)); + idsToRemove.forEach(this::recordCompletion); if (idsToRemove.size() > 0) { LOG.debug("Removed stale entries from RUNNING_CACHE : {}", idsToRemove); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/DeadCompactionDetector.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java similarity index 91% rename from server/manager/src/main/java/org/apache/accumulo/manager/compaction/DeadCompactionDetector.java rename to server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java index 84eaeebeee7..b8bd5b45628 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/DeadCompactionDetector.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.accumulo.manager.compaction; +package org.apache.accumulo.manager.compaction.coordinator; import java.util.Collection; import java.util.HashMap; @@ -105,17 +105,6 @@ private void detectDeadCompactions() { } }); - // Determine which compactions are currently committing and remove those - context.getAmple().getExternalCompactionFinalStates() - .map(ecfs -> ecfs.getExternalCompactionId()).forEach(ecid -> { - if (tabletCompactions.remove(ecid) != null) { - log.debug("Removed compaction {} that is committing", ecid); - } - if (this.deadCompactions.remove(ecid) != null) { - log.debug("Removed {} from the dead compaction map, it's committing", ecid); - } - }); - tabletCompactions.forEach((ecid, extent) -> { log.info("Possible dead compaction detected {} {}", ecid, extent); this.deadCompactions.merge(ecid, 1L, Long::sum); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java new file mode 100644 index 00000000000..9dc6bba41ff --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.compaction.queue; + +import static com.google.common.base.Preconditions.checkState; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; + +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.spi.compaction.CompactionExecutorId; +import org.apache.accumulo.core.spi.compaction.CompactionJob; +import org.apache.accumulo.core.util.compaction.CompactionJobPrioritizer; + +import com.google.common.base.Preconditions; + +/** + * Priority Queue for {@link CompactionJob}s that supports a maximum size. When a job is added and + * the queue is at maximum size the new job is compared to the lowest job with the lowest priority. + * The new job will either replace the lowest priority one or be ignored. + * + *

+ * When jobs are added for tablet, any previous jobs that are queued for the tablet are removed. + *

+ */ +public class CompactionJobPriorityQueue { + // ELASTICITY_TODO unit test this class + private final CompactionExecutorId executorId; + + private class CjqpKey implements Comparable { + private final CompactionJob job; + + // this exists to make every entry unique even if the job is the same, this is done because a + // treeset is used as a queue + private final long seq; + + CjqpKey(CompactionJob job) { + this.job = job; + this.seq = nextSeq++; + } + + @Override + public int compareTo(CjqpKey oe) { + int cmp = CompactionJobPrioritizer.JOB_COMPARATOR.compare(this.job, oe.job); + if (cmp == 0) { + cmp = Long.compare(seq, oe.seq); + } + return cmp; + } + + @Override + public int hashCode() { + return Objects.hash(job, seq); + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + CjqpKey cjqpKey = (CjqpKey) o; + return seq == cjqpKey.seq && job.equals(cjqpKey.job); + } + } + + // There are two reasons for using a TreeMap instead of a PriorityQueue. First the maximum size + // behavior is not supported with a PriorityQueue. Second a PriorityQueue does not support + // efficiently removing entries from anywhere in the queue. Efficient removal is needed for the + // case where tablets decided to issues different compaction jobs than what is currently queued. + private final TreeMap jobQueue; + private final int maxSize; + + // This map tracks what jobs a tablet currently has in the queue. Its used to efficiently remove + // jobs in the queue when new jobs are queued for a tablet. + private final Map> tabletJobs; + + private long nextSeq; + + private boolean closed = true; + + public CompactionJobPriorityQueue(CompactionExecutorId executorId, int maxSize) { + this.jobQueue = new TreeMap<>(); + this.maxSize = maxSize; + this.tabletJobs = new HashMap<>(); + this.executorId = executorId; + } + + public synchronized boolean add(TabletMetadata tabletMetadata, Collection jobs) { + if(closed){ + return false; + } + + + Preconditions + .checkArgument(jobs.stream().allMatch(job -> job.getExecutor().equals(executorId))); + + removePreviousSubmissions(tabletMetadata.getExtent()); + + List newEntries = new ArrayList<>(jobs.size()); + + for (CompactionJob job : jobs) { + CjqpKey cjqpKey = addJobToQueue(tabletMetadata, job); + if (cjqpKey != null) { + newEntries.add(cjqpKey); + } + } + + if (!newEntries.isEmpty()) { + checkState(tabletJobs.put(tabletMetadata.getExtent(), newEntries) == null); + } + + return true; + } + + public synchronized CompactionJobQueues.MetaJob poll() { + var first = jobQueue.pollFirstEntry(); + + if (first != null) { + var extent = first.getValue().getTabletMetadata().getExtent(); + List jobs = tabletJobs.get(extent); + checkState(jobs.remove(first.getKey())); + if (jobs.isEmpty()) { + tabletJobs.remove(extent); + } + } + + return first == null ? null : first.getValue(); + } + + public synchronized boolean closeIfEmpty() { + if(jobQueue.isEmpty()) { + closed = true; + return true; + } + + return false; + } + + private void removePreviousSubmissions(KeyExtent extent) { + List prevJobs = tabletJobs.get(extent); + if (prevJobs != null) { + prevJobs.forEach(jobQueue::remove); + tabletJobs.remove(extent); + } + } + + private CjqpKey addJobToQueue(TabletMetadata tabletMetadata, CompactionJob job) { + if (jobQueue.size() >= maxSize) { + var lastEntry = jobQueue.lastKey(); + if (job.getPriority() <= lastEntry.job.getPriority()) { + // the queue is full and this job has a lower or same priority than the lowest job in the + // queue, so do not add it + return null; + } else { + // the new job has a higher priority than the lowest job in the queue, so remove the lowest + jobQueue.pollLastEntry(); + } + + } + + var key = new CjqpKey(job); + jobQueue.put(key, new CompactionJobQueues.MetaJob(job, tabletMetadata)); + return key; + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java new file mode 100644 index 00000000000..f76f6e554b0 --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.compaction.queue; + +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.spi.compaction.CompactionExecutorId; +import org.apache.accumulo.core.spi.compaction.CompactionJob; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CompactionJobQueues { + + private static final Logger log = LoggerFactory.getLogger(CompactionJobQueues.class); + + private final ConcurrentHashMap priorityQueues = + new ConcurrentHashMap<>(); + + public void add(TabletMetadata tabletMetadata, Collection jobs) { + if (jobs.size() == 1) { + var executorId = jobs.iterator().next().getExecutor(); + add(tabletMetadata, executorId, jobs); + } else { + jobs.stream().collect(Collectors.groupingBy(CompactionJob::getExecutor)).forEach( + ((executorId, compactionJobs) -> add(tabletMetadata, executorId, compactionJobs))); + } + } + + public static class MetaJob { + private final CompactionJob job; + + // the metadata from which the compaction job was derived + private final TabletMetadata tabletMetadata; + + public MetaJob(CompactionJob job, TabletMetadata tabletMetadata) { + this.job = job; + this.tabletMetadata = tabletMetadata; + } + + public CompactionJob getJob() { + return job; + } + + public TabletMetadata getTabletMetadata() { + return tabletMetadata; + } + } + + public MetaJob poll(CompactionExecutorId executorId) { + var prioQ = priorityQueues.get(executorId); + if (prioQ == null) { + return null; + } + MetaJob mj = prioQ.poll(); + + if(mj == null){ + priorityQueues.computeIfPresent(executorId, (eid, pq)->{ + if(pq.closeIfEmpty()){ + return null; + } else { + return pq; + } + }); + } + + return mj; + } + + private void add(TabletMetadata tabletMetadata, CompactionExecutorId executorId, + Collection jobs) { + + // TODO log level + if (log.isDebugEnabled()) { + log.debug("Adding jobs to queue {} {} {}", executorId, tabletMetadata.getExtent(), + jobs.stream().map(job -> "#files:" + job.getFiles().size() + ",prio:" + job.getPriority() + + ",kind:" + job.getKind()).collect(Collectors.toList())); + } + + // TODO make max size configurable + var pq = priorityQueues.computeIfAbsent(executorId, eid -> new CompactionJobPriorityQueue(eid, 10000)); + while(!pq.add(tabletMetadata, jobs)){ + // This loop handles race condition where poll() closes empty priority queues. The queue could be closed after its obtained from the map and before add is called. + pq = priorityQueues.computeIfAbsent(executorId, eid -> new CompactionJobPriorityQueue(eid, 10000)); + } + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java index 6b7e72298ee..1e7a1592cec 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java @@ -36,7 +36,7 @@ import org.apache.accumulo.core.metadata.schema.TabletOperationType; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.ManagerRepo; -import org.apache.accumulo.server.tablets.UniqueNameAllocator; +import org.apache.accumulo.server.tablets.TabletNameGenerator; import org.apache.hadoop.io.Text; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -139,7 +139,7 @@ public Repo call(long tid, Manager manager) throws Exception { List dirs = new ArrayList<>(); splitInfo.getSplits().forEach(split -> { - String dirName = UniqueNameAllocator.createTabletDirectoryName(manager.getContext(), split); + String dirName = TabletNameGenerator.createTabletDirectoryName(manager.getContext(), split); dirs.add(dirName); log.trace("{} allocated dir name {}", FateTxId.formatTid(tid), dirName); }); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java index a02021b4dd6..68107da1958 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java @@ -18,519 +18,6 @@ */ package org.apache.accumulo.manager.compaction; -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.expect; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.UUID; -import java.util.concurrent.ScheduledThreadPoolExecutor; - -import org.apache.accumulo.core.clientImpl.thrift.TInfo; -import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; -import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; -import org.apache.accumulo.core.conf.DefaultConfiguration; -import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; -import org.apache.accumulo.core.metadata.TServerInstance; -import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; -import org.apache.accumulo.core.securityImpl.thrift.TCredentials; -import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary; -import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats; -import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; -import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService; -import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService.Client; -import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.util.compaction.RunningCompaction; -import org.apache.accumulo.server.ServerContext; -import org.apache.accumulo.server.manager.LiveTServerSet; -import org.apache.accumulo.server.security.AuditedSecurityOperation; -import org.apache.thrift.transport.TTransportException; -import org.easymock.EasyMock; -import org.junit.jupiter.api.Test; - -import com.google.common.collect.Sets; -import com.google.common.net.HostAndPort; - public class CompactionCoordinatorTest { - - public class TestCoordinator extends CompactionCoordinator { - - private final Client tabletServerClient; - private final List runningCompactions; - - private Set metadataCompactionIds = null; - - protected TestCoordinator(LiveTServerSet tservers, Client tabletServerClient, - ServerContext context, AuditedSecurityOperation security, - List runningCompactions) { - super(context, tservers, security); - this.tabletServerClient = tabletServerClient; - this.runningCompactions = runningCompactions; - } - - @Override - protected void startDeadCompactionDetector() {} - - @Override - protected long getTServerCheckInterval() { - this.shutdown = true; - return 0L; - } - - @Override - protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) {} - - @Override - protected void createCompactionFinalizer(ScheduledThreadPoolExecutor stpe) {} - - @Override - protected void startRunningCleaner(ScheduledThreadPoolExecutor schedExecutor) {} - - @Override - protected Client getTabletServerConnection(TServerInstance tserver) throws TTransportException { - return tabletServerClient; - } - - @Override - public void compactionCompleted(TInfo tinfo, TCredentials credentials, - String externalCompactionId, TKeyExtent textent, TCompactionStats stats) - throws ThriftSecurityException {} - - @Override - public void compactionFailed(TInfo tinfo, TCredentials credentials, String externalCompactionId, - TKeyExtent extent) throws ThriftSecurityException {} - - void setMetadataCompactionIds(Set mci) { - metadataCompactionIds = mci; - } - - @Override - protected Set readExternalCompactionIds() { - if (metadataCompactionIds == null) { - return RUNNING_CACHE.keySet(); - } else { - return metadataCompactionIds; - } - } - - public Map>> getQueues() { - return CompactionCoordinator.QUEUE_SUMMARIES.QUEUES; - } - - public Map> getIndex() { - return CompactionCoordinator.QUEUE_SUMMARIES.INDEX; - } - - public Map getRunning() { - return RUNNING_CACHE; - } - - public void resetInternals() { - getQueues().clear(); - getIndex().clear(); - getRunning().clear(); - metadataCompactionIds = null; - } - - @Override - protected String getTServerAddressString(HostAndPort tserverAddress) { - return ""; - } - - @Override - protected List getCompactionsRunningOnCompactors() { - return runningCompactions; - } - - @Override - protected void cancelCompactionOnCompactor(String address, String externalCompactionId) {} - - @Override - protected void returnTServerClient(Client client) {} - - } - - @Test - public void testCoordinatorColdStartNoCompactions() throws Exception { - - ServerContext context = EasyMock.createNiceMock(ServerContext.class); - expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes(); - - LiveTServerSet tservers = EasyMock.createNiceMock(LiveTServerSet.class); - expect(tservers.getCurrentServers()).andReturn(Collections.emptySet()).anyTimes(); - - TServerInstance tsi = EasyMock.createNiceMock(TServerInstance.class); - expect(tsi.getHostPort()).andReturn("localhost:9997").anyTimes(); - - TabletServerClientService.Client tsc = - EasyMock.createNiceMock(TabletServerClientService.Client.class); - expect(tsc.getCompactionQueueInfo(anyObject(), anyObject())).andReturn(Collections.emptyList()) - .anyTimes(); - - AuditedSecurityOperation security = EasyMock.createNiceMock(AuditedSecurityOperation.class); - - EasyMock.replay(context, tservers, tsi, tsc, security); - - var coordinator = new TestCoordinator(tservers, tsc, context, security, new ArrayList<>()); - coordinator.resetInternals(); - assertEquals(0, coordinator.getQueues().size()); - assertEquals(0, coordinator.getIndex().size()); - assertEquals(0, coordinator.getRunning().size()); - coordinator.run(); - assertEquals(0, coordinator.getQueues().size()); - assertEquals(0, coordinator.getIndex().size()); - assertEquals(0, coordinator.getRunning().size()); - coordinator.shutdown(); - - EasyMock.verify(context, tservers, tsi, tsc, security); - coordinator.resetInternals(); - } - - @Test - public void testCoordinatorColdStart() throws Exception { - - ServerContext context = EasyMock.createNiceMock(ServerContext.class); - expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes(); - - TCredentials creds = EasyMock.createNiceMock(TCredentials.class); - expect(context.rpcCreds()).andReturn(creds); - - LiveTServerSet tservers = EasyMock.createNiceMock(LiveTServerSet.class); - TServerInstance instance = EasyMock.createNiceMock(TServerInstance.class); - expect(tservers.getCurrentServers()).andReturn(Collections.singleton(instance)).once(); - - TServerInstance tsi = EasyMock.createNiceMock(TServerInstance.class); - expect(tsi.getHostPort()).andReturn("localhost:9997").anyTimes(); - - TabletServerClientService.Client tsc = - EasyMock.createNiceMock(TabletServerClientService.Client.class); - TCompactionQueueSummary queueSummary = EasyMock.createNiceMock(TCompactionQueueSummary.class); - expect(tsc.getCompactionQueueInfo(anyObject(), anyObject())) - .andReturn(Collections.singletonList(queueSummary)).anyTimes(); - expect(queueSummary.getQueue()).andReturn("R2DQ").anyTimes(); - expect(queueSummary.getPriority()).andReturn((short) 1).anyTimes(); - - AuditedSecurityOperation security = EasyMock.createNiceMock(AuditedSecurityOperation.class); - - EasyMock.replay(context, creds, tservers, instance, tsi, tsc, queueSummary, security); - - var coordinator = new TestCoordinator(tservers, tsc, context, security, new ArrayList<>()); - coordinator.resetInternals(); - assertEquals(0, coordinator.getQueues().size()); - assertEquals(0, coordinator.getIndex().size()); - assertEquals(0, coordinator.getRunning().size()); - coordinator.run(); - assertEquals(1, coordinator.getQueues().size()); - QueueAndPriority qp = QueueAndPriority.get("R2DQ".intern(), (short) 1); - Map> m = coordinator.getQueues().get("R2DQ".intern()); - assertNotNull(m); - assertEquals(1, m.size()); - assertTrue(m.containsKey((short) 1)); - Set t = m.get((short) 1); - assertNotNull(t); - assertEquals(1, t.size()); - TServerInstance queuedTsi = t.iterator().next(); - assertEquals(tsi.getHostPortSession(), queuedTsi.getHostPortSession()); - assertEquals(1, coordinator.getIndex().size()); - assertTrue(coordinator.getIndex().containsKey(queuedTsi)); - Set i = coordinator.getIndex().get(queuedTsi); - assertEquals(1, i.size()); - assertEquals(qp, i.iterator().next()); - assertEquals(0, coordinator.getRunning().size()); - coordinator.shutdown(); - - EasyMock.verify(context, tservers, instance, tsi, tsc, queueSummary, security); - coordinator.resetInternals(); - } - - @Test - public void testCoordinatorRestartNoRunningCompactions() throws Exception { - - ServerContext context = EasyMock.createNiceMock(ServerContext.class); - expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes(); - - TCredentials creds = EasyMock.createNiceMock(TCredentials.class); - expect(context.rpcCreds()).andReturn(creds); - - LiveTServerSet tservers = EasyMock.createNiceMock(LiveTServerSet.class); - TServerInstance instance = EasyMock.createNiceMock(TServerInstance.class); - HostAndPort tserverAddress = HostAndPort.fromString("localhost:9997"); - expect(instance.getHostAndPort()).andReturn(tserverAddress).anyTimes(); - expect(tservers.getCurrentServers()).andReturn(Sets.newHashSet(instance)).once(); - - expect(instance.getHostPort()).andReturn("localhost:9997").anyTimes(); - - TabletServerClientService.Client tsc = - EasyMock.createNiceMock(TabletServerClientService.Client.class); - TCompactionQueueSummary queueSummary = EasyMock.createNiceMock(TCompactionQueueSummary.class); - expect(tsc.getCompactionQueueInfo(anyObject(), anyObject())) - .andReturn(Collections.singletonList(queueSummary)).anyTimes(); - expect(queueSummary.getQueue()).andReturn("R2DQ").anyTimes(); - expect(queueSummary.getPriority()).andReturn((short) 1).anyTimes(); - - AuditedSecurityOperation security = EasyMock.createNiceMock(AuditedSecurityOperation.class); - - EasyMock.replay(context, creds, tservers, instance, tsc, queueSummary, security); - - var coordinator = new TestCoordinator(tservers, tsc, context, security, new ArrayList<>()); - coordinator.resetInternals(); - assertEquals(0, coordinator.getQueues().size()); - assertEquals(0, coordinator.getIndex().size()); - assertEquals(0, coordinator.getRunning().size()); - coordinator.run(); - assertEquals(1, coordinator.getQueues().size()); - QueueAndPriority qp = QueueAndPriority.get("R2DQ".intern(), (short) 1); - Map> m = coordinator.getQueues().get("R2DQ".intern()); - assertNotNull(m); - assertEquals(1, m.size()); - assertTrue(m.containsKey((short) 1)); - Set t = m.get((short) 1); - assertNotNull(t); - assertEquals(1, t.size()); - TServerInstance queuedTsi = t.iterator().next(); - assertEquals(instance.getHostPortSession(), queuedTsi.getHostPortSession()); - assertEquals(1, coordinator.getIndex().size()); - assertTrue(coordinator.getIndex().containsKey(queuedTsi)); - Set i = coordinator.getIndex().get(queuedTsi); - assertEquals(1, i.size()); - assertEquals(qp, i.iterator().next()); - assertEquals(0, coordinator.getRunning().size()); - coordinator.shutdown(); - - EasyMock.verify(context, creds, tservers, instance, tsc, queueSummary, security); - coordinator.resetInternals(); - } - - @Test - public void testCoordinatorRestartOneRunningCompaction() throws Exception { - - ServerContext context = EasyMock.createNiceMock(ServerContext.class); - expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes(); - - TCredentials creds = EasyMock.createNiceMock(TCredentials.class); - expect(context.rpcCreds()).andReturn(creds); - - LiveTServerSet tservers = EasyMock.createNiceMock(LiveTServerSet.class); - TServerInstance instance = EasyMock.createNiceMock(TServerInstance.class); - HostAndPort tserverAddress = HostAndPort.fromString("localhost:9997"); - expect(instance.getHostAndPort()).andReturn(tserverAddress).anyTimes(); - expect(tservers.getCurrentServers()).andReturn(Sets.newHashSet(instance)).once(); - - List runningCompactions = new ArrayList<>(); - ExternalCompactionId eci = ExternalCompactionId.generate(UUID.randomUUID()); - TExternalCompactionJob job = EasyMock.createNiceMock(TExternalCompactionJob.class); - expect(job.getExternalCompactionId()).andReturn(eci.toString()).anyTimes(); - TKeyExtent extent = new TKeyExtent(); - extent.setTable("1".getBytes()); - runningCompactions.add(new RunningCompaction(job, tserverAddress.toString(), "queue")); - - expect(instance.getHostPort()).andReturn("localhost:9997").anyTimes(); - - TabletServerClientService.Client tsc = - EasyMock.createNiceMock(TabletServerClientService.Client.class); - TCompactionQueueSummary queueSummary = EasyMock.createNiceMock(TCompactionQueueSummary.class); - expect(tsc.getCompactionQueueInfo(anyObject(), anyObject())) - .andReturn(Collections.singletonList(queueSummary)).anyTimes(); - expect(queueSummary.getQueue()).andReturn("R2DQ").anyTimes(); - expect(queueSummary.getPriority()).andReturn((short) 1).anyTimes(); - - AuditedSecurityOperation security = EasyMock.createNiceMock(AuditedSecurityOperation.class); - - EasyMock.replay(context, creds, tservers, instance, job, tsc, queueSummary, security); - - var coordinator = new TestCoordinator(tservers, tsc, context, security, runningCompactions); - coordinator.resetInternals(); - assertEquals(0, coordinator.getQueues().size()); - assertEquals(0, coordinator.getIndex().size()); - assertEquals(0, coordinator.getRunning().size()); - coordinator.run(); - assertEquals(1, coordinator.getQueues().size()); - QueueAndPriority qp = QueueAndPriority.get("R2DQ".intern(), (short) 1); - Map> m = coordinator.getQueues().get("R2DQ".intern()); - assertNotNull(m); - assertEquals(1, m.size()); - assertTrue(m.containsKey((short) 1)); - Set t = m.get((short) 1); - assertNotNull(t); - assertEquals(1, t.size()); - TServerInstance queuedTsi = t.iterator().next(); - assertEquals(instance.getHostPortSession(), queuedTsi.getHostPortSession()); - assertEquals(1, coordinator.getIndex().size()); - assertTrue(coordinator.getIndex().containsKey(queuedTsi)); - Set i = coordinator.getIndex().get(queuedTsi); - assertEquals(1, i.size()); - assertEquals(qp, i.iterator().next()); - assertEquals(1, coordinator.getRunning().size()); - coordinator.shutdown(); - - EasyMock.verify(context, creds, tservers, instance, job, tsc, queueSummary, security); - coordinator.resetInternals(); - } - - @Test - public void testGetCompactionJob() throws Exception { - - ServerContext context = EasyMock.createNiceMock(ServerContext.class); - expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes(); - - TCredentials creds = EasyMock.createNiceMock(TCredentials.class); - expect(context.rpcCreds()).andReturn(creds).anyTimes(); - - LiveTServerSet tservers = EasyMock.createNiceMock(LiveTServerSet.class); - TServerInstance instance = EasyMock.createNiceMock(TServerInstance.class); - expect(tservers.getCurrentServers()).andReturn(Collections.singleton(instance)).once(); - HostAndPort tserverAddress = HostAndPort.fromString("localhost:9997"); - expect(instance.getHostAndPort()).andReturn(tserverAddress).anyTimes(); - - TServerInstance tsi = EasyMock.createNiceMock(TServerInstance.class); - expect(tsi.getHostPort()).andReturn("localhost:9997").anyTimes(); - - TabletServerClientService.Client tsc = - EasyMock.createNiceMock(TabletServerClientService.Client.class); - TCompactionQueueSummary queueSummary = EasyMock.createNiceMock(TCompactionQueueSummary.class); - expect(tsc.getCompactionQueueInfo(anyObject(), anyObject())) - .andReturn(Collections.singletonList(queueSummary)).anyTimes(); - expect(queueSummary.getQueue()).andReturn("R2DQ").anyTimes(); - expect(queueSummary.getPriority()).andReturn((short) 1).anyTimes(); - - ExternalCompactionId eci = ExternalCompactionId.generate(UUID.randomUUID()); - TExternalCompactionJob job = EasyMock.createNiceMock(TExternalCompactionJob.class); - expect(job.getExternalCompactionId()).andReturn(eci.toString()).anyTimes(); - TInfo trace = TraceUtil.traceInfo(); - expect(tsc.reserveCompactionJob(trace, creds, "R2DQ", 1, "localhost:10241", eci.toString())) - .andReturn(job).anyTimes(); - - AuditedSecurityOperation security = EasyMock.createNiceMock(AuditedSecurityOperation.class); - expect(security.canPerformSystemActions(creds)).andReturn(true); - - EasyMock.replay(context, creds, tservers, instance, job, tsc, queueSummary, security); - - var coordinator = new TestCoordinator(tservers, tsc, context, security, new ArrayList<>()); - coordinator.resetInternals(); - assertEquals(0, coordinator.getQueues().size()); - assertEquals(0, coordinator.getIndex().size()); - assertEquals(0, coordinator.getRunning().size()); - // Use coordinator.run() to populate the internal data structures. This is tested in a different - // test. - coordinator.run(); - coordinator.shutdown(); - - assertEquals(1, coordinator.getQueues().size()); - QueueAndPriority qp = QueueAndPriority.get("R2DQ".intern(), (short) 1); - Map> m = coordinator.getQueues().get("R2DQ".intern()); - assertNotNull(m); - assertEquals(1, m.size()); - assertTrue(m.containsKey((short) 1)); - Set t = m.get((short) 1); - assertNotNull(t); - assertEquals(1, t.size()); - TServerInstance queuedTsi = t.iterator().next(); - assertEquals(tsi.getHostPortSession(), queuedTsi.getHostPortSession()); - assertEquals(1, coordinator.getIndex().size()); - assertTrue(coordinator.getIndex().containsKey(queuedTsi)); - Set i = coordinator.getIndex().get(queuedTsi); - assertEquals(1, i.size()); - assertEquals(qp, i.iterator().next()); - assertEquals(0, coordinator.getRunning().size()); - - // Get the next job - TExternalCompactionJob createdJob = - coordinator.getCompactionJob(trace, creds, "R2DQ", "localhost:10241", eci.toString()); - assertEquals(eci.toString(), createdJob.getExternalCompactionId()); - - assertEquals(1, coordinator.getQueues().size()); - assertEquals(1, coordinator.getIndex().size()); - assertEquals(1, coordinator.getRunning().size()); - Entry entry = - coordinator.getRunning().entrySet().iterator().next(); - assertEquals(eci.toString(), entry.getKey().toString()); - assertEquals("localhost:10241", entry.getValue().getCompactorAddress()); - assertEquals(eci.toString(), entry.getValue().getJob().getExternalCompactionId()); - - EasyMock.verify(context, creds, tservers, instance, job, tsc, queueSummary, security); - coordinator.resetInternals(); - - } - - @Test - public void testGetCompactionJobNoJobs() throws Exception { - - ServerContext context = EasyMock.createNiceMock(ServerContext.class); - expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes(); - - TCredentials creds = EasyMock.createNiceMock(TCredentials.class); - - LiveTServerSet tservers = EasyMock.createNiceMock(LiveTServerSet.class); - - TabletServerClientService.Client tsc = - EasyMock.createNiceMock(TabletServerClientService.Client.class); - - AuditedSecurityOperation security = EasyMock.createNiceMock(AuditedSecurityOperation.class); - expect(security.canPerformSystemActions(creds)).andReturn(true); - - EasyMock.replay(context, creds, tservers, tsc, security); - - var coordinator = new TestCoordinator(tservers, tsc, context, security, new ArrayList<>()); - coordinator.resetInternals(); - TExternalCompactionJob job = coordinator.getCompactionJob(TraceUtil.traceInfo(), creds, "R2DQ", - "localhost:10240", UUID.randomUUID().toString()); - assertNull(job.getExternalCompactionId()); - coordinator.shutdown(); - - EasyMock.verify(context, creds, tservers, tsc, security); - coordinator.resetInternals(); - } - - @Test - public void testCleanUpRunning() throws Exception { - - ServerContext context = EasyMock.createNiceMock(ServerContext.class); - expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes(); - - TCredentials creds = EasyMock.createNiceMock(TCredentials.class); - - LiveTServerSet tservers = EasyMock.createNiceMock(LiveTServerSet.class); - - TabletServerClientService.Client tsc = - EasyMock.createNiceMock(TabletServerClientService.Client.class); - - AuditedSecurityOperation security = EasyMock.createNiceMock(AuditedSecurityOperation.class); - - EasyMock.replay(context, creds, tservers, tsc, security); - - TestCoordinator coordinator = - new TestCoordinator(tservers, tsc, context, security, new ArrayList<>()); - coordinator.resetInternals(); - - var ecid1 = ExternalCompactionId.generate(UUID.randomUUID()); - var ecid2 = ExternalCompactionId.generate(UUID.randomUUID()); - var ecid3 = ExternalCompactionId.generate(UUID.randomUUID()); - - coordinator.getRunning().put(ecid1, new RunningCompaction(new TExternalCompaction())); - coordinator.getRunning().put(ecid2, new RunningCompaction(new TExternalCompaction())); - coordinator.getRunning().put(ecid3, new RunningCompaction(new TExternalCompaction())); - - coordinator.cleanUpRunning(); - - assertEquals(Set.of(ecid1, ecid2, ecid3), coordinator.getRunning().keySet()); - - coordinator.setMetadataCompactionIds(Set.of(ecid1, ecid2)); - - coordinator.cleanUpRunning(); - - assertEquals(Set.of(ecid1, ecid2), coordinator.getRunning().keySet()); - - EasyMock.verify(context, creds, tservers, tsc, security); - - } + // ELASTICITY_TODO this test was no longer compiling with all the changes to CompactionCoordinator. Its contents were deleted to get things compiling, however need to go and look at the test and determine what to carry forward with CompactionCoordinator. } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/QueueSummariesTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/QueueSummariesTest.java deleted file mode 100644 index 7a213149cd4..00000000000 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/QueueSummariesTest.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.manager.compaction; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; - -import java.util.ArrayList; -import java.util.List; -import java.util.Set; - -import org.apache.accumulo.core.metadata.TServerInstance; -import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary; -import org.apache.accumulo.manager.compaction.QueueSummaries.PrioTserver; -import org.junit.jupiter.api.Test; - -public class QueueSummariesTest { - - private TServerInstance ntsi(String tserver) { - return new TServerInstance(tserver + ":9997", 0); - } - - private PrioTserver npt(String tserver, short prio) { - return new PrioTserver(ntsi(tserver), prio); - } - - private void update(QueueSummaries queueSum, String tserver, String... data) { - - TServerInstance tsi = ntsi(tserver); - - List summaries = new ArrayList<>(); - - for (int i = 0; i < data.length; i += 2) { - summaries.add(new TCompactionQueueSummary(data[i], Short.parseShort(data[i + 1]))); - } - - queueSum.update(tsi, summaries); - } - - @Test - public void testBasic() { - QueueSummaries queueSum = new QueueSummaries(); - - update(queueSum, "ts1", "q1", "5", "q1", "4", "q2", "5", "q3", "4"); - update(queueSum, "ts2", "q1", "5", "q3", "5", "q3", "4"); - update(queueSum, "ts3", "q1", "5", "q2", "5"); - - for (int i = 0; i < 3; i++) { - assertEquals(npt("ts1", (short) 5), queueSum.getNextTserver("q1")); - assertEquals(npt("ts1", (short) 5), queueSum.getNextTserver("q2")); - assertEquals(npt("ts2", (short) 5), queueSum.getNextTserver("q1")); - assertEquals(npt("ts3", (short) 5), queueSum.getNextTserver("q1")); - assertEquals(npt("ts3", (short) 5), queueSum.getNextTserver("q2")); - assertEquals(npt("ts2", (short) 5), queueSum.getNextTserver("q3")); - } - - queueSum.removeSummary(ntsi("ts2"), "q1", (short) 5); - - for (int i = 0; i < 3; i++) { - assertEquals(npt("ts1", (short) 5), queueSum.getNextTserver("q1")); - assertEquals(npt("ts1", (short) 5), queueSum.getNextTserver("q2")); - assertEquals(npt("ts3", (short) 5), queueSum.getNextTserver("q1")); - assertEquals(npt("ts3", (short) 5), queueSum.getNextTserver("q2")); - assertEquals(npt("ts2", (short) 5), queueSum.getNextTserver("q3")); - } - - queueSum.removeSummary(ntsi("ts3"), "q2", (short) 5); - queueSum.removeSummary(ntsi("ts2"), "q3", (short) 5); - - for (int i = 0; i < 3; i++) { - assertEquals(npt("ts1", (short) 5), queueSum.getNextTserver("q1")); - assertEquals(npt("ts1", (short) 5), queueSum.getNextTserver("q2")); - assertEquals(npt("ts3", (short) 5), queueSum.getNextTserver("q1")); - assertEquals(npt("ts1", (short) 4), queueSum.getNextTserver("q3")); - assertEquals(npt("ts2", (short) 4), queueSum.getNextTserver("q3")); - } - - } - - @Test - public void testUpdate() { - QueueSummaries queueSum = new QueueSummaries(); - - update(queueSum, "ts1", "q1", "5", "q2", "5", "q3", "4"); - update(queueSum, "ts2", "q1", "5", "q2", "4", "q3", "5"); - - for (int i = 0; i < 3; i++) { - assertEquals(npt("ts1", (short) 5), queueSum.getNextTserver("q1")); - assertEquals(npt("ts1", (short) 5), queueSum.getNextTserver("q2")); - assertEquals(npt("ts2", (short) 5), queueSum.getNextTserver("q1")); - assertEquals(npt("ts2", (short) 5), queueSum.getNextTserver("q3")); - } - - // an update from the tserver should remove some existing entries - update(queueSum, "ts1", "q1", "4", "q2", "6"); - update(queueSum, "ts2", "q1", "7", "q3", "3", "q4", "5"); - - for (int i = 0; i < 3; i++) { - assertEquals(npt("ts2", (short) 7), queueSum.getNextTserver("q1")); - assertEquals(npt("ts1", (short) 6), queueSum.getNextTserver("q2")); - assertEquals(npt("ts2", (short) 3), queueSum.getNextTserver("q3")); - assertEquals(npt("ts2", (short) 5), queueSum.getNextTserver("q4")); - } - - queueSum.removeSummary(ntsi("ts2"), "q1", (short) 7); - queueSum.removeSummary(ntsi("ts1"), "q2", (short) 6); - - for (int i = 0; i < 3; i++) { - assertEquals(npt("ts1", (short) 4), queueSum.getNextTserver("q1")); - assertNull(queueSum.getNextTserver("q2")); - assertEquals(npt("ts2", (short) 3), queueSum.getNextTserver("q3")); - assertEquals(npt("ts2", (short) 5), queueSum.getNextTserver("q4")); - } - } - - @Test - public void testRemoveTserver() { - QueueSummaries queueSum = new QueueSummaries(); - - update(queueSum, "ts1", "q1", "5", "q2", "5", "q3", "4"); - update(queueSum, "ts2", "q1", "5", "q2", "4", "q3", "5"); - - for (int i = 0; i < 3; i++) { - assertEquals(npt("ts1", (short) 5), queueSum.getNextTserver("q1")); - assertEquals(npt("ts1", (short) 5), queueSum.getNextTserver("q2")); - assertEquals(npt("ts2", (short) 5), queueSum.getNextTserver("q1")); - assertEquals(npt("ts2", (short) 5), queueSum.getNextTserver("q3")); - } - - queueSum.remove(Set.of(ntsi("ts1"))); - - for (int i = 0; i < 3; i++) { - assertEquals(npt("ts2", (short) 4), queueSum.getNextTserver("q2")); - assertEquals(npt("ts2", (short) 5), queueSum.getNextTserver("q1")); - assertEquals(npt("ts2", (short) 5), queueSum.getNextTserver("q3")); - } - - queueSum.removeSummary(ntsi("ts2"), "q3", (short) 5); - - for (int i = 0; i < 3; i++) { - assertEquals(npt("ts2", (short) 4), queueSum.getNextTserver("q2")); - assertEquals(npt("ts2", (short) 5), queueSum.getNextTserver("q1")); - assertNull(queueSum.getNextTserver("q3")); - } - - update(queueSum, "ts1", "q2", "6", "q1", "3"); - - for (int i = 0; i < 3; i++) { - assertEquals(npt("ts1", (short) 6), queueSum.getNextTserver("q2")); - assertEquals(npt("ts2", (short) 5), queueSum.getNextTserver("q1")); - assertNull(queueSum.getNextTserver("q3")); - } - - queueSum.remove(Set.of(ntsi("ts2"))); - - for (int i = 0; i < 3; i++) { - assertEquals(npt("ts1", (short) 6), queueSum.getNextTserver("q2")); - assertEquals(npt("ts1", (short) 3), queueSum.getNextTserver("q1")); - assertNull(queueSum.getNextTserver("q3")); - } - - queueSum.removeSummary(ntsi("ts1"), "q2", (short) 6); - - for (int i = 0; i < 3; i++) { - assertEquals(npt("ts1", (short) 3), queueSum.getNextTserver("q1")); - assertNull(queueSum.getNextTserver("q2")); - assertNull(queueSum.getNextTserver("q3")); - } - - queueSum.remove(Set.of(ntsi("ts1"))); - - for (int i = 0; i < 3; i++) { - assertNull(queueSum.getNextTserver("q1")); - assertNull(queueSum.getNextTserver("q2")); - assertNull(queueSum.getNextTserver("q3")); - } - } -} diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java index 3a7c4e78c0f..9984783ad8f 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java @@ -1089,6 +1089,11 @@ public Optional getFiles(CompactionServiceId service, CompactionKind kind return Optional.empty(); } + if (!getExtent().isRootTablet() && !getExtent().isMeta() && kind == CompactionKind.SYSTEM) { + // ELASTICITY_TODO a hack added to disable system compactions for user tablets + return Optional.empty(); + } + servicesUsed.add(service); var files = tablet.getDatafiles(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java index b56d8d20836..d8cdcb37c60 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java @@ -449,16 +449,4 @@ static Optional bringOnline(DatafileManager datafileManager, cInfo.checkCompactionId, cInfo.selectedFiles, dfv, Optional.empty()); } - public static ReferencedTabletFile computeCompactionFileDest(ReferencedTabletFile tmpFile) { - String newFilePath = tmpFile.getMetaInsert(); - int idx = newFilePath.indexOf("_tmp"); - if (idx > 0) { - newFilePath = newFilePath.substring(0, idx); - } else { - throw new IllegalArgumentException( - "Expected compaction tmp file " + tmpFile.getMetaInsert() + " to have suffix '_tmp'"); - } - return new ReferencedTabletFile(new Path(newFilePath)); - } - } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java index 8d1171b4221..eea648ef7bc 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java @@ -47,6 +47,7 @@ import org.apache.accumulo.core.util.MapCounter; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.tablets.TabletNameGenerator; import org.apache.accumulo.server.util.ManagerMetadataUtil; import org.apache.accumulo.server.util.MetadataTableUtil; import org.apache.hadoop.fs.Path; @@ -402,7 +403,7 @@ Optional bringMajorCompactionOnline(Set oldD VolumeManager vm = tablet.getTabletServer().getContext().getVolumeManager(); long t1, t2; - ReferencedTabletFile newDatafile = CompactableUtils.computeCompactionFileDest(tmpDatafile); + ReferencedTabletFile newDatafile = TabletNameGenerator.computeCompactionFileDest(tmpDatafile); if (vm.exists(newDatafile.getPath())) { log.error("Target data file already exist " + newDatafile, new Exception()); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 538c2387c76..293572f99af 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -26,6 +26,7 @@ import java.io.DataInputStream; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.UncheckedIOException; import java.lang.ref.SoftReference; import java.util.ArrayList; import java.util.Collection; @@ -64,7 +65,6 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FilePrefix; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iteratorsImpl.system.SourceSwitchingIterator; @@ -83,7 +83,6 @@ import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.spi.fs.VolumeChooserEnvironment; import org.apache.accumulo.core.spi.scan.ScanDispatch; import org.apache.accumulo.core.tabletingest.thrift.DataFileInfo; import org.apache.accumulo.core.tabletserver.log.LogEntry; @@ -93,15 +92,14 @@ import org.apache.accumulo.core.volume.Volume; import org.apache.accumulo.server.compaction.CompactionStats; import org.apache.accumulo.server.compaction.PausedCompactionMetrics; -import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl; import org.apache.accumulo.server.fs.VolumeUtil; import org.apache.accumulo.server.fs.VolumeUtil.TabletFiles; import org.apache.accumulo.server.problems.ProblemReport; import org.apache.accumulo.server.problems.ProblemReports; import org.apache.accumulo.server.problems.ProblemType; import org.apache.accumulo.server.tablets.ConditionCheckerContext.ConditionChecker; +import org.apache.accumulo.server.tablets.TabletNameGenerator; import org.apache.accumulo.server.tablets.TabletTime; -import org.apache.accumulo.server.tablets.UniqueNameAllocator; import org.apache.accumulo.server.util.FileUtil; import org.apache.accumulo.server.util.ManagerMetadataUtil; import org.apache.accumulo.server.util.MetadataTableUtil; @@ -247,42 +245,39 @@ public static class LookupResult { } private String chooseTabletDir() throws IOException { - VolumeChooserEnvironment chooserEnv = - new VolumeChooserEnvironmentImpl(extent.tableId(), extent.endRow(), context); - String dirUri = tabletServer.getVolumeManager().choose(chooserEnv, context.getBaseUris()) - + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + extent.tableId() + Path.SEPARATOR + dirName; - checkTabletDir(new Path(dirUri)); - return dirUri; + return TabletNameGenerator.chooseTabletDir(context, extent, dirName, + dir -> checkTabletDir(new Path(dir))); } ReferencedTabletFile getNextDataFilename(FilePrefix prefix) throws IOException { - String extension = FileOperations.getNewFileExtension(tableConfiguration); - return new ReferencedTabletFile(new Path(chooseTabletDir() + "/" + prefix.toPrefix() - + context.getUniqueNameAllocator().getNextName() + "." + extension)); + return TabletNameGenerator.getNextDataFilename(prefix, context, extent, dirName, + dir -> checkTabletDir(new Path(dir))); } ReferencedTabletFile getNextDataFilenameForMajc(boolean propagateDeletes) throws IOException { - String tmpFileName = getNextDataFilename( - !propagateDeletes ? FilePrefix.MAJOR_COMPACTION_ALL_FILES : FilePrefix.MAJOR_COMPACTION) - .getMetaInsert() + "_tmp"; - return new ReferencedTabletFile(new Path(tmpFileName)); + return TabletNameGenerator.getNextDataFilenameForMajc(propagateDeletes, context, extent, + dirName, dir -> checkTabletDir(new Path(dir))); } - private void checkTabletDir(Path path) throws IOException { - if (!checkedTabletDirs.contains(path)) { - FileStatus[] files = null; - try { - files = getTabletServer().getVolumeManager().listStatus(path); - } catch (FileNotFoundException ex) { - // ignored - } + private void checkTabletDir(Path path) { + try { + if (!checkedTabletDirs.contains(path)) { + FileStatus[] files = null; + try { + files = getTabletServer().getVolumeManager().listStatus(path); + } catch (FileNotFoundException ex) { + // ignored + } - if (files == null) { - log.debug("Tablet {} had no dir, creating {}", extent, path); + if (files == null) { + log.debug("Tablet {} had no dir, creating {}", extent, path); - getTabletServer().getVolumeManager().mkdirs(path); + getTabletServer().getVolumeManager().mkdirs(path); + } + checkedTabletDirs.add(path); } - checkedTabletDirs.add(path); + } catch (IOException e) { + throw new UncheckedIOException(e); } } @@ -1558,7 +1553,7 @@ public TreeMap split(byte[] sp) throws IOException { KeyExtent low = new KeyExtent(extent.tableId(), midRow, extent.prevEndRow()); KeyExtent high = new KeyExtent(extent.tableId(), extent.endRow(), midRow); - String lowDirectoryName = UniqueNameAllocator.createTabletDirectoryName(context, midRow); + String lowDirectoryName = TabletNameGenerator.createTabletDirectoryName(context, midRow); // write new tablet information to MetadataTable SortedMap lowDatafileSizes = new TreeMap<>(); diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/CompactableUtilsTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/CompactableUtilsTest.java index 371f4bb3146..b288af0f99c 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/CompactableUtilsTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/CompactableUtilsTest.java @@ -21,7 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import org.apache.accumulo.core.metadata.ReferencedTabletFile; -import org.apache.accumulo.tserver.tablet.CompactableUtils; +import org.apache.accumulo.server.tablets.TabletNameGenerator; import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.Test; @@ -33,7 +33,7 @@ public void testEquivalence() { new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf")); ReferencedTabletFile tmpFile = new ReferencedTabletFile(new Path(expected.getMetaInsert() + "_tmp")); - ReferencedTabletFile dest = CompactableUtils.computeCompactionFileDest(tmpFile); + ReferencedTabletFile dest = TabletNameGenerator.computeCompactionFileDest(tmpFile); assertEquals(expected, dest); } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java index 97f7a191465..97ad4433203 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java @@ -45,6 +45,7 @@ import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import org.apache.accumulo.compactor.Compactor; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; @@ -505,6 +506,30 @@ public void testEndOfFirstTablet() throws Exception { } } + @Test + public void testManyFiles() throws Exception { + + getCluster().getClusterControl().startCompactors(Compactor.class, 1, "user-small"); + + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + String dir = getDir("/testBulkFile-"); + FileSystem fs = getCluster().getFileSystem(); + fs.mkdirs(new Path(dir)); + + addSplits(c, tableName, "5000"); + + for (int i = 0; i < 100; i++) { + writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1); + } + + c.tableOperations().importDirectory(dir).to(tableName).load(); + + verifyData(c, tableName, 0, 100 * 100 - 1, false); + + Thread.sleep(600000); + } + } + private void addSplits(AccumuloClient client, String tableName, String splitString) throws Exception { SortedSet splits = new TreeSet<>(); From 5e0e0e30f01dd0b35247946c6ad44930c57366d9 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Thu, 8 Jun 2023 23:10:56 -0400 Subject: [PATCH 2/7] format recent changes --- .../queue/CompactionJobPriorityQueue.java | 5 ++--- .../compaction/queue/CompactionJobQueues.java | 17 ++++++++++------- .../compaction/CompactionCoordinatorTest.java | 4 +++- .../accumulo/test/functional/BulkNewIT.java | 2 -- 4 files changed, 15 insertions(+), 13 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java index 9dc6bba41ff..c2a1e77084c 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java @@ -109,11 +109,10 @@ public CompactionJobPriorityQueue(CompactionExecutorId executorId, int maxSize) } public synchronized boolean add(TabletMetadata tabletMetadata, Collection jobs) { - if(closed){ + if (closed) { return false; } - Preconditions .checkArgument(jobs.stream().allMatch(job -> job.getExecutor().equals(executorId))); @@ -151,7 +150,7 @@ public synchronized CompactionJobQueues.MetaJob poll() { } public synchronized boolean closeIfEmpty() { - if(jobQueue.isEmpty()) { + if (jobQueue.isEmpty()) { closed = true; return true; } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java index f76f6e554b0..a763a5550c7 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java @@ -72,9 +72,9 @@ public MetaJob poll(CompactionExecutorId executorId) { } MetaJob mj = prioQ.poll(); - if(mj == null){ - priorityQueues.computeIfPresent(executorId, (eid, pq)->{ - if(pq.closeIfEmpty()){ + if (mj == null) { + priorityQueues.computeIfPresent(executorId, (eid, pq) -> { + if (pq.closeIfEmpty()) { return null; } else { return pq; @@ -96,10 +96,13 @@ private void add(TabletMetadata tabletMetadata, CompactionExecutorId executorId, } // TODO make max size configurable - var pq = priorityQueues.computeIfAbsent(executorId, eid -> new CompactionJobPriorityQueue(eid, 10000)); - while(!pq.add(tabletMetadata, jobs)){ - // This loop handles race condition where poll() closes empty priority queues. The queue could be closed after its obtained from the map and before add is called. - pq = priorityQueues.computeIfAbsent(executorId, eid -> new CompactionJobPriorityQueue(eid, 10000)); + var pq = priorityQueues.computeIfAbsent(executorId, + eid -> new CompactionJobPriorityQueue(eid, 10000)); + while (!pq.add(tabletMetadata, jobs)) { + // This loop handles race condition where poll() closes empty priority queues. The queue could + // be closed after its obtained from the map and before add is called. + pq = priorityQueues.computeIfAbsent(executorId, + eid -> new CompactionJobPriorityQueue(eid, 10000)); } } } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java index 68107da1958..6ef94028862 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java @@ -19,5 +19,7 @@ package org.apache.accumulo.manager.compaction; public class CompactionCoordinatorTest { - // ELASTICITY_TODO this test was no longer compiling with all the changes to CompactionCoordinator. Its contents were deleted to get things compiling, however need to go and look at the test and determine what to carry forward with CompactionCoordinator. + // ELASTICITY_TODO this test was no longer compiling with all the changes to + // CompactionCoordinator. Its contents were deleted to get things compiling, however need to go + // and look at the test and determine what to carry forward with CompactionCoordinator. } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java index 97ad4433203..044a20d3990 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java @@ -525,8 +525,6 @@ public void testManyFiles() throws Exception { c.tableOperations().importDirectory(dir).to(tableName).load(); verifyData(c, tableName, 0, 100 * 100 - 1, false); - - Thread.sleep(600000); } } From 00111e7cea59a609d8940d9eab972607e1257c54 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Fri, 9 Jun 2023 10:49:45 -0400 Subject: [PATCH 3/7] fixes build errors --- server/manager/pom.xml | 4 ---- .../compaction/coordinator/CompactionCoordinator.java | 2 +- .../compaction/queue/CompactionJobPriorityQueue.java | 6 ++++-- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/server/manager/pom.xml b/server/manager/pom.xml index f9e0e45234f..f08e01070a0 100644 --- a/server/manager/pom.xml +++ b/server/manager/pom.xml @@ -80,10 +80,6 @@ org.apache.accumulo accumulo-start
- - org.apache.accumulo - accumulo-tserver - org.apache.commons commons-lang3 diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 3af90565562..5c1c027da1c 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -382,7 +382,7 @@ TExternalCompactionJob createThriftJob(String externalCompactionId, ecm.getCompactTmpName().getNormalizedPathStr(), ecm.getPropagateDeletes(), TCompactionKind.valueOf(ecm.getKind().name()), ecm.getCompactionId() == null ? 0 : ecm.getCompactionId(), overrides); - }; + } /** * Compactor calls compactionCompleted passing in the CompactionStats diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java index c2a1e77084c..65b5a904b6a 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java @@ -77,10 +77,12 @@ public int hashCode() { @Override public boolean equals(Object o) { - if (this == o) + if (this == o) { return true; - if (o == null || getClass() != o.getClass()) + } + if (o == null || getClass() != o.getClass()) { return false; + } CjqpKey cjqpKey = (CjqpKey) o; return seq == cjqpKey.seq && job.equals(cjqpKey.job); } From dccbd554345db6364882c2cdebd30dd74a739547 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Fri, 9 Jun 2023 12:00:37 -0400 Subject: [PATCH 4/7] removes TODO addressed in #3478 --- .../java/org/apache/accumulo/core/metadata/schema/Ample.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index 1c2571a4959..eb1c0486b35 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -338,7 +338,7 @@ interface TabletUpdates { T putBulkFile(ReferencedTabletFile bulkref, long tid); - T deleteBulkFile(ReferencedTabletFile bulkref); // TODO should probably be a stored tablet file + T deleteBulkFile(ReferencedTabletFile bulkref); T putChopped(); From 3fe3a71120260e9496a649136d94f281a862c317 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Mon, 12 Jun 2023 20:12:21 -0400 Subject: [PATCH 5/7] fix bug in code added after testing --- .../manager/compaction/queue/CompactionJobPriorityQueue.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java index 65b5a904b6a..99ef3d327d5 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java @@ -101,7 +101,7 @@ public boolean equals(Object o) { private long nextSeq; - private boolean closed = true; + private boolean closed = false; public CompactionJobPriorityQueue(CompactionExecutorId executorId, int maxSize) { this.jobQueue = new TreeMap<>(); From f85223e76c2ded000eff80be93891a6721c2615d Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Mon, 12 Jun 2023 20:25:05 -0400 Subject: [PATCH 6/7] reuse code --- .../compaction/CompactionJobGenerator.java | 38 ++----------------- 1 file changed, 3 insertions(+), 35 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java index 373603d9bb7..3f93e8903b8 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java @@ -34,17 +34,15 @@ import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.spi.common.ServiceEnvironment; import org.apache.accumulo.core.spi.compaction.CompactionDispatcher; -import org.apache.accumulo.core.spi.compaction.CompactionExecutorId; import org.apache.accumulo.core.spi.compaction.CompactionJob; import org.apache.accumulo.core.spi.compaction.CompactionKind; import org.apache.accumulo.core.spi.compaction.CompactionPlan; import org.apache.accumulo.core.spi.compaction.CompactionPlanner; import org.apache.accumulo.core.spi.compaction.CompactionServiceId; import org.apache.accumulo.core.spi.compaction.CompactionServices; -import org.apache.accumulo.core.spi.compaction.ExecutorManager; -import org.apache.accumulo.core.util.compaction.CompactionExecutorIdImpl; import org.apache.accumulo.core.util.compaction.CompactionJobImpl; import org.apache.accumulo.core.util.compaction.CompactionPlanImpl; +import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams; import org.apache.accumulo.core.util.compaction.CompactionServicesConfig; import com.github.benmanes.caffeine.cache.Cache; @@ -264,38 +262,8 @@ private CompactionPlanner createPlanner(CompactionServiceId serviceId) { throw new RuntimeException(e); } - CompactionPlanner.InitParameters initParameters = new CompactionPlanner.InitParameters() { - @Override - public ServiceEnvironment getServiceEnvironment() { - return (ServiceEnvironment) env; - } - - @Override - public Map getOptions() { - return servicesConfig.getOptions().get(serviceId.canonical()); - } - - @Override - public String getFullyQualifiedOption(String key) { - return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + serviceId + ".opts." + key; - } - - @Override - public ExecutorManager getExecutorManager() { - return new ExecutorManager() { - @Override - public CompactionExecutorId createExecutor(String name, int threads) { - // ELASTICITY_TODO need to deprecate - return CompactionExecutorIdImpl.internalId(serviceId, name); - } - - @Override - public CompactionExecutorId getExternalExecutor(String name) { - return CompactionExecutorIdImpl.externalId(name); - } - }; - } - }; + CompactionPlannerInitParams initParameters = new CompactionPlannerInitParams(serviceId, + servicesConfig.getOptions().get(serviceId.canonical()), (ServiceEnvironment) env); planner.init(initParameters); From f6839ccc1871829ad4a60444b362197a99357257 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Tue, 13 Jun 2023 10:20:40 -0400 Subject: [PATCH 7/7] address code review comments --- .../queue/CompactionJobPriorityQueue.java | 24 +++++++++---------- .../compaction/queue/CompactionJobQueues.java | 6 +++++ 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java index 99ef3d327d5..aa9477818b6 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java @@ -49,20 +49,20 @@ public class CompactionJobPriorityQueue { // ELASTICITY_TODO unit test this class private final CompactionExecutorId executorId; - private class CjqpKey implements Comparable { + private class CjpqKey implements Comparable { private final CompactionJob job; // this exists to make every entry unique even if the job is the same, this is done because a // treeset is used as a queue private final long seq; - CjqpKey(CompactionJob job) { + CjpqKey(CompactionJob job) { this.job = job; this.seq = nextSeq++; } @Override - public int compareTo(CjqpKey oe) { + public int compareTo(CjpqKey oe) { int cmp = CompactionJobPrioritizer.JOB_COMPARATOR.compare(this.job, oe.job); if (cmp == 0) { cmp = Long.compare(seq, oe.seq); @@ -83,7 +83,7 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - CjqpKey cjqpKey = (CjqpKey) o; + CjpqKey cjqpKey = (CjpqKey) o; return seq == cjqpKey.seq && job.equals(cjqpKey.job); } } @@ -92,12 +92,12 @@ public boolean equals(Object o) { // behavior is not supported with a PriorityQueue. Second a PriorityQueue does not support // efficiently removing entries from anywhere in the queue. Efficient removal is needed for the // case where tablets decided to issues different compaction jobs than what is currently queued. - private final TreeMap jobQueue; + private final TreeMap jobQueue; private final int maxSize; // This map tracks what jobs a tablet currently has in the queue. Its used to efficiently remove // jobs in the queue when new jobs are queued for a tablet. - private final Map> tabletJobs; + private final Map> tabletJobs; private long nextSeq; @@ -120,10 +120,10 @@ public synchronized boolean add(TabletMetadata tabletMetadata, Collection newEntries = new ArrayList<>(jobs.size()); + List newEntries = new ArrayList<>(jobs.size()); for (CompactionJob job : jobs) { - CjqpKey cjqpKey = addJobToQueue(tabletMetadata, job); + CjpqKey cjqpKey = addJobToQueue(tabletMetadata, job); if (cjqpKey != null) { newEntries.add(cjqpKey); } @@ -141,7 +141,7 @@ public synchronized CompactionJobQueues.MetaJob poll() { if (first != null) { var extent = first.getValue().getTabletMetadata().getExtent(); - List jobs = tabletJobs.get(extent); + List jobs = tabletJobs.get(extent); checkState(jobs.remove(first.getKey())); if (jobs.isEmpty()) { tabletJobs.remove(extent); @@ -161,14 +161,14 @@ public synchronized boolean closeIfEmpty() { } private void removePreviousSubmissions(KeyExtent extent) { - List prevJobs = tabletJobs.get(extent); + List prevJobs = tabletJobs.get(extent); if (prevJobs != null) { prevJobs.forEach(jobQueue::remove); tabletJobs.remove(extent); } } - private CjqpKey addJobToQueue(TabletMetadata tabletMetadata, CompactionJob job) { + private CjpqKey addJobToQueue(TabletMetadata tabletMetadata, CompactionJob job) { if (jobQueue.size() >= maxSize) { var lastEntry = jobQueue.lastKey(); if (job.getPriority() <= lastEntry.job.getPriority()) { @@ -182,7 +182,7 @@ private CjqpKey addJobToQueue(TabletMetadata tabletMetadata, CompactionJob job) } - var key = new CjqpKey(job); + var key = new CjpqKey(job); jobQueue.put(key, new CompactionJobQueues.MetaJob(job, tabletMetadata)); return key; } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java index a763a5550c7..02d037dff08 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java @@ -32,6 +32,12 @@ public class CompactionJobQueues { private static final Logger log = LoggerFactory.getLogger(CompactionJobQueues.class); + // The code in this class specifically depends on the behavior of ConcurrentHashMap, behavior that + // other ConcurrentMap implementations may not have. The behavior it depended on is that the + // compute functions for a key are atomic. This is documented on its javadoc and in the impl it + // can be observed that scoped locks are acquired. Other concurrent map impls may run the compute + // lambdas concurrently for a given key, which may still be correct but is more difficult to + // analyze. private final ConcurrentHashMap priorityQueues = new ConcurrentHashMap<>();