Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,10 @@ public interface MetricsProducer {
String METRICS_TSERVER_SCAN_RESULTS_BYTES = METRICS_TSERVER_PREFIX + "scan.results.bytes";
String METRICS_TSERVER_SCANNED_ENTRIES = METRICS_TSERVER_PREFIX + "scan.scanned.entries";

String METRICS_SSERVER_PREFIX = "accumulo.sserver.";
String METRICS_SSERVER_REGISTRATION_TIMER = METRICS_SSERVER_PREFIX + "registration.timer";
String METRICS_SSERVER_BUSY_COUNTER = METRICS_SSERVER_PREFIX + "busy.count";

String METRICS_THRIFT_PREFIX = "accumulo.thrift.";
String METRICS_THRIFT_EXECUTE = METRICS_THRIFT_PREFIX + "execute";
String METRICS_THRIFT_IDLE = METRICS_THRIFT_PREFIX + "idle";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
import org.apache.accumulo.core.tabletserver.thrift.ScanServerBusyException;
import org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException;
import org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration;
import org.apache.accumulo.core.tabletserver.thrift.TabletScanClientService;
Expand Down Expand Up @@ -200,6 +201,7 @@ private TabletMetadataLoader(Ample ample) {
private volatile boolean serverStopRequested = false;
private ServiceLock scanServerLock;
protected TabletServerScanMetrics scanMetrics;
private ScanServerMetrics scanServerMetrics;

private ZooCache managerLockCache;

Expand Down Expand Up @@ -242,6 +244,7 @@ public ScanServer(ScanServerOpts opts, String[] args) {
LOG.warn(
"Tablet metadata caching less than one minute, may cause excessive scans on metadata table.");
}
// TODO Would like hit rate metrics for this cache, how can that be done?
tabletMetadataCache =
Caffeine.newBuilder().expireAfterWrite(cacheExpiration, TimeUnit.MILLISECONDS)
.scheduler(Scheduler.systemScheduler()).build(tabletMetadataLoader);
Expand Down Expand Up @@ -379,7 +382,8 @@ public void run() {
LOG.error("Error initializing metrics, metrics will not be emitted.", e1);
}
scanMetrics = new TabletServerScanMetrics();
MetricsUtil.initializeProducers(scanMetrics);
scanServerMetrics = new ScanServerMetrics();
MetricsUtil.initializeProducers(scanMetrics, scanServerMetrics);

// We need to set the compaction manager so that we don't get an NPE in CompactableImpl.close

Expand Down Expand Up @@ -662,6 +666,18 @@ private Map<KeyExtent,TabletMetadata> reserveFilesInner(Collection<KeyExtent> ex
}
}

private ScanReservation reserveFilesInstrumented(Map<KeyExtent,List<TRange>> extents)
throws AccumuloException {
long start = System.nanoTime();
try {
return reserveFiles(extents);
} finally {
scanServerMetrics.getReservationTimer().record(System.nanoTime() - start,
TimeUnit.NANOSECONDS);
}

}

protected ScanReservation reserveFiles(Map<KeyExtent,List<TRange>> extents)
throws AccumuloException {

Expand Down Expand Up @@ -692,6 +708,16 @@ protected ScanReservation reserveFiles(Map<KeyExtent,List<TRange>> extents)
return new ScanReservation(tabletsMetadata, myReservationId, failures);
}

private ScanReservation reserveFilesInstrumented(long scanId) throws NoSuchScanIDException {
long start = System.nanoTime();
try {
return reserveFiles(scanId);
} finally {
scanServerMetrics.getReservationTimer().record(System.nanoTime() - start,
TimeUnit.NANOSECONDS);
}
}

protected ScanReservation reserveFiles(long scanId) throws NoSuchScanIDException {
var session = (ScanSession) sessionManager.getSession(scanId);
if (session == null) {
Expand Down Expand Up @@ -880,7 +906,7 @@ public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent t

KeyExtent extent = getKeyExtent(textent);
try (ScanReservation reservation =
reserveFiles(Map.of(extent, Collections.singletonList(range)))) {
reserveFilesInstrumented(Map.of(extent, Collections.singletonList(range)))) {

if (reservation.getFailures().containsKey(textent)) {
throw new NotServingTabletException(extent.toThrift());
Expand All @@ -894,7 +920,9 @@ batchTimeOut, classLoaderContext, executionHints, getScanTabletResolver(tablet),
busyTimeout);

return is;

} catch (ScanServerBusyException be) {
scanServerMetrics.incrementBusy();
throw be;
} catch (AccumuloException | IOException e) {
LOG.error("Error starting scan", e);
throw new RuntimeException(e);
Expand All @@ -910,6 +938,9 @@ public ScanResult continueScan(TInfo tinfo, long scanID, long busyTimeout)
try (ScanReservation reservation = reserveFiles(scanID)) {
Preconditions.checkState(reservation.getFailures().isEmpty());
return delegate.continueScan(tinfo, scanID, busyTimeout);
} catch (ScanServerBusyException be) {
scanServerMetrics.incrementBusy();
throw be;
}
}

Expand Down Expand Up @@ -938,7 +969,7 @@ public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials,
batch.put(extent, entry.getValue());
}

try (ScanReservation reservation = reserveFiles(batch)) {
try (ScanReservation reservation = reserveFilesInstrumented(batch)) {

HashMap<KeyExtent,TabletBase> tablets = new HashMap<>();
reservation.getTabletMetadataExtents().forEach(extent -> {
Expand All @@ -955,6 +986,9 @@ public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials,

LOG.trace("started scan: {}", ims.getScanID());
return ims;
} catch (ScanServerBusyException be) {
scanServerMetrics.incrementBusy();
throw be;
} catch (TException e) {
LOG.error("Error starting scan", e);
throw e;
Expand All @@ -972,6 +1006,9 @@ public MultiScanResult continueMultiScan(TInfo tinfo, long scanID, long busyTime
try (ScanReservation reservation = reserveFiles(scanID)) {
Preconditions.checkState(reservation.getFailures().isEmpty());
return delegate.continueMultiScan(tinfo, scanID, busyTimeout);
} catch (ScanServerBusyException be) {
scanServerMetrics.incrementBusy();
throw be;
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.tserver;

import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.core.metrics.MetricsUtil;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;

public class ScanServerMetrics implements MetricsProducer {

private Timer reservationTimer;
private Counter busyCounter;

@Override
public void registerMetrics(MeterRegistry registry) {
reservationTimer = Timer.builder(MetricsProducer.METRICS_SSERVER_REGISTRATION_TIMER)
.description("Time to reserve a tablets files for scan").tags(MetricsUtil.getCommonTags())
.register(registry);
busyCounter = Counter.builder(MetricsProducer.METRICS_SSERVER_BUSY_COUNTER)
.description("The number of scans where a busy timeout happened")
.tags(MetricsUtil.getCommonTags()).register(registry);
}

public Timer getReservationTimer() {
return reservationTimer;
}

public void incrementBusy() {
busyCounter.increment();
}
}