Skip to content

Commit 2704f11

Browse files
sproberts92AndrooTheChen
authored andcommitted
feat(journald source): Add emit_cursor option (vectordotdev#18882)
* Add emit_cursor option to journald source * Update documentation * Fix typo
1 parent 61e8240 commit 2704f11

2 files changed

Lines changed: 43 additions & 5 deletions

File tree

src/sources/journald.rs

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,13 @@ pub struct JournaldConfig {
209209
#[configurable(metadata(docs::hidden))]
210210
#[serde(default)]
211211
log_namespace: Option<bool>,
212+
213+
/// Whether to emit the [__CURSOR field][cursor]. See also [sd_journal_get_cursor][get_cursor].
214+
///
215+
/// [cursor]: https://www.freedesktop.org/software/systemd/man/latest/systemd.journal-fields.html#Address%20Fields
216+
/// [get_cursor]: https://www.freedesktop.org/software/systemd/man/latest/sd_journal_get_cursor.html
217+
#[serde(default = "crate::serde::default_false")]
218+
emit_cursor: bool,
212219
}
213220

214221
const fn default_batch_size() -> usize {
@@ -308,6 +315,7 @@ impl Default for JournaldConfig {
308315
acknowledgements: Default::default(),
309316
remap_priority: false,
310317
log_namespace: None,
318+
emit_cursor: false,
311319
}
312320
}
313321
}
@@ -377,6 +385,7 @@ impl SourceConfig for JournaldConfig {
377385
acknowledgements,
378386
starter,
379387
log_namespace,
388+
emit_cursor: self.emit_cursor,
380389
}
381390
.run_shutdown(cx.shutdown),
382391
))
@@ -404,6 +413,7 @@ struct JournaldSource {
404413
acknowledgements: bool,
405414
starter: StartJournalctl,
406415
log_namespace: LogNamespace,
416+
emit_cursor: bool,
407417
}
408418

409419
impl JournaldSource {
@@ -554,7 +564,11 @@ impl<'a> Batch<'a> {
554564
Some(Ok(bytes)) => {
555565
match decode_record(&bytes, self.source.remap_priority) {
556566
Ok(mut record) => {
557-
if let Some(tmp) = record.remove(CURSOR) {
567+
if self.source.emit_cursor {
568+
if let Some(tmp) = record.get(CURSOR) {
569+
self.cursor = Some(tmp.clone());
570+
}
571+
} else if let Some(tmp) = record.remove(CURSOR) {
558572
self.cursor = Some(tmp);
559573
}
560574

@@ -1089,13 +1103,14 @@ mod tests {
10891103
async fn run_with_units(iunits: &[&str], xunits: &[&str], cursor: Option<&str>) -> Vec<Event> {
10901104
let include_matches = create_unit_matches(iunits.to_vec());
10911105
let exclude_matches = create_unit_matches(xunits.to_vec());
1092-
run_journal(include_matches, exclude_matches, cursor).await
1106+
run_journal(include_matches, exclude_matches, cursor, false).await
10931107
}
10941108

10951109
async fn run_journal(
10961110
include_matches: Matches,
10971111
exclude_matches: Matches,
10981112
checkpoint: Option<&str>,
1113+
emit_cursor: bool,
10991114
) -> Vec<Event> {
11001115
assert_source_compliance(&["protocol"], async move {
11011116
let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
@@ -1128,6 +1143,7 @@ mod tests {
11281143
data_dir: Some(tempdir),
11291144
remap_priority: true,
11301145
acknowledgements: false.into(),
1146+
emit_cursor,
11311147
..Default::default()
11321148
};
11331149
let source = config.build(cx).await.unwrap();
@@ -1207,10 +1223,18 @@ mod tests {
12071223
);
12081224
}
12091225

1226+
#[tokio::test]
1227+
async fn emits_cursor() {
1228+
let received = run_journal(Matches::new(), Matches::new(), None, true).await;
1229+
assert_eq!(cursor(&received[0]), Value::Bytes("1".into()));
1230+
assert_eq!(cursor(&received[3]), Value::Bytes("4".into()));
1231+
assert_eq!(cursor(&received[7]), Value::Bytes("8".into()));
1232+
}
1233+
12101234
#[tokio::test]
12111235
async fn includes_matches() {
12121236
let matches = create_matches(vec![("PRIORITY", "ERR")]);
1213-
let received = run_journal(matches, HashMap::new(), None).await;
1237+
let received = run_journal(matches, HashMap::new(), None, false).await;
12141238
assert_eq!(received.len(), 2);
12151239
assert_eq!(
12161240
message(&received[0]),
@@ -1227,7 +1251,7 @@ mod tests {
12271251
#[tokio::test]
12281252
async fn includes_kernel() {
12291253
let matches = create_matches(vec![("_TRANSPORT", "kernel")]);
1230-
let received = run_journal(matches, HashMap::new(), None).await;
1254+
let received = run_journal(matches, HashMap::new(), None, false).await;
12311255
assert_eq!(received.len(), 1);
12321256
assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140006000));
12331257
assert_eq!(message(&received[0]), Value::Bytes("audit log".into()));
@@ -1236,7 +1260,7 @@ mod tests {
12361260
#[tokio::test]
12371261
async fn excludes_matches() {
12381262
let matches = create_matches(vec![("PRIORITY", "INFO"), ("PRIORITY", "DEBUG")]);
1239-
let received = run_journal(HashMap::new(), matches, None).await;
1263+
let received = run_journal(HashMap::new(), matches, None, false).await;
12401264
assert_eq!(received.len(), 5);
12411265
assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140003000));
12421266
assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140004000));
@@ -1515,6 +1539,10 @@ mod tests {
15151539
event.as_log()[log_schema().timestamp_key().unwrap().to_string()].clone()
15161540
}
15171541

1542+
fn cursor(event: &Event) -> Value {
1543+
event.as_log()[CURSOR].clone()
1544+
}
1545+
15181546
fn value_ts(secs: i64, usecs: u32) -> Value {
15191547
Value::Timestamp(
15201548
chrono::Utc

website/cue/reference/components/sources/base/journald.cue

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,16 @@ base: components: sources: journald: configuration: {
4949
required: false
5050
type: string: examples: ["/var/lib/vector"]
5151
}
52+
emit_cursor: {
53+
description: """
54+
Whether to emit the [__CURSOR field][cursor]. See also [sd_journal_get_cursor][get_cursor].
55+
56+
[cursor]: https://www.freedesktop.org/software/systemd/man/latest/systemd.journal-fields.html#Address%20Fields
57+
[get_cursor]: https://www.freedesktop.org/software/systemd/man/latest/sd_journal_get_cursor.html
58+
"""
59+
required: false
60+
type: bool: default: false
61+
}
5262
exclude_matches: {
5363
description: """
5464
A list of sets of field/value pairs that, if any are present in a journal entry,

0 commit comments

Comments
 (0)