Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions snuba/web/rpc/v1/endpoint_get_trace.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import random
import uuid
from datetime import datetime
Expand Down Expand Up @@ -58,6 +59,7 @@
"project_id",
"trace_id",
"sampling_factor",
"attributes_bool",
]
APPLY_FINAL_ROLLOUT_PERCENTAGE_CONFIG_KEY = "EndpointGetTrace.apply_final_rollout_percentage"

Expand Down Expand Up @@ -219,6 +221,14 @@ def _build_query(
tuple(column(f"attributes_float_{i}") for i in range(40)),
),
),
SelectedExpression(
name=("attributes_array"),
expression=FunctionCall(
"attributes_array",
"toJSONString",
(column("attributes_array"),),
),
),
]
selected_columns.extend(
map(
Expand Down Expand Up @@ -418,6 +428,21 @@ def _value_to_attribute(key: str, value: Any) -> tuple[AttributeKey, AttributeVa
)


def _transform_array_value(value: dict[str, str]) -> Any:
for t, v in value.items():
if t == "Int":
return int(v)
return v


def _process_arrays(raw: str) -> dict[str, list[Any]]:
parsed = json.loads(raw)
arrays = {}
for key, values in parsed.items():
arrays[key] = [_transform_array_value(v) for v in values]
return arrays


def _process_results(
data: Iterable[Dict[str, Any]],
) -> ProcessedResults:
Expand All @@ -433,6 +458,7 @@ def _process_results(
for row in data:
id = row.pop("id")
ts = row.pop("timestamp")
arrays = row.pop("attributes_array")
last_seen_timestamp_precise = float(ts)
last_seen_id = id

Expand All @@ -459,6 +485,10 @@ def add_attribute(key: str, value: Any) -> None:
else:
add_attribute(key, value)

attributes_array = _process_arrays(arrays)
for key, value in attributes_array.items():
add_attribute(k, v)

item = GetTraceResponse.Item(
Comment on lines 533 to 543

This comment was marked as outdated.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a real bug, was this come across in tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it was due to a late refactor before I commit (rename k/v to key/value) and didn't run the tests in-between.

id=id,
timestamp=timestamp,
Expand Down
12 changes: 12 additions & 0 deletions tests/datasets/configuration/test_storage_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import Any

from snuba.clickhouse.columns import (
JSON,
Array,
Bool,
Column,
Expand Down Expand Up @@ -196,6 +197,13 @@ def test_column_parser(self) -> None:
"schema_modifiers": ["nullable"],
},
},
{
"name": "json_col",
"type": "JSON",
"args": {
"max_dynamic_paths": 128,
},
},
]

expected_python_columns = [
Expand All @@ -222,6 +230,10 @@ def test_column_parser(self) -> None:
SchemaModifiers(nullable=True, readonly=False),
),
),
Column(
"json_col",
JSON(max_dynamic_paths=128),
),
]

assert parse_columns(serialized_columns) == expected_python_columns
40 changes: 29 additions & 11 deletions tests/web/rpc/v1/test_endpoint_get_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
ResponseMeta,
TraceItemType,
)
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import (
Array,
AttributeKey,
AttributeValue,
)
from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue, TraceItem

from snuba import state
Expand Down Expand Up @@ -79,14 +83,14 @@
for i in range(10)
]


_PROTOBUF_TO_SENTRY_PROTOS = {
_PROTOBUF_TO_SENTRY_PROTOS: dict[str, tuple[str, AttributeKey.Type.ValueType]] = {
"string_value": ("val_str", AttributeKey.Type.TYPE_STRING),
"double_value": ("val_double", AttributeKey.Type.TYPE_DOUBLE),
# we store integers as double
"int_value": ("val_double", AttributeKey.Type.TYPE_DOUBLE),
# we store boolean as double
"bool_value": ("val_double", AttributeKey.Type.TYPE_DOUBLE),
"array_value": ("val_array", AttributeKey.Type.TYPE_ARRAY),
}


Expand All @@ -111,18 +115,29 @@ def get_attributes(
value=attribute_value,
)
)
for key, value in span.attributes.items():

def _convert_to_attribute_value(value: AnyValue) -> AttributeValue:
value_type = value.WhichOneof("value")
if value_type:
attribute_key = AttributeKey(
name=key,
type=_PROTOBUF_TO_SENTRY_PROTOS[value_type][1],
)
args = {_PROTOBUF_TO_SENTRY_PROTOS[value_type][0]: getattr(value, value_type)}
arg_name = _PROTOBUF_TO_SENTRY_PROTOS[str(value_type)][0]
arg_value = getattr(value, value_type)
if value_type == "array_value":
arg_value = Array(values=[_convert_to_attribute_value(v) for v in arg_value.values])
args = {arg_name: arg_value}
else:
continue
args = {"is_null": True}

return AttributeValue(**args)

attribute_value = AttributeValue(**args)
for key, value in span.attributes.items():
value_type = value.WhichOneof("value")
if not value_type:
continue
attribute_key = AttributeKey(
name=key,
type=_PROTOBUF_TO_SENTRY_PROTOS[str(value_type)][1],
)
attribute_value = _convert_to_attribute_value(value)
attributes.append(
GetTraceResponse.Item.Attribute(
key=attribute_key,
Expand All @@ -134,7 +149,10 @@ def get_attributes(

@pytest.fixture(autouse=False)
def setup_teardown(clickhouse_db: None, redis_db: None) -> None:
state.set_config("eap_items_consumer_insert_arrays", "1")

items_storage = get_storage(StorageKey("eap_items"))

write_raw_unprocessed_events(items_storage, _SPANS) # type: ignore
write_raw_unprocessed_events(items_storage, _LOGS) # type: ignore

Expand Down
35 changes: 27 additions & 8 deletions tests/web/rpc/v1/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
OrFilter,
TraceItemFilter,
)
from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue, TraceItem
from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue, ArrayValue, TraceItem

from snuba.datasets.storages.factory import get_storage
from snuba.datasets.storages.storage_key import StorageKey
Expand Down Expand Up @@ -69,6 +69,17 @@
"transaction.method": AnyValue(string_value="POST"),
"transaction.op": AnyValue(string_value="http.server"),
"user": AnyValue(string_value="ip:127.0.0.1"),
"i_am_an_array": AnyValue(
array_value=ArrayValue(
values=[
AnyValue(int_value=1),
AnyValue(int_value=2),
AnyValue(int_value=3),
AnyValue(int_value=4),
AnyValue(int_value=5),
]
)
),
}


Expand All @@ -89,16 +100,24 @@ def write_eap_item(
count: the number of these spans to write.
"""

attributes: dict[str, AnyValue] = {}
for key, value in raw_attributes.items():
def convert_attribute_value(v: Any) -> AnyValue:
if isinstance(value, str):
attributes[key] = AnyValue(string_value=value)
return AnyValue(string_value=value)
elif isinstance(value, int):
attributes[key] = AnyValue(int_value=value)
return AnyValue(int_value=value)
elif isinstance(value, bool):
attributes[key] = AnyValue(bool_value=value)
else:
attributes[key] = AnyValue(double_value=value)
return AnyValue(bool_value=value)
elif isinstance(value, float):
return AnyValue(double_value=value)
elif isinstance(value, list):
return AnyValue(
array_value=ArrayValue(values=[convert_attribute_value(v) for v in value])
)
return AnyValue()

attributes: dict[str, AnyValue] = {}
for key, value in raw_attributes.items():
attributes[key] = convert_attribute_value(value)

write_raw_unprocessed_events(
get_storage(StorageKey("eap_items")), # type: ignore
Expand Down
Loading