From ae3bbc5be54c2e0d91ed4763d411e2ccb619923a Mon Sep 17 00:00:00 2001 From: Armin Ronacher Date: Fri, 3 Sep 2021 21:29:53 +0200 Subject: [PATCH 01/11] feat(client-reports): Added initial support for client reports --- relay-common/src/constants.rs | 6 +- relay-common/src/time.rs | 7 +- relay-config/src/config.rs | 8 +++ relay-general/src/protocol/client_report.rs | 22 +++++++ relay-general/src/protocol/mod.rs | 2 + relay-server/src/actors/envelopes.rs | 72 +++++++++++++++++++-- relay-server/src/actors/outcome.rs | 9 ++- relay-server/src/endpoints/common.rs | 3 + relay-server/src/envelope.rs | 7 +- relay-server/src/utils/rate_limits.rs | 11 +++- 10 files changed, 137 insertions(+), 10 deletions(-) create mode 100644 relay-general/src/protocol/client_report.rs diff --git a/relay-common/src/constants.rs b/relay-common/src/constants.rs index 7fd4c665678..bbba6eb1e1b 100644 --- a/relay-common/src/constants.rs +++ b/relay-common/src/constants.rs @@ -96,6 +96,10 @@ impl fmt::Display for EventType { #[repr(i8)] pub enum DataCategory { /// Reserved and unused. + /// + /// There is a reserved `internal` data category which also maps to `Default`. Relays + /// will never emit rate limits for `internal` but client SDKs can assume a data category + /// of `internal` for envelopes carrying internal messages for instance SDK outcomes. Default = 0, /// Error events and Events with an `event_type` not explicitly listed below. Error = 1, @@ -116,7 +120,7 @@ impl DataCategory { /// Returns the data category corresponding to the given name. pub fn from_name(string: &str) -> Self { match string { - "default" => Self::Default, + "default" | "internal" => Self::Default, "error" => Self::Error, "transaction" => Self::Transaction, "security" => Self::Security, diff --git a/relay-common/src/time.rs b/relay-common/src/time.rs index 8da0f15eed8..2019b865a52 100644 --- a/relay-common/src/time.rs +++ b/relay-common/src/time.rs @@ -3,7 +3,7 @@ use std::fmt; use std::time::{Duration, Instant, SystemTime}; -use chrono::{DateTime, TimeZone, Utc}; +use chrono::{DateTime, NaiveDateTime, TimeZone, Utc}; use serde::{Deserialize, Serialize}; /// Converts an `Instant` into a `SystemTime`. @@ -83,6 +83,11 @@ impl UnixTimestamp { self.0 } + /// Returns the timestamp as chrono datetime. + pub fn as_datetime(self) -> DateTime { + DateTime::from_utc(NaiveDateTime::from_timestamp(self.0 as i64, 0), Utc) + } + /// Converts the UNIX timestamp into an `Instant` based on the current system timestamp. /// /// Returns [`MonotonicResult::Instant`] if the timestamp can be represented. Otherwise, returns diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 896f982b7a3..40e797c9fe5 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -501,6 +501,8 @@ struct Limits { max_attachment_size: ByteSize, /// The maximum combined size for all attachments in an envelope or request. max_attachments_size: ByteSize, + /// The maximum combined size for all client reports in an envelope or request. + max_client_reports_size: ByteSize, /// The maximum payload size for an entire envelopes. Individual limits still apply. max_envelope_size: ByteSize, /// The maximum number of session items per envelope. @@ -539,6 +541,7 @@ impl Default for Limits { max_event_size: ByteSize::mebibytes(1), max_attachment_size: ByteSize::mebibytes(100), max_attachments_size: ByteSize::mebibytes(100), + max_client_reports_size: ByteSize::kibibytes(4), max_envelope_size: ByteSize::mebibytes(100), max_session_count: 100, max_api_payload_size: ByteSize::mebibytes(20), @@ -1451,6 +1454,11 @@ impl Config { self.values.limits.max_attachments_size.as_bytes() } + /// Returns the maxmium combined size of client reports in bytes. + pub fn max_client_reports_size(&self) -> usize { + self.values.limits.max_client_reports_size.as_bytes() + } + /// Returns the maximum size of an envelope payload in bytes. /// /// Individual item size limits still apply. diff --git a/relay-general/src/protocol/client_report.rs b/relay-general/src/protocol/client_report.rs new file mode 100644 index 00000000000..72eeb851abb --- /dev/null +++ b/relay-general/src/protocol/client_report.rs @@ -0,0 +1,22 @@ +use relay_common::{DataCategory, UnixTimestamp}; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct ClientReport { + /// The timestamp of when the report was created. + pub timestamp: Option, + /// Discard reason counters. + pub discarded_events: Vec<(String, DataCategory, u32)>, +} + +impl ClientReport { + /// Parses a client report update from JSON. + pub fn parse(payload: &[u8]) -> Result { + serde_json::from_slice(payload) + } + + /// Serializes a client report update back into JSON. + pub fn serialize(&self) -> Result, serde_json::Error> { + serde_json::to_vec(self) + } +} diff --git a/relay-general/src/protocol/mod.rs b/relay-general/src/protocol/mod.rs index a7925eac5ce..080ef758274 100644 --- a/relay-general/src/protocol/mod.rs +++ b/relay-general/src/protocol/mod.rs @@ -2,6 +2,7 @@ mod breadcrumb; mod breakdowns; +mod client_report; mod clientsdk; mod constants; mod contexts; @@ -32,6 +33,7 @@ pub use sentry_release_parser::{validate_environment, validate_release}; pub use self::breadcrumb::Breadcrumb; pub use self::breakdowns::Breakdowns; +pub use self::client_report::ClientReport; pub use self::clientsdk::{ClientSdkInfo, ClientSdkPackage}; pub use self::constants::VALID_PLATFORMS; pub use self::contexts::{ diff --git a/relay-server/src/actors/envelopes.rs b/relay-server/src/actors/envelopes.rs index 7f63e601c32..b2c30e22d3a 100644 --- a/relay-server/src/actors/envelopes.rs +++ b/relay-server/src/actors/envelopes.rs @@ -20,9 +20,9 @@ use relay_config::{Config, HttpEncoding, RelayMode}; use relay_general::pii::{PiiAttachmentsProcessor, PiiProcessor}; use relay_general::processor::{process_value, ProcessingState}; use relay_general::protocol::{ - self, Breadcrumb, Csp, Event, EventId, EventType, ExpectCt, ExpectStaple, Hpkp, IpAddr, - LenientString, Metrics, RelayInfo, SecurityReportType, SessionUpdate, Timestamp, UserReport, - Values, + self, Breadcrumb, ClientReport, Csp, Event, EventId, EventType, ExpectCt, ExpectStaple, Hpkp, + IpAddr, LenientString, Metrics, RelayInfo, SecurityReportType, SessionUpdate, Timestamp, + UserReport, Values, }; use relay_general::store::ClockDriftProcessor; use relay_general::types::{Annotated, Array, FromValue, Object, ProcessingAction, Value}; @@ -273,7 +273,10 @@ impl EnvelopeContext { event_id: self.event_id, remote_addr: self.remote_addr, category: DataCategory::Attachment, - quantity: self.summary.attachment_quantity, + // XXX: attachment_quantity is usize which lets us go all the way to + // 64bit on our machines, but the protocl and data store can only + // do 32. + quantity: self.summary.attachment_quantity as u32, }); } } @@ -765,6 +768,65 @@ impl EnvelopeProcessor { }); } + /// Validates and extracts client reports. + /// + /// At the moment client reports are primarily used to transfer outcomes from + /// client SDKs. The outcomes are removed here and sent directly to the outcomes + /// system. + fn process_client_reports(&self, state: &mut ProcessEnvelopeState) { + let mut timestamp = None; + let mut discarded_events = BTreeMap::new(); + let received = state.envelope_context.received_at; + + let clock_drift_processor = ClockDriftProcessor::new(state.envelope.sent_at(), received) + .at_least(MINIMUM_CLOCK_DRIFT); + + // we're going through all client reports but we're effectively just merging + // them into the first one. + state.envelope.retain_items(|item| { + if item.ty() != ItemType::ClientReport { + return true; + }; + match ClientReport::parse(&item.payload()) { + Ok(report) => { + for (reason, category, quantity) in report.discarded_events.into_iter() { + if reason.len() > 200 { + relay_log::trace!("ignored client outcome with an overlong reason"); + continue; + } + *discarded_events.entry((reason, category)).or_insert(0) += quantity; + } + if let Some(ts) = report.timestamp { + timestamp.get_or_insert(ts); + } + } + Err(err) => relay_log::trace!("invalid client report received: {}", LogError(&err)), + } + false + }); + + let timestamp = + timestamp.get_or_insert_with(|| UnixTimestamp::from_secs(received.timestamp() as u64)); + + if clock_drift_processor.is_drifted() { + relay_log::trace!("applying clock drift correction to client report"); + clock_drift_processor.process_timestamp(timestamp); + } + + let producer = OutcomeProducer::from_registry(); + for ((reason, category), quantity) in discarded_events.into_iter() { + producer.do_send(TrackOutcome { + timestamp: timestamp.as_datetime(), + scoping: state.envelope_context.scoping.clone(), + outcome: Outcome::ClientDiscard(reason), + event_id: None, + remote_addr: state.envelope_context.remote_addr, + category, + quantity, + }); + } + } + /// Creates and initializes the processing state. /// /// This applies defaults to the envelope and initializes empty rate limits. @@ -1064,6 +1126,7 @@ impl EnvelopeProcessor { ItemType::Sessions => false, ItemType::Metrics => false, ItemType::MetricBuckets => false, + ItemType::ClientReport => false, } } @@ -1553,6 +1616,7 @@ impl EnvelopeProcessor { } self.process_sessions(&mut state); + self.process_client_reports(&mut state); self.process_user_reports(&mut state); if state.creates_event() { diff --git a/relay-server/src/actors/outcome.rs b/relay-server/src/actors/outcome.rs index e46368710a0..2144659bc7d 100644 --- a/relay-server/src/actors/outcome.rs +++ b/relay-server/src/actors/outcome.rs @@ -84,7 +84,7 @@ pub struct TrackOutcome { /// The event's data category. pub category: DataCategory, /// The number of events or total attachment size in bytes. - pub quantity: usize, + pub quantity: u32, } impl Message for TrackOutcome { @@ -114,6 +114,9 @@ pub enum Outcome { /// The event has been discarded because of invalid data. Invalid(DiscardReason), + /// The event has already been discarded on the client side. + ClientDiscard(String), + /// Reserved but unused in Sentry. #[allow(dead_code)] Abuse, @@ -127,6 +130,7 @@ impl Outcome { Outcome::RateLimited(_) => 2, Outcome::Invalid(_) => 3, Outcome::Abuse => 4, + Outcome::ClientDiscard(_) => 5, } } @@ -140,6 +144,7 @@ impl Outcome { Outcome::RateLimited(code_opt) => code_opt .as_ref() .map(|code| Cow::Owned(code.as_str().into())), + Outcome::ClientDiscard(ref discard_reason) => Some(Cow::Borrowed(discard_reason)), Outcome::Abuse => None, } } @@ -319,7 +324,7 @@ pub struct TrackRawOutcome { pub category: Option, /// The number of events or total attachment size in bytes. #[serde(default, skip_serializing_if = "Option::is_none")] - pub quantity: Option, + pub quantity: Option, } impl TrackRawOutcome { diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index 6052ca83787..97993cee21a 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -335,6 +335,7 @@ fn check_envelope_size_limits(config: &Config, envelope: &Envelope) -> bool { let mut event_size = 0; let mut attachments_size = 0; let mut session_count = 0; + let mut client_reports_size = 0; for item in envelope.items() { match item.ty() { @@ -355,12 +356,14 @@ fn check_envelope_size_limits(config: &Config, envelope: &Envelope) -> bool { ItemType::UserReport => (), ItemType::Metrics => (), ItemType::MetricBuckets => (), + ItemType::ClientReport => client_reports_size += item.len(), } } event_size <= config.max_event_size() && attachments_size <= config.max_attachments_size() && session_count <= config.max_session_count() + && client_reports_size <= config.max_client_reports_size() } /// Handles Sentry events. diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index c3a57824184..afc48252f56 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -100,6 +100,8 @@ pub enum ItemType { Metrics, /// Buckets of preaggregated metrics encoded as JSON. MetricBuckets, + /// Client internal report (eg: outcomes). + ClientReport, } impl ItemType { @@ -130,6 +132,7 @@ impl fmt::Display for ItemType { Self::Sessions => write!(f, "aggregated sessions"), Self::Metrics => write!(f, "metrics"), Self::MetricBuckets => write!(f, "metric buckets"), + Self::ClientReport => write!(f, "client report"), } } } @@ -548,7 +551,8 @@ impl Item { | ItemType::Session | ItemType::Sessions | ItemType::Metrics - | ItemType::MetricBuckets => false, + | ItemType::MetricBuckets + | ItemType::ClientReport => false, } } @@ -569,6 +573,7 @@ impl Item { ItemType::Sessions => false, ItemType::Metrics => false, ItemType::MetricBuckets => false, + ItemType::ClientReport => false, } } } diff --git a/relay-server/src/utils/rate_limits.rs b/relay-server/src/utils/rate_limits.rs index b4dacd9dbcb..0b171f7794c 100644 --- a/relay-server/src/utils/rate_limits.rs +++ b/relay-server/src/utils/rate_limits.rs @@ -102,6 +102,13 @@ fn infer_event_category(item: &Item) -> Option { ItemType::MetricBuckets => None, ItemType::FormData => None, ItemType::UserReport => None, + // the following items are "internal" item types. From the perspective of the SDK + // the use the "internal" data category however this data category is in fact never + // supposed to be emitted by relay as internal items must not be rate limited. As + // such we do not emit a data category here. An SDK however is supposed to assume + // that such item types use the reserved "internal" data category to not accidentally + // assume another rate limit (such as default). + ItemType::ClientReport => None, } } @@ -242,7 +249,9 @@ impl Enforcement { event_id: envelope.event_id(), remote_addr: envelope.meta().remote_addr(), category: limit.category, - quantity: limit.quantity, + // XXX: on the limiter we have quantity of usize, but in the protocol + // and data store we're limited to u32. + quantity: limit.quantity as u32, }); } } From 441fd4d6a46fe4de39054282a334446c248851b8 Mon Sep 17 00:00:00 2001 From: Armin Ronacher Date: Fri, 3 Sep 2021 21:46:51 +0200 Subject: [PATCH 02/11] test: added tests for basic client reports --- relay-common/src/constants.rs | 13 ++++++ relay-common/src/time.rs | 7 ++++ relay-general/src/protocol/client_report.rs | 44 +++++++++++++++++++++ 3 files changed, 64 insertions(+) diff --git a/relay-common/src/constants.rs b/relay-common/src/constants.rs index bbba6eb1e1b..fa50152a0ff 100644 --- a/relay-common/src/constants.rs +++ b/relay-common/src/constants.rs @@ -332,3 +332,16 @@ impl fmt::Display for SpanStatus { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_internal_data_category() { + assert_eq!( + DataCategory::from_str("internal"), + Ok(DataCategory::Default) + ); + } +} diff --git a/relay-common/src/time.rs b/relay-common/src/time.rs index 2019b865a52..6776472ff52 100644 --- a/relay-common/src/time.rs +++ b/relay-common/src/time.rs @@ -144,6 +144,7 @@ impl std::ops::Sub for UnixTimestamp { } } +#[derive(Debug)] /// An error returned from parsing [`UnixTimestamp`]. pub struct ParseUnixTimestampError(()); @@ -151,6 +152,12 @@ impl std::str::FromStr for UnixTimestamp { type Err = ParseUnixTimestampError; fn from_str(s: &str) -> Result { + if let Ok(datetime) = s.parse::>() { + let timestamp = datetime.timestamp(); + if timestamp >= 0 { + return Ok(UnixTimestamp(timestamp as u64)); + } + } let ts = s.parse().or(Err(ParseUnixTimestampError(())))?; Ok(Self(ts)) } diff --git a/relay-general/src/protocol/client_report.rs b/relay-general/src/protocol/client_report.rs index 72eeb851abb..e39079bfda6 100644 --- a/relay-general/src/protocol/client_report.rs +++ b/relay-general/src/protocol/client_report.rs @@ -20,3 +20,47 @@ impl ClientReport { serde_json::to_vec(self) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_client_report_roundtrip() { + let json = r#"{ + "timestamp": "2020-02-07T15:17:00Z", + "discarded_events": [ + ["foo_reason", "error", 42], + ["foo_reason", "transaction", 23] + ] +}"#; + + let output = r#"{ + "timestamp": 1581088620, + "discarded_events": [ + [ + "foo_reason", + "error", + 42 + ], + [ + "foo_reason", + "transaction", + 23 + ] + ] +}"#; + + let update = ClientReport { + timestamp: Some("2020-02-07T15:17:00Z".parse().unwrap()), + discarded_events: vec![ + ("foo_reason".into(), DataCategory::Error, 42), + ("foo_reason".into(), DataCategory::Transaction, 23), + ], + }; + + let parsed = ClientReport::parse(json.as_bytes()).unwrap(); + assert_eq_dbg!(update, parsed); + assert_eq_str!(output, serde_json::to_string_pretty(&update).unwrap()); + } +} From b2df16beaa61b32d1fb0d85eb7e233caea75b085 Mon Sep 17 00:00:00 2001 From: Armin Ronacher Date: Fri, 3 Sep 2021 22:00:32 +0200 Subject: [PATCH 03/11] test: added test for removal of client report from envelopes --- relay-server/src/actors/envelopes.rs | 45 ++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/relay-server/src/actors/envelopes.rs b/relay-server/src/actors/envelopes.rs index b2c30e22d3a..dec0687daad 100644 --- a/relay-server/src/actors/envelopes.rs +++ b/relay-server/src/actors/envelopes.rs @@ -2680,6 +2680,51 @@ mod tests { assert_eq!(new_envelope.items().next().unwrap().ty(), ItemType::Event); } + #[test] + fn test_client_report_removal() { + let processor = EnvelopeProcessor::new(Arc::new(Default::default())); + + let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42" + .parse() + .unwrap(); + + let request_meta = RequestMeta::new(dsn); + let mut envelope = Envelope::from_request(None, request_meta); + + envelope.add_item({ + let mut item = Item::new(ItemType::ClientReport); + item.set_payload( + ContentType::Json, + r###" + { + "discarded_events": [ + ["queue_full", "error", 42] + ] + } + "###, + ); + item + }); + + let envelope_response = processor + .process(ProcessEnvelope { + envelope, + project_state: Arc::new(ProjectState::allowed()), + start_time: Instant::now(), + scoping: Scoping { + project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), + organization_id: 1, + project_id: ProjectId::new(1), + key_id: None, + }, + }) + .unwrap(); + + let new_envelope = envelope_response.envelope.unwrap(); + + assert_eq!(new_envelope.len(), 0); + } + #[test] #[cfg(feature = "processing")] fn test_extract_session_metrics() { From 42e24328ee79c89878db0f95a594c1784338600c Mon Sep 17 00:00:00 2001 From: Armin Ronacher Date: Fri, 3 Sep 2021 22:07:09 +0200 Subject: [PATCH 04/11] fix: clippy --- relay-server/src/actors/envelopes.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay-server/src/actors/envelopes.rs b/relay-server/src/actors/envelopes.rs index dec0687daad..e82a6341275 100644 --- a/relay-server/src/actors/envelopes.rs +++ b/relay-server/src/actors/envelopes.rs @@ -817,7 +817,7 @@ impl EnvelopeProcessor { for ((reason, category), quantity) in discarded_events.into_iter() { producer.do_send(TrackOutcome { timestamp: timestamp.as_datetime(), - scoping: state.envelope_context.scoping.clone(), + scoping: state.envelope_context.scoping, outcome: Outcome::ClientDiscard(reason), event_id: None, remote_addr: state.envelope_context.remote_addr, From 00be1afa9199ad8cb7449dc5cf1c59bc1bb64975 Mon Sep 17 00:00:00 2001 From: Armin Ronacher Date: Fri, 3 Sep 2021 22:39:22 +0200 Subject: [PATCH 05/11] fix: tests --- relay-server/src/actors/envelopes.rs | 38 ++++++++++++++++------------ relay-test/src/lib.rs | 10 ++++++++ 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/relay-server/src/actors/envelopes.rs b/relay-server/src/actors/envelopes.rs index e82a6341275..93e528adafe 100644 --- a/relay-server/src/actors/envelopes.rs +++ b/relay-server/src/actors/envelopes.rs @@ -813,6 +813,10 @@ impl EnvelopeProcessor { clock_drift_processor.process_timestamp(timestamp); } + if discarded_events.is_empty() { + return; + } + let producer = OutcomeProducer::from_registry(); for ((reason, category), quantity) in discarded_events.into_iter() { producer.do_send(TrackOutcome { @@ -2682,6 +2686,8 @@ mod tests { #[test] fn test_client_report_removal() { + relay_test::setup(); + let processor = EnvelopeProcessor::new(Arc::new(Default::default())); let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42" @@ -2706,23 +2712,23 @@ mod tests { item }); - let envelope_response = processor - .process(ProcessEnvelope { - envelope, - project_state: Arc::new(ProjectState::allowed()), - start_time: Instant::now(), - scoping: Scoping { - project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), - organization_id: 1, - project_id: ProjectId::new(1), - key_id: None, - }, - }) - .unwrap(); - - let new_envelope = envelope_response.envelope.unwrap(); + let envelope_response = relay_test::with_system(move || { + processor + .process(ProcessEnvelope { + envelope, + project_state: Arc::new(ProjectState::allowed()), + start_time: Instant::now(), + scoping: Scoping { + project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), + organization_id: 1, + project_id: ProjectId::new(1), + key_id: None, + }, + }) + .unwrap() + }); - assert_eq!(new_envelope.len(), 0); + assert!(envelope_response.envelope.is_none()); } #[test] diff --git a/relay-test/src/lib.rs b/relay-test/src/lib.rs index 93b0136b3dd..a7801f1b04e 100644 --- a/relay-test/src/lib.rs +++ b/relay-test/src/lib.rs @@ -112,6 +112,16 @@ where }) } +/// Runs the provided function with an active actix system. +/// +/// This function otherwise functions exactly as [`block_fn`]. +pub fn with_system(func: F) -> R +where + F: FnOnce() -> R, +{ + block_fn(move || future::ok::<_, ()>(func())).unwrap() +} + /// Returns a future which completes after the requested delay. /// ``` /// use std::time::Duration; From 956e0b382a0f9917657d5cc7fcc7a613198e00a2 Mon Sep 17 00:00:00 2001 From: Armin Ronacher Date: Sat, 4 Sep 2021 23:00:39 +0200 Subject: [PATCH 06/11] test: added integration test for sdk outcomes --- tests/integration/fixtures/__init__.py | 5 +++ tests/integration/test_client_report.py | 60 +++++++++++++++++++++++++ 2 files changed, 65 insertions(+) create mode 100644 tests/integration/test_client_report.py diff --git a/tests/integration/fixtures/__init__.py b/tests/integration/fixtures/__init__.py index bd3de7bb03e..dc189b24ba0 100644 --- a/tests/integration/fixtures/__init__.py +++ b/tests/integration/fixtures/__init__.py @@ -180,6 +180,11 @@ def send_session_aggregates(self, project_id, payload): envelope.add_item(Item(payload=PayloadRef(json=payload), type="sessions")) self.send_envelope(project_id, envelope) + def send_client_report(self, project_id, payload): + envelope = Envelope() + envelope.add_item(Item(PayloadRef(json=payload), type="client_report")) + self.send_envelope(project_id, envelope) + def send_metrics(self, project_id, payload, timestamp=None): envelope = Envelope() envelope.add_item( diff --git a/tests/integration/test_client_report.py b/tests/integration/test_client_report.py new file mode 100644 index 00000000000..b5fd19485b8 --- /dev/null +++ b/tests/integration/test_client_report.py @@ -0,0 +1,60 @@ +from datetime import datetime, timezone + + +def test_client_reports(relay, mini_sentry): + config = { + "outcomes": { + "emit_outcomes": True, + "batch_size": 1, + "batch_interval": 1, + "source": "my-layer", + } + } + + relay = relay(mini_sentry, config) + + project_id = 42 + timestamp = datetime.now(tz=timezone.utc) + + report_payload = { + "timestamp": timestamp.isoformat(), + "discarded_events": [ + ["queue_full", "error", 42], + ["queue_full", "transaction", 1231], + ], + } + + mini_sentry.add_full_project_config(project_id) + relay.send_client_report(project_id, report_payload) + + outcomes = [] + for _ in range(2): + outcomes.extend(mini_sentry.captured_outcomes.get(timeout=0.2)["outcomes"]) + + timestamp_formatted = timestamp.isoformat().split(".")[0] + ".000000Z" + assert outcomes == [ + { + "timestamp": timestamp_formatted, + "org_id": 1, + "project_id": 42, + "key_id": 123, + "outcome": 5, + "reason": "queue_full", + "remote_addr": "127.0.0.1", + "source": "my-layer", + "category": 1, + "quantity": 42, + }, + { + "timestamp": timestamp_formatted, + "org_id": 1, + "project_id": 42, + "key_id": 123, + "outcome": 5, + "reason": "queue_full", + "remote_addr": "127.0.0.1", + "source": "my-layer", + "category": 2, + "quantity": 1231, + }, + ] From e7cfdf7149d8d4812c6d9d0160dc07aa5240edd4 Mon Sep 17 00:00:00 2001 From: Armin Ronacher Date: Sat, 4 Sep 2021 23:18:16 +0200 Subject: [PATCH 07/11] meta: changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e98dc865587..ee9f2d481b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - Add sampling based on transaction name. ([#1058](https://github.com/getsentry/relay/pull/1058)) - Support running Relay without config directory. The most important configuration, including Relay mode and credentials, can now be provided through commandline arguments or environment variables alone. ([#1055](https://github.com/getsentry/relay/pull/1055) +- Protocol support for client reports. ([#1074](https://github.com/getsentry/relay/pull/1074)) **Bug Fixes**: From 7cc0c68ccfa81f4615d45005ce973de0cd217ab3 Mon Sep 17 00:00:00 2001 From: Armin Ronacher Date: Sun, 5 Sep 2021 10:27:14 +0200 Subject: [PATCH 08/11] ref: change queue_full to queue_overflow --- tests/integration/test_client_report.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/integration/test_client_report.py b/tests/integration/test_client_report.py index b5fd19485b8..fd723423890 100644 --- a/tests/integration/test_client_report.py +++ b/tests/integration/test_client_report.py @@ -19,8 +19,8 @@ def test_client_reports(relay, mini_sentry): report_payload = { "timestamp": timestamp.isoformat(), "discarded_events": [ - ["queue_full", "error", 42], - ["queue_full", "transaction", 1231], + ["queue_overflow", "error", 42], + ["queue_overflow", "transaction", 1231], ], } @@ -39,7 +39,7 @@ def test_client_reports(relay, mini_sentry): "project_id": 42, "key_id": 123, "outcome": 5, - "reason": "queue_full", + "reason": "queue_overflow", "remote_addr": "127.0.0.1", "source": "my-layer", "category": 1, @@ -51,7 +51,7 @@ def test_client_reports(relay, mini_sentry): "project_id": 42, "key_id": 123, "outcome": 5, - "reason": "queue_full", + "reason": "queue_overflow", "remote_addr": "127.0.0.1", "source": "my-layer", "category": 2, From c69a3518ff726feade85490918652e3e10dc01ae Mon Sep 17 00:00:00 2001 From: Armin Ronacher Date: Mon, 6 Sep 2021 19:49:17 +0200 Subject: [PATCH 09/11] ref: change protocol to be object based like the last revision documents --- relay-general/src/protocol/client_report.rs | 45 ++++++++++++++------- relay-server/src/actors/envelopes.rs | 8 ++-- tests/integration/test_client_report.py | 4 +- 3 files changed, 37 insertions(+), 20 deletions(-) diff --git a/relay-general/src/protocol/client_report.rs b/relay-general/src/protocol/client_report.rs index e39079bfda6..9222a6a9c05 100644 --- a/relay-general/src/protocol/client_report.rs +++ b/relay-general/src/protocol/client_report.rs @@ -1,12 +1,19 @@ use relay_common::{DataCategory, UnixTimestamp}; use serde::{Deserialize, Serialize}; +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct DiscardedEvent { + pub reason: String, + pub category: DataCategory, + pub quantity: u32, +} + #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct ClientReport { /// The timestamp of when the report was created. pub timestamp: Option, /// Discard reason counters. - pub discarded_events: Vec<(String, DataCategory, u32)>, + pub discarded_events: Vec, } impl ClientReport { @@ -30,32 +37,40 @@ mod tests { let json = r#"{ "timestamp": "2020-02-07T15:17:00Z", "discarded_events": [ - ["foo_reason", "error", 42], - ["foo_reason", "transaction", 23] + {"reason": "foo_reason", "category": "error", "quantity": 42}, + {"reason": "foo_reason", "category": "transaction", "quantity": 23} ] }"#; let output = r#"{ "timestamp": 1581088620, "discarded_events": [ - [ - "foo_reason", - "error", - 42 - ], - [ - "foo_reason", - "transaction", - 23 - ] + { + "reason": "foo_reason", + "category": "error", + "quantity": 42 + }, + { + "reason": "foo_reason", + "category": "transaction", + "quantity": 23 + } ] }"#; let update = ClientReport { timestamp: Some("2020-02-07T15:17:00Z".parse().unwrap()), discarded_events: vec![ - ("foo_reason".into(), DataCategory::Error, 42), - ("foo_reason".into(), DataCategory::Transaction, 23), + DiscardedEvent { + reason: "foo_reason".into(), + category: DataCategory::Error, + quantity: 42, + }, + DiscardedEvent { + reason: "foo_reason".into(), + category: DataCategory::Transaction, + quantity: 23, + }, ], }; diff --git a/relay-server/src/actors/envelopes.rs b/relay-server/src/actors/envelopes.rs index 93e528adafe..58819b7c0e1 100644 --- a/relay-server/src/actors/envelopes.rs +++ b/relay-server/src/actors/envelopes.rs @@ -789,12 +789,14 @@ impl EnvelopeProcessor { }; match ClientReport::parse(&item.payload()) { Ok(report) => { - for (reason, category, quantity) in report.discarded_events.into_iter() { - if reason.len() > 200 { + for discarded_event in report.discarded_events.into_iter() { + if discarded_event.reason.len() > 200 { relay_log::trace!("ignored client outcome with an overlong reason"); continue; } - *discarded_events.entry((reason, category)).or_insert(0) += quantity; + *discarded_events + .entry((discarded_event.reason, discarded_event.category)) + .or_insert(0) += discarded_event.quantity; } if let Some(ts) = report.timestamp { timestamp.get_or_insert(ts); diff --git a/tests/integration/test_client_report.py b/tests/integration/test_client_report.py index fd723423890..2ca3adf9f79 100644 --- a/tests/integration/test_client_report.py +++ b/tests/integration/test_client_report.py @@ -19,8 +19,8 @@ def test_client_reports(relay, mini_sentry): report_payload = { "timestamp": timestamp.isoformat(), "discarded_events": [ - ["queue_overflow", "error", 42], - ["queue_overflow", "transaction", 1231], + {"reason": "queue_overflow", "category": "error", "quantity": 42}, + {"reason": "queue_overflow", "category": "transaction", "quantity": 1231}, ], } From 093117f4e3ed94c04740b855bcb0d7fd968b6508 Mon Sep 17 00:00:00 2001 From: Armin Ronacher Date: Wed, 8 Sep 2021 12:21:37 +0200 Subject: [PATCH 10/11] ref: make an early return even earlier --- relay-server/src/actors/envelopes.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/relay-server/src/actors/envelopes.rs b/relay-server/src/actors/envelopes.rs index 58819b7c0e1..ffbc506de21 100644 --- a/relay-server/src/actors/envelopes.rs +++ b/relay-server/src/actors/envelopes.rs @@ -807,6 +807,10 @@ impl EnvelopeProcessor { false }); + if discarded_events.is_empty() { + return; + } + let timestamp = timestamp.get_or_insert_with(|| UnixTimestamp::from_secs(received.timestamp() as u64)); @@ -815,10 +819,6 @@ impl EnvelopeProcessor { clock_drift_processor.process_timestamp(timestamp); } - if discarded_events.is_empty() { - return; - } - let producer = OutcomeProducer::from_registry(); for ((reason, category), quantity) in discarded_events.into_iter() { producer.do_send(TrackOutcome { From 08437753c637a09f064948708ecdb477597f8d33 Mon Sep 17 00:00:00 2001 From: Armin Ronacher Date: Wed, 8 Sep 2021 13:21:04 +0200 Subject: [PATCH 11/11] feat: create an explicit internal data category --- relay-common/src/constants.rs | 25 +++++++------------------ relay-quotas/src/quota.rs | 2 +- 2 files changed, 8 insertions(+), 19 deletions(-) diff --git a/relay-common/src/constants.rs b/relay-common/src/constants.rs index fa50152a0ff..4404dc2bb2f 100644 --- a/relay-common/src/constants.rs +++ b/relay-common/src/constants.rs @@ -96,10 +96,6 @@ impl fmt::Display for EventType { #[repr(i8)] pub enum DataCategory { /// Reserved and unused. - /// - /// There is a reserved `internal` data category which also maps to `Default`. Relays - /// will never emit rate limits for `internal` but client SDKs can assume a data category - /// of `internal` for envelopes carrying internal messages for instance SDK outcomes. Default = 0, /// Error events and Events with an `event_type` not explicitly listed below. Error = 1, @@ -111,6 +107,8 @@ pub enum DataCategory { Attachment = 4, /// Session updates. Quantity is the number of updates in the batch. Session = 5, + /// Reserved data category that shall not appear in the outcomes. + Internal = -2, /// Any other data category not known by this Relay. #[serde(other)] Unknown = -1, @@ -120,12 +118,13 @@ impl DataCategory { /// Returns the data category corresponding to the given name. pub fn from_name(string: &str) -> Self { match string { - "default" | "internal" => Self::Default, + "default" => Self::Default, "error" => Self::Error, "transaction" => Self::Transaction, "security" => Self::Security, "attachment" => Self::Attachment, "session" => Self::Session, + "internal" => Self::Internal, _ => Self::Unknown, } } @@ -139,6 +138,7 @@ impl DataCategory { Self::Security => "security", Self::Attachment => "attachment", Self::Session => "session", + Self::Internal => "internal", Self::Unknown => "unknown", } } @@ -150,6 +150,8 @@ impl DataCategory { /// Returns the numeric value for this outcome. pub fn value(self) -> Option { + // negative values (Internal and Unknown) cannot be sent as + // outcomes (internally so!) (self as i8).try_into().ok() } } @@ -332,16 +334,3 @@ impl fmt::Display for SpanStatus { } } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_internal_data_category() { - assert_eq!( - DataCategory::from_str("internal"), - Ok(DataCategory::Default) - ); - } -} diff --git a/relay-quotas/src/quota.rs b/relay-quotas/src/quota.rs index d3de0e0f0f9..578b0063144 100644 --- a/relay-quotas/src/quota.rs +++ b/relay-quotas/src/quota.rs @@ -106,7 +106,7 @@ impl CategoryUnit { | DataCategory::Security => Some(Self::Count), DataCategory::Attachment => Some(Self::Bytes), DataCategory::Session => Some(Self::Batched), - DataCategory::Unknown => None, + DataCategory::Internal | DataCategory::Unknown => None, } } }