diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java index 5118607e2eb..a06c41ea5c7 100644 --- a/core/src/main/java/org/apache/accumulo/core/Constants.java +++ b/core/src/main/java/org/apache/accumulo/core/Constants.java @@ -124,4 +124,6 @@ public class Constants { public static final String HDFS_TABLES_DIR = "/tables"; public static final int DEFAULT_VISIBILITY_CACHE_SIZE = 1000; + + public static final String DEFAULT_RESOURCE_GROUP_NAME = "default"; } diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index c8f8116e2d0..c982689f31d 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -25,6 +25,7 @@ import java.util.HashSet; import java.util.function.Predicate; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.classloader.ClassLoaderUtil; import org.apache.accumulo.core.data.constraints.NoDeleteConstraint; import org.apache.accumulo.core.file.rfile.RFile; @@ -391,9 +392,10 @@ public enum Property { @Experimental SSERV_GROUP_NAME("sserver.group", ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME, PropertyType.STRING, - "Optional group name that will be made available to the " - + "ScanServerSelector client plugin. Groups support at least two use cases:" - + " dedicating resources to scans and/or using different hardware for scans.", + "Resource group name for this ScanServer. Resource groups support at least two use cases:" + + " dedicating resources to scans and/or using different hardware for scans. 
Clients can" + + " configure the ConfigurableScanServerSelector to specify the resource group to use for" + + " eventual consistency scans.", "3.0.0"), @Experimental SSERV_CACHED_TABLET_METADATA_EXPIRATION("sserver.cache.metadata.expiration", "5m", @@ -731,6 +733,10 @@ public enum Property { PropertyType.TIMEDURATION, "The interval at which the TabletServer will check if on-demand tablets can be unloaded", "4.0.0"), + TSERV_GROUP_NAME("tserver.group", Constants.DEFAULT_RESOURCE_GROUP_NAME, PropertyType.STRING, + "Resource group name for this TabletServer. Resource groups can be defined to dedicate resources " + + " to specific tables (e.g. balancing tablets for table(s) within a group, see TABLE_ASSIGNMENT_GROUP)", + "4.0.0"), // accumulo garbage collector properties GC_PREFIX("gc.", null, PropertyType.PREFIX, @@ -1076,7 +1082,6 @@ public enum Property { + "also consider configuring the `" + NoDeleteConstraint.class.getName() + "` " + "constraint.", "2.0.0"), - // Compactor properties @Experimental COMPACTOR_PREFIX("compactor.", null, PropertyType.PREFIX, @@ -1102,8 +1107,8 @@ public enum Property { COMPACTOR_MAX_MESSAGE_SIZE("compactor.message.size.max", "10M", PropertyType.BYTES, "The maximum size of a message that can be sent to a tablet server.", "2.1.0"), @Experimental - COMPACTOR_QUEUE_NAME("compactor.queue", "", PropertyType.STRING, - "The queue for which this Compactor will perform compactions", "3.0.0"), + COMPACTOR_QUEUE_NAME("compactor.queue", Constants.DEFAULT_RESOURCE_GROUP_NAME, + PropertyType.STRING, "The queue for which this Compactor will perform compactions", "3.0.0"), // CompactionCoordinator properties @Experimental COMPACTION_COORDINATOR_PREFIX("compaction.coordinator.", null, PropertyType.PREFIX, diff --git a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockData.java b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockData.java index 736a36f0fea..2b0b6327815 100644 --- 
a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockData.java +++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockData.java @@ -58,20 +58,11 @@ public static enum ThriftService { */ public static class ServiceDescriptor { - /** - * The group name that will be used when one is not specified. - */ - public static final String DEFAULT_GROUP_NAME = "default"; - private final UUID uuid; private final ThriftService service; private final String address; private final String group; - public ServiceDescriptor(UUID uuid, ThriftService service, String address) { - this(uuid, service, address, DEFAULT_GROUP_NAME); - } - public ServiceDescriptor(UUID uuid, ThriftService service, String address, String group) { this.uuid = requireNonNull(uuid); this.service = requireNonNull(service); @@ -157,11 +148,6 @@ public ServiceLockData(UUID uuid, String address, ThriftService service, String Collections.singleton(new ServiceDescriptor(uuid, service, address, group))))); } - public ServiceLockData(UUID uuid, String address, ThriftService service) { - this(new ServiceDescriptors( - new HashSet<>(Collections.singleton(new ServiceDescriptor(uuid, service, address))))); - } - public String getAddressString(ThriftService service) { ServiceDescriptor sd = services.get(service); if (sd == null) { diff --git a/core/src/main/java/org/apache/accumulo/core/manager/balancer/AssignmentParamsImpl.java b/core/src/main/java/org/apache/accumulo/core/manager/balancer/AssignmentParamsImpl.java index 14ccaa5d05c..7bdbf70fc1a 100644 --- a/core/src/main/java/org/apache/accumulo/core/manager/balancer/AssignmentParamsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/manager/balancer/AssignmentParamsImpl.java @@ -20,7 +20,9 @@ import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -40,25 +42,38 @@ public class AssignmentParamsImpl implements 
TabletBalancer.AssignmentParameters private final SortedMap thriftCurrentStatus; private final Map thriftUnassigned; private final Map thriftAssignmentsOut; + private final Map> tserverGroups; public static AssignmentParamsImpl fromThrift( SortedMap currentStatus, + Map> currentTServerGrouping, Map unassigned, Map assignmentsOut) { SortedMap currentStatusNew = new TreeMap<>(); currentStatus.forEach((tsi, status) -> currentStatusNew.put(new TabletServerIdImpl(tsi), TServerStatusImpl.fromThrift(status))); + + Map> tserverGroups = new HashMap<>(); + currentTServerGrouping.forEach((k, v) -> { + Set servers = new HashSet<>(); + v.forEach(tsi -> servers.add(TabletServerIdImpl.fromThrift(tsi))); + tserverGroups.put(k, servers); + }); + Map unassignedNew = new HashMap<>(); unassigned.forEach( (ke, tsi) -> unassignedNew.put(new TabletIdImpl(ke), TabletServerIdImpl.fromThrift(tsi))); return new AssignmentParamsImpl(Collections.unmodifiableSortedMap(currentStatusNew), - Collections.unmodifiableMap(unassignedNew), currentStatus, unassigned, assignmentsOut); + Collections.unmodifiableMap(tserverGroups), Collections.unmodifiableMap(unassignedNew), + currentStatus, unassigned, assignmentsOut); } public AssignmentParamsImpl(SortedMap currentStatus, - Map unassigned, Map assignmentsOut) { + Map> currentGroups, Map unassigned, + Map assignmentsOut) { this.currentStatus = currentStatus; + this.tserverGroups = currentGroups; this.unassigned = unassigned; this.assignmentsOut = assignmentsOut; this.thriftCurrentStatus = null; @@ -67,11 +82,12 @@ public AssignmentParamsImpl(SortedMap currentStatu } private AssignmentParamsImpl(SortedMap currentStatus, - Map unassigned, + Map> currentGroups, Map unassigned, SortedMap thriftCurrentStatus, Map thriftUnassigned, Map thriftAssignmentsOut) { this.currentStatus = currentStatus; + this.tserverGroups = currentGroups; this.unassigned = unassigned; this.assignmentsOut = null; this.thriftCurrentStatus = thriftCurrentStatus; @@ -84,6 +100,11 @@ 
public SortedMap currentStatus() { return currentStatus; } + @Override + public Map> currentResourceGroups() { + return tserverGroups; + } + @Override public Map unassignedTablets() { return unassigned; diff --git a/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java b/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java index a7cc5226d33..6794b847e89 100644 --- a/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java @@ -19,7 +19,10 @@ package org.apache.accumulo.core.manager.balancer; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.SortedMap; import java.util.stream.Collectors; @@ -40,20 +43,31 @@ public class BalanceParamsImpl implements TabletBalancer.BalanceParameters { private final List migrationsOut; private final SortedMap thriftCurrentStatus; private final Set thriftCurrentMigrations; + private final Map> tserverResourceGroups; public static BalanceParamsImpl fromThrift(SortedMap currentStatus, + Map> currentTServerGrouping, SortedMap thriftCurrentStatus, Set thriftCurrentMigrations) { Set currentMigrations = thriftCurrentMigrations.stream().map(TabletIdImpl::new) .collect(Collectors.toUnmodifiableSet()); - return new BalanceParamsImpl(currentStatus, currentMigrations, new ArrayList<>(), + Map> tserverGroups = new HashMap<>(); + currentTServerGrouping.forEach((k, v) -> { + Set servers = new HashSet<>(); + v.forEach(tsi -> servers.add(TabletServerIdImpl.fromThrift(tsi))); + tserverGroups.put(k, servers); + }); + + return new BalanceParamsImpl(currentStatus, tserverGroups, currentMigrations, new ArrayList<>(), thriftCurrentStatus, thriftCurrentMigrations); } public BalanceParamsImpl(SortedMap currentStatus, - Set currentMigrations, List migrationsOut) { + Map> 
currentGroups, Set currentMigrations, + List migrationsOut) { this.currentStatus = currentStatus; + this.tserverResourceGroups = currentGroups; this.currentMigrations = currentMigrations; this.migrationsOut = migrationsOut; this.thriftCurrentStatus = null; @@ -61,10 +75,12 @@ public BalanceParamsImpl(SortedMap currentStatus, } private BalanceParamsImpl(SortedMap currentStatus, - Set currentMigrations, List migrationsOut, + Map> currentGroups, Set currentMigrations, + List migrationsOut, SortedMap thriftCurrentStatus, Set thriftCurrentMigrations) { this.currentStatus = currentStatus; + this.tserverResourceGroups = currentGroups; this.currentMigrations = currentMigrations; this.migrationsOut = migrationsOut; this.thriftCurrentStatus = thriftCurrentStatus; @@ -100,4 +116,10 @@ public void addMigration(KeyExtent extent, TServerInstance oldServer, TServerIns TabletServerId newTsid = new TabletServerIdImpl(newServer); migrationsOut.add(new TabletMigration(id, oldTsid, newTsid)); } + + @Override + public Map> currentResourceGroups() { + return tserverResourceGroups; + } + } diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/TabletState.java b/core/src/main/java/org/apache/accumulo/core/metadata/TabletState.java index 33d6141a0a8..0d4b74d0600 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/TabletState.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/TabletState.java @@ -19,5 +19,5 @@ package org.apache.accumulo.core.metadata; public enum TabletState { - UNASSIGNED, ASSIGNED, HOSTED, ASSIGNED_TO_DEAD_SERVER, SUSPENDED + UNASSIGNED, ASSIGNED, HOSTED, ASSIGNED_TO_DEAD_SERVER, SUSPENDED, ASSIGNED_TO_WRONG_GROUP } diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java index 4aa281a13e2..fae8344503b 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java +++ 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java @@ -52,9 +52,11 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.dataImpl.TabletIdImpl; import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.lock.ServiceLockData; +import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.StoredTabletFile; @@ -75,6 +77,8 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; +import org.apache.accumulo.core.spi.balancer.TabletBalancer; +import org.apache.accumulo.core.spi.balancer.data.TabletServerId; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; @@ -421,6 +425,11 @@ public SortedMap getKeyValues() { } public TabletState getTabletState(Set liveTServers) { + return getTabletState(liveTServers, null, null); + } + + public TabletState getTabletState(Set liveTServers, TabletBalancer balancer, + Map> currentTServerGrouping) { ensureFetched(ColumnType.LOCATION); ensureFetched(ColumnType.LAST); ensureFetched(ColumnType.SUSPEND); @@ -435,8 +444,20 @@ public TabletState getTabletState(Set liveTServers) { return liveTServers.contains(future.getServerInstance()) ? TabletState.ASSIGNED : TabletState.ASSIGNED_TO_DEAD_SERVER; } else if (current != null) { - return liveTServers.contains(current.getServerInstance()) ? 
TabletState.HOSTED - : TabletState.ASSIGNED_TO_DEAD_SERVER; + if (liveTServers.contains(current.getServerInstance())) { + if (balancer != null) { + String resourceGroup = balancer.getResourceGroup(new TabletIdImpl(extent)); + log.trace("Resource Group for extent {} is {}", extent, resourceGroup); + Set tservers = currentTServerGrouping.get(resourceGroup); + if (tservers == null + || !tservers.contains(new TabletServerIdImpl(current.getServerInstance()))) { + return TabletState.ASSIGNED_TO_WRONG_GROUP; + } + } + return TabletState.HOSTED; + } else { + return TabletState.ASSIGNED_TO_DEAD_SERVER; + } } else if (getSuspend() != null) { return TabletState.SUSPENDED; } else { diff --git a/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java b/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java index f7dd3847913..6f5159f5dca 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java @@ -357,8 +357,8 @@ public void getAssignments(AssignmentParameters params) { } LOG.debug("Sending {} tablets to balancer for table {} for assignment within tservers {}", e.getValue().size(), tableName, currentView.keySet()); - getBalancerForTable(e.getKey()) - .getAssignments(new AssignmentParamsImpl(currentView, e.getValue(), newAssignments)); + getBalancerForTable(e.getKey()).getAssignments(new AssignmentParamsImpl(currentView, + params.currentResourceGroups(), e.getValue(), newAssignments)); newAssignments.forEach(params::addAssignment); } } @@ -495,8 +495,8 @@ public long balance(BalanceParameters params) { continue; } ArrayList newMigrations = new ArrayList<>(); - getBalancerForTable(tableId) - .balance(new BalanceParamsImpl(currentView, migrations, newMigrations)); + getBalancerForTable(tableId).balance(new BalanceParamsImpl(currentView, + params.currentResourceGroups(), 
migrations, newMigrations)); if (newMigrations.isEmpty()) { tableToTimeSinceNoMigrations.remove(tableId); diff --git a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java index cb89e5b093a..915f4acb7cb 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java @@ -23,25 +23,43 @@ import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.classloader.ClassLoaderUtil; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.manager.balancer.AssignmentParamsImpl; import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl; +import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** + * TabletBalancer that balances Tablets for a Table using the TabletBalancer defined by + * {@link Property#TABLE_LOAD_BALANCER}. This allows for different Tables to specify different + * TabletBalancer classes. + *

+ * Note that in versions prior to 4.0 this class would pass all known TabletServers to the Table + * load balancers. In version 4.0 this changed with the introduction of the + * {@link #TABLE_ASSIGNMENT_GROUP_PROPERTY} table property. If defined, this balancer passes the + * TabletServers that have the corresponding {@link Property#TSERV_GROUP_NAME} property to the Table + * load balancer. + * * @since 2.1.0 */ public class TableLoadBalancer implements TabletBalancer { private static final Logger log = LoggerFactory.getLogger(TableLoadBalancer.class); + public static final String TABLE_ASSIGNMENT_GROUP_PROPERTY = "table.custom.assignment.group"; + protected BalancerEnvironment environment; Map perTableBalancers = new HashMap<>(); @@ -66,6 +84,14 @@ protected String getLoadBalancerClassNameForTable(TableId table) { return null; } + protected String getResourceGroupNameForTable(TableId tid) { + String resourceGroup = environment.getConfiguration(tid).get(TABLE_ASSIGNMENT_GROUP_PROPERTY); + if (!StringUtils.isEmpty(resourceGroup)) { + return resourceGroup; + } + return Constants.DEFAULT_RESOURCE_GROUP_NAME; + } + protected TabletBalancer getBalancerForTable(TableId tableId) { TabletBalancer balancer = perTableBalancers.get(tableId); @@ -106,6 +132,31 @@ protected TabletBalancer getBalancerForTable(TableId tableId) { return balancer; } + private SortedMap getCurrentSetForTable( + SortedMap allTServers, + Map> groupedTServers, String resourceGroup) { + + String groupName = + resourceGroup == null ? 
Constants.DEFAULT_RESOURCE_GROUP_NAME : resourceGroup; + Set tserversInGroup = groupedTServers.get(groupName); + if (tserversInGroup == null || tserversInGroup.isEmpty()) { + log.warn("No TabletServers in assignment group {}", groupName); + return null; + } + log.trace("{} TabletServers in group: {}", tserversInGroup.size(), groupName); + SortedMap group = new TreeMap<>(); + final String groupNameInUse = groupName; + tserversInGroup.forEach(tsid -> { + TServerStatus tss = allTServers.get(tsid); + if (tss == null) { + throw new IllegalStateException("TabletServer " + tsid + " in " + groupNameInUse + + " TabletServer group, but not in set of all TabletServers"); + } + group.put(tsid, tss); + }); + return group; + } + @Override public void getAssignments(AssignmentParameters params) { // separate the unassigned into tables @@ -113,10 +164,27 @@ public void getAssignments(AssignmentParameters params) { params.unassignedTablets().forEach((tid, lastTserver) -> groupedUnassigned .computeIfAbsent(tid.getTable(), k -> new HashMap<>()).put(tid, lastTserver)); for (Entry> e : groupedUnassigned.entrySet()) { - Map newAssignments = new HashMap<>(); - getBalancerForTable(e.getKey()).getAssignments( - new AssignmentParamsImpl(params.currentStatus(), e.getValue(), newAssignments)); - newAssignments.forEach(params::addAssignment); + final String tableResourceGroup = getResourceGroupNameForTable(e.getKey()); + log.trace("Table {} is set to use resource group: {}", e.getKey(), tableResourceGroup); + final Map newAssignments = new HashMap<>(); + // get the group of tservers for this table + final SortedMap groupedTServers = getCurrentSetForTable( + params.currentStatus(), params.currentResourceGroups(), tableResourceGroup); + if (groupedTServers == null) { + // group for table does not contain any tservers, warning already logged + continue; + } + getBalancerForTable(e.getKey()).getAssignments(new AssignmentParamsImpl(groupedTServers, + params.currentResourceGroups(), e.getValue(), 
newAssignments)); + + newAssignments.forEach((tid, tsid) -> { + if (!groupedTServers.containsKey(tsid)) { + log.warn( + "table balancer assigned {} to tablet server {} that is not in the assigned resource group {}", + tid, tsid, tableResourceGroup); + } + params.addAssignment(tid, tsid); + }); } } @@ -125,9 +193,18 @@ public long balance(BalanceParameters params) { long minBalanceTime = 5_000; // Iterate over the tables and balance each of them for (TableId tableId : environment.getTableIdMap().values()) { + final String tableResourceGroup = getResourceGroupNameForTable(tableId); + // get the group of tservers for this table + SortedMap groupedTServers = getCurrentSetForTable( + params.currentStatus(), params.currentResourceGroups(), tableResourceGroup); + if (groupedTServers == null) { + // group for table does not contain any tservers, warning already logged + continue; + } ArrayList newMigrations = new ArrayList<>(); - long tableBalanceTime = getBalancerForTable(tableId).balance( - new BalanceParamsImpl(params.currentStatus(), params.currentMigrations(), newMigrations)); + long tableBalanceTime = + getBalancerForTable(tableId).balance(new BalanceParamsImpl(groupedTServers, + params.currentResourceGroups(), params.currentMigrations(), newMigrations)); if (tableBalanceTime < minBalanceTime) { minBalanceTime = tableBalanceTime; } @@ -135,4 +212,13 @@ public long balance(BalanceParameters params) { } return minBalanceTime; } + + @Override + public String getResourceGroup(TabletId tabletId) { + String value = + environment.getConfiguration(tabletId.getTable()).get(TABLE_ASSIGNMENT_GROUP_PROPERTY); + return (value == null || StringUtils.isEmpty(value)) ? 
Constants.DEFAULT_RESOURCE_GROUP_NAME + : value; + } + } diff --git a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java index a7dfcbdc2bb..f20cb3c8355 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java @@ -23,6 +23,7 @@ import java.util.Set; import java.util.SortedMap; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; @@ -67,6 +68,12 @@ interface AssignmentParameters { * Assigns {@code tabletId} to {@code tabletServerId}. */ void addAssignment(TabletId tabletId, TabletServerId tabletServerId); + + /** + * @return map of resource group name to the set of TabletServerId objects in that group + * @since 4.0.0 + */ + Map> currentResourceGroups(); } /** @@ -93,6 +100,13 @@ interface BalanceParameters { * migrations. */ List migrationsOut(); + + /** + * @return map of resource group name to the set of TabletServerId objects in that group + * @since 4.0.0 + */ + Map> currentResourceGroups(); + } /** @@ -119,4 +133,15 @@ interface BalanceParameters { * @return the time, in milliseconds, to wait before re-balancing. 
*/ long balance(BalanceParameters params); + + /** + * Get the ResourceGroup name for this tablet + * + * @param tabletId id of tablet + * @return resource group name + * @since 4.0.0 + */ + default String getResourceGroup(TabletId tabletId) { + return Constants.DEFAULT_RESOURCE_GROUP_NAME; + } } diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerSelector.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerSelector.java index a39450e002d..8889c566803 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerSelector.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerSelector.java @@ -25,9 +25,9 @@ import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.ScannerBase; import org.apache.accumulo.core.data.TabletId; -import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.spi.common.ServiceEnvironment; import com.google.common.base.Preconditions; @@ -47,7 +47,7 @@ public interface ScanServerSelector { /** * The scan server group name that will be used when one is not specified. */ - String DEFAULT_SCAN_SERVER_GROUP_NAME = ServiceLockData.ServiceDescriptor.DEFAULT_GROUP_NAME; + String DEFAULT_SCAN_SERVER_GROUP_NAME = Constants.DEFAULT_RESOURCE_GROUP_NAME; /** * This method is called once after a {@link ScanServerSelector} is instantiated. 
diff --git a/core/src/test/java/org/apache/accumulo/core/lock/ServiceLockDataTest.java b/core/src/test/java/org/apache/accumulo/core/lock/ServiceLockDataTest.java index 5f89af57b17..e050dcc492e 100644 --- a/core/src/test/java/org/apache/accumulo/core/lock/ServiceLockDataTest.java +++ b/core/src/test/java/org/apache/accumulo/core/lock/ServiceLockDataTest.java @@ -27,6 +27,7 @@ import java.util.Optional; import java.util.UUID; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor; import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; @@ -40,11 +41,12 @@ public class ServiceLockDataTest { @Test public void testSingleServiceConstructor() throws Exception { - ServiceLockData ss = new ServiceLockData(serverUUID, "127.0.0.1", ThriftService.TSERV); + ServiceLockData ss = new ServiceLockData(serverUUID, "127.0.0.1", ThriftService.TSERV, + Constants.DEFAULT_RESOURCE_GROUP_NAME); assertEquals(serverUUID, ss.getServerUUID(ThriftService.TSERV)); assertEquals("127.0.0.1", ss.getAddressString(ThriftService.TSERV)); assertThrows(IllegalArgumentException.class, () -> ss.getAddress(ThriftService.TSERV)); - assertEquals(ServiceDescriptor.DEFAULT_GROUP_NAME, ss.getGroup(ThriftService.TSERV)); + assertEquals(Constants.DEFAULT_RESOURCE_GROUP_NAME, ss.getGroup(ThriftService.TSERV)); assertNull(ss.getServerUUID(ThriftService.TABLET_SCAN)); assertNull(ss.getAddressString(ThriftService.TABLET_SCAN)); assertThrows(NullPointerException.class, () -> ss.getAddress(ThriftService.TABLET_SCAN)); @@ -54,18 +56,20 @@ public void testSingleServiceConstructor() throws Exception { @Test public void testMultipleServiceConstructor() throws Exception { ServiceDescriptors sds = new ServiceDescriptors(); - sds.addService(new ServiceDescriptor(serverUUID, ThriftService.TSERV, "127.0.0.1:9997")); - sds.addService(new ServiceDescriptor(serverUUID, 
ThriftService.TABLET_SCAN, "127.0.0.1:9998")); + sds.addService(new ServiceDescriptor(serverUUID, ThriftService.TSERV, "127.0.0.1:9997", + Constants.DEFAULT_RESOURCE_GROUP_NAME)); + sds.addService(new ServiceDescriptor(serverUUID, ThriftService.TABLET_SCAN, "127.0.0.1:9998", + Constants.DEFAULT_RESOURCE_GROUP_NAME)); ServiceLockData ss = new ServiceLockData(sds); assertEquals(serverUUID, ss.getServerUUID(ThriftService.TSERV)); assertEquals("127.0.0.1:9997", ss.getAddressString(ThriftService.TSERV)); assertEquals(HostAndPort.fromString("127.0.0.1:9997"), ss.getAddress(ThriftService.TSERV)); - assertEquals(ServiceDescriptor.DEFAULT_GROUP_NAME, ss.getGroup(ThriftService.TSERV)); + assertEquals(Constants.DEFAULT_RESOURCE_GROUP_NAME, ss.getGroup(ThriftService.TSERV)); assertEquals(serverUUID, ss.getServerUUID(ThriftService.TABLET_SCAN)); assertEquals("127.0.0.1:9998", ss.getAddressString(ThriftService.TABLET_SCAN)); assertEquals(HostAndPort.fromString("127.0.0.1:9998"), ss.getAddress(ThriftService.TABLET_SCAN)); - assertEquals(ServiceDescriptor.DEFAULT_GROUP_NAME, ss.getGroup(ThriftService.TSERV)); + assertEquals(Constants.DEFAULT_RESOURCE_GROUP_NAME, ss.getGroup(ThriftService.TSERV)); } @Test diff --git a/core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java index 3dfd120e481..45a1b84d5a2 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java @@ -33,6 +33,7 @@ import java.util.TreeMap; import java.util.function.Function; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -119,7 +120,9 @@ public void balance(TabletBalancer balancer, int maxMigrations) { new 
org.apache.accumulo.core.manager.thrift.TabletServerStatus())); } - balancer.balance(new BalanceParamsImpl(current, migrations, migrationsOut)); + balancer.balance(new BalanceParamsImpl(current, + Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, current.keySet()), migrations, + migrationsOut)); assertTrue(migrationsOut.size() <= (maxMigrations + 5), "Max Migration exceeded " + maxMigrations + " " + migrationsOut.size()); diff --git a/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerReconfigurationTest.java b/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerReconfigurationTest.java index f6b2123b6df..f9f7d873457 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerReconfigurationTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerReconfigurationTest.java @@ -35,6 +35,7 @@ import java.util.Map.Entry; import java.util.Set; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.data.TableId; @@ -81,6 +82,7 @@ public void testConfigurationChanges() { } this.getAssignments( new AssignmentParamsImpl(Collections.unmodifiableSortedMap(allTabletServers), + Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, allTabletServers.keySet()), Collections.unmodifiableMap(unassigned), assignments)); assertEquals(15, assignments.size()); // Ensure unique tservers @@ -107,7 +109,8 @@ public void testConfigurationChanges() { // getOnlineTabletsForTable UtilWaitThread.sleep(3000); this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(allTabletServers), - migrations, migrationsOut)); + Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, allTabletServers.keySet()), migrations, + migrationsOut)); assertEquals(0, migrationsOut.size()); // Change property, simulate call by TableConfWatcher @@ -116,7 +119,8 
@@ public void testConfigurationChanges() { // Wait to trigger the out of bounds check and the repool check UtilWaitThread.sleep(10000); this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(allTabletServers), - migrations, migrationsOut)); + Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, allTabletServers.keySet()), migrations, + migrationsOut)); assertEquals(5, migrationsOut.size()); for (TabletMigration migration : migrationsOut) { assertTrue(migration.getNewTabletServer().getHost().startsWith("192.168.0.1") diff --git a/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerTest.java index c70841e9821..52356af6f72 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerTest.java @@ -39,6 +39,7 @@ import java.util.SortedMap; import java.util.regex.Pattern; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.data.TableId; @@ -96,9 +97,10 @@ public void testBalance() { init(DEFAULT_TABLE_PROPERTIES); Set migrations = new HashSet<>(); List migrationsOut = new ArrayList<>(); - long wait = - this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)), - migrations, migrationsOut)); + SortedMap current = createCurrent(15); + long wait = this.balance(new BalanceParamsImpl(current, + Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, current.keySet()), migrations, + migrationsOut)); assertEquals(20000, wait); // should balance four tablets in one of the tables before reaching max assertEquals(4, migrationsOut.size()); @@ -108,8 +110,10 @@ public void testBalance() { migrations.add(m.getTablet()); } migrationsOut.clear(); - wait = 
this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)), - migrations, migrationsOut)); + SortedMap current2 = createCurrent(15); + wait = this.balance(new BalanceParamsImpl(current2, + Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, current2.keySet()), migrations, + migrationsOut)); assertEquals(20000, wait); // should balance four tablets in one of the other tables before reaching max assertEquals(4, migrationsOut.size()); @@ -119,8 +123,10 @@ public void testBalance() { migrations.add(m.getTablet()); } migrationsOut.clear(); - wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)), - migrations, migrationsOut)); + SortedMap current3 = createCurrent(15); + wait = this.balance(new BalanceParamsImpl(current3, + Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, current3.keySet()), migrations, + migrationsOut)); assertEquals(20000, wait); // should balance four tablets in one of the other tables before reaching max assertEquals(4, migrationsOut.size()); @@ -130,8 +136,10 @@ public void testBalance() { migrations.add(m.getTablet()); } migrationsOut.clear(); - wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)), - migrations, migrationsOut)); + SortedMap current4 = createCurrent(15); + wait = this.balance(new BalanceParamsImpl(current4, + Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, current4.keySet()), migrations, + migrationsOut)); assertEquals(20000, wait); // no more balancing to do assertEquals(0, migrationsOut.size()); @@ -146,9 +154,10 @@ public void testBalanceWithTooManyOutstandingMigrations() { Set migrations = new HashSet<>(); migrations.addAll(tableTablets.get(FOO.getTableName())); migrations.addAll(tableTablets.get(BAR.getTableName())); - long wait = - this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)), - migrations, migrationsOut)); + SortedMap current = createCurrent(15); + long wait = this.balance(new 
BalanceParamsImpl(current, + Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, current.keySet()), migrations, + migrationsOut)); assertEquals(20000, wait); // no migrations should have occurred as 10 is the maxOutstandingMigrations assertEquals(0, migrationsOut.size()); @@ -342,6 +351,7 @@ public void testAllUnassigned() { } this.getAssignments( new AssignmentParamsImpl(Collections.unmodifiableSortedMap(allTabletServers), + Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, allTabletServers.keySet()), Collections.unmodifiableMap(unassigned), assignments)); assertEquals(15, assignments.size()); // Ensure unique tservers @@ -367,8 +377,10 @@ public void testAllUnassigned() { public void testAllAssigned() { init(DEFAULT_TABLE_PROPERTIES); Map assignments = new HashMap<>(); - this.getAssignments(new AssignmentParamsImpl( - Collections.unmodifiableSortedMap(allTabletServers), Map.of(), assignments)); + this.getAssignments( + new AssignmentParamsImpl(Collections.unmodifiableSortedMap(allTabletServers), + Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, allTabletServers.keySet()), Map.of(), + assignments)); assertEquals(0, assignments.size()); } @@ -388,6 +400,7 @@ public void testPartiallyAssigned() { } this.getAssignments( new AssignmentParamsImpl(Collections.unmodifiableSortedMap(allTabletServers), + Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, allTabletServers.keySet()), Collections.unmodifiableMap(unassigned), assignments)); assertEquals(unassigned.size(), assignments.size()); // Ensure unique tservers @@ -432,6 +445,7 @@ public void testUnassignedWithNoTServers() { current.remove(r); } this.getAssignments(new AssignmentParamsImpl(Collections.unmodifiableSortedMap(current), + Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, allTabletServers.keySet()), Collections.unmodifiableMap(unassigned), assignments)); assertEquals(unassigned.size(), assignments.size()); // Ensure assignments are correct @@ -474,6 +488,7 @@ public void testUnassignedWithNoDefaultPool() { } 
this.getAssignments(new AssignmentParamsImpl(Collections.unmodifiableSortedMap(current), + Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, allTabletServers.keySet()), Collections.unmodifiableMap(unassigned), assignments)); assertEquals(unassigned.size(), assignments.size()); @@ -493,7 +508,10 @@ public void testOutOfBoundsTablets() { UtilWaitThread.sleep(11000); Set migrations = new HashSet<>(); List migrationsOut = new ArrayList<>(); - this.balance(new BalanceParamsImpl(createCurrent(15), migrations, migrationsOut)); + SortedMap current = createCurrent(15); + this.balance(new BalanceParamsImpl(current, + Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, current.keySet()), migrations, + migrationsOut)); assertEquals(2, migrationsOut.size()); } diff --git a/core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java index 9b7c2b7a56f..0fd617606ea 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java @@ -33,6 +33,7 @@ import java.util.SortedMap; import java.util.TreeMap; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -202,7 +203,10 @@ public void testUnevenAssignment() { // balance until we can't balance no more! 
while (true) { List migrationsOut = new ArrayList<>(); - balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut)); + SortedMap tservers = getAssignments(servers); + balancer.balance(new BalanceParamsImpl(tservers, + Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, tservers.keySet()), migrations, + migrationsOut)); if (migrationsOut.isEmpty()) { break; } @@ -244,7 +248,10 @@ public void testUnevenAssignment2() { // balance until we can't balance no more! while (true) { List migrationsOut = new ArrayList<>(); - balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut)); + SortedMap tservers = getAssignments(servers); + balancer.balance(new BalanceParamsImpl(tservers, + Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, tservers.keySet()), migrations, + migrationsOut)); if (migrationsOut.isEmpty()) { break; } diff --git a/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java index 41af948f38d..13b55931ca0 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java @@ -34,6 +34,7 @@ import java.util.TreeMap; import java.util.stream.Collectors; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; @@ -141,13 +142,15 @@ public void test() { List migrationsOut = new ArrayList<>(); TableLoadBalancer tls = new TableLoadBalancer(); tls.init(environment); - tls.balance(new BalanceParamsImpl(state, migrations, migrationsOut)); + tls.balance(new BalanceParamsImpl(state, + Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, state.keySet()), migrations, migrationsOut)); assertEquals(0, migrationsOut.size()); state.put(mkts("10.0.0.2", 2345, 
"0x02030405"), status()); tls = new TableLoadBalancer(); tls.init(environment); - tls.balance(new BalanceParamsImpl(state, migrations, migrationsOut)); + tls.balance(new BalanceParamsImpl(state, + Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, state.keySet()), migrations, migrationsOut)); int count = 0; Map movedByTable = new HashMap<>(); movedByTable.put(TableId.of(t1Id), 0); diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/ClusterServerConfiguration.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/ClusterServerConfiguration.java index 3a5d93e34a4..13fa4b93891 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/ClusterServerConfiguration.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/ClusterServerConfiguration.java @@ -23,7 +23,7 @@ import java.util.Iterator; import java.util.Map; -import org.apache.accumulo.core.lock.ServiceLockData; +import org.apache.accumulo.core.Constants; public class ClusterServerConfiguration { @@ -48,23 +48,23 @@ public ClusterServerConfiguration() { */ public ClusterServerConfiguration(int numCompactors, int numSServers, int numTServers) { compactors = new HashMap<>(); - compactors.put(ServiceLockData.ServiceDescriptor.DEFAULT_GROUP_NAME, numCompactors); + compactors.put(Constants.DEFAULT_RESOURCE_GROUP_NAME, numCompactors); sservers = new HashMap<>(); - sservers.put(ServiceLockData.ServiceDescriptor.DEFAULT_GROUP_NAME, numSServers); + sservers.put(Constants.DEFAULT_RESOURCE_GROUP_NAME, numSServers); tservers = new HashMap<>(); - tservers.put(ServiceLockData.ServiceDescriptor.DEFAULT_GROUP_NAME, numTServers); + tservers.put(Constants.DEFAULT_RESOURCE_GROUP_NAME, numTServers); } public void setNumDefaultCompactors(int numCompactors) { - compactors.put(ServiceLockData.ServiceDescriptor.DEFAULT_GROUP_NAME, numCompactors); + compactors.put(Constants.DEFAULT_RESOURCE_GROUP_NAME, numCompactors); } public void setNumDefaultScanServers(int numSServers) 
{ - sservers.put(ServiceLockData.ServiceDescriptor.DEFAULT_GROUP_NAME, numSServers); + sservers.put(Constants.DEFAULT_RESOURCE_GROUP_NAME, numSServers); } public void setNumDefaultTabletServers(int numTServers) { - tservers.put(ServiceLockData.ServiceDescriptor.DEFAULT_GROUP_NAME, numTServers); + tservers.put(Constants.DEFAULT_RESOURCE_GROUP_NAME, numTServers); } public void addCompactorResourceGroup(String resourceGroupName, int numCompactors) { @@ -95,7 +95,17 @@ public void clearCompactorResourceGroups() { Iterator iter = compactors.keySet().iterator(); while (iter.hasNext()) { String resourceGroup = iter.next(); - if (!resourceGroup.equals(ServiceLockData.ServiceDescriptor.DEFAULT_GROUP_NAME)) { + if (!resourceGroup.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME)) { + iter.remove(); + } + } + } + + public void clearTServerResourceGroups() { + Iterator iter = tservers.keySet().iterator(); + while (iter.hasNext()) { + String resourceGroup = iter.next(); + if (!resourceGroup.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME)) { iter.remove(); } } diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java index cfac24841c6..73eb3f01bb3 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java @@ -180,7 +180,8 @@ public synchronized void start(ServerType server, Map configOverr tabletServerProcesses.computeIfAbsent(e.getKey(), k -> new ArrayList<>()); int count = 0; for (int i = processes.size(); count < limit && i < e.getValue(); i++, ++count) { - processes.add(cluster._exec(classToUse, server, configOverrides).getProcess()); + processes.add(cluster._exec(classToUse, server, configOverrides, "-o", + Property.TSERV_GROUP_NAME.getKey() + "=" + e.getKey()).getProcess()); } } } @@ -216,7 
+217,8 @@ public synchronized void start(ServerType server, Map configOverr scanServerProcesses.computeIfAbsent(e.getKey(), k -> new ArrayList<>()); int count = 0; for (int i = processes.size(); count < limit && i < e.getValue(); i++, ++count) { - processes.add(cluster._exec(classToUse, server, configOverrides).getProcess()); + processes.add(cluster._exec(classToUse, server, configOverrides, "-o", + Property.SSERV_GROUP_NAME.getKey() + "=" + e.getKey()).getProcess()); } } } @@ -230,9 +232,8 @@ public synchronized void start(ServerType server, Map configOverr compactorProcesses.computeIfAbsent(e.getKey(), k -> new ArrayList<>()); int count = 0; for (int i = processes.size(); count < limit && i < e.getValue(); i++, ++count) { - processes.add(cluster - .exec(classToUse, "-o", Property.COMPACTOR_QUEUE_NAME.getKey() + "=" + e.getKey()) - .getProcess()); + processes.add(cluster._exec(classToUse, server, configOverrides, "-o", + Property.COMPACTOR_QUEUE_NAME.getKey() + "=" + e.getKey()).getProcess()); } } } diff --git a/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterTest.java b/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterTest.java index e1229c418ff..b40a71fd3ff 100644 --- a/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterTest.java +++ b/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterTest.java @@ -187,7 +187,7 @@ public void test() throws Exception { public void testDebugPorts() { Set> debugPorts = accumulo.getDebugPorts(); - assertEquals(6, debugPorts.size()); + assertEquals(10, debugPorts.size()); for (Pair debugPort : debugPorts) { assertTrue(debugPort.getSecond() > 0); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java index 89666879837..6d4355b567c 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java +++ 
b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java @@ -27,6 +27,7 @@ import org.apache.accumulo.core.cli.ConfigOpts; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.core.metrics.MetricsUtil; import org.apache.accumulo.core.trace.TraceUtil; @@ -44,6 +45,7 @@ public abstract class AbstractServer implements AutoCloseable, MetricsProducer, private final ServerContext context; protected final String applicationName; private final String hostname; + private final String resourceGroup; private final ProcessMetrics processMetrics; @@ -52,6 +54,7 @@ protected AbstractServer(String appName, ConfigOpts opts, String[] args) { opts.parseArgs(appName, args); var siteConfig = opts.getSiteConfiguration(); this.hostname = siteConfig.get(Property.GENERAL_PROCESS_BIND_ADDRESS); + this.resourceGroup = getResourceGroupPropertyValue(siteConfig); SecurityUtil.serverLogin(siteConfig); context = new ServerContext(siteConfig); Logger log = LoggerFactory.getLogger(getClass()); @@ -72,6 +75,14 @@ protected AbstractServer(String appName, ConfigOpts opts, String[] args) { processMetrics = new ProcessMetrics(context); } + protected String getResourceGroupPropertyValue(SiteConfiguration conf) { + return Constants.DEFAULT_RESOURCE_GROUP_NAME; + } + + public String getResourceGroup() { + return resourceGroup; + } + /** * Run this server in a main thread */ diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java index e4af38f93f0..3cca4f8506f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java @@ -232,20 +232,24 @@ public boolean 
isActive(long tid) throws TException { static class TServerInfo { TServerConnection connection; TServerInstance instance; + String resourceGroup; - TServerInfo(TServerInstance instance, TServerConnection connection) { + TServerInfo(TServerInstance instance, TServerConnection connection, String resourceGroup) { this.connection = connection; this.instance = instance; + this.resourceGroup = resourceGroup; } } // The set of active tservers with locks, indexed by their name in zookeeper - private Map current = new HashMap<>(); + private final Map current = new HashMap<>(); // as above, indexed by TServerInstance - private Map currentInstances = new HashMap<>(); + private final Map currentInstances = new HashMap<>(); + // as above, grouped by resource group name + private final Map> currentGroups = new HashMap<>(); // The set of entries in zookeeper without locks, and the first time each was noticed - private Map locklessServers = new HashMap<>(); + private final Map locklessServers = new HashMap<>(); public LiveTServerSet(ServerContext context, Listener cback) { this.cback = cback; @@ -314,6 +318,7 @@ private synchronized void checkServer(final Set updates, doomed.add(info.instance); current.remove(zPath); currentInstances.remove(info.instance); + currentGroups.get(info.resourceGroup).remove(info.instance); } Long firstSeen = locklessServers.get(zPath); @@ -326,20 +331,26 @@ private synchronized void checkServer(final Set updates, } else { locklessServers.remove(zPath); HostAndPort client = sld.orElseThrow().getAddress(ServiceLockData.ThriftService.TSERV); + String resourceGroup = sld.orElseThrow().getGroup(ServiceLockData.ThriftService.TSERV); TServerInstance instance = new TServerInstance(client, stat.getEphemeralOwner()); if (info == null) { updates.add(instance); - TServerInfo tServerInfo = new TServerInfo(instance, new TServerConnection(client)); + TServerInfo tServerInfo = + new TServerInfo(instance, new TServerConnection(client), resourceGroup); 
current.put(zPath, tServerInfo); currentInstances.put(instance, tServerInfo); + currentGroups.computeIfAbsent(resourceGroup, rg -> new HashSet<>()).add(instance); } else if (!info.instance.equals(instance)) { doomed.add(info.instance); updates.add(instance); - TServerInfo tServerInfo = new TServerInfo(instance, new TServerConnection(client)); + TServerInfo tServerInfo = + new TServerInfo(instance, new TServerConnection(client), resourceGroup); current.put(zPath, tServerInfo); currentInstances.remove(info.instance); + currentGroups.getOrDefault(info.resourceGroup, new HashSet<>()).remove(info.instance); currentInstances.put(instance, tServerInfo); + currentGroups.computeIfAbsent(resourceGroup, rg -> new HashSet<>()).add(instance); } } } @@ -393,6 +404,12 @@ public synchronized Set getCurrentServers() { return new HashSet<>(currentInstances.keySet()); } + public synchronized Map> getCurrentServersGroups() { + Map> copy = new HashMap<>(); + currentGroups.forEach((k, v) -> copy.put(k, new HashSet<>(v))); + return copy; + } + public synchronized int size() { return current.size(); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java index 397f9b4d0ec..86b6998d299 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java @@ -19,6 +19,7 @@ package org.apache.accumulo.server.manager.state; import java.util.Collection; +import java.util.Map; import java.util.Set; import org.apache.accumulo.core.data.TableId; @@ -32,6 +33,8 @@ public interface CurrentState { Set onlineTabletServers(); + Map> tServerResourceGroups(); + Set shutdownServers(); Collection merges(); diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java index 9abbdc30fd9..7f4b1e56bfb 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java @@ -32,10 +32,13 @@ import java.util.Map.Entry; import java.util.Set; import java.util.SortedMap; +import java.util.stream.Collectors; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.ScannerBase; import org.apache.accumulo.core.client.admin.TabletHostingGoal; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.ConfigurationTypeHelper; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; @@ -46,6 +49,7 @@ import org.apache.accumulo.core.iterators.SkippingIterator; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iterators.user.WholeRowIterator; +import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction; import org.apache.accumulo.core.manager.thrift.ManagerState; @@ -64,9 +68,14 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.spi.balancer.SimpleLoadBalancer; +import org.apache.accumulo.core.spi.balancer.TabletBalancer; +import org.apache.accumulo.core.spi.balancer.data.TabletServerId; import org.apache.accumulo.core.spi.compaction.CompactionKind; import org.apache.accumulo.core.util.AddressUtil; 
import org.apache.accumulo.server.compaction.CompactionJobGenerator; +import org.apache.accumulo.server.iterators.TabletIteratorEnvironment; +import org.apache.accumulo.server.manager.balancer.BalancerEnvironmentImpl; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.slf4j.Logger; @@ -87,11 +96,13 @@ public class TabletManagementIterator extends SkippingIterator { private static final String SERVERS_OPTION = "servers"; private static final String TABLES_OPTION = "tables"; private static final String MERGES_OPTION = "merges"; - private static final String DEBUG_OPTION = "debug"; private static final String MIGRATIONS_OPTION = "migrations"; private static final String MANAGER_STATE_OPTION = "managerState"; private static final String SHUTTING_DOWN_OPTION = "shuttingDown"; + private static final String RESOURCE_GROUPS = "resourceGroups"; + private static final String TSERVER_GROUP_PREFIX = "serverGroups_"; private CompactionJobGenerator compactionGenerator; + private TabletBalancer balancer; private static void setCurrentServers(final IteratorSetting cfg, final Set goodServers) { @@ -153,6 +164,34 @@ private static void setShuttingDown(final IteratorSetting cfg, } } + private static void setTServerResourceGroups(final IteratorSetting cfg, + Map> tServerResourceGroups) { + if (tServerResourceGroups == null) { + return; + } + cfg.addOption(RESOURCE_GROUPS, Joiner.on(",").join(tServerResourceGroups.keySet())); + for (Entry> entry : tServerResourceGroups.entrySet()) { + cfg.addOption(TSERVER_GROUP_PREFIX + entry.getKey(), Joiner.on(",").join(entry.getValue())); + } + } + + private static Map> + parseTServerResourceGroups(Map options) { + Map> resourceGroups = new HashMap<>(); + String groups = options.get(RESOURCE_GROUPS); + if (groups != null) { + for (String groupName : groups.split(",")) { + String groupServers = options.get(TSERVER_GROUP_PREFIX + groupName); + if (groupServers != null) { + Set servers = 
parseServers(groupServers); + resourceGroups.put(groupName, + servers.stream().map(s -> new TabletServerIdImpl(s)).collect(Collectors.toSet())); + } + } + } + return resourceGroups; + } + private static Set parseMigrations(final String migrations) { Set result = new HashSet<>(); if (migrations != null) { @@ -229,7 +268,7 @@ private static boolean shouldReturnDueToSplit(final TabletMetadata tm, } private boolean shouldReturnDueToLocation(final TabletMetadata tm, - final Set onlineTables, final Set current, final boolean debug) { + final Set onlineTables, final Set current) { if (migrations.contains(tm.getExtent())) { return true; @@ -239,8 +278,8 @@ private boolean shouldReturnDueToLocation(final TabletMetadata tm, final boolean shouldBeOnline = onlineTables.contains(tm.getTableId()) && tm.getOperationId() == null; - TabletState state = tm.getTabletState(current); - if (debug) { + TabletState state = tm.getTabletState(current, balancer, tserverResourceGroups); + if (LOG.isDebugEnabled()) { LOG.debug("{} is {}. Table is {}line. Tablet hosting goal is {}, hostingRequested: {}", tm.getExtent(), state, (shouldBeOnline ? 
"on" : "off"), tm.getHostingGoal(), tm.getHostingRequested()); @@ -256,6 +295,7 @@ private boolean shouldReturnDueToLocation(final TabletMetadata tm, } break; case ASSIGNED_TO_DEAD_SERVER: + case ASSIGNED_TO_WRONG_GROUP: return true; case SUSPENDED: case UNASSIGNED: @@ -299,6 +339,8 @@ public static void configureScanner(final ScannerBase scanner, final CurrentStat Sets.union(state.migrationsSnapshot(), state.getUnassignmentRequest())); TabletManagementIterator.setManagerState(tabletChange, state.getManagerState()); TabletManagementIterator.setShuttingDown(tabletChange, state.shutdownServers()); + TabletManagementIterator.setTServerResourceGroups(tabletChange, + state.tServerResourceGroups()); } scanner.addScanIterator(tabletChange); } @@ -309,8 +351,8 @@ public static TabletManagement decode(Entry e) throws IOException { private final Set current = new HashSet<>(); private final Set onlineTables = new HashSet<>(); + private final Map> tserverResourceGroups = new HashMap<>(); private final Map merges = new HashMap<>(); - private boolean debug = false; private final Set migrations = new HashSet<>(); private ManagerState managerState = ManagerState.NORMAL; private IteratorEnvironment env; @@ -324,8 +366,8 @@ public void init(SortedKeyValueIterator source, Map op this.env = env; current.addAll(parseServers(options.get(SERVERS_OPTION))); onlineTables.addAll(parseTableIDs(options.get(TABLES_OPTION))); + tserverResourceGroups.putAll(parseTServerResourceGroups(options)); merges.putAll(parseMerges(options.get(MERGES_OPTION))); - debug = options.containsKey(DEBUG_OPTION); migrations.addAll(parseMigrations(options.get(MIGRATIONS_OPTION))); String managerStateOptionValue = options.get(MANAGER_STATE_OPTION); try { @@ -340,6 +382,13 @@ public void init(SortedKeyValueIterator source, Map op current.removeAll(shuttingDown); } compactionGenerator = new CompactionJobGenerator(env.getPluginEnv()); + final AccumuloConfiguration conf = new 
ConfigurationCopy(env.getPluginEnv().getConfiguration()); + BalancerEnvironmentImpl benv = + new BalancerEnvironmentImpl(((TabletIteratorEnvironment) env).getServerContext()); + balancer = Property.createInstanceFromPropertyName(conf, Property.MANAGER_TABLET_BALANCER, + TabletBalancer.class, new SimpleLoadBalancer()); + balancer.init(benv); + LOG.debug("Balancer is set to {}", balancer.getClass().getSimpleName()); } @Override @@ -417,25 +466,29 @@ private void computeTabletManagementActions(final TabletMetadata tm, reasonsToReturnThisTablet.add(ManagementAction.IS_MERGING); } - if (shouldReturnDueToLocation(tm, onlineTables, current, debug)) { + if (shouldReturnDueToLocation(tm, onlineTables, current)) { reasonsToReturnThisTablet.add(ManagementAction.NEEDS_LOCATION_UPDATE); } if (tm.getOperationId() == null) { - final long splitThreshold = - ConfigurationTypeHelper.getFixedMemoryAsBytes(this.env.getPluginEnv() - .getConfiguration(tm.getTableId()).get(Property.TABLE_SPLIT_THRESHOLD.getKey())); - if (shouldReturnDueToSplit(tm, splitThreshold)) { - reasonsToReturnThisTablet.add(ManagementAction.NEEDS_SPLITTING); - } - - // important to call this since reasonsToReturnThisTablet is passed to it - if (!compactionGenerator.generateJobs(tm, determineCompactionKinds(reasonsToReturnThisTablet)) - .isEmpty()) { - reasonsToReturnThisTablet.add(ManagementAction.NEEDS_COMPACTING); + try { + final long splitThreshold = + ConfigurationTypeHelper.getFixedMemoryAsBytes(this.env.getPluginEnv() + .getConfiguration(tm.getTableId()).get(Property.TABLE_SPLIT_THRESHOLD.getKey())); + if (shouldReturnDueToSplit(tm, splitThreshold)) { + reasonsToReturnThisTablet.add(ManagementAction.NEEDS_SPLITTING); + } + // important to call this since reasonsToReturnThisTablet is passed to it + if (!compactionGenerator + .generateJobs(tm, determineCompactionKinds(reasonsToReturnThisTablet)).isEmpty()) { + reasonsToReturnThisTablet.add(ManagementAction.NEEDS_COMPACTING); + } + } catch (NullPointerException 
e) { + LOG.info( + "Unable to determine if tablet {} should split or compact, maybe table was deleted?", + tm.getExtent()); } } - } private static final Set ALL_COMPACTION_KINDS = diff --git a/server/base/src/test/java/org/apache/accumulo/server/manager/LiveTServerSetTest.java b/server/base/src/test/java/org/apache/accumulo/server/manager/LiveTServerSetTest.java index f67b7cb80da..20326d4bf0c 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/manager/LiveTServerSetTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/manager/LiveTServerSetTest.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.Map; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.manager.LiveTServerSet.Listener; @@ -41,8 +42,9 @@ public void testSessionIds() { Map servers = new HashMap<>(); TServerConnection mockConn = EasyMock.createMock(TServerConnection.class); - TServerInfo server1 = new TServerInfo( - new TServerInstance(HostAndPort.fromParts("localhost", 1234), "5555"), mockConn); + TServerInfo server1 = + new TServerInfo(new TServerInstance(HostAndPort.fromParts("localhost", 1234), "5555"), + mockConn, Constants.DEFAULT_RESOURCE_GROUP_NAME); servers.put("server1", server1); LiveTServerSet tservers = new LiveTServerSet(EasyMock.createMock(ServerContext.class), diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index df295cb9124..52ae273aaaf 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -59,6 +59,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.Property; +import 
org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -128,7 +129,6 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac private final UUID compactorId = UUID.randomUUID(); private final AccumuloConfiguration aconf; - private final String queueName; protected final AtomicReference currentCompactionId = new AtomicReference<>(); private final CompactionWatcher watcher; @@ -150,7 +150,6 @@ protected Compactor(ConfigOpts opts, String[] args) { protected Compactor(ConfigOpts opts, String[] args, AccumuloConfiguration conf) { super("compactor", opts, args); aconf = conf == null ? super.getConfiguration() : conf; - queueName = aconf.get(Property.COMPACTOR_QUEUE_NAME); setupSecurity(); watcher = new CompactionWatcher(aconf); var schedExecutor = @@ -159,6 +158,11 @@ protected Compactor(ConfigOpts opts, String[] args, AccumuloConfiguration conf) printStartupMsg(); } + @Override + protected String getResourceGroupPropertyValue(SiteConfiguration conf) { + return conf.get(Property.COMPACTOR_QUEUE_NAME); + } + @Override public AccumuloConfiguration getConfiguration() { return aconf; @@ -243,7 +247,7 @@ protected void announceExistence(HostAndPort clientAddress) ZooReaderWriter zoo = getContext().getZooReaderWriter(); String compactorQueuePath = - getContext().getZooKeeperRoot() + Constants.ZCOMPACTORS + "/" + this.queueName; + getContext().getZooKeeperRoot() + Constants.ZCOMPACTORS + "/" + this.getResourceGroup(); String zPath = compactorQueuePath + "/" + hostPort; try { @@ -278,8 +282,8 @@ public void unableToMonitorLockNode(final Exception e) { for (int i = 0; i < 25; i++) { zoo.putPersistentData(zPath, new byte[0], NodeExistsPolicy.SKIP); - if (compactorLock.tryLock(lw, - new ServiceLockData(compactorId, hostPort, ThriftService.COMPACTOR, this.queueName))) { + if (compactorLock.tryLock(lw, new 
ServiceLockData(compactorId, hostPort, + ThriftService.COMPACTOR, this.getResourceGroup()))) { LOG.debug("Obtained Compactor lock {}", compactorLock.getLockPath()); return; } @@ -433,7 +437,7 @@ protected TExternalCompactionJob getNextJob(Supplier uuid) throws RetriesE LOG.trace("Attempting to get next job, eci = {}", eci); currentCompactionId.set(eci); return coordinatorClient.getCompactionJob(TraceUtil.traceInfo(), - getContext().rpcCreds(), queueName, + getContext().rpcCreds(), this.getResourceGroup(), ExternalCompactionUtil.getHostPortString(compactorAddress.getAddress()), eci.toString()); } catch (Exception e) { @@ -516,7 +520,7 @@ protected Runnable createCompactionJob(final TExternalCompactionJob job, job.getIteratorSettings().getIterators() .forEach(tis -> iters.add(SystemIteratorUtil.toIteratorSetting(tis))); - ExtCEnv cenv = new ExtCEnv(JOB_HOLDER, queueName); + ExtCEnv cenv = new ExtCEnv(JOB_HOLDER, this.getResourceGroup()); FileCompactor compactor = new FileCompactor(getContext(), extent, files, outputFile, job.isPropagateDeletes(), cenv, iters, aConfig, tConfig.getCryptoService(), pausedMetrics); @@ -564,7 +568,8 @@ protected Supplier getNextId() { protected long getWaitTimeBetweenCompactionChecks() { // get the total number of compactors assigned to this queue - int numCompactors = ExternalCompactionUtil.countCompactors(queueName, getContext()); + int numCompactors = + ExternalCompactionUtil.countCompactors(this.getResourceGroup(), getContext()); // Aim for around 3 compactors checking in every second long sleepTime = numCompactors * 1000L / 3; // Ensure a compactor sleeps at least around a second @@ -619,7 +624,7 @@ public void run() { try { job = getNextJob(getNextId()); if (!job.isSetExternalCompactionId()) { - LOG.trace("No external compactions in queue {}", this.queueName); + LOG.trace("No external compactions in queue {}", this.getResourceGroup()); UtilWaitThread.sleep(getWaitTimeBetweenCompactionChecks()); continue; } diff --git 
a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 1946ffee3b0..c744b559f30 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -355,8 +355,8 @@ public void unableToMonitorLockNode(final Exception e) { while (true) { ServiceLock lock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), path, zooLockUUID); - if (lock.tryLock(lockWatcher, - new ServiceLockData(zooLockUUID, addr.toString(), ThriftService.GC))) { + if (lock.tryLock(lockWatcher, new ServiceLockData(zooLockUUID, addr.toString(), + ThriftService.GC, this.getResourceGroup()))) { log.debug("Got GC ZooKeeper lock"); return; } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index e29637b2a5b..11524752e8b 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -19,6 +19,7 @@ package org.apache.accumulo.manager; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; +import static java.util.Collections.emptyMap; import static java.util.Collections.emptySortedMap; import static java.util.concurrent.TimeUnit.HOURS; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -210,7 +211,7 @@ public class Manager extends AbstractServer ServiceLock managerLock = null; private TServer clientService = null; - private volatile TabletBalancer tabletBalancer; + protected volatile TabletBalancer tabletBalancer; private final BalancerEnvironment balancerEnvironment; private ManagerState state = ManagerState.INITIAL; @@ -223,6 +224,8 @@ public class Manager extends AbstractServer volatile SortedMap tserverStatus = emptySortedMap(); volatile 
SortedMap tserverStatusForBalancer = emptySortedMap(); + volatile Map> tServerGroupingForBalancer = emptyMap(); + // ELASTICITY_TODO is this still needed? final ServerBulkImportStatus bulkImportStatus = new ServerBulkImportStatus(); @@ -939,6 +942,8 @@ private long updateStatus() { TreeMap temp = new TreeMap<>(); tserverStatus = gatherTableInformation(currentServers, temp); tserverStatusForBalancer = Collections.unmodifiableSortedMap(temp); + tServerGroupingForBalancer = + Collections.unmodifiableMap(tserverSet.getCurrentServersGroups()); checkForHeldServer(tserverStatus); if (!badServers.isEmpty()) { @@ -992,7 +997,7 @@ private void checkForHeldServer(SortedMap ts private long balanceTablets() { BalanceParamsImpl params = BalanceParamsImpl.fromThrift(tserverStatusForBalancer, - tserverStatus, migrationsSnapshot()); + tServerGroupingForBalancer, tserverStatus, migrationsSnapshot()); long wait = tabletBalancer.balance(params); for (TabletMigration m : checkMigrationSanity(tserverStatusForBalancer.keySet(), @@ -1344,7 +1349,7 @@ boolean canSuspendTablets() { ServiceDescriptors descriptors = new ServiceDescriptors(); for (ThriftService svc : new ThriftService[] {ThriftService.MANAGER, ThriftService.COORDINATOR}) { - descriptors.addService(new ServiceDescriptor(uuid, svc, address)); + descriptors.addService(new ServiceDescriptor(uuid, svc, address, this.getResourceGroup())); } sld = new ServiceLockData(descriptors); @@ -1575,7 +1580,8 @@ private ServiceLockData getManagerLock(final ServiceLockPath zManagerLoc) ServiceDescriptors descriptors = new ServiceDescriptors(); for (ThriftService svc : new ThriftService[] {ThriftService.MANAGER, ThriftService.COORDINATOR}) { - descriptors.addService(new ServiceDescriptor(zooLockUUID, svc, managerClientAddress)); + descriptors.addService( + new ServiceDescriptor(zooLockUUID, svc, managerClientAddress, this.getResourceGroup())); } ServiceLockData sld = new ServiceLockData(descriptors); @@ -1727,6 +1733,11 @@ public Set 
onlineTabletServers() { return tserverSet.getCurrentServers(); } + @Override + public Map> tServerResourceGroups() { + return tserverSet.getCurrentServersGroups(); + } + @Override public Collection merges() { List result = new ArrayList<>(); @@ -1872,11 +1883,13 @@ Class getBalancerClass() { } void getAssignments(SortedMap currentStatus, + Map> currentTServerGroups, Map unassigned, Map assignedOut) { - AssignmentParamsImpl params = AssignmentParamsImpl.fromThrift(currentStatus, - unassigned.entrySet().stream().collect(HashMap::new, - (m, e) -> m.put(e.getKey(), e.getValue().getServerInstance()), Map::putAll), - assignedOut); + AssignmentParamsImpl params = + AssignmentParamsImpl.fromThrift(currentStatus, currentTServerGroups, + unassigned.entrySet().stream().collect(HashMap::new, + (m, e) -> m.put(e.getKey(), e.getValue().getServerInstance()), Map::putAll), + assignedOut); tabletBalancer.getAssignments(params); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index 5f1f5f3f5a5..6e9a06667f8 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -37,6 +37,7 @@ import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; @@ -57,6 +58,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.gc.ReferenceFile; import org.apache.accumulo.core.logging.TabletLogger; +import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction; import 
org.apache.accumulo.core.manager.state.tables.TableState; @@ -81,6 +83,7 @@ import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.spi.balancer.data.TabletServerId; import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; import org.apache.accumulo.core.util.TextUtil; import org.apache.accumulo.core.util.threads.Threads.AccumuloDaemonThread; @@ -179,11 +182,14 @@ private static class TabletLists { private final Map> logsForDeadServers = new TreeMap<>(); // read only list of tablet servers that are not shutting down private final SortedMap destinations; + private final Map> currentTServerGrouping; - public TabletLists(Manager m, SortedMap curTServers) { + public TabletLists(Manager m, SortedMap curTServers, + Map> grouping) { var destinationsMod = new TreeMap<>(curTServers); destinationsMod.keySet().removeAll(m.serversToShutdown); this.destinations = Collections.unmodifiableSortedMap(destinationsMod); + this.currentTServerGrouping = grouping; } public void reset() { @@ -222,7 +228,7 @@ public void run() { } // Get the current status for the current list of tservers - SortedMap currentTServers = new TreeMap<>(); + final SortedMap currentTServers = new TreeMap<>(); for (TServerInstance entry : manager.tserverSet.getCurrentServers()) { currentTServers.put(entry, manager.tserverStatus.get(entry)); } @@ -235,7 +241,10 @@ public void run() { continue; } - TabletLists tLists = new TabletLists(manager, currentTServers); + final Map> currentTServerGrouping = + manager.tserverSet.getCurrentServersGroups(); + + TabletLists tLists = new TabletLists(manager, currentTServers, currentTServerGrouping); ManagerState managerState = manager.getManagerState(); int[] counts = new int[TabletState.values().length]; @@ -244,6 +253,12 @@ public void run() { CompactionJobGenerator compactionGenerator 
= new CompactionJobGenerator(new ServiceEnvironmentImpl(manager.getContext())); + final Map> resourceGroups = new HashMap<>(); + manager.tServerResourceGroups().forEach((k, v) -> { + resourceGroups.put(k, + v.stream().map(s -> new TabletServerIdImpl(s)).collect(Collectors.toSet())); + }); + // Walk through the tablets in our store, and work tablets // towards their goal iter = store.iterator(); @@ -288,7 +303,8 @@ public void run() { return mStats != null ? mStats : new MergeStats(new MergeInfo()); }); TabletGoalState goal = manager.getGoalState(tm, mergeStats.getMergeInfo()); - final TabletState state = tm.getTabletState(currentTServers.keySet()); + TabletState state = + tm.getTabletState(currentTServers.keySet(), manager.tabletBalancer, resourceGroups); final Location location = tm.getLocation(); Location current = null; @@ -309,6 +325,8 @@ public void run() { // Always follow through with assignments if (state == TabletState.ASSIGNED) { goal = TabletGoalState.HOSTED; + } else if (state == TabletState.ASSIGNED_TO_WRONG_GROUP) { + goal = TabletGoalState.UNASSIGNED; } // if we are shutting down all the tabletservers, we have to do it in order @@ -374,6 +392,8 @@ public void run() { tLists.assigned.add(new Assignment(tm.getExtent(), future != null ? 
future.getServerInstance() : null, tm.getLast())); break; + default: + break; } } else { switch (state) { @@ -388,6 +408,7 @@ public void run() { case ASSIGNED_TO_DEAD_SERVER: unassignDeadTablet(tLists, tm, wals); break; + case ASSIGNED_TO_WRONG_GROUP: case HOSTED: TServerConnection client = manager.tserverSet.getConnection(location.getServerInstance()); @@ -1023,7 +1044,8 @@ private void getAssignmentsFromBalancer(TabletLists tLists, Map unassigned) { if (!tLists.destinations.isEmpty()) { Map assignedOut = new HashMap<>(); - manager.getAssignments(tLists.destinations, unassigned, assignedOut); + manager.getAssignments(tLists.destinations, tLists.currentTServerGrouping, unassigned, + assignedOut); for (Entry assignment : assignedOut.entrySet()) { if (unassigned.containsKey(assignment.getKey())) { if (assignment.getValue() != null) { diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index 7e1242def63..1b7839a0201 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@ -825,8 +825,8 @@ private void getMonitorLock() throws KeeperException, InterruptedException { while (true) { MoniterLockWatcher monitorLockWatcher = new MoniterLockWatcher(); monitorLock = new ServiceLock(zoo.getZooKeeper(), monitorLockPath, zooLockUUID); - monitorLock.lock(monitorLockWatcher, - new ServiceLockData(zooLockUUID, getHostname(), ThriftService.NONE)); + monitorLock.lock(monitorLockWatcher, new ServiceLockData(zooLockUUID, getHostname(), + ThriftService.NONE, this.getResourceGroup())); monitorLockWatcher.waitForChange(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 102267aaf70..c2aec8a5b0d 100644 --- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -53,6 +53,7 @@ import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan; import org.apache.accumulo.core.dataImpl.thrift.InitialScan; @@ -190,8 +191,6 @@ private TabletMetadataLoader(Ample ample) { private ZooCache managerLockCache; - private final String groupName; - public ScanServer(ConfigOpts opts, String[] args) { super("sserver", opts, args); @@ -236,8 +235,6 @@ public ScanServer(ConfigOpts opts, String[] args) { delegate = newThriftScanClientHandler(new WriteTracker()); - this.groupName = getConfiguration().get(Property.SSERV_GROUP_NAME); - ThreadPools.watchCriticalScheduledTask(getContext().getScheduledExecutor() .scheduleWithFixedDelay(() -> cleanUpReservedFiles(scanServerReservationExpiration), scanServerReservationExpiration, scanServerReservationExpiration, @@ -245,6 +242,11 @@ public ScanServer(ConfigOpts opts, String[] args) { } + @Override + protected String getResourceGroupPropertyValue(SiteConfiguration conf) { + return conf.get(Property.SSERV_GROUP_NAME); + } + @VisibleForTesting protected ThriftScanClientHandler newThriftScanClientHandler(WriteTracker writeTracker) { return new ThriftScanClientHandler(this, writeTracker); @@ -327,7 +329,7 @@ public void unableToMonitorLockNode(final Exception e) { zoo.putPersistentData(zLockPath.toString(), new byte[0], NodeExistsPolicy.SKIP); if (scanServerLock.tryLock(lw, new ServiceLockData(serverLockUUID, getClientAddressString(), - ThriftService.TABLET_SCAN, this.groupName))) { + ThriftService.TABLET_SCAN, this.getResourceGroup()))) { LOG.debug("Obtained scan 
server lock {}", scanServerLock.getLockPath()); return scanServerLock; } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 41878ecc5b6..d16c5015a31 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -73,6 +73,7 @@ import org.apache.accumulo.core.clientImpl.DurabilityImpl; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.data.InstanceId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -370,6 +371,11 @@ private void logBusyTablets(List> busyTablets, config(); } + @Override + protected String getResourceGroupPropertyValue(SiteConfiguration conf) { + return conf.get(Property.TSERV_GROUP_NAME); + } + public InstanceId getInstanceID() { return getContext().getInstanceID(); } @@ -671,8 +677,8 @@ public void unableToMonitorLockNode(final Exception e) { for (ThriftService svc : new ThriftService[] {ThriftService.CLIENT, ThriftService.TABLET_INGEST, ThriftService.TABLET_MANAGEMENT, ThriftService.TABLET_SCAN, ThriftService.TSERV}) { - descriptors - .addService(new ServiceDescriptor(tabletServerUUID, svc, getClientAddressString())); + descriptors.addService(new ServiceDescriptor(tabletServerUUID, svc, + getClientAddressString(), this.getResourceGroup())); } if (tabletServerLock.tryLock(lw, new ServiceLockData(descriptors))) { diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java index bec21401e6b..c80ad67f2e1 100644 --- 
a/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java @@ -27,6 +27,7 @@ import java.util.TreeSet; import java.util.concurrent.TimeUnit; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; @@ -36,7 +37,6 @@ import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.Credentials; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo; import org.apache.accumulo.core.manager.thrift.TableInfo; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; @@ -73,7 +73,7 @@ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoo cfg.setSiteConfig(siteConfig); // ensure we have two tservers if (cfg.getClusterServerConfiguration().getTabletServerConfiguration() - .get(ServiceLockData.ServiceDescriptor.DEFAULT_GROUP_NAME) < 2) { + .get(Constants.DEFAULT_RESOURCE_GROUP_NAME) < 2) { cfg.getClusterServerConfiguration().setNumDefaultTabletServers(2); } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java index 379c259494d..ea78d7c29e6 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java @@ -117,7 +117,7 @@ public void unableToMonitorLockNode(Exception e) { System.exit(-1); } }, new ServiceLockData(UUID.randomUUID(), "foo", ThriftService.TSERV, - ServiceLockData.ServiceDescriptor.DEFAULT_GROUP_NAME)); + Constants.DEFAULT_RESOURCE_GROUP_NAME)); if (!gotLock) { 
System.err.println("Failed to get lock " + zPath); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java index 1b3a14df5e5..69af3e91ec0 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java @@ -25,9 +25,11 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.SortedSet; @@ -392,6 +394,11 @@ public Set onlineTables() { return this.onlineTables; } + @Override + public Map> tServerResourceGroups() { + return new HashMap<>(); + } + @Override public Collection merges() { return Collections.emptySet(); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TabletResourceGroupBalanceIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletResourceGroupBalanceIT.java new file mode 100644 index 00000000000..8bbdafa9d58 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletResourceGroupBalanceIT.java @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.TabletHostingGoal; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.clientImpl.ClientTabletCache; +import org.apache.accumulo.core.clientImpl.ClientTabletCache.LocationNeed; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.fate.zookeeper.ZooCache; +import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat; +import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.lock.ServiceLockData; +import 
org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; +import org.apache.accumulo.harness.MiniClusterConfigurationCallback; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.util.Wait; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.net.HostAndPort; + +public class TabletResourceGroupBalanceIT extends SharedMiniClusterBase { + + private static final Logger LOG = LoggerFactory.getLogger(TabletResourceGroupBalanceIT.class); + + public static class TRGBalanceITConfig implements MiniClusterConfigurationCallback { + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + cfg.setProperty(Property.MANAGER_STARTUP_TSERVER_AVAIL_MIN_COUNT, "2"); + cfg.setProperty(Property.MANAGER_STARTUP_TSERVER_AVAIL_MAX_WAIT, "10s"); + cfg.setProperty(Property.TSERV_MIGRATE_MAXCONCURRENT, "50"); + cfg.getClusterServerConfiguration().setNumDefaultTabletServers(1); + cfg.getClusterServerConfiguration().addTabletServerResourceGroup("GROUP1", 1); + } + + } + + @BeforeAll + public static void beforeAll() throws Exception { + SharedMiniClusterBase.startMiniClusterWithConfig(new TRGBalanceITConfig()); + } + + @AfterAll + public static void afterAll() throws Exception { + SharedMiniClusterBase.stopMiniCluster(); + } + + private Map 
getTServerGroups() throws Exception { + + Map tservers = new HashMap<>(); + ZooCache zk = getCluster().getServerContext().getZooCache(); + String zpath = getCluster().getServerContext().getZooKeeperRoot() + Constants.ZTSERVERS; + + List children = zk.getChildren(zpath); + for (String child : children) { + final var zLockPath = ServiceLock.path(zpath + "/" + child); + ZcStat stat = new ZcStat(); + Optional sld = ServiceLock.getLockData(zk, zLockPath, stat); + try { + HostAndPort client = sld.orElseThrow().getAddress(ServiceLockData.ThriftService.TSERV); + String resourceGroup = sld.orElseThrow().getGroup(ServiceLockData.ThriftService.TSERV); + tservers.put(client.toString(), resourceGroup); + } catch (NoSuchElementException nsee) { + // We are starting and stopping servers, so it's possible for this to occur. + } + } + return tservers; + + } + + @Test + public void testBalancerWithResourceGroups() throws Exception { + + SortedSet splits = new TreeSet<>(); + IntStream.range(97, 122).forEach(i -> splits.add(new Text(new String("" + i)))); + + NewTableConfiguration ntc1 = new NewTableConfiguration(); + ntc1.withInitialHostingGoal(TabletHostingGoal.ALWAYS); + ntc1.withSplits(splits); + + Map properties = new HashMap<>(); + properties.put("table.custom.assignment.group", "GROUP1"); + + NewTableConfiguration ntc2 = new NewTableConfiguration(); + ntc2.withInitialHostingGoal(TabletHostingGoal.ALWAYS); + ntc2.withSplits(splits); + ntc2.setProperties(properties); + + String[] names = this.getUniqueNames(2); + try (AccumuloClient client = + Accumulo.newClient().from(getCluster().getClientProperties()).build()) { + client.tableOperations().create(names[0], ntc1); + client.tableOperations().create(names[1], ntc2); + client.instanceOperations().waitForBalance(); + + Map tserverGroups = getTServerGroups(); + assertEquals(2, tserverGroups.size()); + + Ample ample = ((ClientContext) client).getAmple(); + + // Check table names[0] + String tableId = 
client.tableOperations().tableIdMap().get(names[0]); + List locations = ample.readTablets().forTable(TableId.of(tableId)) + .fetch(TabletMetadata.ColumnType.LOCATION).build().stream().collect(Collectors.toList()); + + assertEquals(26, locations.size()); + Location l1 = locations.get(0).getLocation(); + assertEquals("default", tserverGroups.get(l1.getHostAndPort().toString())); + locations.forEach(loc -> assertEquals(l1, loc.getLocation())); + + // Check table names[1] + tableId = client.tableOperations().tableIdMap().get(names[1]); + locations = ample.readTablets().forTable(TableId.of(tableId)) + .fetch(TabletMetadata.ColumnType.LOCATION).build().stream().collect(Collectors.toList()); + + assertEquals(26, locations.size()); + Location l2 = locations.get(0).getLocation(); + assertEquals("GROUP1", tserverGroups.get(l2.getHostAndPort().toString())); + locations.forEach(loc -> assertEquals(l2, loc.getLocation())); + + client.tableOperations().delete(names[0]); + client.tableOperations().delete(names[1]); + } + + } + + @Test + public void testResourceGroupBalanceWithNoTServers() throws Exception { + + SortedSet splits = new TreeSet<>(); + IntStream.range(97, 122).forEach(i -> splits.add(new Text(new String("" + i)))); + + Map properties = new HashMap<>(); + properties.put("table.custom.assignment.group", "GROUP2"); + + NewTableConfiguration ntc1 = new NewTableConfiguration(); + ntc1.withInitialHostingGoal(TabletHostingGoal.ALWAYS); + ntc1.withSplits(splits); + ntc1.setProperties(properties); + + String tableName = this.getUniqueNames(1)[0]; + try (final AccumuloClient client = + Accumulo.newClient().from(getCluster().getClientProperties()).build()) { + + client.tableOperations().create(tableName, ntc1); + + assertEquals(0, getCountOfHostedTablets(client, tableName)); + + AtomicReference error = new AtomicReference<>(); + Thread ingest = new Thread(() -> { + try { + ReadWriteIT.ingest(client, 1000, 1, 1, 0, tableName); + ReadWriteIT.verify(client, 1000, 1, 1, 0, 
tableName); + } catch (Exception e) { + error.set(e); + } + }); + ingest.start(); + + assertEquals(0, getCountOfHostedTablets(client, tableName)); + + // Start TabletServer for GROUP2 + getCluster().getConfig().getClusterServerConfiguration() + .addTabletServerResourceGroup("GROUP2", 1); + getCluster().getClusterControl().start(ServerType.TABLET_SERVER); + + client.instanceOperations().waitForBalance(); + assertEquals(26, getCountOfHostedTablets(client, tableName)); + ingest.join(); + assertNull(error.get()); + + client.tableOperations().delete(tableName); + // Stop all tablet servers because there is no way to just stop + // the GROUP2 server yet. + getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER); + getCluster().getConfig().getClusterServerConfiguration().clearTServerResourceGroups(); + getCluster().getConfig().getClusterServerConfiguration() + .addTabletServerResourceGroup("GROUP1", 1); + getCluster().getClusterControl().start(ServerType.TABLET_SERVER); + + } + } + + @Test + public void testUserTablePropertyChange() throws Exception { + SortedSet splits = new TreeSet<>(); + IntStream.range(97, 122).forEach(i -> splits.add(new Text(new String("" + i)))); + + NewTableConfiguration ntc1 = new NewTableConfiguration(); + ntc1.withInitialHostingGoal(TabletHostingGoal.ALWAYS); + ntc1.withSplits(splits); + + String tableName = this.getUniqueNames(1)[0]; + try (final AccumuloClient client = + Accumulo.newClient().from(getCluster().getClientProperties()).build()) { + + client.tableOperations().create(tableName, ntc1); + + // wait for all tablets to be hosted + Wait.waitFor(() -> 26 != getCountOfHostedTablets(client, tableName)); + + client.instanceOperations().waitForBalance(); + + try { + testResourceGroupPropertyChange(client, tableName, 26); + } finally { + client.tableOperations().delete(tableName); + } + } + } + + @Test + public void testMetadataTablePropertyChange() throws Exception { + try (final AccumuloClient client = + 
Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
+
+      client.instanceOperations().waitForBalance();
+      testResourceGroupPropertyChange(client, MetadataTable.NAME,
+          getCountOfHostedTablets(client, MetadataTable.NAME));
+    }
+  }
+
+  /**
+   * Runs the shared resource group property-change check against the root table, using the
+   * number of root table tablets hosted after balancing as the expected tablet count.
+   */
+  @Test
+  public void testRootTablePropertyChange() throws Exception {
+    try (final AccumuloClient client =
+        Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
+
+      client.instanceOperations().waitForBalance();
+      testResourceGroupPropertyChange(client, RootTable.NAME,
+          getCountOfHostedTablets(client, RootTable.NAME));
+    }
+  }
+
+  /**
+   * Checks that setting {@code table.custom.assignment.group} to {@code GROUP1} on a table whose
+   * tablets are all hosted on one tablet server in the {@code default} group results in the
+   * tablets being hosted at a single {@code GROUP1} location.
+   * <p>
+   * The check requires exactly two entries in the tablet server group map.
+   *
+   * @param client client connected to the test cluster
+   * @param tableName name of the table whose tablets are inspected
+   * @param numExpectedSplits number of hosted tablets the table is expected to have; despite the
+   *        name, the value is compared against tablet counts
+   * @throws Exception if any client operation fails
+   */
+  public void testResourceGroupPropertyChange(AccumuloClient client, String tableName,
+      int numExpectedSplits) throws Exception {
+
+    assertEquals(numExpectedSplits, getCountOfHostedTablets(client, tableName));
+
+    Map<String,String> tserverGroups = getTServerGroups();
+    LOG.info("Tablet Server groups: {}", tserverGroups);
+
+    assertEquals(2, tserverGroups.size());
+
+    Ample ample = ((ClientContext) client).getAmple();
+    String tableId = client.tableOperations().tableIdMap().get(tableName);
+
+    // Validate that all of the table's tablets are on the same tserver and that
+    // the tserver is in the default resource group
+    List<TabletMetadata> locations = ample.readTablets().forTable(TableId.of(tableId))
+        .fetch(TabletMetadata.ColumnType.LOCATION).build().stream().collect(Collectors.toList());
+    assertEquals(numExpectedSplits, locations.size());
+    Location l1 = locations.get(0).getLocation();
+    assertEquals("default", tserverGroups.get(l1.getHostAndPort().toString()));
+    locations.forEach(loc -> assertEquals(l1, loc.getLocation()));
+
+    // change the resource group property for the table
+    client.tableOperations().setProperty(tableName, "table.custom.assignment.group", "GROUP1");
+
+    locations = ample.readTablets().forTable(TableId.of(tableId))
+        .fetch(TabletMetadata.ColumnType.LOCATION).build().stream().collect(Collectors.toList());
+    // wait for GROUP1 to show up in the list of locations as the
current location
+    while ((locations == null || locations.isEmpty() || locations.size() != numExpectedSplits
+        || locations.get(0).getLocation() == null
+        || locations.get(0).getLocation().getType() == LocationType.FUTURE)
+        || (locations.get(0).getLocation().getType() == LocationType.CURRENT && !tserverGroups
+            .get(locations.get(0).getLocation().getHostAndPort().toString()).equals("GROUP1"))) {
+      // pause between reads so the metadata table is polled rather than scanned in a hot loop
+      Thread.sleep(100);
+      locations = ample.readTablets().forTable(TableId.of(tableId))
+          .fetch(TabletMetadata.ColumnType.LOCATION).build().stream().collect(Collectors.toList());
+    }
+    Location group1Location = locations.get(0).getLocation();
+    assertTrue(tserverGroups.get(group1Location.getHostAndPort().toString()).equals("GROUP1"));
+
+    client.instanceOperations().waitForBalance();
+
+    // validate that all tablets have the same location as the first tablet
+    locations = ample.readTablets().forTable(TableId.of(tableId))
+        .fetch(TabletMetadata.ColumnType.LOCATION).build().stream().collect(Collectors.toList());
+    while (locations == null || locations.isEmpty() || locations.size() != numExpectedSplits) {
+      // pause between reads so the metadata table is polled rather than scanned in a hot loop
+      Thread.sleep(100);
+      locations = ample.readTablets().forTable(TableId.of(tableId))
+          .fetch(TabletMetadata.ColumnType.LOCATION).build().stream().collect(Collectors.toList());
+    }
+    if (locations.stream().map(TabletMetadata::getLocation).allMatch(group1Location::equals)) {
+      LOG.info("Group1 location: {} matches all tablet locations: {}", group1Location,
+          locations.stream().map(TabletMetadata::getLocation).collect(Collectors.toList()));
+    } else {
+      LOG.info("Group1 location: {} does not match all tablet locations: {}", group1Location,
+          locations.stream().map(TabletMetadata::getLocation).collect(Collectors.toList()));
+      fail("Not every tablet of " + tableName + " is located at " + group1Location);
+    }
+
+  }
+
+  /**
+   * Counts the tablets of a table that currently report a tablet server location.
+   * <p>
+   * The client tablet cache for the table is invalidated first, then tablets are looked up for an
+   * unbounded {@code Range} with {@code LocationNeed.NOT_REQUIRED}; each tablet whose tserver
+   * location is present is counted.
+   *
+   * @param client client used for the lookup; it is cast to {@code ClientContext}
+   * @param tableName name of the table, resolved to an id through the table id map
+   * @return number of tablets that have a tablet server location
+   * @throws Exception if the table lookup or the tablet lookup fails
+   */
+  private int getCountOfHostedTablets(AccumuloClient client, String tableName) throws Exception {
+
+    ClientTabletCache locator = ClientTabletCache.getInstance((ClientContext) client,
+        TableId.of(client.tableOperations().tableIdMap().get(tableName)));
+    locator.invalidateCache();
+    AtomicInteger locations = new AtomicInteger(0);
+    locator.findTablets((ClientContext) client, Collections.singletonList(new Range()), (ct, r) -> {
+      if (ct.getTserverLocation().isPresent()) {
+        locations.incrementAndGet();
+      }
+    }, LocationNeed.NOT_REQUIRED);
+    return locations.get();
+  }
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java index 2a8d0236858..c2690411629 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java @@ -164,7 +164,7 @@ public void unableToMonitorLockNode(Exception e) { }; if (zlock.tryLock(lw, new ServiceLockData(UUID.randomUUID(), addressString, ThriftService.TSERV, - ServiceLockData.ServiceDescriptor.DEFAULT_GROUP_NAME))) { + Constants.DEFAULT_RESOURCE_GROUP_NAME))) { log.debug("Obtained tablet server lock {}", zlock.getLockPath()); } // modify metadata diff --git a/test/src/main/java/org/apache/accumulo/test/lock/ServiceLockIT.java b/test/src/main/java/org/apache/accumulo/test/lock/ServiceLockIT.java index 1a994e8b423..568df0d077d 100644 --- a/test/src/main/java/org/apache/accumulo/test/lock/ServiceLockIT.java +++ b/test/src/main/java/org/apache/accumulo/test/lock/ServiceLockIT.java @@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.LockSupport; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.data.InstanceId; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooSession; @@ -46,7 +47,6 @@ import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; import org.apache.accumulo.core.lock.ServiceLock.ServiceLockPath; import org.apache.accumulo.core.lock.ServiceLockData; -import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor; import 
org.apache.accumulo.core.lock.ServiceLockData.ThriftService; import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer; import org.apache.zookeeper.CreateMode; @@ -233,7 +233,7 @@ public void testDeleteParent() throws Exception { TestALW lw = new TestALW(); zl.lock(lw, new ServiceLockData(UUID.randomUUID(), "test1", ThriftService.TSERV, - ServiceDescriptor.DEFAULT_GROUP_NAME)); + Constants.DEFAULT_RESOURCE_GROUP_NAME)); lw.waitForChanges(1); @@ -258,7 +258,7 @@ public void testNoParent() throws Exception { TestALW lw = new TestALW(); zl.lock(lw, new ServiceLockData(UUID.randomUUID(), "test1", ThriftService.TSERV, - ServiceDescriptor.DEFAULT_GROUP_NAME)); + Constants.DEFAULT_RESOURCE_GROUP_NAME)); lw.waitForChanges(1); @@ -284,7 +284,7 @@ public void testDeleteLock() throws Exception { TestALW lw = new TestALW(); zl.lock(lw, new ServiceLockData(UUID.randomUUID(), "test1", ThriftService.TSERV, - ServiceDescriptor.DEFAULT_GROUP_NAME)); + Constants.DEFAULT_RESOURCE_GROUP_NAME)); lw.waitForChanges(1); @@ -318,7 +318,7 @@ public void testDeleteWaiting() throws Exception { TestALW lw = new TestALW(); zl.lock(lw, new ServiceLockData(UUID.randomUUID(), "test1", ThriftService.TSERV, - ServiceDescriptor.DEFAULT_GROUP_NAME)); + Constants.DEFAULT_RESOURCE_GROUP_NAME)); lw.waitForChanges(1); @@ -332,7 +332,7 @@ public void testDeleteWaiting() throws Exception { TestALW lw2 = new TestALW(); zl2.lock(lw2, new ServiceLockData(UUID.randomUUID(), "test2", ThriftService.TSERV, - ServiceDescriptor.DEFAULT_GROUP_NAME)); + Constants.DEFAULT_RESOURCE_GROUP_NAME)); assertFalse(lw2.locked); assertFalse(zl2.isLocked()); @@ -342,7 +342,7 @@ public void testDeleteWaiting() throws Exception { TestALW lw3 = new TestALW(); zl3.lock(lw3, new ServiceLockData(UUID.randomUUID(), "test3", ThriftService.TSERV, - ServiceDescriptor.DEFAULT_GROUP_NAME)); + Constants.DEFAULT_RESOURCE_GROUP_NAME)); List children = ServiceLock.validateAndSort(parent, zk.getChildren(parent.toString())); @@ -398,7 
+398,7 @@ public void testUnexpectedEvent() throws Exception { TestALW lw = new TestALW(); zl.lock(lw, new ServiceLockData(UUID.randomUUID(), "test1", ThriftService.TSERV, - ServiceDescriptor.DEFAULT_GROUP_NAME)); + Constants.DEFAULT_RESOURCE_GROUP_NAME)); lw.waitForChanges(1); @@ -449,7 +449,7 @@ public void testLockSerial() throws Exception { ServiceLock zl1 = getZooLock(zk1, parent, UUID.fromString("00000000-0000-0000-0000-aaaaaaaaaaaa")); zl1.lock(zlw1, new ServiceLockData(UUID.randomUUID(), "test1", ThriftService.TSERV, - ServiceDescriptor.DEFAULT_GROUP_NAME)); + Constants.DEFAULT_RESOURCE_GROUP_NAME)); // The call above creates two nodes in ZK because of the overridden create method in // ZooKeeperWrapper. // The nodes created are: @@ -465,7 +465,7 @@ public void testLockSerial() throws Exception { ServiceLock zl2 = getZooLock(zk2, parent, UUID.fromString("00000000-0000-0000-0000-bbbbbbbbbbbb")); zl2.lock(zlw2, new ServiceLockData(UUID.randomUUID(), "test2", ThriftService.TSERV, - ServiceDescriptor.DEFAULT_GROUP_NAME)); + Constants.DEFAULT_RESOURCE_GROUP_NAME)); // The call above creates two nodes in ZK because of the overridden create method in // ZooKeeperWrapper. 
// The nodes created are: @@ -553,7 +553,7 @@ public void run() { getLockLatch.countDown(); // signal we are done getLockLatch.await(); // wait for others to finish zl.lock(lockWatcher, new ServiceLockData(UUID.randomUUID(), "test1", ThriftService.TSERV, - ServiceDescriptor.DEFAULT_GROUP_NAME)); // race to the lock + Constants.DEFAULT_RESOURCE_GROUP_NAME)); // race to the lock lockCompletedLatch.countDown(); unlockLatch.await(); zl.unlock(); @@ -683,7 +683,7 @@ public void testTryLock() throws Exception { TestALW lw = new TestALW(); boolean ret = zl.tryLock(lw, new ServiceLockData(UUID.randomUUID(), "test1", - ThriftService.TSERV, ServiceDescriptor.DEFAULT_GROUP_NAME)); + ThriftService.TSERV, Constants.DEFAULT_RESOURCE_GROUP_NAME)); assertTrue(ret); @@ -718,13 +718,13 @@ public void testChangeData() throws Exception { TestALW lw = new TestALW(); ServiceLockData sld1 = new ServiceLockData(UUID.randomUUID(), "test1", ThriftService.TSERV, - ServiceDescriptor.DEFAULT_GROUP_NAME); + Constants.DEFAULT_RESOURCE_GROUP_NAME); zl.lock(lw, sld1); assertEquals(Optional.of(sld1), ServiceLockData.parse(zk.getData(zl.getLockPath(), null, null))); ServiceLockData sld2 = new ServiceLockData(UUID.randomUUID(), "test2", ThriftService.TSERV, - ServiceDescriptor.DEFAULT_GROUP_NAME); + Constants.DEFAULT_RESOURCE_GROUP_NAME); zl.replaceLockData(sld2); assertEquals(Optional.of(sld2), ServiceLockData.parse(zk.getData(zl.getLockPath(), null, null))); diff --git a/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java b/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java index 9556aabff4b..3e82445aee4 100644 --- a/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java @@ -22,7 +22,9 @@ import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.TreeSet; @@ 
-88,6 +90,12 @@ public Set onlineTabletServers() { return Collections.singleton(someTServer); } + // This test state defines no tablet server resource groups. + @Override + public Map<String,Set<TServerInstance>> tServerResourceGroups() { + return new HashMap<>(); + } + @Override public Collection merges() { return Collections.singleton(mergeInfo); diff --git a/test/src/test/java/org/apache/accumulo/test/ChaoticLoadBalancerTest.java b/test/src/test/java/org/apache/accumulo/test/ChaoticLoadBalancerTest.java index 8b803e70895..798c5c77fa7 100644 --- a/test/src/test/java/org/apache/accumulo/test/ChaoticLoadBalancerTest.java +++ b/test/src/test/java/org/apache/accumulo/test/ChaoticLoadBalancerTest.java @@ -30,6 +30,7 @@ import java.util.SortedMap; import java.util.TreeMap; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -115,8 +116,9 @@ public void testAssignMigrations() { TestChaoticLoadBalancer balancer = new TestChaoticLoadBalancer(); Map assignments = new HashMap<>(); - balancer.getAssignments( - new AssignmentParamsImpl(getAssignments(servers), metadataTable, assignments)); + balancer.getAssignments(new AssignmentParamsImpl(getAssignments(servers), + Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, servers.keySet()), metadataTable, + assignments)); assertEquals(assignments.size(), metadataTable.size()); } @@ -157,7 +159,10 @@ public void testUnevenAssignment() { // amount, or even expected amount List migrationsOut = new ArrayList<>(); while (!migrationsOut.isEmpty()) { - balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut)); + SortedMap current = getAssignments(servers); + balancer.balance(new BalanceParamsImpl(current, + Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, current.keySet()), migrations, + migrationsOut)); } }