@@ -219,3 +219,108 @@ async fn test_streaming_with_stderr() {
219219 "Stderr should contain stderr message, got: {stderr}"
220220 ) ;
221221}
222+
223+ #[ tokio:: test]
224+ async fn test_streaming_large_output_backpressure ( ) {
225+ if !can_ssh_to_localhost ( ) {
226+ eprintln ! ( "Skipping large output test: Cannot SSH to localhost" ) ;
227+ return ;
228+ }
229+
230+ // Get current username
231+ let username = std:: env:: var ( "USER" ) . unwrap_or_else ( |_| "root" . to_string ( ) ) ;
232+
233+ // Create client
234+ let client = Client :: connect (
235+ ( "localhost" , 22 ) ,
236+ & username,
237+ AuthMethod :: Agent ,
238+ ServerCheckMethod :: NoCheck ,
239+ )
240+ . await ;
241+
242+ if client. is_err ( ) {
243+ eprintln ! ( "Skipping large output test: Cannot connect to localhost" ) ;
244+ return ;
245+ }
246+
247+ let client = client. unwrap ( ) ;
248+
249+ // Build output buffer for streaming
250+ let ( sender, receiver_task) = build_test_output_buffer ( ) ;
251+
252+ // Execute command that generates large output to test backpressure
253+ // Generate 10000 lines to ensure we exceed the channel buffer
254+ let exit_status = client
255+ . execute_streaming ( "for i in {1..10000}; do echo \" Line $i\" ; done" , sender)
256+ . await ;
257+
258+ assert ! (
259+ exit_status. is_ok( ) ,
260+ "Large output command should execute successfully"
261+ ) ;
262+ let exit_status = exit_status. unwrap ( ) ;
263+ assert_eq ! ( exit_status, 0 , "Command should exit with status 0" ) ;
264+
265+ // Wait for output collection
266+ let ( stdout_bytes, _stderr_bytes) = receiver_task. await . unwrap ( ) ;
267+ let stdout = String :: from_utf8_lossy ( & stdout_bytes) ;
268+
269+ // Verify we got all lines
270+ assert ! ( stdout. contains( "Line 1" ) , "Should contain first line" ) ;
271+ assert ! ( stdout. contains( "Line 10000" ) , "Should contain last line" ) ;
272+
273+ // Count lines to ensure no data loss
274+ let line_count = stdout. lines ( ) . count ( ) ;
275+ assert_eq ! (
276+ line_count, 10000 ,
277+ "Should have exactly 10000 lines, got: {line_count}"
278+ ) ;
279+ }
280+
281+ #[ tokio:: test]
282+ async fn test_streaming_receiver_drop_handling ( ) {
283+ if !can_ssh_to_localhost ( ) {
284+ eprintln ! ( "Skipping receiver drop test: Cannot SSH to localhost" ) ;
285+ return ;
286+ }
287+
288+ // Get current username
289+ let username = std:: env:: var ( "USER" ) . unwrap_or_else ( |_| "root" . to_string ( ) ) ;
290+
291+ // Create client
292+ let client = Client :: connect (
293+ ( "localhost" , 22 ) ,
294+ & username,
295+ AuthMethod :: Agent ,
296+ ServerCheckMethod :: NoCheck ,
297+ )
298+ . await ;
299+
300+ if client. is_err ( ) {
301+ eprintln ! ( "Skipping receiver drop test: Cannot connect to localhost" ) ;
302+ return ;
303+ }
304+
305+ let client = client. unwrap ( ) ;
306+
307+ // Create a channel but immediately drop the receiver
308+ let ( sender, receiver) = channel ( 100 ) ;
309+
310+ // Drop the receiver to simulate early termination
311+ drop ( receiver) ;
312+
313+ // Execute command - should handle receiver drop gracefully
314+ let exit_status = client. execute_streaming ( "echo 'test output'" , sender) . await ;
315+
316+ // Should still return exit status even though receiver dropped
317+ assert ! (
318+ exit_status. is_ok( ) ,
319+ "Command should handle receiver drop gracefully"
320+ ) ;
321+ let exit_status = exit_status. unwrap ( ) ;
322+ assert_eq ! (
323+ exit_status, 0 ,
324+ "Command should still report correct exit status"
325+ ) ;
326+ }
0 commit comments