Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions crates/sprout-acp/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -901,9 +901,17 @@ async fn tokio_main() -> Result<()> {
.as_secs();

let pubkey_hex = config.keys.public_key().to_hex();
let mut relay = HarnessRelay::connect(&config.relay_url, &config.keys, &pubkey_hex)
.await
.map_err(|e| anyhow::anyhow!("relay connect error: {e}"))?;

// Parse SPROUT_AUTH_TAG into a nostr::Tag for NIP-OA relay membership delegation.
let relay_auth_tag: Option<nostr::Tag> = std::env::var("SPROUT_AUTH_TAG")
.ok()
.filter(|s| !s.is_empty())
.and_then(|s| sprout_sdk::nip_oa::parse_auth_tag(&s).ok());

let mut relay =
HarnessRelay::connect(&config.relay_url, &config.keys, &pubkey_hex, relay_auth_tag)
.await
.map_err(|e| anyhow::anyhow!("relay connect error: {e}"))?;

// Finding #22: tell the relay background task the watermark so it can use
// `since = watermark - 5s` on the first REQ instead of `since=now`.
Expand Down
82 changes: 72 additions & 10 deletions crates/sprout-acp/src/relay.rs

Large diffs are not rendered by default.

56 changes: 44 additions & 12 deletions crates/sprout-mcp/src/relay_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ async fn do_connect(
relay_url: &str,
keys: &Keys,
api_token: Option<&str>,
auth_tag: Option<&Tag>,
) -> Result<WsStream, RelayClientError> {
let parsed = relay_url
.parse::<url::Url>()
Expand All @@ -236,7 +237,7 @@ async fn do_connect(
// Wait for AUTH challenge (5s timeout).
let challenge = wait_for_auth_challenge(&mut ws, Duration::from_secs(5)).await?;

let auth_event = build_auth_event(&challenge, relay_url, keys, api_token)?;
let auth_event = build_auth_event(&challenge, relay_url, keys, api_token, auth_tag)?;
let event_id = auth_event.id.to_hex();
debug!("sending AUTH event {event_id}");
let auth_msg = serde_json::to_string(&json!(["AUTH", auth_event]))?;
Expand Down Expand Up @@ -318,25 +319,42 @@ async fn wait_for_ok(
}

/// Build a NIP-42 AUTH event for the given challenge.
///
/// If `auth_tag` is provided (NIP-OA owner attestation), it is included in the
/// AUTH event so the relay can use it for membership delegation fallback.
#[allow(clippy::result_large_err)]
fn build_auth_event(
challenge: &str,
relay_url: &str,
keys: &Keys,
api_token: Option<&str>,
auth_tag: Option<&Tag>,
) -> Result<Event, RelayClientError> {
let relay_nostr_url: Url = relay_url
.parse()
.map_err(|e: url::ParseError| RelayClientError::Url(e.to_string()))?;
if let Some(token) = api_token {
let tags = vec![
let mut tags = vec![
Tag::parse(&["relay", relay_url])
.map_err(|e| RelayClientError::EventBuilder(e.to_string()))?,
Tag::parse(&["challenge", challenge])
.map_err(|e| RelayClientError::EventBuilder(e.to_string()))?,
Tag::parse(&["auth_token", token])
.map_err(|e| RelayClientError::EventBuilder(e.to_string()))?,
];
if let Some(t) = auth_tag {
tags.push(t.clone());
}
Ok(EventBuilder::new(Kind::Authentication, "", tags).sign_with_keys(keys)?)
} else if let Some(t) = auth_tag {
// Cannot use EventBuilder::auth() shortcut — it doesn't accept extra tags.
let tags = vec![
Tag::parse(&["relay", relay_url])
.map_err(|e| RelayClientError::EventBuilder(e.to_string()))?,
Tag::parse(&["challenge", challenge])
.map_err(|e| RelayClientError::EventBuilder(e.to_string()))?,
t.clone(),
];
Ok(EventBuilder::new(Kind::Authentication, "", tags).sign_with_keys(keys)?)
} else {
Ok(EventBuilder::auth(challenge, relay_nostr_url).sign_with_keys(keys)?)
Expand All @@ -353,9 +371,10 @@ async fn send_auth_response(
relay_url: &str,
keys: &Keys,
api_token: Option<&str>,
auth_tag: Option<&Tag>,
) {
let result: Result<(), RelayClientError> = async {
let auth_event = build_auth_event(challenge, relay_url, keys, api_token)?;
let auth_event = build_auth_event(challenge, relay_url, keys, api_token, auth_tag)?;
let msg = serde_json::to_string(&json!(["AUTH", auth_event]))?;
ws.send(Message::Text(msg.into())).await?;
debug!("sent AUTH response for mid-session challenge");
Expand All @@ -377,6 +396,7 @@ async fn handle_ws_message(
keys: &Keys,
relay_url: &str,
api_token: Option<&str>,
auth_tag: Option<&Tag>,
) -> bool {
match msg {
Message::Text(text) => {
Expand Down Expand Up @@ -429,7 +449,7 @@ async fn handle_ws_message(
}
RelayMessage::Auth { challenge } => {
debug!("received mid-session AUTH challenge — re-authenticating");
send_auth_response(ws, &challenge, relay_url, keys, api_token).await;
send_auth_response(ws, &challenge, relay_url, keys, api_token, auth_tag).await;
}
}
true
Expand Down Expand Up @@ -463,13 +483,14 @@ async fn do_reconnect(
keys: &Keys,
relay_url: &str,
api_token: Option<&str>,
auth_tag: Option<&Tag>,
) -> bool {
warn!("relay connection lost — reconnecting…");
state.cancel_pending();

let mut delay = Duration::from_secs(1);
loop {
match do_connect(relay_url, keys, api_token).await {
match do_connect(relay_url, keys, api_token, auth_tag).await {
Ok(new_ws) => {
tracing::info!("reconnected to relay at {relay_url}");
*ws = new_ws;
Expand Down Expand Up @@ -544,6 +565,7 @@ async fn run_background_task(
keys: Keys,
relay_url: String,
api_token: Option<String>,
auth_tag: Option<Tag>,
) {
let mut state = BgState::new();
// Ticker for expiring timed-out pending operations (~1s granularity).
Expand All @@ -557,14 +579,14 @@ async fn run_background_task(
let needs_reconnect = match raw {
Some(Ok(msg)) => {
!handle_ws_message(
msg, &mut ws, &mut state, &keys, &relay_url, api_token.as_deref(),
msg, &mut ws, &mut state, &keys, &relay_url, api_token.as_deref(), auth_tag.as_ref(),
).await
}
Some(Err(e)) => { warn!("WebSocket error: {e}"); true }
None => { debug!("WebSocket stream ended"); true }
};
if needs_reconnect
&& !do_reconnect(&mut ws, &mut state, &mut cmd_rx, &keys, &relay_url, api_token.as_deref()).await
&& !do_reconnect(&mut ws, &mut state, &mut cmd_rx, &keys, &relay_url, api_token.as_deref(), auth_tag.as_ref()).await
{
return; // Shutdown received during reconnect
}
Expand All @@ -581,7 +603,7 @@ async fn run_background_task(
};
if let Err(e) = ws.send(Message::Text(msg.into())).await {
let _ = reply.send(Err(RelayClientError::WebSocket(e)));
if !do_reconnect(&mut ws, &mut state, &mut cmd_rx, &keys, &relay_url, api_token.as_deref()).await {
if !do_reconnect(&mut ws, &mut state, &mut cmd_rx, &keys, &relay_url, api_token.as_deref(), auth_tag.as_ref()).await {
return;
}
continue;
Expand Down Expand Up @@ -611,7 +633,7 @@ async fn run_background_task(
};
if let Err(e) = ws.send(Message::Text(text.into())).await {
let _ = reply.send(Err(RelayClientError::WebSocket(e)));
if !do_reconnect(&mut ws, &mut state, &mut cmd_rx, &keys, &relay_url, api_token.as_deref()).await {
if !do_reconnect(&mut ws, &mut state, &mut cmd_rx, &keys, &relay_url, api_token.as_deref(), auth_tag.as_ref()).await {
return;
}
continue;
Expand All @@ -636,7 +658,7 @@ async fn run_background_task(
};
if let Err(e) = ws.send(Message::Text(msg.into())).await {
let _ = reply.send(Err(RelayClientError::WebSocket(e)));
if !do_reconnect(&mut ws, &mut state, &mut cmd_rx, &keys, &relay_url, api_token.as_deref()).await {
if !do_reconnect(&mut ws, &mut state, &mut cmd_rx, &keys, &relay_url, api_token.as_deref(), auth_tag.as_ref()).await {
return;
}
continue;
Expand Down Expand Up @@ -715,16 +737,17 @@ impl RelayClient {
api_token: Option<&str>,
auth_tag: Option<nostr::Tag>,
) -> Result<Self, RelayClientError> {
let ws = do_connect(relay_url, keys, api_token).await?;
let ws = do_connect(relay_url, keys, api_token, auth_tag.as_ref()).await?;

let (cmd_tx, cmd_rx) = mpsc::channel(CMD_CHANNEL_CAPACITY);

let bg_keys = keys.clone();
let bg_relay_url = relay_url.to_string();
let bg_api_token = api_token.map(|t| t.to_string());
let bg_auth_tag = auth_tag.clone();

let handle = tokio::spawn(async move {
run_background_task(ws, cmd_rx, bg_keys, bg_relay_url, bg_api_token).await;
run_background_task(ws, cmd_rx, bg_keys, bg_relay_url, bg_api_token, bg_auth_tag).await;
});

Ok(Self {
Expand Down Expand Up @@ -802,6 +825,15 @@ impl RelayClient {
&self.keys
}

/// Returns the NIP-OA auth tag JSON string for use in HTTP `x-auth-tag` headers.
///
/// Returns `None` if no auth tag is configured (direct-member agents).
pub fn auth_tag_json(&self) -> Option<String> {
self.auth_tag
.as_ref()
.and_then(|t| serde_json::to_string(t.as_slice()).ok())
}

/// Returns the relay's server authority (host or host:port) for BUD-11 server tags.
///
/// Uses the same logic as the desktop client's `extract_server_authority`:
Expand Down
2 changes: 2 additions & 0 deletions crates/sprout-mcp/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1017,6 +1017,7 @@ Default kind is 9 (stream message)."
&self.client.relay_http_url(),
self.client.server_domain().as_deref(),
path,
self.client.auth_tag_json().as_deref(),
)
.await
{
Expand Down Expand Up @@ -3113,6 +3114,7 @@ on send_message to upload and attach in one step."
&self.client.relay_http_url(),
self.client.server_domain().as_deref(),
&p.file_path,
self.client.auth_tag_json().as_deref(),
)
.await
{
Expand Down
15 changes: 10 additions & 5 deletions crates/sprout-mcp/src/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,16 @@ pub enum UploadError {
///
/// Performs validation, SHA-256 hashing, Blossom auth signing, and the HTTP PUT.
/// Returns the relay's [`BlobDescriptor`] on success.
///
/// `auth_tag_json` is an optional NIP-OA auth tag (JSON-array string) sent as
/// the `x-auth-tag` header for relay membership delegation.
pub async fn upload_file(
http: &reqwest::Client,
keys: &Keys,
relay_http_url: &str,
server_domain: Option<&str>,
file_path: &str,
auth_tag_json: Option<&str>,
) -> Result<BlobDescriptor, UploadError> {
// 1. Validate path exists
let metadata = std::fs::metadata(file_path).map_err(|e| {
Expand Down Expand Up @@ -223,15 +227,16 @@ pub async fn upload_file(
};

let url = format!("{}/media/upload", relay_http_url.trim_end_matches('/'));
let resp = http
let mut req = http
.put(&url)
.timeout(upload_timeout)
.header("Authorization", &auth_header)
.header("Content-Type", mime)
.header("X-SHA-256", &sha256)
.body(bytes)
.send()
.await?;
.header("X-SHA-256", &sha256);
if let Some(tag) = auth_tag_json {
req = req.header("x-auth-tag", tag);
}
let resp = req.body(bytes).send().await?;

// 9. Handle response
let status = resp.status();
Expand Down
11 changes: 7 additions & 4 deletions crates/sprout-relay/src/api/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,9 @@ pub async fn submit_event(
check_nip98_replay(&state, event_id_bytes)?;
let pubkey_bytes = pubkey.serialize().to_vec();

// Enforce relay membership
super::relay_members::enforce_relay_membership(&state, &pubkey_bytes, None).await?;
// Enforce relay membership (with NIP-OA fallback via x-auth-tag header).
let auth_tag = headers.get("x-auth-tag").and_then(|v| v.to_str().ok());
super::relay_members::enforce_relay_membership(&state, &pubkey_bytes, auth_tag).await?;

let event: nostr::Event = serde_json::from_slice(&body)
.map_err(|e| api_error(StatusCode::BAD_REQUEST, &format!("invalid event JSON: {e}")))?;
Expand Down Expand Up @@ -184,7 +185,8 @@ pub async fn query_events(
check_nip98_replay(&state, event_id_bytes)?;
let pubkey_bytes = pubkey.serialize().to_vec();

super::relay_members::enforce_relay_membership(&state, &pubkey_bytes, None).await?;
let auth_tag = headers.get("x-auth-tag").and_then(|v| v.to_str().ok());
super::relay_members::enforce_relay_membership(&state, &pubkey_bytes, auth_tag).await?;

let filters: Vec<nostr::Filter> = serde_json::from_slice(&body)
.map_err(|e| api_error(StatusCode::BAD_REQUEST, &format!("invalid filters: {e}")))?;
Expand Down Expand Up @@ -278,7 +280,8 @@ pub async fn count_events(
check_nip98_replay(&state, event_id_bytes)?;
let pubkey_bytes = pubkey.serialize().to_vec();

super::relay_members::enforce_relay_membership(&state, &pubkey_bytes, None).await?;
let auth_tag = headers.get("x-auth-tag").and_then(|v| v.to_str().ok());
super::relay_members::enforce_relay_membership(&state, &pubkey_bytes, auth_tag).await?;

let filters: Vec<nostr::Filter> = serde_json::from_slice(&body)
.map_err(|e| api_error(StatusCode::BAD_REQUEST, &format!("invalid filters: {e}")))?;
Expand Down
15 changes: 11 additions & 4 deletions crates/sprout-relay/src/audio/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ async fn handle_audio_connection(socket: WebSocket, state: Arc<AppState>, channe
}
};

// Extract NIP-OA auth tag before verify_auth_event consumes the event.
let auth_tag_json = crate::handlers::auth::extract_auth_tag_json(&auth_msg.event);

let relay_url = state.config.relay_url.clone();
let auth_ctx = match state
.auth
Expand All @@ -142,10 +145,14 @@ async fn handle_audio_connection(socket: WebSocket, state: Arc<AppState>, channe
let pubkey_bytes = pubkey.serialize().to_vec();
let parent_channel_id = auth_msg.parent_channel_id;

// ── Relay membership gate (NIP-43) ────────────────────────────────────────
if crate::api::relay_members::enforce_relay_membership(&state, &pubkey.serialize(), None)
.await
.is_err()
// ── Relay membership gate (with NIP-OA fallback) ────────────────────────────
if crate::api::relay_members::enforce_relay_membership(
&state,
&pubkey.serialize(),
auth_tag_json.as_deref(),
)
.await
.is_err()
{
warn!(channel_id = %channel_id, pubkey = %pubkey_hex, "audio: relay membership denied");
let _ = ws_send
Expand Down
Loading
Loading