@@ -45,6 +45,7 @@ pub struct RunningTopology {
4545 abort_tx : mpsc:: UnboundedSender < ( ) > ,
4646 watch : ( WatchTx , WatchRx ) ,
4747 pub ( crate ) running : Arc < AtomicBool > ,
48+ graceful_shutdown_duration : Option < Duration > ,
4849}
4950
5051impl RunningTopology {
@@ -54,14 +55,15 @@ impl RunningTopology {
5455 inputs_tap_metadata : HashMap :: new ( ) ,
5556 outputs : HashMap :: new ( ) ,
5657 outputs_tap_metadata : HashMap :: new ( ) ,
57- config,
5858 shutdown_coordinator : SourceShutdownCoordinator :: default ( ) ,
5959 detach_triggers : HashMap :: new ( ) ,
6060 source_tasks : HashMap :: new ( ) ,
6161 tasks : HashMap :: new ( ) ,
6262 abort_tx,
6363 watch : watch:: channel ( TapResource :: default ( ) ) ,
6464 running : Arc :: new ( AtomicBool :: new ( true ) ) ,
65+ graceful_shutdown_duration : config. graceful_shutdown_duration ,
66+ config,
6567 }
6668 }
6769
@@ -120,30 +122,36 @@ impl RunningTopology {
120122 check_handles. entry ( key) . or_default ( ) . push ( task) ;
121123 }
122124
123- // If we reach this, we will forcefully shutdown the sources.
124- let deadline = Instant :: now ( ) + Duration :: from_secs ( 60 ) ;
125-
126- // If we reach the deadline, this future will print out which components
127- // won't gracefully shutdown since we will start to forcefully shutdown
128- // the sources.
129- let mut check_handles2 = check_handles. clone ( ) ;
130- let timeout = async move {
131- sleep_until ( deadline) . await ;
132- // Remove all tasks that have shutdown.
133- check_handles2. retain ( |_key, handles| {
134- retain ( handles, |handle| handle. peek ( ) . is_none ( ) ) ;
135- !handles. is_empty ( )
136- } ) ;
137- let remaining_components = check_handles2
138- . keys ( )
139- . map ( |item| item. to_string ( ) )
140- . collect :: < Vec < _ > > ( )
141- . join ( ", " ) ;
125+ // If we reach this, we will forcefully shutdown the sources. If None, we will never force shutdown.
126+ let deadline = self
127+ . graceful_shutdown_duration
128+ . map ( |grace_period| Instant :: now ( ) + grace_period) ;
142129
143- error ! (
144- components = ?remaining_components,
145- "Failed to gracefully shut down in time. Killing components."
146- ) ;
130+ let timeout = if let Some ( deadline) = deadline {
131+ // If we reach the deadline, this future will print out which components
132+ // won't gracefully shutdown since we will start to forcefully shutdown
133+ // the sources.
134+ let mut check_handles2 = check_handles. clone ( ) ;
135+ Box :: pin ( async move {
136+ sleep_until ( deadline) . await ;
137+ // Remove all tasks that have shutdown.
138+ check_handles2. retain ( |_key, handles| {
139+ retain ( handles, |handle| handle. peek ( ) . is_none ( ) ) ;
140+ !handles. is_empty ( )
141+ } ) ;
142+ let remaining_components = check_handles2
143+ . keys ( )
144+ . map ( |item| item. to_string ( ) )
145+ . collect :: < Vec < _ > > ( )
146+ . join ( ", " ) ;
147+
148+ error ! (
149+ components = ?remaining_components,
150+ "Failed to gracefully shut down in time. Killing components."
151+ ) ;
152+ } ) as future:: BoxFuture < ' static , ( ) >
153+ } else {
154+ Box :: pin ( future:: pending ( ) ) as future:: BoxFuture < ' static , ( ) >
147155 } ;
148156
149157 // Reports in intervals which components are still running.
@@ -163,10 +171,12 @@ impl RunningTopology {
163171 . collect :: < Vec < _ > > ( )
164172 . join ( ", " ) ;
165173
166- let time_remaining = match deadline. checked_duration_since ( Instant :: now ( ) ) {
167- Some ( remaining) => format ! ( "{} seconds left" , remaining. as_secs( ) ) ,
168- None => "overdue" . to_string ( ) ,
169- } ;
174+ let time_remaining = deadline
175+ . map ( |d| match d. checked_duration_since ( Instant :: now ( ) ) {
176+ Some ( remaining) => format ! ( "{} seconds left" , remaining. as_secs( ) ) ,
177+ None => "overdue" . to_string ( ) ,
178+ } )
179+ . unwrap_or ( "no time limit" . to_string ( ) ) ;
170180
171181 info ! (
172182 remaining_components = ?remaining_components,
0 commit comments