updates delete table to use conditional muations#3929
Conversation
Updates the delete table fate operation to use conditional mutations. Also introduces a new mechanism in Ample for processing conditional mutations that does not buffer everything in memory. This avoid buffering large amounts of tablet metadata in memory when deleting a large table. The IT that creates 1 million splits in a table and deletes it passes with these changes.
There was a problem hiding this comment.
@keith-turner - I tested this out quite a bit and looks pretty good to me over all. I made a couple minor comments and also pointed out some possible enhancements that could be a follow on (such as multiple background threads/thread pool and filter on the tablet iterator)
I didn't see any major issues as long as the Async mutator is only ever intended to be used from the same thread. The implementation is not thread safe but I can't think of a scenario where want or need try and share the mutator across threads as each thread can just create a mutator so I think it looks good to me.
I also tested out error handling and the errors bubble back up to the fate executor so I think error handling is ok as well.
| var conditionalMutator = | ||
| manager.getContext().getAmple().conditionallyMutateTablets(resultsConsumer)) { | ||
| tabletsSeen++; | ||
| for (var tabletMeta : tablets) { |
There was a problem hiding this comment.
This will scan over all tablets each time it's executed but I was thinking maybe as a future enhancement it might be nice to have a filter on what is returned in case we are waiting on an existing op to finish. It may not make it any faster since the iterator still needs to skip the rows but it could make the code a bit cleaner.
I'm thinking we could enhance the builder and add a new filter method that could be applied to the row iterator (just like we already filter just like we can filter by extents here). Adding a generic filter method could be nice for future use cases to filter the tablets returned by some arbitrary condition.
For example, in this case the filter could be to return tablets that don't have the delete op id set already:
var tablets = manager.getContext().getAmple().readTablets().forTable(tableId)
// new filter method added to TableRangeOptions
.filter(tm -> !opid.equals(tm.getOperationId())
.fetch(OPID, PREV_ROW, LOCATION).checkConsistency().build();Then you'd only see tablets that need to be set.
There was a problem hiding this comment.
This can sorta be done on the client side currently using streams. Can do the following with the current code, however the tablets object can not be used with the foreach loop later in the code because its a stream.
var tablets = manager.getContext().getAmple().readTablets().forTable(tableId)
.fetch(OPID, PREV_ROW, LOCATION).checkConsistency().build().stream().filter(tm->!opid.equals(tm.getOperationId()));One thing that may be nice is adding some sort of easy to use filters to ample that can push things down to server sdie iterators. There is a uses case for this in the GC, I'll open an issue about exploring this further.
There was a problem hiding this comment.
That's a good point that we can just use the stream api, so that works so can just use that since it already exists. The sever side iterator would be a nice enhancement.
| this.resultsConsumer = Objects.requireNonNull(resultsConsumer); | ||
| this.bufferingMutator = new ConditionalTabletsMutatorImpl(context); | ||
| this.context = context; | ||
| this.executor = Executors.newSingleThreadExecutor(); |
There was a problem hiding this comment.
We could use a thread pool and reuse executors vs having to create a new one each time. We could use a cached pool or a bounded pool if we wanted to limit the number of concurrent fate operations. (I'm not sure if the number of operations is already bound elsewhere).
Something else that could be done if using a pool would be to allow executing more than one background thread at once which could speed up processing so we are not blocking and waiting for the previous background future to finish. The downside would be increased complexity by needing to track multiple submitted background tasks instead of 1 and also would mean ordering would be lost as you don't know which thread would finish first but that probably wouldn't matter in a lot of cases. (Ie in this case you don't really care which updates finish first you just want the op id set on all tablets)
There was a problem hiding this comment.
The background task uses a conditional writer which is itself multithreaded and by defaults uses three threads. This made me realize that we made need to explore making that configurable in ample so that in the case where metadata tablets span multiple tservers that the conditional writer could write to more in parallel.
There was a problem hiding this comment.
We could use a cached pool or a bounded pool if we wanted to limit the number of concurrent fate operations. (I'm not sure if the number of operations is already bound elsewhere).
There is a thread pool for FATE operations and its size is configurable.
There was a problem hiding this comment.
This little background thread would be a good use case for the new light weight threads introduced in JDK 21 as all its really doing is waiting on the results from the conditional writer.
|
|
||
| @Override | ||
| public void close() { | ||
| if (backgroundProcessing != null) { |
There was a problem hiding this comment.
I was trying to go through and think if there's any case where we could end up calling backgroundProcessing.get().forEach(resultsConsumer); twice (once in mutateTablet and then on close) but I don't think so as long as this object is only ever used from one thread which seems like that is the intention.
There was a problem hiding this comment.
This should definitely only be called by a single thread. I added a javadoc comment in 08608df. I also experimented with the @NotThreadSafe annotation for the class, however in experiments spotbugs did not trigger when the class was misused. I also could not find any spotbugs documentation for the annotation. So I ended up not adding that annotation to the class.
| if (backgroundProcessing != null) { | ||
| // a previous batch of mutations was submitted for processing so wait on it. | ||
| try { | ||
| backgroundProcessing.get().forEach(resultsConsumer); |
There was a problem hiding this comment.
I'm wondering if we should add an optional timeout configuration for the Aync mutator to pass when waiting on the future? I'm not sure how you'd pick a good timeout value as operations could be long running so this might be the best option.
There was a problem hiding this comment.
For now I think it best to just wait. This comment made me realize that debugging the situation where it gets stuck could be tricky. Also realized I was not using Accumulo's internal thread utilities. So in 08608df I did the following.
- Use Accumulo's internal Threads class to create the thread
- Put the thread id of the creating thread in the background threads name. In the case where it does get stuck, this allows linking of the foreground and background thread in a jstack.
Using the conditional writer changes from apache#3929 this commit updates the merge code to support merging more tablets than will fit in memory. Also adds merge and clone operations to the SplitMillionIT and merges a table with one million tablets in the test. Without the changes in this commit that test would cause the manager to die with an out of memory error.
Using the conditional writer changes from #3929 this commit updates the merge code to support merging more tablets than will fit in memory. Also adds merge and clone operations to the SplitMillionIT and merges a table with one million tablets in the test. Without the changes in this commit that test would cause the manager to die with an out of memory error. Added a check to ensure the number of files in the merge range does not exceed a configurable maximum. With these changes its possible to merge lots (like millions) of empty tablets. However if merging lots of tablets with files, then the new check will trigger to avoid creating a tablet with too many files. Added test for merging tablets with too many files.
Updates the delete table fate operation to use conditional mutations. Also introduces a new mechanism in Ample for processing conditional mutations that does not buffer everything in memory. This avoid buffering large amounts of tablet metadata in memory when deleting a large table. The IT that creates 1 million splits in a table and deletes it passes with these changes.