1- import { injectable , inject , postConstruct , named } from '@theia/core/shared/inversify' ;
1+ import { injectable , inject , named } from '@theia/core/shared/inversify' ;
22import { ClientDuplexStream } from '@grpc/grpc-js' ;
33import { ILogger } from '@theia/core/lib/common/logger' ;
44import { deepClone } from '@theia/core/lib/common/objects' ;
5- import { CoreClientAware , CoreClientProvider } from './core-client-provider' ;
5+ import { CoreClientAware } from './core-client-provider' ;
66import {
77 BoardListWatchRequest ,
88 BoardListWatchResponse ,
@@ -14,31 +14,47 @@ import {
1414 AvailablePorts ,
1515 AttachedBoardsChangeEvent ,
1616} from '../common/protocol' ;
17+ import { Emitter } from '@theia/core/lib/common/event' ;
18+ import { DisposableCollection } from '@theia/core/lib/common/disposable' ;
19+ import { Disposable } from '@theia/core/shared/vscode-languageserver-protocol' ;
20+ import { ArduinoCoreServiceClient } from './cli-protocol/cc/arduino/cli/commands/v1/commands_grpc_pb' ;
21+ import { v4 } from 'uuid' ;
22+ import { ServiceError } from './service-error' ;
23+ import { BackendApplicationContribution } from '@theia/core/lib/node' ;
24+
25+ type Duplex = ClientDuplexStream < BoardListWatchRequest , BoardListWatchResponse > ;
26+ interface StreamWrapper extends Disposable {
27+ readonly stream : Duplex ;
28+ readonly uuid : string ; // For logging only
29+ }
1730
1831/**
1932 * Singleton service for tracking the available ports and board and broadcasting the
2033 * changes to all connected frontend instances. \
2134 * Unlike other services, this is not connection scoped.
2235 */
2336@injectable ( )
24- export class BoardDiscovery extends CoreClientAware {
37+ export class BoardDiscovery
38+ extends CoreClientAware
39+ implements BackendApplicationContribution
40+ {
2541 @inject ( ILogger )
26- @named ( 'discovery' )
27- protected discoveryLogger : ILogger ;
42+ @named ( 'discovery-log ' )
43+ private readonly logger : ILogger ;
2844
2945 @inject ( NotificationServiceServer )
30- protected readonly notificationService : NotificationServiceServer ;
46+ private readonly notificationService : NotificationServiceServer ;
3147
3248 // Used to know if the board watch process is already running to avoid
3349 // starting it multiple times
3450 private watching : boolean ;
35-
36- protected boardWatchDuplex :
37- | ClientDuplexStream < BoardListWatchRequest , BoardListWatchResponse >
38- | undefined ;
51+ private wrapper : StreamWrapper | undefined ;
52+ private readonly onStreamDidEndEmitter = new Emitter < void > ( ) ; // sent from the CLI when the discovery process is killed for example after the indexes update and the core client re-initialization.
53+ private readonly onStreamDidCancelEmitter = new Emitter < void > ( ) ; // when the watcher is canceled by the IDE2
54+ private readonly toDisposeOnStopWatch = new DisposableCollection ( ) ;
3955
4056 /**
41- * Keys are the `address` of the ports. \
57+ * Keys are the `address` of the ports.
4258 * The `protocol` is ignored because the board detach event does not carry the protocol information,
4359 * just the address.
4460 * ```json
@@ -48,62 +64,153 @@ export class BoardDiscovery extends CoreClientAware {
4864 * }
4965 * ```
5066 */
51- protected _state : AvailablePorts = { } ;
67+ private _state : AvailablePorts = { } ;
5268 get state ( ) : AvailablePorts {
5369 return this . _state ;
5470 }
5571
56- @postConstruct ( )
57- protected async init ( ) : Promise < void > {
58- this . coreClient . then ( ( client ) => this . startBoardListWatch ( client ) ) ;
59- this . onClientDidRefresh ( ( client ) =>
60- this . stopBoardListWatch ( client ) . then ( ( ) =>
61- this . startBoardListWatch ( client )
62- )
63- ) ;
72+ onStart ( ) : void {
73+ this . start ( ) ;
74+ this . onClientDidRefresh ( ( ) => this . start ( ) ) ;
6475 }
6576
66- stopBoardListWatch ( coreClient : CoreClientProvider . Client ) : Promise < void > {
67- return new Promise ( ( resolve , reject ) => {
68- if ( ! this . boardWatchDuplex ) {
69- return resolve ( ) ;
70- }
77+ onStop ( ) : void {
78+ this . stop ( ) ;
79+ }
7180
72- const { instance } = coreClient ;
73- const req = new BoardListWatchRequest ( ) ;
74- req . setInstance ( instance ) ;
75- try {
76- this . boardWatchDuplex . write ( req . setInterrupt ( true ) , resolve ) ;
77- } catch ( e ) {
78- this . discoveryLogger . error ( e ) ;
79- resolve ( ) ;
81+ stop ( ) : Promise < void > {
82+ this . logger . info ( '>>> Stopping boards watcher...' ) ;
83+ return new Promise < void > ( ( resolve , reject ) => {
84+ const timeout = this . timeout ( BoardDiscovery . StopWatchTimeout , reject ) ;
85+ const toDispose = new DisposableCollection ( ) ;
86+ toDispose . pushAll ( [
87+ timeout ,
88+ this . onStreamDidEndEmitter . event ( ( ) => {
89+ this . logger . info (
90+ `<<< Received the end event from the stream. Boards watcher has been successfully stopped.`
91+ ) ;
92+ this . watching = false ;
93+ toDispose . dispose ( ) ;
94+ resolve ( ) ;
95+ } ) ,
96+ this . onStreamDidCancelEmitter . event ( ( ) => {
97+ this . logger . info (
98+ `<<< Received the cancel event from the stream. Boards watcher has been successfully stopped.`
99+ ) ;
100+ this . watching = false ;
101+ toDispose . dispose ( ) ;
102+ resolve ( ) ;
103+ } ) ,
104+ ] ) ;
105+ this . logger . info ( 'Canceling boards watcher...' ) ;
106+ this . toDisposeOnStopWatch . dispose ( ) ;
107+ } ) ;
108+ }
109+
110+ private timeout (
111+ after : number ,
112+ onTimeout : ( error : Error ) => void
113+ ) : Disposable {
114+ const timer = setTimeout (
115+ ( ) => onTimeout ( new Error ( `Timed out after ${ after } ms.` ) ) ,
116+ after
117+ ) ;
118+ return Disposable . create ( ( ) => clearTimeout ( timer ) ) ;
119+ }
120+
121+ private async write (
122+ req : BoardListWatchRequest ,
123+ duplex : Duplex
124+ ) : Promise < void > {
125+ return new Promise < void > ( ( resolve , reject ) => {
126+ this . logger . info ( `>>> Writing ${ this . toJson ( req ) } to the stream...` ) ;
127+ if (
128+ ! duplex . write ( req , ( err : Error | undefined ) => {
129+ if ( err ) {
130+ this . logger . error (
131+ `<<< Error ocurred while writing to the stream.` ,
132+ err
133+ ) ;
134+ reject ( err ) ;
135+ return ;
136+ }
137+ } )
138+ ) {
139+ duplex . once ( 'drain' , ( ) => {
140+ this . logger . info (
141+ `<<< Board list watch request has been successfully written to the stream after the handling backpressure.`
142+ ) ;
143+ resolve ( ) ;
144+ } ) ;
145+ } else {
146+ process . nextTick ( ( ) => {
147+ this . logger . info (
148+ `<<< Board list watch request has been successfully written to the stream.`
149+ ) ;
150+ resolve ( ) ;
151+ } ) ;
80152 }
81153 } ) ;
82154 }
83155
84- startBoardListWatch ( coreClient : CoreClientProvider . Client ) : void {
156+ private async createWrapper (
157+ client : ArduinoCoreServiceClient
158+ ) : Promise < StreamWrapper > {
159+ if ( this . wrapper ) {
160+ throw new Error ( `Duplex was already set.` ) ;
161+ }
162+ const stream = client
163+ . boardListWatch ( )
164+ . on ( 'end' , ( ) => this . onStreamDidEndEmitter . fire ( ) )
165+ . on ( 'error' , ( error ) => {
166+ if ( ServiceError . isCancel ( error ) ) {
167+ this . onStreamDidCancelEmitter . fire ( ) ;
168+ } else {
169+ this . logger . error (
170+ 'Unexpected error occurred during the boards discovery.' ,
171+ error
172+ ) ;
173+ // TODO: terminate? restart? reject?
174+ }
175+ } ) ;
176+ const wrapper = {
177+ stream,
178+ uuid : v4 ( ) ,
179+ dispose : ( ) => {
180+ // Cancelling the stream will kill the discovery `builtin:mdns-discovery process`.
181+ // The client (this class) will receive a `{"eventType":"quit","error":""}` response from the CLI.
182+ stream . cancel ( ) ;
183+ this . wrapper = undefined ;
184+ } ,
185+ } ;
186+ this . toDisposeOnStopWatch . pushAll ( [ wrapper ] ) ;
187+ return wrapper ;
188+ }
189+
190+ private toJson ( arg : BoardListWatchRequest | BoardListWatchResponse ) : string {
191+ let object : Record < string , unknown > | undefined = undefined ;
192+ if ( arg instanceof BoardListWatchRequest ) {
193+ object = BoardListWatchRequest . toObject ( false , arg ) ;
194+ } else if ( arg instanceof BoardListWatchResponse ) {
195+ object = BoardListWatchResponse . toObject ( false , arg ) ;
196+ } else {
197+ throw new Error ( `Unhandled object type: ${ arg } ` ) ;
198+ }
199+ return JSON . stringify ( object ) ;
200+ }
201+
202+ async start ( ) : Promise < void > {
85203 if ( this . watching ) {
86204 // We want to avoid starting the board list watch process multiple
87205 // times to meet unforeseen consequences
88206 return ;
89207 }
90- this . watching = true ;
91- const { client, instance } = coreClient ;
92- const req = new BoardListWatchRequest ( ) ;
93- req . setInstance ( instance ) ;
94- this . boardWatchDuplex = client . boardListWatch ( ) ;
95- this . boardWatchDuplex . on ( 'end' , ( ) => {
96- this . watching = false ;
97- console . info ( 'board watch ended' ) ;
98- } ) ;
99- this . boardWatchDuplex . on ( 'close' , ( ) => {
100- this . watching = false ;
101- console . info ( 'board watch ended' ) ;
102- } ) ;
103- this . boardWatchDuplex . on ( 'data' , ( resp : BoardListWatchResponse ) => {
208+ const { client, instance } = await this . coreClient ;
209+ const wrapper = await this . createWrapper ( client ) ;
210+ wrapper . stream . on ( 'data' , async ( resp : BoardListWatchResponse ) => {
211+ this . logger . info ( 'onData' , this . toJson ( resp ) ) ;
104212 if ( resp . getEventType ( ) === 'quit' ) {
105- this . watching = false ;
106- console . info ( 'board watch ended' ) ;
213+ await this . stop ( ) ;
107214 return ;
108215 }
109216
@@ -135,7 +242,9 @@ export class BoardDiscovery extends CoreClientAware {
135242 // protocols.
136243 const portID = `${ address } |${ protocol } ` ;
137244 const label = ( detectedPort as any ) . getPort ( ) . getLabel ( ) ;
138- const protocolLabel = ( detectedPort as any ) . getPort ( ) . getProtocolLabel ( ) ;
245+ const protocolLabel = ( detectedPort as any )
246+ . getPort ( )
247+ . getProtocolLabel ( ) ;
139248 const port = {
140249 id : portID ,
141250 address,
@@ -155,16 +264,20 @@ export class BoardDiscovery extends CoreClientAware {
155264 if ( eventType === 'add' ) {
156265 if ( newState [ portID ] ) {
157266 const [ , knownBoards ] = newState [ portID ] ;
158- console . warn (
159- `Port '${ Port . toString ( port ) } ' was already available. Known boards before override: ${ JSON . stringify (
267+ this . logger . warn (
268+ `Port '${ Port . toString (
269+ port
270+ ) } ' was already available. Known boards before override: ${ JSON . stringify (
160271 knownBoards
161272 ) } `
162273 ) ;
163274 }
164275 newState [ portID ] = [ port , boards ] ;
165276 } else if ( eventType === 'remove' ) {
166277 if ( ! newState [ portID ] ) {
167- console . warn ( `Port '${ Port . toString ( port ) } ' was not available. Skipping` ) ;
278+ this . logger . warn (
279+ `Port '${ Port . toString ( port ) } ' was not available. Skipping`
280+ ) ;
168281 return ;
169282 }
170283 delete newState [ portID ] ;
@@ -189,7 +302,11 @@ export class BoardDiscovery extends CoreClientAware {
189302 this . notificationService . notifyAttachedBoardsDidChange ( event ) ;
190303 }
191304 } ) ;
192- this . boardWatchDuplex . write ( req ) ;
305+ await this . write (
306+ new BoardListWatchRequest ( ) . setInstance ( instance ) ,
307+ wrapper . stream
308+ ) ;
309+ this . watching = true ;
193310 }
194311
195312 getAttachedBoards ( state : AvailablePorts = this . state ) : Board [ ] {
@@ -210,3 +327,6 @@ export class BoardDiscovery extends CoreClientAware {
210327 return availablePorts ;
211328 }
212329}
330+ export namespace BoardDiscovery {
331+ export const StopWatchTimeout = 10_000 ;
332+ }
0 commit comments