Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- Add sampling based on transaction name. ([#1058](https://github.com/getsentry/relay/pull/1058))
- Support running Relay without config directory. The most important configuration, including Relay mode and credentials, can now be provided through commandline arguments or environment variables alone. ([#1055](https://github.com/getsentry/relay/pull/1055)
- Protocol support for client reports. ([#1074](https://github.com/getsentry/relay/pull/1074))
- Extract session metrics in non processing relays. ([#1073](https://github.com/getsentry/relay/pull/1073))

**Bug Fixes**:
Expand Down
6 changes: 6 additions & 0 deletions relay-common/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ pub enum DataCategory {
Attachment = 4,
/// Session updates. Quantity is the number of updates in the batch.
Session = 5,
/// Reserved data category that shall not appear in the outcomes.
Internal = -2,
/// Any other data category not known by this Relay.
#[serde(other)]
Unknown = -1,
Expand All @@ -122,6 +124,7 @@ impl DataCategory {
"security" => Self::Security,
"attachment" => Self::Attachment,
"session" => Self::Session,
"internal" => Self::Internal,
_ => Self::Unknown,
}
}
Expand All @@ -135,6 +138,7 @@ impl DataCategory {
Self::Security => "security",
Self::Attachment => "attachment",
Self::Session => "session",
Self::Internal => "internal",
Self::Unknown => "unknown",
}
}
Expand All @@ -146,6 +150,8 @@ impl DataCategory {

/// Returns the numeric value for this outcome.
pub fn value(self) -> Option<u8> {
// negative values (Internal and Unknown) cannot be sent as
// outcomes (internally so!)
(self as i8).try_into().ok()
}
}
Expand Down
14 changes: 13 additions & 1 deletion relay-common/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::fmt;
use std::time::{Duration, Instant, SystemTime};

use chrono::{DateTime, TimeZone, Utc};
use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
use serde::{Deserialize, Serialize};

/// Converts an `Instant` into a `SystemTime`.
Expand Down Expand Up @@ -83,6 +83,11 @@ impl UnixTimestamp {
self.0
}

/// Returns the timestamp as chrono datetime.
pub fn as_datetime(self) -> DateTime<Utc> {
DateTime::from_utc(NaiveDateTime::from_timestamp(self.0 as i64, 0), Utc)
}

/// Converts the UNIX timestamp into an `Instant` based on the current system timestamp.
///
/// Returns [`MonotonicResult::Instant`] if the timestamp can be represented. Otherwise, returns
Expand Down Expand Up @@ -139,13 +144,20 @@ impl std::ops::Sub for UnixTimestamp {
}
}

#[derive(Debug)]
/// An error returned from parsing [`UnixTimestamp`].
pub struct ParseUnixTimestampError(());

impl std::str::FromStr for UnixTimestamp {
type Err = ParseUnixTimestampError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
if let Ok(datetime) = s.parse::<DateTime<Utc>>() {
Comment thread
mitsuhiko marked this conversation as resolved.
let timestamp = datetime.timestamp();
if timestamp >= 0 {
return Ok(UnixTimestamp(timestamp as u64));
}
}
let ts = s.parse().or(Err(ParseUnixTimestampError(())))?;
Ok(Self(ts))
}
Expand Down
8 changes: 8 additions & 0 deletions relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,8 @@ struct Limits {
max_attachment_size: ByteSize,
/// The maximum combined size for all attachments in an envelope or request.
max_attachments_size: ByteSize,
/// The maximum combined size for all client reports in an envelope or request.
max_client_reports_size: ByteSize,
/// The maximum payload size for an entire envelopes. Individual limits still apply.
max_envelope_size: ByteSize,
/// The maximum number of session items per envelope.
Expand Down Expand Up @@ -539,6 +541,7 @@ impl Default for Limits {
max_event_size: ByteSize::mebibytes(1),
max_attachment_size: ByteSize::mebibytes(100),
max_attachments_size: ByteSize::mebibytes(100),
max_client_reports_size: ByteSize::kibibytes(4),
max_envelope_size: ByteSize::mebibytes(100),
max_session_count: 100,
max_api_payload_size: ByteSize::mebibytes(20),
Expand Down Expand Up @@ -1451,6 +1454,11 @@ impl Config {
self.values.limits.max_attachments_size.as_bytes()
}

/// Returns the maxmium combined size of client reports in bytes.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if we want this config flag, but if we do, we probably need to document it.

pub fn max_client_reports_size(&self) -> usize {
self.values.limits.max_client_reports_size.as_bytes()
}

/// Returns the maximum size of an envelope payload in bytes.
///
/// Individual item size limits still apply.
Expand Down
81 changes: 81 additions & 0 deletions relay-general/src/protocol/client_report.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use relay_common::{DataCategory, UnixTimestamp};
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct DiscardedEvent {
pub reason: String,
pub category: DataCategory,
pub quantity: u32,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ClientReport {
/// The timestamp of when the report was created.
pub timestamp: Option<UnixTimestamp>,
/// Discard reason counters.
pub discarded_events: Vec<DiscardedEvent>,
}

impl ClientReport {
/// Parses a client report update from JSON.
pub fn parse(payload: &[u8]) -> Result<Self, serde_json::Error> {
serde_json::from_slice(payload)
}

/// Serializes a client report update back into JSON.
pub fn serialize(&self) -> Result<Vec<u8>, serde_json::Error> {
serde_json::to_vec(self)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_client_report_roundtrip() {
let json = r#"{
"timestamp": "2020-02-07T15:17:00Z",
"discarded_events": [
{"reason": "foo_reason", "category": "error", "quantity": 42},
{"reason": "foo_reason", "category": "transaction", "quantity": 23}
]
}"#;

let output = r#"{
"timestamp": 1581088620,
"discarded_events": [
{
"reason": "foo_reason",
"category": "error",
"quantity": 42
},
{
"reason": "foo_reason",
"category": "transaction",
"quantity": 23
}
]
}"#;

let update = ClientReport {
timestamp: Some("2020-02-07T15:17:00Z".parse().unwrap()),
discarded_events: vec![
DiscardedEvent {
reason: "foo_reason".into(),
category: DataCategory::Error,
quantity: 42,
},
DiscardedEvent {
reason: "foo_reason".into(),
category: DataCategory::Transaction,
quantity: 23,
},
],
};

let parsed = ClientReport::parse(json.as_bytes()).unwrap();
assert_eq_dbg!(update, parsed);
assert_eq_str!(output, serde_json::to_string_pretty(&update).unwrap());
}
}
2 changes: 2 additions & 0 deletions relay-general/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

mod breadcrumb;
mod breakdowns;
mod client_report;
mod clientsdk;
mod constants;
mod contexts;
Expand Down Expand Up @@ -32,6 +33,7 @@ pub use sentry_release_parser::{validate_environment, validate_release};

pub use self::breadcrumb::Breadcrumb;
pub use self::breakdowns::Breakdowns;
pub use self::client_report::ClientReport;
pub use self::clientsdk::{ClientSdkInfo, ClientSdkPackage};
pub use self::constants::VALID_PLATFORMS;
pub use self::contexts::{
Expand Down
2 changes: 1 addition & 1 deletion relay-quotas/src/quota.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl CategoryUnit {
| DataCategory::Security => Some(Self::Count),
DataCategory::Attachment => Some(Self::Bytes),
DataCategory::Session => Some(Self::Batched),
DataCategory::Unknown => None,
DataCategory::Internal | DataCategory::Unknown => None,
}
}
}
Expand Down
126 changes: 122 additions & 4 deletions relay-server/src/actors/envelopes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ use relay_config::{Config, HttpEncoding, RelayMode};
use relay_general::pii::{PiiAttachmentsProcessor, PiiProcessor};
use relay_general::processor::{process_value, ProcessingState};
use relay_general::protocol::{
self, Breadcrumb, Csp, Event, EventId, EventType, ExpectCt, ExpectStaple, Hpkp, IpAddr,
LenientString, Metrics, RelayInfo, SecurityReportType, SessionUpdate, Timestamp, UserReport,
Values,
self, Breadcrumb, ClientReport, Csp, Event, EventId, EventType, ExpectCt, ExpectStaple, Hpkp,
IpAddr, LenientString, Metrics, RelayInfo, SecurityReportType, SessionUpdate, Timestamp,
UserReport, Values,
};
use relay_general::store::ClockDriftProcessor;
use relay_general::types::{Annotated, Array, FromValue, Object, ProcessingAction, Value};
Expand Down Expand Up @@ -272,7 +272,10 @@ impl EnvelopeContext {
event_id: self.event_id,
remote_addr: self.remote_addr,
category: DataCategory::Attachment,
quantity: self.summary.attachment_quantity,
// XXX: attachment_quantity is usize which lets us go all the way to
// 64bit on our machines, but the protocl and data store can only
// do 32.
quantity: self.summary.attachment_quantity as u32,
});
}
}
Expand Down Expand Up @@ -761,6 +764,71 @@ impl EnvelopeProcessor {
});
}

/// Validates and extracts client reports.
///
/// At the moment client reports are primarily used to transfer outcomes from
/// client SDKs. The outcomes are removed here and sent directly to the outcomes
/// system.
fn process_client_reports(&self, state: &mut ProcessEnvelopeState) {
let mut timestamp = None;
let mut discarded_events = BTreeMap::new();
let received = state.envelope_context.received_at;

let clock_drift_processor = ClockDriftProcessor::new(state.envelope.sent_at(), received)
.at_least(MINIMUM_CLOCK_DRIFT);

// we're going through all client reports but we're effectively just merging
// them into the first one.
state.envelope.retain_items(|item| {
if item.ty() != ItemType::ClientReport {
return true;
};
match ClientReport::parse(&item.payload()) {
Ok(report) => {
for discarded_event in report.discarded_events.into_iter() {
if discarded_event.reason.len() > 200 {
relay_log::trace!("ignored client outcome with an overlong reason");
continue;
}
*discarded_events
.entry((discarded_event.reason, discarded_event.category))
.or_insert(0) += discarded_event.quantity;
}
if let Some(ts) = report.timestamp {
timestamp.get_or_insert(ts);
}
}
Err(err) => relay_log::trace!("invalid client report received: {}", LogError(&err)),
}
false
});

if discarded_events.is_empty() {
return;
}

let timestamp =
timestamp.get_or_insert_with(|| UnixTimestamp::from_secs(received.timestamp() as u64));

if clock_drift_processor.is_drifted() {
relay_log::trace!("applying clock drift correction to client report");
clock_drift_processor.process_timestamp(timestamp);
}

let producer = OutcomeProducer::from_registry();
for ((reason, category), quantity) in discarded_events.into_iter() {
producer.do_send(TrackOutcome {
Comment thread
mitsuhiko marked this conversation as resolved.
timestamp: timestamp.as_datetime(),
scoping: state.envelope_context.scoping,
outcome: Outcome::ClientDiscard(reason),
event_id: None,
remote_addr: state.envelope_context.remote_addr,
category,
quantity,
});
}
}

/// Creates and initializes the processing state.
///
/// This applies defaults to the envelope and initializes empty rate limits.
Expand Down Expand Up @@ -1060,6 +1128,7 @@ impl EnvelopeProcessor {
ItemType::Sessions => false,
ItemType::Metrics => false,
ItemType::MetricBuckets => false,
ItemType::ClientReport => false,
}
}

Expand Down Expand Up @@ -1549,6 +1618,7 @@ impl EnvelopeProcessor {
}

self.process_sessions(&mut state);
self.process_client_reports(&mut state);
self.process_user_reports(&mut state);

if state.creates_event() {
Expand Down Expand Up @@ -2612,6 +2682,54 @@ mod tests {
}

#[test]
fn test_client_report_removal() {
relay_test::setup();

let processor = EnvelopeProcessor::new(Arc::new(Default::default()));

let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
.parse()
.unwrap();

let request_meta = RequestMeta::new(dsn);
let mut envelope = Envelope::from_request(None, request_meta);

envelope.add_item({
let mut item = Item::new(ItemType::ClientReport);
item.set_payload(
ContentType::Json,
r###"
{
"discarded_events": [
["queue_full", "error", 42]
]
}
"###,
);
item
});

let envelope_response = relay_test::with_system(move || {
Comment thread
mitsuhiko marked this conversation as resolved.
processor
.process(ProcessEnvelope {
envelope,
project_state: Arc::new(ProjectState::allowed()),
start_time: Instant::now(),
scoping: Scoping {
project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
organization_id: 1,
project_id: ProjectId::new(1),
key_id: None,
},
})
.unwrap()
});

assert!(envelope_response.envelope.is_none());
}

#[test]
#[cfg(feature = "processing")]
fn test_extract_session_metrics() {
let mut metrics = vec![];

Expand Down
Loading