Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes balancing of metadata table #5195

Open
wants to merge 5 commits into
base: 2.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,17 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.stream.Collectors;

import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.TabletIdImpl;
import org.apache.accumulo.core.master.thrift.TabletServerStatus;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.spi.balancer.TabletBalancer;
import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
Expand All @@ -41,39 +42,44 @@ public class BalanceParamsImpl implements TabletBalancer.BalanceParameters {
private final List<TabletMigration> migrationsOut;
private final SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus;
private final Set<KeyExtent> thriftCurrentMigrations;
private final DataLevel currentDataLevel;
private final String partition;
private final Map<String,TableId> tablesToBalance;

public static BalanceParamsImpl fromThrift(SortedMap<TabletServerId,TServerStatus> currentStatus,
SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus,
Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel) {
Set<KeyExtent> thriftCurrentMigrations, String partition,
Map<String,TableId> tablesToBalance) {
Set<TabletId> currentMigrations = thriftCurrentMigrations.stream().map(TabletIdImpl::new)
.collect(Collectors.toUnmodifiableSet());

return new BalanceParamsImpl(currentStatus, currentMigrations, new ArrayList<>(),
thriftCurrentStatus, thriftCurrentMigrations, currentLevel);
thriftCurrentStatus, thriftCurrentMigrations, partition, tablesToBalance);
}

public BalanceParamsImpl(SortedMap<TabletServerId,TServerStatus> currentStatus,
Set<TabletId> currentMigrations, List<TabletMigration> migrationsOut,
DataLevel currentLevel) {
Set<TabletId> currentMigrations, List<TabletMigration> migrationsOut, String partition,
Map<String,TableId> tablesToBalance) {
this.currentStatus = currentStatus;
this.currentMigrations = currentMigrations;
this.migrationsOut = migrationsOut;
this.thriftCurrentStatus = null;
this.thriftCurrentMigrations = null;
this.currentDataLevel = currentLevel;
this.partition = partition;
this.tablesToBalance = tablesToBalance;
}

private BalanceParamsImpl(SortedMap<TabletServerId,TServerStatus> currentStatus,
Set<TabletId> currentMigrations, List<TabletMigration> migrationsOut,
SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus,
Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel) {
Set<KeyExtent> thriftCurrentMigrations, String partition,
Map<String,TableId> tablesToBalance) {
this.currentStatus = currentStatus;
this.currentMigrations = currentMigrations;
this.migrationsOut = migrationsOut;
this.thriftCurrentStatus = thriftCurrentStatus;
this.thriftCurrentMigrations = thriftCurrentMigrations;
this.currentDataLevel = currentLevel;
this.partition = partition;
this.tablesToBalance = tablesToBalance;
}

@Override
Expand Down Expand Up @@ -107,7 +113,12 @@ public void addMigration(KeyExtent extent, TServerInstance oldServer, TServerIns
}

@Override
public String currentLevel() {
return currentDataLevel.name();
public String partitionName() {
return partition;
}

/**
 * Returns the table-name to table-id map that was handed to this object's constructor. The map is
 * returned as-is; no copy is made here.
 */
@Override
public Map<String,TableId> getTablesToBalance() {
return tablesToBalance;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ public interface BalancerEnvironment extends ServiceEnvironment {
* Many Accumulo plugins are given table IDs as this is what Accumulo uses internally to identify
* tables. This provides a mapping of table names to table IDs for the purposes of translating
* and/or enumerating the existing tables.
*
* <P>
* This returns all tables that exist in the system. Each request to balance should limit itself
* to {@link TabletBalancer.BalanceParameters#getTablesToBalance()} and not balance everything
* returned by this.
* </P>
*/
Map<String,TableId> getTableIdMap();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public abstract class GroupBalancer implements TabletBalancer {
protected BalancerEnvironment environment;
private final TableId tableId;

protected final Map<DataLevel,Long> lastRunTimes = new HashMap<>(DataLevel.values().length);
protected final Map<String,Long> lastRunTimes = new HashMap<>(DataLevel.values().length);

@Override
public void init(BalancerEnvironment balancerEnvironment) {
Expand Down Expand Up @@ -213,9 +213,8 @@ public long balance(BalanceParameters params) {
return 5000;
}

final DataLevel currentLevel = DataLevel.valueOf(params.currentLevel());

if (System.currentTimeMillis() - lastRunTimes.getOrDefault(currentLevel, 0L) < getWaitTime()) {
if (System.currentTimeMillis() - lastRunTimes.getOrDefault(params.partitionName(), 0L)
< getWaitTime()) {
return 5000;
}

Expand Down Expand Up @@ -279,7 +278,7 @@ public long balance(BalanceParameters params) {

populateMigrations(tservers.keySet(), params.migrationsOut(), moves);

lastRunTimes.put(currentLevel, System.currentTimeMillis());
lastRunTimes.put(params.partitionName(), System.currentTimeMillis());

return 5000;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl;
import org.apache.accumulo.core.manager.balancer.TServerStatusImpl;
import org.apache.accumulo.core.manager.balancer.TableStatisticsImpl;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
import org.apache.accumulo.core.spi.balancer.data.TableStatistics;
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
Expand Down Expand Up @@ -182,7 +181,7 @@ static class HrtlbConf {
}

private static final Set<TabletId> EMPTY_MIGRATIONS = Collections.emptySet();
protected final Map<DataLevel,Long> lastOOBCheckTimes = new HashMap<>(DataLevel.values().length);
protected final Map<String,Long> lastOOBCheckTimes = new HashMap<>();
private Map<String,SortedMap<TabletServerId,TServerStatus>> pools = new HashMap<>();
private final Map<TabletId,TabletMigration> migrationsFromLastPass = new HashMap<>();
private final Map<TableId,Long> tableToTimeSinceNoMigrations = new HashMap<>();
Expand Down Expand Up @@ -380,7 +379,7 @@ public void getAssignments(AssignmentParameters params) {
public long balance(BalanceParameters params) {
long minBalanceTime = 20_000;
// Iterate over the tables and balance each of them
Map<String,TableId> tableIdMap = environment.getTableIdMap();
Map<String,TableId> tableIdMap = params.getTablesToBalance();
Map<TableId,String> tableIdToTableName = tableIdMap.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
tableIdToTableName.keySet().forEach(this::checkTableConfig);
Expand All @@ -395,9 +394,9 @@ public long balance(BalanceParameters params) {

Map<String,SortedMap<TabletServerId,TServerStatus>> currentGrouped =
splitCurrentByRegex(params.currentStatus());
final DataLevel currentLevel = DataLevel.valueOf(params.currentLevel());

if ((now - this.lastOOBCheckTimes.getOrDefault(currentLevel, System.currentTimeMillis()))
if ((now
- this.lastOOBCheckTimes.getOrDefault(params.partitionName(), System.currentTimeMillis()))
> myConf.oobCheckMillis) {
try {
// Check to see if a tablet is assigned outside the bounds of the pool. If so, migrate it.
Expand Down Expand Up @@ -458,7 +457,7 @@ public long balance(BalanceParameters params) {
}
} finally {
// this could have taken a while...get a new time
this.lastOOBCheckTimes.put(currentLevel, System.currentTimeMillis());
this.lastOOBCheckTimes.put(params.partitionName(), System.currentTimeMillis());
}
}

Expand Down Expand Up @@ -511,8 +510,8 @@ public long balance(BalanceParameters params) {
continue;
}
ArrayList<TabletMigration> newMigrations = new ArrayList<>();
getBalancerForTable(tableId).balance(
new BalanceParamsImpl(currentView, migrations, newMigrations, DataLevel.of(tableId)));
getBalancerForTable(tableId).balance(new BalanceParamsImpl(currentView, migrations,
newMigrations, params.partitionName() + ":" + tableId, Map.of(tableName, tableId)));

if (newMigrations.isEmpty()) {
tableToTimeSinceNoMigrations.remove(tableId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class SimpleLoadBalancer implements TabletBalancer {

/**
 * Creates a balancer without assigning {@code tableToBalance}.
 */
public SimpleLoadBalancer() {}

/**
 * Creates a balancer and stores {@code table} in {@code tableToBalance}.
 * NOTE(review): presumably this limits balancing to that one table; confirm against the rest of
 * the class, which is not visible here.
 *
 * @param table table id recorded in {@code tableToBalance}
 */
// TODO drop this constructor and use new getTablesToBalance() method
public SimpleLoadBalancer(TableId table) {
tableToBalance = table;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.manager.balancer.AssignmentParamsImpl;
import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
import org.slf4j.Logger;
Expand Down Expand Up @@ -125,12 +124,13 @@ public void getAssignments(AssignmentParameters params) {
public long balance(BalanceParameters params) {
long minBalanceTime = 5_000;
// Iterate over the tables and balance each of them
final DataLevel currentDataLevel = DataLevel.valueOf(params.currentLevel());
for (TableId tableId : environment.getTableIdMap().values()) {
for (Entry<String,TableId> entry : params.getTablesToBalance().entrySet()) {
String tableName = entry.getKey();
TableId tableId = entry.getValue();
ArrayList<TabletMigration> newMigrations = new ArrayList<>();
long tableBalanceTime =
getBalancerForTable(tableId).balance(new BalanceParamsImpl(params.currentStatus(),
params.currentMigrations(), newMigrations, currentDataLevel));
long tableBalanceTime = getBalancerForTable(tableId)
.balance(new BalanceParamsImpl(params.currentStatus(), params.currentMigrations(),
newMigrations, params.partitionName() + ":" + tableId, Map.of(tableName, tableId)));
if (tableBalanceTime < minBalanceTime) {
minBalanceTime = tableBalanceTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.SortedMap;

import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
Expand Down Expand Up @@ -95,13 +96,24 @@ interface BalanceParameters {
List<TabletMigration> migrationsOut();

/**
* Return the DataLevel name for which the Manager is currently balancing. Balancers should
* return migrations for tables within the current DataLevel.
* Accumulo may partition tables in different ways and pass subsets of tables to the balancer
* via {@link #getTablesToBalance()}. Each partition is given a unique name that is always the
* same for a given partition. Balancers can use this to determine whether they are being called
* for the same or a different partition if tracking state between balance calls.
*
* @return name of current balancing iteration data level
* @return name of current partition of tables to balance.
* @since 2.1.4
*/
String currentLevel();
String partitionName();

/**
* This is the set of tables the balancer should consider. Balancing any tables outside of this
* set will be ignored and result in an error in the logs.
*
* @return map of table names to table ids that should be balanced.
* @since 2.1.4
*/
Map<String,TableId> getTablesToBalance();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl;
import org.apache.accumulo.core.manager.balancer.TServerStatusImpl;
import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
Expand Down Expand Up @@ -124,7 +123,7 @@ public void balance(TabletBalancer balancer, int maxMigrations, TableId tid) {
}

balancer
.balance(new BalanceParamsImpl(current, migrations, migrationsOut, DataLevel.of(tid)));
.balance(new BalanceParamsImpl(current, migrations, migrationsOut, "USER", Map.of()));

assertTrue(migrationsOut.size() <= (maxMigrations + 5),
"Max Migration exceeded " + maxMigrations + " " + migrationsOut.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.accumulo.core.manager.balancer.AssignmentParamsImpl;
import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl;
import org.apache.accumulo.core.manager.balancer.TabletStatisticsImpl;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
import org.apache.accumulo.core.spi.balancer.data.TabletStatistics;
Expand Down Expand Up @@ -108,7 +107,7 @@ public void testConfigurationChanges() {
// getOnlineTabletsForTable
UtilWaitThread.sleep(3000);
this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(allTabletServers),
migrations, migrationsOut, DataLevel.USER));
migrations, migrationsOut, "USER", tables));
assertEquals(0, migrationsOut.size());
// Change property, simulate call by TableConfWatcher

Expand All @@ -118,9 +117,9 @@ public void testConfigurationChanges() {
// in the HostRegexTableLoadBalancer. For this test we want
// to get into the out of bounds checking code, so we need to
// populate the map with an older time value
this.lastOOBCheckTimes.put(DataLevel.USER, System.currentTimeMillis() / 2);
this.lastOOBCheckTimes.put("USER", System.currentTimeMillis() / 2);
this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(allTabletServers),
migrations, migrationsOut, DataLevel.USER));
migrations, migrationsOut, "USER", tables));
assertEquals(5, migrationsOut.size());
for (TabletMigration migration : migrationsOut) {
assertTrue(migration.getNewTabletServer().getHost().startsWith("192.168.0.1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl;
import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl;
import org.apache.accumulo.core.manager.balancer.TabletStatisticsImpl;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
Expand Down Expand Up @@ -98,7 +97,7 @@ public void testBalance() {
List<TabletMigration> migrationsOut = new ArrayList<>();
long wait =
this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
migrations, migrationsOut, DataLevel.USER));
migrations, migrationsOut, "USER", environment.getTableIdMap()));
assertEquals(20000, wait);
// should balance four tablets in one of the tables before reaching max
assertEquals(4, migrationsOut.size());
Expand All @@ -109,7 +108,7 @@ public void testBalance() {
}
migrationsOut.clear();
wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
migrations, migrationsOut, DataLevel.USER));
migrations, migrationsOut, "USER", environment.getTableIdMap()));
assertEquals(20000, wait);
// should balance four tablets in one of the other tables before reaching max
assertEquals(4, migrationsOut.size());
Expand All @@ -120,7 +119,7 @@ public void testBalance() {
}
migrationsOut.clear();
wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
migrations, migrationsOut, DataLevel.USER));
migrations, migrationsOut, "USER", environment.getTableIdMap()));
assertEquals(20000, wait);
// should balance four tablets in one of the other tables before reaching max
assertEquals(4, migrationsOut.size());
Expand All @@ -131,7 +130,7 @@ public void testBalance() {
}
migrationsOut.clear();
wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
migrations, migrationsOut, DataLevel.USER));
migrations, migrationsOut, "USER", environment.getTableIdMap()));
assertEquals(20000, wait);
// no more balancing to do
assertEquals(0, migrationsOut.size());
Expand All @@ -148,7 +147,7 @@ public void testBalanceWithTooManyOutstandingMigrations() {
migrations.addAll(tableTablets.get(BAR.getTableName()));
long wait =
this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
migrations, migrationsOut, DataLevel.USER));
migrations, migrationsOut, "USER", environment.getTableIdMap()));
assertEquals(20000, wait);
// no migrations should have occurred as 10 is the maxOutstandingMigrations
assertEquals(0, migrationsOut.size());
Expand Down Expand Up @@ -491,12 +490,12 @@ public void testOutOfBoundsTablets() {
// in the HostRegexTableLoadBalancer. For this test we want
// to get into the out of bounds checking code, so we need to
// populate the map with an older time value
this.lastOOBCheckTimes.put(DataLevel.USER, System.currentTimeMillis() / 2);
this.lastOOBCheckTimes.put("USER", System.currentTimeMillis() / 2);
init(DEFAULT_TABLE_PROPERTIES);
Set<TabletId> migrations = new HashSet<>();
List<TabletMigration> migrationsOut = new ArrayList<>();
this.balance(
new BalanceParamsImpl(createCurrent(15), migrations, migrationsOut, DataLevel.USER));
this.balance(new BalanceParamsImpl(createCurrent(15), migrations, migrationsOut, "USER",
environment.getTableIdMap()));
assertEquals(2, migrationsOut.size());
}

Expand Down
Loading
Loading