Skip to content

Commit 72e4a79

Browse files
authored
refactor(client): Inline PushPipeline#pull method (#1553)
Inlined `PushPipeline#pull` method which was just a delegated to the `pull` function in `PushBuffer`. The utility function works both with `PushPipeline` and `PushBuffer` it would make sense to have the delegator method in both classes, or in neither class. Maybe the best approach is to keep the pipeline and buffer classes as simple as possible. Removed also `PushBuffer#pull` and `PushBuffer#filter` methods which were not used.
1 parent 7c17925 commit 72e4a79

6 files changed

Lines changed: 38 additions & 39 deletions

File tree

packages/client/src/subscribe/Resends.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { StorageNodeRegistry } from '../registry/StorageNodeRegistry'
1010
import { StreamStorageRegistry } from '../registry/StreamStorageRegistry'
1111
import { forEach, map } from '../utils/GeneratorUtils'
1212
import { LoggerFactory } from '../utils/LoggerFactory'
13+
import { pull } from '../utils/PushBuffer'
1314
import { PushPipeline } from '../utils/PushPipeline'
1415
import { createQueryString, fetchHttpStream } from '../utils/utils'
1516
import { MessagePipelineFactory } from './MessagePipelineFactory'
@@ -190,8 +191,9 @@ export class Resends {
190191
setImmediate(async () => {
191192
let count = 0
192193
const messages = map(lines, (line: string) => StreamMessage.deserialize(line))
193-
await messageStream.pull(
194-
forEach(messages, () => count++)
194+
await pull(
195+
forEach(messages, () => count++),
196+
messageStream
195197
)
196198
this.logger.debug('Finished resend', { loggerIdx: traceId, messageCount: count })
197199
})

packages/client/src/utils/PushBuffer.ts

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import { Gate } from './Gate'
2-
import * as G from './GeneratorUtils'
32
import { StreamrClientError } from '../StreamrClientError'
43

54
export const DEFAULT_BUFFER_SIZE = 256
@@ -79,12 +78,6 @@ export class PushBuffer<T> implements IPushBuffer<T> {
7978
return this.writeGate.check()
8079
}
8180

82-
filter(fn: G.GeneratorFilter<T>): PushBuffer<unknown> {
83-
const p = new PushBuffer(this.bufferSize)
84-
pull(G.filter(this, fn), p)
85-
return p
86-
}
87-
8881
private updateWriteGate(): void {
8982
this.writeGate.setOpenState(!this.isFull())
9083
}
@@ -215,18 +208,6 @@ export class PushBuffer<T> implements IPushBuffer<T> {
215208
return this.iterator.next()
216209
}
217210

218-
async pull(src: AsyncGenerator<T>): Promise<void> {
219-
try {
220-
for await (const v of src) {
221-
const ok = await this.push(v)
222-
if (!ok || !this.isWritable()) { break }
223-
}
224-
} catch (err) {
225-
// this.endWrite(err)
226-
}
227-
this.endWrite()
228-
}
229-
230211
[Symbol.asyncIterator](): this {
231212
if (this.isIterating) {
232213
// @ts-expect-error ts can't do this.constructor properly

packages/client/src/utils/PushPipeline.ts

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { IPushBuffer, PushBuffer, DEFAULT_BUFFER_SIZE, pull } from './PushBuffer'
1+
import { IPushBuffer, PushBuffer, DEFAULT_BUFFER_SIZE } from './PushBuffer'
22
import * as G from './GeneratorUtils'
33
import { Pipeline, PipelineTransform } from './Pipeline'
44

@@ -26,10 +26,6 @@ export class PushPipeline<InType, OutType = InType> extends Pipeline<InType, Out
2626
return super.filter(fn) as PushPipeline<InType, OutType>
2727
}
2828

29-
pull(source: AsyncGenerator<InType>): Promise<void> {
30-
return pull(source, this)
31-
}
32-
3329
// wrapped PushBuffer methods below here
3430

3531
async push(item: InType | Error): Promise<boolean> {

packages/client/test/test-utils/utils.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ import { MAX_PARTITION_COUNT, StreamMessage, StreamPartID, StreamPartIDUtils } f
55
import { fastPrivateKey, fetchPrivateKeyWithGas } from '@streamr/test-utils'
66
import { EthereumAddress, Logger, merge, wait, waitForCondition } from '@streamr/utils'
77
import crypto from 'crypto'
8+
import { once } from 'events'
9+
import express, { Request, Response } from 'express'
810
import { mock } from 'jest-mock-extended'
11+
import { AddressInfo } from 'net'
912
import { DependencyContainer } from 'tsyringe'
1013
import { Authentication, createPrivateKeyAuthentication } from '../../src/Authentication'
1114
import { StreamrClientConfig } from '../../src/Config'
@@ -223,3 +226,23 @@ export const waitForCalls = async (mockFunction: jest.Mock<any>, n: number): Pro
223226
return `Timeout while waiting for calls: got ${mockFunction.mock.calls.length} out of ${n}`
224227
})
225228
}
229+
230+
export const startTestServer = async (
231+
endpoint: string,
232+
onRequest: (req: Request, res: Response) => Promise<void>
233+
): Promise<{ url: string, stop: () => Promise<void> }> => {
234+
const app = express()
235+
app.get(endpoint, async (req, res) => {
236+
await onRequest(req, res)
237+
})
238+
const server = app.listen()
239+
await once(server, 'listening')
240+
const port = (server.address() as AddressInfo).port
241+
return {
242+
url: `http://localhost:${port}`,
243+
stop: async () => {
244+
server.close()
245+
await once(server, 'close')
246+
}
247+
}
248+
}

packages/client/test/unit/Pipeline.test.ts

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -643,8 +643,7 @@ describe('Pipeline', () => {
643643
yield* s
644644
})
645645
pipeline.onFinally.listen(onFinally)
646-
647-
pipeline.pull(generate())
646+
pull(generate(), pipeline)
648647
return pipeline
649648
})
650649

@@ -654,11 +653,10 @@ describe('Pipeline', () => {
654653
yield* s
655654
})
656655
pipeline.onFinally.listen(onFinally)
657-
658-
pipeline.pull((async function* generateError() {
656+
pull((async function* generateError() {
659657
yield* generate()
660658
throw err
661-
}()))
659+
}()), pipeline)
662660
const received: number[] = []
663661
await expect(async () => {
664662
for await (const msg of pipeline) {
@@ -675,11 +673,10 @@ describe('Pipeline', () => {
675673
yield* s
676674
})
677675
pipeline.onFinally.listen(onFinally)
678-
679676
// eslint-disable-next-line require-yield
680-
pipeline.pull((async function* generateError() {
677+
pull((async function* generateError() {
681678
throw err
682-
}()))
679+
}()), pipeline)
683680
const received: any[] = []
684681
await expect(async () => {
685682
for await (const msg of pipeline) {

packages/client/test/unit/PushPipeline.test.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { MessageID, StreamMessage, toStreamID } from '@streamr/protocol'
22
import { collect, toEthereumAddress, wait } from '@streamr/utils'
33
import { Authentication } from '../../src/Authentication'
44
import { createSignedMessage } from '../../src/publish/MessageFactory'
5+
import { pull } from '../../src/utils/PushBuffer'
56
import { PushPipeline } from '../../src/utils/PushPipeline'
67
import { counterId, instanceId } from '../../src/utils/utils'
78
import { LeaksDetector } from '../test-utils/LeaksDetector'
@@ -61,11 +62,10 @@ describe('PushPipeline', () => {
6162
const s = new PushPipeline<StreamMessage>()
6263
leaksDetector.add(instanceId(s), s)
6364
const received: StreamMessage[] = []
64-
s.pull((async function* g() {
65+
pull((async function* g() {
6566
yield streamMessage
66-
6767
throw err
68-
}()))
68+
}()), s)
6969

7070
await expect(async () => {
7171
for await (const msg of s) {
@@ -95,9 +95,9 @@ describe('PushPipeline', () => {
9595
throw error
9696
})
9797
// eslint-disable-next-line require-yield
98-
s.pull((async function* g() {
98+
pull((async function* g() {
9999
throw err
100-
}()))
100+
}()), s)
101101

102102
await expect(async () => {
103103
for await (const msg of s) {

0 commit comments

Comments
 (0)