@@ -183,96 +183,18 @@ impl BlockSync {
183183
184184 match block_rx. recv ( ) . await {
185185 Ok ( event) => {
186- match event {
187- BlockEvent :: NewBlock {
188- block_number,
189- epoch_info,
190- } => {
191- // Update state
192- * current_block. write ( ) . await = block_number;
193- * current_epoch. write ( ) . await = epoch_info. epoch_number ;
194- * current_phase. write ( ) . await = epoch_info. phase ;
195-
196- // Forward event
197- let _ = event_tx
198- . send ( BlockSyncEvent :: NewBlock {
199- block_number,
200- epoch_info,
201- } )
202- . await ;
203-
204- if was_disconnected {
205- was_disconnected = false ;
206- let _ = event_tx. send ( BlockSyncEvent :: Reconnected ) . await ;
207- }
208- }
209- BlockEvent :: EpochTransition ( EpochTransition :: NewEpoch {
210- old_epoch,
211- new_epoch,
212- block,
213- } ) => {
214- info ! (
215- "Bittensor epoch transition: {} -> {} at block {}" ,
216- old_epoch, new_epoch, block
217- ) ;
218- let _ = event_tx
219- . send ( BlockSyncEvent :: EpochTransition {
220- old_epoch,
221- new_epoch,
222- block,
223- } )
224- . await ;
225- }
226- BlockEvent :: PhaseChange {
227- block_number,
228- old_phase,
229- new_phase,
230- epoch,
231- } => {
232- info ! (
233- "Bittensor phase change: {} -> {} at block {} (epoch {})" ,
234- old_phase, new_phase, block_number, epoch
235- ) ;
236-
237- let _ = event_tx
238- . send ( BlockSyncEvent :: PhaseChange {
239- block_number,
240- old_phase,
241- new_phase,
242- epoch,
243- } )
244- . await ;
245-
246- // Emit specific events for commit/reveal windows
247- match new_phase {
248- EpochPhase :: CommitWindow => {
249- let _ = event_tx
250- . send ( BlockSyncEvent :: CommitWindowOpen {
251- epoch,
252- block : block_number,
253- } )
254- . await ;
255- }
256- EpochPhase :: RevealWindow => {
257- let _ = event_tx
258- . send ( BlockSyncEvent :: RevealWindowOpen {
259- epoch,
260- block : block_number,
261- } )
262- . await ;
263- }
264- _ => { }
265- }
266- }
267- BlockEvent :: ConnectionError ( e) => {
268- warn ! ( "Bittensor connection error: {}" , e) ;
269- was_disconnected = true ;
270- let _ = event_tx. send ( BlockSyncEvent :: Disconnected ( e) ) . await ;
271- }
272- BlockEvent :: Stopped => {
273- info ! ( "Block listener stopped" ) ;
274- break ;
275- }
186+ let should_break = BlockSync :: handle_block_event (
187+ event,
188+ & event_tx,
189+ & current_block,
190+ & current_epoch,
191+ & current_phase,
192+ & mut was_disconnected,
193+ )
194+ . await ;
195+
196+ if should_break {
197+ break ;
276198 }
277199 }
278200 Err ( broadcast:: error:: RecvError :: Lagged ( n) ) => {
@@ -289,6 +211,106 @@ impl BlockSync {
289211 Ok ( ( ) )
290212 }
291213
214+ async fn handle_block_event (
215+ event : BlockEvent ,
216+ event_tx : & mpsc:: Sender < BlockSyncEvent > ,
217+ current_block : & Arc < RwLock < u64 > > ,
218+ current_epoch : & Arc < RwLock < u64 > > ,
219+ current_phase : & Arc < RwLock < EpochPhase > > ,
220+ was_disconnected : & mut bool ,
221+ ) -> bool {
222+ match event {
223+ BlockEvent :: NewBlock {
224+ block_number,
225+ epoch_info,
226+ } => {
227+ * current_block. write ( ) . await = block_number;
228+ * current_epoch. write ( ) . await = epoch_info. epoch_number ;
229+ * current_phase. write ( ) . await = epoch_info. phase ;
230+
231+ let _ = event_tx
232+ . send ( BlockSyncEvent :: NewBlock {
233+ block_number,
234+ epoch_info,
235+ } )
236+ . await ;
237+
238+ if * was_disconnected {
239+ * was_disconnected = false ;
240+ let _ = event_tx. send ( BlockSyncEvent :: Reconnected ) . await ;
241+ }
242+ }
243+ BlockEvent :: EpochTransition ( EpochTransition :: NewEpoch {
244+ old_epoch,
245+ new_epoch,
246+ block,
247+ } ) => {
248+ info ! (
249+ "Bittensor epoch transition: {} -> {} at block {}" ,
250+ old_epoch, new_epoch, block
251+ ) ;
252+ let _ = event_tx
253+ . send ( BlockSyncEvent :: EpochTransition {
254+ old_epoch,
255+ new_epoch,
256+ block,
257+ } )
258+ . await ;
259+ }
260+ BlockEvent :: PhaseChange {
261+ block_number,
262+ old_phase,
263+ new_phase,
264+ epoch,
265+ } => {
266+ info ! (
267+ "Bittensor phase change: {} -> {} at block {} (epoch {})" ,
268+ old_phase, new_phase, block_number, epoch
269+ ) ;
270+
271+ let _ = event_tx
272+ . send ( BlockSyncEvent :: PhaseChange {
273+ block_number,
274+ old_phase,
275+ new_phase,
276+ epoch,
277+ } )
278+ . await ;
279+
280+ match new_phase {
281+ EpochPhase :: CommitWindow => {
282+ let _ = event_tx
283+ . send ( BlockSyncEvent :: CommitWindowOpen {
284+ epoch,
285+ block : block_number,
286+ } )
287+ . await ;
288+ }
289+ EpochPhase :: RevealWindow => {
290+ let _ = event_tx
291+ . send ( BlockSyncEvent :: RevealWindowOpen {
292+ epoch,
293+ block : block_number,
294+ } )
295+ . await ;
296+ }
297+ _ => { }
298+ }
299+ }
300+ BlockEvent :: ConnectionError ( e) => {
301+ warn ! ( "Bittensor connection error: {}" , e) ;
302+ * was_disconnected = true ;
303+ let _ = event_tx. send ( BlockSyncEvent :: Disconnected ( e) ) . await ;
304+ }
305+ BlockEvent :: Stopped => {
306+ info ! ( "Block listener stopped" ) ;
307+ return true ;
308+ }
309+ }
310+
311+ false
312+ }
313+
292314 /// Stop the block sync
293315 pub async fn stop ( & self ) {
294316 * self . running . write ( ) . await = false ;
@@ -324,3 +346,146 @@ impl BlockSync {
324346}
325347
326348// Re-export types from bittensor_rs for convenience (already imported at top)
349+
350+ #[ cfg( test) ]
351+ mod tests {
352+ use super :: * ;
353+
354+ #[ test]
355+ fn test_block_sync_config_default ( ) {
356+ let config = BlockSyncConfig :: default ( ) ;
357+ assert_eq ! ( config. netuid, 1 ) ;
358+ assert_eq ! ( config. channel_capacity, 100 ) ;
359+ }
360+
361+ #[ tokio:: test]
362+ async fn test_block_sync_initial_state ( ) {
363+ let mut sync = BlockSync :: new ( BlockSyncConfig {
364+ netuid : 42 ,
365+ channel_capacity : 8 ,
366+ } ) ;
367+
368+ assert ! ( !sync. is_connected( ) ) ;
369+ assert ! ( !sync. is_running( ) . await ) ;
370+ assert_eq ! ( sync. current_block( ) . await , 0 ) ;
371+ assert_eq ! ( sync. current_epoch( ) . await , 0 ) ;
372+ assert ! ( matches!( sync. current_phase( ) . await , EpochPhase :: Evaluation ) ) ;
373+
374+ let first_receiver = sync. take_event_receiver ( ) ;
375+ assert ! ( first_receiver. is_some( ) ) ;
376+ assert ! ( sync. take_event_receiver( ) . is_none( ) ) ;
377+ }
378+
379+ fn sample_epoch_info ( block : u64 , epoch : u64 , phase : EpochPhase ) -> EpochInfo {
380+ EpochInfo {
381+ current_block : block,
382+ tempo : 360 ,
383+ epoch_start_block : epoch * 360 ,
384+ next_epoch_start_block : epoch * 360 + 360 ,
385+ blocks_remaining : 10 ,
386+ epoch_number : epoch,
387+ phase,
388+ commit_reveal_enabled : true ,
389+ reveal_period_epochs : 1 ,
390+ }
391+ }
392+
393+ #[ tokio:: test]
394+ async fn test_handle_block_event_new_block_emits_reconnect ( ) {
395+ let ( tx, mut rx) = mpsc:: channel ( 4 ) ;
396+ let current_block = Arc :: new ( RwLock :: new ( 0 ) ) ;
397+ let current_epoch = Arc :: new ( RwLock :: new ( 0 ) ) ;
398+ let current_phase = Arc :: new ( RwLock :: new ( EpochPhase :: Evaluation ) ) ;
399+ let mut was_disconnected = true ;
400+
401+ let epoch_info = sample_epoch_info ( 123 , 9 , EpochPhase :: CommitWindow ) ;
402+
403+ let should_break = BlockSync :: handle_block_event (
404+ BlockEvent :: NewBlock {
405+ block_number : 123 ,
406+ epoch_info : epoch_info. clone ( ) ,
407+ } ,
408+ & tx,
409+ & current_block,
410+ & current_epoch,
411+ & current_phase,
412+ & mut was_disconnected,
413+ )
414+ . await ;
415+
416+ assert ! ( !should_break) ;
417+ assert_eq ! ( * current_block. read( ) . await , 123 ) ;
418+ assert_eq ! ( * current_epoch. read( ) . await , 9 ) ;
419+ assert ! ( matches!(
420+ * current_phase. read( ) . await ,
421+ EpochPhase :: CommitWindow
422+ ) ) ;
423+
424+ let first = rx. recv ( ) . await . unwrap ( ) ;
425+ assert ! ( matches!( first, BlockSyncEvent :: NewBlock { .. } ) ) ;
426+ let second = rx. recv ( ) . await . unwrap ( ) ;
427+ assert ! ( matches!( second, BlockSyncEvent :: Reconnected ) ) ;
428+ assert ! ( !was_disconnected) ;
429+ }
430+
431+ #[ tokio:: test]
432+ async fn test_handle_block_event_phase_change_emits_windows ( ) {
433+ let ( tx, mut rx) = mpsc:: channel ( 4 ) ;
434+ let current_block = Arc :: new ( RwLock :: new ( 0 ) ) ;
435+ let current_epoch = Arc :: new ( RwLock :: new ( 0 ) ) ;
436+ let current_phase = Arc :: new ( RwLock :: new ( EpochPhase :: Evaluation ) ) ;
437+ let mut was_disconnected = false ;
438+
439+ let should_break = BlockSync :: handle_block_event (
440+ BlockEvent :: PhaseChange {
441+ block_number : 200 ,
442+ old_phase : EpochPhase :: Evaluation ,
443+ new_phase : EpochPhase :: CommitWindow ,
444+ epoch : 7 ,
445+ } ,
446+ & tx,
447+ & current_block,
448+ & current_epoch,
449+ & current_phase,
450+ & mut was_disconnected,
451+ )
452+ . await ;
453+
454+ assert ! ( !should_break) ;
455+ assert_eq ! ( * current_epoch. read( ) . await , 0 ) ; // unchanged for phase events
456+ assert ! ( matches!(
457+ * current_phase. read( ) . await ,
458+ EpochPhase :: Evaluation
459+ ) ) ;
460+
461+ let phase_event = rx. recv ( ) . await . unwrap ( ) ;
462+ assert ! ( matches!( phase_event, BlockSyncEvent :: PhaseChange { .. } ) ) ;
463+ let window_event = rx. recv ( ) . await . unwrap ( ) ;
464+ assert ! ( matches!(
465+ window_event,
466+ BlockSyncEvent :: CommitWindowOpen { .. }
467+ ) ) ;
468+ }
469+
470+ #[ tokio:: test]
471+ async fn test_handle_block_event_stopped_breaks_loop ( ) {
472+ let ( tx, mut rx) = mpsc:: channel ( 1 ) ;
473+ let current_block = Arc :: new ( RwLock :: new ( 0 ) ) ;
474+ let current_epoch = Arc :: new ( RwLock :: new ( 0 ) ) ;
475+ let current_phase = Arc :: new ( RwLock :: new ( EpochPhase :: Evaluation ) ) ;
476+ let mut was_disconnected = false ;
477+
478+ let should_break = BlockSync :: handle_block_event (
479+ BlockEvent :: Stopped ,
480+ & tx,
481+ & current_block,
482+ & current_epoch,
483+ & current_phase,
484+ & mut was_disconnected,
485+ )
486+ . await ;
487+
488+ assert ! ( should_break) ;
489+ assert ! ( rx. try_recv( ) . is_err( ) ) ;
490+ }
491+ }
0 commit comments