Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/sprout-relay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ description = "WebSocket relay server for the Sprout communications platform"
name = "sprout-relay"
path = "src/main.rs"

[[bin]]
name = "sprout-reindex-kind0"
path = "src/bin/reindex_kind0.rs"

[dependencies]
sprout-core = { workspace = true }
sprout-db = { workspace = true }
Expand Down
168 changes: 168 additions & 0 deletions crates/sprout-relay/src/bin/reindex_kind0.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
//! One-shot admin tool: re-index all kind:0 (user metadata) events in Typesense.
//!
//! Necessary after the indexer change that appends `display_name`/`name`/`nip05`
//! values to the indexed content for kind:0 docs (see `sprout-search`'s
//! `flatten_kind0_for_indexing`). Existing docs need to be rewritten with the
//! appended tokens before they become searchable by display name.
//!
//! New / updated kind:0 events index correctly automatically — this tool only
//! exists to backfill the existing population.
//!
//! Usage (from the repo root, with .env sourced):
//!
//! ```
//! cargo run --release -p sprout-relay --bin sprout-reindex-kind0
//! ```
//!
//! Idempotent — Typesense uses upsert semantics, so running twice is safe.
//! Streams in batches so memory stays bounded regardless of relay size.
//!
//! ## Paging
//!
//! Walks `query_events` with a snapshot ceiling (`until = now()` at start) plus
//! a keyset cursor over `(created_at, id)` matching the underlying
//! `ORDER BY created_at DESC, id ASC` index. This guarantees:
//!
//! - No rows are skipped if new kind:0 events arrive during the run
//! (they're newer than the snapshot, so they fall outside the predicate).
//! - No rows are double-counted at page boundaries (the cursor advances
//! strictly past the last row of each batch).
//! - Bounded total work — won't chase its own tail under live write traffic.
//!
//! Newly-arrived kind:0 events that fall outside the snapshot are indexed by
//! the relay's live write path anyway, so this backfill plus the live path
//! together cover the full population.

use anyhow::Context;
use chrono::{DateTime, Utc};
use tracing::{info, warn};
use tracing_subscriber::{fmt, prelude::*, EnvFilter};

use sprout_db::{Db, DbConfig, EventQuery};
use sprout_relay::config::Config;
use sprout_search::{SearchConfig, SearchService};

/// Page size for the SQL → Typesense pipeline. Small enough to keep DB and
/// Typesense memory comfortable, large enough to amortise per-batch overhead.
///
/// Passed as both `limit` and `max_limit` on every `EventQuery` issued by the
/// paging loop; a page that comes back shorter than this is treated as the
/// final page and ends the run.
const BATCH: i64 = 500;

/// Entry point for the one-shot kind:0 backfill.
///
/// Loads the relay config from the environment, connects to the database,
/// ensures the Typesense collection exists, then pages through the kind:0
/// events bounded by a start-of-run snapshot and re-indexes each page.
///
/// # Errors
///
/// Returns an error if configuration loading, the database connection,
/// collection setup or a page query fails, if the pagination cursor cannot be
/// derived from the last row of a full page, or — once the whole walk has
/// finished — if any event failed to index. Indexing failures never abort the
/// walk early; they are counted, logged and reported at the end.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    tracing_subscriber::registry()
        .with(fmt::layer())
        .with(
            EnvFilter::from_default_env()
                .add_directive("sprout_reindex_kind0=info".parse()?)
                .add_directive("sprout_relay=info".parse()?),
        )
        .init();

    let config = Config::from_env().context("loading relay config from environment")?;

    let db_config = DbConfig {
        database_url: config.database_url.clone(),
        ..DbConfig::default()
    };
    let db = Db::new(&db_config)
        .await
        .context("connecting to postgres")?;

    // SearchConfig::default() reads TYPESENSE_URL / TYPESENSE_API_KEY /
    // TYPESENSE_COLLECTION from the environment, same as the relay does.
    let search = SearchService::new(SearchConfig::default());
    search
        .ensure_collection()
        .await
        .context("ensuring Typesense collection")?;

    // Snapshot ceiling: we only reindex events that already exist at start.
    // Anything newer is handled by the relay's live indexing path.
    let snapshot: DateTime<Utc> = Utc::now();

    // Keyset cursor over (created_at, id) — matches the underlying
    // `ORDER BY created_at DESC, id ASC` index. On the first iteration the
    // timestamp bound is the snapshot and there is no id bound yet, so the
    // predicate reduces to `created_at <= snapshot`. Subsequent iterations
    // advance to strictly past the last row of the prior batch.
    let mut cursor_until: DateTime<Utc> = snapshot;
    let mut cursor_before_id: Option<Vec<u8>> = None;

    let mut total_indexed: usize = 0;
    let mut total_failed: usize = 0;
    let mut batches: usize = 0;

    info!(?snapshot, "starting kind:0 reindex");

    loop {
        let q = EventQuery {
            kinds: Some(vec![0]),
            limit: Some(BATCH),
            max_limit: Some(BATCH),
            until: Some(cursor_until),
            before_id: cursor_before_id.clone(),
            ..EventQuery::default()
        };

        let batch = db
            .query_events(&q)
            .await
            .context("querying kind:0 events")?;

        // An empty page means the walk is complete.
        let Some(last) = batch.last() else {
            break;
        };
        let batch_len = batch.len();

        // An indexing failure is recorded but never aborts the walk: the
        // cursor below advances regardless, so a poisoned batch cannot make
        // the loop spin on the same page.
        match search.index_batch(&batch).await {
            Ok(indexed) => {
                total_indexed += indexed;
                if indexed < batch_len {
                    let failed = batch_len - indexed;
                    total_failed += failed;
                    warn!(failed, batch_len, "some events failed to index in batch");
                }
                info!(indexed, batch_len, batches, total_indexed, "indexed batch");
            }
            Err(e) => {
                warn!(error = %e, batch_len, batches, "batch index failed entirely");
                total_failed += batch_len;
            }
        }

        batches += 1;

        // If we got fewer than BATCH back, we're at the tail of the table.
        if (batch_len as i64) < BATCH {
            break;
        }

        // Tail of this batch becomes the cursor for the next page.
        // `query_events` will use the composite predicate
        //   created_at < cursor_until OR (created_at = cursor_until AND id > cursor_before_id)
        // which exactly skips past the last row we just processed.
        //
        // The conversion is checked rather than defaulted: silently keeping
        // the previous `cursor_until` would leave the timestamp bound where
        // it was and re-select rows that were already processed.
        let tail_secs = i64::try_from(last.event.created_at.as_u64())
            .context("created_at of the last event in the batch does not fit in i64")?;
        cursor_until = DateTime::<Utc>::from_timestamp(tail_secs, 0)
            .context("created_at of the last event in the batch is out of range for DateTime<Utc>")?;
        cursor_before_id = Some(last.event.id.to_bytes().to_vec());
    }

    info!(total_indexed, total_failed, batches, "kind:0 reindex complete");

    // Report failure by returning an error instead of `std::process::exit`,
    // so the database handle and the runtime are dropped normally. The
    // process still exits with a non-zero status.
    if total_failed > 0 {
        anyhow::bail!("kind:0 reindex finished with {total_failed} failed event(s)");
    }
    Ok(())
}
189 changes: 188 additions & 1 deletion crates/sprout-search/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,36 @@ pub fn event_to_document(event: &StoredEvent) -> Result<Value, SearchError> {
.map(|id| id.to_string())
.unwrap_or_else(|| "__global__".to_string());

// For kind:0 (user metadata) we append the parsed JSON values to `content`
// so Typesense's word-tokenizer can index them cleanly. Without this, a
// raw blob like `{"display_name":"alice","about":"loves cats"}` does not
// produce a clean `alice` token — the leading `"` glues onto the next
// word, so the doc is unreachable for the obvious `q=alice` search.
//
// We only flatten kind:0 (the structured-metadata kind defined by NIP-01)
// and only the small set of fields the member-picker uses. Bio / about /
// website are intentionally left out so they don't pollute name-prefix
// searches with false positives. Stays consistent with `display_name >
// nip05 > pubkey` ranking applied on the desktop side.
//
// The Typesense `content` field is write-only as far as the relay's read
// paths go (the bridge fetches the canonical event from Postgres by id
// after Typesense returns hits), so appending derived tokens here doesn't
// affect any consumer's view of the event's actual content.
//
// NOTE: existing kind:0 docs indexed before this change won't have the
// appended tokens. Running `just reindex-search` (or the
// `sprout-relay reindex-search` admin path) repopulates them. New /
// updated profiles get the tokens automatically.
let content_indexed = if event_kind_i32(nostr_event) == 0 {
flatten_kind0_for_indexing(nostr_event.content.as_str())
} else {
nostr_event.content.as_str().to_string()
};

let doc = json!({
"id": nostr_event.id.to_string(),
"content": nostr_event.content.as_str(),
"content": content_indexed,
// Cast to i32 for Typesense schema (int32 field). nostr Kind is u16; all Sprout kinds fit in i32.
"kind": event_kind_i32(nostr_event),
"pubkey": nostr_event.pubkey.to_string(),
Expand All @@ -57,6 +84,38 @@ pub fn event_to_document(event: &StoredEvent) -> Result<Value, SearchError> {
Ok(doc)
}

/// Builds the indexed `content` string for a kind:0 event.
///
/// The raw content is always kept verbatim. When it parses as a JSON object,
/// the trimmed, non-empty string values of `display_name`, `name` and `nip05`
/// (in that order) are appended after it, separated by single spaces.
///
/// Never fails: content that is not valid JSON, is not a JSON object, or
/// carries none of those fields is returned unchanged.
fn flatten_kind0_for_indexing(raw_content: &str) -> String {
    let parsed: Value = match serde_json::from_str(raw_content) {
        Ok(value) => value,
        Err(_) => return raw_content.to_string(),
    };
    let fields = match parsed.as_object() {
        Some(map) => map,
        None => return raw_content.to_string(),
    };

    // Only string-typed values count; anything blank after trimming is dropped.
    let tokens: Vec<&str> = ["display_name", "name", "nip05"]
        .iter()
        .filter_map(|key| fields.get(*key).and_then(Value::as_str))
        .map(str::trim)
        .filter(|token| !token.is_empty())
        .collect();

    if tokens.is_empty() {
        return raw_content.to_string();
    }

    // The separating space keeps the closing `}` of the original JSON from
    // fusing with the first appended token.
    let suffix = tokens.join(" ");
    format!("{} {}", raw_content, suffix)
}

/// Indexes a single event via Typesense upsert.
pub async fn index_event(
client: &reqwest::Client,
Expand Down Expand Up @@ -269,6 +328,134 @@ mod tests {
assert_eq!(doc["channel_id"].as_str().unwrap(), "__global__");
}

// ── kind:0 flattening for searchability ─────────────────────────────────

#[test]
fn kind0_appends_display_name_for_tokenization() {
    let raw = r#"{"display_name":"alice","about":"loves cats"}"#;
    let event = make_stored_event(raw, Kind::Metadata, None);
    let document = event_to_document(&event).unwrap();
    let content = document["content"].as_str().unwrap();

    // The raw JSON stays in the indexed content; nothing reads it back, but
    // keeping it is free and helps when inspecting the index by hand.
    assert!(content.contains(r#""display_name":"alice""#));

    // The display name must also appear as its own trailing word, so the
    // tokenizer sees a clean `alice` rather than one glued to a quote.
    assert!(content.ends_with(" alice"), "got: {content:?}");
}

#[test]
fn kind0_appends_name_when_display_name_absent() {
    // Profiles may carry only `name` (NIP-01 permits it as the display
    // field); it must be picked up when `display_name` is missing.
    let raw = r#"{"name":"bob","about":"x"}"#;
    let document = event_to_document(&make_stored_event(raw, Kind::Metadata, None)).unwrap();
    let content = document["content"].as_str().unwrap();
    assert!(content.ends_with(" bob"), "got: {content:?}");
}

#[test]
fn kind0_includes_both_display_name_and_name_when_present() {
    // Both fields are appended, `display_name` first, then `name`.
    let raw = r#"{"display_name":"Alice","name":"alice"}"#;
    let event = make_stored_event(raw, Kind::Metadata, None);
    let document = event_to_document(&event).unwrap();
    let content = document["content"].as_str().unwrap();
    assert!(content.ends_with(" Alice alice"), "got: {content:?}");
}

#[test]
fn kind0_includes_nip05_in_appended_tokens() {
    // `nip05` is appended after the name fields.
    let raw = r#"{"display_name":"alice","nip05":"alice@example.com"}"#;
    let document = event_to_document(&make_stored_event(raw, Kind::Metadata, None)).unwrap();
    let content = document["content"].as_str().unwrap();
    let has_expected_tail = content.ends_with(" alice alice@example.com");
    assert!(has_expected_tail, "got: {content:?}");
}

#[test]
fn kind0_excludes_about_and_website_from_appended_tokens() {
    // Bio-style fields (`about`, `website`) are intentionally not appended:
    // they would make name-prefix searches match on words from other
    // people's bios. Only the profile's own display_name is appended.
    let raw = r#"{"display_name":"alice","about":"I work with bob on x","website":"https://carol.example"}"#;
    let event = make_stored_event(raw, Kind::Metadata, None);
    let document = event_to_document(&event).unwrap();
    let content = document["content"].as_str().unwrap();

    assert!(content.ends_with(" alice"), "got: {content:?}");

    // Sanity check: because the raw JSON is preserved, the bio and website
    // text is still somewhere in the doc — just not in the trailing tokens.
    for needle in ["bob", "carol"] {
        assert!(content.contains(needle));
    }
}

#[test]
fn kind0_malformed_json_is_passed_through_unchanged() {
    // Content that is not JSON at all must come out exactly as it went in.
    let raw = "not json at all";
    let event = make_stored_event(raw, Kind::Metadata, None);
    let document = event_to_document(&event).unwrap();
    assert_eq!(document["content"].as_str(), Some(raw));
}

#[test]
fn kind0_non_object_json_is_passed_through_unchanged() {
    // NIP-01 expects a JSON object here, but a misbehaving client might
    // publish something else (an array, say). That must neither crash nor
    // be flattened — the content is indexed untouched.
    let raw = r#"["nope"]"#;
    let event = make_stored_event(raw, Kind::Metadata, None);
    let document = event_to_document(&event).unwrap();
    assert_eq!(document["content"].as_str(), Some(raw));
}

#[test]
fn kind0_empty_string_values_skipped() {
    // `display_name` is empty and `nip05` is whitespace-only, so `name` is
    // the only field that should contribute a token.
    let raw = r#"{"display_name":"","name":"alice","nip05":" "}"#;
    let document = event_to_document(&make_stored_event(raw, Kind::Metadata, None)).unwrap();
    let content = document["content"].as_str().unwrap();
    assert!(content.ends_with(" alice"), "got: {content:?}");
}

#[test]
fn kind0_no_searchable_fields_is_passed_through() {
    // A profile made up solely of fields that are never extracted.
    let raw = r#"{"about":"just a bio","picture":"https://x"}"#;
    let event = make_stored_event(raw, Kind::Metadata, None);
    let document = event_to_document(&event).unwrap();

    // Nothing is appended, so the indexed content equals the raw content.
    assert_eq!(document["content"].as_str(), Some(raw));
}

#[test]
fn non_kind0_events_not_flattened() {
    // Flattening is kind:0-only: a kind:1 note whose body happens to look
    // like profile JSON must be indexed exactly as written.
    let note_body = r#"{"display_name":"alice"}"#;
    let document = event_to_document(&make_stored_event(note_body, Kind::TextNote, None)).unwrap();
    assert_eq!(document["content"].as_str(), Some(note_body));
}

#[test]
fn tag_flattening_uses_unit_separator() {
let keys = Keys::generate();
Expand Down
Loading
Loading