Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 150 additions & 36 deletions core/metadata/src/impls/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::MuxStateMachine;
use crate::stm::consumer_group::ConsumerGroups;
use crate::stm::snapshot::{FillSnapshot, MetadataSnapshot, Snapshot, SnapshotError};
use crate::stm::stream::Streams;
use crate::stm::user::Users;
use crate::stm::user::{DeletePersonalAccessTokenRequest, Users};
use crate::stm::{ConsensusGroupAllocator, StateMachine};
use consensus::{
CLIENTS_TABLE_MAX, Canceled, ClientTable, CommitLogEvent, Consensus, EvictionContext, Pipeline,
Expand All @@ -38,9 +38,10 @@ use iggy_binary_protocol::requests::topics::CreateTopicRequest as WireCreateTopi
use iggy_binary_protocol::requests::topics::CreateTopicWithAssignmentsRequest as PersistedCreateTopicRequest;
use iggy_binary_protocol::{
Command2, ConsensusHeader, GenericHeader, Operation, PrepareHeader, PrepareOkHeader,
RequestHeader, WireDecode, WireEncode,
RequestHeader, WireDecode, WireEncode, WireName,
};
use iggy_common::IggyError;
use iggy_common::UserId;
use iggy_common::variadic;
use journal::{Journal, JournalHandle};
use message_bus::MessageBus;
Expand Down Expand Up @@ -278,39 +279,47 @@ impl<M> SnapshotCoordinator<M> {
}
}

/// Failures for [`IggyMetadata::submit_register_in_process`]. All transient;
/// the login/register handler wraps every variant in
/// `LoginRegisterError::Transient` so SDK read-timeout replays.
/// Failures shared by the in-process metadata submit helpers.
///
/// Returned by [`IggyMetadata::submit_register_in_process`],
/// [`IggyMetadata::submit_logout_in_process`],
/// [`IggyMetadata::submit_request_in_process`], and
/// [`IggyMetadata::submit_delete_personal_access_token_in_process`]. Every variant is
/// transient: the caller retries on a later attempt, and the login/register
/// handler wraps them in `LoginRegisterError::Transient` so that the SDK's
/// read-timeout replay retries the request.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum RegisterSubmitError {
pub enum MetadataSubmitError {
/// Not primary / not Normal.
NotPrimary,
/// Primary but `commit_min < commit_max`. Fresh dispatch would race an
/// inherited register and panic `commit_register`'s session-eq assert.
/// Primary but `commit_min < commit_max` (committed prefix not yet
/// drained). Dispatching now would race ops inherited from a prior view;
/// for `Register` that trips `commit_register`'s session-eq assert.
NotCaughtUp,
/// Prepare queue full.
PipelineFull,
/// In-flight prepare from this client.
InProgress,
/// Receiver `Canceled` and post-await re-check showed no session.
/// SDK replay hits new primary via cached register reply or `New`.
/// The pending prepare was canceled before commit (a view change reset
/// the pipeline). The caller retries; the SDK read-timeout replay reaches
/// the new primary.
Canceled,
}

impl std::fmt::Display for RegisterSubmitError {
impl std::fmt::Display for MetadataSubmitError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::NotPrimary => f.write_str("not primary in normal status"),
Self::NotCaughtUp => f.write_str("primary not yet caught up on commit_journal"),
Self::PipelineFull => f.write_str("metadata prepare queue is full"),
Self::InProgress => f.write_str("another register from this client is in flight"),
Self::Canceled => f.write_str("view change canceled the pending register"),
Self::InProgress => f.write_str("another in-flight prepare from this client"),
Self::Canceled => f.write_str("view change canceled the pending prepare"),
}
}
}

impl std::error::Error for RegisterSubmitError {}
impl std::error::Error for MetadataSubmitError {}

/// Log + surface `None` when a metadata callback runs on a peer shard
/// (whose `consensus` / `journal` slot is `None`). The `Plane` trait
Expand Down Expand Up @@ -853,7 +862,7 @@ where
/// Session number (= commit op). Idempotent: existing session short-circuits.
///
/// # Errors
/// [`RegisterSubmitError`] (all transient): `NotPrimary`, `NotCaughtUp`,
/// [`MetadataSubmitError`] (all transient): `NotPrimary`, `NotCaughtUp`,
/// `PipelineFull`, `InProgress`, `Canceled`. `Canceled` dominates on view
/// change; new primary inherits via `commit_journal`, SDK retries.
///
Expand All @@ -867,7 +876,7 @@ where
pub async fn submit_register_in_process(
&self,
client_id: u128,
) -> Result<u64, RegisterSubmitError> {
) -> Result<u64, MetadataSubmitError> {
assert!(client_id != 0, "client_id 0 is reserved for internal use");
let consensus = self
.consensus
Expand All @@ -884,9 +893,9 @@ where
if !is_caught_up_primary(consensus) {
return Err(
if consensus.is_primary() && consensus.is_normal() && !consensus.is_syncing() {
RegisterSubmitError::NotCaughtUp
MetadataSubmitError::NotCaughtUp
} else {
RegisterSubmitError::NotPrimary
MetadataSubmitError::NotPrimary
},
);
}
Expand All @@ -898,13 +907,13 @@ where
.borrow()
.has_message_from_client(client_id)
{
return Err(RegisterSubmitError::InProgress);
return Err(MetadataSubmitError::InProgress);
}

// TODO(pipeline-backpressure): in-process has no request_queue yet;
// terminal on full. Wire path buffers.
if consensus.pipeline().borrow().is_full() {
return Err(RegisterSubmitError::PipelineFull);
return Err(MetadataSubmitError::PipelineFull);
}

let request = build_register_request_message(consensus, client_id);
Expand Down Expand Up @@ -977,7 +986,7 @@ where
self.client_table
.borrow()
.get_session(client_id)
.ok_or(RegisterSubmitError::Canceled)
.ok_or(MetadataSubmitError::Canceled)
}
}
}
Expand All @@ -1003,7 +1012,7 @@ where
client_id: u128,
session: u64,
request: u64,
) -> Result<u64, RegisterSubmitError> {
) -> Result<u64, MetadataSubmitError> {
assert!(client_id != 0, "client_id 0 is reserved for internal use");
let consensus = self
.consensus
Expand All @@ -1022,9 +1031,9 @@ where
if !is_caught_up_primary(consensus) {
return Err(
if consensus.is_primary() && consensus.is_normal() && !consensus.is_syncing() {
RegisterSubmitError::NotCaughtUp
MetadataSubmitError::NotCaughtUp
} else {
RegisterSubmitError::NotPrimary
MetadataSubmitError::NotPrimary
},
);
}
Expand All @@ -1034,11 +1043,11 @@ where
.borrow()
.has_message_from_client(client_id)
{
return Err(RegisterSubmitError::InProgress);
return Err(MetadataSubmitError::InProgress);
}

if consensus.pipeline().borrow().is_full() {
return Err(RegisterSubmitError::PipelineFull);
return Err(MetadataSubmitError::PipelineFull);
}

let request = build_logout_request_message(consensus, client_id, session, request);
Expand Down Expand Up @@ -1090,12 +1099,116 @@ where
if self.client_table.borrow().get_session(client_id).is_none() {
Ok(consensus.commit_min())
} else {
Err(RegisterSubmitError::Canceled)
Err(MetadataSubmitError::Canceled)
}
}
}
}

/// Reports whether this node is currently the caught-up primary of the
/// metadata consensus group.
///
/// Yields `false` when this instance holds no consensus handle; otherwise
/// the answer comes from the free `is_caught_up_primary` check. Callers use
/// it to keep leader-only maintenance (the PAT cleaner) off backups and
/// lagging primaries.
#[must_use]
pub fn is_caught_up_primary(&self) -> bool {
    match self.consensus.as_ref() {
        Some(consensus) => is_caught_up_primary(consensus),
        None => false,
    }
}

/// Submit a replicated `DeletePersonalAccessToken` originated by the
/// server (the PAT cleaner), not a client, and await its commit.
///
/// No client session exists, so this skips `request_preflight` (like
/// the logout precedent) and uses the reserved internal `client` id
/// `0`: never registered, so the commit path's `get_session(0)` is
/// `None` and skips `commit_reply` (and its `assert!(client_id != 0)`),
/// while the preflight and register asserts never run. Delete is
/// idempotent, so the dropped dedup is harmless and a re-proposal on the
/// next tick is a no-op.
///
/// Unlike the register/logout helpers, there is no per-client in-flight
/// check here, so `InProgress` is never returned.
///
/// # Arguments
/// * `user_id` - owner of the personal access token to delete.
/// * `name` - name of the token; encoded together with `user_id` as the
///   `DeletePersonalAccessTokenRequest` body of the prepare.
///
/// # Returns
/// The `commit` field of the reply header delivered to the pipeline
/// subscriber.
///
/// # Errors
/// * `NotPrimary` - this node is not a primary in normal, non-syncing
///   status.
/// * `NotCaughtUp` - primary, normal and not syncing, but the caught-up
///   gate still fails.
/// * `PipelineFull` - the prepare pipeline is full; nothing is buffered.
/// * `Canceled` - the subscriber was canceled before a reply arrived.
///
/// # Panics
/// If `self.consensus` is `None` (per the `expect` message, consensus only
/// exists on shard 0). In debug builds only, the `debug_assert!`s also
/// panic if the caught-up-primary gate flips between validation and
/// dispatch, or if `view` / `commit_min` change across the `on_replicate`
/// await.
#[allow(clippy::future_not_send)]
pub async fn submit_delete_personal_access_token_in_process(
&self,
user_id: UserId,
name: WireName,
) -> Result<u64, MetadataSubmitError> {
let consensus = self.consensus.as_ref().expect(
"submit_delete_personal_access_token_in_process: consensus only exists on shard 0",
);

// Gate: only a caught-up primary may propose. When the gate fails, tell
// the caller whether we are a healthy primary that is merely behind
// (`NotCaughtUp`) or not a usable primary at all (`NotPrimary`).
if !is_caught_up_primary(consensus) {
return Err(
if consensus.is_primary() && consensus.is_normal() && !consensus.is_syncing() {
MetadataSubmitError::NotCaughtUp
} else {
MetadataSubmitError::NotPrimary
},
);
}

// Fail fast on a full pipeline; this path does not queue the request.
if consensus.pipeline().borrow().is_full() {
return Err(MetadataSubmitError::PipelineFull);
}

let body = DeletePersonalAccessTokenRequest { user_id, name }.to_bytes();
// Build the prepare directly so the `client = 0` header skips the
// client-header validation in `prepare_request` / `Project::project`
// (the in-process path `build_prepare_message` documents).
let header = RequestHeader {
client: 0,
namespace: server_common::sharding::METADATA_CONSENSUS_NAMESPACE,
..RequestHeader::default()
};
let prepare = build_prepare_message(
consensus,
&header,
Operation::DeletePersonalAccessToken,
&body,
);

consensus.verify_pipeline();
// Snapshot `view` / `commit_min` so the debug assertion after the
// `on_replicate` await can detect either one moving underneath us.
let view_snapshot = consensus.view();
let commit_min_snapshot = consensus.commit_min();
// Register the prepare in the pipeline and keep the receiver that
// resolves once the reply (or a cancellation) is delivered.
let receiver = consensus.pipeline_message_with_subscriber(PlaneKind::Metadata, &prepare);
debug_assert!(
is_caught_up_primary(consensus),
"submit_delete_personal_access_token_in_process: gate flipped between check and dispatch"
);
self.on_replicate(prepare).await;
debug_assert!(
consensus.view() == view_snapshot && consensus.commit_min() == commit_min_snapshot,
"submit_delete_personal_access_token_in_process: view/commit_min advanced across on_replicate await"
);
// Drain the loopback queue: well-formed `PrepareOk` messages are fed to
// `on_ack`; malformed ones and any other command are logged and
// dropped.
let mut loopback = Vec::new();
consensus.drain_loopback_into(&mut loopback);
for message in loopback {
match message.header().command {
Command2::PrepareOk => match message.try_into_typed::<PrepareOkHeader>() {
Ok(prepare_ok) => self.on_ack(prepare_ok).await,
Err(error) => warn!(
error = %error,
"dropping malformed PrepareOk from metadata loopback queue"
),
},
command => warn!(
?command,
"dropping unexpected message from metadata loopback queue"
),
}
}

// Wait for the subscriber: a reply yields its header's `commit`; a
// canceled receiver surfaces as the transient `Canceled` error.
match receiver.await {
Ok(reply) => Ok(reply.header().commit),
Err(Canceled) => Err(MetadataSubmitError::Canceled),
}
}

/// Submit a replicated client request from in-process and await the
/// committed reply.
///
Expand Down Expand Up @@ -1125,7 +1238,7 @@ where
pub async fn submit_request_in_process(
&self,
message: Message<RequestHeader>,
) -> Result<Message<GenericHeader>, RegisterSubmitError> {
) -> Result<Message<GenericHeader>, MetadataSubmitError> {
let request_header = *message.header();
let client_id = request_header.client;
let session = request_header.session;
Expand All @@ -1139,9 +1252,9 @@ where
if !is_caught_up_primary(consensus) {
return Err(
if consensus.is_primary() && consensus.is_normal() && !consensus.is_syncing() {
RegisterSubmitError::NotCaughtUp
MetadataSubmitError::NotCaughtUp
} else {
RegisterSubmitError::NotPrimary
MetadataSubmitError::NotPrimary
},
);
}
Expand All @@ -1160,22 +1273,22 @@ where
reply.as_slice(),
),
)
.map_err(|_| RegisterSubmitError::Canceled);
.map_err(|_| MetadataSubmitError::Canceled);
}
PreflightOutcome::Evict(reason) => {
let ctx = EvictionContext::from_consensus(consensus);
return Ok(build_eviction_message(ctx, client_id, reason).into_generic());
}
PreflightOutcome::Drop => return Err(RegisterSubmitError::Canceled),
PreflightOutcome::Drop => return Err(MetadataSubmitError::Canceled),
}

if consensus.pipeline().borrow().is_full() {
return Err(RegisterSubmitError::PipelineFull);
return Err(MetadataSubmitError::PipelineFull);
}

let prepare = self
.prepare_request(message)
.map_err(|_| RegisterSubmitError::Canceled)?;
.map_err(|_| MetadataSubmitError::Canceled)?;

consensus.verify_pipeline();
let view_snapshot = consensus.view();
Expand Down Expand Up @@ -1211,7 +1324,7 @@ where
receiver
.await
.map(server_common::Message::into_generic)
.map_err(|Canceled| RegisterSubmitError::Canceled)
.map_err(|Canceled| MetadataSubmitError::Canceled)
}

/// Promote up to `slots_freed` buffered requests into prepares after
Expand Down Expand Up @@ -1646,7 +1759,8 @@ where
// stamps wall-clock once here so every replica's `StateHandler::apply`
// reads the same `created_at`. A `0` stamp would persist a 1970-01-01
// `created_at` on every CreateStream/CreateTopic/CreatePartitions, since
// submit_command_in_process bypasses `Project::project` and calls this
// submit_delete_personal_access_token_in_process bypasses
// `Project::project` and calls this
// helper directly. Shared `next_monotonic_timestamp` keeps the in-process
// path on the same monotonic-clock guard as the wire path.
let timestamp = consensus.next_monotonic_timestamp();
Expand Down
2 changes: 1 addition & 1 deletion core/metadata/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub mod permissioner;
pub mod stm;

// Re-export IggyMetadata for use in other modules
pub use impls::metadata::{IggyMetadata, RegisterSubmitError};
pub use impls::metadata::{IggyMetadata, MetadataSubmitError};

// Re-export MuxStateMachine for use in other modules
pub use stm::mux::MuxStateMachine;
Loading
Loading