diff --git a/CHANGELOG.md b/CHANGELOG.md index 38439a48d20..e404bcd488d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - Add sampling based on transaction name. ([#1058](https://github.com/getsentry/relay/pull/1058)) - Support running Relay without config directory. The most important configuration, including Relay mode and credentials, can now be provided through commandline arguments or environment variables alone. ([#1055](https://github.com/getsentry/relay/pull/1055) +- Protocol support for client reports. ([#1074](https://github.com/getsentry/relay/pull/1074)) - Extract session metrics in non processing relays. ([#1073](https://github.com/getsentry/relay/pull/1073)) **Bug Fixes**: diff --git a/relay-common/src/constants.rs b/relay-common/src/constants.rs index 7fd4c665678..4404dc2bb2f 100644 --- a/relay-common/src/constants.rs +++ b/relay-common/src/constants.rs @@ -107,6 +107,8 @@ pub enum DataCategory { Attachment = 4, /// Session updates. Quantity is the number of updates in the batch. Session = 5, + /// Reserved data category that shall not appear in the outcomes. + Internal = -2, /// Any other data category not known by this Relay. #[serde(other)] Unknown = -1, @@ -122,6 +124,7 @@ impl DataCategory { "security" => Self::Security, "attachment" => Self::Attachment, "session" => Self::Session, + "internal" => Self::Internal, _ => Self::Unknown, } } @@ -135,6 +138,7 @@ impl DataCategory { Self::Security => "security", Self::Attachment => "attachment", Self::Session => "session", + Self::Internal => "internal", Self::Unknown => "unknown", } } @@ -146,6 +150,8 @@ impl DataCategory { /// Returns the numeric value for this outcome. pub fn value(self) -> Option { + // negative values (Internal and Unknown) cannot be sent as + // outcomes (internally so!) (self as i8).try_into().ok() } } diff --git a/relay-common/src/time.rs b/relay-common/src/time.rs index 8da0f15eed8..6776472ff52 100644 --- a/relay-common/src/time.rs +++ b/relay-common/src/time.rs @@ -3,7 +3,7 @@ use std::fmt; use std::time::{Duration, Instant, SystemTime}; -use chrono::{DateTime, TimeZone, Utc}; +use chrono::{DateTime, NaiveDateTime, TimeZone, Utc}; use serde::{Deserialize, Serialize}; /// Converts an `Instant` into a `SystemTime`. @@ -83,6 +83,11 @@ impl UnixTimestamp { self.0 } + /// Returns the timestamp as chrono datetime. + pub fn as_datetime(self) -> DateTime { + DateTime::from_utc(NaiveDateTime::from_timestamp(self.0 as i64, 0), Utc) + } + /// Converts the UNIX timestamp into an `Instant` based on the current system timestamp. /// /// Returns [`MonotonicResult::Instant`] if the timestamp can be represented. Otherwise, returns @@ -139,6 +144,7 @@ impl std::ops::Sub for UnixTimestamp { } } +#[derive(Debug)] /// An error returned from parsing [`UnixTimestamp`]. pub struct ParseUnixTimestampError(()); @@ -146,6 +152,12 @@ impl std::str::FromStr for UnixTimestamp { type Err = ParseUnixTimestampError; fn from_str(s: &str) -> Result { + if let Ok(datetime) = s.parse::>() { + let timestamp = datetime.timestamp(); + if timestamp >= 0 { + return Ok(UnixTimestamp(timestamp as u64)); + } + } let ts = s.parse().or(Err(ParseUnixTimestampError(())))?; Ok(Self(ts)) } diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 896f982b7a3..40e797c9fe5 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -501,6 +501,8 @@ struct Limits { max_attachment_size: ByteSize, /// The maximum combined size for all attachments in an envelope or request. max_attachments_size: ByteSize, + /// The maximum combined size for all client reports in an envelope or request. + max_client_reports_size: ByteSize, /// The maximum payload size for an entire envelopes. Individual limits still apply. max_envelope_size: ByteSize, /// The maximum number of session items per envelope. @@ -539,6 +541,7 @@ impl Default for Limits { max_event_size: ByteSize::mebibytes(1), max_attachment_size: ByteSize::mebibytes(100), max_attachments_size: ByteSize::mebibytes(100), + max_client_reports_size: ByteSize::kibibytes(4), max_envelope_size: ByteSize::mebibytes(100), max_session_count: 100, max_api_payload_size: ByteSize::mebibytes(20), @@ -1451,6 +1454,11 @@ impl Config { self.values.limits.max_attachments_size.as_bytes() } + /// Returns the maxmium combined size of client reports in bytes. + pub fn max_client_reports_size(&self) -> usize { + self.values.limits.max_client_reports_size.as_bytes() + } + /// Returns the maximum size of an envelope payload in bytes. /// /// Individual item size limits still apply. diff --git a/relay-general/src/protocol/client_report.rs b/relay-general/src/protocol/client_report.rs new file mode 100644 index 00000000000..9222a6a9c05 --- /dev/null +++ b/relay-general/src/protocol/client_report.rs @@ -0,0 +1,81 @@ +use relay_common::{DataCategory, UnixTimestamp}; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct DiscardedEvent { + pub reason: String, + pub category: DataCategory, + pub quantity: u32, +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct ClientReport { + /// The timestamp of when the report was created. + pub timestamp: Option, + /// Discard reason counters. + pub discarded_events: Vec, +} + +impl ClientReport { + /// Parses a client report update from JSON. + pub fn parse(payload: &[u8]) -> Result { + serde_json::from_slice(payload) + } + + /// Serializes a client report update back into JSON. + pub fn serialize(&self) -> Result, serde_json::Error> { + serde_json::to_vec(self) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_client_report_roundtrip() { + let json = r#"{ + "timestamp": "2020-02-07T15:17:00Z", + "discarded_events": [ + {"reason": "foo_reason", "category": "error", "quantity": 42}, + {"reason": "foo_reason", "category": "transaction", "quantity": 23} + ] +}"#; + + let output = r#"{ + "timestamp": 1581088620, + "discarded_events": [ + { + "reason": "foo_reason", + "category": "error", + "quantity": 42 + }, + { + "reason": "foo_reason", + "category": "transaction", + "quantity": 23 + } + ] +}"#; + + let update = ClientReport { + timestamp: Some("2020-02-07T15:17:00Z".parse().unwrap()), + discarded_events: vec![ + DiscardedEvent { + reason: "foo_reason".into(), + category: DataCategory::Error, + quantity: 42, + }, + DiscardedEvent { + reason: "foo_reason".into(), + category: DataCategory::Transaction, + quantity: 23, + }, + ], + }; + + let parsed = ClientReport::parse(json.as_bytes()).unwrap(); + assert_eq_dbg!(update, parsed); + assert_eq_str!(output, serde_json::to_string_pretty(&update).unwrap()); + } +} diff --git a/relay-general/src/protocol/mod.rs b/relay-general/src/protocol/mod.rs index a7925eac5ce..080ef758274 100644 --- a/relay-general/src/protocol/mod.rs +++ b/relay-general/src/protocol/mod.rs @@ -2,6 +2,7 @@ mod breadcrumb; mod breakdowns; +mod client_report; mod clientsdk; mod constants; mod contexts; @@ -32,6 +33,7 @@ pub use sentry_release_parser::{validate_environment, validate_release}; pub use self::breadcrumb::Breadcrumb; pub use self::breakdowns::Breakdowns; +pub use self::client_report::ClientReport; pub use self::clientsdk::{ClientSdkInfo, ClientSdkPackage}; pub use self::constants::VALID_PLATFORMS; pub use self::contexts::{ diff --git a/relay-quotas/src/quota.rs b/relay-quotas/src/quota.rs index d3de0e0f0f9..578b0063144 100644 --- a/relay-quotas/src/quota.rs +++ b/relay-quotas/src/quota.rs @@ -106,7 +106,7 @@ impl CategoryUnit { | DataCategory::Security => Some(Self::Count), DataCategory::Attachment => Some(Self::Bytes), DataCategory::Session => Some(Self::Batched), - DataCategory::Unknown => None, + DataCategory::Internal | DataCategory::Unknown => None, } } } diff --git a/relay-server/src/actors/envelopes.rs b/relay-server/src/actors/envelopes.rs index c0c8db4a5f0..4987c5d462e 100644 --- a/relay-server/src/actors/envelopes.rs +++ b/relay-server/src/actors/envelopes.rs @@ -20,9 +20,9 @@ use relay_config::{Config, HttpEncoding, RelayMode}; use relay_general::pii::{PiiAttachmentsProcessor, PiiProcessor}; use relay_general::processor::{process_value, ProcessingState}; use relay_general::protocol::{ - self, Breadcrumb, Csp, Event, EventId, EventType, ExpectCt, ExpectStaple, Hpkp, IpAddr, - LenientString, Metrics, RelayInfo, SecurityReportType, SessionUpdate, Timestamp, UserReport, - Values, + self, Breadcrumb, ClientReport, Csp, Event, EventId, EventType, ExpectCt, ExpectStaple, Hpkp, + IpAddr, LenientString, Metrics, RelayInfo, SecurityReportType, SessionUpdate, Timestamp, + UserReport, Values, }; use relay_general::store::ClockDriftProcessor; use relay_general::types::{Annotated, Array, FromValue, Object, ProcessingAction, Value}; @@ -272,7 +272,10 @@ impl EnvelopeContext { event_id: self.event_id, remote_addr: self.remote_addr, category: DataCategory::Attachment, - quantity: self.summary.attachment_quantity, + // XXX: attachment_quantity is usize which lets us go all the way to + // 64bit on our machines, but the protocl and data store can only + // do 32. + quantity: self.summary.attachment_quantity as u32, }); } } @@ -761,6 +764,71 @@ impl EnvelopeProcessor { }); } + /// Validates and extracts client reports. + /// + /// At the moment client reports are primarily used to transfer outcomes from + /// client SDKs. The outcomes are removed here and sent directly to the outcomes + /// system. + fn process_client_reports(&self, state: &mut ProcessEnvelopeState) { + let mut timestamp = None; + let mut discarded_events = BTreeMap::new(); + let received = state.envelope_context.received_at; + + let clock_drift_processor = ClockDriftProcessor::new(state.envelope.sent_at(), received) + .at_least(MINIMUM_CLOCK_DRIFT); + + // we're going through all client reports but we're effectively just merging + // them into the first one. + state.envelope.retain_items(|item| { + if item.ty() != ItemType::ClientReport { + return true; + }; + match ClientReport::parse(&item.payload()) { + Ok(report) => { + for discarded_event in report.discarded_events.into_iter() { + if discarded_event.reason.len() > 200 { + relay_log::trace!("ignored client outcome with an overlong reason"); + continue; + } + *discarded_events + .entry((discarded_event.reason, discarded_event.category)) + .or_insert(0) += discarded_event.quantity; + } + if let Some(ts) = report.timestamp { + timestamp.get_or_insert(ts); + } + } + Err(err) => relay_log::trace!("invalid client report received: {}", LogError(&err)), + } + false + }); + + if discarded_events.is_empty() { + return; + } + + let timestamp = + timestamp.get_or_insert_with(|| UnixTimestamp::from_secs(received.timestamp() as u64)); + + if clock_drift_processor.is_drifted() { + relay_log::trace!("applying clock drift correction to client report"); + clock_drift_processor.process_timestamp(timestamp); + } + + let producer = OutcomeProducer::from_registry(); + for ((reason, category), quantity) in discarded_events.into_iter() { + producer.do_send(TrackOutcome { + timestamp: timestamp.as_datetime(), + scoping: state.envelope_context.scoping, + outcome: Outcome::ClientDiscard(reason), + event_id: None, + remote_addr: state.envelope_context.remote_addr, + category, + quantity, + }); + } + } + /// Creates and initializes the processing state. /// /// This applies defaults to the envelope and initializes empty rate limits. @@ -1060,6 +1128,7 @@ impl EnvelopeProcessor { ItemType::Sessions => false, ItemType::Metrics => false, ItemType::MetricBuckets => false, + ItemType::ClientReport => false, } } @@ -1549,6 +1618,7 @@ impl EnvelopeProcessor { } self.process_sessions(&mut state); + self.process_client_reports(&mut state); self.process_user_reports(&mut state); if state.creates_event() { @@ -2612,6 +2682,54 @@ mod tests { } #[test] + fn test_client_report_removal() { + relay_test::setup(); + + let processor = EnvelopeProcessor::new(Arc::new(Default::default())); + + let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42" + .parse() + .unwrap(); + + let request_meta = RequestMeta::new(dsn); + let mut envelope = Envelope::from_request(None, request_meta); + + envelope.add_item({ + let mut item = Item::new(ItemType::ClientReport); + item.set_payload( + ContentType::Json, + r###" + { + "discarded_events": [ + ["queue_full", "error", 42] + ] + } + "###, + ); + item + }); + + let envelope_response = relay_test::with_system(move || { + processor + .process(ProcessEnvelope { + envelope, + project_state: Arc::new(ProjectState::allowed()), + start_time: Instant::now(), + scoping: Scoping { + project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), + organization_id: 1, + project_id: ProjectId::new(1), + key_id: None, + }, + }) + .unwrap() + }); + + assert!(envelope_response.envelope.is_none()); + } + + #[test] + #[cfg(feature = "processing")] fn test_extract_session_metrics() { let mut metrics = vec![]; diff --git a/relay-server/src/actors/outcome.rs b/relay-server/src/actors/outcome.rs index e46368710a0..2144659bc7d 100644 --- a/relay-server/src/actors/outcome.rs +++ b/relay-server/src/actors/outcome.rs @@ -84,7 +84,7 @@ pub struct TrackOutcome { /// The event's data category. pub category: DataCategory, /// The number of events or total attachment size in bytes. - pub quantity: usize, + pub quantity: u32, } impl Message for TrackOutcome { @@ -114,6 +114,9 @@ pub enum Outcome { /// The event has been discarded because of invalid data. Invalid(DiscardReason), + /// The event has already been discarded on the client side. + ClientDiscard(String), + /// Reserved but unused in Sentry. #[allow(dead_code)] Abuse, @@ -127,6 +130,7 @@ impl Outcome { Outcome::RateLimited(_) => 2, Outcome::Invalid(_) => 3, Outcome::Abuse => 4, + Outcome::ClientDiscard(_) => 5, } } @@ -140,6 +144,7 @@ impl Outcome { Outcome::RateLimited(code_opt) => code_opt .as_ref() .map(|code| Cow::Owned(code.as_str().into())), + Outcome::ClientDiscard(ref discard_reason) => Some(Cow::Borrowed(discard_reason)), Outcome::Abuse => None, } } @@ -319,7 +324,7 @@ pub struct TrackRawOutcome { pub category: Option, /// The number of events or total attachment size in bytes. #[serde(default, skip_serializing_if = "Option::is_none")] - pub quantity: Option, + pub quantity: Option, } impl TrackRawOutcome { diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index 6052ca83787..97993cee21a 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -335,6 +335,7 @@ fn check_envelope_size_limits(config: &Config, envelope: &Envelope) -> bool { let mut event_size = 0; let mut attachments_size = 0; let mut session_count = 0; + let mut client_reports_size = 0; for item in envelope.items() { match item.ty() { @@ -355,12 +356,14 @@ fn check_envelope_size_limits(config: &Config, envelope: &Envelope) -> bool { ItemType::UserReport => (), ItemType::Metrics => (), ItemType::MetricBuckets => (), + ItemType::ClientReport => client_reports_size += item.len(), } } event_size <= config.max_event_size() && attachments_size <= config.max_attachments_size() && session_count <= config.max_session_count() + && client_reports_size <= config.max_client_reports_size() } /// Handles Sentry events. diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index 54e37d85f2c..c2b687233e2 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -100,6 +100,8 @@ pub enum ItemType { Metrics, /// Buckets of preaggregated metrics encoded as JSON. MetricBuckets, + /// Client internal report (eg: outcomes). + ClientReport, } impl ItemType { @@ -130,6 +132,7 @@ impl fmt::Display for ItemType { Self::Sessions => write!(f, "aggregated sessions"), Self::Metrics => write!(f, "metrics"), Self::MetricBuckets => write!(f, "metric buckets"), + Self::ClientReport => write!(f, "client report"), } } } @@ -572,7 +575,8 @@ impl Item { | ItemType::Session | ItemType::Sessions | ItemType::Metrics - | ItemType::MetricBuckets => false, + | ItemType::MetricBuckets + | ItemType::ClientReport => false, } } @@ -593,6 +597,7 @@ impl Item { ItemType::Sessions => false, ItemType::Metrics => false, ItemType::MetricBuckets => false, + ItemType::ClientReport => false, } } } diff --git a/relay-server/src/utils/rate_limits.rs b/relay-server/src/utils/rate_limits.rs index b4dacd9dbcb..0b171f7794c 100644 --- a/relay-server/src/utils/rate_limits.rs +++ b/relay-server/src/utils/rate_limits.rs @@ -102,6 +102,13 @@ fn infer_event_category(item: &Item) -> Option { ItemType::MetricBuckets => None, ItemType::FormData => None, ItemType::UserReport => None, + // the following items are "internal" item types. From the perspective of the SDK + // the use the "internal" data category however this data category is in fact never + // supposed to be emitted by relay as internal items must not be rate limited. As + // such we do not emit a data category here. An SDK however is supposed to assume + // that such item types use the reserved "internal" data category to not accidentally + // assume another rate limit (such as default). + ItemType::ClientReport => None, } } @@ -242,7 +249,9 @@ impl Enforcement { event_id: envelope.event_id(), remote_addr: envelope.meta().remote_addr(), category: limit.category, - quantity: limit.quantity, + // XXX: on the limiter we have quantity of usize, but in the protocol + // and data store we're limited to u32. + quantity: limit.quantity as u32, }); } } diff --git a/relay-test/src/lib.rs b/relay-test/src/lib.rs index 93b0136b3dd..a7801f1b04e 100644 --- a/relay-test/src/lib.rs +++ b/relay-test/src/lib.rs @@ -112,6 +112,16 @@ where }) } +/// Runs the provided function with an active actix system. +/// +/// This function otherwise functions exactly as [`block_fn`]. +pub fn with_system(func: F) -> R +where + F: FnOnce() -> R, +{ + block_fn(move || future::ok::<_, ()>(func())).unwrap() +} + /// Returns a future which completes after the requested delay. /// ``` /// use std::time::Duration; diff --git a/tests/integration/fixtures/__init__.py b/tests/integration/fixtures/__init__.py index 9636eb0712b..3d483c9f90f 100644 --- a/tests/integration/fixtures/__init__.py +++ b/tests/integration/fixtures/__init__.py @@ -191,6 +191,11 @@ def send_session_aggregates(self, project_id, payload): envelope.add_item(Item(payload=PayloadRef(json=payload), type="sessions")) self.send_envelope(project_id, envelope) + def send_client_report(self, project_id, payload): + envelope = Envelope() + envelope.add_item(Item(PayloadRef(json=payload), type="client_report")) + self.send_envelope(project_id, envelope) + def send_metrics(self, project_id, payload, timestamp=None): envelope = Envelope() envelope.add_item( diff --git a/tests/integration/test_client_report.py b/tests/integration/test_client_report.py new file mode 100644 index 00000000000..2ca3adf9f79 --- /dev/null +++ b/tests/integration/test_client_report.py @@ -0,0 +1,60 @@ +from datetime import datetime, timezone + + +def test_client_reports(relay, mini_sentry): + config = { + "outcomes": { + "emit_outcomes": True, + "batch_size": 1, + "batch_interval": 1, + "source": "my-layer", + } + } + + relay = relay(mini_sentry, config) + + project_id = 42 + timestamp = datetime.now(tz=timezone.utc) + + report_payload = { + "timestamp": timestamp.isoformat(), + "discarded_events": [ + {"reason": "queue_overflow", "category": "error", "quantity": 42}, + {"reason": "queue_overflow", "category": "transaction", "quantity": 1231}, + ], + } + + mini_sentry.add_full_project_config(project_id) + relay.send_client_report(project_id, report_payload) + + outcomes = [] + for _ in range(2): + outcomes.extend(mini_sentry.captured_outcomes.get(timeout=0.2)["outcomes"]) + + timestamp_formatted = timestamp.isoformat().split(".")[0] + ".000000Z" + assert outcomes == [ + { + "timestamp": timestamp_formatted, + "org_id": 1, + "project_id": 42, + "key_id": 123, + "outcome": 5, + "reason": "queue_overflow", + "remote_addr": "127.0.0.1", + "source": "my-layer", + "category": 1, + "quantity": 42, + }, + { + "timestamp": timestamp_formatted, + "org_id": 1, + "project_id": 42, + "key_id": 123, + "outcome": 5, + "reason": "queue_overflow", + "remote_addr": "127.0.0.1", + "source": "my-layer", + "category": 2, + "quantity": 1231, + }, + ]