Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,13 @@ public enum Property {
+ " from having more RFiles than can be opened. Setting this property low may"
+ " throttle ingest and increase query performance.",
"1.4.0"),
TABLE_MERGE_FILE_MAX("table.merge.file.max", "10000", PropertyType.COUNT,
"The maximum number of files that a merge operation will process. Before "
+ "merging a sum of the number of files in the merge range is computed and if it "
+ "exceeds this configuration then the merge will error and fail. For example if "
+ "there are 100 tablets each having 10 files in the merge range, then the sum would "
+ "be 1000 and the merge will only proceed if this property is greater than 1000.",
"4.0.0"),
TABLE_FILE_SUMMARY_MAX_SIZE("table.file.summary.maxSize", "256k", PropertyType.BYTES,
"The maximum size summary that will be stored. The number of RFiles that"
+ " had summary data exceeding this threshold is reported by"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.manager.tableOps.merge;

import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;

import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.fate.FateTxId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CountFiles extends ManagerRepo {
private static final Logger log = LoggerFactory.getLogger(CountFiles.class);
private static final long serialVersionUID = 1L;
private final MergeInfo data;

public CountFiles(MergeInfo mergeInfo) {
this.data = mergeInfo;
}

@Override
public Repo<Manager> call(long tid, Manager env) throws Exception {

var range = data.getReserveExtent();

long totalFiles = 0;

try (var tablets = env.getContext().getAmple().readTablets().forTable(data.tableId)
.overlapping(range.prevEndRow(), range.endRow()).fetch(FILES).checkConsistency().build()) {

switch (data.op) {
case MERGE:
for (var tabletMeta : tablets) {
totalFiles += tabletMeta.getFiles().size();
}
break;
case DELETE:
for (var tabletMeta : tablets) {
// Files in tablets that are completely contained within the merge range will be
// deleted, so do not count these files .
if (!data.getOriginalExtent().contains(tabletMeta.getExtent())) {
totalFiles += tabletMeta.getFiles().size();
}
}
break;
default:
throw new IllegalStateException("Unknown op " + data.op);
}
}

long maxFiles = env.getContext().getTableConfiguration(data.getOriginalExtent().tableId())
.getCount(Property.TABLE_MERGE_FILE_MAX);

log.debug("{} found {} files in the merge range, maxFiles is {}", FateTxId.formatTid(tid),
totalFiles, maxFiles);

if (totalFiles >= maxFiles) {
return new UnreserveAndError(data, totalFiles, maxFiles);
} else {
if (data.op == MergeInfo.Operation.MERGE) {
return new MergeTablets(data);
} else {
return new DeleteRows(data);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.manager.tableOps.merge;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateTxId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.TabletOperationId;
import org.apache.accumulo.core.metadata.schema.TabletOperationType;
import org.apache.accumulo.core.util.TextUtil;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

/**
* Delete tablets that were merged into another tablet.
*/
public class DeleteTablets extends ManagerRepo {

private static final long serialVersionUID = 1L;

private final MergeInfo data;

private final byte[] lastTabletEndRow;

private static final Logger log = LoggerFactory.getLogger(DeleteTablets.class);

DeleteTablets(MergeInfo mergeInfo, Text lastTabletEndRow) {
this.data = mergeInfo;
this.lastTabletEndRow = lastTabletEndRow == null ? null : TextUtil.getBytes(lastTabletEndRow);
}

@Override
public Repo<Manager> call(long tid, Manager manager) throws Exception {

var fateStr = FateTxId.formatTid(tid);
KeyExtent range = data.getMergeExtent();
log.debug("{} Deleting tablets for {}", fateStr, range);
var opid = TabletOperationId.from(TabletOperationType.MERGING, tid);

AtomicLong acceptedCount = new AtomicLong();
AtomicLong rejectedCount = new AtomicLong();
// delete tablets
BiConsumer<KeyExtent,Ample.ConditionalResult> resultConsumer = (extent, result) -> {
if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
acceptedCount.incrementAndGet();
} else {
log.error("{} failed to update {}", fateStr, extent);
rejectedCount.incrementAndGet();
}
};

long submitted = 0;

try (
var tabletsMetadata =
manager.getContext().getAmple().readTablets().forTable(range.tableId())
.overlapping(range.prevEndRow(), range.endRow()).saveKeyValues().build();
var tabletsMutator =
manager.getContext().getAmple().conditionallyMutateTablets(resultConsumer)) {

var lastEndRow = lastTabletEndRow == null ? null : new Text(lastTabletEndRow);

for (var tabletMeta : tabletsMetadata) {
MergeTablets.validateTablet(tabletMeta, fateStr, opid, data.tableId);

// do not delete the last tablet
if (Objects.equals(tabletMeta.getExtent().endRow(), lastEndRow)) {
break;
}

var tabletMutator = tabletsMutator.mutateTablet(tabletMeta.getExtent())
.requireOperation(opid).requireAbsentLocation();

tabletMeta.getKeyValues().keySet().forEach(key -> {
log.trace("{} deleting {}", fateStr, key);
});

tabletMutator.deleteAll(tabletMeta.getKeyValues().keySet());
// if the tablet no longer exists, then it was successful
tabletMutator.submit(Ample.RejectionHandler.acceptAbsentTablet());
submitted++;
}
}

Preconditions.checkState(acceptedCount.get() == submitted && rejectedCount.get() == 0,
"Failed to delete tablets accepted:%s != %s rejected:%s", acceptedCount.get(), submitted,
rejectedCount.get());

log.debug("{} deleted {} tablets", fateStr, submitted);

return new FinishTableRangeOp(data);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateTxId;
import org.apache.accumulo.core.fate.Repo;
Expand Down Expand Up @@ -49,45 +52,60 @@ public FinishTableRangeOp(MergeInfo data) {

@Override
public Repo<Manager> call(long tid, Manager manager) throws Exception {
removeOperationIds(log, data, tid, manager);

Utils.unreserveTable(manager, data.tableId, tid, true);
Utils.unreserveNamespace(manager, data.namespaceId, tid, false);
return null;
}

static void removeOperationIds(Logger log, MergeInfo data, long tid, Manager manager) {
KeyExtent range = data.getReserveExtent();
var opid = TabletOperationId.from(TabletOperationType.MERGING, tid);
log.debug("{} unreserving tablet in range {}", FateTxId.formatTid(tid), range);
var fateStr = FateTxId.formatTid(tid);

AtomicLong acceptedCount = new AtomicLong();
AtomicLong rejectedCount = new AtomicLong();
// delete tablets
BiConsumer<KeyExtent,Ample.ConditionalResult> resultConsumer = (extent, result) -> {
if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
acceptedCount.incrementAndGet();
} else {
log.error("{} failed to update {}", fateStr, extent);
rejectedCount.incrementAndGet();
}
};

int submitted = 0;
int count = 0;

try (var tablets = manager.getContext().getAmple().readTablets().forTable(data.tableId)
.overlapping(range.prevEndRow(), range.endRow()).fetch(PREV_ROW, LOCATION, OPID).build();
var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets();) {
int opsDeleted = 0;
int count = 0;
var tabletsMutator =
manager.getContext().getAmple().conditionallyMutateTablets(resultConsumer)) {

for (var tabletMeta : tablets) {
if (opid.equals(tabletMeta.getOperationId())) {
tabletsMutator.mutateTablet(tabletMeta.getExtent()).requireOperation(opid)
.deleteOperation().submit(tm -> !opid.equals(tm.getOperationId()));
opsDeleted++;
submitted++;
}
count++;
}

Preconditions.checkState(count > 0);
}

var results = tabletsMutator.process();
var deletesAccepted =
results.values().stream().filter(conditionalResult -> conditionalResult.getStatus()
== Ample.ConditionalResult.Status.ACCEPTED).count();

log.debug("{} deleted {}/{} opids out of {} tablets", FateTxId.formatTid(tid),
deletesAccepted, opsDeleted, count);

manager.getEventCoordinator().event(range, "Merge or deleterows completed %s",
FateTxId.formatTid(tid));
log.debug("{} deleted {}/{} opids out of {} tablets", FateTxId.formatTid(tid),
acceptedCount.get(), submitted, count);

DeleteRows.verifyAccepted(results, FateTxId.formatTid(tid));
Preconditions.checkState(deletesAccepted == opsDeleted);
}
manager.getEventCoordinator().event(range, "Merge or deleterows completed %s",
FateTxId.formatTid(tid));

Utils.unreserveTable(manager, data.tableId, tid, true);
Utils.unreserveNamespace(manager, data.namespaceId, tid, false);
return null;
Preconditions.checkState(acceptedCount.get() == submitted && rejectedCount.get() == 0,
"Failed to delete tablets accepted:%s != %s rejected:%s", acceptedCount.get(), submitted,
rejectedCount.get());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.Objects;
import java.util.Set;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.admin.TabletHostingGoal;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
Expand Down Expand Up @@ -77,11 +76,6 @@ public MergeTablets(MergeInfo data) {

@Override
public Repo<Manager> call(long tid, Manager manager) throws Exception {
mergeMetadataRecords(manager, tid);
return new FinishTableRangeOp(data);
}

private void mergeMetadataRecords(Manager manager, long tid) throws AccumuloException {
var fateStr = FateTxId.formatTid(tid);
KeyExtent range = data.getMergeExtent();
log.debug("{} Merging metadata for {}", fateStr, range);
Expand Down Expand Up @@ -130,14 +124,19 @@ private void mergeMetadataRecords(Manager manager, long tid) throws AccumuloExce

// queue all tablets dirs except the last tablets to be added as GC candidates
dirs.add(new AllVolumesDirectory(range.tableId(), tabletMeta.getDirName()));
if (dirs.size() > 1000) {
Preconditions.checkState(tabletsSeen > 1);
manager.getContext().getAmple().putGcFileAndDirCandidates(range.tableId(), dirs);
dirs.clear();
}
}
}

if (tabletsSeen == 1) {
// The merge range overlaps a single tablet, so there is nothing to do. This could be
// because there was only a single tablet before merge started or this operation completed
// but the process died and now its running a 2nd time.
return;
return new FinishTableRangeOp(data);
}

Preconditions.checkState(lastTabletMeta != null, "%s no tablets seen in range %s", opid,
Expand Down Expand Up @@ -190,35 +189,7 @@ private void mergeMetadataRecords(Manager manager, long tid) throws AccumuloExce
// Accumulo GC will delete the dir
manager.getContext().getAmple().putGcFileAndDirCandidates(range.tableId(), dirs);

// delete tablets
try (
var tabletsMetadata =
manager.getContext().getAmple().readTablets().forTable(range.tableId())
.overlapping(range.prevEndRow(), range.endRow()).saveKeyValues().build();
var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) {

for (var tabletMeta : tabletsMetadata) {
validateTablet(tabletMeta, fateStr, opid, data.tableId);

// do not delete the last tablet
if (Objects.equals(tabletMeta.getExtent().endRow(), lastTabletMeta.getExtent().endRow())) {
break;
}

var tabletMutator = tabletsMutator.mutateTablet(tabletMeta.getExtent())
.requireOperation(opid).requireAbsentLocation();

tabletMeta.getKeyValues().keySet().forEach(key -> {
log.debug("{} deleting {}", fateStr, key);
});

tabletMutator.deleteAll(tabletMeta.getKeyValues().keySet());
// if the tablet no longer exists, then it was successful
tabletMutator.submit(Ample.RejectionHandler.acceptAbsentTablet());
}

verifyAccepted(tabletsMutator.process(), fateStr);
}
return new DeleteTablets(data, lastTabletMeta.getEndRow());
}

static void validateTablet(TabletMetadata tabletMeta, String fateStr, TabletOperationId opid,
Expand Down
Loading