Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 32 additions & 19 deletions prometheus_client/multiprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,32 +88,42 @@ def _parse_key(key):
@staticmethod
def _accumulate_metrics(metrics, accumulate):
for metric in metrics.values():
samples = defaultdict(float)
sample_timestamps = defaultdict(float)
samples = defaultdict(lambda: defaultdict(float))
sample_timestamps = defaultdict(lambda: defaultdict(float))
buckets = defaultdict(lambda: defaultdict(float))
samples_setdefault = samples.setdefault
for s in metric.samples:
name, labels, value, timestamp, exemplar, native_histogram_value = s

if (
metric.type == 'gauge'
and metric._multiprocess_mode in (
'min', 'livemin',
'max', 'livemax',
'sum', 'livesum',
'mostrecent', 'livemostrecent',
)
):
labels = tuple(l for l in labels if l[0] != 'pid')

if metric.type == 'gauge':
without_pid_key = (name, tuple(l for l in labels if l[0] != 'pid'))
if metric._multiprocess_mode in ('min', 'livemin'):
current = samples_setdefault(without_pid_key, value)
current = samples[labels].setdefault((name, labels), value)
if value < current:
samples[without_pid_key] = value
samples[labels][(name, labels)] = value
elif metric._multiprocess_mode in ('max', 'livemax'):
current = samples_setdefault(without_pid_key, value)
current = samples[labels].setdefault((name, labels), value)
if value > current:
samples[without_pid_key] = value
samples[labels][(name, labels)] = value
elif metric._multiprocess_mode in ('sum', 'livesum'):
samples[without_pid_key] += value
samples[labels][(name, labels)] += value
elif metric._multiprocess_mode in ('mostrecent', 'livemostrecent'):
current_timestamp = sample_timestamps[without_pid_key]
current_timestamp = sample_timestamps[labels][name]
timestamp = float(timestamp or 0)
if current_timestamp < timestamp:
samples[without_pid_key] = value
sample_timestamps[without_pid_key] = timestamp
samples[labels][(name, labels)] = value
sample_timestamps[labels][name] = timestamp
else: # all/liveall
samples[(name, labels)] = value
samples[labels][(name, labels)] = value

elif metric.type == 'histogram':
# A for loop with early exit is faster than a genexpr
Expand All @@ -127,10 +137,10 @@ def _accumulate_metrics(metrics, accumulate):
break
else: # did not find the `le` key
# _sum/_count
samples[(name, labels)] += value
samples[labels][(name, labels)] += value
else:
# Counter and Summary.
samples[(name, labels)] += value
samples[labels][(name, labels)] += value

# Accumulate bucket values.
if metric.type == 'histogram':
Expand All @@ -143,14 +153,17 @@ def _accumulate_metrics(metrics, accumulate):
)
if accumulate:
acc += value
samples[sample_key] = acc
samples[labels][sample_key] = acc
else:
samples[sample_key] = value
samples[labels][sample_key] = value
if accumulate:
samples[(metric.name + '_count', labels)] = acc
samples[labels][(metric.name + '_count', labels)] = acc

# Convert to correct sample format.
metric.samples = [Sample(name_, dict(labels), value) for (name_, labels), value in samples.items()]
metric.samples = []
for _, samples_by_labels in samples.items():
for (name_, labels), value in samples_by_labels.items():
metric.samples.append(Sample(name_, dict(labels), value))
return metrics.values()

def collect(self):
Expand Down
70 changes: 62 additions & 8 deletions tests/test_multiprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,8 @@ def add_label(key, value):
Sample('g', add_label('pid', '1'), 1.0),
])

metrics['h'].samples.sort(
key=lambda x: (x[0], float(x[1].get('le', 0)))
)
expected_histogram = [
Sample('h_sum', labels, 6.0),
Sample('h_bucket', add_label('le', '0.005'), 0.0),
Sample('h_bucket', add_label('le', '0.01'), 0.0),
Sample('h_bucket', add_label('le', '0.025'), 0.0),
Expand All @@ -296,7 +294,66 @@ def add_label(key, value):
Sample('h_bucket', add_label('le', '10.0'), 2.0),
Sample('h_bucket', add_label('le', '+Inf'), 2.0),
Sample('h_count', labels, 2.0),
Sample('h_sum', labels, 6.0),
]

self.assertEqual(metrics['h'].samples, expected_histogram)

def test_collect_histogram_ordering(self):
pid = 0
values.ValueClass = MultiProcessValue(lambda: pid)
labels = {i: i for i in 'abcd'}

def add_label(key, value):
l = labels.copy()
l[key] = value
return l

h = Histogram('h', 'help', labelnames=['view'], registry=None)

h.labels(view='view1').observe(1)

pid = 1

h.labels(view='view1').observe(5)
h.labels(view='view2').observe(1)

metrics = {m.name: m for m in self.collector.collect()}

expected_histogram = [
Sample('h_sum', {'view': 'view1'}, 6.0),
Sample('h_bucket', {'view': 'view1', 'le': '0.005'}, 0.0),
Sample('h_bucket', {'view': 'view1', 'le': '0.01'}, 0.0),
Sample('h_bucket', {'view': 'view1', 'le': '0.025'}, 0.0),
Sample('h_bucket', {'view': 'view1', 'le': '0.05'}, 0.0),
Sample('h_bucket', {'view': 'view1', 'le': '0.075'}, 0.0),
Sample('h_bucket', {'view': 'view1', 'le': '0.1'}, 0.0),
Sample('h_bucket', {'view': 'view1', 'le': '0.25'}, 0.0),
Sample('h_bucket', {'view': 'view1', 'le': '0.5'}, 0.0),
Sample('h_bucket', {'view': 'view1', 'le': '0.75'}, 0.0),
Sample('h_bucket', {'view': 'view1', 'le': '1.0'}, 1.0),
Sample('h_bucket', {'view': 'view1', 'le': '2.5'}, 1.0),
Sample('h_bucket', {'view': 'view1', 'le': '5.0'}, 2.0),
Sample('h_bucket', {'view': 'view1', 'le': '7.5'}, 2.0),
Sample('h_bucket', {'view': 'view1', 'le': '10.0'}, 2.0),
Sample('h_bucket', {'view': 'view1', 'le': '+Inf'}, 2.0),
Sample('h_count', {'view': 'view1'}, 2.0),
Sample('h_sum', {'view': 'view2'}, 1.0),
Sample('h_bucket', {'view': 'view2', 'le': '0.005'}, 0.0),
Sample('h_bucket', {'view': 'view2', 'le': '0.01'}, 0.0),
Sample('h_bucket', {'view': 'view2', 'le': '0.025'}, 0.0),
Sample('h_bucket', {'view': 'view2', 'le': '0.05'}, 0.0),
Sample('h_bucket', {'view': 'view2', 'le': '0.075'}, 0.0),
Sample('h_bucket', {'view': 'view2', 'le': '0.1'}, 0.0),
Sample('h_bucket', {'view': 'view2', 'le': '0.25'}, 0.0),
Sample('h_bucket', {'view': 'view2', 'le': '0.5'}, 0.0),
Sample('h_bucket', {'view': 'view2', 'le': '0.75'}, 0.0),
Sample('h_bucket', {'view': 'view2', 'le': '1.0'}, 1.0),
Sample('h_bucket', {'view': 'view2', 'le': '2.5'}, 1.0),
Sample('h_bucket', {'view': 'view2', 'le': '5.0'}, 1.0),
Sample('h_bucket', {'view': 'view2', 'le': '7.5'}, 1.0),
Sample('h_bucket', {'view': 'view2', 'le': '10.0'}, 1.0),
Sample('h_bucket', {'view': 'view2', 'le': '+Inf'}, 1.0),
Sample('h_count', {'view': 'view2'}, 1.0),
]

self.assertEqual(metrics['h'].samples, expected_histogram)
Expand Down Expand Up @@ -347,10 +404,8 @@ def add_label(key, value):
m.name: m for m in self.collector.merge(files, accumulate=False)
}

metrics['h'].samples.sort(
key=lambda x: (x[0], float(x[1].get('le', 0)))
)
expected_histogram = [
Sample('h_sum', labels, 6.0),
Sample('h_bucket', add_label('le', '0.005'), 0.0),
Sample('h_bucket', add_label('le', '0.01'), 0.0),
Sample('h_bucket', add_label('le', '0.025'), 0.0),
Expand All @@ -366,7 +421,6 @@ def add_label(key, value):
Sample('h_bucket', add_label('le', '7.5'), 0.0),
Sample('h_bucket', add_label('le', '10.0'), 0.0),
Sample('h_bucket', add_label('le', '+Inf'), 0.0),
Sample('h_sum', labels, 6.0),
]

self.assertEqual(metrics['h'].samples, expected_histogram)
Expand Down