Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.stream.Stream;

Expand Down Expand Up @@ -209,14 +210,37 @@ default TabletsMutator mutateTablets() {
}

/**
* An entry point for updating tablets metadata using a conditional writer.
* An entry point for updating tablets metadata using a conditional writer. The returned mutator
* will buffer everything in memory until {@link ConditionalTabletsMutator#process()} is called.
* If buffering everything in memory is undesirable, then consider using
* {@link #conditionallyMutateTablets(BiConsumer)}
*
* @see ConditionalTabletMutator#submit(RejectionHandler)
*/
default ConditionalTabletsMutator conditionallyMutateTablets() {
throw new UnsupportedOperationException();
}

/**
* An entry point for updating tablets metadata using a conditional writer asynchronously. This
* will process conditional mutations in the background as they are added. The benefit of this
* method over {@link #conditionallyMutateTablets()} is that it can avoid buffering everything in
* memory. Using this method may also be faster as it allows tablet metadata scans and conditional
* updates of tablets to run concurrently.
*
* @param resultsConsumer as conditional mutations are processed in the background their result is
* passed to this consumer. This consumer should be thread safe as it may be called from a
* different thread.
* @return A conditional tablet mutator that will asynchronously report results. Closing this
* object will force everything to be processed and reported. The returned object is not
* thread safe and is only intended to be used by a single thread.
* @see ConditionalTabletMutator#submit(RejectionHandler)
*/
default AsyncConditionalTabletsMutator
conditionallyMutateTablets(BiConsumer<KeyExtent,ConditionalResult> resultsConsumer) {
throw new UnsupportedOperationException();
}

default void putGcCandidates(TableId tableId, Collection<StoredTabletFile> candidates) {
throw new UnsupportedOperationException();
}
Expand Down Expand Up @@ -264,7 +288,7 @@ public interface TabletsMutator extends AutoCloseable {
void close();
}

public interface ConditionalResult {
interface ConditionalResult {

/**
* This enum was created instead of using {@link ConditionalWriter.Status} because Ample has
Expand Down Expand Up @@ -292,24 +316,29 @@ enum Status {
TabletMetadata readMetadata();
}

public interface ConditionalTabletsMutator extends AutoCloseable {

interface AsyncConditionalTabletsMutator extends AutoCloseable {
/**
* @return A fluent interface to conditional mutating a tablet. Ensure you call
* {@link ConditionalTabletMutator#submit(RejectionHandler)} when finished.
*/
OperationRequirements mutateTablet(KeyExtent extent);

/**
* Closing ensures that all mutations are processed and their results are reported.
*/
@Override
void close();
Comment thread
cshannon marked this conversation as resolved.
}

interface ConditionalTabletsMutator extends AsyncConditionalTabletsMutator {

/**
* After creating one or more conditional mutations using {@link #mutateTablet(KeyExtent)}, call
* this method to process them using a {@link ConditionalWriter}
*
* @return The result from the {@link ConditionalWriter} of processing each tablet.
*/
Map<KeyExtent,ConditionalResult> process();

@Override
void close();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.server.metadata;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;

import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.server.ServerContext;

public class AsyncConditionalTabletsMutatorImpl implements Ample.AsyncConditionalTabletsMutator {
private final BiConsumer<KeyExtent,Ample.ConditionalResult> resultsConsumer;
private final ExecutorService executor;
private Future<Map<KeyExtent,Ample.ConditionalResult>> backgroundProcessing = null;
private ConditionalTabletsMutatorImpl bufferingMutator;
private final ServerContext context;
private long mutatedTablets = 0;
public static final int BATCH_SIZE = 1000;

AsyncConditionalTabletsMutatorImpl(ServerContext context,
BiConsumer<KeyExtent,Ample.ConditionalResult> resultsConsumer) {
this.resultsConsumer = Objects.requireNonNull(resultsConsumer);
this.bufferingMutator = new ConditionalTabletsMutatorImpl(context);
this.context = context;
var creatorId = Thread.currentThread().getId();
this.executor = Executors.newSingleThreadExecutor(runnable -> Threads.createThread(
"Async conditional tablets mutator background thread, created by : #" + creatorId,
runnable));

}

@Override
public Ample.OperationRequirements mutateTablet(KeyExtent extent) {
if (mutatedTablets > BATCH_SIZE) {
if (backgroundProcessing != null) {
// a previous batch of mutations was submitted for processing so wait on it.
try {
backgroundProcessing.get().forEach(resultsConsumer);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we should add an optional timeout configuration for the Aync mutator to pass when waiting on the future? I'm not sure how you'd pick a good timeout value as operations could be long running so this might be the best option.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now I think it best to just wait. This comment made me realize that debugging the situation where it gets stuck could be tricky. Also realized I was not using Accumulo's internal thread utilities. So in 08608df I did the following.

  • Use Accumulo's internal Threads class to create the thread
  • Put the thread id of the creating thread in the background threads name. In the case where it does get stuck, this allows linking of the foreground and background thread in a jstack.

} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException(e);
}
}

// Spin up processing of the mutations submitted so far in a background thread. Must copy the
// reference for the background thread because a new one is about to be created.
var bufferingMutatorRef = bufferingMutator;
backgroundProcessing = executor.submit(() -> {
var result = bufferingMutatorRef.process();
bufferingMutatorRef.close();
return result;
});

bufferingMutator = new ConditionalTabletsMutatorImpl(context);
mutatedTablets = 0;
}
mutatedTablets++;
return bufferingMutator.mutateTablet(extent);
}

@Override
public void close() {
if (backgroundProcessing != null) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was trying to go through and think if there's any case where we could end up calling backgroundProcessing.get().forEach(resultsConsumer); twice (once in mutateTablet and then on close) but I don't think so as long as this object is only ever used from one thread which seems like that is the intention.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should definitely only be called by a single thread. I added a javadoc comment in 08608df. I also experimented with the @NotThreadSafe annotation for the class, however in experiments spotbugs did not trigger when the class was misused. I also could not find any spotbugs documentation for the annotation. So I ended up not adding that annotation to the class.

// a previous batch of mutations was submitted for processing so wait on it.
try {
backgroundProcessing.get().forEach(resultsConsumer);
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException(e);
}
}
// process anything not processed so far
bufferingMutator.process().forEach(resultsConsumer);
bufferingMutator.close();
executor.shutdownNow();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -93,6 +94,12 @@ public ConditionalTabletsMutator conditionallyMutateTablets() {
return new ConditionalTabletsMutatorImpl(context);
}

@Override
public AsyncConditionalTabletsMutator
conditionallyMutateTablets(BiConsumer<KeyExtent,ConditionalResult> resultsConsumer) {
return new AsyncConditionalTabletsMutatorImpl(context, resultsConsumer);
}

private void mutateRootGcCandidates(Consumer<RootGcCandidates> mutator) {
String zpath = context.getZooKeeperRoot() + ZROOT_TABLET_GC_CANDIDATES;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,10 @@
*/
package org.apache.accumulo.manager.tableOps.delete;

import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND;

import java.io.IOException;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchScanner;
Expand All @@ -40,11 +35,8 @@
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.iterators.user.GrepIterator;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.TabletState;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
Expand Down Expand Up @@ -87,32 +79,11 @@ public CleanUp(TableId tableId, NamespaceId namespaceId) {

@Override
public long isReady(long tid, Manager manager) throws Exception {
// ELASTICITY_TODO investigate this, what is it for and is it still needed?
if (!manager.hasCycled(creationTime)) {
return 50;
}

boolean done = true;

try (var tablets = manager.getContext().getAmple().readTablets().forTable(tableId)
.fetch(LOCATION, PREV_ROW, SUSPEND).checkConsistency().build()) {
Set<TServerInstance> liveTServers = manager.onlineTabletServers();
for (TabletMetadata tm : tablets) {
TabletState state = TabletState.compute(tm, liveTServers);
if (!state.equals(TabletState.UNASSIGNED)) {
// This code will even wait on tablets that are assigned to dead tablets servers. This is
// intentional because the manager may make metadata writes for these tablets. See #587
log.debug("Still waiting for table({}) to be deleted; Target tablet state: UNASSIGNED, "
+ "Current tablet state: {}, locationState: {}", tableId, state, tm);
done = false;
break;
}
}
}

if (!done) {
return 50;
}

return 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public long isReady(long tid, Manager env) throws Exception {
public Repo<Manager> call(long tid, Manager env) {
env.getTableManager().transitionTableState(tableId, TableState.DELETING);
env.getEventCoordinator().event(tableId, "deleting table %s ", tableId);
return new CleanUp(tableId, namespaceId);
return new ReserveTablets(tableId, namespaceId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public long isReady(long tid, Manager env) throws Exception {

private void preventFutureCompactions(Manager environment)
throws KeeperException, InterruptedException {
// ELASTICITY_TODO investigate this. Is still needed? Is it still working as expected?
String deleteMarkerPath = createDeleteMarkerPath(environment.getInstanceID(), tableId);
ZooReaderWriter zoo = environment.getContext().getZooReaderWriter();
zoo.putPersistentData(deleteMarkerPath, new byte[] {}, NodeExistsPolicy.SKIP);
Expand Down
Loading