diff --git a/.vscode/settings.json b/.vscode/settings.json index ae2bdd3..5f7d4fd 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,7 +1,7 @@ { "typescript.tsdk": "node_modules/typescript/lib", "javascript.preferences.importModuleSpecifier": "project-relative", - "typescript.preferences.importModuleSpecifier": "shortest", + "typescript.preferences.importModuleSpecifier": "project-relative", "editor.foldingImportsByDefault": false, "eslint.options": { "overrideConfigFile": "./eslint.config.mjs", diff --git a/README.md b/README.md index 9f86a65..447203e 100644 --- a/README.md +++ b/README.md @@ -1,61 +1,464 @@ -# BatchLoader -utility for batch data processing. The [DataLoader](https://github.com/graphql/dataloader) is taken as an example. +`BatchLoader` is a tool for batching data requests with support for deduplication, caching, and parallel task management. It is designed to enhance flexibility and performance in scenarios requiring asynchronous data processing. This module was inspired by [Facebook's Dataloader](https://github.com/graphql/dataloader). -Collects data over a specified time or within a specified amount +## Key Features +- **Batching Requests:** Combines multiple requests into batches, minimizing the number of calls. +- **Parallel Task Limitation:** Controls the number of concurrent tasks using the `concurrencyLimit` parameter. +- **Request Deduplication:** Eliminates duplicate requests with the same keys, redirecting them to already running tasks. +- **Flexible Caching:** Supports adapters for external storage like Redis and provides cache invalidation management. +- **Timeout Control:** Manages execution time for individual tasks and batches. +- **Delay Before Batch Execution:** Allows small delays to improve efficiency in asynchronous workflows. + +## Problems Addressed by This Module + +1. **Coupling Deduplication and Caching:** + - In the original `Dataloader`, deduplication is tightly coupled with caching. Disabling caching also disables deduplication. + - In `BatchLoader`, deduplication and caching are separate, allowing caching to be disabled without losing deduplication. + +2. **Inconvenient Cache Management:** + - In `Dataloader`, all cached data must be manually cleared, which is difficult to coordinate in distributed systems (e.g., PM2, Docker Swarm, or Kubernetes). + - `BatchLoader` supports cache adapters and provides methods for clearing or resetting the cache, making it easier to manage in distributed systems. + +3. **Synchronous Batch Formation:** + - In `Dataloader`, batches are formed synchronously within a single event loop. While it allows custom scheduling with `batchScheduleFn`, this approach is complex and redundant. + - `BatchLoader` supports configurable delays (`batchTimeMs`) before batch execution, enabling more efficient handling of asynchronous requests arriving at slightly different times. + + +--- +--- + + +# Main modules: + +## BatchLoader + +`BatchLoader` is a tool for grouping requests into batches and processing them efficiently with support for deduplication and caching. + +--- + +### Key Features +- **Batching requests:** Processes multiple requests simultaneously, reducing the number of calls. +- **Parallel task limits:** Controls the number of concurrent tasks using the `concurrencyLimit` parameter. +- **Request deduplication:** Prevents duplicate requests with the same keys, redirecting them to existing tasks. +- **Caching:** Stores results in a cache for reuse. +- **Timeouts:** Manages execution time for requests and batches. --- -## install +### Configuration Options +```typescript +interface IBatchLoaderOptions { + getKey?: (query: K) => Key // Function to generate a request key (default: query => `${query}`) + cache?: ICache // Cache adapter (default: StubCache) + timeoutMs?: number // Task execution timeout (default: 60,000 ms) + unrefTimeouts?: boolean // Allows timers to free the event loop (default: false) + concurrencyLimit?: number // Maximum number of parallel tasks (default: Infinity) + maxBatchSize?: number // Maximum number of requests per batch (default: 1000) + batchTimeMs?: number // Maximum time to form a batch (default: 50 ms) + maxWaitingTimeMs?: number // Maximum queue waiting time (only if concurrencyLimit > 0) (default: 60,000 ms) +} ``` -npm i @nerjs/batchloader -// or: -yarn add @nerjs/batchloader + +--- + +### Example Usage +```typescript +import { BatchLoader } from '@nerjs/batchloader' + +const loader = new BatchLoader( + async (queries: number[]) => queries.map(q => q * 2), + { + timeoutMs: 100, + maxBatchSize: 3, + batchTimeMs: 50, + } +) + +const result = await loader.load(5) +console.log(result) // 10 +``` + +--- + +### Cache Examples +#### Using Cache +```typescript +const loader = new BatchLoader( + async (queries: number[]) => queries.map(q => q * 2), + { + cache: new MapCache(), + timeoutMs: 100, + } +) + +const result = await loader.load(5) +console.log(result) // 10 +``` + +#### Clearing Cache +```typescript +await loader.resetCache(5) // Removes the result for the specified query +await loader.flush() // Clears the entire cache ``` -## usage +--- + +### Methods + +#### `load(query: K): Promise` +Adds a request to the current batch. If the batch reaches the maximum size (`maxBatchSize`) or the timeout (`batchTimeMs`) expires, it is processed immediately. Returns a promise with the result of the request. + +**Parameters:** +- **`query: K`** — The request added to the batch. + +#### `resetCache(query: K): Promise` +Clears the cache for the specified key. + +#### `clear(): void` +Cancels all tasks and clears their state. + +#### `flush(): Promise` +Clears the entire cache. + + +--- +--- -```js -const BatchLoader = require('@nerjs/batchloader') -// or: -import BatchLoader from '@nerjs/batchloader' +## Deduplicator -const batchLoadFn = arr => { - return arr.map(n => n * 2) +`Deduplicator` is a module designed to prevent duplicate execution of identical requests. If two or more requests share the same key, they are grouped and executed as a single request. All subsequent requests receive results from the already-running request. + +--- + +### Key Features +- **Request Deduplication**: Groups requests with the same key to avoid redundant executions. +- **Execution Timeouts**: Supports task timeouts (`timeoutMs`). +- **Task Cancellation Management**: Uses `AbortSignal` for safe task termination. +- **Flexible Timer Control**: Option `unrefTimeouts` allows timers to avoid blocking the event loop. + +--- + +### Configuration Options +```typescript +interface IDeduplicatorOptions { + getKey: (query: T) => Key // Function to extract the key from a query + timeoutMs: number // Task execution timeout + unrefTimeouts?: boolean // Allows timers to avoid blocking the event loop } +``` + +--- + +### Usage Examples + +#### Basic Usage +```typescript +import { Deduplicator } from '@nerjs/batchloader' + +const deduplicator = new Deduplicator( + async (query: number, signal: AbortSignal) => { + if (signal.aborted) throw new Error('Aborted') + return query * 2 + }, + { + getKey: query => query, + timeoutMs: 500, + } +) + +const result = await deduplicator.call(5) +console.log(result) // 10 +``` + +--- + +#### Request Deduplication +```typescript +const results = await Promise.all([ + deduplicator.call(1), + deduplicator.call(1), // Joins the first request + deduplicator.call(2) +]) + +console.log(results) // [2, 2, 4] +``` -const loader = new BatchLoader(batchLoadFn) +--- + +#### Handling Timeouts +```typescript +const deduplicator = new Deduplicator( + async () => { + await new Promise(res => setTimeout(res, 1000)) + return 42 + }, + { + getKey: query => query, + timeoutMs: 500, // Specified timeout + } +) + +await deduplicator.call(1).catch(err => console.error(err.message)) // TimeoutError +``` + +--- + +### Methods -loader.load(1).then(result => { - console.log(result) // console: 2 -}) +#### `call(query: T): Promise` +Adds a query to the execution queue or joins an already running request with the same key. Returns a promise with the result of the task execution. +--- + +#### `clear()` +Cancels all active tasks and clears their state. +```typescript +deduplicator.clear() ``` -## API +--- -api is maximally repeated from the [DataLoader API](https://github.com/graphql/dataloader#api) with small additions. +## Possible Errors -### class DataLoader(batchLoadFn, [, options]) +All errors inherit from the base class `LoaderError` and are designed to handle various situations that may arise during request execution. + +### `TimeoutError` +Occurs when the specified execution time for a task or batch (`timeoutMs`) is exceeded. This error can be thrown by both the deduplicator and the batch aggregator. + +### `SilentAbortError` +Occurs when a task is intentionally canceled, for example, during a call to the `clear()` method. It is used for safely terminating tasks without generating exceptions. + +### `AbortError`, `RejectedAbortError` +Thrown when a task is manually aborted during execution, explicitly indicating the process termination. -* ***batchLoadFn***: A function which accepts an Array of keys, and returns a Promise which resolves to an Array of values. -* ***options***: - | key | type | default | - |:--|:--:|:--:| - | maxSize | Number | 1000 | - | cacheTime | Number | 10 | - | batchTime | Number | 10 | - | getKey | Function | null | -### loader.load(any) -### loader.loadMany(Array) -### loader.clear() -clear cache -### loader.clearMany() -### loader.resolve(keyData, result) -### loader.reject(keyData, Error) --- +--- + +# utils: + + +## BatchAggregator + + +`BatchAggregator` is a utility for grouping multiple requests into batches and processing them in bulk. It optimizes task execution by managing batch size, execution timing, and concurrency limits. + +--- + +### Key Features + +- **Request Grouping**: Groups requests into batches based on size or timeout. +- **Concurrency Control**: Limits the number of tasks that can run in parallel (`concurrencyLimit`). +- **Timeout Handling**: Supports execution timeouts (`batchTimeout`) and waiting timeouts (`maxWaitingTimeMs`). + +--- + +### Configuration Options + +```typescript +interface IBatchAggregatorOptions { + concurrencyLimit?: number; // Maximum number of parallel tasks (default: unlimited) + maxBatchSize: number; // Maximum number of requests per batch + batchTimeMs: number; // Maximum time to form a batch + maxWaitingTimeMs?: number; // Maximum waiting time for tasks in the queue (only if concurrencyLimit > 0) + batchTimeout?: number; // Maximum execution time for batchFn (the function passed as the first argument) +} +``` + +--- + +### Usage Examples: + +#### Basic Example + +```typescript +import { BatchAggregator } from '@nerjs/batchloader' + +const aggregator = new BatchAggregator( + async (batch, signal) => { + if (signal.aborted) throw new Error('Aborted') + return batch.map(item => item * 2) // Example: double each number + }, + { + maxBatchSize: 3, + batchTimeMs: 100, + batchTimeout: 500, + } +) + +const results = await Promise.all([ + aggregator.load(1), + aggregator.load(2), + aggregator.load(3), +]) + +console.log(results) // Output: [2, 4, 6] +``` + + +#### Concurrency Limiting + +```typescript +import { BatchAggregator } from '@nerjs/batchloader' + +const aggregator = new BatchAggregator( + async (batch, signal) => { + if (signal.aborted) throw new Error('Aborted') + return batch.map(item => item * 2) + }, + { + maxBatchSize: 2, + batchTimeMs: 100, + batchTimeout: 500, + concurrencyLimit: 2, // Limit to 2 parallel tasks + } +) + +const results = await Promise.all([ + aggregator.load(1), + aggregator.load(2), + aggregator.load(3), + aggregator.load(4), +]) + +console.log(results) // Output: [2, 4, 6, 8] +``` + + +#### Timeout Handling + +```typescript +import { BatchAggregator } from '@nerjs/batchloader' + +const aggregator = new BatchAggregator( + async (batch, signal) => { + await new Promise(res => setTimeout(res, 200)) // Simulate delay + return batch.map(item => item * 2) + }, + { + maxBatchSize: 1, + batchTimeMs: 100, + batchTimeout: 500, + concurrencyLimit: 1, + maxWaitingTimeMs: 100, // Timeout for tasks in the queue + } +) + +await Promise.all([ + aggregator.load(1), // Completes successfully + aggregator.load(2).catch(err => console.error(err.message)), // Fails due to timeout +]) +``` + +--- + +### Methods + +#### `load(request: T): Promise` + +Adds a request to the current batch. If the batch reaches its maximum size (`maxBatchSize`) or the timeout (`batchTimeMs`) expires, it is processed immediately. Returns a promise that resolves with the result of the request. + + +#### `clear()` + +Clears all pending and waiting tasks. Useful for resource cleanup during shutdown or restart. + +```typescript +aggregator.clear() +``` + + +--- +--- + +## Unlimited Timekeeper + + +A class for managing tasks without restrictions on parallel execution. + + +Key Features: + + - Supports tasks with various statuses: pending, runned, resolved, rejected. + - Only one task can remain in the pending state at a time. + - Manages automatic task execution based on a specified timeout (timeoutMs). + - Handles errors during task execution. + - Supports cancellation of tasks using AbortSignal. + + +How to interact with data: + + - Data is created using initialDataFactory. + - You can modify task data fields: + ```ts + const task = timekeeper.current(); + task.data.field = 'other example'; + ``` +> Direct assignment of new data is not allowed: `task.data = {}; // Error` + + - In the runner, you can access data via task.data. + +--- + +### Usage Example: + +```typescript +import { UnlimitedTimekeeper } from '@nerjs/batchloader' + +const timekeeper = new UnlimitedTimekeeper<{ field: string }>({ + initialDataFactory: () => ({ field: 'example' }), + runMs: 100, + timeoutMs: 1000, + runner: async (task, signal) => { + if (signal.aborted) throw new Error('Aborted'); + console.log(task.data.field); // Accessing data (log: "other") + await someAsyncFunction(); + }, +}); + +const task = timekeeper.current(); +task.data.field = 'other' +timekeeper.run(); +await timekeeper.wait(task); +``` + +--- +--- + +## Limited Timekeeper + +A class for managing tasks with restrictions on parallel execution. Inherits functionality from UnlimitedTimekeeper. + + +Key Features: + - Controls parallel execution using concurrencyLimit. Tasks exceeding the limit are placed in a waiting queue. + - Manages tasks in the waiting queue (waiting). + - Handles timeouts for tasks in the waiting state (maxWaitingTimeMs). + - Supports cancellation of both active and waiting tasks. + +--- + +Usage Example: +```typescript +import { LimitedTimekeeper } from '@nerjs/batchloader' + +const timekeeper = new LimitedTimekeeper({ + initialDataFactory: () => ({ field: 'example' }), + runMs: 100, + timeoutMs: 1000, + concurrencyLimit: 2, + maxWaitingTimeMs: 500, + runner: async (task, signal) => { + if (signal.aborted) throw new Error('Aborted'); + console.log(task.data.field); // Accessing data + await someAsyncFunction(); + }, +}); + +const task = timekeeper.current(); +timekeeper.run(); +await timekeeper.wait(task); +``` -#### [:link: All utils ](https://github.com/nerjs/utils#readme) \ No newline at end of file diff --git a/__tests__/batchloader.test.js b/__tests__/batchloader.test.js deleted file mode 100644 index 3ef7aac..0000000 --- a/__tests__/batchloader.test.js +++ /dev/null @@ -1,197 +0,0 @@ -const BatchLoader = require('../lib/index') -const { - MSG_BAD_BATCHLOAD_FN, - MSG_EMPTY_LOAD_METHOD, - MSG_BAD_GET_KEY_FN, - MSG_BAD_RESULT, - MSG_LOAD_MANY_METHOD, -} = require('../lib/constants') - -describe('BatchLoader', () => { - it('init', () => { - const batchLoaderFn = () => {} - const getKeyFn = () => {} - const loader = new BatchLoader(batchLoaderFn, { - maxSize: 1, - cacheTime: 2, - batchTime: 3, - getKey: getKeyFn, - }) - - expect(loader.batchLoadFn).toEqual(batchLoaderFn) - expect(loader.options.maxSize).toEqual(1) - expect(loader.options.cacheTime).toEqual(2) - expect(loader.options.batchTime).toEqual(3) - expect(loader.options.getKey).toEqual(getKeyFn) - expect(loader.size).toEqual(0) - }) - - it('errors', async () => { - expect(() => { - new BatchLoader() - }).toThrow(MSG_BAD_BATCHLOAD_FN) - - const batchLoaderFn = async () => {} - - let loader = new BatchLoader(batchLoaderFn, { getKey: () => null }) - - await expect(() => loader.load()).toThrow(MSG_EMPTY_LOAD_METHOD) - await expect(() => loader.load(1)).toThrow(MSG_BAD_GET_KEY_FN) - - loader = new BatchLoader(batchLoaderFn) - await expect(loader.load(3)).rejects.toThrow(MSG_BAD_RESULT) - expect(() => loader.loadMany()).toThrow(MSG_LOAD_MANY_METHOD) - expect(() => loader.loadMany(null)).toThrow(MSG_LOAD_MANY_METHOD) - expect(() => loader.loadMany(1)).toThrow(MSG_LOAD_MANY_METHOD) - expect(() => loader.loadMany('1')).toThrow(MSG_LOAD_MANY_METHOD) - expect(() => loader.loadMany({ a: 1 })).toThrow(MSG_LOAD_MANY_METHOD) - - loader = new BatchLoader(() => { - throw new Error('Test Error') - }) - await expect(loader.load(3)).rejects.toThrow('Test Error') - - await expect( - Promise.all([loader.load(1), loader.load(2), loader.load(3), loader.load(4)]), - ).rejects.toThrow('Test Error') - - await Promise.all([ - expect(loader.load(5)).rejects.toThrow('Test Error'), - expect(loader.load(6)).rejects.toThrow('Test Error'), - expect(loader.load(7)).rejects.toThrow('Test Error'), - expect(loader.load(8)).rejects.toThrow('Test Error'), - ]) - - const badBatchLoaderFn = arr => { - arr.shift() - return arr - } - - loader = new BatchLoader(badBatchLoaderFn) - - await Promise.all([ - expect(loader.load(5)).rejects.toThrow(MSG_BAD_RESULT), - expect(loader.load(6)).rejects.toThrow(MSG_BAD_RESULT), - expect(loader.load(7)).rejects.toThrow(MSG_BAD_RESULT), - ]) - }) - - it('load() batch', async () => { - let batchLoaderFn, loader - jest.useFakeTimers() - - batchLoaderFn = jest.fn(async arr => arr) - - loader = new BatchLoader(batchLoaderFn, { batchTime: 20, cacheTime: 3 }) - - const promise1 = loader.load(1) - const promise11 = loader.load(1) - - expect(setTimeout).toHaveBeenCalledTimes(1) - expect(setTimeout).toHaveBeenLastCalledWith(expect.any(Function), 20) - expect(promise1).toEqual(promise11) - - expect(batchLoaderFn).toHaveBeenCalledTimes(0) - - jest.advanceTimersByTime(10) - const promise2 = loader.load(2) - jest.advanceTimersByTime(10) - - expect(batchLoaderFn).toHaveBeenCalledTimes(1) - expect(batchLoaderFn).toHaveBeenLastCalledWith([1, 2]) - - jest.advanceTimersByTime(10) - const promise3 = loader.load(3) - jest.advanceTimersByTime(20) - - expect(batchLoaderFn).toHaveBeenCalledTimes(2) - expect(batchLoaderFn).toHaveBeenLastCalledWith([3]) - - const [result1, result2, result3] = await Promise.all([promise1, promise2, promise3]) - - expect(result1).toEqual(1) - expect(result2).toEqual(2) - expect(result3).toEqual(3) - jest.useRealTimers() - }) - - it('load() cache', async () => { - let batchLoaderFn, loader - jest.useFakeTimers() - batchLoaderFn = jest.fn(async arr => arr) - loader = new BatchLoader(batchLoaderFn, { batchTime: 20, cacheTime: 20 }) - - loader.load(1) - jest.advanceTimersByTime(20) - const result1 = await loader.load(1) - jest.advanceTimersByTime(10) - const result2 = await loader.load(1) - - expect(result1).toEqual(1) - expect(result2).toEqual(1) - expect(batchLoaderFn).toHaveBeenCalledTimes(1) - - jest.advanceTimersByTime(10) - loader.load(1) - jest.advanceTimersByTime(20) - expect(batchLoaderFn).toHaveBeenCalledTimes(2) - }) - - it('.loadMany()', async () => { - const batchLoaderFn = jest.fn(async arr => arr) - const loader = new BatchLoader(batchLoaderFn, { batchTime: 10, cacheTime: 20 }) - jest.useFakeTimers() - - const promise = loader.loadMany([3, 4, 5]) - jest.runOnlyPendingTimers() - - const [result1, result2, result3] = await promise - - expect(result1).toEqual(3) - expect(result2).toEqual(4) - expect(result3).toEqual(5) - - const result4 = await loader.load(4) - - expect(result4).toEqual(4) - expect(batchLoaderFn).toHaveBeenCalledTimes(1) - jest.useRealTimers() - }) - - it('.clear()', async () => { - const batchLoaderFn = jest.fn(async arr => arr) - const loader = new BatchLoader(batchLoaderFn, { batchTime: 5, cacheTime: 30 }) - - let result = await loader.load(1) - - expect(batchLoaderFn).toHaveBeenCalledTimes(1) - expect(result).toEqual(1) - - result = await loader.load(1) - expect(batchLoaderFn).toHaveBeenCalledTimes(1) - expect(result).toEqual(1) - - loader.clear(1) - result = await loader.load(1) - expect(batchLoaderFn).toHaveBeenCalledTimes(2) - }) - - it('.resolve(), .reject()', async () => { - const batchLoaderFn = jest.fn(async arr => arr) - const loader = new BatchLoader(batchLoaderFn, { batchTime: 2, cacheTime: 3 }) - - const promise1 = loader.load(1) - const promise2 = loader.load(2) - const promise3 = loader.load(3) - - loader.resolve(1, 4) - loader.reject(2, new Error('Test error')) - - await expect(promise1).resolves.toEqual(4) - await expect(promise2).rejects.toThrow('Test error') - await expect(promise3).resolves.toEqual(3) - - expect(batchLoaderFn).toHaveBeenCalledTimes(1) - expect(batchLoaderFn).toHaveBeenLastCalledWith([3]) - }) -}) diff --git a/__tests__/parse_options.test.js b/__tests__/parse_options.test.js deleted file mode 100644 index 7339250..0000000 --- a/__tests__/parse_options.test.js +++ /dev/null @@ -1,43 +0,0 @@ -const parseOptions = require('../lib/parse_options') -const { - MSG_BAD_BATCHLOAD_FN, - DEFAULT_BATCH_TIME, - DEFAULT_CACHE_TIME, - DEFAULT_GET_KEY, - DEFAULT_SIZE, -} = require('../lib/constants') - -describe('parseOptions', () => { - test('bad batchLoaderFn', () => { - expect(() => parseOptions()).toThrow(MSG_BAD_BATCHLOAD_FN) - }) - - test('empty object', () => { - const batchLoadFn = () => {} - - const { batchFn, maxSize, cacheTime, batchTime, getKey } = parseOptions(batchLoadFn) - - expect(batchFn).toEqual(batchLoadFn) - expect(maxSize).toEqual(DEFAULT_SIZE) - expect(cacheTime).toEqual(DEFAULT_CACHE_TIME) - expect(batchTime).toEqual(DEFAULT_BATCH_TIME) - expect(getKey).toEqual(DEFAULT_GET_KEY) - }) - - test('correct parameters', () => { - const batchLoadFn = () => {} - const getKeyFn = () => {} - - const { maxSize, cacheTime, batchTime, getKey } = parseOptions(batchLoadFn, { - maxSize: 1, - cacheTime: 2, - batchTime: 3, - getKey: getKeyFn, - }) - - expect(maxSize).toEqual(1) - expect(cacheTime).toEqual(2) - expect(batchTime).toEqual(3) - expect(getKey).toEqual(getKeyFn) - }) -}) diff --git a/lib/constants.js b/lib/constants.js deleted file mode 100644 index 4741fe3..0000000 --- a/lib/constants.js +++ /dev/null @@ -1,23 +0,0 @@ -// protected props -exports.PROP_BATCH_FN = Symbol('Batch load fn') -exports.PROP_OPTIONS = Symbol('options') -exports.PROP_CACHE = Symbol('cache') -exports.PROP_BATCH = Symbol('batch') -exports.PROP_WAIT_BATCH = Symbol('wait batch') - -// default options -exports.DEFAULT_SIZE = 1000 -exports.DEFAULT_BATCH_TIME = 10 -exports.DEFAULT_CACHE_TIME = 10 -exports.DEFAULT_PARALLEL = false -exports.DEFAULT_GET_KEY = null - -// error messages -exports.MSG_BAD_BATCHLOAD_FN = - 'BatchLoader must be constructed with a function which accepts Array and returns Promise>' -exports.MSG_EMPTY_LOAD_METHOD = 'The loader.load() function must be called with a value' -exports.MSG_LOAD_MANY_METHOD = 'The loader.loadMany() function must be called with Array ' -exports.MSG_BAD_GET_KEY_FN = 'The option.getKey() function must be return value' -exports.MSG_BAD_RESULT = `The result of batchloadFn should return an Promise corresponding to the passed one. - Or another result (Promise) that will be returned in each promise. - Not null or undefined.` diff --git a/lib/index.d.ts b/lib/index.d.ts deleted file mode 100644 index 31ff1e7..0000000 --- a/lib/index.d.ts +++ /dev/null @@ -1,48 +0,0 @@ - -type BatchOptions = { - maxSize: number, - cacheTime: number, - batchTime: number, - - /** - * build key - * @param dataKey argument for BatchLoader.load() - */ - getKey: (dataKey: any) => any, -} - -declare class BatchLoader { - constructor( - batchLoaderFn:(batches:Array) => Promise>, - options?: BatchOptions - ); - - load(data: any): Promise; - - loadMany(dataArr: Array): Promise; - - /** - * Clear cache - * @param keyData data for build key. - */ - clear(keyData: any): undefined; - - /** - * Clear all cache - */ - clearAll(): undefined; - - /** - * resolve one wait promise - * @param keyData data for build key - */ - resolve(keyData: any, result: any): undefined; - - /** - * reject one wait promise. - * @param keyData data for build key - */ - reject(keyData: any, error: Error): undefined; -} - -export = BatchLoader diff --git a/lib/index.js b/lib/index.js deleted file mode 100644 index 1ad897a..0000000 --- a/lib/index.js +++ /dev/null @@ -1,153 +0,0 @@ -const DeferPromise = require('helpers-promise/defer') - -const parseOptions = require('./parse_options') -const promiseFns = require('./promise_fns') -const { - PROP_BATCH_FN, - PROP_OPTIONS, - PROP_CACHE, - PROP_BATCH, - PROP_WAIT_BATCH, - MSG_EMPTY_LOAD_METHOD, - MSG_LOAD_MANY_METHOD, - MSG_BAD_GET_KEY_FN, - MSG_BAD_RESULT, -} = require('./constants') - -class BatchLoader { - constructor(batchLoaderFn, options) { - const { - batchFn, - maxSize, - cacheTime, - batchTime, - parallel, // feature - getKey, - } = parseOptions(batchLoaderFn, options) - - this.timer = null - - this[PROP_BATCH_FN] = batchFn - this[PROP_OPTIONS] = { maxSize, cacheTime, batchTime, parallel, getKey } - this[PROP_CACHE] = new Map() - this[PROP_BATCH] = new Map() - this[PROP_WAIT_BATCH] = new Map() - } - - get batchLoadFn() { - return this[PROP_BATCH_FN] - } - - get options() { - return { ...this[PROP_OPTIONS] } - } - - get size() { - return this[PROP_BATCH].size - } - - testRun() { - const { maxSize, batchTime } = this.options - - if (this.size >= maxSize) return this.run() - - if (!this.timer) { - this.timer = setTimeout(this.run.bind(this), batchTime) - } - } - - async run() { - if (this.timer) { - clearTimeout(this.timer) - this.timer = null - } - - const { cacheTime } = this.options - - const batchCache = new Map(this[PROP_BATCH]) - this[PROP_BATCH].clear() - - const keys = [...batchCache.keys()] - if (keys.length === 0) return - - batchCache.forEach((data, key) => { - this[PROP_WAIT_BATCH].set(key, data) - }) - - let results, error - - try { - results = await this.batchLoadFn(keys.map(key => batchCache.get(key).data)) - } catch (err) { - results = keys - error = err - } - - if ( - !error && - (results === null || - results === undefined || - (Array.isArray(results) && results.length !== keys.length)) - ) { - error = new Error(MSG_BAD_RESULT) - } - - keys.forEach((key, index) => { - const batch = batchCache.get(key) - if (error) return batch.promise.reject(error) - const result = Array.isArray(results) ? results[index] : results - this[PROP_WAIT_BATCH].delete(key) - batch.promise.resolve(result) - if (cacheTime) { - this[PROP_CACHE].set(key, { result }) - setTimeout(() => this[PROP_CACHE].delete(key), cacheTime) - } - }) - } - - load(data) { - if (data === undefined || data === null) throw new TypeError(MSG_EMPTY_LOAD_METHOD) - const { getKey } = this.options - const key = getKey ? getKey(data) : data - if (key === undefined || key === null) throw new TypeError(MSG_BAD_GET_KEY_FN) - - if (this[PROP_CACHE].has(key)) - return Promise.resolve().then(() => this[PROP_CACHE].get(key).result) - if (this[PROP_BATCH].has(key)) return this[PROP_BATCH].get(key).promise - if (this[PROP_WAIT_BATCH].has(key)) return this[PROP_WAIT_BATCH].get(key).promise - - const promise = new DeferPromise() - - this[PROP_BATCH].set(key, { - data, - promise, - }) - - this.testRun() - - return promise - } - - loadMany(arr) { - if (!Array.isArray(arr) || arr.length === 0) throw new TypeError(MSG_LOAD_MANY_METHOD) - return Promise.all(arr.map(data => this.load(data))) - } - - clear(key) { - this[PROP_CACHE].delete(key) - } - - clearAll() { - this[PROP_CACHE].clear(key) - } - - resolve(key, result) { - return promiseFns(this, 'resolve', key, result) - } - - reject(key, error) { - return promiseFns(this, 'reject', key, error) - } -} - -module.exports = BatchLoader diff --git a/lib/parse_options.js b/lib/parse_options.js deleted file mode 100644 index 2197e43..0000000 --- a/lib/parse_options.js +++ /dev/null @@ -1,34 +0,0 @@ -const { - DEFAULT_SIZE, - DEFAULT_BATCH_TIME, - DEFAULT_CACHE_TIME, - DEFAULT_PARALLEL, - DEFAULT_GET_KEY, - MSG_BAD_BATCHLOAD_FN, -} = require('./constants') - -const prepareOption = (value, defaultValue, filter) => - value === undefined || !filter(value) ? defaultValue : value - -module.exports = (batchFn, options) => { - const { maxSize, batchTime, cacheTime, parallel, getKey } = - options && typeof options == 'object' ? options : {} - if (typeof batchFn !== 'function') throw new TypeError(MSG_BAD_BATCHLOAD_FN) - - return { - batchFn, - maxSize: prepareOption(maxSize, DEFAULT_SIZE, n => typeof n === 'number' && n >= 0), - parallel: prepareOption(parallel, DEFAULT_PARALLEL, n => typeof n === 'boolean'), - getKey: prepareOption(getKey, DEFAULT_GET_KEY, n => typeof n === 'function'), - cacheTime: prepareOption( - cacheTime, - DEFAULT_CACHE_TIME, - n => typeof n === 'number' && n >= 0, - ), - batchTime: prepareOption( - batchTime, - DEFAULT_BATCH_TIME, - n => typeof n === 'number' && n >= 0, - ), - } -} diff --git a/lib/promise_fns.js b/lib/promise_fns.js deleted file mode 100644 index ea85223..0000000 --- a/lib/promise_fns.js +++ /dev/null @@ -1,21 +0,0 @@ -const { PROP_CACHE, PROP_BATCH, PROP_WAIT_BATCH } = require('./constants') - -module.exports = (ctx, type, keyData, result) => { - const key = ctx.options.getKey ? ctx.options.getKey(keyData) : keyData - if (ctx[PROP_BATCH].has(key)) { - ctx[PROP_BATCH].get(key).promise[type](result) - ctx[PROP_BATCH].delete(key) - } else if (tctxhis[PROP_WAIT_BATCH].has(key)) { - ctx[PROP_WAIT_BATCH].get(key).promise[type](result) - ctx[PROP_WAIT_BATCH].delete(key) - } else { - return - } - - if (type === 'reject') return - - if (ctx.options.cacheTime) { - ctx[PROP_CACHE].set(key, { result }) - setTimeout(() => ctx[PROP_CACHE].delete(key), ctx.options.cacheTime) - } -} diff --git a/package-lock.json b/package-lock.json index 76f340e..ae511ad 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,11 +9,12 @@ "version": "1.0.13", "license": "MIT", "dependencies": { - "helpers-promise": "^1.3.0" + "debug": "^4.4.0" }, "devDependencies": { "@eslint/eslintrc": "^3.2.0", "@eslint/js": "^9.17.0", + "@types/debug": "^4.1.12", "@types/eslint__js": "^8.42.3", "@types/jest": "^29.5.14", "@types/node": "^22.10.2", @@ -1227,6 +1228,15 @@ "@babel/types": "^7.20.7" } }, + "node_modules/@types/debug": { + "version": "4.1.12", + "resolved": "https://registry.npmjs.org/@types/debug/-/debug-4.1.12.tgz", + "integrity": "sha512-vIChWdVG3LG1SMxEvI/AK+FWJthlrqlTu7fbrlywTkkaONwk/UAGaULXRlf8vkzFBLVm0zkMdCquhL5aOjhXPQ==", + "dev": true, + "dependencies": { + "@types/ms": "*" + } + }, "node_modules/@types/eslint": { "version": "9.6.1", "resolved": "https://registry.npmjs.org/@types/eslint/-/eslint-9.6.1.tgz", @@ -1301,6 +1311,12 @@ "integrity": "sha512-5+fP8P8MFNC+AyZCDxrB2pkZFPGzqQWUzpSeuuVLvm8VMcorNYavBqoFcxK8bQz4Qsbn4oUEEem4wDLfcysGHA==", "dev": true }, + "node_modules/@types/ms": { + "version": "0.7.34", + "resolved": "https://registry.npmjs.org/@types/ms/-/ms-0.7.34.tgz", + "integrity": "sha512-nG96G3Wp6acyAgJqGasjODb+acrI7KltPiRxzHPXnP3NgI28bpQDRv53olbqGXbfcgF5aiiHmO3xpwEpS5Ld9g==", + "dev": true + }, "node_modules/@types/node": { "version": "22.10.2", "resolved": "https://registry.npmjs.org/@types/node/-/node-22.10.2.tgz", @@ -1332,16 +1348,16 @@ "dev": true }, "node_modules/@typescript-eslint/eslint-plugin": { - "version": "8.18.0", - "resolved": "https://registry.npmjs.org/@typescript-eslint/eslint-plugin/-/eslint-plugin-8.18.0.tgz", - "integrity": "sha512-NR2yS7qUqCL7AIxdJUQf2MKKNDVNaig/dEB0GBLU7D+ZdHgK1NoH/3wsgO3OnPVipn51tG3MAwaODEGil70WEw==", + "version": "8.18.1", + "resolved": "https://registry.npmjs.org/@typescript-eslint/eslint-plugin/-/eslint-plugin-8.18.1.tgz", + "integrity": "sha512-Ncvsq5CT3Gvh+uJG0Lwlho6suwDfUXH0HztslDf5I+F2wAFAZMRwYLEorumpKLzmO2suAXZ/td1tBg4NZIi9CQ==", "dev": true, "dependencies": { "@eslint-community/regexpp": "^4.10.0", - "@typescript-eslint/scope-manager": "8.18.0", - "@typescript-eslint/type-utils": "8.18.0", - "@typescript-eslint/utils": "8.18.0", - "@typescript-eslint/visitor-keys": "8.18.0", + "@typescript-eslint/scope-manager": "8.18.1", + "@typescript-eslint/type-utils": "8.18.1", + "@typescript-eslint/utils": "8.18.1", + "@typescript-eslint/visitor-keys": "8.18.1", "graphemer": "^1.4.0", "ignore": "^5.3.1", "natural-compare": "^1.4.0", @@ -1361,15 +1377,15 @@ } }, "node_modules/@typescript-eslint/parser": { - "version": "8.18.0", - "resolved": "https://registry.npmjs.org/@typescript-eslint/parser/-/parser-8.18.0.tgz", - "integrity": "sha512-hgUZ3kTEpVzKaK3uNibExUYm6SKKOmTU2BOxBSvOYwtJEPdVQ70kZJpPjstlnhCHcuc2WGfSbpKlb/69ttyN5Q==", + "version": "8.18.1", + "resolved": "https://registry.npmjs.org/@typescript-eslint/parser/-/parser-8.18.1.tgz", + "integrity": "sha512-rBnTWHCdbYM2lh7hjyXqxk70wvon3p2FyaniZuey5TrcGBpfhVp0OxOa6gxr9Q9YhZFKyfbEnxc24ZnVbbUkCA==", "dev": true, "dependencies": { - "@typescript-eslint/scope-manager": "8.18.0", - "@typescript-eslint/types": "8.18.0", - "@typescript-eslint/typescript-estree": "8.18.0", - "@typescript-eslint/visitor-keys": "8.18.0", + "@typescript-eslint/scope-manager": "8.18.1", + "@typescript-eslint/types": "8.18.1", + "@typescript-eslint/typescript-estree": "8.18.1", + "@typescript-eslint/visitor-keys": "8.18.1", "debug": "^4.3.4" }, "engines": { @@ -1385,13 +1401,13 @@ } }, "node_modules/@typescript-eslint/scope-manager": { - "version": "8.18.0", - "resolved": "https://registry.npmjs.org/@typescript-eslint/scope-manager/-/scope-manager-8.18.0.tgz", - "integrity": "sha512-PNGcHop0jkK2WVYGotk/hxj+UFLhXtGPiGtiaWgVBVP1jhMoMCHlTyJA+hEj4rszoSdLTK3fN4oOatrL0Cp+Xw==", + "version": "8.18.1", + "resolved": "https://registry.npmjs.org/@typescript-eslint/scope-manager/-/scope-manager-8.18.1.tgz", + "integrity": "sha512-HxfHo2b090M5s2+/9Z3gkBhI6xBH8OJCFjH9MhQ+nnoZqxU3wNxkLT+VWXWSFWc3UF3Z+CfPAyqdCTdoXtDPCQ==", "dev": true, "dependencies": { - "@typescript-eslint/types": "8.18.0", - "@typescript-eslint/visitor-keys": "8.18.0" + "@typescript-eslint/types": "8.18.1", + "@typescript-eslint/visitor-keys": "8.18.1" }, "engines": { "node": "^18.18.0 || ^20.9.0 || >=21.1.0" @@ -1402,13 +1418,13 @@ } }, "node_modules/@typescript-eslint/type-utils": { - "version": "8.18.0", - "resolved": "https://registry.npmjs.org/@typescript-eslint/type-utils/-/type-utils-8.18.0.tgz", - "integrity": "sha512-er224jRepVAVLnMF2Q7MZJCq5CsdH2oqjP4dT7K6ij09Kyd+R21r7UVJrF0buMVdZS5QRhDzpvzAxHxabQadow==", + "version": "8.18.1", + "resolved": "https://registry.npmjs.org/@typescript-eslint/type-utils/-/type-utils-8.18.1.tgz", + "integrity": "sha512-jAhTdK/Qx2NJPNOTxXpMwlOiSymtR2j283TtPqXkKBdH8OAMmhiUfP0kJjc/qSE51Xrq02Gj9NY7MwK+UxVwHQ==", "dev": true, "dependencies": { - "@typescript-eslint/typescript-estree": "8.18.0", - "@typescript-eslint/utils": "8.18.0", + "@typescript-eslint/typescript-estree": "8.18.1", + "@typescript-eslint/utils": "8.18.1", "debug": "^4.3.4", "ts-api-utils": "^1.3.0" }, @@ -1425,9 +1441,9 @@ } }, "node_modules/@typescript-eslint/types": { - "version": "8.18.0", - "resolved": "https://registry.npmjs.org/@typescript-eslint/types/-/types-8.18.0.tgz", - "integrity": "sha512-FNYxgyTCAnFwTrzpBGq+zrnoTO4x0c1CKYY5MuUTzpScqmY5fmsh2o3+57lqdI3NZucBDCzDgdEbIaNfAjAHQA==", + "version": "8.18.1", + "resolved": "https://registry.npmjs.org/@typescript-eslint/types/-/types-8.18.1.tgz", + "integrity": "sha512-7uoAUsCj66qdNQNpH2G8MyTFlgerum8ubf21s3TSM3XmKXuIn+H2Sifh/ES2nPOPiYSRJWAk0fDkW0APBWcpfw==", "dev": true, "engines": { "node": "^18.18.0 || ^20.9.0 || >=21.1.0" @@ -1438,13 +1454,13 @@ } }, "node_modules/@typescript-eslint/typescript-estree": { - "version": "8.18.0", - "resolved": "https://registry.npmjs.org/@typescript-eslint/typescript-estree/-/typescript-estree-8.18.0.tgz", - "integrity": "sha512-rqQgFRu6yPkauz+ms3nQpohwejS8bvgbPyIDq13cgEDbkXt4LH4OkDMT0/fN1RUtzG8e8AKJyDBoocuQh8qNeg==", + "version": "8.18.1", + "resolved": "https://registry.npmjs.org/@typescript-eslint/typescript-estree/-/typescript-estree-8.18.1.tgz", + "integrity": "sha512-z8U21WI5txzl2XYOW7i9hJhxoKKNG1kcU4RzyNvKrdZDmbjkmLBo8bgeiOJmA06kizLI76/CCBAAGlTlEeUfyg==", "dev": true, "dependencies": { - "@typescript-eslint/types": "8.18.0", - "@typescript-eslint/visitor-keys": "8.18.0", + "@typescript-eslint/types": "8.18.1", + "@typescript-eslint/visitor-keys": "8.18.1", "debug": "^4.3.4", "fast-glob": "^3.3.2", "is-glob": "^4.0.3", @@ -1500,15 +1516,15 @@ } }, "node_modules/@typescript-eslint/utils": { - "version": "8.18.0", - "resolved": "https://registry.npmjs.org/@typescript-eslint/utils/-/utils-8.18.0.tgz", - "integrity": "sha512-p6GLdY383i7h5b0Qrfbix3Vc3+J2k6QWw6UMUeY5JGfm3C5LbZ4QIZzJNoNOfgyRe0uuYKjvVOsO/jD4SJO+xg==", + "version": "8.18.1", + "resolved": "https://registry.npmjs.org/@typescript-eslint/utils/-/utils-8.18.1.tgz", + "integrity": "sha512-8vikiIj2ebrC4WRdcAdDcmnu9Q/MXXwg+STf40BVfT8exDqBCUPdypvzcUPxEqRGKg9ALagZ0UWcYCtn+4W2iQ==", "dev": true, "dependencies": { "@eslint-community/eslint-utils": "^4.4.0", - "@typescript-eslint/scope-manager": "8.18.0", - "@typescript-eslint/types": "8.18.0", - "@typescript-eslint/typescript-estree": "8.18.0" + "@typescript-eslint/scope-manager": "8.18.1", + "@typescript-eslint/types": "8.18.1", + "@typescript-eslint/typescript-estree": "8.18.1" }, "engines": { "node": "^18.18.0 || ^20.9.0 || >=21.1.0" @@ -1523,12 +1539,12 @@ } }, "node_modules/@typescript-eslint/visitor-keys": { - "version": "8.18.0", - "resolved": "https://registry.npmjs.org/@typescript-eslint/visitor-keys/-/visitor-keys-8.18.0.tgz", - "integrity": "sha512-pCh/qEA8Lb1wVIqNvBke8UaRjJ6wrAWkJO5yyIbs8Yx6TNGYyfNjOo61tLv+WwLvoLPp4BQ8B7AHKijl8NGUfw==", + "version": "8.18.1", + "resolved": "https://registry.npmjs.org/@typescript-eslint/visitor-keys/-/visitor-keys-8.18.1.tgz", + "integrity": "sha512-Vj0WLm5/ZsD013YeUKn+K0y8p1M0jPpxOkKdbD1wB0ns53a5piVY02zjf072TblEweAbcYiFiPoSMF3kp+VhhQ==", "dev": true, "dependencies": { - "@typescript-eslint/types": "8.18.0", + "@typescript-eslint/types": "8.18.1", "eslint-visitor-keys": "^4.2.0" }, "engines": { @@ -1856,9 +1872,9 @@ } }, "node_modules/caniuse-lite": { - "version": "1.0.30001688", - "resolved": "https://registry.npmjs.org/caniuse-lite/-/caniuse-lite-1.0.30001688.tgz", - "integrity": "sha512-Nmqpru91cuABu/DTCXbM2NSRHzM2uVHfPnhJ/1zEAJx/ILBRVmz3pzH4N7DZqbdG0gWClsCC05Oj0mJ/1AWMbA==", + "version": "1.0.30001690", + "resolved": "https://registry.npmjs.org/caniuse-lite/-/caniuse-lite-1.0.30001690.tgz", + "integrity": "sha512-5ExiE3qQN6oF8Clf8ifIDcMRCRE/dMGcETG/XGMD8/XiXm6HXQgQTh1yZYLXXpSOsEUlJm1Xr7kGULZTuGtP/w==", "dev": true, "funding": [ { @@ -2020,7 +2036,6 @@ "version": "4.4.0", "resolved": "https://registry.npmjs.org/debug/-/debug-4.4.0.tgz", "integrity": "sha512-6WTZ/IxCY/T6BALoZHaE4ctp9xm+Z5kY/pzYaCHRFeyVhojxlrm+46y68HA6hr0TcwEssoxNiDEUJQjfPZ/RYA==", - "dev": true, "dependencies": { "ms": "^2.1.3" }, @@ -2096,9 +2111,9 @@ } }, "node_modules/electron-to-chromium": { - "version": "1.5.73", - "resolved": "https://registry.npmjs.org/electron-to-chromium/-/electron-to-chromium-1.5.73.tgz", - "integrity": "sha512-8wGNxG9tAG5KhGd3eeA0o6ixhiNdgr0DcHWm85XPCphwZgD1lIEoi6t3VERayWao7SF7AAZTw6oARGJeVjH8Kg==", + "version": "1.5.75", + "resolved": "https://registry.npmjs.org/electron-to-chromium/-/electron-to-chromium-1.5.75.tgz", + "integrity": "sha512-Lf3++DumRE/QmweGjU+ZcKqQ+3bKkU/qjaKYhIJKEOhgIO9Xs6IiAQFkfFoj+RhgDk4LUeNsLo6plExHqSyu6Q==", "dev": true }, "node_modules/emittery": { @@ -2703,11 +2718,6 @@ "node": ">= 0.4" } }, - "node_modules/helpers-promise": { - "version": "1.3.0", - "resolved": "https://registry.npmjs.org/helpers-promise/-/helpers-promise-1.3.0.tgz", - "integrity": "sha512-4n/Xlggn5N+K7bU+d5UBt/Q1FRbT1WnOWzyCEyFS++PBXVJMpd2n5/RUkYr2qVZDz6bKzHbhDgvoI2CZZWErXw==" - }, "node_modules/html-escaper": { "version": "2.0.2", "resolved": "https://registry.npmjs.org/html-escaper/-/html-escaper-2.0.2.tgz", @@ -2800,9 +2810,9 @@ "dev": true }, "node_modules/is-core-module": { - "version": "2.16.0", - "resolved": "https://registry.npmjs.org/is-core-module/-/is-core-module-2.16.0.tgz", - "integrity": "sha512-urTSINYfAYgcbLb0yDQ6egFm6h3Mo1DcF9EkyXSRjjzdHbsulg01qhwWuXdOoUBuTkbQ80KDboXa0vFJ+BDH+g==", + "version": "2.16.1", + "resolved": "https://registry.npmjs.org/is-core-module/-/is-core-module-2.16.1.tgz", + "integrity": "sha512-UfoeMA6fIJ8wTYFEUjelnaGI67v6+N7qXJEvQuIGa99l4xsCruSYOVSQ0uPANn4dAzm8lkYPaKLrrijLq7x23w==", "dev": true, "dependencies": { "hasown": "^2.0.2" @@ -3540,9 +3550,9 @@ } }, "node_modules/jest-watch-typeahead/node_modules/chalk": { - "version": "5.3.0", - "resolved": "https://registry.npmjs.org/chalk/-/chalk-5.3.0.tgz", - "integrity": "sha512-dLitG79d+GV1Nb/VYcCDFivJeK1hiukt9QjRNVOsUtTy1rR1YJsmpGGTZ3qJos+uw7WmWF4wUwBd9jxjocFC2w==", + "version": "5.4.1", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-5.4.1.tgz", + "integrity": "sha512-zgVZuo2WcZgfUEmsn6eO3kINexW8RAE4maiQ8QNs8CtpPCSyMiYsULR3HQYkm3w8FIA3SberyMJMSldGsW+U3w==", "dev": true, "engines": { "node": "^12.17.0 || ^14.13 || >=16.0.0" @@ -3894,8 +3904,7 @@ "node_modules/ms": { "version": "2.1.3", "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", - "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==", - "dev": true + "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==" }, "node_modules/natural-compare": { "version": "1.4.0", @@ -4306,9 +4315,9 @@ } }, "node_modules/resolve": { - "version": "1.22.9", - "resolved": "https://registry.npmjs.org/resolve/-/resolve-1.22.9.tgz", - "integrity": "sha512-QxrmX1DzraFIi9PxdG5VkRfRwIgjwyud+z/iBwfRRrVmHc+P9Q7u2lSSpQ6bjr2gy5lrqIiU9vb6iAeGf2400A==", + "version": "1.22.10", + "resolved": "https://registry.npmjs.org/resolve/-/resolve-1.22.10.tgz", + "integrity": "sha512-NPRy+/ncIMeDlTAsuqwKIiferiawhefFJtkNSW0qZJEqMEb+qBt/77B/jGeeek+F0uOeN05CDa6HXbbIgtVX4w==", "dev": true, "dependencies": { "is-core-module": "^2.16.0", @@ -4318,6 +4327,9 @@ "bin": { "resolve": "bin/resolve" }, + "engines": { + "node": ">= 0.4" + }, "funding": { "url": "https://github.com/sponsors/ljharb" } @@ -4757,14 +4769,14 @@ } }, "node_modules/typescript-eslint": { - "version": "8.18.0", - "resolved": "https://registry.npmjs.org/typescript-eslint/-/typescript-eslint-8.18.0.tgz", - "integrity": "sha512-Xq2rRjn6tzVpAyHr3+nmSg1/9k9aIHnJ2iZeOH7cfGOWqTkXTm3kwpQglEuLGdNrYvPF+2gtAs+/KF5rjVo+WQ==", + "version": "8.18.1", + "resolved": "https://registry.npmjs.org/typescript-eslint/-/typescript-eslint-8.18.1.tgz", + "integrity": "sha512-Mlaw6yxuaDEPQvb/2Qwu3/TfgeBHy9iTJ3mTwe7OvpPmF6KPQjVOfGyEJpPv6Ez2C34OODChhXrzYw/9phI0MQ==", "dev": true, "dependencies": { - "@typescript-eslint/eslint-plugin": "8.18.0", - "@typescript-eslint/parser": "8.18.0", - "@typescript-eslint/utils": "8.18.0" + "@typescript-eslint/eslint-plugin": "8.18.1", + "@typescript-eslint/parser": "8.18.1", + "@typescript-eslint/utils": "8.18.1" }, "engines": { "node": "^18.18.0 || ^20.9.0 || >=21.1.0" diff --git a/package.json b/package.json index 4278d3b..ba3e13c 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@nerjs/batchloader", "version": "1.0.13", - "main": "lib/index.js", + "main": "dist/index.js", "scripts": { "build": "tsc", "watch": "tsc --watch", @@ -16,14 +16,19 @@ "keywords": [ "batches", "dataloader", + "deduplicate", "async", "Promise", "timeout", "cache" ], + "files": [ + "dist" + ], "devDependencies": { "@eslint/eslintrc": "^3.2.0", "@eslint/js": "^9.17.0", + "@types/debug": "^4.1.12", "@types/eslint__js": "^8.42.3", "@types/jest": "^29.5.14", "@types/node": "^22.10.2", @@ -38,6 +43,6 @@ "typescript-eslint": "^8.18.0" }, "dependencies": { - "helpers-promise": "^1.3.0" + "debug": "^4.4.0" } } diff --git a/src/__tests__/first.spec.ts b/src/__tests__/first.spec.ts deleted file mode 100644 index 00d62b8..0000000 --- a/src/__tests__/first.spec.ts +++ /dev/null @@ -1,3 +0,0 @@ -describe('draft', () => { - it.todo('draft') -}) diff --git a/src/batch-aggregator/__tests__/batch-aggregator.spec.ts b/src/batch-aggregator/__tests__/batch-aggregator.spec.ts new file mode 100644 index 0000000..d01af36 --- /dev/null +++ b/src/batch-aggregator/__tests__/batch-aggregator.spec.ts @@ -0,0 +1,137 @@ +import { LoaderError, TimeoutError } from '../../utils/errors' +import { sleep } from '../../utils/sleep' +import { BatchAggregator } from '../batch-aggregator' +import { IBatchAggregatorOptions } from '../interfaces' + +/** Mock implementations */ + +describe('BatchAggregator', () => { + let aggregator: BatchAggregator + + const defaultOptions: IBatchAggregatorOptions = { + maxBatchSize: 3, + batchTimeMs: 10, + timeoutMs: 500, + } + const mockLoader = jest.fn(async (batch: number[], signal: AbortSignal) => { + if (signal.aborted) throw new Error('Aborted') + return batch.map(item => item * 2) // Example: doubling numbers + }) + + beforeEach(() => { + mockLoader.mockClear() + }) + + afterEach(() => { + aggregator?.clear() + }) + + /** Negative Test Cases */ + describe('Negative Cases', () => { + it('throws error if batchLoaderFn returns non-array response', async () => { + aggregator = new BatchAggregator(async () => 42 as any, defaultOptions) + await expect(aggregator.load(1)).rejects.toThrow(LoaderError) + }) + + it('throws error if batchLoaderFn response length does not match requests', async () => { + aggregator = new BatchAggregator(async () => [1, 2], defaultOptions) + await expect(aggregator.load(1)).rejects.toThrow(LoaderError) + }) + + it('throws error when batchLoaderFn fails', async () => { + const error = new Error('Some error') + aggregator = new BatchAggregator(async () => { + throw error + }, defaultOptions) + await expect(aggregator.load(1)).rejects.toThrow(error) + }) + }) + + /** Positive Test Cases (No concurrency limit) */ + describe('Positive Cases (Unlimited Concurrency)', () => { + beforeEach(() => { + aggregator = new BatchAggregator(mockLoader, defaultOptions) + }) + + it('processes batch when maxBatchSize is reached', async () => { + const results = await Promise.all([aggregator.load(1), aggregator.load(2), aggregator.load(3)]) + expect(results).toEqual([2, 4, 6]) + expect(mockLoader).toHaveBeenCalledTimes(1) + }) + + it('processes batch based on batchTimeMs', async () => { + const resultPromise = aggregator.load(1) + await sleep(150) + expect(await resultPromise).toBe(2) + expect(mockLoader).toHaveBeenCalledTimes(1) + }) + + it('returns results in the same order as requests', async () => { + const results = await Promise.all([aggregator.load(3), aggregator.load(1), aggregator.load(2)]) + expect(results).toEqual([6, 2, 4]) + }) + + it('splits requests into multiple batches if maxBatchSize is exceeded', async () => { + const results = await Promise.all([ + aggregator.load(1), + aggregator.load(2), + aggregator.load(3), + aggregator.load(4), + aggregator.load(5), + ]) + + expect(results).toEqual([2, 4, 6, 8, 10]) + expect(mockLoader).toHaveBeenCalledTimes(2) + expect(mockLoader).toHaveBeenCalledWith([1, 2, 3], expect.any(AbortSignal)) + expect(mockLoader).toHaveBeenCalledWith([4, 5], expect.any(AbortSignal)) + }) + }) + + /** Specific Test Cases (Concurrency Limit) */ + describe('Specific Cases (Concurrency Limit)', () => { + beforeEach(() => { + aggregator = new BatchAggregator(mockLoader, { + ...defaultOptions, + concurrencyLimit: 2, + }) + }) + + it('limits parallel execution to concurrencyLimit', async () => { + const results = Promise.all([aggregator.load(1), aggregator.load(2), aggregator.load(3), aggregator.load(4)]) + + await sleep(200) + expect(mockLoader).toHaveBeenCalledTimes(2) + + await results + expect(mockLoader).toHaveBeenCalledTimes(2) + }) + + it('processes tasks from the waiting queue when active tasks complete', async () => { + const promise1 = aggregator.load(1) + const promise2 = aggregator.load(2) + const promise3 = aggregator.load(3) + const promise4 = aggregator.load(4) + + await sleep(200) + expect(mockLoader).toHaveBeenCalledTimes(2) + + await Promise.all([promise1, promise2, promise3, promise4]) + expect(mockLoader).toHaveBeenCalledTimes(2) + }) + + it('handles waiting timeouts properly', async () => { + mockLoader.mockImplementation(async (arr: number[]) => { + await sleep(200) + return arr.map(n => n * 2) + }) + aggregator = new BatchAggregator(mockLoader, { + ...defaultOptions, + concurrencyLimit: 1, + maxBatchSize: 1, + maxWaitingTimeMs: 100, + }) + + await Promise.all([expect(aggregator.load(1)).resolves.toEqual(2), expect(aggregator.load(3)).rejects.toThrow(TimeoutError)]) + }) + }) +}) diff --git a/src/batch-aggregator/batch-aggregator.ts b/src/batch-aggregator/batch-aggregator.ts new file mode 100644 index 0000000..9f61cd6 --- /dev/null +++ b/src/batch-aggregator/batch-aggregator.ts @@ -0,0 +1,99 @@ +import { ILimitedTimekeeperMetrics, ITask, ITimekeeper } from '../timekeeper/interfaces' +import { LimitedTimekeeper } from '../timekeeper/limited.timekeeper' +import { UnlimitedTimekeeper } from '../timekeeper/unlimited.timekeeper' +import createDebug from 'debug' +import { BatchLoaderFn, IBatchAggregatorMetrics, IBatchAggregatorOptions } from './interfaces' +import { LoaderError } from '../utils/errors' +const debug = createDebug('batchloader:aggregator') + +interface TaskData { + requests: T[] + responses: R[] +} + +const createTimekeeperMetrics = (metrics?: IBatchAggregatorMetrics): ILimitedTimekeeperMetrics | undefined => { + if (!metrics) return undefined + + const tkMetrics: ILimitedTimekeeperMetrics> = {} + + if (metrics.resolveBatch) tkMetrics.resolveTask = task => metrics.resolveBatch?.(task.data.requests.length) + if (metrics.rejectBatch) tkMetrics.rejectTask = (_, task) => metrics.rejectBatch?.(task.data.requests.length) + if (metrics.parallelBatches) tkMetrics.runTask = runnedSize => metrics.parallelBatches?.(runnedSize) + if (metrics.waitingBatches) tkMetrics.waitTask = runnedSize => metrics.waitingBatches?.(runnedSize) + + return tkMetrics +} + +export class BatchAggregator { + private readonly timekeeper: ITimekeeper> + + private readonly batchRunner = async (task: ITask>, signal: AbortSignal) => { + this.metrics?.rejectBatch?.(task.data.requests.length) + debug(`Running batchRunner with a query array of length ${task.data.requests.length}. task id="${task.id}"`) + const response = await this.batchLoaderFn([...task.data.requests], signal) + if (!Array.isArray(response) || response.length !== task.data.requests.length) + throw new LoaderError(`The result of batchLoadFn must be an array equal in length to the query array `) + + task.data.responses = response + } + + constructor( + private readonly batchLoaderFn: BatchLoaderFn, + private readonly options: IBatchAggregatorOptions, + private readonly metrics?: IBatchAggregatorMetrics, + ) { + const { concurrencyLimit, maxWaitingTimeMs, batchTimeMs: runMs, timeoutMs } = options + const initialDataFactory = () => ({ requests: [], responses: [] }) + this.timekeeper = + concurrencyLimit && concurrencyLimit > 0 && concurrencyLimit < Infinity + ? new LimitedTimekeeper( + { + concurrencyLimit, + initialDataFactory, + maxWaitingTimeMs: maxWaitingTimeMs || 60_000, + runMs, + runner: this.batchRunner, + timeoutMs, + callRejectedTask: false, + }, + createTimekeeperMetrics(metrics), + ) + : new UnlimitedTimekeeper( + { + initialDataFactory, + runMs, + runner: this.batchRunner, + timeoutMs, + callRejectedTask: false, + }, + createTimekeeperMetrics(metrics), + ) + + debug(`Create BatchAggregator with ${this.timekeeper.constructor.name}`) + } + + private getCurrentTask(): ITask> { + const task = this.timekeeper.current() + if (task.data.requests.length >= this.options.maxBatchSize) { + debug(`The size of the current batch has reached the maximum. size=${task.data.requests.length}`) + this.timekeeper.run() + return this.getCurrentTask() + } + return task + } + + async load(request: T): Promise { + const task = this.getCurrentTask() + const index = task.data.requests.length + this.metrics?.loadBatchItem?.() + debug(`Load data. task id="${task.id}"; curent index="${index}"`) + task.data.requests.push(request) + await this.timekeeper.wait(task) + + return task.data.responses[index] + } + + clear() { + this.timekeeper.clear() + } +} diff --git a/src/batch-aggregator/interfaces.ts b/src/batch-aggregator/interfaces.ts new file mode 100644 index 0000000..9d10a12 --- /dev/null +++ b/src/batch-aggregator/interfaces.ts @@ -0,0 +1,37 @@ +export interface IBatchAggregatorOptions { + /** + * @description Maximum number of parallel tasks (default: unlimited) + */ + concurrencyLimit?: number + + /** + * @description Maximum number of requests per batch + */ + maxBatchSize: number + + /** + * @description Maximum time to form a batch + */ + batchTimeMs: number + + /** + * @description Maximum waiting time for tasks in the queue (only if concurrencyLimit > 0) + */ + maxWaitingTimeMs?: number + + /** + * @description Maximum execution time for batchFn (the function passed as the first argument) + */ + timeoutMs: number +} + +export type BatchLoaderFn = (batchArray: T[], signal: AbortSignal) => Promise | R[] + +export interface IBatchAggregatorMetrics { + loadBatchItem?: () => void + runBatch?: (batchSize: number) => void + resolveBatch?: (batchSize: number) => void + rejectBatch?: (batchSize: number) => void + parallelBatches?: (batchesCount: number) => void + waitingBatches?: (batchesCount: number) => void +} diff --git a/src/batch-loader/__tests__/batch-loader.spec.ts b/src/batch-loader/__tests__/batch-loader.spec.ts new file mode 100644 index 0000000..f091626 --- /dev/null +++ b/src/batch-loader/__tests__/batch-loader.spec.ts @@ -0,0 +1,157 @@ +import { BatchLoader } from '../batch-loader' +import { MapCache, StubCache } from '../cache-adapter' +import { LoaderError, TimeoutError } from '../../utils/errors' +import { sleep } from '../../utils/sleep' +import { IBatchLoaderOptions, ICache } from '../interfaces' + +const defaultOptions: IBatchLoaderOptions = { + timeoutMs: 100, + maxBatchSize: 3, + batchTimeMs: 50, +} + +describe('BatchLoader', () => { + let loader: BatchLoader + + const batchLoaderFn = jest.fn(async (queries: number[]) => { + await sleep(50, true) + return queries.map(q => q * 2) + }) + + beforeEach(() => { + jest.clearAllMocks() + loader = new BatchLoader(batchLoaderFn, defaultOptions) + }) + + afterEach(() => { + loader.clear() + }) + + /** Negative Cases */ + describe('Negative Cases', () => { + let cache: ICache + const timeoutMs = 100 + + beforeEach(() => { + cache = new StubCache() + loader = new BatchLoader(batchLoaderFn, { ...defaultOptions, timeoutMs, cache }) + }) + + it('throws TimeoutError when batch execution exceeds timeoutMs', async () => { + batchLoaderFn.mockImplementationOnce(async (raw: number[]) => { + await sleep(timeoutMs * 2, true) + return raw.map(n => n * 2) + }) + await expect(loader.load(1)).rejects.toThrow(TimeoutError) + expect(batchLoaderFn).toHaveBeenCalled() + }) + + it('Throws a TimeoutError if the cache fetch has exceeded timeoutMs', async () => { + cache.get = () => sleep(timeoutMs * 2, true) + await expect(loader.load(1)).rejects.toThrow(TimeoutError) + expect(batchLoaderFn).not.toHaveBeenCalled() + }) + + it('Throws a TimeoutError if the cache retention has exceeded timeoutMs', async () => { + cache.set = () => sleep(timeoutMs * 2, true) + await expect(loader.load(1)).rejects.toThrow(TimeoutError) + expect(batchLoaderFn).toHaveBeenCalled() + }) + + it('throws error if batchLoaderFn returns non-array response', async () => { + batchLoaderFn.mockImplementationOnce(async () => 'some value' as any) + await expect(loader.load(1)).rejects.toThrow(LoaderError) + }) + + it('throws error if batchLoaderFn response length does not match requests', async () => { + batchLoaderFn.mockImplementationOnce(async () => [1, 2]) + await expect(loader.load(1)).rejects.toThrow(LoaderError) + }) + + it('throws error when batchLoaderFn fails', async () => { + const error = new Error('Some error') + batchLoaderFn.mockImplementationOnce(async () => { + throw error + }) + await expect(loader.load(1)).rejects.toThrow(error) + }) + }) + + /** Positive Cases */ + describe('Positive Cases', () => { + const batchTimeMs = 50 + const maxBatchSize = 3 + + beforeEach(() => { + loader = new BatchLoader(batchLoaderFn, { ...defaultOptions, batchTimeMs, maxBatchSize }) + }) + + it('processes queries and returns results', async () => { + const result = await loader.load(5) + expect(result).toBe(10) + }) + + it('batches requests up to maxBatchSize', async () => { + const results = await Promise.all([loader.load(1), loader.load(2), loader.load(3)]) + expect(batchLoaderFn).toHaveBeenCalledTimes(1) + expect(results).toEqual([2, 4, 6]) + }) + + it('Splits requests into groups when maxBatchSize is exceeded', async () => { + const results = await Promise.all([loader.load(1), loader.load(2), loader.load(3), loader.load(4)]) + expect(batchLoaderFn).toHaveBeenCalledTimes(2) + expect(results).toEqual([2, 4, 6, 8]) + }) + + it('handles requests arriving within batchTimeMs', async () => { + const promises = [loader.load(1), sleep(batchTimeMs - 5).then(() => loader.load(2))] + const results = await Promise.all(promises) + expect(results).toEqual([2, 4]) + expect(batchLoaderFn).toHaveBeenCalledTimes(1) + }) + + it('Requests that did not get to the batches after batchTimeMs expiration are added to the next batches.', async () => { + const promises = [loader.load(1), sleep(batchTimeMs + 5).then(() => loader.load(2))] + const results = await Promise.all(promises) + expect(results).toEqual([2, 4]) + expect(batchLoaderFn).toHaveBeenCalledTimes(2) + }) + }) + + /** Cache Handling */ + describe('Cache Handling', () => { + let cache: ICache + + beforeEach(() => { + cache = new MapCache() + jest.spyOn(cache, 'get').mockResolvedValue(undefined) + jest.spyOn(cache, 'set').mockResolvedValue(undefined) + jest.spyOn(cache, 'delete').mockResolvedValue(undefined) + jest.spyOn(cache, 'clear').mockResolvedValue(undefined) + loader = new BatchLoader(batchLoaderFn, { ...defaultOptions, cache: cache }) + }) + + it('uses cache to retrieve stored results', async () => { + jest.spyOn(cache, 'get').mockResolvedValue(42) + const result = await loader.load(1) + expect(result).toBe(42) + expect(cache.get).toHaveBeenCalledWith('1') + expect(batchLoaderFn).not.toHaveBeenCalled() + }) + + it('writes results to cache after batch execution', async () => { + await loader.load(1) + expect(cache.set).toHaveBeenCalledWith('1', 2) + }) + + it('resets cache for a specific query', async () => { + await loader.resetCache(1) + expect(cache.delete).toHaveBeenCalledWith('1') + }) + + it('clears all cache entries', async () => { + await loader.flush() + expect(cache.clear).toHaveBeenCalled() + }) + }) +}) diff --git a/src/batch-loader/__tests__/cache-adapter.spec.ts b/src/batch-loader/__tests__/cache-adapter.spec.ts new file mode 100644 index 0000000..9dcc1ac --- /dev/null +++ b/src/batch-loader/__tests__/cache-adapter.spec.ts @@ -0,0 +1,67 @@ +import { CacheAdapter, MapCache } from '../cache-adapter' +import { Key } from '../../utils/interfaces' +import { ICache } from '../interfaces' + +/** Общие тестовые параметры */ +const testKey: Key = 'testKey' +const testValue = 'testValue' + +describe('Cache module', () => { + /** Тесты для StubCache */ + describe('CacheAdapter without cache instance (use StubCache)', () => { + let adapter: ICache + + beforeEach(() => { + adapter = new CacheAdapter() + }) + + it('get() always resolves to undefined', async () => { + await adapter.set(testKey, testValue) + await expect(adapter.get(testKey)).resolves.toBeUndefined() + }) + + it('set() resolves without errors', async () => { + await expect(adapter.set(testKey, testValue)).resolves.toBeUndefined() + }) + + it('delete() resolves without errors', async () => { + await expect(adapter.delete(testKey)).resolves.toBeUndefined() + }) + + it('clear() resolves without errors', async () => { + await expect(adapter.clear()).resolves.toBeUndefined() + }) + }) + + describe('CacheAdapter with cache instance', () => { + let cache: ICache + let adapter: CacheAdapter + + beforeEach(() => { + cache = new MapCache() + adapter = new CacheAdapter(cache) + }) + + it('delegates get() calls to the underlying cache', async () => { + await cache.set(testKey, testValue) + await expect(adapter.get(testKey)).resolves.toBe(testValue) + }) + + it('delegates set() calls to the underlying cache', async () => { + await adapter.set(testKey, testValue) + await expect(cache.get(testKey)).resolves.toBe(testValue) + }) + + it('delegates delete() calls to the underlying cache', async () => { + await adapter.set(testKey, testValue) + await adapter.delete(testKey) + await expect(cache.get(testKey)).resolves.toBeUndefined() + }) + + it('delegates clear() calls to the underlying cache', async () => { + await adapter.set(testKey, testValue) + await adapter.clear() + await expect(cache.get(testKey)).resolves.toBeUndefined() + }) + }) +}) diff --git a/src/batch-loader/batch-loader.ts b/src/batch-loader/batch-loader.ts new file mode 100644 index 0000000..63ede30 --- /dev/null +++ b/src/batch-loader/batch-loader.ts @@ -0,0 +1,72 @@ +import { BatchAggregator } from '../batch-aggregator/batch-aggregator' +import { BatchLoaderFn } from '../batch-aggregator/interfaces' +import { Deduplicator } from '../deduplicator/deduplicator' +import { Key } from '../utils/interfaces' +import { CacheAdapter } from './cache-adapter' +import { IBatchLoaderOptions } from './interfaces' + +const prepareOptions = (options: IBatchLoaderOptions) => ({ + getKey: (query: K) => `${query}`, + timeoutMs: 60_000, + unrefTimeouts: false, + concurrencyLimit: Infinity, + maxBatchSize: 1000, + batchTimeMs: 50, + maxWaitingTimeMs: 60, + ...options, +}) + +export class BatchLoader { + private readonly cache: CacheAdapter + private readonly deduplicator: Deduplicator + private readonly aggregator: BatchAggregator + private readonly getKey: (query: K) => Key + + constructor(batchLoaderFn: BatchLoaderFn, options: IBatchLoaderOptions) { + const { cache, getKey, timeoutMs, unrefTimeouts, batchTimeMs, concurrencyLimit, maxBatchSize, maxWaitingTimeMs } = + prepareOptions(options) + + this.getKey = getKey + this.cache = new CacheAdapter(cache) + this.deduplicator = new Deduplicator(this.deduplicatorRunner, { + getKey, + timeoutMs: timeoutMs + batchTimeMs, + unrefTimeouts: !!unrefTimeouts, + }) + this.aggregator = new BatchAggregator(batchLoaderFn, { timeoutMs, batchTimeMs, concurrencyLimit, maxBatchSize, maxWaitingTimeMs }) + } + + private readonly deduplicatorRunner = async (query: K, signal: AbortSignal): Promise => { + const key = this.getKey(query) + const cached = await this.cache.get(key) + + if (signal.aborted) throw signal.reason + if (cached !== undefined) return cached + + this.deduplicator.restartTimeout(query) + + const loaded = await this.aggregator.load(query) + + this.deduplicator.restartTimeout(query) + await this.cache.set(key, loaded) + + return loaded + } + + load(query: K) { + return this.deduplicator.call(query) + } + + async resetCache(query: K) { + await this.cache.delete(this.getKey(query)) + } + + clear() { + this.deduplicator.clear() + this.aggregator.clear() + } + + async flush() { + await this.cache.clear() + } +} diff --git a/src/batch-loader/cache-adapter.ts b/src/batch-loader/cache-adapter.ts new file mode 100644 index 0000000..472cbdc --- /dev/null +++ b/src/batch-loader/cache-adapter.ts @@ -0,0 +1,61 @@ +import { Key } from '../utils/interfaces' +import { ICache } from './interfaces' + +export class StubCache implements ICache { + async get(_key: Key): Promise { + return Promise.resolve(undefined) + } + async set(_key: Key, _data: any): Promise {} + async delete(_key: Key): Promise {} + async clear(): Promise {} +} + +/** + * Simple cache implementation based on Map. + * Designed for local in-memory usage. + * Methods are implemented with an asynchronous interface for compatibility + * with other cache types, such as external databases or distributed systems. + * Note that this implementation does not support TTL and relies on external + * mechanisms for managing data expiration. + */ +export class MapCache implements ICache { + private readonly map = new Map() + + async get(key: Key): Promise { + return this.map.get(key) + } + + async set(key: Key, data: T): Promise { + this.map.set(key, data) + } + + async delete(key: Key): Promise { + this.map.delete(key) + } + + async clear(): Promise { + this.map.clear() + } +} + +export class CacheAdapter implements ICache { + private readonly cache: ICache + constructor(cache?: ICache) { + this.cache = cache || new StubCache() + } + get(key: Key): Promise { + return this.cache.get(key) + } + + set(key: Key, data: T): Promise { + return this.cache.set(key, data) + } + + delete(key: Key): Promise { + return this.cache.delete(key) + } + + clear(): Promise { + return this.cache.clear() + } +} diff --git a/src/batch-loader/interfaces.ts b/src/batch-loader/interfaces.ts new file mode 100644 index 0000000..e7f2449 --- /dev/null +++ b/src/batch-loader/interfaces.ts @@ -0,0 +1,54 @@ +import { Key } from '../utils/interfaces' + +export interface ICache { + get(key: Key): Promise + set(key: Key, data: T): Promise + delete(key: Key): Promise + clear(): Promise +} + +export interface IBatchLoaderOptions { + /** + * @description Function to extract the key from a query + * @default query => `${query}` + */ + getKey?: (query: K) => Key + + cache?: ICache + + /** + * @description Task execution timeout in milliseconds + * @default 60_000 + */ + timeoutMs?: number + + /** + * @description Allows timers to avoid blocking the event loop + * @default false + */ + unrefTimeouts?: boolean + + /** + * @description Maximum number of parallel tasks (default: unlimited) + * @default Infinity + */ + concurrencyLimit?: number + + /** + * @description Maximum number of requests per batch + * @default 1000 + */ + maxBatchSize?: number + + /** + * @description Maximum time in milliseconds to form a batch + * @default 50 + */ + batchTimeMs?: number + + /** + * @description Maximum waiting time in milliseconds for tasks in the queue (only if concurrencyLimit > 0) + * @default 60_000 + */ + maxWaitingTimeMs?: number +} diff --git a/src/deduplicator/__tests__/deduplicator.spec.ts b/src/deduplicator/__tests__/deduplicator.spec.ts new file mode 100644 index 0000000..27eb58c --- /dev/null +++ b/src/deduplicator/__tests__/deduplicator.spec.ts @@ -0,0 +1,127 @@ +import { Deduplicator } from '../deduplicator' +import { Defer } from '../../utils/defer' +import { RejectedAbortError, SilentAbortError, TimeoutError } from '../../utils/errors' +import { sleep } from '../../utils/sleep' +import { IDeduplicatorOptions } from '../interfaces' + +/** Mock runner function */ +const mockRunner = jest.fn(async (query: number, signal: AbortSignal) => { + if (signal.aborted) throw new Error('Aborted') + await sleep(50, true) // Simulate async work + return query * 2 +}) + +/** Test options */ +const defaultOptions: IDeduplicatorOptions = { + getKey: (query: number) => query, + timeoutMs: 100, + unrefTimeouts: false, +} + +describe('Deduplicator', () => { + let deduplicator: Deduplicator + + beforeEach(() => { + jest.clearAllMocks() + deduplicator = new Deduplicator(mockRunner, defaultOptions) + }) + + afterEach(() => { + deduplicator.clear() + }) + + /** Negative Cases */ + describe('Negative Cases', () => { + it('throws TimeoutError when execution exceeds timeoutMs', async () => { + const slowRunner = jest.fn(async () => { + await sleep(200, true) + return 42 + }) + + deduplicator = new Deduplicator(slowRunner, defaultOptions) + + await expect(deduplicator.call(1)).rejects.toThrow(TimeoutError) + }) + + it('throws SilentAbortError when task is aborted manually', async () => { + const call = deduplicator.call(1) + deduplicator.clear() + await expect(call).rejects.toThrow(SilentAbortError) + }) + + it('throws RejectedAbortError when runner fails and is aborted', async () => { + const defer = new Defer() + const failingRunner = jest.fn(async (_, signal) => { + signal.addEventListener('abort', () => defer.reject(signal.reason)) + throw new Error('Failure') + }) + + deduplicator = new Deduplicator(failingRunner, defaultOptions) + + await expect(deduplicator.call(1)).rejects.toThrow('Failure') + await expect(defer.promise).rejects.toThrow(RejectedAbortError) + }) + }) + + /** Positive Cases */ + describe('Positive Cases', () => { + it('processes query and returns result', async () => { + const result = await deduplicator.call(5) + expect(result).toBe(10) + }) + + it('deduplicates calls with the same key', async () => { + const calls = await Promise.all([deduplicator.call(2), deduplicator.call(2)]) + expect(mockRunner).toHaveBeenCalledTimes(1) + expect(calls).toEqual([4, 4]) + }) + + it('processes multiple distinct queries', async () => { + const results = await Promise.all([deduplicator.call(1), deduplicator.call(2)]) + expect(results).toEqual([2, 4]) + expect(mockRunner).toHaveBeenCalledTimes(2) + }) + }) + + /** Cleanup and Edge Cases */ + describe('Cleanup and Edge Cases', () => { + it('clears all tasks on manual clear()', async () => { + const call1 = deduplicator.call(1) + const call2 = deduplicator.call(2) + + deduplicator.clear() + + await expect(call1).rejects.toThrow(SilentAbortError) + await expect(call2).rejects.toThrow(SilentAbortError) + }) + + it('handles abort during processing', async () => { + const defer = new Defer() + const runner = jest.fn(async (_, signal) => { + signal.addEventListener('abort', () => defer.reject(new Error('Aborted'))) + return defer.promise + }) + + deduplicator = new Deduplicator(runner, defaultOptions) + const call = deduplicator.call(1) + + deduplicator.clear() + await expect(defer.promise).rejects.toThrow('Aborted') + await expect(call).rejects.toThrow(SilentAbortError) + }) + + it('restarts timeout with restartTimeout()', async () => { + const runner = jest.fn(async () => { + await sleep(150, true) + return 42 + }) + + deduplicator = new Deduplicator(runner, { ...defaultOptions, timeoutMs: 100 }) + const result = deduplicator.call(1) + await sleep(75) + deduplicator.restartTimeout(1) // Restart timeout before expiration + + await expect(result).resolves.toBe(42) + }) + }) +}) diff --git a/src/deduplicator/deduplicator.ts b/src/deduplicator/deduplicator.ts new file mode 100644 index 0000000..b1e1d17 --- /dev/null +++ b/src/deduplicator/deduplicator.ts @@ -0,0 +1,109 @@ +import { Defer } from '../utils/defer' +import createDebug from 'debug' +import { RejectedAbortError, SilentAbortError, TimeoutError } from '../utils/errors' +import { DeduplicatorRunnerCallback, IDeduplicatorOptions } from './interfaces' +import { Key } from '../utils/interfaces' +const debug = createDebug('batchloader:deduplicator') + +interface Runner { + defer: Defer + controller: AbortController + tid: NodeJS.Timeout +} + +export class Deduplicator { + private readonly runners = new Map>() + + constructor( + private readonly runnerFn: DeduplicatorRunnerCallback, + private readonly options: IDeduplicatorOptions, + ) {} + + private async run(query: T, signal: AbortSignal): Promise { + try { + debug('run next runner') + return await this.runnerFn(query, signal) + } catch (error) { + debug(`Aborted runner terminated with an error`) + throw error + } + } + + private callError(key: Key, error: unknown) { + const runner = this.runners.get(key) + if (runner) { + runner.defer.reject(error) + runner.controller.abort(error) + this.clearRunner(key) + } + } + + private clearRunner(key: Key) { + const runner = this.runners.get(key) + if (runner) { + clearTimeout(runner.tid) + this.runners.delete(key) + } + } + + private createTimeout(key: Key): NodeJS.Timeout { + return setTimeout(() => this.callError(key, new TimeoutError(this.options.timeoutMs)), this.options.timeoutMs) + } + + private createRunner(key: Key, query: T): Defer { + const defer = new Defer() + + const controller = new AbortController() + + const tid = this.createTimeout(key) + if (this.options.unrefTimeouts) tid?.unref?.() + + this.run(query, controller.signal) + .then(result => defer.resolve(result)) + .catch(error => defer.reject(error)) + + defer.promise + .catch(() => { + if (!controller.signal.aborted) controller.abort(new RejectedAbortError('deduplicate')) + }) + .finally(() => this.clearRunner(key)) + + this.runners.set(key, { defer, controller, tid }) + + return defer + } + + private getOrCreateRunner(query: T): Defer { + const key = this.options.getKey(query) + const current = this.runners.get(key) + if (current) return current.defer + return this.createRunner(key, query) + } + + /** + * @description Adds a query to the execution queue or joins an already running request with the same key. Returns a promise with the result of the task execution. + */ + async call(query: T): Promise { + debug('Call next runner') + const current = this.getOrCreateRunner(query) + return await current.promise + } + + restartTimeout(query: T) { + const key = this.options.getKey(query) + const runned = this.runners.get(key) + + if (runned) { + clearTimeout(runned.tid) + runned.tid = this.createTimeout(key) + } + } + + /** + * @description Cancels all active tasks and clears their state. + */ + clear() { + this.runners.forEach((_, key) => this.callError(key, new SilentAbortError('deduplicate'))) + this.runners.clear() + } +} diff --git a/src/deduplicator/interfaces.ts b/src/deduplicator/interfaces.ts new file mode 100644 index 0000000..bed8525 --- /dev/null +++ b/src/deduplicator/interfaces.ts @@ -0,0 +1,20 @@ +import { Key } from '../utils/interfaces' + +export type DeduplicatorRunnerCallback = (query: T, signal: AbortSignal) => Promise | R + +export interface IDeduplicatorOptions { + /** + * @description Function to extract the key from a query + */ + getKey: (query: T) => Key + + /** + * @description Task execution timeout + */ + timeoutMs: number + + /** + * @description Allows timers to avoid blocking the event loop + */ + unrefTimeouts?: boolean +} diff --git a/src/index.ts b/src/index.ts index cbf92a5..1a95577 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1 +1,23 @@ -console.log(123) +export { Key } from './utils/interfaces' + +export { IBatchLoaderOptions, ICache } from './batch-loader/interfaces' +export { BatchLoader } from './batch-loader/batch-loader' +export { CacheAdapter, MapCache } from './batch-loader/cache-adapter' + +export { DeduplicatorRunnerCallback, IDeduplicatorOptions } from './deduplicator/interfaces' +export { Deduplicator } from './deduplicator/deduplicator' + +export { BatchLoaderFn, IBatchAggregatorOptions } from './batch-aggregator/interfaces' +export { BatchAggregator } from './batch-aggregator/batch-aggregator' + +export { + ITask, + ITimekeeper, + InitiateDataFactory, + LimitedTimekeeperOptions, + UnlimitedTimekeeperOptions, + TaskStatus, + TimekeeperRunnerCallback, +} from './timekeeper/interfaces' +export { UnlimitedTimekeeper } from './timekeeper/unlimited.timekeeper' +export { LimitedTimekeeper } from './timekeeper/limited.timekeeper' diff --git a/src/timekeeper/__tests__/limited.timekeeper.spec.ts b/src/timekeeper/__tests__/limited.timekeeper.spec.ts new file mode 100644 index 0000000..094e00b --- /dev/null +++ b/src/timekeeper/__tests__/limited.timekeeper.spec.ts @@ -0,0 +1,167 @@ +import { AbortError, TimeoutError } from '../../utils/errors' +import { sleep } from '../../utils/sleep' +import { ITask, LimitedTimekeeperOptions } from '../interfaces' +import { LimitedTimekeeper } from '../limited.timekeeper' + +type Data = { field: string } + +describe('Unlimited Timekeeper', () => { + const runMs = 100 + const timeoutMs = 1000 + const concurrencyLimit = 1 + const maxWaitingTimeMs = 500 + const runnerFn = jest.fn() + const options: LimitedTimekeeperOptions = { + initialDataFactory: () => ({ field: 'qwerty' }), + runMs, + runner: runnerFn, + timeoutMs, + concurrencyLimit, + maxWaitingTimeMs, + } + let timekeeper: LimitedTimekeeper + + beforeEach(() => { + timekeeper = new LimitedTimekeeper(options) + jest.clearAllMocks() + jest.resetAllMocks() + runnerFn.mockImplementation(() => sleep(100, true)) + }) + + afterEach(() => { + timekeeper.clear() + }) + + const skipTasks = () => { + const tasks: ITask[] = [] + + for (let i = 0; i < concurrencyLimit; i++) { + tasks.push(timekeeper.current()) + timekeeper.run() + } + + return tasks + } + + it('When tasks are activated, only the first {concurrencyLimit} transition to runned, others remain pending', async () => { + for (const task of skipTasks()) expect(task.status).toEqual('runned') + + const nextTask = timekeeper.current() + timekeeper.run() + + expect(timekeeper.waiting()).toEqual([nextTask]) + expect(nextTask.status).toEqual('pending') + expect(runnerFn).toHaveBeenCalledTimes(concurrencyLimit) + }) + + it('After a running task completes, a task from the waiting list starts', async () => { + skipTasks() + + const task = timekeeper.current() + timekeeper.run() + + await sleep(runMs) + expect(task.status).toEqual('runned') + expect(runnerFn).toHaveBeenCalledTimes(concurrencyLimit + 1) + expect(timekeeper.waiting().length).toEqual(0) + }) + + describe('If a task remains in the waiting list longer than {maxWaitingTimeMs}', () => { + it('With {callRejectedTask = true}, the runner is called with an aborted signal', async () => { + timekeeper = new LimitedTimekeeper({ + ...options, + callRejectedTask: true, + }) + runnerFn.mockImplementation(async () => sleep(maxWaitingTimeMs + 100, true)) + skipTasks() + + runnerFn.mockClear() + + const task = timekeeper.current() + timekeeper.run() + await expect(() => timekeeper.wait(task)).rejects.toThrow(TimeoutError) + expect(runnerFn).toHaveBeenCalledWith(task, expect.objectContaining({ aborted: true, reason: expect.any(TimeoutError) })) + expect(task.status).toEqual('rejected') + }) + + it('With {callRejectedTask = false}, the task simply changes status to rejected', async () => { + timekeeper = new LimitedTimekeeper({ + ...options, + callRejectedTask: false, + }) + runnerFn.mockImplementation(async () => sleep(maxWaitingTimeMs + 100, true)) + skipTasks() + + runnerFn.mockClear() + + const task = timekeeper.current() + timekeeper.run() + await expect(() => timekeeper.wait(task)).rejects.toThrow(TimeoutError) + expect(runnerFn).not.toHaveBeenCalled() + expect(task.status).toEqual('rejected') + }) + }) + + describe('Calling abort on a pending task', () => { + describe('With {callRejectedTask = true}', () => { + beforeEach(() => { + timekeeper = new LimitedTimekeeper({ + ...options, + callRejectedTask: true, + }) + }) + + it('For the current task, starts the runner with an aborted signal and changes its status to rejected', async () => { + const task = timekeeper.current() + setTimeout(() => timekeeper.abort(task), 10) + + await expect(() => timekeeper.wait(task)).rejects.toThrow(AbortError) + expect(runnerFn).toHaveBeenCalledWith(task, expect.objectContaining({ aborted: true, reason: expect.any(AbortError) })) + expect(task.status).toEqual('rejected') + }) + + it('For a task in the waiting list, starts the runner with an aborted signal and changes its status to rejected', async () => { + skipTasks() + runnerFn.mockClear() + + const task = timekeeper.current() + timekeeper.run() + setTimeout(() => timekeeper.abort(task), 10) + + await expect(() => timekeeper.wait(task)).rejects.toThrow(AbortError) + expect(runnerFn).toHaveBeenCalledWith(task, expect.objectContaining({ aborted: true, reason: expect.any(AbortError) })) + expect(task.status).toEqual('rejected') + }) + }) + + describe('With {callRejectedTask = false}', () => { + beforeEach(() => { + timekeeper = new LimitedTimekeeper({ + ...options, + callRejectedTask: false, + }) + }) + it('For the current task, simply changes its status to rejected', async () => { + const task = timekeeper.current() + setTimeout(() => timekeeper.abort(task), 10) + + await expect(() => timekeeper.wait(task)).rejects.toThrow(AbortError) + expect(runnerFn).not.toHaveBeenCalled() + expect(task.status).toEqual('rejected') + }) + + it('For a task in the waiting list, simply changes its status to rejected', async () => { + skipTasks() + runnerFn.mockClear() + + const task = timekeeper.current() + timekeeper.run() + setTimeout(() => timekeeper.abort(task), 10) + + await expect(() => timekeeper.wait(task)).rejects.toThrow(AbortError) + expect(runnerFn).not.toHaveBeenCalled() + expect(task.status).toEqual('rejected') + }) + }) + }) +}) diff --git a/src/timekeeper/__tests__/unlimited.timekeeper.spec.ts b/src/timekeeper/__tests__/unlimited.timekeeper.spec.ts new file mode 100644 index 0000000..39c4e31 --- /dev/null +++ b/src/timekeeper/__tests__/unlimited.timekeeper.spec.ts @@ -0,0 +1,168 @@ +import { AbortError, TimeoutError } from '../../utils/errors' +import { sleep } from '../../utils/sleep' +import { ITask, UnlimitedTimekeeperOptions } from '../interfaces' +import { UnlimitedTimekeeper } from '../unlimited.timekeeper' + +type Data = { field: string } + +describe('Unlimited Timekeeper', () => { + const runMs = 100 + const timeoutMs = 1000 + const runnerFn = jest.fn() + const options: UnlimitedTimekeeperOptions = { + initialDataFactory: () => ({ field: 'qwerty' }), + runMs, + runner: runnerFn, + timeoutMs, + } + + let timekeeper: UnlimitedTimekeeper + + beforeEach(() => { + timekeeper = new UnlimitedTimekeeper(options) + jest.clearAllMocks() + runnerFn.mockImplementation(() => sleep(100, true)) + }) + + afterEach(() => { + timekeeper.clear() + }) + + it('Before activation, current() returns a single task with status pending', () => { + expect(timekeeper.current().id).toEqual(timekeeper.current().id) + }) + + it('After the specified time, the runner is executed, and the status changes to runned', async () => { + const task = timekeeper.current() + await sleep(runMs) + expect(task.status).toEqual('runned') + expect(runnerFn).toHaveBeenCalledWith(task, expect.any(AbortSignal)) + }) + + it('Calling run() triggers the runner and changes the status to runned', () => { + const task = timekeeper.current() + timekeeper.run() + + expect(task.status).toEqual('runned') + expect(runnerFn).toHaveBeenCalledWith(task, expect.any(AbortSignal)) + }) + + it('Repeated calls to run() have no effect', () => { + timekeeper.current() + + timekeeper.run() + timekeeper.run() + + expect(runnerFn).toHaveBeenCalledTimes(1) + }) + + it('After calling run(), the next call to current() creates a new task', () => { + const task1 = timekeeper.current() + timekeeper.run() + const task2 = timekeeper.current() + + expect(task1.id).not.toEqual(task2.id) + expect(task2.status).toEqual('pending') + }) + + it('A task successfully completed (synchronously) changes status to resolved', async () => { + runnerFn.mockImplementation(() => {}) + const task = timekeeper.current() + timekeeper.run() + + await timekeeper.wait(task) + expect(task.status).toEqual('resolved') + }) + + it('A task successfully completed (asynchronously) changes status to resolved', async () => { + runnerFn.mockImplementation(async () => sleep(100, true)) + const task = timekeeper.current() + timekeeper.run() + + await timekeeper.wait(task) + expect(task.status).toEqual('resolved') + }) + + it('A task that ends with an error (synchronously) changes status to rejected', async () => { + const error = new Error('qwerty') + runnerFn.mockImplementation(() => { + throw error + }) + const task = timekeeper.current() + timekeeper.run() + + await expect(() => timekeeper.wait(task)).rejects.toThrow(error) + expect(task.status).toEqual('rejected') + }) + + it('A task that ends with an error (asynchronously) changes status to rejected', async () => { + const error = new Error('qwerty') + runnerFn.mockImplementation(async () => { + await sleep(100, true) + throw error + }) + const task = timekeeper.current() + timekeeper.run() + + await expect(() => timekeeper.wait(task)).rejects.toThrow(error) + expect(task.status).toEqual('rejected') + }) + + it('If the runner does not complete before {timeoutMs}, abort is triggered', async () => { + const callAbort = jest.fn() + runnerFn.mockImplementation(async (_task: ITask, signal: AbortSignal) => { + signal.addEventListener('abort', () => callAbort(signal.reason)) + await sleep(timeoutMs + 100, true) + }) + + const task = timekeeper.current() + timekeeper.run() + + await expect(() => timekeeper.wait(task)).rejects.toThrow(TimeoutError) + expect(callAbort).toHaveBeenCalledWith(expect.any(TimeoutError)) + }) + + describe('Calling abort on a pending task', () => { + it('With {callRejectedTask = true}, starts the runner with an aborted signal and changes the task status to rejected', async () => { + timekeeper = new UnlimitedTimekeeper({ + ...options, + callRejectedTask: true, + }) + const task = timekeeper.current() + setTimeout(() => timekeeper.abort(task), 10) + + await expect(() => timekeeper.wait(task)).rejects.toThrow(AbortError) + expect(runnerFn).toHaveBeenCalledWith(task, expect.objectContaining({ aborted: true, reason: expect.any(AbortError) })) + expect(task.status).toEqual('rejected') + }) + + it('With {callRejectedTask = false}, simply changes the task status to rejected', async () => { + timekeeper = new UnlimitedTimekeeper({ + ...options, + callRejectedTask: false, + }) + const task = timekeeper.current() + setTimeout(() => timekeeper.abort(task), 10) + + await expect(() => timekeeper.wait(task)).rejects.toThrow(AbortError) + expect(runnerFn).not.toHaveBeenCalled() + expect(task.status).toEqual('rejected') + }) + }) + + it('Calling abort on a runned task changes its status to rejected and triggers abort on the signal', async () => { + const callAbort = jest.fn() + runnerFn.mockImplementation(async (_task: ITask, signal: AbortSignal) => { + signal.addEventListener('abort', () => callAbort(signal.reason)) + await sleep(timeoutMs + 100, true) + }) + + const task = timekeeper.current() + timekeeper.run() + setTimeout(() => timekeeper.abort(task), 10) + + await expect(() => timekeeper.wait(task)).rejects.toThrow(AbortError) + expect(callAbort).toHaveBeenCalledWith(expect.any(AbortError)) + expect(task.status).toEqual('rejected') + }) +}) diff --git a/src/timekeeper/__tests__/with-metrics.timekeeper.spec.ts b/src/timekeeper/__tests__/with-metrics.timekeeper.spec.ts new file mode 100644 index 0000000..b3aa5d2 --- /dev/null +++ b/src/timekeeper/__tests__/with-metrics.timekeeper.spec.ts @@ -0,0 +1,173 @@ +import { sleep } from '../../utils/sleep' +import { + ILimitedTimekeeperMetrics, + ITask, + ITimekeeper, + IUnlimitedTimekeeperMetrics, + LimitedTimekeeperOptions, + UnlimitedTimekeeperOptions, +} from '../interfaces' +import { LimitedTimekeeper } from '../limited.timekeeper' +import { UnlimitedTimekeeper } from '../unlimited.timekeeper' + +class UnlimitedMetrics implements IUnlimitedTimekeeperMetrics { + private created: number = 0 + private runned: number = 0 + private maxRunnedSize = 0 + private resolved = 0 + private rejected = 0 + private maxRunnedTime = 0 + create() { + this.created++ + } + + runTask(runnedSize: number, _task: ITask) { + this.runned++ + if (this.maxRunnedSize < runnedSize) this.maxRunnedSize = runnedSize + } + + resolveTask(task: ITask) { + this.resolved++ + const runnedTime = Date.now() - (task.runnedAt || task.createdAt) + if (this.maxRunnedTime < runnedTime) this.maxRunnedTime = runnedTime + } + + rejectTask(error: unknown, task: ITask) { + this.rejected++ + const runnedTime = Date.now() - (task.runnedAt || task.createdAt) + if (this.maxRunnedTime < runnedTime) this.maxRunnedTime = runnedTime + } + + toJSON() { + const { created, runned, maxRunnedSize, resolved, rejected, maxRunnedTime } = this + return { created, runned, maxRunnedSize, resolved, rejected, maxRunnedTime } + } +} + +class LimitedMetrics extends UnlimitedMetrics implements ILimitedTimekeeperMetrics { + private maxWaitingSize = 0 + private maxWaitingTime = 0 + waitTask(waitListSize: number) { + if (this.maxWaitingSize < waitListSize) this.maxWaitingSize = waitListSize + } + + runTask(runnedSize: number, task: ITask): void { + super.runTask(runnedSize, task) + const waitingTime = Date.now() - (task.runnedAt || task.createdAt) + if (this.maxWaitingTime < waitingTime) this.maxWaitingTime = waitingTime + } + + toJSON() { + const { maxWaitingSize, maxWaitingTime } = this + return { + ...super.toJSON(), + maxWaitingTime, + maxWaitingSize, + } as const + } +} + +describe.skip('Timekeepers with metrics.', () => { + const runMs = 100 + const timeoutMs = 1000 + const runnerFn = jest.fn() + const options: UnlimitedTimekeeperOptions = { + initialDataFactory: () => ({}), + runMs, + runner: runnerFn, + timeoutMs, + } + + let timekeeper: ITimekeeper + + beforeEach(() => { + jest.clearAllMocks() + runnerFn.mockImplementation(() => sleep(100, true)) + }) + + afterEach(() => { + timekeeper.clear() + }) + + describe('Unlimited', () => { + let metrics: LimitedMetrics + beforeEach(() => { + metrics = new LimitedMetrics() + timekeeper = new UnlimitedTimekeeper(options, metrics) + }) + + it('runned size', () => { + for (let i = 0; i < 3; i++) { + timekeeper.current() + timekeeper.run() + } + timekeeper.current() + + expect(metrics.toJSON().runned).toEqual(3) + expect(metrics.toJSON().maxRunnedSize).toEqual(3) + expect(metrics.toJSON().created).toEqual(4) + }) + + it('resolved & rejected', async () => { + for (let i = 0; i < 3; i++) { + timekeeper.current() + timekeeper.run() + } + for (let i = 0; i < 2; i++) { + const task = timekeeper.current() + timekeeper.run() + await sleep(10) + timekeeper.abort(task) + } + + await sleep(100) + + expect(metrics.toJSON().resolved).toEqual(3) + expect(metrics.toJSON().rejected).toEqual(2) + }) + + it('max run time', async () => { + runnerFn.mockImplementation(() => sleep(800, true)) + const task = timekeeper.current() + timekeeper.run() + runnerFn.mockImplementation(() => sleep(500, true)) + timekeeper.current() + timekeeper.run() + + await timekeeper.wait(task) + + expect(metrics.toJSON().maxRunnedTime).toBeGreaterThanOrEqual(800) + expect(metrics.toJSON().maxRunnedTime).toBeLessThan(900) + }) + }) + + describe('Limited', () => { + const unlimitedOptions: LimitedTimekeeperOptions = { + ...options, + concurrencyLimit: 10, + maxWaitingTimeMs: 2000, + } + let metrics: LimitedMetrics + + beforeEach(() => { + metrics = new LimitedMetrics() + timekeeper = new LimitedTimekeeper(unlimitedOptions, metrics) + }) + + it('waiting size', () => { + for (let i = 0; i < unlimitedOptions.concurrencyLimit; i++) { + timekeeper.current() + timekeeper.run() + } + for (let i = 0; i < 3; i++) { + timekeeper.current() + timekeeper.run() + } + + expect(metrics.toJSON().runned).toEqual(unlimitedOptions.concurrencyLimit) + expect(metrics.toJSON().maxRunnedSize).toEqual(unlimitedOptions.concurrencyLimit) + expect(metrics.toJSON().created).toEqual(unlimitedOptions.concurrencyLimit + 3) + expect(metrics.toJSON().maxWaitingSize).toEqual(3) + }) + }) +}) diff --git a/src/timekeeper/interfaces.ts b/src/timekeeper/interfaces.ts new file mode 100644 index 0000000..30c3af8 --- /dev/null +++ b/src/timekeeper/interfaces.ts @@ -0,0 +1,47 @@ +export type TaskStatus = 'pending' | 'runned' | 'resolved' | 'rejected' + +export interface ITask { + readonly id: string + status: TaskStatus + data: D + runnedAt: number | null + createdAt: number +} + +export type TimekeeperRunnerCallback = (task: ITask, signal: AbortSignal) => Promise | void +export type InitiateDataFactory = () => D + +export interface ITimekeeper { + current(): ITask + run(): void + abort(task: string | ITask, reason?: unknown): void + wait(task: string | ITask): Promise + clear(): void +} + +export interface UnlimitedTimekeeperOptions { + initialDataFactory: InitiateDataFactory + runMs: number + runner: TimekeeperRunnerCallback + timeoutMs: number + callRejectedTask?: boolean +} + +export interface LimitedOptions { + concurrencyLimit: number + maxWaitingTimeMs: number +} + +export type LimitedTimekeeperOptions = UnlimitedTimekeeperOptions & LimitedOptions + +export interface IUnlimitedTimekeeperMetrics { + create?: () => void + forcedRun?: () => void + abort?: (task: ITask, error: unknown) => void + runTask?: (runnedSize: number, task: ITask) => void + resolveTask?: (task: ITask) => void + rejectTask?: (error: unknown, task: ITask) => void +} +export interface ILimitedTimekeeperMetrics extends IUnlimitedTimekeeperMetrics { + waitTask?: (waitListSize: number) => void +} diff --git a/src/timekeeper/limited.timekeeper.ts b/src/timekeeper/limited.timekeeper.ts new file mode 100644 index 0000000..f8e857f --- /dev/null +++ b/src/timekeeper/limited.timekeeper.ts @@ -0,0 +1,67 @@ +import { SilentAbortError, TimeoutError } from '../utils/errors' +import { ILimitedTimekeeperMetrics, ITask, ITimekeeper, LimitedOptions, LimitedTimekeeperOptions } from './interfaces' +import { Task } from './task' +import { UnlimitedTimekeeper } from './unlimited.timekeeper' + +import createDebug from 'debug' +const debug = createDebug('batchloader:timekeeper') + +export class LimitedTimekeeper extends UnlimitedTimekeeper implements ITimekeeper { + private readonly limitedOptions: LimitedOptions + + private waitingTasks: Task[] = [] + + constructor({ concurrencyLimit, maxWaitingTimeMs, ...options }: LimitedTimekeeperOptions, metrics?: ILimitedTimekeeperMetrics) { + super(options, metrics) + this.limitedOptions = { concurrencyLimit, maxWaitingTimeMs } + } + + waiting(): ITask[] { + return this.waitingTasks.map(task => task.inner) + } + + private runNextWaitingTask() { + const next = this.waitingTasks.shift() + if (next) { + if (next.tid) clearTimeout(next.tid) + debug(`Attempting to run a task from the waiting list. id="${next.id}"`) + this.runTask(next) + } + } + + protected runTask(task: Task): void { + if (this.runnedTasks.size < this.limitedOptions.concurrencyLimit) { + super.runTask(task) + task.defer.promise.finally(() => this.runNextWaitingTask()).catch(() => {}) + return + } + const runnedTime = Date.now() + task.tid = setTimeout(() => { + debug( + `A task on the waiting list is waiting longer than it should. id="${task.id}"; time="${Date.now() - runnedTime}"; maxWaitingTimeMs="${this.limitedOptions.maxWaitingTimeMs}"`, + ) + this.abort(task.id, new TimeoutError(this.limitedOptions.maxWaitingTimeMs)) + }, this.limitedOptions.maxWaitingTimeMs)?.unref() + this.waitingTasks.push(task) + this.metrics?.waitTask?.(this.waitingTasks.length) + debug(`The task has been added to the waiting list. id="${task.id}"`) + } + + protected findTaskById(id: string): Task | null { + return super.findTaskById(id) || this.waitingTasks.find(task => task.id === id) || null + } + + protected rejectPendingTask(task: Task, error: unknown): void { + if (this.currentTask?.id === task.id) return super.rejectPendingTask(task, error) + this.waitingTasks = this.waitingTasks.filter(({ id }) => id !== task.id) + if (task.tid) clearTimeout(task.tid) + this.metrics?.rejectTask?.(error, task.inner) + this.callAbortedRunner(task, error) + debug(`The task was rejected. id="${task.id}"`) + } + + clear(): void { + super.clear() + this.waitingTasks.forEach(task => this.abort(task.inner, new SilentAbortError('timekeeper'))) + } +} diff --git a/src/timekeeper/task.ts b/src/timekeeper/task.ts new file mode 100644 index 0000000..bbe4678 --- /dev/null +++ b/src/timekeeper/task.ts @@ -0,0 +1,50 @@ +import { Defer } from '../utils/defer' +import { randomString } from '../utils/string' +import { ITask, TaskStatus } from './interfaces' + +export class InnerTask implements ITask { + #data: D + + get data() { + return this.#data + } + + constructor( + data: D, + private readonly task: Task, + ) { + this.#data = data + } + + get id() { + return this.task.id + } + + get status() { + return this.task.status + } + + get runnedAt() { + return this.task.runnedAt + } + + get createdAt() { + return this.task.createdAt + } +} + +export class Task { + readonly id: string + public status: TaskStatus = 'pending' + public readonly createdAt: number = Date.now() + public runnedAt: number | null = null + readonly inner: InnerTask + readonly defer = new Defer() + controller?: AbortController + tid?: NodeJS.Timeout | null = null + + constructor(data: D) { + this.id = randomString() + this.inner = new InnerTask(data, this) + } +} diff --git a/src/timekeeper/unlimited.timekeeper.ts b/src/timekeeper/unlimited.timekeeper.ts new file mode 100644 index 0000000..fcfdb0e --- /dev/null +++ b/src/timekeeper/unlimited.timekeeper.ts @@ -0,0 +1,151 @@ +import { AbortError, SilentAbortError, TimeoutError } from '../utils/errors' +import { isLoaderError } from '../utils/is' +import { ITask, ITimekeeper, IUnlimitedTimekeeperMetrics, UnlimitedTimekeeperOptions } from './interfaces' +import { Task } from './task' +import createDebug from 'debug' +const debug = createDebug('batchloader:timekeeper') + +export class UnlimitedTimekeeper implements ITimekeeper { + protected currentTask: Task | null = null + protected runnedTasks = new Map>() + + constructor( + protected readonly options: UnlimitedTimekeeperOptions, + protected readonly metrics?: M, + ) {} + + current(): ITask { + if (this.currentTask) return this.currentTask.inner + const task = new Task(this.options.initialDataFactory()) + debug(`Create new task. id="${task.id}"`) + this.currentTask = task + this.startRunnerTimeout() + this.metrics?.create?.() + return task.inner + } + + run(): void { + if (!this.currentTask) return + this.clearRunnerTimeout() + this.metrics?.forcedRun?.() + debug(`The current task is started manually. id="${this.currentTask.id}"`) + this.runCurrentTask() + } + + abort(task: string | ITask, reason?: unknown): void { + const target = this.findTask(task) + if (target) { + debug(`Abort task. id="${target.id}"`) + const error = isLoaderError(reason) ? reason : new AbortError('timekeeper', reason) + this.metrics?.abort?.(target.inner, error) + switch (target.status) { + case 'runned': + this.rejectRunnedTask(target, error) + break + case 'pending': + this.rejectPendingTask(target, error) + break + } + } + } + + async wait(task: string | ITask): Promise { + await this.findTask(task)?.defer.promise + } + + clear(): void { + this.clearRunnerTimeout() + this.runnedTasks.forEach(task => this.abort(task.inner, new SilentAbortError('timekeeper'))) + } + + private findTask(taskId: string | Pick, 'id'>): Task | null { + if (typeof taskId === 'object') return this.findTask(taskId.id) + return this.findTaskById(taskId) + } + + protected findTaskById(id: string): Task | null { + if (this.currentTask && this.currentTask.id === id) return this.currentTask + return this.runnedTasks.get(id) || null + } + + private tidRunner: NodeJS.Timeout | null = null + private startRunnerTimeout() { + this.clearRunnerTimeout() + + this.tidRunner = setTimeout(() => { + debug(`The current task is started by a timer. id=${this.currentTask?.id}`) + this.runCurrentTask() + }, this.options.runMs)?.unref() + } + + private clearRunnerTimeout() { + if (this.tidRunner) { + clearTimeout(this.tidRunner) + this.tidRunner = null + } + } + + private runCurrentTask() { + if (this.currentTask) { + this.runTask(this.currentTask) + this.currentTask = null + } + } + + private async callRunner(task: Task, signal: AbortSignal, toThrow?: boolean) { + try { + await this.options.runner(task.inner, signal) + } catch (error) { + debug(`Aborted runner terminated with an error. id="${task.id}"`) + if (toThrow) throw error + } + } + + protected runTask(task: Task) { + task.status = 'runned' + task.controller = task.controller || new AbortController() + task.tid = setTimeout(() => this.abort(task.id, new TimeoutError(this.options.timeoutMs)), this.options.timeoutMs)?.unref() + + this.runnedTasks.set(task.id, task) + this.metrics?.runTask?.(this.runnedTasks.size, task.inner) + + this.callRunner(task, task.controller.signal, true) + .then(() => this.resolveTask(task)) + .catch(error => this.rejectRunnedTask(task, error)) + } + + private resolveTask(task: Task) { + if (task.tid) clearTimeout(task.tid) + this.runnedTasks.delete(task.id) + if (task.status !== 'runned') return + task.status = 'resolved' + debug(`The task was resolved. id="${task.id}"`) + task.defer.resolve(task.inner.data) + this.metrics?.resolveTask?.(task.inner) + } + + private rejectRunnedTask(task: Task, error: unknown) { + if (task.tid) clearTimeout(task.tid) + task.status = 'rejected' + this.runnedTasks.delete(task.id) + task.controller?.abort(error) + task.defer.reject(error) + this.metrics?.rejectTask?.(error, task.inner) + debug(`The task was rejected. id="${task.id}"`) + } + + protected rejectPendingTask(task: Task, error: unknown) { + this.clearRunnerTimeout() + this.currentTask = null + this.metrics?.rejectTask?.(error, task.inner) + this.callAbortedRunner(task, error) + debug(`The task was rejected. id="${task.id}"`) + } + + protected callAbortedRunner(task: Task, error: unknown) { + task.status = 'rejected' + + if (this.options.callRejectedTask) this.callRunner(task, AbortSignal.abort(error), false) + task.defer.reject(error) + } +} diff --git a/src/utils/defer.ts b/src/utils/defer.ts new file mode 100644 index 0000000..121988e --- /dev/null +++ b/src/utils/defer.ts @@ -0,0 +1,21 @@ +export class Defer { + #resolveFn?: (value: T | PromiseLike) => void + #rejectFn?: (reason: any) => void + + readonly promise = new Promise((resolve, reject) => { + this.#resolveFn = resolve + this.#rejectFn = reject + }) + + constructor() { + this.promise.catch(() => {}) + } + + resolve(value: T | PromiseLike) { + this.#resolveFn?.(value) + } + + reject(reason: any) { + this.#rejectFn?.(reason) + } +} diff --git a/src/utils/errors.ts b/src/utils/errors.ts new file mode 100644 index 0000000..0252fc2 --- /dev/null +++ b/src/utils/errors.ts @@ -0,0 +1,28 @@ +export class LoaderError extends Error { + get name() { + return this.constructor.name + } +} + +export class TimeoutError extends LoaderError { + constructor(readonly delay: number) { + super(`Operation exceeded the maximum timeout of ${delay} ms.`) + } +} + +export class AbortError extends LoaderError { + constructor( + readonly operation: string, + reason?: unknown, + ) { + super(`Operation "${operation}" was aborted${reason && typeof reason === 'string' ? `. ${reason}` : ''}`, { cause: reason }) + } +} + +export class SilentAbortError extends AbortError {} + +export class RejectedAbortError extends AbortError { + constructor(operation: string) { + super(operation, 'Operation runner was rejected') + } +} diff --git a/src/utils/interfaces.ts b/src/utils/interfaces.ts new file mode 100644 index 0000000..0d808ea --- /dev/null +++ b/src/utils/interfaces.ts @@ -0,0 +1 @@ +export type Key = string | number diff --git a/src/utils/is.ts b/src/utils/is.ts new file mode 100644 index 0000000..0d820ac --- /dev/null +++ b/src/utils/is.ts @@ -0,0 +1,5 @@ +import { LoaderError } from './errors' + +const isObject = (value: any): value is Record => value && typeof value === 'object' +export const isError = (value: any): value is Error => isObject(value) && value instanceof Error +export const isLoaderError = (value: any): value is LoaderError => isError(value) && value instanceof LoaderError diff --git a/src/utils/signals.ts b/src/utils/signals.ts new file mode 100644 index 0000000..e69de29 diff --git a/src/utils/sleep.ts b/src/utils/sleep.ts new file mode 100644 index 0000000..d30c271 --- /dev/null +++ b/src/utils/sleep.ts @@ -0,0 +1,6 @@ +export const sleep = (t: number, unref?: boolean): Promise => + new Promise(r => { + const timer = setTimeout(r, t) + // @ts-ignore + if (unref) timer?.unref() + }) diff --git a/src/utils/string.ts b/src/utils/string.ts new file mode 100644 index 0000000..bd213c0 --- /dev/null +++ b/src/utils/string.ts @@ -0,0 +1,8 @@ +let num = 0 +const count = () => { + num++ + return num +} + +export const randomString = () => + `${count().toString(32)}${Date.now().toString(32).substring(4)}${Math.random().toString(32).substring(2, 5)}` diff --git a/tsconfig.json b/tsconfig.json index 44b7d06..f4d58f1 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -4,11 +4,9 @@ "types": ["node", "jest"], "moduleResolution": "Node", "declaration": true, - "removeComments": true, - "emitDecoratorMetadata": true, - "experimentalDecorators": true, + "removeComments": false, "allowSyntheticDefaultImports": true, - "target": "ES2022", + "target": "ES2023", "sourceMap": true, "outDir": "./dist", "baseUrl": "./src",