diff --git a/.changeset/six-adults-lie.md b/.changeset/six-adults-lie.md new file mode 100644 index 0000000000000..01be49aa0abe3 --- /dev/null +++ b/.changeset/six-adults-lie.md @@ -0,0 +1,6 @@ +--- +'@rocket.chat/media-signaling': minor +'@rocket.chat/media-calls': minor +--- + +Fixes an error where Voice Calls could fail to negotiate webrtc params if both clients requested a renegotiation at the same time diff --git a/ee/packages/media-calls/src/internal/agents/CallSignalProcessor.ts b/ee/packages/media-calls/src/internal/agents/CallSignalProcessor.ts index 1d805bd29d1ca..457f5eabcaeb5 100644 --- a/ee/packages/media-calls/src/internal/agents/CallSignalProcessor.ts +++ b/ee/packages/media-calls/src/internal/agents/CallSignalProcessor.ts @@ -177,13 +177,69 @@ export class UserActorSignalProcessor { } private async processNegotiationNeeded(oldNegotiationId: string): Promise { + // Unsigned clients may not request negotiations + if (!this.signed) { + return; + } + logger.debug({ msg: 'UserActorSignalProcessor.processNegotiationNeeded', oldNegotiationId }); const negotiation = await MediaCallNegotiations.findLatestByCallId(this.callId); - // If the negotiation that triggered a request for renegotiation is not the latest negotiation, then a new one must already be happening and we can ignore this request. - if (negotiation?._id !== oldNegotiationId) { + + // If the call doesn't even have an initial negotiation yet, the clients shouldn't be requesting new ones. 
+ if (!negotiation) { return; } + // If the latest negotiation has an answer, we can accept any request + if (negotiation.answer) { + return this.startNewNegotiation(); + } + + const comingFromLatest = oldNegotiationId === negotiation._id; + const isRequestImpolite = this.role === 'caller'; + const isLatestImpolite = negotiation.offerer === 'caller'; + + // If the request came from a client who was not yet aware of a newer renegotiation + if (!comingFromLatest) { + // If the client is polite, we can ignore their request in favor of the existing renegotiation + if (!isRequestImpolite) { + logger.debug({ msg: 'Ignoring outdated polite renegotiation request' }); + return; + } + + // If the latest negotiation is impolite and the impolite client is not aware of it yet, this must be a duplicate request + if (isLatestImpolite) { + // If we already received an offer in this situation then something is very wrong (some proxy interfering with signals, perhaps?) + if (negotiation.offer) { + logger.error({ msg: 'Invalid renegotiation request', requestedBy: this.role, isLatestImpolite }); + return; + } + + // Resend the offer request to the impolite client + return this.requestWebRTCOffer({ negotiationId: negotiation._id }); + } + + // The state of polite negotiations is irrelevant for impolite requests, so we can start a new negotiation here. 
+ return this.startNewNegotiation(); + } + + // The client is up-to-date and requested a renegotiation before the last one was complete + // If the request came from the same side as the last negotiation, the client was in no position to request it + if (this.role === negotiation.offerer) { + logger.error({ msg: 'Invalid state for renegotiation request', requestedBy: this.role, isLatestImpolite }); + return; + } + + // If the request is from the impolite client, it takes priority over the existing polite negotiation + if (isRequestImpolite) { + return this.startNewNegotiation(); + } + + // It's a polite negotiation requested while an impolite one was not yet complete + logger.error({ msg: 'Invalid state for renegotiation request', requestedBy: this.role, isLatestImpolite }); + } + + private async startNewNegotiation(): Promise { const negotiationId = await mediaCallDirector.startNewNegotiation(this.call, this.role); if (negotiationId) { await this.requestWebRTCOffer({ negotiationId }); diff --git a/packages/media-signaling/src/definition/client.ts b/packages/media-signaling/src/definition/client.ts index 9b2ac8005406e..8a3168db5342d 100644 --- a/packages/media-signaling/src/definition/client.ts +++ b/packages/media-signaling/src/definition/client.ts @@ -4,12 +4,8 @@ export type ClientState = | 'accepting' // The client tried to accept the call and is wating for confirmation from the server | 'accepted' // The call was accepted, but the client doesn't have a webrtc offer yet | 'busy-elsewhere' // The call is happening in a different session/client - | 'has-offer' // The call was accepted and the client has a webrtc offer - | 'has-answer' // The call was accepted and the client has a webrtc offer and answer | 'active' // The webrtc call was established | 'renegotiating' // the webrtc call was established but the client is starting a new negotiation - | 'has-new-offer' // The webrtc call was established but the client has an offer for a new negotiation - | 
'has-new-answer' // The webrtc call was established but the client is finishing a renegotiation | 'hangup'; // The call is over, or happening in some other client export type ClientContractState = diff --git a/packages/media-signaling/src/definition/services/IServiceProcessor.ts b/packages/media-signaling/src/definition/services/IServiceProcessor.ts index a20f6639ff3b0..17fb64420acd7 100644 --- a/packages/media-signaling/src/definition/services/IServiceProcessor.ts +++ b/packages/media-signaling/src/definition/services/IServiceProcessor.ts @@ -14,11 +14,13 @@ export type ServiceStateValue = { internalStateChange: keyof ServiceStateMap; internalError: { critical: boolean; error: string | Error; errorDetails?: string }; - negotiationNeeded: void; }; -export interface IServiceProcessor { - emitter: Emitter>; +export interface IServiceProcessor< + ServiceStateMap extends DefaultServiceStateMap = DefaultServiceStateMap, + ServiceUniqueEvents = Record, +> { + emitter: Emitter & ServiceUniqueEvents>; getInternalState(stateName: K): ServiceStateValue; } diff --git a/packages/media-signaling/src/definition/services/index.ts b/packages/media-signaling/src/definition/services/index.ts index 587361d596cb9..9ac4f49771820 100644 --- a/packages/media-signaling/src/definition/services/index.ts +++ b/packages/media-signaling/src/definition/services/index.ts @@ -1,3 +1,4 @@ export * from './webrtc/IWebRTCProcessor'; export * from './IServiceProcessorFactoryList'; export * from './MediaStreamFactory'; +export * from './negotiation'; diff --git a/packages/media-signaling/src/definition/services/negotiation.ts b/packages/media-signaling/src/definition/services/negotiation.ts new file mode 100644 index 0000000000000..3749510de9051 --- /dev/null +++ b/packages/media-signaling/src/definition/services/negotiation.ts @@ -0,0 +1,30 @@ +import type { IClientMediaCall } from '../call'; +import type { IMediaSignalLogger } from '../logger'; + +export type NegotiationManagerEvents = { + 'error': 
{ errorCode: string; negotiationId: string }; + 'local-sdp': { negotiationId: string; sdp: RTCSessionDescriptionInit }; + 'negotiation-needed': { oldNegotiationId: string }; +}; + +export type NegotiationManagerConfig = { + logger?: IMediaSignalLogger | null; +}; + +export type NegotiationEvents = { + 'error': { errorCode: string }; + 'ended': void; + 'local-sdp': { sdp: RTCSessionDescriptionInit }; +}; + +export type NegotiationData = { + negotiationId: string; + sequence: number; + isPolite: boolean; + + remoteOffer: RTCSessionDescriptionInit | null; +}; + +export interface INegotiationCompatibleMediaCall extends IClientMediaCall { + hasInputTrack(): boolean; +} diff --git a/packages/media-signaling/src/definition/services/webrtc/IWebRTCProcessor.ts b/packages/media-signaling/src/definition/services/webrtc/IWebRTCProcessor.ts index 846919baa3bc2..b1a88a3b1a124 100644 --- a/packages/media-signaling/src/definition/services/webrtc/IWebRTCProcessor.ts +++ b/packages/media-signaling/src/definition/services/webrtc/IWebRTCProcessor.ts @@ -12,9 +12,13 @@ export type WebRTCInternalStateMap = { iceUntrickler: 'waiting' | 'not-waiting' | 'timeout'; }; -export type WebRTCProcessorEvents = ServiceProcessorEvents; +export type WebRTCUniqueEvents = { + negotiationNeeded: void; +}; + +export type WebRTCProcessorEvents = ServiceProcessorEvents & WebRTCUniqueEvents; -export interface IWebRTCProcessor extends IServiceProcessor { +export interface IWebRTCProcessor extends IServiceProcessor { emitter: Emitter; muted: boolean; @@ -24,10 +28,13 @@ export interface IWebRTCProcessor extends IServiceProcessor; - startNewNegotiation(): void; - createOffer(params: { iceRestart?: boolean }): Promise<{ sdp: RTCSessionDescriptionInit }>; - createAnswer(params: { sdp: RTCSessionDescriptionInit }): Promise<{ sdp: RTCSessionDescriptionInit }>; - setRemoteAnswer(params: { sdp: RTCSessionDescriptionInit }): Promise; + createOffer(params: { iceRestart?: boolean }): Promise; + createAnswer(): 
Promise; + + setLocalDescription(sdp: RTCSessionDescriptionInit): Promise; + setRemoteDescription(sdp: RTCSessionDescriptionInit): Promise; + waitForIceGathering(): Promise; + getLocalDescription(): RTCSessionDescriptionInit | null; getRemoteMediaStream(): MediaStream; diff --git a/packages/media-signaling/src/definition/signals/client/local-state.ts b/packages/media-signaling/src/definition/signals/client/local-state.ts index f23213deaa166..bc9bceaacffb0 100644 --- a/packages/media-signaling/src/definition/signals/client/local-state.ts +++ b/packages/media-signaling/src/definition/signals/client/local-state.ts @@ -41,20 +41,7 @@ export const clientMediaSignalLocalStateSchema: JSONSchemaType { - this.pendingAnswerRequest = null; if (this.hidden || this.shouldIgnoreWebRTC()) { return; } @@ -835,44 +784,8 @@ export class ClientMediaCall implements IClientMediaCall { this.config.logger?.debug('ClientMediaCall.processAnswerRequest', signal); this.requireWebRTC(); - const { negotiationId } = signal; - - const iceRestart = this.currentNegotiationId !== negotiationId; - if (iceRestart) { - this.hasLocalDescription = false; - this.hasRemoteDescription = false; - this.webrtcProcessor.startNewNegotiation(); - } - this.currentNegotiationId = negotiationId; - - if (!this.hasInputTrack()) { - this.pendingAnswerRequest = signal; - this.config.logger?.debug('Delaying WebRTC Answer due to missing audio input track.'); - return; - } - - let answer: { sdp: RTCSessionDescriptionInit } | null = null; - try { - answer = await this.webrtcProcessor.createAnswer(signal); - } catch (e) { - this.config.logger?.error(e); - this.sendError({ - errorType: 'service', - errorCode: 'failed-to-create-answer', - negotiationId, - critical: true, - errorDetails: serializeError(e), - }); - throw e; - } - if (!answer) { - this.sendError({ errorType: 'service', errorCode: 'implementation-error', negotiationId, critical: true }); - return; - } - - this.hasRemoteDescription = true; - await 
this.deliverSdp({ ...answer, negotiationId }); + this.negotiationManager.addNegotiation(signal.negotiationId, signal.sdp); } protected sendError(error: Partial): void { @@ -906,18 +819,16 @@ export class ClientMediaCall implements IClientMediaCall { return this.processAnswerRequest(signal); } - if (signal.negotiationId !== this.currentNegotiationId) { - this.config.logger?.error('Received an answer for an unexpected negotiation.'); + if (signal.sdp.type !== 'answer') { + this.config.logger?.error('Unsupported sdp type.'); return; } - await this.webrtcProcessor.setRemoteAnswer(signal); - this.hasRemoteDescription = true; + await this.negotiationManager.setRemoteDescription(signal.negotiationId, signal.sdp); } - protected async deliverSdp(data: { sdp: RTCSessionDescriptionInit; negotiationId: string }) { + protected deliverSdp(data: { sdp: RTCSessionDescriptionInit; negotiationId: string }) { this.config.logger?.debug('ClientMediaCall.deliverSdp'); - this.hasLocalDescription = true; if (!this.hidden) { this.config.transporter.sendToServer(this.callId, 'local-sdp', data); @@ -1008,7 +919,6 @@ export class ClientMediaCall implements IClientMediaCall { this.changeState('accepted'); this.addStateTimeout('accepted', TIMEOUT_TO_PROGRESS_SIGNALING); - this.addStateTimeout('has-offer', TIMEOUT_TO_PROGRESS_SIGNALING); } private flagAsEnded(reason: CallHangupReason): void { @@ -1098,39 +1008,22 @@ export class ClientMediaCall implements IClientMediaCall { } } - private onWebRTCInternalError({ - critical, - error, - errorDetails, - }: { - critical: boolean; - error: string | Error; - errorDetails?: string; - }): void { - this.config.logger?.debug('ClientMediaCall.onWebRTCInternalError', critical, error); - const errorCode = typeof error === 'object' ? 
error.message : error; + private onNegotiationNeeded(oldNegotiationId: string): void { + this.config.logger?.debug('ClientMediaCall.onNegotiationNeeded', oldNegotiationId); + this.config.transporter.requestRenegotiation(this.callId, oldNegotiationId); + } + + private onNegotiationError(negotiationId: string, errorCode: string): void { + this.config.logger?.debug('ClientMediaCall.onNegotiationError', negotiationId, errorCode); this.sendError({ errorType: 'service', errorCode, - ...(this.currentNegotiationId && { negotiationId: this.currentNegotiationId }), - ...(errorDetails && { errorDetails }), - critical, + negotiationId, + critical: false, }); } - private onWebRTCNegotiationNeeded(): void { - if (this._state !== 'active' || !this.currentNegotiationId) { - this.config.logger?.warn('WebRTCProcessor requested a renegotiation while in a state that should not trigger any.', { - state: this._state, - currentNegotiationId: this.currentNegotiationId, - }); - return; - } - - this.config.transporter.requestRenegotiation(this.callId, this.currentNegotiationId); - } - private onWebRTCConnectionStateChange(stateValue: RTCPeerConnectionState): void { if (this.hidden) { return; @@ -1149,7 +1042,7 @@ export class ClientMediaCall implements IClientMediaCall { errorType: 'service', errorCode: 'connection-failed', critical: true, - negotiationId: this.currentNegotiationId || undefined, + negotiationId: this.negotiationManager.currentNegotiationId || undefined, }); this.hangup('service-error'); @@ -1161,7 +1054,7 @@ export class ClientMediaCall implements IClientMediaCall { errorType: 'service', errorCode: 'connection-closed', critical: true, - negotiationId: this.currentNegotiationId || undefined, + negotiationId: this.negotiationManager.currentNegotiationId || undefined, }); this.hangup('service-error'); @@ -1224,9 +1117,12 @@ export class ClientMediaCall implements IClientMediaCall { } this.webrtcProcessor = webrtcFactory({ logger, iceGatheringTimeout, call: this, inputTrack: 
this.inputTrack }); - this.webrtcProcessor.emitter.on('internalError', (event) => this.onWebRTCInternalError(event)); this.webrtcProcessor.emitter.on('internalStateChange', (stateName) => this.onWebRTCInternalStateChange(stateName)); - this.webrtcProcessor.emitter.on('negotiationNeeded', () => this.onWebRTCNegotiationNeeded()); + + this.negotiationManager.emitter.on('local-sdp', ({ sdp, negotiationId }) => this.deliverSdp({ sdp, negotiationId })); + this.negotiationManager.emitter.on('negotiation-needed', ({ oldNegotiationId }) => this.onNegotiationNeeded(oldNegotiationId)); + this.negotiationManager.emitter.on('error', ({ errorCode, negotiationId }) => this.onNegotiationError(negotiationId, errorCode)); + this.negotiationManager.setWebRTCProcessor(this.webrtcProcessor); } private requireWebRTC(): asserts this is ClientMediaCallWebRTC { diff --git a/packages/media-signaling/src/lib/NegotiationManager.ts b/packages/media-signaling/src/lib/NegotiationManager.ts new file mode 100644 index 0000000000000..6e4cac8b08b64 --- /dev/null +++ b/packages/media-signaling/src/lib/NegotiationManager.ts @@ -0,0 +1,293 @@ +import { Emitter } from '@rocket.chat/emitter'; + +import type { INegotiationCompatibleMediaCall, IWebRTCProcessor, NegotiationManagerEvents, NegotiationManagerConfig } from '../definition'; +import { Negotiation } from './services/webrtc/Negotiation'; + +export class NegotiationManager { + public readonly emitter: Emitter; + + public get currentNegotiationId(): string | null { + return this.currentNegotiation?.negotiationId || this.highestNegotiationId; + } + + protected negotiations: Map; + + /** negotiation actively being processed, null once completed */ + protected currentNegotiation: Negotiation | null; + + protected highestProcessedSequence: number; + + protected highestImpoliteSequence: number; + + protected highestSequence: number; + + protected webrtcProcessor: IWebRTCProcessor | null; + + /** id of the newest negotiation that has reached the processing 
state */ + protected highestNegotiationId: string | null; + + /** id of the newest negotiation, regardless of state */ + protected highestKnownNegotiationId: string | null; + + constructor( + protected readonly call: INegotiationCompatibleMediaCall, + protected readonly config: NegotiationManagerConfig, + ) { + this.negotiations = new Map(); + this.currentNegotiation = null; + this.highestProcessedSequence = 0; + this.highestImpoliteSequence = 0; + this.highestSequence = 0; + this.webrtcProcessor = null; + this.highestNegotiationId = null; + this.highestKnownNegotiationId = null; + + this.emitter = new Emitter(); + } + + public async addNegotiation( + negotiationId: string, + remoteOffer: RTCSessionDescriptionInit | null = null, + negotiationSequence: number | null = null, + ): Promise { + if (this.negotiations.has(negotiationId)) { + return; + } + + if (remoteOffer && remoteOffer.type !== 'offer') { + return; + } + + // If we are not receiving the negotiation sequence, trust that they are arriving in order. + const sequence = negotiationSequence || this.highestSequence + 1; + const isRemoteOffer = Boolean(remoteOffer); + + this.config.logger?.debug('NegotiationManager.addNegotiation', negotiationId, sequence, isRemoteOffer); + + const isPoliteNegotiation = isRemoteOffer !== this.isPoliteClient(); + const maxSkipSequence = isPoliteNegotiation ? 
this.highestSequence : this.highestImpoliteSequence; + + const negotiation = new Negotiation( + { + negotiationId, + sequence, + isPolite: isPoliteNegotiation, + remoteOffer, + }, + this.config.logger, + ); + + if (sequence <= maxSkipSequence) { + negotiation.end(); + } + + this.addToQueue(negotiation); + + return this.processNegotiations(); + } + + public async setRemoteDescription( + negotiationId: string, + remoteDescription: RTCSessionDescriptionInit, + negotiationSequence: number | null = null, + ): Promise { + if (remoteDescription.type === 'offer') { + return this.addNegotiation(negotiationId, remoteDescription, negotiationSequence); + } + + this.config.logger?.debug('NegotiationManager.setRemoteDescription', negotiationId); + + if (this.currentNegotiation?.negotiationId !== negotiationId) { + this.config.logger?.warn('Received remote description for an unexpected negotiation'); + this.emitter.emit('error', { errorCode: 'not-current-negotiation', negotiationId }); + return; + } + + try { + await this.currentNegotiation.setRemoteAnswer(remoteDescription); // awaited so that a rejection is handled by the catch below instead of escaping it + } catch (e) { + this.config.logger?.error(e); + this.currentNegotiation = this.currentNegotiation?.negotiationId === negotiationId ? null : this.currentNegotiation; // only release the slot if this negotiation still owns it + this.emitter.emit('error', { errorCode: 'failed-to-set-remote-answer', negotiationId }); + } + } + + public setWebRTCProcessor(webrtcProcessor: IWebRTCProcessor) { + this.config.logger?.debug('NegotiationManager.setWebRTCProcessor'); + this.webrtcProcessor = webrtcProcessor; + + this.webrtcProcessor.emitter.on('internalError', (event) => this.onWebRTCInternalError(event)); + this.webrtcProcessor.emitter.on('negotiationNeeded', () => this.onWebRTCNegotiationNeeded()); + } + + public async processNegotiations(): Promise { + this.config.logger?.debug('NegotiationManager.processNegotiations'); + if (!this.isConfigured()) { + return; + } + + if (this.currentNegotiation) { + return; + } + + const nextNegotiation = this.getNextInQueue(); + if (!nextNegotiation) { + return; + } + + await this.processNegotiation(nextNegotiation); + } 
+ + protected isPoliteClient(): boolean { + return this.call.role === 'callee'; + } + + protected addToQueue(negotiation: Negotiation): void { + this.config.logger?.debug('NegotiationManager.addToQueue', negotiation.negotiationId); + + if (negotiation.sequence > this.highestSequence) { + this.highestSequence = negotiation.sequence; + this.highestKnownNegotiationId = negotiation.negotiationId; + } + this.negotiations.set(negotiation.negotiationId, negotiation); + + if (!negotiation.ended) { + if (!negotiation.isPolite) { + if (negotiation.sequence > this.highestImpoliteSequence) { + this.highestImpoliteSequence = negotiation.sequence; + } + + if (this.currentNegotiation?.isPolite) { + this.currentNegotiation.end(); + this.currentNegotiation = null; + } + } + } + } + + protected getNextInQueue(): Negotiation | null { + for (const negotiation of this.negotiations.values()) { + // Skip negotiations that have already started processing or been skipped + if (negotiation.ended || negotiation.started || negotiation.sequence <= this.highestProcessedSequence) { + continue; + } + + // Skip negotiations that can be fulfilled by some newer negotiation + if (negotiation.sequence < this.highestImpoliteSequence) { + negotiation.end(); + continue; + } + + // Polite negotiations are only processed if there's nothing else queued + if (negotiation.isPolite && negotiation.sequence < this.highestSequence) { + negotiation.end(); + continue; + } + + return negotiation; + } + + return null; + } + + protected async processNegotiation(this: WebRTCNegotiationManager, negotiation: Negotiation): Promise { + this.config.logger?.debug('NegotiationManager.processNegotiation', negotiation.negotiationId); + + this.currentNegotiation = negotiation; + this.highestProcessedSequence = negotiation.sequence; + this.highestNegotiationId = negotiation.negotiationId; + + negotiation.emitter.on('ended', () => { + if (this.currentNegotiation !== negotiation) { + return; + } + + 
this.config.logger?.debug('NegotiationManager.processNegotiation.ended'); + this.currentNegotiation = null; + void this.processNegotiations(); + }); + + negotiation.emitter.on('error', ({ errorCode }) => { + this.config.logger?.error('Negotiation error', errorCode); + this.emitter.emit('error', { errorCode, negotiationId: negotiation.negotiationId }); + }); + + negotiation.emitter.on('local-sdp', ({ sdp }) => { + this.config.logger?.debug('NegotiationManager.processNegotiation.local-sdp'); + this.emitter.emit('local-sdp', { sdp, negotiationId: negotiation.negotiationId }); + }); + + try { + await negotiation.process(this.webrtcProcessor); // awaited so that a rejection is handled by the catch below instead of escaping it + } catch (e) { + this.config.logger?.error(e); + this.currentNegotiation = this.currentNegotiation === negotiation ? null : this.currentNegotiation; // only release the slot if this negotiation still owns it + this.emitter.emit('error', { errorCode: 'failed-to-process-negotiation', negotiationId: negotiation.negotiationId }); + } + } + + protected isConfigured(): this is WebRTCNegotiationManager { + if (this.call.state === 'hangup' || this.call.hidden) { + this.config.logger?.debug('Ignoring WebRTC negotiations due to call state.'); + return false; + } + + if (!this.webrtcProcessor) { + this.config.logger?.debug('Delaying WebRTC negotiations due to missing processor.'); + return false; + } + + // Wait for the input track before negotiating, to avoid potentially having to renegotiate immediately + if (!this.call.hasInputTrack()) { + this.config.logger?.debug('Delaying WebRTC negotiations due to missing input track.'); + return false; + } + + return true; + } + + protected isFulfillingNegotiationQueued(): boolean { + // If we're a polite client, then any queued negotiation is enough to fulfill our negotiation needs + if (this.isPoliteClient()) { + return this.highestSequence > this.highestProcessedSequence; + } + + // If there's an impolite negotiation queued, that's good enough for any client + return this.highestImpoliteSequence > this.highestProcessedSequence; + } + + protected onWebRTCNegotiationNeeded(): void { + 
this.config.logger?.debug('NegotiationManager.onWebRTCNegotiationNeeded'); + if (!this.isConfigured()) { + return; + } + + // If we haven't processed any negotiation yet, then we can ignore any negotiation request + if (!this.highestNegotiationId || !this.highestKnownNegotiationId) { + return; + } + + // If we already have a queued negotiation that would fulfill this need, then don't do anything + if (this.isFulfillingNegotiationQueued()) { + return; + } + + // When requesting a renegotiation, always use the newest negotiation id we know that doesn't fulfill our need + this.emitter.emit('negotiation-needed', { oldNegotiationId: this.highestKnownNegotiationId }); + } + + protected onWebRTCInternalError({ critical, error }: { critical: boolean; error: string | Error; errorDetails?: string }): void { + this.config.logger?.debug('NegotiationManager.onWebRTCInternalError', critical, error); + const errorCode = typeof error === 'object' ? error.message : error; + + const negotiationId = this.currentNegotiationId; + + if (negotiationId) { + this.emitter.emit('error', { errorCode, negotiationId }); + } + } +} + +abstract class WebRTCNegotiationManager extends NegotiationManager { + protected abstract webrtcProcessor: IWebRTCProcessor; +} diff --git a/packages/media-signaling/src/lib/services/webrtc/Negotiation.ts b/packages/media-signaling/src/lib/services/webrtc/Negotiation.ts new file mode 100644 index 0000000000000..c1e7a7cde3e79 --- /dev/null +++ b/packages/media-signaling/src/lib/services/webrtc/Negotiation.ts @@ -0,0 +1,167 @@ +import { Emitter } from '@rocket.chat/emitter'; + +import type { IMediaSignalLogger, IWebRTCProcessor, NegotiationData, NegotiationEvents } from '../../../definition'; + +export class Negotiation { + public readonly emitter: Emitter; + + public get started() { + return this._startedProcessing; + } + + /** Returns true when the negotiation will no longer process anything, no matter the reason */ + public get ended() { + return this._ended; + } 
+ + public get isLocal(): boolean { + return !this.remoteOffer; + } + + public readonly negotiationId: string; + + public readonly sequence: number; + + public readonly isPolite: boolean; + + protected webrtcProcessor: IWebRTCProcessor | null; + + protected remoteOffer: RTCSessionDescriptionInit | null; + + protected _ended: boolean; + + protected _startedProcessing: boolean; + + protected _failed: boolean; + + constructor( + negotiation: NegotiationData, + protected readonly logger?: IMediaSignalLogger | null, + ) { + this.webrtcProcessor = null; + this._startedProcessing = false; + this._ended = false; + this._failed = false; + this.negotiationId = negotiation.negotiationId; + this.sequence = negotiation.sequence; + this.isPolite = negotiation.isPolite; + this.remoteOffer = negotiation.remoteOffer; + + this.emitter = new Emitter(); + } + + public end(): void { + if (this._ended) { + return; + } + + this.logger?.debug('Negotiation.end', this.negotiationId); + this._ended = true; + this.emitter.emit('ended'); + } + + public async process(webrtcProcessor: IWebRTCProcessor): Promise { + if (this._startedProcessing) { + return; + } + this.logger?.debug('Negotiation.process', this.negotiationId); + + this.setWebRTCProcessor(webrtcProcessor); + this._startedProcessing = true; + + if (this.remoteOffer) { + await this.createLocalAnswer(this.remoteOffer); + return; + } + + // after creating the local offer, this negotiation will remain active until it receives an answer + await this.createLocalOffer(); + } + + public async setRemoteAnswer(sdp: RTCSessionDescriptionInit): Promise { + if (!this.webrtcProcessor) { + return; + } + + this.logger?.debug('Negotiation.setRemoteAnswer', this.negotiationId); + + if (!this.isLocal || !this._startedProcessing || sdp.type !== 'answer') { + this.logger?.warn('Invalid negotiation workflow'); + return; + } + + await this.webrtcProcessor.setRemoteDescription(sdp); + // Local negotiations end when the remote description is available + 
this.end(); + } + + protected async setLocalDescription(this: WebRTCNegotiation, sdp: RTCSessionDescriptionInit): Promise { + this.logger?.debug('Negotiation.setLocalDescription', this.negotiationId); + + this.assertNegotiationIsActive(); + await this.webrtcProcessor.setLocalDescription(sdp); + + this.assertNegotiationIsActive(); + await this.webrtcProcessor.waitForIceGathering(); + + this.assertNegotiationIsActive(); + const localDescription = this.webrtcProcessor.getLocalDescription(); + if (!localDescription) { + this.fail('implementation-error'); + return; + } + + this.emitter.emit('local-sdp', { sdp: localDescription }); + + // Remote negotiations end when the local description is available + if (!this.isLocal) { + this.end(); + } + } + + protected setWebRTCProcessor(webrtcProcessor: IWebRTCProcessor): asserts this is WebRTCNegotiation { + this.webrtcProcessor = webrtcProcessor; + } + + protected assertNegotiationIsActive(): void { + if (this._ended) { + this.fail('skipped-negotiation'); + throw new Error('Skipped Negotiation'); + } + } + + protected async createLocalOffer(this: WebRTCNegotiation): Promise { + this.logger?.debug('Negotiation.createLocalOffer', this.negotiationId); + this.assertNegotiationIsActive(); + + const earlyOffer = await this.webrtcProcessor.createOffer({}); + + await this.setLocalDescription(earlyOffer); + } + + protected async createLocalAnswer(this: WebRTCNegotiation, remoteOffer: RTCSessionDescriptionInit): Promise { + this.logger?.debug('Negotiation.createLocalAnswer', this.negotiationId); + this.assertNegotiationIsActive(); + await this.webrtcProcessor.setRemoteDescription(remoteOffer); + + this.assertNegotiationIsActive(); + const earlyAnswer = await this.webrtcProcessor.createAnswer(); + + this.assertNegotiationIsActive(); + await this.setLocalDescription(earlyAnswer); + } + + protected fail(errorCode: string): void { + if (this._failed || this._ended) { + return; + } + + this.emitter.emit('error', { errorCode }); + + 
this._failed = true; + } +} + +export abstract class WebRTCNegotiation extends Negotiation { + protected abstract webrtcProcessor: IWebRTCProcessor; +} diff --git a/packages/media-signaling/src/lib/services/webrtc/Processor.ts b/packages/media-signaling/src/lib/services/webrtc/Processor.ts index 0606cf8b37b29..0841262434fe0 100644 --- a/packages/media-signaling/src/lib/services/webrtc/Processor.ts +++ b/packages/media-signaling/src/lib/services/webrtc/Processor.ts @@ -11,16 +11,12 @@ export class MediaCallWebRTCProcessor implements IWebRTCProcessor { private peer: RTCPeerConnection; - private iceGatheringFinished = false; - private iceGatheringTimedOut = false; private localStream: LocalStream; private localMediaStream: MediaStream; - private localMediaStreamInitialized = false; - private remoteStream: RemoteStream; private remoteMediaStream: MediaStream; @@ -45,8 +41,6 @@ export class MediaCallWebRTCProcessor implements IWebRTCProcessor { private iceCandidateCount = 0; - private lastSetLocalDescription: string | null = null; - private addedEmptyTransceiver = false; private _audioLevelTracker: ReturnType | null; @@ -59,6 +53,8 @@ export class MediaCallWebRTCProcessor implements IWebRTCProcessor { private _localAudioLevel: number; + private initialization: Promise; + public get localAudioLevel(): number { return this._localAudioLevel; } @@ -81,6 +77,11 @@ export class MediaCallWebRTCProcessor implements IWebRTCProcessor { this.registerPeerEvents(); this.registerAudioLevelTracker(); + + this.initialization = this.initialize().catch((e) => { + config.logger?.error('MediaCallWebRTCProcessor.initialization error', e); + this.stop(); + }); } public getRemoteMediaStream() { @@ -93,16 +94,19 @@ export class MediaCallWebRTCProcessor implements IWebRTCProcessor { throw new Error('Unsupported track kind'); } + await this.initialization; + this.inputTrack = newInputTrack; await this.loadInputTrack(); } - public async createOffer({ iceRestart }: { iceRestart?: boolean }): 
Promise<{ sdp: RTCSessionDescriptionInit }> { + public async createOffer({ iceRestart }: { iceRestart?: boolean }): Promise { this.config.logger?.debug('MediaCallWebRTCProcessor.createOffer'); if (this.stopped) { throw new Error('WebRTC Processor has already been stopped.'); } - await this.initializeLocalMediaStream(); + + await this.initialization; if (!this.addedEmptyTransceiver) { // If there's no audio transceivers yet, add a new one; since it's an offer, the track can be set later @@ -120,15 +124,7 @@ export class MediaCallWebRTCProcessor implements IWebRTCProcessor { this.restartIce(); } - const offer = await this.peer.createOffer(); - if (this.lastSetLocalDescription && offer.sdp !== this.lastSetLocalDescription && !iceRestart) { - this.startNewNegotiation(); - } - - this.lastSetLocalDescription = offer.sdp || null; - await this.peer.setLocalDescription(offer); - - return this.getLocalDescription(); + return this.peer.createOffer({}); } public setMuted(muted: boolean): void { @@ -162,25 +158,16 @@ export class MediaCallWebRTCProcessor implements IWebRTCProcessor { this.peer.close(); } - public startNewNegotiation(): void { - this.iceGatheringFinished = false; - this.clearIceGatheringWaiters(new Error('new-negotiation')); - this.iceCandidateCount = 0; - } - - public async createAnswer({ sdp }: { sdp: RTCSessionDescriptionInit }): Promise<{ sdp: RTCSessionDescriptionInit }> { + public async createAnswer(): Promise { this.config.logger?.debug('MediaCallWebRTCProcessor.createAnswer'); if (this.stopped) { throw new Error('WebRTC Processor has already been stopped.'); } - if (sdp.type !== 'offer') { - throw new Error('invalid-webrtc-offer'); - } if (!this.inputTrack) { throw new Error('no-input-track'); } - await this.initializeLocalMediaStream(); + await this.initialization; const transceivers = this.peer .getTransceivers() @@ -190,27 +177,34 @@ export class MediaCallWebRTCProcessor implements IWebRTCProcessor { throw new Error('no-audio-transceiver'); } - if 
(this.peer.remoteDescription?.sdp !== sdp.sdp) { - this.startNewNegotiation(); - await this.peer.setRemoteDescription(sdp); + return this.peer.createAnswer(); + } + + public async setLocalDescription(sdp: RTCSessionDescriptionInit): Promise { + this.config.logger?.debug('MediaCallWebRTCProcessor.setLocalDescription'); + if (this.stopped) { + return; } - const answer = await this.peer.createAnswer(); + await this.initialization; - this.lastSetLocalDescription = answer.sdp || null; - await this.peer.setLocalDescription(answer); + if (!['offer', 'answer'].includes(sdp.type)) { + throw new Error('unsupported-description-type'); + } - return this.getLocalDescription(); + await this.peer.setLocalDescription(sdp); } - public async setRemoteAnswer({ sdp }: { sdp: RTCSessionDescriptionInit }): Promise { - this.config.logger?.debug('MediaCallWebRTCProcessor.setRemoteAnswer'); + public async setRemoteDescription(sdp: RTCSessionDescriptionInit): Promise { + this.config.logger?.debug('MediaCallWebRTCProcessor.setRemoteDescription'); if (this.stopped) { return; } - if (sdp.type === 'offer') { - throw new Error('invalid-answer'); + await this.initialization; + + if (!['offer', 'answer'].includes(sdp.type)) { + throw new Error('unsupported-description-type'); } await this.peer.setRemoteDescription(sdp); @@ -239,43 +233,34 @@ export class MediaCallWebRTCProcessor implements IWebRTCProcessor { return null; } + await this.initialization; + return this.peer.getStats(selector); } - private changeInternalState(stateName: keyof WebRTCInternalStateMap): void { - this.config.logger?.debug('MediaCallWebRTCProcessor.changeInternalState', stateName); - this.emitter.emit('internalStateChange', stateName); + public isStable(): boolean { + if (this.stopped) { + return false; + } + + return this.peer.signalingState === 'stable'; } - private async getLocalDescription(): Promise<{ sdp: RTCSessionDescriptionInit }> { + public getLocalDescription(): RTCSessionDescriptionInit | null { 
this.config.logger?.debug('MediaCallWebRTCProcessor.getLocalDescription'); if (this.stopped) { throw new Error('WebRTC Processor has already been stopped.'); } - await this.waitForIceGathering(); - const sdp = this.peer.localDescription; - - if (!sdp) { - throw new Error('no-local-sdp'); - } - - this.config.logger?.debug('MediaCallWebRTCProcessor.getLocalDescription - ice candidates: ', this.iceCandidateCount); - // If we don't have any ice candidate, trigger a service error. - if (this.iceCandidateCount === 0) { - this.emitter.emit('internalError', { critical: true, error: 'no-ice-candidates' }); - } - - return { - sdp, - }; + return this.peer.localDescription; } - private async waitForIceGathering(): Promise { - this.config.logger?.debug('MediaCallWebRTCProcessor.waitForIceGathering'); - if (this.iceGatheringFinished || this.stopped) { + public async waitForIceGathering(): Promise { + if (this.stopped || this.peer.iceGatheringState === 'complete') { return; } + this.config.logger?.debug('MediaCallWebRTCProcessor.waitForIceGathering'); + await this.initialization; this.iceGatheringTimedOut = false; const iceGatheringData = getExternalWaiter({ @@ -295,10 +280,22 @@ export class MediaCallWebRTCProcessor implements IWebRTCProcessor { this.iceGatheringWaiters.add(iceGatheringData); this.changeInternalState('iceUntrickler'); await iceGatheringData.promise; + } - // always wait a little extra to ensure all relevant events have been fired - // 30ms is low enough that it won't be noticeable by users, but is also enough time to process any local stuff - await new Promise((resolve) => setTimeout(resolve, 30)); + private async initialize(): Promise { + if (this.inputTrack) { + await this.loadInputTrack(); + } + } + + private startNewGathering(): void { + this.clearIceGatheringWaiters(new Error('gathering-restarted')); + this.iceCandidateCount = 0; + } + + private changeInternalState(stateName: keyof WebRTCInternalStateMap): void { + 
this.config.logger?.debug('MediaCallWebRTCProcessor.changeInternalState', stateName); + this.emitter.emit('internalStateChange', stateName); } private registerPeerEvents() { @@ -378,8 +375,7 @@ export class MediaCallWebRTCProcessor implements IWebRTCProcessor { private restartIce() { this.config.logger?.debug('MediaCallWebRTCProcessor.restartIce'); - this.startNewNegotiation(); - + this.startNewGathering(); this.peer.restartIce(); } @@ -403,7 +399,7 @@ export class MediaCallWebRTCProcessor implements IWebRTCProcessor { } private onNegotiationNeeded() { - if (this.stopped) { + if (this.stopped || this.peer.signalingState !== 'stable') { return; } this.config.logger?.debug('MediaCallWebRTCProcessor.onNegotiationNeeded'); @@ -449,33 +445,27 @@ export class MediaCallWebRTCProcessor implements IWebRTCProcessor { return; } - this.config.logger?.debug('MediaCallWebRTCProcessor.onIceGatheringStateChange'); + const state = this.peer.iceGatheringState; - if (this.peer.iceGatheringState === 'complete') { - this.onIceGatheringComplete(); + this.config.logger?.debug('MediaCallWebRTCProcessor.onIceGatheringStateChange', state); + if (state === 'gathering') { + this.iceCandidateCount = 0; } - this.changeInternalState('iceGathering'); - } - - private async initializeLocalMediaStream(): Promise { - if (this.localMediaStreamInitialized) { - return; + if (state === 'complete') { + this.onIceGatheringComplete(); } - this.config.logger?.debug('MediaCallWebRTCProcessor.initializeLocalMediaStream'); - await this.loadInputTrack(); + this.changeInternalState('iceGathering'); } private async loadInputTrack(): Promise { this.config.logger?.debug('MediaCallWebRTCProcessor.loadInputTrack'); - this.localMediaStreamInitialized = true; await this.localStream.setTrack(this.inputTrack); } private onIceGatheringComplete() { this.config.logger?.debug('MediaCallWebRTCProcessor.onIceGatheringComplete'); - this.iceGatheringFinished = true; this.clearIceGatheringWaiters(); }