Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ enum Status {
/**
* This can only be called when {@link #getStatus()} returns something other than
* {@link Status#ACCEPTED}. It reads that tablets metadata for a failed conditional mutation.
* This can used used to see why it was rejected.
* This can be used to see why it was not accepted.
*/
TabletMetadata readMetadata();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.accumulo.server.metadata;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily.GOAL_COLUMN;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.COMPACT_COLUMN;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.FLUSH_COLUMN;
Expand All @@ -29,6 +30,7 @@
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.encodePrevEndRow;

import java.util.HashSet;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
Expand Down Expand Up @@ -170,6 +172,13 @@ private void requireSameSingle(TabletMetadata tabletMetadata, ColumnType type) {
mutation.addCondition(c);
}
break;
case LOGS: {
Condition c = SetEqualityIterator.createCondition(new HashSet<>(tabletMetadata.getLogs()),
logEntry -> logEntry.getColumnQualifier().toString().getBytes(UTF_8),
LogColumnFamily.NAME);
mutation.addCondition(c);
}
break;
case FILES: {
// ELASTICITY_TODO compare values?
Condition c = SetEqualityIterator.createCondition(tabletMetadata.getFiles(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FLUSH_ID;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED;
Expand All @@ -40,11 +41,13 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
Expand Down Expand Up @@ -75,9 +78,11 @@
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
import org.apache.accumulo.core.metadata.schema.TabletMetadataBuilder;
import org.apache.accumulo.core.metadata.schema.TabletOperationId;
import org.apache.accumulo.core.metadata.schema.TabletOperationType;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.server.metadata.AsyncConditionalTabletsMutatorImpl;
import org.apache.accumulo.server.metadata.ConditionalTabletsMutatorImpl;
Expand All @@ -86,6 +91,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import com.google.common.collect.Sets;

public class AmpleConditionalWriterIT extends AccumuloClusterHarness {

// ELASTICITY_TODO ensure that all conditional updates are tested
Expand Down Expand Up @@ -321,6 +328,89 @@ public void testFiles() throws Exception {
}
}

@Test
public void testWALs() {
var context = cluster.getServerContext();

// Test adding a WAL to a tablet and verifying its presence
String walFilePath =
java.nio.file.Path.of("tserver:8080", UUID.randomUUID().toString()).toString();
LogEntry originalLogEntry = new LogEntry(walFilePath);
ConditionalTabletsMutatorImpl ctmi = new ConditionalTabletsMutatorImpl(context);
// create a tablet metadata with no write ahead logs
var tmEmptySet = TabletMetadata.builder(e1).build(LOGS);
// tablet should not have any logs to start with so requireSame with the empty logs should pass
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tmEmptySet, LOGS)
.putWal(originalLogEntry).submit(tm -> false);
var results = ctmi.process();
assertEquals(Status.ACCEPTED, results.get(e1).getStatus());

Set<LogEntry> expectedLogs = new HashSet<>();
expectedLogs.add(originalLogEntry);
assertEquals(expectedLogs, new HashSet<>(context.getAmple().readTablet(e1).getLogs()),
"The original LogEntry should be present.");

// Test adding another WAL and verifying the update
String walFilePath2 =
java.nio.file.Path.of("tserver:8080", UUID.randomUUID().toString()).toString();
LogEntry newLogEntry = new LogEntry(walFilePath2);
ctmi = new ConditionalTabletsMutatorImpl(context);
ctmi.mutateTablet(e1).requireAbsentOperation().putWal(newLogEntry).submit(tm -> false);
Comment thread
DomGarguilo marked this conversation as resolved.
results = ctmi.process();
assertEquals(Status.ACCEPTED, results.get(e1).getStatus());

// Verify that both the original and new WALs are present
expectedLogs.add(newLogEntry);
HashSet<LogEntry> actualLogs = new HashSet<>(context.getAmple().readTablet(e1).getLogs());
assertEquals(expectedLogs, actualLogs, "Both original and new LogEntry should be present.");

String walFilePath3 =
java.nio.file.Path.of("tserver:8080", UUID.randomUUID().toString()).toString();
LogEntry otherLogEntry = new LogEntry(walFilePath3);

// create a powerset to ensure all possible subsets fail when using requireSame except the
// expected current state
Set<LogEntry> allLogs = Set.of(originalLogEntry, newLogEntry, otherLogEntry);
Set<Set<LogEntry>> allSubsets = Sets.powerSet(allLogs);

for (Set<LogEntry> subset : allSubsets) {
// Skip the subset that matches the current state of the tablet
if (subset.equals(expectedLogs)) {
continue;
}

final TabletMetadataBuilder builder = TabletMetadata.builder(e1);
subset.forEach(builder::putWal);
TabletMetadata tmSubset = builder.build(LOGS);

ctmi = new ConditionalTabletsMutatorImpl(context);
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tmSubset, LOGS)
.deleteWal(originalLogEntry).submit(t -> false);
results = ctmi.process();

assertEquals(Status.REJECTED, results.get(e1).getStatus());

// ensure the operation did not go through
actualLogs = new HashSet<>(context.getAmple().readTablet(e1).getLogs());
assertEquals(expectedLogs, actualLogs, "Both original and new LogEntry should be present.");
}

// Test that requiring the current WALs gets accepted when making an update (deleting a WAL in
// this example)
TabletMetadata tm2 =
TabletMetadata.builder(e1).putWal(originalLogEntry).putWal(newLogEntry).build(LOGS);
ctmi = new ConditionalTabletsMutatorImpl(context);
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm2, LOGS)
.deleteWal(originalLogEntry).submit(tm -> false);
results = ctmi.process();
assertEquals(Status.ACCEPTED, results.get(e1).getStatus(),
"Requiring the current WALs should result in acceptance when making an update.");

// Verify that the update went through as expected
assertEquals(List.of(newLogEntry), context.getAmple().readTablet(e1).getLogs(),
"Only the new LogEntry should remain after deleting the original.");
}

@Test
public void testSelectedFiles() throws Exception {
var context = cluster.getServerContext();
Expand Down