Skip to content

Commit 1b3aa17

Browse files
Rename keyed worker and update terminal manager
- Rename the shared coalescing worker API and exports - Switch terminal persistence to the keyed worker - Drop the unused global drain helper
1 parent 4772126 commit 1b3aa17

4 files changed

Lines changed: 19 additions & 30 deletions

File tree

apps/server/src/terminal/Layers/Manager.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import {
66
type TerminalSessionSnapshot,
77
type TerminalSessionStatus,
88
} from "@t3tools/contracts";
9-
import { makeCoalescingDrainableWorker } from "@t3tools/shared/CoalescingDrainableWorker";
9+
import { makeKeyedCoalescingWorker } from "@t3tools/shared/KeyedCoalescingWorker";
1010
import {
1111
Data,
1212
Effect,
@@ -804,7 +804,7 @@ export const makeTerminalManagerWithOptions = Effect.fn("makeTerminalManagerWith
804804
yield* registerKillFiber(process, fiber);
805805
});
806806

807-
const persistWorker = yield* makeCoalescingDrainableWorker<
807+
const persistWorker = yield* makeKeyedCoalescingWorker<
808808
string,
809809
PersistHistoryRequest,
810810
never,

packages/shared/package.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@
2828
"types": "./src/DrainableWorker.ts",
2929
"import": "./src/DrainableWorker.ts"
3030
},
31-
"./CoalescingDrainableWorker": {
32-
"types": "./src/CoalescingDrainableWorker.ts",
33-
"import": "./src/CoalescingDrainableWorker.ts"
31+
"./KeyedCoalescingWorker": {
32+
"types": "./src/KeyedCoalescingWorker.ts",
33+
"import": "./src/KeyedCoalescingWorker.ts"
3434
},
3535
"./schemaJson": {
3636
"types": "./src/schemaJson.ts",

packages/shared/src/CoalescingDrainableWorker.test.ts renamed to packages/shared/src/KeyedCoalescingWorker.test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ import { it } from "@effect/vitest";
22
import { describe, expect } from "vitest";
33
import { Deferred, Effect } from "effect";
44

5-
import { makeCoalescingDrainableWorker } from "./CoalescingDrainableWorker";
5+
import { makeKeyedCoalescingWorker } from "./KeyedCoalescingWorker";
66

7-
describe("makeCoalescingDrainableWorker", () => {
7+
describe("makeKeyedCoalescingWorker", () => {
88
it.live("waits for latest work enqueued during active processing before draining the key", () =>
99
Effect.scoped(
1010
Effect.gen(function* () {
@@ -14,7 +14,7 @@ describe("makeCoalescingDrainableWorker", () => {
1414
const secondStarted = yield* Deferred.make<void>();
1515
const releaseSecond = yield* Deferred.make<void>();
1616

17-
const worker = yield* makeCoalescingDrainableWorker<string, string, never, never>({
17+
const worker = yield* makeKeyedCoalescingWorker<string, string, never, never>({
1818
merge: (_current, next) => next,
1919
process: (key, value) =>
2020
Effect.gen(function* () {
@@ -64,7 +64,7 @@ describe("makeCoalescingDrainableWorker", () => {
6464
const releaseFailure = yield* Deferred.make<void>();
6565
const secondProcessed = yield* Deferred.make<void>();
6666

67-
const worker = yield* makeCoalescingDrainableWorker<string, string, string, never>({
67+
const worker = yield* makeKeyedCoalescingWorker<string, string, string, never>({
6868
merge: (_current, next) => next,
6969
process: (key, value) =>
7070
Effect.gen(function* () {

packages/shared/src/CoalescingDrainableWorker.ts renamed to packages/shared/src/KeyedCoalescingWorker.ts

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,33 @@
11
/**
2-
* CoalescingDrainableWorker - A keyed worker that keeps only the latest value per key.
2+
* KeyedCoalescingWorker - A keyed worker that keeps only the latest value per key.
33
*
44
* Enqueues for an active or already-queued key are merged atomically instead of
55
* creating duplicate queued items. `drainKey()` resolves only when that key has
66
* no queued, pending, or active work left.
77
*
8-
* @module CoalescingDrainableWorker
8+
* @module KeyedCoalescingWorker
99
*/
1010
import type { Scope } from "effect";
1111
import { Effect, TxQueue, TxRef } from "effect";
1212

13-
export interface CoalescingDrainableWorker<K, V> {
13+
export interface KeyedCoalescingWorker<K, V> {
1414
readonly enqueue: (key: K, value: V) => Effect.Effect<void>;
1515
readonly drainKey: (key: K) => Effect.Effect<void>;
16-
readonly drain: Effect.Effect<void>;
1716
}
1817

19-
interface CoalescingWorkerState<K, V> {
18+
interface KeyedCoalescingWorkerState<K, V> {
2019
readonly latestByKey: Map<K, V>;
2120
readonly queuedKeys: Set<K>;
2221
readonly activeKeys: Set<K>;
2322
}
2423

25-
export const makeCoalescingDrainableWorker = <K, V, E, R>(options: {
24+
export const makeKeyedCoalescingWorker = <K, V, E, R>(options: {
2625
readonly merge: (current: V, next: V) => V;
2726
readonly process: (key: K, value: V) => Effect.Effect<void, E, R>;
28-
}): Effect.Effect<CoalescingDrainableWorker<K, V>, never, Scope.Scope | R> =>
27+
}): Effect.Effect<KeyedCoalescingWorker<K, V>, never, Scope.Scope | R> =>
2928
Effect.gen(function* () {
3029
const queue = yield* Effect.acquireRelease(TxQueue.unbounded<K>(), TxQueue.shutdown);
31-
const stateRef = yield* TxRef.make<CoalescingWorkerState<K, V>>({
30+
const stateRef = yield* TxRef.make<KeyedCoalescingWorkerState<K, V>>({
3231
latestByKey: new Map(),
3332
queuedKeys: new Set(),
3433
activeKeys: new Set(),
@@ -107,7 +106,7 @@ export const makeCoalescingDrainableWorker = <K, V, E, R>(options: {
107106
Effect.forkScoped,
108107
);
109108

110-
const enqueue: CoalescingDrainableWorker<K, V>["enqueue"] = (key, value) =>
109+
const enqueue: KeyedCoalescingWorker<K, V>["enqueue"] = (key, value) =>
111110
TxRef.modify(stateRef, (state) => {
112111
const latestByKey = new Map(state.latestByKey);
113112
const existing = latestByKey.get(key);
@@ -126,17 +125,7 @@ export const makeCoalescingDrainableWorker = <K, V, E, R>(options: {
126125
Effect.asVoid,
127126
);
128127

129-
const drain: CoalescingDrainableWorker<K, V>["drain"] = TxRef.get(stateRef).pipe(
130-
Effect.tap((state) =>
131-
state.latestByKey.size > 0 || state.queuedKeys.size > 0 || state.activeKeys.size > 0
132-
? Effect.txRetry
133-
: Effect.void,
134-
),
135-
Effect.asVoid,
136-
Effect.tx,
137-
);
138-
139-
const drainKey: CoalescingDrainableWorker<K, V>["drainKey"] = (key) =>
128+
const drainKey: KeyedCoalescingWorker<K, V>["drainKey"] = (key) =>
140129
TxRef.get(stateRef).pipe(
141130
Effect.tap((state) =>
142131
state.latestByKey.has(key) || state.queuedKeys.has(key) || state.activeKeys.has(key)
@@ -147,5 +136,5 @@ export const makeCoalescingDrainableWorker = <K, V, E, R>(options: {
147136
Effect.tx,
148137
);
149138

150-
return { enqueue, drainKey, drain } satisfies CoalescingDrainableWorker<K, V>;
139+
return { enqueue, drainKey } satisfies KeyedCoalescingWorker<K, V>;
151140
});

0 commit comments

Comments
 (0)