33//! This module demonstrates how to implement a CSP push input adapter in Rust
44//! using the C API FFI bindings.
55//!
6- //! Note: The actual data pushing requires the CSP C API symbols to be available
7- //! at runtime. When built standalone, the callbacks are implemented but the
8- //! push functionality is stubbed.
6+ //! The adapter spawns a background thread that periodically pushes integer
7+ //! values to the CSP graph using the C API.
98
109use std:: ffi:: c_void;
11- use std:: sync:: atomic:: { AtomicBool , AtomicI64 , Ordering } ;
10+ use std:: ptr;
11+ use std:: sync:: atomic:: { AtomicBool , AtomicI64 , AtomicPtr , Ordering } ;
1212use std:: sync:: Arc ;
13+ use std:: thread:: { self , JoinHandle } ;
14+ use std:: time:: Duration ;
1315
1416use crate :: bindings:: {
15- CCspDateTime , CCspEngineHandle , CCspPushInputAdapterHandle ,
17+ csp_push_input_adapter_push_int64, CCspDateTime , CCspEngineHandle ,
18+ CCspPushInputAdapterHandle ,
1619} ;
1720
1821/// Push input adapter that generates incrementing counter values.
1922///
20- /// This adapter would spawn a background thread when started that periodically
23+ /// This adapter spawns a background thread when started that periodically
2124/// pushes integer values to the CSP graph using the C API.
2225pub struct RustInputAdapter {
2326 /// Interval between pushes in milliseconds
@@ -29,9 +32,11 @@ pub struct RustInputAdapter {
2932 /// Flag to signal thread to stop
3033 running : Arc < AtomicBool > ,
3134
32- /// Push adapter handle (set by start callback)
33- #[ allow( dead_code) ]
34- adapter_handle : Option < CCspPushInputAdapterHandle > ,
35+ /// Push adapter handle (stored as AtomicPtr for thread-safe access)
36+ adapter_handle : Arc < AtomicPtr < c_void > > ,
37+
38+ /// Thread handle
39+ thread_handle : Option < JoinHandle < ( ) > > ,
3540}
3641
3742impl RustInputAdapter {
@@ -41,36 +46,86 @@ impl RustInputAdapter {
4146 interval_ms : if interval_ms > 0 { interval_ms } else { 100 } ,
4247 counter : Arc :: new ( AtomicI64 :: new ( 0 ) ) ,
4348 running : Arc :: new ( AtomicBool :: new ( false ) ) ,
44- adapter_handle : None ,
49+ adapter_handle : Arc :: new ( AtomicPtr :: new ( ptr:: null_mut ( ) ) ) ,
50+ thread_handle : None ,
4551 }
4652 }
4753
4854 /// Start the adapter.
4955 ///
50- /// In a full implementation with CSP C API symbols linked, this would
51- /// spawn a background thread that calls ccsp_push_input_adapter_push_int64 .
56+ /// Spawns a background thread that pushes integer values at the configured
57+ /// interval using the CSP C API .
5258 pub fn start ( & mut self , adapter : CCspPushInputAdapterHandle ) {
53- self . adapter_handle = Some ( adapter) ;
59+ // Store the adapter handle (AtomicPtr is Send + Sync)
60+ self . adapter_handle . store ( adapter, Ordering :: SeqCst ) ;
5461 self . running . store ( true , Ordering :: SeqCst ) ;
5562
63+ let running = Arc :: clone ( & self . running ) ;
64+ let counter = Arc :: clone ( & self . counter ) ;
65+ let adapter_handle = Arc :: clone ( & self . adapter_handle ) ;
66+ let interval_ms = self . interval_ms ;
67+
5668 eprintln ! (
57- "[RustInputAdapter] Started with interval {} ms (adapter handle: {:?})" ,
69+ "[RustInputAdapter] Starting with interval {} ms (adapter handle: {:?})" ,
5870 self . interval_ms, adapter
5971 ) ;
6072
61- // Note: In a full implementation, we would spawn a thread here that
62- // calls ccsp_push_input_adapter_push_int64 to push values.
63- // This requires the CSP C API symbols to be available at runtime.
73+ // Spawn background thread that pushes values
74+ let handle = thread:: spawn ( move || {
75+ let interval = Duration :: from_millis ( interval_ms) ;
76+
77+ while running. load ( Ordering :: SeqCst ) {
78+ let value = counter. fetch_add ( 1 , Ordering :: SeqCst ) ;
79+ let adapter_ptr = adapter_handle. load ( Ordering :: SeqCst ) ;
80+
81+ // Push the counter value to CSP using the C API
82+ if !adapter_ptr. is_null ( ) {
83+ unsafe {
84+ let result = csp_push_input_adapter_push_int64 (
85+ adapter_ptr,
86+ value,
87+ ptr:: null_mut ( ) ,
88+ ) ;
89+ if result. is_none ( ) {
90+ eprintln ! ( "[RustInputAdapter] CSP symbol missing: ccsp_push_input_adapter_push_int64" ) ;
91+ break ;
92+ }
93+ if result != Some ( crate :: bindings:: CCspErrorCode :: Ok ) {
94+ eprintln ! (
95+ "[RustInputAdapter] Push failed with error: {:?}" ,
96+ result
97+ ) ;
98+ }
99+ }
100+ }
101+
102+ thread:: sleep ( interval) ;
103+ }
104+
105+ eprintln ! (
106+ "[RustInputAdapter] Thread exiting after {} values" ,
107+ counter. load( Ordering :: SeqCst )
108+ ) ;
109+ } ) ;
110+
111+ self . thread_handle = Some ( handle) ;
64112 }
65113
66114 /// Stop the adapter.
67115 pub fn stop ( & mut self ) {
68116 eprintln ! (
69- "[RustInputAdapter] Stopped after {} values" ,
117+ "[RustInputAdapter] Stopping after {} values" ,
70118 self . counter. load( Ordering :: SeqCst )
71119 ) ;
72120
73121 self . running . store ( false , Ordering :: SeqCst ) ;
122+
123+ // Wait for the thread to finish
124+ if let Some ( handle) = self . thread_handle . take ( ) {
125+ if let Err ( e) = handle. join ( ) {
126+ eprintln ! ( "[RustInputAdapter] Thread join error: {:?}" , e) ;
127+ }
128+ }
74129 }
75130}
76131
0 commit comments