diff --git a/docs/docs/api/Client.md b/docs/docs/api/Client.md index fdee5ea7026..e0d41b47805 100644 --- a/docs/docs/api/Client.md +++ b/docs/docs/api/Client.md @@ -27,6 +27,7 @@ Returns: `Client` * **maxHeaderSize** `number | null` (optional) - Default: `--max-http-header-size` or `16384` - The maximum length of request headers in bytes. Defaults to Node.js' --max-http-header-size or 16KiB. * **maxResponseSize** `number | null` (optional) - Default: `-1` - The maximum length of response body in bytes. Set to `-1` to disable. * **webSocket** `WebSocketOptions` (optional) - WebSocket-specific configuration options. + * **maxFragments** `number` (optional) - Default: `131072` - Maximum number of fragments in a message. Set to 0 to disable the limit. * **maxPayloadSize** `number` (optional) - Default: `134217728` (128 MB) - Maximum allowed payload size in bytes for WebSocket messages. Applied to uncompressed messages, compressed frame payloads, and decompressed (permessage-deflate) messages. Set to 0 to disable the limit. * **pipelining** `number | null` (optional) - Default: `1` - The amount of concurrent requests to be sent over the single TCP/TLS connection according to [RFC7230](https://tools.ietf.org/html/rfc7230#section-6.3.2). Carefully consider your workload and environment before enabling concurrent requests as pipelining may reduce performance if used incorrectly. Pipelining is sensitive to network stack settings as well as head of line blocking caused by e.g. long running requests. Set to `0` to disable keep-alive connections. * **connect** `ConnectOptions | Function | null` (optional) - Default: `null`. diff --git a/lib/dispatcher/dispatcher-base.js b/lib/dispatcher/dispatcher-base.js index c999b2c2fb6..371a3ea1f6f 100644 --- a/lib/dispatcher/dispatcher-base.js +++ b/lib/dispatcher/dispatcher-base.js @@ -26,6 +26,7 @@ class DispatcherBase extends Dispatcher { get webSocketOptions () { return { + maxFragments: this[kWebSocketOptions].maxFragments ?? 131072, maxPayloadSize: this[kWebSocketOptions].maxPayloadSize ?? 128 * 1024 * 1024 } } diff --git a/lib/web/websocket/receiver.js b/lib/web/websocket/receiver.js index 53e427eb2e4..a7dea7fae1c 100644 --- a/lib/web/websocket/receiver.js +++ b/lib/web/websocket/receiver.js @@ -20,6 +20,11 @@ const { closeWebSocketConnection } = require('./connection') const { PerMessageDeflate } = require('./permessage-deflate') const { MessageSizeExceededError } = require('../../core/errors') +function failWebsocketConnectionWithCode (ws, code, reason) { + closeWebSocketConnection(ws, code, reason, Buffer.byteLength(reason)) + failWebsocketConnection(ws, reason) +} + // This code was influenced by ws released under the MIT license. // Copyright (c) 2011 Einar Otto Stangvik // Copyright (c) 2013 Arnout Kazemier and contributors @@ -39,19 +44,23 @@ class ByteParser extends Writable { /** @type {Map} */ #extensions + /** @type {number} */ + #maxFragments + /** @type {number} */ #maxPayloadSize /** * @param {import('./websocket').WebSocket} ws * @param {Map|null} extensions - * @param {{ maxPayloadSize?: number }} [options] + * @param {{ maxFragments?: number, maxPayloadSize?: number }} [options] */ constructor (ws, extensions, options = {}) { super() this.ws = ws this.#extensions = extensions == null ? new Map() : extensions + this.#maxFragments = options.maxFragments ?? 0 this.#maxPayloadSize = options.maxPayloadSize ?? 0 if (this.#extensions.has('permessage-deflate')) { @@ -75,9 +84,9 @@ class ByteParser extends Writable { if ( this.#maxPayloadSize > 0 && !isControlFrame(this.#info.opcode) && - this.#info.payloadLength > this.#maxPayloadSize + this.#info.payloadLength + this.#fragmentsBytes > this.#maxPayloadSize ) { - failWebsocketConnection(this.ws, 'Payload size exceeds maximum allowed size') + failWebsocketConnectionWithCode(this.ws, 1009, 'Payload size exceeds maximum allowed size') return false } @@ -242,10 +251,12 @@ class ByteParser extends Writable { this.#state = parserStates.INFO } else { if (!this.#info.compressed) { - this.writeFragments(body) + if (!this.writeFragments(body)) { + return + } if (this.#maxPayloadSize > 0 && this.#fragmentsBytes > this.#maxPayloadSize) { - failWebsocketConnection(this.ws, new MessageSizeExceededError().message) + failWebsocketConnectionWithCode(this.ws, 1009, new MessageSizeExceededError().message) return } @@ -264,14 +275,17 @@ class ByteParser extends Writable { this.#info.fin, (error, data) => { if (error) { - failWebsocketConnection(this.ws, error.message) + const code = error instanceof MessageSizeExceededError ? 1009 : 1007 + failWebsocketConnectionWithCode(this.ws, code, error.message) return } - this.writeFragments(data) + if (!this.writeFragments(data)) { + return + } if (this.#maxPayloadSize > 0 && this.#fragmentsBytes > this.#maxPayloadSize) { - failWebsocketConnection(this.ws, new MessageSizeExceededError().message) + failWebsocketConnectionWithCode(this.ws, 1009, new MessageSizeExceededError().message) return } @@ -341,8 +355,17 @@ class ByteParser extends Writable { } writeFragments (fragment) { + if ( + this.#maxFragments > 0 && + this.#fragments.length === this.#maxFragments + ) { + failWebsocketConnectionWithCode(this.ws, 1008, 'Too many message fragments') + return false + } + this.#fragmentsBytes += fragment.length this.#fragments.push(fragment) + return true } consumeFragments () { diff --git a/lib/web/websocket/websocket.js b/lib/web/websocket/websocket.js index ccedb792169..80991e96a2e 100644 --- a/lib/web/websocket/websocket.js +++ b/lib/web/websocket/websocket.js @@ -435,9 +435,12 @@ class WebSocket extends EventTarget { // once this happens, the connection is open this[kResponse] = response - const maxPayloadSize = this[kController]?.dispatcher?.webSocketOptions?.maxPayloadSize + const webSocketOptions = this[kController]?.dispatcher?.webSocketOptions + const maxFragments = webSocketOptions?.maxFragments + const maxPayloadSize = webSocketOptions?.maxPayloadSize const parser = new ByteParser(this, parsedExtensions, { + maxFragments, maxPayloadSize }) parser.on('drain', onParserDrain) diff --git a/test/websocket/fragments.js b/test/websocket/fragments.js index a4a168fe7cb..5c57cd3ead3 100644 --- a/test/websocket/fragments.js +++ b/test/websocket/fragments.js @@ -3,7 +3,7 @@ const assert = require('node:assert') const { test, after } = require('node:test') const { WebSocketServer } = require('ws') -const { WebSocket } = require('../..') +const { WebSocket, Agent } = require('../..') const diagnosticsChannel = require('node:diagnostics_channel') test('Fragmented frame with a ping frame in the middle of it', () => { @@ -40,3 +40,184 @@ test('Fragmented frame with a ping frame in the middle of it', () => { }) }) }) + +test('Too many fragments (uncompressed)', (t, done) => { + function maybeDone () { + if (++maybeDone.callCount === 2) { + agent.close() + server.close(done) + } + } + + maybeDone.callCount = 0 + + const agent = new Agent({ + webSocket: { + maxFragments: 3 + } + }) + + const server = new WebSocketServer({ port: 0 }, () => { + const { port } = server.address() + const client = new WebSocket(`ws://127.0.0.1:${port}`, { + dispatcher: agent + }) + + client.addEventListener('error', () => { + assert.ok(true) + }) + + client.addEventListener('close', (event) => { + assert.strictEqual(event.code, 1006) + maybeDone() + }) + }) + + server.on('connection', (ws) => { + ws.on('close', (code, reason) => { + assert.strictEqual(code, 1008) + assert.strictEqual(reason.toString(), 'Too many message fragments') + maybeDone() + }) + + const fragment = Buffer.from('a') + const options = { fin: false } + + ws.send(fragment, options) + ws.send(fragment, options) + ws.send(fragment, options) + ws.send(fragment, options) + }) +}) + +test('Too many fragments (compressed)', (t, done) => { + function maybeDone () { + if (++maybeDone.callCount === 2) { + agent.close() + server.close(done) + } + } + + maybeDone.callCount = 0 + + const agent = new Agent({ + webSocket: { + maxFragments: 3 + } + }) + + const server = new WebSocketServer({ + perMessageDeflate: { threshold: 0 }, + port: 0 + }, () => { + const { port } = server.address() + const client = new WebSocket(`ws://127.0.0.1:${port}`, { + dispatcher: agent + }) + + client.addEventListener('error', () => { + assert.ok(true) + }) + + client.addEventListener('close', (event) => { + assert.strictEqual(event.code, 1006) + maybeDone() + }) + }) + + server.on('connection', (ws) => { + ws.on('close', (code, reason) => { + assert.strictEqual(code, 1008) + assert.strictEqual(reason.toString(), 'Too many message fragments') + maybeDone() + }) + + const fragment = Buffer.from('a') + const options = { fin: false } + + ws.send(fragment, options) + ws.send(fragment, options) + ws.send(fragment, options) + ws.send(fragment, options) + }) +}) + +test('Empty first fragment followed by non-empty continuation delivers the message', () => { + // RFC 6455 ยง5.4 allows zero-byte fragments. A conforming server that opens + // a fragmented message with an empty frame must be honored: the parser must + // recognize the in-progress fragmented message when the continuation arrives. + const server = new WebSocketServer({ port: 0 }) + + server.on('connection', (ws) => { + ws.send('', { fin: false }) + ws.send('hello', { fin: true }) + }) + + after(() => { + for (const client of server.clients) { + client.close() + } + + server.close() + }) + + const ws = new WebSocket(`ws://localhost:${server.address().port}`) + + return new Promise((resolve) => { + ws.addEventListener('message', ({ data }) => { + assert.strictEqual(data, 'hello') + + ws.close() + resolve() + }) + }) +}) + +test('Too many empty fragments triggers close 1008', (t, done) => { + function maybeDone () { + if (++maybeDone.callCount === 2) { + agent.close() + server.close(done) + } + } + + maybeDone.callCount = 0 + + const agent = new Agent({ + webSocket: { + maxFragments: 3 + } + }) + + const server = new WebSocketServer({ port: 0 }, () => { + const { port } = server.address() + const client = new WebSocket(`ws://127.0.0.1:${port}`, { + dispatcher: agent + }) + + client.addEventListener('error', () => { + assert.ok(true) + }) + + client.addEventListener('close', (event) => { + assert.strictEqual(event.code, 1006) + maybeDone() + }) + }) + + server.on('connection', (ws) => { + ws.on('close', (code, reason) => { + assert.strictEqual(code, 1008) + assert.strictEqual(reason.toString(), 'Too many message fragments') + maybeDone() + }) + + const fragment = '' + const options = { fin: false } + + ws.send(fragment, options) // Text frame fin=0, len=0 + ws.send(fragment, options) // Continuation fin=0, len=0 + ws.send(fragment, options) // Continuation fin=0, len=0 + ws.send(fragment, options) // Continuation fin=0, len=0 + }) +}) diff --git a/test/websocket/permessage-deflate-limit.js b/test/websocket/permessage-deflate-limit.js index 45cdea4b77d..d5286b1e6f7 100644 --- a/test/websocket/permessage-deflate-limit.js +++ b/test/websocket/permessage-deflate-limit.js @@ -420,3 +420,47 @@ test('Raw uncompressed payload over 64-bit extended limit is rejected', async (t assert.strictEqual(messageReceived, false, 'Raw uncompressed message over limit should be rejected') assert.strictEqual(client.readyState, WebSocket.CLOSED, 'Connection should be closed after exceeding limit') }) + +test('cumulative payload size', (t, done) => { + const LIMIT = 100 + const FRAGMENT_SIZE = 60 + const NUM_FRAGMENTS = 10 + + const server = new WebSocketServer({ port: 0 }) + + server.on('connection', (ws) => { + const socket = ws._socket + const payload = Buffer.alloc(FRAGMENT_SIZE, 0x41) + + for (let i = 0; i < NUM_FRAGMENTS; i++) { + const fin = i === NUM_FRAGMENTS - 1 ? 0x80 : 0x00 + const opcode = i === 0 ? 0x02 : 0x00 + const header = Buffer.alloc(2) + header[0] = fin | opcode + header[1] = FRAGMENT_SIZE + socket.write(header) + socket.write(payload) + } + }) + + const agent = new Agent({ + webSocket: { + maxPayloadSize: LIMIT + } + }) + + const client = new WebSocket(`ws://127.0.0.1:${server.address().port}`, { dispatcher: agent }) + + t.after(async () => { + client.close() + server.close() + await agent.close() + }) + + client.onmessage = () => assert.fail('message should not be received') + + client.addEventListener('error', (event) => { + assert.ok(event) + done() + }) +}) diff --git a/types/client.d.ts b/types/client.d.ts index 22833ae671a..c292b9b2651 100644 --- a/types/client.d.ts +++ b/types/client.d.ts @@ -106,6 +106,12 @@ export declare namespace Client { bytesRead?: number } export interface WebSocketOptions { + /** + * Maximum number of fragments in a message. + * Set to 0 to disable the limit. + * @default 131072 + */ + maxFragments?: number; /** * Maximum allowed payload size in bytes for WebSocket messages. * Applied to uncompressed messages, compressed frame payloads, and decompressed (permessage-deflate) messages.