Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
38ca5e9
Add TSERV_GROUP_NAME, ability to balance within tserver group
dlmarion Jun 14, 2023
bda0157
Add new index to LiveTServerSet instead of modifying TServerInstance
dlmarion Jun 15, 2023
0b5f646
Merge branch 'elasticity' into 3459-tserver-groups-attempt2
dlmarion Jun 15, 2023
0f1752a
wip
dlmarion Jun 15, 2023
119d468
fix javadoc
dlmarion Jun 15, 2023
d4b71fa
Merge branch 'elasticity' into 3459-tserver-groups-attempt2
dlmarion Jun 16, 2023
ec2f2dd
Update property language
dlmarion Jun 16, 2023
62ff480
Merge branch 'elasticity' into 3459-tserver-groups-attempt2
dlmarion Jun 21, 2023
a7b1762
Merge branch 'elasticity' into 3459-tserver-groups-attempt2
dlmarion Jun 21, 2023
736b7d1
Added ResourceGroupBalanceIT, necessary changes for it to work
dlmarion Jun 21, 2023
741f8b1
Merge branch 'elasticity' into 3459-tserver-groups-attempt2
dlmarion Jun 23, 2023
9f080e5
Merge branch 'elasticity' into 3459-tserver-groups-attempt2
dlmarion Jun 27, 2023
c2bfffd
Merge branch 'elasticity' into 3459-tserver-groups-attempt2
dlmarion Jul 3, 2023
deca07c
Working on moving TABLE_ASSIGNMENT_GROUP, need more tests
dlmarion Jul 5, 2023
db29813
Added no tserver test
dlmarion Jul 5, 2023
3cb511b
Merge branch 'elasticity' into 3459-tserver-groups-attempt2
dlmarion Jul 5, 2023
92c61dc
Added test that table property change causes migration
dlmarion Jul 6, 2023
508727a
Merge branch 'elasticity' into 3459-tserver-groups-attempt2
dlmarion Jul 7, 2023
d0b0c4d
Removed Thread.sleep from test
dlmarion Jul 11, 2023
a03bfb6
Modified TabletBalancer api to just take TabletId
dlmarion Jul 11, 2023
6ca1a88
Added tests for root and metadata table
dlmarion Jul 11, 2023
15cd817
fix formatting
dlmarion Jul 11, 2023
6f5d9cd
Add missing override
dlmarion Jul 11, 2023
899cb4e
Remove spammy log
dlmarion Jul 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,6 @@ public class Constants {
public static final String HDFS_TABLES_DIR = "/tables";

public static final int DEFAULT_VISIBILITY_CACHE_SIZE = 1000;

public static final String DEFAULT_RESOURCE_GROUP_NAME = "default";
}
17 changes: 11 additions & 6 deletions core/src/main/java/org/apache/accumulo/core/conf/Property.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.HashSet;
import java.util.function.Predicate;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.classloader.ClassLoaderUtil;
import org.apache.accumulo.core.data.constraints.NoDeleteConstraint;
import org.apache.accumulo.core.file.rfile.RFile;
Expand Down Expand Up @@ -391,9 +392,10 @@ public enum Property {
@Experimental
SSERV_GROUP_NAME("sserver.group", ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME,
PropertyType.STRING,
"Optional group name that will be made available to the "
+ "ScanServerSelector client plugin. Groups support at least two use cases:"
+ " dedicating resources to scans and/or using different hardware for scans.",
"Resource group name for this ScanServer. Resource groups support at least two use cases:"
+ " dedicating resources to scans and/or using different hardware for scans. Clients can"
+ " configure the ConfigurableScanServerSelector to specify the resource group to use for"
+ " eventual consistency scans.",
"3.0.0"),
@Experimental
SSERV_CACHED_TABLET_METADATA_EXPIRATION("sserver.cache.metadata.expiration", "5m",
Expand Down Expand Up @@ -731,6 +733,10 @@ public enum Property {
PropertyType.TIMEDURATION,
"The interval at which the TabletServer will check if on-demand tablets can be unloaded",
"4.0.0"),
TSERV_GROUP_NAME("tserver.group", Constants.DEFAULT_RESOURCE_GROUP_NAME, PropertyType.STRING,
"Resource group name for this TabletServer. Resource groups can be defined to dedicate resources "
+ " to specific tables (e.g. balancing tablets for table(s) within a group, see TABLE_ASSIGNMENT_GROUP)",
"4.0.0"),

// accumulo garbage collector properties
GC_PREFIX("gc.", null, PropertyType.PREFIX,
Expand Down Expand Up @@ -1076,7 +1082,6 @@ public enum Property {
+ "also consider configuring the `" + NoDeleteConstraint.class.getName() + "` "
+ "constraint.",
"2.0.0"),

// Compactor properties
@Experimental
COMPACTOR_PREFIX("compactor.", null, PropertyType.PREFIX,
Expand All @@ -1102,8 +1107,8 @@ public enum Property {
COMPACTOR_MAX_MESSAGE_SIZE("compactor.message.size.max", "10M", PropertyType.BYTES,
"The maximum size of a message that can be sent to a tablet server.", "2.1.0"),
@Experimental
COMPACTOR_QUEUE_NAME("compactor.queue", "", PropertyType.STRING,
"The queue for which this Compactor will perform compactions", "3.0.0"),
COMPACTOR_QUEUE_NAME("compactor.queue", Constants.DEFAULT_RESOURCE_GROUP_NAME,
PropertyType.STRING, "The queue for which this Compactor will perform compactions", "3.0.0"),
// CompactionCoordinator properties
@Experimental
COMPACTION_COORDINATOR_PREFIX("compaction.coordinator.", null, PropertyType.PREFIX,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,11 @@ public static enum ThriftService {
*/
public static class ServiceDescriptor {

/**
* The group name that will be used when one is not specified.
*/
public static final String DEFAULT_GROUP_NAME = "default";

private final UUID uuid;
private final ThriftService service;
private final String address;
private final String group;

public ServiceDescriptor(UUID uuid, ThriftService service, String address) {
this(uuid, service, address, DEFAULT_GROUP_NAME);
}

public ServiceDescriptor(UUID uuid, ThriftService service, String address, String group) {
this.uuid = requireNonNull(uuid);
this.service = requireNonNull(service);
Expand Down Expand Up @@ -157,11 +148,6 @@ public ServiceLockData(UUID uuid, String address, ThriftService service, String
Collections.singleton(new ServiceDescriptor(uuid, service, address, group)))));
}

public ServiceLockData(UUID uuid, String address, ThriftService service) {
this(new ServiceDescriptors(
new HashSet<>(Collections.singleton(new ServiceDescriptor(uuid, service, address)))));
}

public String getAddressString(ThriftService service) {
ServiceDescriptor sd = services.get(service);
if (sd == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

Expand All @@ -40,25 +42,38 @@ public class AssignmentParamsImpl implements TabletBalancer.AssignmentParameters
private final SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus;
private final Map<KeyExtent,TServerInstance> thriftUnassigned;
private final Map<KeyExtent,TServerInstance> thriftAssignmentsOut;
private final Map<String,Set<TabletServerId>> tserverGroups;

public static AssignmentParamsImpl fromThrift(
SortedMap<TServerInstance,TabletServerStatus> currentStatus,
Map<String,Set<TServerInstance>> currentTServerGrouping,
Map<KeyExtent,TServerInstance> unassigned, Map<KeyExtent,TServerInstance> assignmentsOut) {

SortedMap<TabletServerId,TServerStatus> currentStatusNew = new TreeMap<>();
currentStatus.forEach((tsi, status) -> currentStatusNew.put(new TabletServerIdImpl(tsi),
TServerStatusImpl.fromThrift(status)));

Map<String,Set<TabletServerId>> tserverGroups = new HashMap<>();
currentTServerGrouping.forEach((k, v) -> {
Set<TabletServerId> servers = new HashSet<>();
v.forEach(tsi -> servers.add(TabletServerIdImpl.fromThrift(tsi)));
tserverGroups.put(k, servers);
});

Map<TabletId,TabletServerId> unassignedNew = new HashMap<>();
unassigned.forEach(
(ke, tsi) -> unassignedNew.put(new TabletIdImpl(ke), TabletServerIdImpl.fromThrift(tsi)));

return new AssignmentParamsImpl(Collections.unmodifiableSortedMap(currentStatusNew),
Collections.unmodifiableMap(unassignedNew), currentStatus, unassigned, assignmentsOut);
Collections.unmodifiableMap(tserverGroups), Collections.unmodifiableMap(unassignedNew),
currentStatus, unassigned, assignmentsOut);
}

public AssignmentParamsImpl(SortedMap<TabletServerId,TServerStatus> currentStatus,
Map<TabletId,TabletServerId> unassigned, Map<TabletId,TabletServerId> assignmentsOut) {
Map<String,Set<TabletServerId>> currentGroups, Map<TabletId,TabletServerId> unassigned,
Map<TabletId,TabletServerId> assignmentsOut) {
this.currentStatus = currentStatus;
this.tserverGroups = currentGroups;
this.unassigned = unassigned;
this.assignmentsOut = assignmentsOut;
this.thriftCurrentStatus = null;
Expand All @@ -67,11 +82,12 @@ public AssignmentParamsImpl(SortedMap<TabletServerId,TServerStatus> currentStatu
}

private AssignmentParamsImpl(SortedMap<TabletServerId,TServerStatus> currentStatus,
Map<TabletId,TabletServerId> unassigned,
Map<String,Set<TabletServerId>> currentGroups, Map<TabletId,TabletServerId> unassigned,
SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus,
Map<KeyExtent,TServerInstance> thriftUnassigned,
Map<KeyExtent,TServerInstance> thriftAssignmentsOut) {
this.currentStatus = currentStatus;
this.tserverGroups = currentGroups;
this.unassigned = unassigned;
this.assignmentsOut = null;
this.thriftCurrentStatus = thriftCurrentStatus;
Expand All @@ -84,6 +100,11 @@ public SortedMap<TabletServerId,TServerStatus> currentStatus() {
return currentStatus;
}

@Override
public Map<String,Set<TabletServerId>> currentResourceGroups() {
return tserverGroups;
}

@Override
public Map<TabletId,TabletServerId> unassignedTablets() {
return unassigned;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
package org.apache.accumulo.core.manager.balancer;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.stream.Collectors;
Expand All @@ -40,31 +43,44 @@ public class BalanceParamsImpl implements TabletBalancer.BalanceParameters {
private final List<TabletMigration> migrationsOut;
private final SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus;
private final Set<KeyExtent> thriftCurrentMigrations;
private final Map<String,Set<TabletServerId>> tserverResourceGroups;

public static BalanceParamsImpl fromThrift(SortedMap<TabletServerId,TServerStatus> currentStatus,
Map<String,Set<TServerInstance>> currentTServerGrouping,
SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus,
Set<KeyExtent> thriftCurrentMigrations) {
Set<TabletId> currentMigrations = thriftCurrentMigrations.stream().map(TabletIdImpl::new)
.collect(Collectors.toUnmodifiableSet());

return new BalanceParamsImpl(currentStatus, currentMigrations, new ArrayList<>(),
Map<String,Set<TabletServerId>> tserverGroups = new HashMap<>();
currentTServerGrouping.forEach((k, v) -> {
Set<TabletServerId> servers = new HashSet<>();
v.forEach(tsi -> servers.add(TabletServerIdImpl.fromThrift(tsi)));
tserverGroups.put(k, servers);
});

return new BalanceParamsImpl(currentStatus, tserverGroups, currentMigrations, new ArrayList<>(),
thriftCurrentStatus, thriftCurrentMigrations);
}

public BalanceParamsImpl(SortedMap<TabletServerId,TServerStatus> currentStatus,
Set<TabletId> currentMigrations, List<TabletMigration> migrationsOut) {
Map<String,Set<TabletServerId>> currentGroups, Set<TabletId> currentMigrations,
List<TabletMigration> migrationsOut) {
this.currentStatus = currentStatus;
this.tserverResourceGroups = currentGroups;
this.currentMigrations = currentMigrations;
this.migrationsOut = migrationsOut;
this.thriftCurrentStatus = null;
this.thriftCurrentMigrations = null;
}

private BalanceParamsImpl(SortedMap<TabletServerId,TServerStatus> currentStatus,
Set<TabletId> currentMigrations, List<TabletMigration> migrationsOut,
Map<String,Set<TabletServerId>> currentGroups, Set<TabletId> currentMigrations,
List<TabletMigration> migrationsOut,
SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus,
Set<KeyExtent> thriftCurrentMigrations) {
this.currentStatus = currentStatus;
this.tserverResourceGroups = currentGroups;
this.currentMigrations = currentMigrations;
this.migrationsOut = migrationsOut;
this.thriftCurrentStatus = thriftCurrentStatus;
Expand Down Expand Up @@ -100,4 +116,10 @@ public void addMigration(KeyExtent extent, TServerInstance oldServer, TServerIns
TabletServerId newTsid = new TabletServerIdImpl(newServer);
migrationsOut.add(new TabletMigration(id, oldTsid, newTsid));
}

@Override
public Map<String,Set<TabletServerId>> currentResourceGroups() {
return tserverResourceGroups;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@
package org.apache.accumulo.core.metadata;

public enum TabletState {
UNASSIGNED, ASSIGNED, HOSTED, ASSIGNED_TO_DEAD_SERVER, SUSPENDED
UNASSIGNED, ASSIGNED, HOSTED, ASSIGNED_TO_DEAD_SERVER, SUSPENDED, ASSIGNED_TO_WRONG_GROUP
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.TabletIdImpl;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.StoredTabletFile;
Expand All @@ -75,6 +77,8 @@
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
import org.apache.accumulo.core.spi.balancer.TabletBalancer;
import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
Expand Down Expand Up @@ -421,6 +425,11 @@ public SortedMap<Key,Value> getKeyValues() {
}

public TabletState getTabletState(Set<TServerInstance> liveTServers) {
return getTabletState(liveTServers, null, null);
}

public TabletState getTabletState(Set<TServerInstance> liveTServers, TabletBalancer balancer,
Map<String,Set<TabletServerId>> currentTServerGrouping) {
ensureFetched(ColumnType.LOCATION);
ensureFetched(ColumnType.LAST);
ensureFetched(ColumnType.SUSPEND);
Expand All @@ -435,8 +444,20 @@ public TabletState getTabletState(Set<TServerInstance> liveTServers) {
return liveTServers.contains(future.getServerInstance()) ? TabletState.ASSIGNED
: TabletState.ASSIGNED_TO_DEAD_SERVER;
} else if (current != null) {
return liveTServers.contains(current.getServerInstance()) ? TabletState.HOSTED
: TabletState.ASSIGNED_TO_DEAD_SERVER;
if (liveTServers.contains(current.getServerInstance())) {
if (balancer != null) {
String resourceGroup = balancer.getResourceGroup(new TabletIdImpl(extent));
log.trace("Resource Group for extent {} is {}", extent, resourceGroup);
Set<TabletServerId> tservers = currentTServerGrouping.get(resourceGroup);
if (tservers == null
|| !tservers.contains(new TabletServerIdImpl(current.getServerInstance()))) {
return TabletState.ASSIGNED_TO_WRONG_GROUP;
}
}
return TabletState.HOSTED;
} else {
return TabletState.ASSIGNED_TO_DEAD_SERVER;
}
} else if (getSuspend() != null) {
return TabletState.SUSPENDED;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,8 @@ public void getAssignments(AssignmentParameters params) {
}
LOG.debug("Sending {} tablets to balancer for table {} for assignment within tservers {}",
e.getValue().size(), tableName, currentView.keySet());
getBalancerForTable(e.getKey())
.getAssignments(new AssignmentParamsImpl(currentView, e.getValue(), newAssignments));
getBalancerForTable(e.getKey()).getAssignments(new AssignmentParamsImpl(currentView,
params.currentResourceGroups(), e.getValue(), newAssignments));
newAssignments.forEach(params::addAssignment);
}
}
Expand Down Expand Up @@ -495,8 +495,8 @@ public long balance(BalanceParameters params) {
continue;
}
ArrayList<TabletMigration> newMigrations = new ArrayList<>();
getBalancerForTable(tableId)
.balance(new BalanceParamsImpl(currentView, migrations, newMigrations));
getBalancerForTable(tableId).balance(new BalanceParamsImpl(currentView,
params.currentResourceGroups(), migrations, newMigrations));

if (newMigrations.isEmpty()) {
tableToTimeSinceNoMigrations.remove(tableId);
Expand Down
Loading