modifies bulk import to use conditional mutations #3350
Conversation
|
I wonder if the tablet refresh mechanism should be pulled out into a separate PR as a solution for #3310. |
|
I'm curious if you need a refresh column in the metadata. You could send the RPC to the hosting tablet server as a FaTE transaction and expect a response. If the call times out or fails, then you could retry as part of the FaTE framework. |
I think that could be done and it may be more efficient for the normal case when everything is stable and working well. The refresh column handles the following cases that would not handled by an RPC alone.
Using only an RPC, I think we could do the following.
Barring all the edge cases, this may be faster. The fate thread would be tied up for however long this takes. If all the fate threads are tied up, more can be added as long as memory supports it. There are other fate operations that hold a bunch of stuff in memory and/or tie up a fate thread for an indefinite time period while waiting for a condition to become true. |
|
It appears that a check for tablet metadata being out of date was already added (maybe in 2.1). I wonder if there is a way to consolidate some of these things on the tserver side. Given that the TabletServer is going to have to periodically check the TabletMetadata to perform a refresh, I wonder if it would just be easier to have a scan iterator on the tablet metadata that computes a a single hash value for the tablet metadata in the metadata table. The hash could be stored in memory in the TabletMetadata object in the TabletServer, and if it's different, then refresh. |
We may be able to remove that because it based on different assumptions. It assumes the tablet reads the metadata and then only writes to it and is doing periodic validation. We are moving away from what is in memory on the tserver being the authority to whats in the metadata table being the authority.
That could avoid some uneeded refreshes, the tablet would need to track the last few hashes. The following shows a situation where tacking the last few hashes is useful.
We could also have a file update counter in the tablet. Every time the set of files is updated in the tablet this counter is incremented by one. This increment can be safely done with conditional mutations, its like an atomic long in java. Refresh request just specify the counter they observed asking the tablet to update if its current counter is less than the one from the refresh RPC. The counter may be simpler to implement than the hash. |
|
I am going to try only having a refresh RPC and no refresh column to see what it looks like and see if there are any surprises. |
|
Experimented with changing just the RefreshTablets Repo for the FATE operation to see what it would look like if we dropped the refresh column and only sent RPCs. public class RefreshTablets extends ManagerRepo {
public long isReady(long tid, Manager manager) throws Exception {
// nothing to do here, always ready to run
return 0;
}
@Override
public Repo<Manager> call(long tid, Manager manager) throws Exception {
Map<Location,List<KeyExtent>> refreshesNeeded;
try (var tablets =
manager.getContext().getAmple().readTablets().forTable(bulkInfo.tableId).checkConsistency()
.fetch(ColumnType.LOADED, ColumnType.LOCATION, ColumnType.PREV_ROW).build()) {
// Find all tablets that have a location and load markers for this bulk load operation and
// therefore need to refresh their metadata. There may be some tablets in the map that were
// hosted after the bulk files were set and that is ok, it just results in an unneeded refresh
// request. There may also be tablets that had a location when the files were set but do not
// have a location now, that is ok the next time that tablet loads somewhere it will see the
// files.
refreshesNeeded = tablets.stream()
.filter(tabletMetadata -> tabletMetadata.getLocation() != null
&& tabletMetadata.getLoaded().containsValue(tid))
.collect(groupingBy(TabletMetadata::getLocation,
mapping(TabletMetadata::getExtent, toList())));
}
HashSet<Location> completed = new HashSet<>();
while (!refreshesNeeded.isEmpty()) {
for (Map.Entry<Location,List<KeyExtent>> entry : refreshesNeeded.entrySet()) {
try {
// TODO use a thread pool to send RPC request
// Ask tablet server to reload the metadata for these tablets. If the tabletserver is no
// longer serving a tablet that is ok as long is refreshes the ones that it is
// still serving. If a tablet was unloaded after the map was created above when it
// reloads elsewhere it will see the bulk files. The tserver will need to ensure tablets
// that had future location and are in the process of loading are handled correctly or
// the refresh request could be missed due to race conditions in the tablet server.
sendSyncRefreshRequest(entry.getKey(), entry.getValue());
// refresh request returned successfully, so mark the location as completed
completed.add(entry.getKey());
} catch (Exception e) { // TODO what exceptions are ok to catch and retry on? this
// exception is too broad
if (tserverIsDead(entry.getKey())) {
// if the tserver is no longer alive then when tablets load elsewhere they will see
// the files, so mark it completed
completed.add(entry.getKey());
} else {
// TODO log something about waiting on this tserver for bulk import refresh request
}
}
}
// remove all tablet server that were successfully refreshed
refreshesNeeded.keySet().removeAll(completed);
if (!refreshesNeeded.isEmpty()) {
// TODO sleep and log using a retry object
}
}
return new CleanUpBulkImport(bulkInfo);
}
}Learned a few things from this. Will probably need a thread pool and will need to handle tservers that die after getting the information from the metadata table. I also realized there could be some tricky issues with future locations with this approach, but those should be ok also long as the tserver handles concurrent tablet loads and refresh request correctly. I can't think of any cases with this approach where a tablet that needs to refresh does not. Thinking of switching to this approach. It has pluses and minuses compared to the refresh column. Thinking its simpler to start with, likely faster, and if any of the problems the refresh column addresses show up then we can reconsider it later. |
|
I think this approach coupled with a periodic check in the tserver might cover all bases. |
The periodic validation the tserver does to check memory vs metadata can be changed to just refresh metadata (and drop the validation against whats in memory aspect of what it does). |
Modifies the core of bulk import to use conditional mutations for metadata table updates. Bulk import no longer request that the tserver update the metadata table on behalf of the FATE transaction. If a tablet is hosted the bulk import fate transaction sets a refresh column in the metadata table and then ask the tserver to refresh its metadata. When the tablet server does update its metadata it will delete the refresh column. This modification is incomplete. There is code that can be removed after this changes. Also the refresh changes in the tablet servers are a hack and have race conditions. Leaving the tserver changes half done was intentional, that will be properly done in its own larger change to the tserver. Issues will be opened for follow in changes. The most important updates in this commit are in server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/* along with a documention update in MetadataSchema.TabletsSection.RefreshIdColumnFamily core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java this give a good overview of the new refresh column Fixes apache#3337
cf154e4 to
9318029
Compare
| * @param tableId Table ID for transaction removals | ||
| * @param tid Transaction ID to remove | ||
| */ | ||
| default void removeBulkLoadEntries(TableId tableId, long tid) { |
There was a problem hiding this comment.
I merged #3336 into 2.1, main, elasticity and then this branch. Lots of conflicts to resolve along the way.
Note : this PR builds on changes from #3317 which is not merged and that is why this PR is a draft. After #3317 is merged this will be rebased and taken out of draft.
Modifies the core of bulk import to use conditional mutations for
metadata table updates. Bulk import no longer request that the tserver
update the metadata table on behalf of the FATE transaction. If a tablet
is hosted the bulk import fate transaction sets a refresh column in the
metadata table and then ask the tserver to refresh its metadata. When
the tablet server does update its metadata it will delete the refresh
column.
This modification is incomplete. There is code that can be removed after
this changes. Also the refresh changes in the tablet servers are a hack
and have race conditions. Leaving the tserver changes half done was
intentional, that will be properly done in its own larger change to the
tserver. Issues will be opened for follow in changes.
The most important updates in this commit are in
server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/*
along with a documention update in MetadataSchema.TabletsSection.RefreshIdColumnFamily
core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
this give a good overview of the new refresh column
Fixes #3337