Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -314,37 +314,6 @@
* <td></td>
* </tr>
* <tr>
* <td>queries</td>
* <td>Gauge</td>
* <td>{@value #METRICS_TSERVER_QUERIES}</td>
* <td>Gauge</td>
* <td></td>
* </tr>
* <tr>
* <td>scannedRate</td>
* <td>Gauge</td>
* <td>{@value #METRICS_TSERVER_SCANNED_ENTRIES}</td>
* <td>Gauge</td>
* <td>Prior to 2.1.0 this metric was reported as a rate, it is now the count and the rate can be
* derived</td>
* </tr>
* <tr>
* <td>queryRate</td>
* <td>Gauge</td>
* <td>{@value #METRICS_TSERVER_SCAN_RESULTS}</td>
* <td>Gauge</td>
* <td>Prior to 2.1.0 this metric was reported as a rate, it is now the count and the rate can be
* derived</td>
* </tr>
* <tr>
* <td>queryByteRate</td>
* <td>Gauge</td>
* <td>{@value #METRICS_TSERVER_SCAN_RESULTS_BYTES}</td>
* <td>Gauge</td>
* <td>Prior to 2.1.0 this metric was reported as a rate, it is now the count and the rate can be
* derived</td>
* </tr>
* <tr>
* <td>ingestRate</td>
* <td>Gauge</td>
* <td>{@value #METRICS_TSERVER_INGEST_MUTATIONS}</td>
Expand All @@ -367,6 +336,28 @@
* <td>Gauge</td>
* <td></td>
* </tr>
* <!-- scan server -->
* <tr>
* <td>N/A</td>
* <td>N/A</td>
* <td>{@value #METRICS_SCAN_RESERVATION_TIMER}</td>
* <td>Timer</td>
* <td>Time to reserve a tablet's files for a scan</td>
* </tr>
* <tr>
* <td>N/A</td>
* <td>N/A</td>
* <td>{@value #METRICS_SCAN_BUSY_TIMEOUT_COUNTER}</td>
* <td>Counter</td>
* <td>Count of the scans where a busy timeout happened</td>
* </tr>
* <tr>
* <td>N/A</td>
* <td>N/A</td>
* <td>{@value #METRICS_SCAN_TABLET_METADATA_CACHE}</td>
* <td>Cache</td>
* <td>Scan server tablet metadata cache metrics</td>
* </tr>
* <!-- scans -->
* <tr>
* <td>scan</td>
Expand Down Expand Up @@ -418,12 +409,36 @@
* <td></td>
* </tr>
* <tr>
* <td>N/A</td>
* <td>N/A</td>
* <td>{@value #METRICS_SCAN_BUSY_TIMEOUT}</td>
* <td>Counter</td>
* <td>queries</td>
* <td>Gauge</td>
* <td>{@value #METRICS_SCAN_QUERIES}</td>
* <td>Gauge</td>
* <td></td>
* </tr>
* <tr>
* <td>scannedRate</td>
* <td>Gauge</td>
* <td>{@value #METRICS_SCAN_SCANNED_ENTRIES}</td>
* <td>Gauge</td>
* <td>Prior to 2.1.0 this metric was reported as a rate, it is now the count and the rate can be
* derived</td>
* </tr>
* <tr>
* <td>queryRate</td>
* <td>Gauge</td>
* <td>{@value #METRICS_SCAN_QUERY_SCAN_RESULTS}</td>
* <td>Gauge</td>
* <td>Prior to 2.1.0 this metric was reported as a rate, it is now the count and the rate can be
* derived</td>
* </tr>
* <tr>
* <td>queryByteRate</td>
* <td>Gauge</td>
* <td>{@value #METRICS_SCAN_QUERY_SCAN_RESULTS_BYTES}</td>
* <td>Gauge</td>
* <td>Prior to 2.1.0 this metric was reported as a rate, it is now the count and the rate can be
* derived</td>
* </tr>
* <!-- major compactions -->
* <tr>
* <td>{i|e}_{compactionServiceName}_{executor_name}_queued</td>
Expand Down Expand Up @@ -605,15 +620,22 @@ public interface MetricsProducer {
String METRICS_REPLICATION_PEERS = METRICS_REPLICATION_PREFIX + "peers";
String METRICS_REPLICATION_THREADS = METRICS_REPLICATION_PREFIX + "threads";

String METRICS_SCAN_PREFIX = "accumulo.tserver.scans.";
String METRICS_SCAN_PREFIX = "accumulo.scan.";
String METRICS_SCAN_TIMES = METRICS_SCAN_PREFIX + "times";
String METRICS_SCAN_OPEN_FILES = METRICS_SCAN_PREFIX + "files.open";
String METRICS_SCAN_RESULTS = METRICS_SCAN_PREFIX + "result";
String METRICS_SCAN_YIELDS = METRICS_SCAN_PREFIX + "yields";
String METRICS_SCAN_START = METRICS_SCAN_PREFIX + "start";
String METRICS_SCAN_CONTINUE = METRICS_SCAN_PREFIX + "continue";
String METRICS_SCAN_CLOSE = METRICS_SCAN_PREFIX + "close";
String METRICS_SCAN_BUSY_TIMEOUT = METRICS_SCAN_PREFIX + "busy.timeout";
String METRICS_SCAN_BUSY_TIMEOUT_COUNTER = METRICS_SCAN_PREFIX + "busy.timeout.count";
String METRICS_SCAN_RESERVATION_TIMER = METRICS_SCAN_PREFIX + "reservation.timer";
String METRICS_SCAN_QUERIES = METRICS_SCAN_PREFIX + "queries";
String METRICS_SCAN_QUERY_SCAN_RESULTS = METRICS_SCAN_PREFIX + "query.results";
String METRICS_SCAN_QUERY_SCAN_RESULTS_BYTES = METRICS_SCAN_PREFIX + "query.results.bytes";
String METRICS_SCAN_SCANNED_ENTRIES = METRICS_SCAN_PREFIX + "query.scanned.entries";

String METRICS_SCAN_TABLET_METADATA_CACHE = METRICS_SCAN_PREFIX + "tablet.metadata.cache";

String METRICS_TSERVER_PREFIX = "accumulo.tserver.";
String METRICS_TSERVER_ENTRIES = METRICS_TSERVER_PREFIX + "entries";
Expand All @@ -629,14 +651,10 @@ public interface MetricsProducer {
String METRICS_TSERVER_TABLETS_ONLINE = METRICS_TSERVER_PREFIX + "tablets.online";
String METRICS_TSERVER_TABLETS_OPENING = METRICS_TSERVER_PREFIX + "tablets.opening";
String METRICS_TSERVER_TABLETS_UNOPENED = METRICS_TSERVER_PREFIX + "tablets.unopened";
String METRICS_TSERVER_QUERIES = METRICS_TSERVER_PREFIX + "queries";
String METRICS_TSERVER_TABLETS_FILES = METRICS_TSERVER_PREFIX + "tablets.files";
String METRICS_TSERVER_HOLD = METRICS_TSERVER_PREFIX + "hold";
String METRICS_TSERVER_INGEST_MUTATIONS = METRICS_TSERVER_PREFIX + "ingest.mutations";
String METRICS_TSERVER_INGEST_BYTES = METRICS_TSERVER_PREFIX + "ingest.bytes";
String METRICS_TSERVER_SCAN_RESULTS = METRICS_TSERVER_PREFIX + "scan.results";
String METRICS_TSERVER_SCAN_RESULTS_BYTES = METRICS_TSERVER_PREFIX + "scan.results.bytes";
String METRICS_TSERVER_SCANNED_ENTRIES = METRICS_TSERVER_PREFIX + "scan.scanned.entries";

String METRICS_THRIFT_PREFIX = "accumulo.thrift.";
String METRICS_THRIFT_EXECUTE = METRICS_THRIFT_PREFIX + "execute";
Expand Down Expand Up @@ -669,7 +687,7 @@ default Map<String,String> getMetricFields() {
fields.put((String) f.get(MetricsProducer.class), f.getName());
} catch (IllegalArgumentException | IllegalAccessException e) {
// this shouldn't happen, but let's log it anyway
LOG.error("Error getting metric value for field: " + f.getName());
LOG.error("Error getting metric value for field: {}", f.getName());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
import org.apache.accumulo.core.tabletserver.thrift.ScanServerBusyException;
import org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException;
import org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration;
import org.apache.accumulo.core.tabletserver.thrift.TabletScanClientService;
Expand Down Expand Up @@ -122,6 +123,8 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;

import io.micrometer.core.instrument.Tag;

public class ScanServer extends AbstractServer
implements TabletScanClientService.Iface, TabletHostingServer {

Expand Down Expand Up @@ -199,6 +202,7 @@ private TabletMetadataLoader(Ample ample) {
private volatile boolean serverStopRequested = false;
private ServiceLock scanServerLock;
protected TabletServerScanMetrics scanMetrics;
private ScanServerMetrics scanServerMetrics;

private ZooCache managerLockCache;

Expand Down Expand Up @@ -243,7 +247,7 @@ public ScanServer(ScanServerOpts opts, String[] args) {
}
tabletMetadataCache =
Caffeine.newBuilder().expireAfterWrite(cacheExpiration, TimeUnit.MILLISECONDS)
.scheduler(Scheduler.systemScheduler()).build(tabletMetadataLoader);
.scheduler(Scheduler.systemScheduler()).recordStats().build(tabletMetadataLoader);
}

delegate = newThriftScanClientHandler(new WriteTracker());
Expand Down Expand Up @@ -338,6 +342,7 @@ public void unableToMonitorLockNode(final Exception e) {
// Don't use the normal ServerServices lock content, instead put the server UUID here.
byte[] lockContent = (serverLockUUID.toString() + "," + groupName).getBytes(UTF_8);

// wait for 120 seconds with 5 second delay
for (int i = 0; i < 120 / 5; i++) {
zoo.putPersistentData(zLockPath.toString(), new byte[0], NodeExistsPolicy.SKIP);

Expand Down Expand Up @@ -371,10 +376,12 @@ public void run() {

MetricsInfo metricsInfo = getContext().getMetricsInfo();
metricsInfo.addServiceTags(getApplicationName(), clientAddress);
metricsInfo.addCommonTags(List.of(Tag.of("resource.group", groupName)));

scanMetrics = new TabletServerScanMetrics();
scanServerMetrics = new ScanServerMetrics(tabletMetadataCache);

metricsInfo.addMetricsProducers(scanMetrics);
metricsInfo.addMetricsProducers(scanMetrics, scanServerMetrics);
metricsInfo.init();
// We need to set the compaction manager so that we don't get an NPE in CompactableImpl.close

Expand Down Expand Up @@ -657,6 +664,19 @@ private Map<KeyExtent,TabletMetadata> reserveFilesInner(Collection<KeyExtent> ex
}
}

/**
 * Calls {@link #reserveFiles(Map)} and records how long the call took on the scan server's
 * reservation timer.
 *
 * @param extents the extents (and their ranges) to pass through to {@code reserveFiles}
 * @return whatever {@code reserveFiles(extents)} returns
 * @throws AccumuloException propagated unchanged from {@code reserveFiles(extents)}
 */
@VisibleForTesting
ScanReservation reserveFilesInstrumented(Map<KeyExtent,List<TRange>> extents)
    throws AccumuloException {
  final long startNanos = System.nanoTime();
  try {
    return reserveFiles(extents);
  } finally {
    // Recorded in finally so the elapsed time is captured on both the normal and the
    // exceptional exit from reserveFiles.
    final long elapsedNanos = System.nanoTime() - startNanos;
    scanServerMetrics.getReservationTimer().record(elapsedNanos, TimeUnit.NANOSECONDS);
  }
}

protected ScanReservation reserveFiles(Map<KeyExtent,List<TRange>> extents)
throws AccumuloException {

Expand Down Expand Up @@ -687,6 +707,16 @@ protected ScanReservation reserveFiles(Map<KeyExtent,List<TRange>> extents)
return new ScanReservation(tabletsMetadata, myReservationId, failures);
}

/**
 * Calls {@link #reserveFiles(long)} for an existing scan session and records how long the call
 * took on the scan server's reservation timer.
 *
 * <p>
 * NOTE(review): the continueScan and continueMultiScan bodies visible alongside this method
 * call {@code reserveFiles(scanID)} directly rather than this wrapper - confirm whether they
 * were meant to go through here so that continuation reservations are timed as well.
 *
 * @param scanId id of the scan session to pass through to {@code reserveFiles}
 * @return whatever {@code reserveFiles(scanId)} returns
 * @throws NoSuchScanIDException propagated unchanged from {@code reserveFiles(scanId)}
 */
private ScanReservation reserveFilesInstrumented(long scanId) throws NoSuchScanIDException {
  final long startNanos = System.nanoTime();
  try {
    return reserveFiles(scanId);
  } finally {
    // Recorded in finally so the elapsed time is captured on both the normal and the
    // exceptional exit from reserveFiles.
    final long elapsedNanos = System.nanoTime() - startNanos;
    scanServerMetrics.getReservationTimer().record(elapsedNanos, TimeUnit.NANOSECONDS);
  }
}

protected ScanReservation reserveFiles(long scanId) throws NoSuchScanIDException {
var session = (ScanSession) sessionManager.getSession(scanId);
if (session == null) {
Expand Down Expand Up @@ -875,7 +905,7 @@ public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent t

KeyExtent extent = getKeyExtent(textent);
try (ScanReservation reservation =
reserveFiles(Map.of(extent, Collections.singletonList(range)))) {
reserveFilesInstrumented(Map.of(extent, Collections.singletonList(range)))) {

if (reservation.getFailures().containsKey(textent)) {
throw new NotServingTabletException(extent.toThrift());
Expand All @@ -889,7 +919,9 @@ batchTimeOut, classLoaderContext, executionHints, getScanTabletResolver(tablet),
busyTimeout);

return is;

} catch (ScanServerBusyException be) {
scanServerMetrics.incrementBusy();
throw be;
} catch (AccumuloException | IOException e) {
LOG.error("Error starting scan", e);
throw new RuntimeException(e);
Expand All @@ -905,6 +937,9 @@ public ScanResult continueScan(TInfo tinfo, long scanID, long busyTimeout)
try (ScanReservation reservation = reserveFiles(scanID)) {
Preconditions.checkState(reservation.getFailures().isEmpty());
return delegate.continueScan(tinfo, scanID, busyTimeout);
} catch (ScanServerBusyException be) {
scanServerMetrics.incrementBusy();
throw be;
}
}

Expand Down Expand Up @@ -933,7 +968,7 @@ public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials,
batch.put(extent, entry.getValue());
}

try (ScanReservation reservation = reserveFiles(batch)) {
try (ScanReservation reservation = reserveFilesInstrumented(batch)) {

HashMap<KeyExtent,TabletBase> tablets = new HashMap<>();
reservation.getTabletMetadataExtents().forEach(extent -> {
Expand All @@ -950,6 +985,9 @@ public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials,

LOG.trace("started scan: {}", ims.getScanID());
return ims;
} catch (ScanServerBusyException be) {
scanServerMetrics.incrementBusy();
throw be;
} catch (TException e) {
LOG.error("Error starting scan", e);
throw e;
Expand All @@ -967,6 +1005,9 @@ public MultiScanResult continueMultiScan(TInfo tinfo, long scanID, long busyTime
try (ScanReservation reservation = reserveFiles(scanID)) {
Preconditions.checkState(reservation.getFailures().isEmpty());
return delegate.continueMultiScan(tinfo, scanID, busyTimeout);
} catch (ScanServerBusyException be) {
scanServerMetrics.incrementBusy();
throw be;
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.tserver;

import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metrics.MetricsProducer;

import com.github.benmanes.caffeine.cache.LoadingCache;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.binder.cache.CaffeineCacheMetrics;

public class ScanServerMetrics implements MetricsProducer {

private Timer reservationTimer;
private Counter busyTimeoutCount;

private final LoadingCache<KeyExtent,TabletMetadata> tabletMetadataCache;

public ScanServerMetrics(final LoadingCache<KeyExtent,TabletMetadata> tabletMetadataCache) {
this.tabletMetadataCache = tabletMetadataCache;
}

@Override
public void registerMetrics(MeterRegistry registry) {
reservationTimer = Timer.builder(MetricsProducer.METRICS_SCAN_RESERVATION_TIMER)
.description("Time to reserve a tablets files for scan").register(registry);
busyTimeoutCount = Counter.builder(METRICS_SCAN_BUSY_TIMEOUT_COUNTER)
.description("The number of scans where a busy timeout happened").register(registry);
CaffeineCacheMetrics.monitor(registry, tabletMetadataCache, METRICS_SCAN_TABLET_METADATA_CACHE);
}

public Timer getReservationTimer() {
return reservationTimer;
}

public void incrementBusy() {
busyTimeoutCount.increment();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ protected ScanResult continueScan(TInfo tinfo, long scanID, SingleScanSession sc
server.getSessionManager().removeSession(scanID);
TabletBase tablet = scanSession.getTabletResolver().getTablet(scanSession.extent);
if (busyTimeout > 0) {
server.getScanMetrics().incrementScanBusyTimeout(1.0D);
server.getScanMetrics().incrementBusy(1.0D);
throw new ScanServerBusyException();
} else if (tablet == null || tablet.isClosed()) {
throw new NotServingTabletException(scanSession.extent.toThrift());
Expand Down Expand Up @@ -495,7 +495,7 @@ private MultiScanResult continueMultiScan(long scanID, MultiScanSession session,
} catch (CancellationException ce) {
server.getSessionManager().removeSession(scanID);
if (busyTimeout > 0) {
server.getScanMetrics().incrementScanBusyTimeout(1.0D);
server.getScanMetrics().incrementBusy(1.0D);
throw new ScanServerBusyException();
} else {
log.warn("Failed to get multiscan result", ce);
Expand Down
Loading