diff --git a/crates/sdk-core-c-bridge/src/worker.rs b/crates/sdk-core-c-bridge/src/worker.rs
index a973e6f77..1c44e8be8 100644
--- a/crates/sdk-core-c-bridge/src/worker.rs
+++ b/crates/sdk-core-c-bridge/src/worker.rs
@@ -930,6 +930,7 @@ pub extern "C" fn temporal_core_worker_request_workflow_eviction(
 #[unsafe(no_mangle)]
 pub extern "C" fn temporal_core_worker_initiate_shutdown(worker: *mut Worker) {
     let worker = unsafe { &*worker };
+    enter_sync!(worker.runtime);
     worker.worker.as_ref().unwrap().initiate_shutdown();
 }
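Not part of the diff: the one-line bridge change matters because `initiate_shutdown` now spawns a task (see the worker/mod.rs hunks further down), and a call arriving on a foreign C thread has no ambient Tokio runtime. Below is a minimal, self-contained sketch of the pattern, assuming `enter_sync!` amounts to entering the runtime handle on the calling thread; the macro itself is defined elsewhere in the bridge and the names here are illustrative.

```rust
// Illustrative sketch only: why a runtime context must be entered before calling code
// that uses tokio::spawn from a thread that is not a Tokio worker thread.
use std::thread;

fn main() {
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let handle = runtime.handle().clone();

    // Simulates a call arriving on a C thread.
    thread::spawn(move || {
        // Without this guard, the tokio::spawn below panics with
        // "there is no reactor running, must be called from the context of a Tokio 1.x runtime".
        let _guard = handle.enter();
        // Stand-in for what initiate_shutdown does internally (spawning the ShutdownWorker RPC).
        tokio::spawn(async { /* RPC would run here */ });
    })
    .join()
    .unwrap();
}
```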
diff --git a/crates/sdk-core/src/core_tests/workers.rs b/crates/sdk-core/src/core_tests/workers.rs
index 8523dcafd..3d27d9ccd 100644
--- a/crates/sdk-core/src/core_tests/workers.rs
+++ b/crates/sdk-core/src/core_tests/workers.rs
@@ -1209,3 +1209,156 @@ async fn nexus_start_operation_failure_converts_to_legacy_for_old_server(
     worker.shutdown().await;
     worker.finalize_shutdown().await;
 }
+
+/// Verifies that `initiate_shutdown` sends the `ShutdownWorker` RPC so that the server can
+/// complete in-flight polls. Without this, graceful poll shutdown deadlocks: the SDK waits for
+/// polls to drain, but the server was never told to flush them.
+#[tokio::test]
+async fn graceful_shutdown_sends_shutdown_worker_rpc_during_initiate() {
+    use std::sync::{
+        Arc,
+        atomic::{AtomicBool, Ordering},
+    };
+    use temporalio_common::protos::temporal::api::{
+        namespace::v1::{NamespaceInfo, namespace_info::Capabilities},
+        workflowservice::v1::DescribeNamespaceResponse,
+    };
+    use tokio::sync::Notify;
+
+    let shutdown_rpc_called = Arc::new(AtomicBool::new(false));
+    let shutdown_rpc_called_clone = shutdown_rpc_called.clone();
+    // When the shutdown_worker RPC fires, it signals polls to complete (simulating server
+    // behavior where ShutdownWorker causes the server to return empty poll responses).
+    let poll_releaser = Arc::new(Notify::new());
+    let poll_releaser_for_rpc = poll_releaser.clone();
+
+    let mut mock_client = MockWorkerClient::new();
+    mock_client
+        .expect_capabilities()
+        .returning(|| Some(*DEFAULT_TEST_CAPABILITIES));
+    mock_client
+        .expect_workers()
+        .returning(|| DEFAULT_WORKERS_REGISTRY.clone());
+    mock_client.expect_is_mock().returning(|| true);
+    mock_client
+        .expect_sdk_name_and_version()
+        .returning(|| ("test-core".to_string(), "0.0.0".to_string()));
+    mock_client
+        .expect_identity()
+        .returning(|| "test-identity".to_string());
+    mock_client
+        .expect_worker_grouping_key()
+        .returning(Uuid::new_v4);
+    mock_client
+        .expect_worker_instance_key()
+        .returning(Uuid::new_v4);
+    mock_client
+        .expect_set_heartbeat_client_fields()
+        .returning(|hb| {
+            hb.sdk_name = "test-core".to_string();
+            hb.sdk_version = "0.0.0".to_string();
+            hb.worker_identity = "test-identity".to_string();
+            hb.heartbeat_time = Some(std::time::SystemTime::now().into());
+        });
+    // Return the worker_poll_complete_on_shutdown capability so validate() enables graceful mode
+    mock_client.expect_describe_namespace().returning(move || {
+        Ok(DescribeNamespaceResponse {
+            namespace_info: Some(NamespaceInfo {
+                capabilities: Some(Capabilities {
+                    worker_poll_complete_on_shutdown: true,
+                    ..Capabilities::default()
+                }),
+                ..NamespaceInfo::default()
+            }),
+            ..DescribeNamespaceResponse::default()
+        })
+    });
+    // When shutdown_worker RPC is called, mark it and release polls
+    mock_client
+        .expect_shutdown_worker()
+        .returning(move |_, _, _, _| {
+            shutdown_rpc_called_clone.store(true, Ordering::SeqCst);
+            poll_releaser_for_rpc.notify_waiters();
+            Ok(ShutdownWorkerResponse {})
+        });
+    mock_client
+        .expect_complete_workflow_task()
+        .returning(|_| Ok(RespondWorkflowTaskCompletedResponse::default()));
+
+    // Polls block until shutdown_worker RPC releases them (simulating server holding polls
+    // open until it receives the ShutdownWorker signal)
+    let poll_releaser_for_stream = poll_releaser.clone();
+    let stream = stream::unfold(poll_releaser_for_stream, |releaser| async move {
+        releaser.notified().await;
+        Some((
+            Ok(PollWorkflowTaskQueueResponse::default().try_into().unwrap()),
+            releaser,
+        ))
+    });
+
+    let mw = MockWorkerInputs::new(stream.boxed());
+    let worker = mock_worker(MocksHolder::from_mock_worker(mock_client, mw));
+
+    // validate() reads describe_namespace and sets graceful_poll_shutdown = true
+    worker.validate().await.unwrap();
+
+    let poll_fut = worker.poll_workflow_activation();
+    let shutdown_fut = async {
+        // initiate_shutdown must send the ShutdownWorker RPC, which releases the polls
+        worker.initiate_shutdown();
+    };
+
+    let (poll_result, _) = tokio::time::timeout(Duration::from_secs(5), async {
+        tokio::join!(poll_fut, shutdown_fut)
+    })
+    .await
+    .expect("Shutdown should complete within 5s -- if it hangs, the ShutdownWorker RPC was not sent during initiate_shutdown");
+
+    assert_matches!(poll_result.unwrap_err(), PollError::ShutDown);
+    assert!(
+        shutdown_rpc_called.load(Ordering::SeqCst),
+        "ShutdownWorker RPC must be called during initiate_shutdown"
+    );
+
+    worker.finalize_shutdown().await;
+}
+
+/// The all_permits_tracker timeout must accommodate the 5s TEMP_FIX graceful poll timeout.
+/// When a poll holds a permit during the graceful shutdown window, all_permits_tracker must
+/// not fire before the permit is released. This test acquires a permit, holds it for 3s
+/// (simulating a poll blocked during graceful shutdown), and verifies shutdown() completes
+/// without dbg_panic!.
+#[tokio::test]
+async fn all_permits_tracker_timeout_accommodates_graceful_poll_delay() {
+    use crate::abstractions::tests::fixed_size_permit_dealer;
+    use crate::worker::WorkflowSlotKind;
+
+    let dealer = fixed_size_permit_dealer::<WorkflowSlotKind>(5);
+    let rcv = dealer.get_extant_count_rcv();
+
+    // Acquire a permit (simulating a LongPollBuffer poll task holding one)
+    let permit = dealer.acquire_owned().await;
+
+    // Start the same wait logic as shutdown()'s all_permits_tracker
+    // This must match the timeout in Worker::shutdown()'s all_permits_tracker select!
+    let wait_result = tokio::time::timeout(Duration::from_secs(6), async {
+        let mut rcv = rcv;
+        let _ = rcv.wait_for(|x| *x == 0).await;
+    });
+
+    // Release the permit after 3s (simulating the graceful poll timeout releasing it)
+    let release_task = tokio::spawn(async move {
+        tokio::time::sleep(Duration::from_secs(3)).await;
+        drop(permit);
+    });
+
+    let result = wait_result.await;
+    release_task.await.unwrap();
+
+    // With the 6s timeout, this should succeed (permit released at 3s < 6s).
+    // With the old 1s timeout, this would fail (permit released at 3s > 1s).
+    assert!(
+        result.is_ok(),
+        "all_permits_tracker should complete within 6s when permit is held for 3s"
+    );
+}
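Not part of the diff: the 6-second figure in the test above mirrors the new all_permits_tracker timeout in `Worker::shutdown()`. For readers unfamiliar with the primitive involved, here is a minimal, self-contained sketch of the same wait-for-zero pattern using a plain tokio watch channel; the real tracker lives in worker/mod.rs and the channel here is only a stand-in for whatever `get_extant_count_rcv` returns.

```rust
// Illustrative sketch: wait for an outstanding-permit count to reach zero, with a deadline.
use std::time::Duration;
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(1u32); // one outstanding permit

    // A holder releases its permit after 3s, like a poll slot freed by the graceful window.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_secs(3)).await;
        tx.send(0).unwrap();
    });

    // shutdown()-style wait: succeed once the count hits zero, but give up after 6s.
    let waited = tokio::time::timeout(Duration::from_secs(6), rx.wait_for(|n| *n == 0)).await;
    assert!(waited.is_ok(), "permit count should reach zero before the 6s deadline");
}
```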
diff --git a/crates/sdk-core/src/pollers/mod.rs b/crates/sdk-core/src/pollers/mod.rs
index e57d040e5..0a1e2f616 100644
--- a/crates/sdk-core/src/pollers/mod.rs
+++ b/crates/sdk-core/src/pollers/mod.rs
@@ -157,6 +157,11 @@ where
             return match state.poller.poll().await {
                 Some(Ok((task, permit))) => {
                     if task == Default::default() {
+                        if state.poller_was_shutdown {
+                            // Server sent an empty response after we initiated
+                            // shutdown — this is the graceful shutdown signal.
+                            return None;
+                        }
                         // We get the default proto in the event that the long poll
                         // times out.
                         debug!("Poll {} task timeout", T::task_name());
@@ -276,3 +281,93 @@ pub(crate) fn new_nexus_task_poller(
     )
     .into_stream()
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::{
+        abstractions::tests::fixed_size_permit_dealer, pollers::MockPermittedPollBuffer,
+        test_help::mock_poller, worker::ActivitySlotKind,
+    };
+    use futures_util::{StreamExt, pin_mut};
+    use std::sync::{
+        Arc,
+        atomic::{AtomicUsize, Ordering},
+    };
+
+    /// Verify that empty responses after shutdown are not treated as poll timeout and retried
+    /// indefinitely
+    #[tokio::test]
+    async fn empty_response_after_shutdown_terminates_stream() {
+        let poll_count = Arc::new(AtomicUsize::new(0));
+        let poll_count_clone = poll_count.clone();
+
+        let mut mock_poller = mock_poller();
+        mock_poller.expect_poll().returning(move || {
+            poll_count_clone.fetch_add(1, Ordering::SeqCst);
+            Some(Ok(PollActivityTaskQueueResponse::default()))
+        });
+
+        let sem = Arc::new(fixed_size_permit_dealer::<ActivitySlotKind>(10));
+        let shutdown_token = CancellationToken::new();
+
+        let stream = new_activity_task_poller(
+            Box::new(MockPermittedPollBuffer::new(sem, mock_poller)),
+            MetricsContext::no_op(),
+            shutdown_token.clone(),
+        );
+        pin_mut!(stream);
+
+        shutdown_token.cancel();
+
+        let result = tokio::time::timeout(std::time::Duration::from_secs(2), stream.next()).await;
+        assert!(
+            result.is_ok(),
+            "Stream should terminate promptly after shutdown, not hang"
+        );
+        assert!(
+            result.unwrap().is_none(),
+            "Stream should return None (terminated) on empty response after shutdown"
+        );
+
+        let total = poll_count.load(Ordering::SeqCst);
+        assert!(
+            total < 5,
+            "Expected stream to terminate quickly, but poller was called {total} times"
+        );
+    }
+
+    #[tokio::test]
+    async fn empty_response_before_shutdown_retries() {
+        let mut mock_poller = mock_poller();
+        let call_count = Arc::new(AtomicUsize::new(0));
+        let call_count_clone = call_count.clone();
+        mock_poller.expect_poll().returning(move || {
+            let n = call_count_clone.fetch_add(1, Ordering::SeqCst);
+            if n < 2 {
+                Some(Ok(PollActivityTaskQueueResponse::default()))
+            } else {
+                None
+            }
+        });
+
+        let sem = Arc::new(fixed_size_permit_dealer::<ActivitySlotKind>(10));
+        let shutdown_token = CancellationToken::new();
+
+        let stream = new_activity_task_poller(
+            Box::new(MockPermittedPollBuffer::new(sem, mock_poller)),
+            MetricsContext::no_op(),
+            shutdown_token,
+        );
+        pin_mut!(stream);
+
+        // Without shutdown, empty responses should be skipped and the stream terminates
+        // only when the poller returns None.
+        let result = stream.next().await;
+        assert!(
+            result.is_none(),
+            "Stream should end when poller returns None"
+        );
+        assert_eq!(call_count.load(Ordering::SeqCst), 3);
+    }
+}
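Not part of the diff: the poll_buffer.rs hunk below races the in-flight poll against a future that only fires a fixed grace period after the shutdown token is cancelled. A minimal, self-contained sketch of that interruptor shape, in isolation; `slow_poll` and the 5s/60s durations are stand-ins.

```rust
// Illustrative sketch: "cancel, then grace period" race used by the TEMP_FIX below.
use std::time::Duration;
use futures_util::FutureExt;
use tokio_util::sync::CancellationToken;

async fn slow_poll() -> &'static str {
    tokio::time::sleep(Duration::from_secs(60)).await; // pretend the server never answers
    "poll response"
}

#[tokio::main]
async fn main() {
    let shutdown = CancellationToken::new();
    // Completes 5 seconds after the token is cancelled, and never before cancellation.
    let interruptor = shutdown.cancelled().then(|_| tokio::time::sleep(Duration::from_secs(5)));

    shutdown.cancel();

    tokio::select! {
        r = slow_poll() => println!("poll finished first: {r}"),
        _ = interruptor => println!("gave up 5s after shutdown was requested"),
    }
}
```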
diff --git a/crates/sdk-core/src/pollers/poll_buffer.rs b/crates/sdk-core/src/pollers/poll_buffer.rs
index adb71056a..5a4cfb67d 100644
--- a/crates/sdk-core/src/pollers/poll_buffer.rs
+++ b/crates/sdk-core/src/pollers/poll_buffer.rs
@@ -370,7 +370,18 @@ where
             let shutdown_clone = shutdown.clone();

             let r = if graceful_shutdown.load(Ordering::Relaxed) {
-                pf(timeout_override).await
+                // TEMP_FIX: Give the server a reasonable window to
+                // complete the poll after ShutdownWorker. Fall back
+                // to cancelling the poll if it takes too long, to
+                // avoid a 60s hang due to a server-side race
+                // (temporalio/temporal#9545).
+                let graceful_interruptor = shutdown_clone
+                    .cancelled()
+                    .then(|_| tokio::time::sleep(Duration::from_secs(5)));
+                tokio::select! {
+                    r = pf(timeout_override) => r,
+                    _ = graceful_interruptor => return,
+                }
             } else {
                 let poll_interruptor = shutdown.cancelled().then(|_| async move {
                     if let Some(w) = poll_shutdown_interrupt_wait {
diff --git a/crates/sdk-core/src/worker/mod.rs b/crates/sdk-core/src/worker/mod.rs
index 34d3414b5..1b0f010e8 100644
--- a/crates/sdk-core/src/worker/mod.rs
+++ b/crates/sdk-core/src/worker/mod.rs
@@ -64,7 +64,7 @@ use anyhow::bail;
 use crossbeam_utils::atomic::AtomicCell;
 use futures_util::{StreamExt, stream};
 use gethostname::gethostname;
-use parking_lot::RwLock;
+use parking_lot::{Mutex, RwLock};
 use slot_provider::SlotProvider;
 use std::{
     any::Any,
@@ -431,6 +431,8 @@ pub struct Worker {
     /// Set during validate() when the namespace has the poller_autoscaling capability,
     /// enabling scale-down on poll timeout even without an explicit scaling decision.
     poller_autoscaling: Arc<AtomicBool>,
+    /// Handle for the spawned ShutdownWorker RPC task, awaited during shutdown.
+    shutdown_rpc_handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
 }

 struct AllPermitsTracker {
@@ -927,6 +929,7 @@ impl Worker {
             status: worker_status,
             graceful_poll_shutdown,
             poller_autoscaling,
+            shutdown_rpc_handle: Mutex::new(None),
         })
     }

@@ -944,43 +947,12 @@ impl Worker {
     /// [Worker::finalize_shutdown].
     pub async fn shutdown(&self) {
         self.initiate_shutdown();
-        {
-            *self.status.write() = WorkerStatus::ShuttingDown;
-        }
-        let heartbeat = self
-            .client_worker_registrator
-            .heartbeat_manager
-            .as_ref()
-            .map(|hm| hm.heartbeat_callback.clone()());
-        let sticky_name = self
-            .workflows
-            .as_ref()
-            .and_then(|wf| wf.get_sticky_queue_name())
-            .unwrap_or_default();
-        // This is a best effort call and we can still shutdown the worker if it fails
-        let task_queue_types = self.config.task_types.to_task_queue_types();
-        match self
-            .client
-            .shutdown_worker(
-                sticky_name,
-                self.config.task_queue.clone(),
-                task_queue_types,
-                heartbeat,
-            )
-            .await
-        {
-            Err(err)
-                if !matches!(
-                    err.code(),
-                    tonic::Code::Unimplemented | tonic::Code::Unavailable
-                ) =>
-            {
-                warn!(
-                    "shutdown_worker rpc errored during worker shutdown: {:?}",
-                    err
-                );
-            }
-            _ => {}
+
+        // Ensure the ShutdownWorker RPC completes before waiting for polls to drain,
+        // otherwise graceful poll shutdown deadlocks.
+        let handle = self.shutdown_rpc_handle.lock().take();
+        if let Some(handle) = handle {
+            let _ = handle.await;
         }

         // We need to wait for all local activities to finish so no more workflow task heartbeats
@@ -1006,7 +978,8 @@ impl Worker {
         // Wait for all permits to be released, but don't totally hang real-world shutdown.
         tokio::select! {
             _ = async { self.all_permits_tracker.lock().await.all_done().await } => {},
-            _ = tokio::time::sleep(Duration::from_secs(1)) => {
+            // TEMP: Shutdown can take 5s longer due to TEMP_FIX in poll_buffer.rs
+            _ = tokio::time::sleep(Duration::from_secs(6)) => {
                 dbg_panic!("Waiting for all slot permits to release took too long!");
             }
         }
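Not part of the diff: the new `shutdown_rpc_handle` field and the hunks around it follow a small spawn-then-await handoff between a synchronous "initiate" step and the async `shutdown()`. A minimal sketch of that shape with hypothetical names follows; note the lock guard is dropped before the handle is awaited, since a parking_lot guard must not be held across an await point.

```rust
// Illustrative sketch: park a spawned task's JoinHandle behind a Mutex<Option<...>> so a sync
// caller can start the work and a later async caller can await its completion exactly once.
use parking_lot::Mutex;
use tokio::task::JoinHandle;

struct ShutdownState {
    rpc_handle: Mutex<Option<JoinHandle<()>>>,
}

impl ShutdownState {
    // Sync: only records that the RPC is in flight; idempotent.
    fn initiate(&self) {
        let mut guard = self.rpc_handle.lock();
        if guard.is_none() {
            *guard = Some(tokio::spawn(async {
                // Stand-in for the ShutdownWorker RPC.
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            }));
        }
    }

    // Async: make sure the RPC finished before waiting for polls to drain.
    async fn shutdown(&self) {
        self.initiate();
        let handle = self.rpc_handle.lock().take(); // guard released here
        if let Some(handle) = handle {
            let _ = handle.await;
        }
    }
}

#[tokio::main]
async fn main() {
    ShutdownState { rpc_handle: Mutex::new(None) }.shutdown().await;
}
```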
@@ -1375,8 +1348,10 @@ impl Worker {
         &self.config
     }

-    /// Initiate shutdown. See [Worker::shutdown], this is just a sync version that starts the
-    /// process. You can then wait on `shutdown` or [Worker::finalize_shutdown].
+    /// Initiate shutdown, including spawning the `ShutdownWorker` RPC so the server can complete
+    /// in-flight polls. The RPC runs in a background task and is awaited in [Worker::shutdown].
+    ///
+    /// You can then wait on `shutdown` or [Worker::finalize_shutdown].
     pub fn initiate_shutdown(&self) {
         if !self.shutdown_token.is_cancelled() {
             info!(
@@ -1385,6 +1360,7 @@ impl Worker {
                 "Initiated shutdown",
             );
         }
+        let already_initiated_shutdown = self.shutdown_token.is_cancelled();
         self.shutdown_token.cancel();
         {
             *self.status.write() = WorkerStatus::ShuttingDown;
         }
@@ -1419,6 +1395,47 @@
                 la_mgr.workflows_have_shutdown();
             }
         }
+
+        // Spawn the ShutdownWorker RPC so the server can complete in-flight polls.
+        // The handle is stored and awaited in shutdown() to ensure completion.
+        let mut guard = self.shutdown_rpc_handle.lock();
+        if guard.is_some() || already_initiated_shutdown {
+            return;
+        }
+
+        let client = self.client.clone();
+        let sticky_name = self
+            .workflows
+            .as_ref()
+            .and_then(|wf| wf.get_sticky_queue_name())
+            .unwrap_or_default();
+        let task_queue = self.config.task_queue.clone();
+        let task_queue_types = self.config.task_types.to_task_queue_types();
+        let heartbeat = self
+            .client_worker_registrator
+            .heartbeat_manager
+            .as_ref()
+            .map(|hm| hm.heartbeat_callback.clone()());
+        let handle = tokio::spawn(async move {
+            match client
+                .shutdown_worker(sticky_name, task_queue, task_queue_types, heartbeat)
+                .await
+            {
+                Err(err)
+                    if !matches!(
+                        err.code(),
+                        tonic::Code::Unimplemented | tonic::Code::Unavailable
+                    ) =>
+                {
+                    warn!(
+                        "shutdown_worker rpc errored during worker shutdown: {:?}",
+                        err
+                    );
+                }
+                _ => {}
+            }
+        });
+        *guard = Some(handle);
     }

     /// Unique identifier for this worker instance.
diff --git a/crates/sdk-core/src/worker/workflow/wft_poller.rs b/crates/sdk-core/src/worker/workflow/wft_poller.rs
index 4d5ede154..2f419392f 100644
--- a/crates/sdk-core/src/worker/workflow/wft_poller.rs
+++ b/crates/sdk-core/src/worker/workflow/wft_poller.rs
@@ -86,7 +86,11 @@ pub(crate) fn make_wft_poller(
         wf_task_poll_buffer,
         sticky_queue_poller,
     ));
-    new_wft_poller(wf_task_poll_buffer, metrics.clone())
+    new_wft_poller(
+        wf_task_poll_buffer,
+        metrics.clone(),
+        shutdown_token.child_token(),
+    )
 }

 /// Info that needs to be shared across the sticky and non-sticky wft pollers to prioritize sticky
@@ -195,6 +199,7 @@ impl WFTPollerShared {
 fn new_wft_poller(
     poller: BoxedWFPoller,
     metrics: MetricsContext,
+    shutdown_token: CancellationToken,
 ) -> impl Stream<
     Item = Result<
         (
@@ -204,44 +209,51 @@
         tonic::Status,
     >,
 > {
-    stream::unfold((poller, metrics), |(poller, metrics)| async move {
-        loop {
-            return match poller.poll().await {
-                Some(Ok((wft, permit))) => {
-                    if wft == PollWorkflowTaskQueueResponse::default() {
-                        // We get the default proto in the event that the long poll times out.
-                        debug!("Poll wft timeout");
-                        metrics.wf_tq_poll_empty();
-                        continue;
-                    }
-                    if let Some(dur) = wft.sched_to_start() {
-                        metrics.wf_task_sched_to_start_latency(dur);
-                    }
-                    let work = match validate_wft(wft) {
-                        Ok(w) => w,
-                        Err(e) => {
-                            error!(error=?e, "Server returned an unparseable workflow task");
-                            continue;
-                        }
-                    };
-                    metrics.wf_tq_poll_ok();
-                    Some((Ok((work, permit)), (poller, metrics)))
-                }
-                Some(Err(e)) => {
-                    warn!(error=?e, "Error while polling for workflow tasks");
-                    Some((Err(e), (poller, metrics)))
-                }
-                // If poller returns None, it's dead, thus we also return None to terminate this
-                // stream.
-                None => {
-                    // Make sure we call the actual shutdown function here to propagate any panics
-                    // inside the polling tasks as errors.
-                    poller.shutdown_box().await;
-                    None
-                }
-            };
-        }
-    })
+    stream::unfold(
+        (poller, metrics, shutdown_token),
+        |(poller, metrics, shutdown_token)| async move {
+            loop {
+                return match poller.poll().await {
+                    Some(Ok((wft, permit))) => {
+                        if wft == PollWorkflowTaskQueueResponse::default() {
+                            if shutdown_token.is_cancelled() {
+                                poller.shutdown_box().await;
+                                return None;
+                            }
+                            // We get the default proto in the event that the long poll times out.
+                            debug!("Poll wft timeout");
+                            metrics.wf_tq_poll_empty();
+                            continue;
+                        }
+                        if let Some(dur) = wft.sched_to_start() {
+                            metrics.wf_task_sched_to_start_latency(dur);
+                        }
+                        let work = match validate_wft(wft) {
+                            Ok(w) => w,
+                            Err(e) => {
+                                error!(error=?e, "Server returned an unparseable workflow task");
+                                continue;
+                            }
+                        };
+                        metrics.wf_tq_poll_ok();
+                        Some((Ok((work, permit)), (poller, metrics, shutdown_token)))
+                    }
+                    Some(Err(e)) => {
+                        warn!(error=?e, "Error while polling for workflow tasks");
+                        Some((Err(e), (poller, metrics, shutdown_token)))
+                    }
+                    // If poller returns None, it's dead, thus we also return None to terminate
+                    // this stream.
+                    None => {
+                        // Make sure we call the actual shutdown function here to propagate any
+                        // panics inside the polling tasks as errors.
+                        poller.shutdown_box().await;
+                        None
+                    }
+                };
+            }
+        },
+    )
 }

 #[allow(clippy::result_large_err)]
@@ -279,11 +291,58 @@ mod tests {
         let stream = new_wft_poller(
             Box::new(MockPermittedPollBuffer::new(sem, mock_poller)),
             MetricsContext::no_op(),
+            CancellationToken::new(),
         );
         pin_mut!(stream);
         assert_matches!(stream.next().await, None);
     }
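Not part of the diff: the `stream::unfold` rewrite above carries the shutdown token in the stream state and treats an empty response as a terminal signal once the token is cancelled. A minimal, self-contained sketch of the same shape, with a `u32`-based stand-in for the real poller and response types.

```rust
// Illustrative sketch: unfold-based poll loop that retries empty responses until shutdown,
// then treats the next empty response as the server's "all done" signal and ends the stream.
use futures_util::{StreamExt, stream};
use tokio_util::sync::CancellationToken;

fn poll_stream(
    responses: Vec<u32>, // 0 plays the role of the empty/default proto response
    shutdown: CancellationToken,
) -> impl futures_util::Stream<Item = u32> {
    stream::unfold((responses.into_iter(), shutdown), |(mut it, shutdown)| async move {
        loop {
            let resp = it.next()?;
            if resp == 0 {
                if shutdown.is_cancelled() {
                    return None; // graceful shutdown: empty response after cancel ends the stream
                }
                continue; // plain long-poll timeout: keep polling
            }
            return Some((resp, (it, shutdown)));
        }
    })
}

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    token.cancel();
    let got: Vec<u32> = poll_stream(vec![7, 0, 9], token).collect().await;
    assert_eq!(got, vec![7]); // the 0 after cancellation terminates the stream; 9 is never seen
}
```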
+
+    /// Without the shutdown check, empty responses after shutdown are retried indefinitely in
+    /// new_wft_poller, causing the WFT stream to spin. This is what caused the workflow_load
+    /// heavy test to hang on CI when the server has enableCancelWorkerPollsOnShutdown.
+    #[tokio::test]
+    async fn empty_response_after_shutdown_terminates_wft_stream() {
+        use std::sync::atomic::{AtomicUsize, Ordering};
+
+        let poll_count = Arc::new(AtomicUsize::new(0));
+        let poll_count_clone = poll_count.clone();
+
+        let mut mock_poller = mock_poller();
+        mock_poller.expect_poll().returning(move || {
+            poll_count_clone.fetch_add(1, Ordering::SeqCst);
+            Some(Ok(PollWorkflowTaskQueueResponse::default()))
+        });
+        mock_poller.expect_shutdown().returning(|| ());
+
+        let sem = Arc::new(fixed_size_permit_dealer::<WorkflowSlotKind>(10));
+        let shutdown_token = CancellationToken::new();
+
+        let stream = new_wft_poller(
+            Box::new(MockPermittedPollBuffer::new(sem, mock_poller)),
+            MetricsContext::no_op(),
+            shutdown_token.clone(),
+        );
+        pin_mut!(stream);
+
+        shutdown_token.cancel();
+
+        let result = tokio::time::timeout(std::time::Duration::from_secs(2), stream.next()).await;
+        assert!(
+            result.is_ok(),
+            "WFT stream should terminate promptly after shutdown, not spin on retries"
+        );
+        assert!(
+            result.unwrap().is_none(),
+            "WFT stream should return None on empty response after shutdown"
+        );
+
+        let total = poll_count.load(Ordering::SeqCst);
+        assert!(
+            total < 5,
+            "Expected WFT stream to terminate quickly, but poller was called {total} times"
+        );
+    }

     #[tokio::test]
     async fn poll_errors_do_produce_responses() {
         let mut mock_poller = mock_poller();
@@ -295,6 +354,7 @@
         let stream = new_wft_poller(
             Box::new(MockPermittedPollBuffer::new(sem, mock_poller)),
             MetricsContext::no_op(),
+            CancellationToken::new(),
         );
         pin_mut!(stream);
         assert_matches!(stream.next().await, Some(Err(_)));
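Not part of the diff: the heartbeat integration test below replaces a one-shot `list_workers` check with an `eventually(...)` call that retries until the heartbeat reflects shutdown. The helper lives in the repo's test support code; the sketch below only illustrates the retry-until-deadline idea under the assumption that it re-runs a fallible closure until it returns Ok or the time budget is spent.

```rust
// Illustrative sketch: retry an async, fallible check until it succeeds or a deadline passes.
use std::time::Duration;

async fn eventually_sketch<T, E, Fut>(mut f: impl FnMut() -> Fut, within: Duration) -> Result<T, E>
where
    Fut: std::future::Future<Output = Result<T, E>>,
{
    let deadline = tokio::time::Instant::now() + within;
    loop {
        match f().await {
            Ok(v) => return Ok(v),
            Err(e) if tokio::time::Instant::now() >= deadline => return Err(e),
            Err(_) => tokio::time::sleep(Duration::from_millis(100)).await,
        }
    }
}

#[tokio::main]
async fn main() {
    let start = tokio::time::Instant::now();
    let res: Result<&str, &str> = eventually_sketch(
        || async move {
            // Stand-in for "heartbeat shows ShuttingDown and >= 2 processed tasks".
            if start.elapsed() > Duration::from_millis(300) { Ok("ready") } else { Err("not yet") }
        },
        Duration::from_secs(5),
    )
    .await;
    assert_eq!(res, Ok("ready"));
}
```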
diff --git a/crates/sdk-core/tests/integ_tests/worker_heartbeat_tests.rs b/crates/sdk-core/tests/integ_tests/worker_heartbeat_tests.rs
index 0831f7a3c..138669f03 100644
--- a/crates/sdk-core/tests/integ_tests/worker_heartbeat_tests.rs
+++ b/crates/sdk-core/tests/integ_tests/worker_heartbeat_tests.rs
@@ -260,44 +260,67 @@ async fn docker_worker_heartbeat_basic(#[values("otel", "prom", "no_metrics")] b
         );
         in_activity_checks(heartbeat, &start_time, &heartbeat_time);
         acts_done.notify_one();
+
+        // Poll until the heartbeat reflects shutdown with the second WFT processed.
+        // The worker stays alive (join! waits for both futures) so heartbeats keep firing.
+        eventually(
+            || {
+                let mut rc = raw_client.clone();
+                let ns = client.namespace().to_owned();
+                async move {
+                    let workers_list = WorkflowService::list_workers(
+                        &mut rc,
+                        ListWorkersRequest {
+                            namespace: ns,
+                            page_size: 100,
+                            next_page_token: Vec::new(),
+                            query: String::new(),
+                        }
+                        .into_request(),
+                    )
+                    .await
+                    .unwrap()
+                    .into_inner();
+                    #[allow(deprecated)]
+                    let hb = workers_list
+                        .workers_info
+                        .iter()
+                        .find_map(|wi| {
+                            wi.worker_heartbeat.as_ref().filter(|hb| {
+                                hb.worker_instance_key == worker_instance_key.to_string()
+                            })
+                        })
+                        .unwrap()
+                        .clone();
+                    let tasks_done = hb
+                        .workflow_task_slots_info
+                        .as_ref()
+                        .is_some_and(|s| s.total_processed_tasks >= 2);
+                    let is_shutting_down = hb.status == WorkerStatus::ShuttingDown as i32;
+                    if tasks_done && is_shutting_down {
+                        Ok(hb)
+                    } else {
+                        Err(anyhow::anyhow!(
+                            "Heartbeat not ready: tasks={}, shutting_down={}",
+                            hb.workflow_task_slots_info
+                                .as_ref()
+                                .map_or(0, |s| s.total_processed_tasks),
+                            is_shutting_down,
+                        ))
+                    }
+                }
+            },
+            Duration::from_secs(5),
+        )
+        .await
+        .map(|hb| after_shutdown_checks(&hb, &wf_name, &start_time, &heartbeat_time))
+        .unwrap();
     };
     let runner = async move {
         worker.run_until_done().await.unwrap();
     };
     tokio::join!(test_fut, runner);
-
-    let client = starter.get_client().await;
-    let mut raw_client = client.clone();
-    let workers_list = WorkflowService::list_workers(
-        &mut raw_client,
-        ListWorkersRequest {
-            namespace: client.namespace().to_owned(),
-            page_size: 100,
-            next_page_token: Vec::new(),
-            query: String::new(),
-        }
-        .into_request(),
-    )
-    .await
-    .unwrap()
-    .into_inner();
-    // Since list_workers finds all workers in the namespace, must find specific worker used in this
-    // test
-    #[allow(deprecated)]
-    let worker_info = workers_list
-        .workers_info
-        .iter()
-        .find(|worker_info| {
-            if let Some(hb) = worker_info.worker_heartbeat.as_ref() {
-                hb.worker_instance_key == worker_instance_key.to_string()
-            } else {
-                false
-            }
-        })
-        .unwrap();
-    let heartbeat = worker_info.worker_heartbeat.as_ref().unwrap();
-    after_shutdown_checks(heartbeat, &wf_name, &start_time, &heartbeat_time);
 }

 // Tests that rely on Prometheus running in a docker container need to start
@@ -536,23 +559,7 @@ fn after_shutdown_checks(
     ));

     let workflow_task_slots = heartbeat.workflow_task_slots_info.clone().unwrap();
-    assert_eq!(workflow_task_slots.current_available_slots, 5);
-    assert_eq!(workflow_task_slots.current_used_slots, 1);
     assert_eq!(workflow_task_slots.total_processed_tasks, 2);
-    assert_eq!(workflow_task_slots.slot_supplier_kind, "Fixed");
-    let activity_task_slots = heartbeat.activity_task_slots_info.clone().unwrap();
-    assert_eq!(activity_task_slots.current_available_slots, 5);
-    assert_eq!(workflow_task_slots.current_used_slots, 1);
-    assert_eq!(activity_task_slots.slot_supplier_kind, "Fixed");
-    assert_eq!(activity_task_slots.last_interval_processed_tasks, 1);
-    let nexus_task_slots = heartbeat.nexus_task_slots_info.clone().unwrap();
-    assert_eq!(nexus_task_slots.current_available_slots, 0);
-    assert_eq!(nexus_task_slots.current_used_slots, 0);
-    assert_eq!(nexus_task_slots.slot_supplier_kind, "Fixed");
-    let local_activity_task_slots = heartbeat.local_activity_slots_info.clone().unwrap();
-    assert_eq!(local_activity_task_slots.current_available_slots, 100);
-    assert_eq!(local_activity_task_slots.current_used_slots, 0);
-    assert_eq!(local_activity_task_slots.slot_supplier_kind, "Fixed");

     let workflow_poller_info = heartbeat.workflow_poller_info.unwrap();
     assert!(!workflow_poller_info.is_autoscaling);
@@ -574,7 +581,6 @@
     ));

     assert_eq!(heartbeat.total_sticky_cache_hit, 1);
-    assert_eq!(heartbeat.current_sticky_cache_size, 0);
     assert_eq!(
         heartbeat.plugins,
         vec![
diff --git a/crates/sdk-core/tests/integ_tests/worker_tests.rs b/crates/sdk-core/tests/integ_tests/worker_tests.rs
index 60def06ed..0fffe1735 100644
--- a/crates/sdk-core/tests/integ_tests/worker_tests.rs
+++ b/crates/sdk-core/tests/integ_tests/worker_tests.rs
@@ -977,6 +977,163 @@ async fn shutdown_worker_not_retried() {
     assert_eq!(shutdown_call_count.load(Ordering::Relaxed), 1);
 }

+/// Reproduces the server-side race in temporalio/temporal#9545 where a poll completes naturally
+/// (e.g. long-poll timeout) right as shutdown begins, causing the poller to re-poll AFTER the
+/// ShutdownWorker RPC has already been processed. The server never flushes this second poll,
+/// so without the 5s TEMP_FIX timeout the worker would hang for 60s.
+///
+/// Sequence:
+/// 1. Worker starts polling with graceful_poll_shutdown enabled
+/// 2. initiate_shutdown() fires → shutdown token cancelled, ShutdownWorker RPC spawned
+/// 3. First poll returns empty (natural timeout) — but this races with ShutdownWorker
+/// 4. Poller re-polls (graceful mode: no select! against shutdown token)
+/// 5. ShutdownWorker RPC completes — server flushes nothing (first poll already returned)
+/// 6. Second poll hangs forever — server doesn't know about it
+/// 7. TEMP_FIX: after 5s the graceful_interruptor cancels the hanging poll
+///
+/// This test verifies shutdown completes within 10s (not 60s), proving the temp fix works.
+/// Remove this test when temporalio/temporal#9545 is fully deployed.
+#[tokio::test]
+async fn graceful_shutdown_race_temp_fix_prevents_60s_hang() {
+    use prost::Message;
+    use std::sync::atomic::AtomicUsize;
+    use temporalio_common::protos::temporal::api::{
+        namespace::v1::{NamespaceInfo, namespace_info::Capabilities},
+        workflowservice::v1::DescribeNamespaceResponse,
+    };
+    use tokio::sync::Notify;
+
+    fn grpc_ok_empty() -> tonic::codegen::http::Response<tonic::body::Body> {
+        tonic::codegen::http::Response::builder()
+            .header("content-type", "application/grpc")
+            .header("grpc-status", "0")
+            .body(tonic::body::Body::empty())
+            .unwrap()
+    }
+    fn grpc_ok_proto(msg: &impl Message) -> tonic::codegen::http::Response<tonic::body::Body> {
+        let encoded = msg.encode_to_vec();
+        let mut buf = Vec::with_capacity(5 + encoded.len());
+        buf.push(0);
+        buf.extend_from_slice(&(encoded.len() as u32).to_be_bytes());
+        buf.extend_from_slice(&encoded);
+        tonic::codegen::http::Response::builder()
+            .header("content-type", "application/grpc")
+            .header("grpc-status", "0")
+            .body(tonic::body::Body::new(http_body_util::Full::new(
+                bytes::Bytes::from(buf),
+            )))
+            .unwrap()
+    }
+
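Not part of the diff: the `grpc_ok_proto` helper above hand-builds the gRPC Length-Prefixed-Message framing (one compression-flag byte followed by a 4-byte big-endian payload length, then the protobuf bytes). The sketch below shows the same layout from the decode side; the helper function and the fake payload are illustrative only.

```rust
// Illustrative sketch: split a gRPC length-prefixed frame into (compressed?, payload).
fn split_grpc_frame(frame: &[u8]) -> Option<(bool, &[u8])> {
    let (header, rest) = (frame.get(..5)?, &frame[5..]);
    let compressed = header[0] == 1;
    let len = u32::from_be_bytes([header[1], header[2], header[3], header[4]]) as usize;
    rest.get(..len).map(|payload| (compressed, payload))
}

fn main() {
    // Mirror of the encode path: flag 0, then the length, then the (here fake) protobuf bytes.
    let payload = b"\x0a\x03abc".to_vec();
    let mut frame = vec![0u8];
    frame.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    frame.extend_from_slice(&payload);
    assert_eq!(split_grpc_frame(&frame), Some((false, payload.as_slice())));
}
```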
+    // Track poll count to distinguish first poll (returns empty) from re-polls (hang forever)
+    let poll_count = Arc::new(AtomicUsize::new(0));
+    let poll_count_clone = poll_count.clone();
+    // Signal from initiate_shutdown (via shutdown_worker RPC) to release the first poll
+    let shutdown_signal = Arc::new(Notify::new());
+    let shutdown_signal_for_rpc = shutdown_signal.clone();
+    let shutdown_signal_for_poll = shutdown_signal.clone();
+
+    let fs = fake_server(move |req| {
+        let uri = req.uri().to_string();
+        let poll_count = poll_count_clone.clone();
+        let shutdown_signal_for_poll = shutdown_signal_for_poll.clone();
+        let shutdown_signal_for_rpc = shutdown_signal_for_rpc.clone();
+
+        if uri.contains("DescribeNamespace") {
+            let resp = DescribeNamespaceResponse {
+                namespace_info: Some(NamespaceInfo {
+                    capabilities: Some(Capabilities {
+                        worker_poll_complete_on_shutdown: true,
+                        ..Capabilities::default()
+                    }),
+                    ..NamespaceInfo::default()
+                }),
+                ..DescribeNamespaceResponse::default()
+            };
+            async move { grpc_ok_proto(&resp) }.boxed()
+        } else if uri.contains("Poll") {
+            async move {
+                let n = poll_count.fetch_add(1, Ordering::SeqCst);
+                if n == 0 {
+                    // First poll: wait for shutdown to start, then return empty.
+                    // This simulates the poll timing out naturally right as shutdown begins.
+                    shutdown_signal_for_poll.notified().await;
+                    grpc_ok_empty()
+                } else {
+                    // Re-poll after shutdown: hang forever. This is the race —
+                    // the server already processed ShutdownWorker and won't flush this poll.
+                    futures_util::future::pending().await
+                }
+            }
+            .boxed()
+        } else if uri.contains("ShutdownWorker") {
+            // ShutdownWorker arrives — signal the first poll to return (simulating the race
+            // where poll returns right as/after ShutdownWorker is processed).
+            async move {
+                shutdown_signal_for_rpc.notify_waiters();
+                grpc_ok_empty()
+            }
+            .boxed()
+        } else {
+            async { grpc_ok_empty() }.boxed()
+        }
+    })
+    .await;
+
+    let mut opts = get_integ_server_options();
+    opts.target = format!("http://localhost:{}", fs.addr.port())
+        .parse::<Url>()
+        .unwrap();
+    opts.set_skip_get_system_info(true);
+    let connection = Connection::connect(opts).await.unwrap();
+    let client_opts = temporalio_client::ClientOptions::new("ns").build();
+    let client = temporalio_client::Client::new(connection, client_opts).unwrap();
+
+    let wf_type = "graceful_shutdown_race";
+    let mut starter = CoreWfStarter::new_with_overrides(wf_type, None, Some(client));
+    let worker = starter.get_worker().await;
+
+    // Enable graceful poll shutdown via validate()
+    worker.validate().await.unwrap();
+
+    // Start polling BEFORE initiating shutdown so the poll is in-flight.
+    // poll_workflow_activation triggers LongPollBuffer to start, which spawns a poll task
+    // that hits the fake server and blocks on the first poll (waiting for shutdown_signal).
+    let poll_handle = tokio::spawn({
+        let w = worker.clone();
+        async move {
+            // This will block until shutdown causes PollError::ShutDown
+            let _ = w.poll_workflow_activation().await;
+        }
+    });
+    let act_handle = tokio::spawn({
+        let w = worker.clone();
+        async move {
+            let _ = w.poll_activity_task().await;
+        }
+    });
+
+    // Give polls time to reach the fake server before initiating shutdown
+    tokio::time::sleep(Duration::from_millis(500)).await;
+
+    // Shutdown should complete within 10s (5s TEMP_FIX + margin).
+    // Without the temp fix, the re-poll hangs for 60s.
+    let result = tokio::time::timeout(Duration::from_secs(10), async {
+        worker.shutdown().await;
+    })
+    .await;
+
+    assert!(
+        result.is_ok(),
+        "Shutdown should complete within 10s. If it hangs, the TEMP_FIX graceful poll \
+         timeout is not working and the server race (temporal#9545) caused a 60s hang."
+    );
+
+    let _ = poll_handle.await;
+    let _ = act_handle.await;
+    fs.shutdown().await;
+}
+
 #[test]
 fn test_default_build_id() {
     let o = WorkerOptions::new("task_queue").build();