diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 6135942ee9d..fe6755bc564 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -30,7 +30,9 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SCANS; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; +import java.io.FileNotFoundException; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -112,6 +114,8 @@ import org.apache.accumulo.server.manager.LiveTServerSet; import org.apache.accumulo.server.security.SecurityOperation; import org.apache.accumulo.server.tablets.TabletNameGenerator; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; @@ -123,6 +127,7 @@ import com.github.benmanes.caffeine.cache.CacheLoader; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.Weigher; import com.google.common.base.Preconditions; import com.google.common.collect.Iterators; import com.google.common.collect.Sets; @@ -154,6 +159,13 @@ public class CompactionCoordinator implements CompactionCoordinatorService.Iface private static final Cache COMPLETED = Caffeine.newBuilder().maximumSize(200).expireAfterWrite(10, TimeUnit.MINUTES).build(); + private final Weigher weigher = (path, count) -> { + return path.toUri().toString().length(); + }; + + private final Cache checked_tablet_dir_cache = + Caffeine.newBuilder().maximumWeight(10485760L).weigher(weigher).build(); + /* Map of group name to last time compactor called to get a compaction job */ // ELASTICITY_TODO need to clean out groups that are no longer configured.. private static final Map TIME_COMPACTOR_LAST_CHECKED = new ConcurrentHashMap<>(); @@ -478,6 +490,28 @@ private boolean canReserveCompaction(TabletMetadata tablet, CompactionJob job, return true; } + private void checkTabletDir(KeyExtent extent, Path path) { + try { + if (checked_tablet_dir_cache.getIfPresent(path) == null) { + FileStatus[] files = null; + try { + files = ctx.getVolumeManager().listStatus(path); + } catch (FileNotFoundException ex) { + // ignored + } + + if (files == null) { + LOG.debug("Tablet {} had no dir, creating {}", extent, path); + + ctx.getVolumeManager().mkdirs(path); + } + checked_tablet_dir_cache.put(path, 1); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + private ExternalCompactionMetadata createExternalCompactionMetadata(CompactionJob job, Set jobFiles, TabletMetadata tablet, String compactorAddress) { boolean propDels; @@ -502,12 +536,9 @@ private ExternalCompactionMetadata createExternalCompactionMetadata(CompactionJo throw new IllegalArgumentException(); } - // ELASTICITY_TODO need to create dir if it does not exists.. look at tablet code, it has cache, - // but its unbounded in size which is ok for a single tablet... in the manager we need a cache - // of dirs that were created that is bounded in size - Consumer directorCreator = dirName -> {}; + Consumer directoryCreator = dir -> checkTabletDir(tablet.getExtent(), new Path(dir)); ReferencedTabletFile newFile = - TabletNameGenerator.getNextDataFilenameForMajc(propDels, ctx, tablet, directorCreator); + TabletNameGenerator.getNextDataFilenameForMajc(propDels, ctx, tablet, directoryCreator); return new ExternalCompactionMetadata(jobFiles, newFile, compactorAddress, job.getKind(), job.getPriority(), job.getExecutor(), propDels, fateTxId);