Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions src/adapter/src/active_compute_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ use mz_ore::now::EpochMillis;
use mz_repr::adt::numeric;
use mz_repr::{Datum, GlobalId, IntoRowIterator, Row, Timestamp};
use mz_sql::plan::SubscribeOutput;
use mz_sql::session::metadata::SessionMetadata;
use timely::progress::Antichain;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, oneshot};

use crate::coord::peek::PeekResponseUnary;
use crate::{AdapterError, ExecuteContext, ExecuteResponse};
use crate::{AdapterError, ExecuteResponse};

#[derive(Debug)]
/// A description of an active compute sink from the coordinator's perspective.
Expand All @@ -53,7 +52,7 @@ impl ActiveComputeSink {
pub fn connection_id(&self) -> &ConnectionId {
match &self {
ActiveComputeSink::Subscribe(subscribe) => &subscribe.conn_id,
ActiveComputeSink::CopyTo(copy_to) => copy_to.ctx.session().conn_id(),
ActiveComputeSink::CopyTo(copy_to) => &copy_to.conn_id,
}
}

Expand Down Expand Up @@ -391,9 +390,10 @@ impl ActiveSubscribe {
/// A description of an active copy to sink from the coordinator's perspective.
#[derive(Debug)]
pub struct ActiveCopyTo {
/// The execution context for the `COPY ... TO` statement that created the
/// copy to sink.
pub ctx: ExecuteContext,
/// The ID of the connection which created the subscribe.
pub conn_id: ConnectionId,
/// The result channel for the `COPY ... TO` statement that created the copy to sink.
pub tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
/// The ID of the cluster on which the copy to is running.
pub cluster_id: ClusterId,
/// The IDs of the objects on which the copy to depends.
Expand All @@ -414,7 +414,7 @@ impl ActiveCopyTo {
Ok(n) => Ok(ExecuteResponse::Copied(usize::cast_from(n))),
Err(error) => Err(AdapterError::Unstructured(error)),
};
let _ = self.ctx.retire(response);
let _ = self.tx.send(response);
}

/// Retires the copy to with the specified reason.
Expand All @@ -429,6 +429,6 @@ impl ActiveCopyTo {
anyhow!("copy has been terminated because underlying {d} was dropped"),
)),
};
let _ = self.ctx.retire(message);
let _ = self.tx.send(message);
}
}
80 changes: 2 additions & 78 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ use mz_storage_client::controller::{CollectionDescription, DataSource, DataSourc
use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
use mz_storage_types::connections::Connection as StorageConnection;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::controller::{StorageError, TxnWalTablesImpl};
use mz_storage_types::controller::TxnWalTablesImpl;
use mz_storage_types::sinks::S3SinkFormat;
use mz_storage_types::sources::Timeline;
use mz_timestamp_oracle::WriteTimestamp;
Expand Down Expand Up @@ -235,11 +235,6 @@ pub enum Message<T = mz_repr::Timestamp> {
StorageUsageSchedule,
StorageUsageFetch,
StorageUsageUpdate(ShardsUsageReferenced),
RealTimeRecencyTimestamp {
conn_id: ConnectionId,
real_time_recency_ts: Result<Timestamp, StorageError<T>>,
validity: PlanValidity,
},

/// Performs any cleanup and logging actions necessary for
/// finalizing a statement execution.
Expand All @@ -256,7 +251,7 @@ pub enum Message<T = mz_repr::Timestamp> {
},
PeekStageReady {
ctx: ExecuteContext,
otel_ctx: OpenTelemetryContext,
span: Span,
stage: PeekStage,
},
CreateIndexStageReady {
Expand Down Expand Up @@ -334,7 +329,6 @@ impl Message {
Message::StorageUsageSchedule => "storage_usage_schedule",
Message::StorageUsageFetch => "storage_usage_fetch",
Message::StorageUsageUpdate(_) => "storage_usage_update",
Message::RealTimeRecencyTimestamp { .. } => "real_time_recency_timestamp",
Message::RetireExecute { .. } => "retire_execute",
Message::ExecuteSingleStatementTransaction { .. } => {
"execute_single_statement_transaction"
Expand Down Expand Up @@ -385,33 +379,9 @@ pub struct ValidationReady<T> {
pub type CreateConnectionValidationReady = ValidationReady<CreateConnectionPlan>;
pub type AlterConnectionValidationReady = ValidationReady<Connection>;

#[derive(Debug)]
pub enum RealTimeRecencyContext {
Peek {
ctx: ExecuteContext,
plan: mz_sql::plan::SelectPlan,
root_otel_ctx: OpenTelemetryContext,
target_replica: Option<ReplicaId>,
timeline_context: TimelineContext,
oracle_read_ts: Option<Timestamp>,
source_ids: BTreeSet<GlobalId>,
optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
explain_ctx: ExplainContext,
},
}

impl RealTimeRecencyContext {
pub(crate) fn take_context(self) -> ExecuteContext {
match self {
RealTimeRecencyContext::Peek { ctx, .. } => ctx,
}
}
}

#[derive(Debug)]
pub enum PeekStage {
/// Common stages across SELECT, EXPLAIN and COPY TO queries.
Validate(PeekStageValidate),
LinearizeTimestamp(PeekStageLinearizeTimestamp),
RealTimeRecency(PeekStageRealTimeRecency),
TimestampReadHold(PeekStageTimestampReadHold),
Expand All @@ -425,24 +395,6 @@ pub enum PeekStage {
CopyTo(PeekStageCopyTo),
}

impl PeekStage {
fn validity(&mut self) -> Option<&mut PlanValidity> {
match self {
PeekStage::Validate(_) => None,
PeekStage::LinearizeTimestamp(PeekStageLinearizeTimestamp { validity, .. })
| PeekStage::RealTimeRecency(PeekStageRealTimeRecency { validity, .. })
| PeekStage::TimestampReadHold(PeekStageTimestampReadHold { validity, .. })
| PeekStage::Optimize(PeekStageOptimize { validity, .. })
| PeekStage::Finish(PeekStageFinish { validity, .. })
| PeekStage::CopyTo(PeekStageCopyTo { validity, .. })
| PeekStage::ExplainPlan(PeekStageExplainPlan { validity, .. })
| PeekStage::ExplainPushdown(PeekStageExplainPushdown { validity, .. }) => {
Some(validity)
}
}
}
}

#[derive(Debug)]
pub struct CopyToContext {
/// The `RelationDesc` of the data to be copied.
Expand All @@ -464,21 +416,6 @@ pub struct CopyToContext {
pub output_batch_count: Option<u64>,
}

#[derive(Debug)]
pub struct PeekStageValidate {
plan: mz_sql::plan::SelectPlan,
target_cluster: TargetCluster,
/// An optional context set iff the state machine is initiated from
/// sequencing a COPY TO statement.
///
/// Will result in creating and using [`optimize::copy_to::Optimizer`] in
/// the `optimizer` field of all subsequent stages.
copy_to_ctx: Option<CopyToContext>,
/// An optional context set iff the state machine is initiated from
/// sequencing an EXPLAIN for this statement.
explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageLinearizeTimestamp {
validity: PlanValidity,
Expand Down Expand Up @@ -1626,9 +1563,6 @@ pub struct Coordinator {
/// A map from client connection ids to a set of all pending peeks for that client.
client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,

/// A map from client connection ids to a pending real time recency timestamps.
pending_real_time_recency_timestamp: BTreeMap<ConnectionId, RealTimeRecencyContext>,

/// A map from client connection ids to pending linearize read transaction.
pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,

Expand Down Expand Up @@ -3162,11 +3096,6 @@ impl Coordinator {
(id.to_string(), peek)
})
.collect();
let pending_real_time_recency_timestamp: BTreeMap<_, _> = self
.pending_real_time_recency_timestamp
.iter()
.map(|(id, timestamp)| (id.unhandled().to_string(), format!("{timestamp:?}")))
.collect();
let pending_linearize_read_txns: BTreeMap<_, _> = self
.pending_linearize_read_txns
.iter()
Expand Down Expand Up @@ -3198,10 +3127,6 @@ impl Coordinator {
"client_pending_peeks".to_string(),
serde_json::to_value(client_pending_peeks)?,
),
(
"pending_real_time_recency_timestamp".to_string(),
serde_json::to_value(pending_real_time_recency_timestamp)?,
),
(
"pending_linearize_read_txns".to_string(),
serde_json::to_value(pending_linearize_read_txns)?,
Expand Down Expand Up @@ -3481,7 +3406,6 @@ pub fn serve(
txn_read_holds: Default::default(),
pending_peeks: BTreeMap::new(),
client_pending_peeks: BTreeMap::new(),
pending_real_time_recency_timestamp: BTreeMap::new(),
pending_linearize_read_txns: BTreeMap::new(),
active_compute_sinks: BTreeMap::new(),
active_webhooks: BTreeMap::new(),
Expand Down
8 changes: 0 additions & 8 deletions src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1004,14 +1004,6 @@ impl Coordinator {
}
}

// Cancel commands waiting on a real time recency timestamp. There is at most one per session.
if let Some(real_time_recency_context) =
self.pending_real_time_recency_timestamp.remove(&conn_id)
{
let ctx = real_time_recency_context.take_context();
maybe_ctx = Some(ctx);
}

// Cancel reads waiting on being linearized. There is at most one linearized read per
// session.
if let Some(pending_read_txn) = self.pending_linearize_read_txns.remove(&conn_id) {
Expand Down
110 changes: 4 additions & 106 deletions src/adapter/src/coord/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::time::{Duration, Instant};
use futures::future::LocalBoxFuture;
use futures::FutureExt;
use maplit::btreemap;
use mz_adapter_types::connection::ConnectionId;
use mz_catalog::memory::objects::ClusterReplicaProcessStatus;
use mz_controller::clusters::{ClusterEvent, ClusterStatus};
use mz_controller::ControllerResponse;
Expand All @@ -26,10 +25,9 @@ use mz_ore::task;
use mz_ore::tracing::OpenTelemetryContext;
use mz_persist_client::usage::ShardsUsageReferenced;
use mz_sql::ast::Statement;
use mz_sql::catalog::SessionCatalog;
use mz_sql::names::ResolvedIds;
use mz_sql::pure::PurifiedStatement;
use mz_storage_types::controller::{CollectionMetadata, StorageError};
use mz_storage_types::controller::CollectionMetadata;
use opentelemetry::trace::TraceContextExt;
use rand::{rngs, Rng, SeedableRng};
use serde_json::json;
Expand All @@ -41,8 +39,7 @@ use crate::command::Command;
use crate::coord::appends::Deferred;
use crate::coord::{
AlterConnectionValidationReady, ClusterReplicaStatuses, Coordinator,
CreateConnectionValidationReady, Message, PeekStage, PeekStageTimestampReadHold, PlanValidity,
PurifiedStatementReady, RealTimeRecencyContext, WatchSetResponse,
CreateConnectionValidationReady, Message, PurifiedStatementReady, WatchSetResponse,
};
use crate::session::Session;
use crate::telemetry::{EventDetails, SegmentClientExt};
Expand Down Expand Up @@ -133,18 +130,6 @@ impl Coordinator {
Message::StorageUsageUpdate(sizes) => {
self.storage_usage_update(sizes).await;
}
Message::RealTimeRecencyTimestamp {
conn_id,
real_time_recency_ts,
validity,
} => {
self.message_real_time_recency_timestamp(
conn_id,
real_time_recency_ts,
validity,
)
.await;
}
Message::RetireExecute {
otel_ctx,
data,
Expand All @@ -165,11 +150,10 @@ impl Coordinator {
}
Message::PeekStageReady {
ctx,
otel_ctx,
span,
stage,
} => {
otel_ctx.attach_as_parent();
self.execute_peek_stage(ctx, otel_ctx, stage).await;
self.sequence_staged(ctx, span, stage).await;
}
Message::CreateIndexStageReady {
ctx,
Expand Down Expand Up @@ -901,90 +885,4 @@ impl Coordinator {
});
}
}

#[mz_ore::instrument(level = "debug")]
/// Finishes sequencing a command that was waiting on a real time recency timestamp.
async fn message_real_time_recency_timestamp(
&mut self,
conn_id: ConnectionId,
real_time_recency_ts: Result<mz_repr::Timestamp, StorageError<mz_repr::Timestamp>>,
mut validity: PlanValidity,
) {
let real_time_recency_context =
match self.pending_real_time_recency_timestamp.remove(&conn_id) {
Some(real_time_recency_context) => real_time_recency_context,
// Query was cancelled while waiting.
None => return,
};

let real_time_recency_ts = match real_time_recency_ts {
Ok(rtr) => rtr,
Err(e) => {
let ctx = real_time_recency_context.take_context();
let e = match e {
// TODO: we should be able to generalize this conversion
// from `GlobalId` to minimally qualified name string.
StorageError::RtrTimeout(id) => {
let session = ctx.session();
let conn_catalog = self.catalog().for_session(session);
let name = conn_catalog
.minimal_qualification(conn_catalog.get_item(&id).name())
.to_string();
crate::AdapterError::RtrTimeout(name)
}
// TODO: we should be able to generalize this conversion
// from `GlobalId` to minimally qualified name string.
StorageError::RtrDropFailure(id) => {
let session = ctx.session();
let conn_catalog = self.catalog().for_session(session);
let name = conn_catalog
.minimal_qualification(conn_catalog.get_item(&id).name())
.to_string();
crate::AdapterError::RtrDropFailure(name)
}
e => e.into(),
};

ctx.retire(Err(e));
return;
}
};

if let Err(err) = validity.check(self.catalog()) {
let ctx = real_time_recency_context.take_context();
ctx.retire(Err(err));
return;
}

match real_time_recency_context {
RealTimeRecencyContext::Peek {
ctx,
root_otel_ctx,
plan,
target_replica,
timeline_context,
oracle_read_ts,
source_ids,
optimizer,
explain_ctx,
} => {
self.execute_peek_stage(
ctx,
root_otel_ctx,
PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
validity,
plan,
target_replica,
timeline_context,
oracle_read_ts,
source_ids,
real_time_recency_ts: Some(real_time_recency_ts),
optimizer,
explain_ctx,
}),
)
.await;
}
}
}
}
Loading