diff --git a/.gitignore b/.gitignore index 9dd4db640..c45ee2854 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,4 @@ src/protos/*.rs /.cloud_certs/ cloud_envs.fish /.claude/settings.local.json +.codex diff --git a/Cargo.toml b/Cargo.toml index d07fbeb00..87b79ccad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,8 +14,8 @@ license = "MIT" license-file = "LICENSE.txt" [workspace.dependencies] -bon = { version = "3", features = ["implied-bounds"] } -derive_more = { version = "2.0", features = [ +bon = { version = "3", default-features = false, features = ["alloc", "implied-bounds"] } +derive_more = { version = "2.0", default-features = false, features = [ "constructor", "display", "from", @@ -24,10 +24,10 @@ derive_more = { version = "2.0", features = [ "try_into", ] } thiserror = "2" -tonic = "0.14" +tonic = { version = "0.14", default-features = false } tonic-prost = "0.14" tonic-prost-build = "0.14" -opentelemetry = { version = "0.31", features = ["metrics"] } +opentelemetry = { version = "0.31", default-features = false, features = ["metrics"] } prost = "0.14" prost-types = { version = "0.7", package = "prost-wkt-types" } pbjson = "0.9" diff --git a/crates/client/Cargo.toml b/crates/client/Cargo.toml index 22a6f400f..10c206e5a 100644 --- a/crates/client/Cargo.toml +++ b/crates/client/Cargo.toml @@ -22,7 +22,7 @@ anyhow = "1.0" async-trait = "0.1" backoff = "0.4" base64 = "0.22" -bon = "3" +bon = { version = "3", default-features = false, features = ["alloc"] } derive_more = { workspace = true } dyn-clone = "1.0" bytes = "1.10" @@ -35,12 +35,18 @@ hyper-util = "0.1.16" opentelemetry = { workspace = true, features = ["metrics"], optional = true } parking_lot = "0.12" thiserror = { workspace = true } -tokio = { version = "1.47", features = ["net", "time"] } -tonic = { workspace = true, features = ["tls-ring", "tls-native-roots"] } +tokio = { version = "1.47", default-features = false, features = [ + "io-util", + "net", + "rt", + "sync", + "time", +] } +tonic = { workspace = true, default-features = false, features = ["tls-ring", "tls-native-roots", "channel"] } tower = { version = "0.5", features = ["util"] } tracing = "0.1" url = "2.5" -uuid = { version = "1.18", features = ["v4"] } +uuid = { version = "1.18", default-features = false, features = ["v4"] } rand = "0.10" [dependencies.temporalio-common] @@ -54,6 +60,14 @@ prost = "0.14" prost-types = { workspace = true } rstest = "0.26" tempfile = "3" +tokio = { version = "1.47", default-features = false, features = [ + "io-util", + "macros", + "net", + "rt", + "sync", + "time", +] } [lints] workspace = true diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 93b403843..2f7464f42 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -42,7 +42,7 @@ crc32fast = "1" dirs = { version = "6.0", optional = true } derive_more = { workspace = true } erased-serde = "0.4" -futures = "0.3" +futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-channel = { version = "0.3", default-features = false, features = [ "std", ], optional = true } @@ -55,17 +55,18 @@ hyper-util = { version = "0.1", features = [ "tokio", ], optional = true } opentelemetry = { workspace = true, features = ["metrics"], optional = true } -opentelemetry_sdk = { version = "0.31", features = [ +opentelemetry_sdk = { version = "0.31", default-features = false, features = [ "rt-tokio", "metrics", "spec_unstable_metrics_views", ], optional = true } -opentelemetry-otlp = { version = "0.31", features = [ +opentelemetry-otlp = { version = "0.31", default-features = false, features = [ "tokio", "metrics", - "tls", + "tls-roots", "http-proto", "grpc-tonic", + "reqwest-rustls", ], optional = true } parking_lot = { version = "0.12" } prometheus = { version = "0.14", optional = true, default-features = false } @@ -77,9 +78,13 @@ ringbuf = { version = "0.4", optional = true } serde = { version = "1.0", features = ["derive"] } serde_json = { workspace = true } thiserror = { workspace = true } -tokio = { version = "1.47", features = [], optional = true } +tokio = { version = "1.47", default-features = false, features = [ + "io-util", + "net", + "rt", +], optional = true } toml = { version = "1.0", optional = true } -tonic = { workspace = true } +tonic = { workspace = true, default-features = false, features = ["transport", "codegen"] } tonic-prost = { workspace = true } tracing = "0.1" # TODO [rust-sdk-branch]: Is it reasonable to make this optional? @@ -91,7 +96,7 @@ tracing-subscriber = { version = "0.3", default-features = false, features = [ ] } tracing-core = "0.1" url = "2.5" -uuid = { version = "1.18", features = ["v4"] } +uuid = { version = "1.18", default-features = false, features = ["v4"] } pbjson = { workspace = true } [build-dependencies] diff --git a/crates/sdk-core-c-bridge/Cargo.toml b/crates/sdk-core-c-bridge/Cargo.toml index 5105b0a19..b8778d70e 100644 --- a/crates/sdk-core-c-bridge/Cargo.toml +++ b/crates/sdk-core-c-bridge/Cargo.toml @@ -30,10 +30,10 @@ rand = "0.10" rand_pcg = "0.10" serde = { version = "1.0", features = ["derive"] } serde_json = { workspace = true } -tokio = "1.47" -tokio-stream = "0.1" +tokio = { version = "1.47", default-features = false, features = ["sync"] } +tokio-stream = { version = "0.1", default-features = false } tokio-util = "0.7" -tonic = { workspace = true } +tonic = { workspace = true, default-features = false } tracing = "0.1" url = "2.5" # This is only needed as an explicit dependency so we can enable static as a feature diff --git a/crates/sdk-core/Cargo.toml b/crates/sdk-core/Cargo.toml index 01f60454a..1d193cba8 100644 --- a/crates/sdk-core/Cargo.toml +++ b/crates/sdk-core/Cargo.toml @@ -39,7 +39,6 @@ async-trait = "0.1" bon = { workspace = true } crossbeam-channel = "0.5" crossbeam-utils = "0.8" -dashmap = "6.1" derive_more = { workspace = true } enum_dispatch = "0.3" enum-iterator = "2" @@ -47,23 +46,23 @@ flate2 = { version = "1.1", optional = true } futures = { version = "0.3", default-features = false } futures-util = { version = "0.3", default-features = false } gethostname = "1.0.2" -governor = "0.10" hyper = { version = "1.7", optional = true } itertools = "0.14" lru = "0.16" mockall = "0.14" opentelemetry = { workspace = true, features = ["metrics"], optional = true } -opentelemetry_sdk = { version = "0.31", features = [ +opentelemetry_sdk = { version = "0.31", default-features = false, features = [ "rt-tokio", "metrics", "spec_unstable_metrics_views", ], optional = true } -opentelemetry-otlp = { version = "0.31", features = [ +opentelemetry-otlp = { version = "0.31", default-features = false, features = [ "tokio", "metrics", - "tls", + "tls-roots", "http-proto", "grpc-tonic", + "reqwest-rustls", ], optional = true } parking_lot = { version = "0.12" } pid = "4.0" @@ -84,20 +83,21 @@ slotmap = "1.0" sysinfo = { version = "0.38", default-features = false, features = ["system"] } tar = { version = "0.4", optional = true } thiserror = { workspace = true } -tokio = { version = "1.47", features = [ +tokio = { version = "1.47", default-features = false, features = [ "rt", "rt-multi-thread", "parking_lot", "time", "fs", "process", + "macros", ] } tokio-util = { version = "0.7", features = ["io", "io-util"] } -tokio-stream = "0.1" -tonic = { workspace = true, features = ["tls-ring", "tls-native-roots"] } +tokio-stream = { version = "0.1", default-features = false } +tonic = { workspace = true, default-features = false, features = ["tls-ring", "tls-native-roots", "transport", "codegen"] } tracing = "0.1" url = "2.5" -uuid = { version = "1.18", features = ["v4"] } +uuid = { version = "1.18", default-features = false, features = ["v4"] } # Only need specific features to decompress zip files for ephemeral server download zip = { version = "8.4", optional = true, default-features = false, features = [ "deflate", @@ -138,7 +138,7 @@ hyper-util = { version = "0.1", features = [ rstest = "0.26" semver = "1.0" temporalio-sdk = { path = "../sdk" } -tokio = { version = "1.47", features = [ +tokio = { version = "1.47", default-features = false, features = [ "rt", "rt-multi-thread", "parking_lot", @@ -146,8 +146,10 @@ tokio = { version = "1.47", features = [ "fs", "process", "test-util", + "macros", ] } -tokio-stream = { version = "0.1", features = ["net"] } +tokio-stream = { version = "0.1", default-features = false, features = ["net"] } +tonic = { workspace = true, default-features = false, features = ["router"] } tracing-subscriber = { version = "0.3", default-features = false } trybuild = { version = "1.0", features = ["diff"] } @@ -202,6 +204,7 @@ name = "workflow_replay" path = "benches/workflow_replay_bench.rs" test = false harness = false +required-features = ["test-utilities"] # The integration test runner should compile with the same configuration as the # rest of the integration tests so that artifacts are shared and no additional diff --git a/crates/sdk-core/src/pollers/poll_buffer.rs b/crates/sdk-core/src/pollers/poll_buffer.rs index adb71056a..e29b1cea5 100644 --- a/crates/sdk-core/src/pollers/poll_buffer.rs +++ b/crates/sdk-core/src/pollers/poll_buffer.rs @@ -10,7 +10,6 @@ use crate::{ use backoff::{SystemClock, backoff::Backoff, exponential::ExponentialBackoff}; use crossbeam_utils::atomic::AtomicCell; use futures_util::{FutureExt, StreamExt, future::BoxFuture}; -use governor::{Quota, RateLimiter}; use std::{ cmp, fmt::Debug, @@ -19,7 +18,7 @@ use std::{ Arc, atomic::{AtomicBool, AtomicUsize, Ordering}, }, - time::{Duration, SystemTime}, + time::{Duration, Instant, SystemTime}, }; use temporalio_client::{ ERROR_RETURNED_DUE_TO_SHORT_CIRCUIT, request_extensions::NoRetryOnMatching, @@ -45,6 +44,32 @@ use tracing::Instrument; type PollReceiver = Mutex)>>>; + +struct PollRateLimiter { + interval: Duration, + next_allowed_at: Mutex, +} + +impl PollRateLimiter { + fn new(polls_per_second: f64) -> Self { + Self { + interval: Duration::from_secs_f64(polls_per_second.recip()), + next_allowed_at: Mutex::new(Instant::now()), + } + } + + async fn wait(&self) { + let scheduled_at = { + let mut next_allowed_at = self.next_allowed_at.lock().await; + let now = Instant::now(); + let scheduled_at = (*next_allowed_at).max(now); + *next_allowed_at = scheduled_at + self.interval; + scheduled_at + }; + tokio::time::sleep_until(scheduled_at.into()).await; + } +} + pub(crate) struct LongPollBuffer { buffered_polls: PollReceiver, shutdown: CancellationToken, @@ -163,14 +188,11 @@ impl LongPollBuffer { ) -> Self { let pre_permit_delay = options .max_worker_acts_per_second - .and_then(|ps| { - Quota::with_period(Duration::from_secs_f64(ps.recip())) - .map(|q| Arc::new(RateLimiter::direct(q))) - }) + .map(|ps| Arc::new(PollRateLimiter::new(ps))) .map(|rl| { move || { let rl = rl.clone(); - async move { rl.until_ready().await }.boxed() + async move { rl.wait().await }.boxed() } }); let no_retry = if matches!(poller_behavior, PollerBehavior::Autoscaling { .. }) { diff --git a/crates/sdk-core/src/worker/activities.rs b/crates/sdk-core/src/worker/activities.rs index 71a583ece..a60806a6f 100644 --- a/crates/sdk-core/src/worker/activities.rs +++ b/crates/sdk-core/src/worker/activities.rs @@ -22,12 +22,12 @@ use crate::{ }, }; use activity_heartbeat_manager::ActivityHeartbeatManager; -use dashmap::DashMap; use futures_util::{ Stream, StreamExt, stream, stream::{BoxStream, PollNext}, }; use std::{ + collections::HashMap, convert::TryInto, future, sync::{ @@ -59,7 +59,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_util::sync::CancellationToken; use tracing::Span; -type OutstandingActMap = Arc>; +type OutstandingActMap = Arc>>; #[derive(Debug)] struct PendingActivityCancel { @@ -191,7 +191,7 @@ impl WorkerActivityTasks { local_timeout_buffer: Duration, ) -> Self { let shutdown_initiated_token = CancellationToken::new(); - let outstanding_activity_tasks = Arc::new(DashMap::new()); + let outstanding_activity_tasks = Arc::new(parking_lot::Mutex::new(HashMap::new())); let server_poller_stream = new_activity_task_poller(poller, metrics.clone(), shutdown_initiated_token.clone()); let (eager_activities_tx, eager_activities_rx) = unbounded_channel(); @@ -320,7 +320,11 @@ impl WorkerActivityTasks { status: aer::Status, client: &dyn WorkerClient, ) { - if let Some((_, act_info)) = self.outstanding_activity_tasks.remove(&task_token) { + let act_info = { + let mut outstanding_activity_tasks = self.outstanding_activity_tasks.lock(); + outstanding_activity_tasks.remove(&task_token) + }; + if let Some(act_info) = act_info { let act_metrics = self.metrics.with_new_attrs([ activity_type(act_info.base.activity_type), workflow_type(act_info.base.workflow_type), @@ -433,12 +437,14 @@ impl WorkerActivityTasks { details: ActivityHeartbeat, ) -> Result<(), ActivityHeartbeatError> { // TODO: Propagate these back as cancels. Silent fails is too nonobvious - let at_info = self - .outstanding_activity_tasks - .get(&TaskToken(details.task_token.clone())) - .ok_or(ActivityHeartbeatError::UnknownActivity)?; - let heartbeat_timeout: Duration = at_info - .heartbeat_timeout + let (heartbeat_timeout, timeout_resetter) = { + let outstanding_activity_tasks = self.outstanding_activity_tasks.lock(); + let at_info = outstanding_activity_tasks + .get(&TaskToken(details.task_token.clone())) + .ok_or(ActivityHeartbeatError::UnknownActivity)?; + (at_info.heartbeat_timeout, at_info.timeout_resetter.clone()) + }; + let heartbeat_timeout: Duration = heartbeat_timeout // We treat None as 0 (even though heartbeat_timeout is never set to None by the server) .unwrap_or_default() .try_into() @@ -457,7 +463,7 @@ impl WorkerActivityTasks { let throttle_interval = std::cmp::min(throttle_interval, self.max_heartbeat_throttle_interval); self.heartbeat_manager - .record(details, throttle_interval, at_info.timeout_resetter.clone()) + .record(details, throttle_interval, timeout_resetter) } /// Returns a handle that the workflows management side can use to interact with this manager @@ -509,31 +515,32 @@ where // an outstanding activity task. This is fine because it means that we // no longer need to cancel this activity, so we'll just ignore such // orphaned cancellations. - if let Some(mut details) = - self.outstanding_tasks.get_mut(&next_pc.task_token) { - if details.issued_cancel_to_lang.is_some() { - // Don't double-issue cancellations - None - } else { - details.issued_cancel_to_lang = Some(next_pc.reason); - if next_pc.reason == ActivityCancelReason::NotFound - || next_pc.details.is_not_found - { - details.known_not_found = true; + let mut outstanding_tasks = self.outstanding_tasks.lock(); + if let Some(details) = outstanding_tasks.get_mut(&next_pc.task_token) { + if details.issued_cancel_to_lang.is_some() { + // Don't double-issue cancellations + None + } else { + details.issued_cancel_to_lang = Some(next_pc.reason); + if next_pc.reason == ActivityCancelReason::NotFound + || next_pc.details.is_not_found + { + details.known_not_found = true; + } + Some(Ok(ActivityTask::cancel_from_ids( + next_pc.task_token.0, + next_pc.reason, + next_pc.details, + ))) } - Some(Ok(ActivityTask::cancel_from_ids( - next_pc.task_token.0, - next_pc.reason, - next_pc.details, - ))) + } else { + debug!(task_token = %next_pc.task_token, + "Unknown activity task when issuing cancel"); + // If we can't find the activity here, it's already been completed, + // in which case issuing a cancel again is pointless. + None } - } else { - debug!(task_token = %next_pc.task_token, - "Unknown activity task when issuing cancel"); - // If we can't find the activity here, it's already been completed, - // in which case issuing a cancel again is pointless. - None } } ActivityTaskSource::PendingStart(res) => { @@ -560,14 +567,16 @@ where }; let tt: TaskToken = task.resp.task_token.clone().into(); - let outstanding_entry = self.outstanding_tasks.entry(tt.clone()); - let mut outstanding_info = - outstanding_entry.insert(RemoteInFlightActInfo::new( + self.outstanding_tasks.lock().insert( + tt.clone(), + RemoteInFlightActInfo::new( &task.resp, task.permit.into_used(ActivitySlotInfo { activity_type: activity_type_name.to_string(), }), - )); + ), + ); + // If we have already waited the grace period and issued cancels, // this will have been set true, indicating anything that happened // to be buffered/in-flight/etc should get an immediate cancel. This @@ -575,7 +584,7 @@ where // do work on polls that got received during shutdown. if should_issue_immediate_cancel.load(Ordering::Acquire) { let _ = cancels_tx.send(PendingActivityCancel::new( - tt, + tt.clone(), ActivityCancelReason::WorkerShutdown, ActivityTask::primary_reason_to_cancellation_details( ActivityCancelReason::WorkerShutdown, @@ -602,13 +611,14 @@ where if let Some((timeout_type, timeout_at)) = timeout_at { let sleep_time = timeout_at + local_timeout_buffer; let cancel_tx = cancels_tx.clone(); + let task_token = tt.clone(); let resetter = if timeout_type == HEARTBEAT_TYPE { Some(Arc::new(Notify::new())) } else { None }; let resetter_clone = resetter.clone(); - outstanding_info.local_timeouts_task = + let local_timeouts_task = Some(tokio::task::spawn(async move { if let Some(rs) = resetter_clone { loop { @@ -621,12 +631,12 @@ where tokio::time::sleep(sleep_time).await; } debug!( - task_token=%tt, + task_token=%task_token, "Timing out activity due to elapsed local \ {timeout_type} timer" ); let _ = cancel_tx.send(PendingActivityCancel::new( - tt, + task_token, ActivityCancelReason::TimedOut, ActivityCancellationDetails { is_not_found: true, @@ -635,7 +645,12 @@ where }, )); })); - outstanding_info.timeout_resetter = resetter; + if let Some(outstanding_info) = + self.outstanding_tasks.lock().get_mut(&tt) + { + outstanding_info.local_timeouts_task = local_timeouts_task; + outstanding_info.timeout_resetter = resetter; + } } } @@ -653,9 +668,14 @@ where self.shutdown_initiated_token.cancelled().await; tokio::time::sleep(gp).await; should_issue_immediate_cancel_clone.store(true, Ordering::Release); - for mapref in outstanding_tasks_clone.iter() { + for task_token in outstanding_tasks_clone + .lock() + .keys() + .cloned() + .collect::>() + { let _ = self.cancels_tx.send(PendingActivityCancel::new( - mapref.key().clone(), + task_token, ActivityCancelReason::WorkerShutdown, ActivityTask::primary_reason_to_cancellation_details( ActivityCancelReason::WorkerShutdown, @@ -667,7 +687,10 @@ where join!( async { self.start_tasks_stream_complete.cancelled().await; - while !outstanding_tasks_clone.is_empty() { + while { + let outstanding_tasks = outstanding_tasks_clone.lock(); + !outstanding_tasks.is_empty() + } { self.complete_notify.notified().await } // If we were waiting for the grace period but everything already finished, diff --git a/crates/sdk-core/src/worker/workflow/machines/transition_coverage.rs b/crates/sdk-core/src/worker/workflow/machines/transition_coverage.rs index 15942bf44..b461d8dd6 100644 --- a/crates/sdk-core/src/worker/workflow/machines/transition_coverage.rs +++ b/crates/sdk-core/src/worker/workflow/machines/transition_coverage.rs @@ -3,8 +3,8 @@ //! in stable Rust. Don't do the things in here. They're bad. This is test only code, and should //! never ever be removed from behind `#[cfg(test)]` compilation. -use dashmap::{DashMap, DashSet, mapref::entry::Entry}; use std::{ + collections::{HashMap, HashSet}, path::PathBuf, sync::{ LazyLock, Mutex, @@ -15,8 +15,8 @@ use std::{ }; // During test we want to know about which transitions we've covered in state machines -static COVERED_TRANSITIONS: LazyLock>> = - LazyLock::new(DashMap::new); +static COVERED_TRANSITIONS: LazyLock>>> = + LazyLock::new(|| Mutex::new(HashMap::new())); static COVERAGE_SENDER: LazyLock> = LazyLock::new(spawn_save_coverage_at_end); static THREAD_HANDLE: LazyLock>>> = LazyLock::new(|| Mutex::new(None)); @@ -49,18 +49,12 @@ fn spawn_save_coverage_at_end() -> SyncSender<(String, CoveredTransition)> { // last second that we are probably done running all the tests. This is to avoid // needing to instrument every single test. while let Ok((machine_name, ct)) = rx.recv_timeout(Duration::from_secs(1)) { - match COVERED_TRANSITIONS.entry(machine_name) { - Entry::Occupied(o) => { - o.get().insert(ct); - } - Entry::Vacant(v) => { - v.insert({ - let ds = DashSet::new(); - ds.insert(ct); - ds - }); - } - } + COVERED_TRANSITIONS + .lock() + .unwrap() + .entry(machine_name) + .or_default() + .insert(ct); } }); *THREAD_HANDLE.lock().unwrap() = Some(handle); @@ -124,8 +118,7 @@ mod machine_coverage_report { // This isn't at all efficient but doesn't need to be. // Replace transitions in the vizzes with green color if they are covered. - for item in COVERED_TRANSITIONS.iter() { - let (machine, coverage) = item.pair(); + for (machine, coverage) in COVERED_TRANSITIONS.lock().unwrap().iter() { match machine.as_ref() { m @ "ActivityMachine" => cover_transitions(m, &mut activity, coverage), m @ "TimerMachine" => cover_transitions(m, &mut timer, coverage), @@ -153,7 +146,7 @@ mod machine_coverage_report { } } - fn cover_transitions(machine: &str, viz: &mut String, cov: &DashSet) { + fn cover_transitions(machine: &str, viz: &mut String, cov: &HashSet) { for trans in cov.iter() { let find_line = format!( "{} --> {}: {}", diff --git a/crates/sdk/Cargo.toml b/crates/sdk/Cargo.toml index 9e5aca1ea..ca66bfd7f 100644 --- a/crates/sdk/Cargo.toml +++ b/crates/sdk/Cargo.toml @@ -25,17 +25,18 @@ parking_lot = { version = "0.12" } prost-types = { workspace = true } serde = "1.0" thiserror = "2" -tokio = { version = "1.47", features = [ +tokio = { version = "1.47", default-features = false, features = [ "rt", "rt-multi-thread", "parking_lot", "time", "fs", + "macros", ] } tokio-util = { version = "0.7" } -tokio-stream = "0.1" +tokio-stream = { version = "0.1", default-features = false } tracing = "0.1" -uuid = { version = "1.18", features = ["v4"] } +uuid = { version = "1.18", default-features = false, features = ["v4"] } [dependencies.temporalio-sdk-core] path = "../sdk-core"