Skip to content

Commit b107ff7

Browse files
authored
fix(http_server source): panic when http server receives metric events (#18781)
* fix: panic when http server receives metric events * refactor
1 parent 9d1a676 commit b107ff7

2 files changed

Lines changed: 64 additions & 58 deletions

File tree

src/sources/http_server.rs

Lines changed: 55 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,23 @@ use std::{collections::HashMap, net::SocketAddr};
22

33
use bytes::{Bytes, BytesMut};
44
use chrono::Utc;
5+
use http::{StatusCode, Uri};
6+
use http_serde;
7+
use tokio_util::codec::Decoder as _;
8+
use vrl::value::{kind::Collection, Kind};
9+
use warp::http::{HeaderMap, HeaderValue};
10+
511
use codecs::{
612
decoding::{DeserializerConfig, FramingConfig},
713
BytesDecoderConfig, BytesDeserializerConfig, JsonDeserializerConfig,
814
NewlineDelimitedDecoderConfig,
915
};
10-
11-
use http::{StatusCode, Uri};
12-
use http_serde;
1316
use lookup::{lookup_v2::OptionalValuePath, owned_value_path, path};
14-
use tokio_util::codec::Decoder as _;
1517
use vector_config::configurable_component;
1618
use vector_core::{
1719
config::{DataType, LegacyKey, LogNamespace},
1820
schema::Definition,
1921
};
20-
use vrl::value::{kind::Collection, Kind};
21-
use warp::http::{HeaderMap, HeaderValue};
2222

2323
use crate::{
2424
codecs::{Decoder, DecodingConfig},
@@ -385,37 +385,50 @@ struct SimpleHttpSource {
385385
}
386386

387387
impl HttpSource for SimpleHttpSource {
388-
/// Enriches the passed in events with metadata for the `request_path` and for each of the headers.
388+
/// Enriches the log events with metadata for the `request_path` and for each of the headers.
389+
/// Non-log events are skipped.
389390
fn enrich_events(
390391
&self,
391392
events: &mut [Event],
392393
request_path: &str,
393394
headers_config: &HeaderMap,
394395
query_parameters: &HashMap<String, String>,
395396
) {
397+
let now = Utc::now();
396398
for event in events.iter_mut() {
397-
let log = event.as_mut_log();
398-
399-
// add request_path to each event
400-
self.log_namespace.insert_source_metadata(
401-
SimpleHttpConfig::NAME,
402-
log,
403-
self.path_key.path.as_ref().map(LegacyKey::InsertIfEmpty),
404-
path!("path"),
405-
request_path.to_owned(),
406-
);
407-
408-
// add each header to each event
409-
for header_name in &self.headers {
410-
let value = headers_config.get(header_name).map(HeaderValue::as_bytes);
399+
match event {
400+
Event::Log(log) => {
401+
// add request_path to each event
402+
self.log_namespace.insert_source_metadata(
403+
SimpleHttpConfig::NAME,
404+
log,
405+
self.path_key.path.as_ref().map(LegacyKey::InsertIfEmpty),
406+
path!("path"),
407+
request_path.to_owned(),
408+
);
411409

412-
self.log_namespace.insert_source_metadata(
413-
SimpleHttpConfig::NAME,
414-
log,
415-
Some(LegacyKey::InsertIfEmpty(path!(header_name))),
416-
path!("headers", header_name),
417-
Value::from(value.map(Bytes::copy_from_slice)),
418-
);
410+
// add each header to each event
411+
for header_name in &self.headers {
412+
let value = headers_config.get(header_name).map(HeaderValue::as_bytes);
413+
414+
self.log_namespace.insert_source_metadata(
415+
SimpleHttpConfig::NAME,
416+
log,
417+
Some(LegacyKey::InsertIfEmpty(path!(header_name))),
418+
path!("headers", header_name),
419+
Value::from(value.map(Bytes::copy_from_slice)),
420+
);
421+
}
422+
423+
self.log_namespace.insert_standard_vector_source_metadata(
424+
log,
425+
SimpleHttpConfig::NAME,
426+
now,
427+
);
428+
}
429+
_ => {
430+
continue;
431+
}
419432
}
420433
}
421434

@@ -426,17 +439,6 @@ impl HttpSource for SimpleHttpSource {
426439
self.log_namespace,
427440
SimpleHttpConfig::NAME,
428441
);
429-
430-
let now = Utc::now();
431-
for event in events {
432-
let log = event.as_mut_log();
433-
434-
self.log_namespace.insert_standard_vector_source_metadata(
435-
log,
436-
SimpleHttpConfig::NAME,
437-
now,
438-
);
439-
}
440442
}
441443

442444
fn build_events(
@@ -474,29 +476,29 @@ impl HttpSource for SimpleHttpSource {
474476

475477
#[cfg(test)]
476478
mod tests {
477-
use lookup::{event_path, owned_value_path, OwnedTargetPath};
478479
use std::str::FromStr;
479480
use std::{collections::BTreeMap, io::Write, net::SocketAddr};
480-
use vector_core::config::LogNamespace;
481-
use vector_core::event::LogEvent;
482-
use vector_core::schema::Definition;
483-
use vrl::value::kind::Collection;
484-
use vrl::value::Kind;
485481

486-
use codecs::{
487-
decoding::{DeserializerConfig, FramingConfig},
488-
BytesDecoderConfig, JsonDeserializerConfig,
489-
};
490482
use flate2::{
491483
write::{GzEncoder, ZlibEncoder},
492484
Compression,
493485
};
494486
use futures::Stream;
495487
use http::{HeaderMap, Method, StatusCode};
496-
use lookup::lookup_v2::OptionalValuePath;
497488
use similar_asserts::assert_eq;
489+
use vrl::value::kind::Collection;
490+
use vrl::value::Kind;
491+
492+
use codecs::{
493+
decoding::{DeserializerConfig, FramingConfig},
494+
BytesDecoderConfig, JsonDeserializerConfig,
495+
};
496+
use lookup::lookup_v2::OptionalValuePath;
497+
use lookup::{event_path, owned_value_path, OwnedTargetPath};
498+
use vector_core::config::LogNamespace;
499+
use vector_core::event::LogEvent;
500+
use vector_core::schema::Definition;
498501

499-
use super::{remove_duplicates, SimpleHttpConfig};
500502
use crate::sources::http_server::HttpMethod;
501503
use crate::{
502504
config::{log_schema, SourceConfig, SourceContext},
@@ -508,6 +510,8 @@ mod tests {
508510
SourceSender,
509511
};
510512

513+
use super::{remove_duplicates, SimpleHttpConfig};
514+
511515
#[test]
512516
fn generate_config() {
513517
crate::test_util::test_generate_config::<SimpleHttpConfig>();

src/sources/util/http/query.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@ pub fn add_query_parameters(
1616
for query_parameter_name in query_parameters_config {
1717
let value = query_parameters.get(query_parameter_name);
1818
for event in events.iter_mut() {
19-
log_namespace.insert_source_metadata(
20-
source_name,
21-
event.as_mut_log(),
22-
Some(LegacyKey::Overwrite(path!(query_parameter_name))),
23-
path!("query_parameters"),
24-
crate::event::Value::from(value.map(String::to_owned)),
25-
);
19+
if let Event::Log(log) = event {
20+
log_namespace.insert_source_metadata(
21+
source_name,
22+
log,
23+
Some(LegacyKey::Overwrite(path!(query_parameter_name))),
24+
path!("query_parameters"),
25+
crate::event::Value::from(value.map(String::to_owned)),
26+
);
27+
}
2628
}
2729
}
2830
}

0 commit comments

Comments
 (0)