Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
bb15261
Basic multiplexing is working using the SHARED method - only have bas…
kkeirstead May 10, 2023
4e47265
Improvements to testing, some refactoring, added event to report unma…
kkeirstead May 11, 2023
cd76fc5
Tests still need fixing/updating - resolved issue where a Disable was…
kkeirstead May 15, 2023
9d7528a
Some test cleanup
kkeirstead May 16, 2023
c137c3a
Added an extra test
kkeirstead May 16, 2023
aa26df8
Added in ref counting for proper disabling. When all shared listeners…
kkeirstead May 17, 2023
f4ba2f7
Cleanup
kkeirstead May 18, 2023
2619c99
Merge branch 'main' of https://github.com/dotnet/runtime into kkeirst…
kkeirstead May 18, 2023
0a5a901
Fixes a small bug with ref counting
kkeirstead May 18, 2023
d363465
Updated some naming
kkeirstead May 19, 2023
452f306
Addressing a few nits.
kkeirstead May 23, 2023
72857f6
Stashing changes.
kkeirstead May 30, 2023
df15960
Refactoring to share parsing logic, need to do more refactoring.
kkeirstead Jun 1, 2023
77150d1
Refactoring for aggregation manager - currently have two mechanisms f…
kkeirstead Jun 1, 2023
5e94836
Some cleanup and fixing an incorrect test.
kkeirstead Jun 1, 2023
58aedba
Added in disabling ref counting when ClientId isn't provided (as well…
kkeirstead Jun 6, 2023
8b6d433
Some refactoring, bulking up testing.
kkeirstead Jun 8, 2023
e6c26a9
Cleanup, especially for tests.
kkeirstead Jun 8, 2023
0021369
Cleanup, especially for tests.
kkeirstead Jun 8, 2023
c3f746d
Small cleanup
kkeirstead Jun 8, 2023
15ae5bc
Removed stray comment
kkeirstead Jun 8, 2023
93fa74a
Small cleanup
kkeirstead Jun 9, 2023
516f4a2
Merge branch 'main' of https://github.com/dotnet/runtime into kkeirst…
kkeirstead Jun 14, 2023
ce8a923
Merge branch 'main' of https://github.com/dotnet/runtime into kkeirst…
kkeirstead Jun 15, 2023
108671f
Merge branch 'main' of https://github.com/dotnet/runtime into kkeirst…
kkeirstead Jun 16, 2023
8396b8e
Removing instruments after they've stopped collecting measurements, a…
kkeirstead Jun 16, 2023
74ff54a
Added NET8 check to new event
kkeirstead Jun 16, 2023
f6c1e47
Added description for ClientId protocol; added check for the EventSou…
kkeirstead Jun 19, 2023
409b2a8
Changed test IntervalSecs
kkeirstead Jun 19, 2023
da49819
Minor adjustments before checking in.
kkeirstead Jun 30, 2023
0d5b140
Re-enabled OuterLoop
kkeirstead Jun 30, 2023
cc17cb0
Fixed typo.
kkeirstead Jun 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ internal sealed class AggregationManager
// these fields are modified after construction and accessed on multiple threads, use lock(this) to ensure the data
// is synchronized
private readonly List<Predicate<Instrument>> _instrumentConfigFuncs = new();
private TimeSpan _collectionPeriod;
public TimeSpan CollectionPeriod { get; private set; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: consider scoping the lock to this property, rather than 'this' + _aggregationManager


public int MaxTimeSeries { get; }
public int MaxHistograms { get; }
private Dictionary<Instrument, bool> _instruments = new();
private readonly ConcurrentDictionary<Instrument, InstrumentState> _instrumentStates = new();
private readonly CancellationTokenSource _cts = new();
private Thread? _collectThread;
private readonly MeterListener _listener;
private int _currentTimeSeries;
private int _currentHistograms;

private readonly int _maxTimeSeries;
private readonly int _maxHistograms;
private readonly Action<Instrument, LabeledAggregationStatistics> _collectMeasurement;
private readonly Action<DateTime, DateTime> _beginCollection;
private readonly Action<DateTime, DateTime> _endCollection;
Expand Down Expand Up @@ -59,8 +59,8 @@ public AggregationManager(
Action histogramLimitReached,
Action<Exception> observableInstrumentCallbackError)
{
_maxTimeSeries = maxTimeSeries;
_maxHistograms = maxHistograms;
MaxTimeSeries = maxTimeSeries;
MaxHistograms = maxHistograms;
_collectMeasurement = collectMeasurement;
_beginCollection = beginCollection;
_endCollection = endCollection;
Expand All @@ -73,24 +73,10 @@ public AggregationManager(
_histogramLimitReached = histogramLimitReached;
_observableInstrumentCallbackError = observableInstrumentCallbackError;

_listener = new MeterListener()
{
InstrumentPublished = (instrument, listener) =>
{
_instrumentPublished(instrument);
InstrumentState? state = GetInstrumentState(instrument);
if (state != null)
{
_beginInstrumentMeasurements(instrument);
listener.EnableMeasurementEvents(instrument, state);
}
},
MeasurementsCompleted = (instrument, cookie) =>
{
_endInstrumentMeasurements(instrument);
RemoveInstrumentState(instrument);
}
};
_listener = new MeterListener();
_listener.InstrumentPublished += PublishedInstrument;
_listener.MeasurementsCompleted += CompletedMeasurements;

_listener.SetMeasurementEventCallback<double>((i, m, l, c) => ((InstrumentState)c!).Update((double)m, l));
_listener.SetMeasurementEventCallback<float>((i, m, l, c) => ((InstrumentState)c!).Update((double)m, l));
_listener.SetMeasurementEventCallback<long>((i, m, l, c) => ((InstrumentState)c!).Update((double)m, l));
Expand Down Expand Up @@ -124,16 +110,42 @@ public AggregationManager SetCollectionPeriod(TimeSpan collectionPeriod)
Debug.Assert(collectionPeriod.TotalSeconds >= MinCollectionTimeSecs);
lock (this)
{
_collectionPeriod = collectionPeriod;
CollectionPeriod = collectionPeriod;
}
return this;
}

private void CompletedMeasurements(Instrument instrument, object? cookie)
{
_instruments.Remove(instrument);
_endInstrumentMeasurements(instrument);
RemoveInstrumentState(instrument);
}

private void PublishedInstrument(Instrument instrument, MeterListener _)
{
_instrumentPublished(instrument);
InstrumentState? state = GetInstrumentState(instrument);
if (state != null)
{
_beginInstrumentMeasurements(instrument);

if (!_instruments.ContainsKey(instrument))
{
// This has side effects that prompt MeasurementsCompleted
// to be called if this is called multiple times on an
// instrument in a shared MetricsEventSource.
_listener.EnableMeasurementEvents(instrument, state);
_instruments.Add(instrument, true);
}
}
}

public void Start()
{
// if already started or already stopped we can't be started again
Debug.Assert(_collectThread == null && !_cts.IsCancellationRequested);
Debug.Assert(_collectionPeriod.TotalSeconds >= MinCollectionTimeSecs);
Debug.Assert(CollectionPeriod.TotalSeconds >= MinCollectionTimeSecs);

// This explicitly uses a Thread and not a Task so that metrics still work
// even when an app is experiencing thread-pool starvation. Although we
Expand All @@ -148,14 +160,28 @@ public void Start()
_initialInstrumentEnumerationComplete();
}

public void Update()
{
// Creating (and destroying) a MeterListener to leverage the existing
// mechanisms for enumerating and publishing instruments.
using (MeterListener tempListener = new MeterListener())
{
tempListener.InstrumentPublished += PublishedInstrument;
tempListener.MeasurementsCompleted += CompletedMeasurements;
tempListener.Start();
}

_initialInstrumentEnumerationComplete();
}

private void CollectWorker(CancellationToken cancelToken)
{
try
{
double collectionIntervalSecs = -1;
lock (this)
{
collectionIntervalSecs = _collectionPeriod.TotalSeconds;
collectionIntervalSecs = CollectionPeriod.TotalSeconds;
}
Debug.Assert(collectionIntervalSecs >= MinCollectionTimeSecs);

Expand Down Expand Up @@ -340,12 +366,12 @@ private void RemoveInstrumentState(Instrument instrument)

private bool CheckTimeSeriesAllowed()
{
if (_currentTimeSeries < _maxTimeSeries)
if (_currentTimeSeries < MaxTimeSeries)
{
_currentTimeSeries++;
return true;
}
else if (_currentTimeSeries == _maxTimeSeries)
else if (_currentTimeSeries == MaxTimeSeries)
{
_currentTimeSeries++;
_timeSeriesLimitReached();
Expand All @@ -359,12 +385,12 @@ private bool CheckTimeSeriesAllowed()

private bool CheckHistogramAllowed()
{
if (_currentHistograms < _maxHistograms)
if (_currentHistograms < MaxHistograms)
{
_currentHistograms++;
return true;
}
else if (_currentHistograms == _maxHistograms)
else if (_currentHistograms == MaxHistograms)
{
_currentHistograms++;
_histogramLimitReached();
Expand Down
Loading