Skip to content

Commit 3c2d0cd

Browse files
authored
refactor(client): Remove GapFillFailedError (#1579)
When an error occurs in message ordering, the `OrderedMsgChain` calls an `onError` callback which is registered by the `OrderMessages` class. There is a parameter in that call, but `OrderMessages` doesn't use the value. See https://github.com/streamr-dev/network/blob/73b83c0cc2af716a3b09c71f13cb05665cee25fe/packages/client/src/subscribe/OrderMessages.ts#L51 Refactored error handler to be just a callback without arguments and removed the custom `GapFillFailedError` class. ### Open questions In PR #1570 we changed the error handling functionality so that the pipeline can continue to process messages after error. There we consider the error case as an empty result. But if we want we could notify subscription somehow if there is an error. If we want to notify, should we emit error only when there is a storage node error, or maybe also when we can't fill the gap (i.e. don't find the missing messages from the storage node). See: https://github.com/streamr-dev/network/blob/73b83c0cc2af716a3b09c71f13cb05665cee25fe/packages/client/test/unit/OrderedMsgChain.test.ts#L429 - If we want to notify the subscription, we should use `StreamrClientError` instead of custom `GapFillFailedError` - It currently not possible to add error listeners for resend streams (i.e. when user calls `StreamrClient#resend()`)
1 parent 73b83c0 commit 3c2d0cd

3 files changed

Lines changed: 10 additions & 36 deletions

File tree

packages/client/src/subscribe/ordering/GapFillFailedError.ts

Lines changed: 0 additions & 17 deletions
This file was deleted.

packages/client/src/subscribe/ordering/OrderedMsgChain.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { MessageRef, StreamMessage, StreamPartID } from '@streamr/protocol'
22
import { EthereumAddress, Logger } from '@streamr/utils'
33
import Heap from 'heap'
4-
import GapFillFailedError from './GapFillFailedError'
54

65
function toMsgRefId(streamMessage: StreamMessage): MsgRefId {
76
return streamMessage.getMessageRef().serialize()
@@ -128,7 +127,7 @@ class MsgChainQueue {
128127
export type MessageHandler = (msg: StreamMessage) => void
129128
export type GapHandler = (from: MessageRef, to: MessageRef, context: MsgChainContext) => void | Promise<void>
130129
export type OnDrain = (numMessages: number) => void
131-
export type OnError = (error: Error) => void
130+
export type OnError = () => void
132131

133132
const logger = new Logger(module)
134133

@@ -322,7 +321,7 @@ export class OrderedMsgChain {
322321

323322
this.inOrderHandler(msg)
324323
} catch (err: any) {
325-
this.onError(err)
324+
this.onError()
326325
}
327326
return msg
328327
}
@@ -384,7 +383,7 @@ export class OrderedMsgChain {
384383
try {
385384
await this.gapHandler(from, to, this.context)
386385
} catch (err: any) {
387-
this.onError(err)
386+
this.onError()
388387
}
389388
} else {
390389
this.onGapFillsExhausted()
@@ -411,7 +410,7 @@ export class OrderedMsgChain {
411410
// skip gap, allow queue processing to continue
412411
this.lastOrderedMsgRef = msg.getPreviousMessageRef()
413412
if (this.isGapHandlingEnabled()) {
414-
this.onError(new GapFillFailedError(from, to, this.context, maxGapRequests))
413+
this.onError()
415414
}
416415

417416
this.clearGap()

packages/client/test/unit/OrderedMsgChain.test.ts

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ import { Defer, EthereumAddress, toEthereumAddress } from '@streamr/utils'
33
import assert from 'assert'
44
import { shuffle } from 'lodash'
55
import { createSignedMessage } from '../../src/publish/MessageFactory'
6-
import GapFillFailedError from '../../src/subscribe/ordering/GapFillFailedError'
76
import { MsgChainContext, OrderedMsgChain } from '../../src/subscribe/ordering/OrderedMsgChain'
87
import { createRandomAuthentication } from '../test-utils/utils'
98

@@ -398,17 +397,11 @@ describe('OrderedMsgChain', () => {
398397
})
399398

400399
describe('maxGapRequests', () => {
401-
it('call the gap handler maxGapRequests times and then fails with GapFillFailedError', (done) => {
400+
it('call the gap handler maxGapRequests times and then fails with error', (done) => {
402401
let counter = 0
403-
const onError = (err: Error) => {
404-
expect(err).toBeInstanceOf(GapFillFailedError)
405-
if (err instanceof GapFillFailedError) {
406-
expect(err.from.serialize()).toEqual('[1,1]')
407-
expect(err.to.serialize()).toEqual('[2,0]')
408-
expect(err.context).toEqual(CONTEXT)
409-
// @ts-expect-error private method
410-
expect(counter).toBe(util.maxGapRequests)
411-
}
402+
const onError = () => {
403+
// @ts-expect-error private method
404+
expect(counter).toBe(util.maxGapRequests)
412405
done()
413406
}
414407
util = new OrderedMsgChain(
@@ -426,12 +419,11 @@ describe('OrderedMsgChain', () => {
426419
util.add(msg3)
427420
})
428421

429-
it('after maxGapRequests OrderingUtil gives up on filling gap with GapFillFailedError "error" event', (done) => {
422+
it('after maxGapRequests OrderingUtil gives up on filling gap with "error" event', (done) => {
430423
const received: StreamMessage[] = []
431424
const onGap = jest.fn()
432425
let onErrorCallCount = 0
433-
const onError = (err: Error) => {
434-
if (!(err instanceof GapFillFailedError)) { throw err }
426+
const onError = () => {
435427
onGap()
436428
if (onErrorCallCount === 0) {
437429
setImmediate(() => {

0 commit comments

Comments
 (0)