Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions lib/dispatcher/pool-base.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const kOnConnect = Symbol('onConnect')
const kOnDisconnect = Symbol('onDisconnect')
const kOnConnectionError = Symbol('onConnectionError')
const kGetDispatcher = Symbol('get dispatcher')
const kHasDispatcher = Symbol('has dispatcher')
const kAddClient = Symbol('add client')
const kRemoveClient = Symbol('remove client')

Expand Down Expand Up @@ -162,12 +163,28 @@ class PoolBase extends DispatcherBase {
this[kQueued]++
} else if (!dispatcher.dispatch(opts, handler)) {
dispatcher[kNeedDrain] = true
this[kNeedDrain] = !this[kGetDispatcher]()
this[kNeedDrain] = !this[kHasDispatcher]()
}

return !this[kNeedDrain]
}

[kHasDispatcher] () {
for (let i = 0; i < this[kClients].length; i++) {
const dispatcher = this[kClients][i]

if (
!dispatcher[kNeedDrain] &&
dispatcher.closed !== true &&
dispatcher.destroyed !== true
) {
return true
}
}

return false
}

[kAddClient] (client) {
client
.on('drain', this[kOnDrain].bind(this, client))
Expand Down Expand Up @@ -210,5 +227,6 @@ module.exports = {
kNeedDrain,
kAddClient,
kRemoveClient,
kGetDispatcher
kGetDispatcher,
kHasDispatcher
}
23 changes: 23 additions & 0 deletions lib/dispatcher/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const {
kNeedDrain,
kAddClient,
kGetDispatcher,
kHasDispatcher,
kRemoveClient
} = require('./pool-base')
const Client = require('./client')
Expand Down Expand Up @@ -115,6 +116,28 @@ class Pool extends PoolBase {
return dispatcher
}
}

[kHasDispatcher] () {
const clientTtlOption = this[kOptions].clientTtl
for (let i = 0; i < this[kClients].length; i++) {
const client = this[kClients][i]

if (clientTtlOption != null && clientTtlOption > 0 && client.ttl && ((Date.now() - client.ttl) > clientTtlOption)) {
this[kRemoveClient](client)
i--
} else if (!client[kNeedDrain]) {
return true
}
}

if (!this[kConnections] || this[kClients].length < this[kConnections]) {
const dispatcher = this[kFactory](this[kUrl], this[kOptions])
this[kAddClient](dispatcher)
return true
}

return false
}
}

module.exports = Pool
26 changes: 26 additions & 0 deletions lib/dispatcher/round-robin-pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const {
kNeedDrain,
kAddClient,
kGetDispatcher,
kHasDispatcher,
kRemoveClient
} = require('./pool-base')
const Client = require('./client')
Expand Down Expand Up @@ -128,6 +129,31 @@ class RoundRobinPool extends PoolBase {
return dispatcher
}
}

[kHasDispatcher] () {
const clientTtlOption = this[kOptions].clientTtl
for (let i = 0; i < this[kClients].length; i++) {
const client = this[kClients][i]

if (clientTtlOption != null && clientTtlOption > 0 && client.ttl && ((Date.now() - client.ttl) > clientTtlOption)) {
this[kRemoveClient](client)
if (i <= this[kIndex]) {
this[kIndex]--
}
i--
} else if (!client[kNeedDrain]) {
return true
}
}

if (!this[kConnections] || this[kClients].length < this[kConnections]) {
const dispatcher = this[kFactory](this[kUrl], this[kOptions])
this[kAddClient](dispatcher)
return true
}

return false
}
}

module.exports = RoundRobinPool
53 changes: 53 additions & 0 deletions test/node-test/balanced-pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
const { describe, test } = require('node:test')
const assert = require('node:assert/strict')
const { BalancedPool, Pool, Client, errors } = require('../..')
const { EventEmitter } = require('node:events')
const { createServer } = require('node:http')
const { promisify } = require('node:util')
const { tspl } = require('@matteo.collina/tspl')
const { kUrl } = require('../../lib/core/symbols')
const { kGetDispatcher } = require('../../lib/dispatcher/pool-base')

test('throws when factory is not a function', (t) => {
const p = tspl(t, { plan: 2 })
Expand Down Expand Up @@ -48,6 +51,56 @@ test('add/remove upstreams', (t) => {
p.deepStrictEqual(pool.upstreams, [])
})

test('does not select dispatcher twice when selected dispatcher backpressures', (t) => {
class FakeDispatcher extends EventEmitter {
constructor (origin) {
super()
this[kUrl] = new URL(origin)
}

dispatch () {
return false
}

close () {
this.closed = true
return Promise.resolve()
}

destroy () {
this.destroyed = true
return Promise.resolve()
}
}

class CountingBalancedPool extends BalancedPool {
constructor (...args) {
super(...args)
this.calls = 0
}

[kGetDispatcher] () {
this.calls++
return super[kGetDispatcher]()
}
}

const pool = new CountingBalancedPool([
'http://localhost:1',
'http://localhost:2'
], {
factory: (origin) => new FakeDispatcher(origin)
})
t.after(() => pool.close())

const ret = pool.dispatch({}, {
onResponseError () {}
})

assert.strictEqual(ret, true)
assert.strictEqual(pool.calls, 1)
})

test('basic get', async (t) => {
const p = tspl(t, { plan: 16 })

Expand Down
Loading