diff --git a/release-notes.txt b/release-notes.txt index 6f7379e9..9e083131 100644 --- a/release-notes.txt +++ b/release-notes.txt @@ -3,6 +3,12 @@ Release notes: 1.0.0 - adds TaskSeq.withCancellation, #167 + - adds TaskSeq.replicateInfinite, replicateInfiniteAsync, replicateUntilNoneAsync, #345 + - adds TaskSeq.firstOrDefault, lastOrDefault, #345 + - adds TaskSeq.splitAt, #345 + - adds TaskSeq.zipWith, zipWithAsync, zipWith3, zipWithAsync3, #345 + - adds TaskSeq.chunkBy, chunkByAsync, #345 + - adds TaskSeq.threadState, threadStateAsync, #345 - adds docs/ with fsdocs-based documentation site covering generating, transforming, consuming, combining and advanced operations 0.7.0 diff --git a/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj b/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj index c8b5051d..059c6597 100644 --- a/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj +++ b/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj @@ -50,17 +50,23 @@ + + + + + + diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.ChunkBy.Tests.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.ChunkBy.Tests.fs new file mode 100644 index 00000000..76f684c1 --- /dev/null +++ b/src/FSharp.Control.TaskSeq.Test/TaskSeq.ChunkBy.Tests.fs @@ -0,0 +1,135 @@ +module TaskSeq.Tests.ChunkBy + +open Xunit +open FsUnit.Xunit + +open FSharp.Control + +// +// TaskSeq.chunkBy +// TaskSeq.chunkByAsync +// + +module EmptySeq = + [] + let ``Null source is invalid`` () = + assertNullArg + <| fun () -> TaskSeq.chunkBy id (null: TaskSeq) + + assertNullArg + <| fun () -> TaskSeq.chunkByAsync (fun x -> Task.fromResult x) (null: TaskSeq) + + [)>] + let ``TaskSeq-chunkBy on empty gives empty`` variant = + Gen.getEmptyVariant variant + |> TaskSeq.chunkBy id + |> verifyEmpty + + [)>] + let ``TaskSeq-chunkByAsync on empty gives empty`` variant = + Gen.getEmptyVariant variant + |> TaskSeq.chunkByAsync (fun x -> Task.fromResult x) + |> verifyEmpty + + +module Functionality = + [] + let ``TaskSeq-chunkBy groups consecutive equal elements`` () = task { + let ts = taskSeq { yield! [ 1; 1; 2; 2; 2; 3 ] } + let! result = TaskSeq.chunkBy id ts |> TaskSeq.toArrayAsync + result |> should haveLength 3 + result[0] |> should equal (1, [| 1; 1 |]) + result[1] |> should equal (2, [| 2; 2; 2 |]) + result[2] |> should equal (3, [| 3 |]) + } + + [] + let ``TaskSeq-chunkBy with all same key yields one chunk`` () = task { + let ts = taskSeq { yield! [ 5; 5; 5; 5 ] } + let! result = TaskSeq.chunkBy id ts |> TaskSeq.toArrayAsync + result |> should haveLength 1 + result[0] |> should equal (5, [| 5; 5; 5; 5 |]) + } + + [] + let ``TaskSeq-chunkBy with all different keys yields singleton chunks`` () = task { + let ts = taskSeq { yield! [ 1..5 ] } + let! result = TaskSeq.chunkBy id ts |> TaskSeq.toArrayAsync + result |> should haveLength 5 + + result + |> Array.iteri (fun i (k, arr) -> + k |> should equal (i + 1) + arr |> should equal [| i + 1 |]) + } + + [] + let ``TaskSeq-chunkBy with singleton source yields one chunk`` () = task { + let ts = TaskSeq.singleton 42 + let! result = TaskSeq.chunkBy id ts |> TaskSeq.toArrayAsync + result |> should haveLength 1 + result[0] |> should equal (42, [| 42 |]) + } + + [] + let ``TaskSeq-chunkBy uses projection key, not element`` () = task { + let ts = taskSeq { + yield "a1" + yield "a2" + yield "b1" + yield "b2" + yield "a3" + } + + let! result = + TaskSeq.chunkBy (fun (s: string) -> s[0]) ts + |> TaskSeq.toArrayAsync + + result |> should haveLength 3 + let k0, arr0 = result[0] + k0 |> should equal 'a' + arr0 |> should equal [| "a1"; "a2" |] + let k1, arr1 = result[1] + k1 |> should equal 'b' + arr1 |> should equal [| "b1"; "b2" |] + let k2, arr2 = result[2] + k2 |> should equal 'a' + arr2 |> should equal [| "a3" |] + } + + [] + let ``TaskSeq-chunkBy does not merge non-consecutive equal keys`` () = task { + // Key alternates: 1, 2, 1, 2 — should produce 4 chunks not 2 + let ts = taskSeq { yield! [ 1; 2; 1; 2 ] } + let! result = TaskSeq.chunkBy id ts |> TaskSeq.toArrayAsync + result |> should haveLength 4 + } + + [] + let ``TaskSeq-chunkByAsync groups consecutive by async key`` () = task { + let ts = taskSeq { yield! [ 1; 1; 2; 3; 3 ] } + + let! result = + TaskSeq.chunkByAsync (fun x -> Task.fromResult (x % 2 = 0)) ts + |> TaskSeq.toArrayAsync + // odd, even, odd -> 3 chunks + result |> should haveLength 3 + let k0, arr0 = result[0] + k0 |> should equal false + arr0 |> should equal [| 1; 1 |] + let k1, arr1 = result[1] + k1 |> should equal true + arr1 |> should equal [| 2 |] + let k2, arr2 = result[2] + k2 |> should equal false + arr2 |> should equal [| 3; 3 |] + } + + [)>] + let ``TaskSeq-chunkBy all elements same key as variants`` variant = task { + let ts = Gen.getSeqImmutable variant + let! result = TaskSeq.chunkBy (fun _ -> 0) ts |> TaskSeq.toArrayAsync + result |> should haveLength 1 + let _, arr = result[0] + arr |> should haveLength 10 + } diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.FirstLastDefault.Tests.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.FirstLastDefault.Tests.fs new file mode 100644 index 00000000..47d0974c --- /dev/null +++ b/src/FSharp.Control.TaskSeq.Test/TaskSeq.FirstLastDefault.Tests.fs @@ -0,0 +1,93 @@ +module TaskSeq.Tests.FirstLastDefault + +open Xunit +open FsUnit.Xunit + +open FSharp.Control + +// +// TaskSeq.firstOrDefault +// TaskSeq.lastOrDefault +// + +module EmptySeq = + [] + let ``Null source is invalid`` () = + assertNullArg <| fun () -> TaskSeq.firstOrDefault 0 null + assertNullArg <| fun () -> TaskSeq.lastOrDefault 0 null + + [)>] + let ``TaskSeq-firstOrDefault returns default for empty`` variant = task { + let! result = Gen.getEmptyVariant variant |> TaskSeq.firstOrDefault 42 + result |> should equal 42 + } + + [)>] + let ``TaskSeq-lastOrDefault returns default for empty`` variant = task { + let! result = Gen.getEmptyVariant variant |> TaskSeq.lastOrDefault 99 + result |> should equal 99 + } + + [] + let ``TaskSeq-firstOrDefault returns default with reference type`` () = task { + let! result = TaskSeq.empty |> TaskSeq.firstOrDefault "hello" + result |> should equal "hello" + } + + [] + let ``TaskSeq-lastOrDefault returns default with reference type`` () = task { + let! result = TaskSeq.empty |> TaskSeq.lastOrDefault "world" + result |> should equal "world" + } + + +module Immutable = + [)>] + let ``TaskSeq-firstOrDefault returns first element`` variant = task { + let ts = Gen.getSeqImmutable variant + let! result = TaskSeq.firstOrDefault 0 ts + result |> should equal 1 + } + + [)>] + let ``TaskSeq-lastOrDefault returns last element`` variant = task { + let ts = Gen.getSeqImmutable variant + let! result = TaskSeq.lastOrDefault 0 ts + result |> should equal 10 + } + + [] + let ``TaskSeq-firstOrDefault does not use default when non-empty`` () = task { + let! result = + taskSeq { + yield 5 + yield 6 + } + |> TaskSeq.firstOrDefault -1 + + result |> should equal 5 + } + + [] + let ``TaskSeq-lastOrDefault does not use default when non-empty`` () = task { + let! result = + taskSeq { + yield 5 + yield 6 + } + |> TaskSeq.lastOrDefault -1 + + result |> should equal 6 + } + + [] + let ``TaskSeq-firstOrDefault with singleton`` () = task { + let! result = TaskSeq.singleton 42 |> TaskSeq.firstOrDefault 0 + result |> should equal 42 + } + + [] + let ``TaskSeq-lastOrDefault with singleton`` () = task { + let! result = TaskSeq.singleton 42 |> TaskSeq.lastOrDefault 0 + result |> should equal 42 + } diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.ReplicateInfinite.Tests.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.ReplicateInfinite.Tests.fs new file mode 100644 index 00000000..34e40fe2 --- /dev/null +++ b/src/FSharp.Control.TaskSeq.Test/TaskSeq.ReplicateInfinite.Tests.fs @@ -0,0 +1,158 @@ +module TaskSeq.Tests.ReplicateInfinite + +open System + +open Xunit +open FsUnit.Xunit + +open FSharp.Control + +// +// TaskSeq.replicateInfinite +// TaskSeq.replicateInfiniteAsync +// TaskSeq.replicateUntilNoneAsync +// + +module ReplicateInfinite = + [] + let ``TaskSeq-replicateInfinite yields value indefinitely`` () = task { + let! arr = + TaskSeq.replicateInfinite 7 + |> TaskSeq.take 5 + |> TaskSeq.toArrayAsync + + arr |> should equal [| 7; 7; 7; 7; 7 |] + } + + [] + let ``TaskSeq-replicateInfinite with take 0 gives empty`` () = TaskSeq.replicateInfinite 1 |> TaskSeq.take 0 |> verifyEmpty + + [] + let ``TaskSeq-replicateInfinite can be consumed multiple times`` () = task { + let ts = TaskSeq.replicateInfinite "x" + let! arr1 = ts |> TaskSeq.take 3 |> TaskSeq.toArrayAsync + let! arr2 = ts |> TaskSeq.take 3 |> TaskSeq.toArrayAsync + arr1 |> should equal [| "x"; "x"; "x" |] + arr2 |> should equal arr1 + } + + [] + let ``TaskSeq-replicateInfinite with large take`` () = task { + let count = 10_000 + + let! arr = + TaskSeq.replicateInfinite 42 + |> TaskSeq.take count + |> TaskSeq.toArrayAsync + + arr |> should haveLength count + arr |> Array.forall ((=) 42) |> should be True + } + + [] + let ``TaskSeq-replicateInfinite value captured at call site`` () = task { + let mutable x = 1 + let ts = TaskSeq.replicateInfinite x + x <- 999 + let! arr = ts |> TaskSeq.take 3 |> TaskSeq.toArrayAsync + // value type is captured at call time + arr |> should equal [| 1; 1; 1 |] + } + + +module ReplicateInfiniteAsync = + [] + let ``TaskSeq-replicateInfiniteAsync yields computed value indefinitely`` () = task { + let mutable n = 0 + + let comp () = task { + n <- n + 1 + return n + } + + let! arr = + TaskSeq.replicateInfiniteAsync comp + |> TaskSeq.take 4 + |> TaskSeq.toArrayAsync + + arr |> should equal [| 1; 2; 3; 4 |] + } + + [] + let ``TaskSeq-replicateInfiniteAsync with take 0 gives empty`` () = + let comp () = Task.fromResult 99 + + TaskSeq.replicateInfiniteAsync comp + |> TaskSeq.take 0 + |> verifyEmpty + + [] + let ``TaskSeq-replicateInfiniteAsync constant computation`` () = task { + let comp () = Task.fromResult "hello" + + let! arr = + TaskSeq.replicateInfiniteAsync comp + |> TaskSeq.take 3 + |> TaskSeq.toArrayAsync + + arr |> should equal [| "hello"; "hello"; "hello" |] + } + + +module ReplicateUntilNoneAsync = + [] + let ``TaskSeq-replicateUntilNoneAsync stops on None`` () = task { + let mutable n = 0 + + let comp () = task { + n <- n + 1 + + if n <= 3 then return Some n else return None + } + + let! arr = TaskSeq.replicateUntilNoneAsync comp |> TaskSeq.toArrayAsync + arr |> should equal [| 1; 2; 3 |] + } + + [] + let ``TaskSeq-replicateUntilNoneAsync returns empty when first call is None`` () = task { + let comp () = Task.fromResult None + let ts = TaskSeq.replicateUntilNoneAsync comp + let! arr = ts |> TaskSeq.toArrayAsync + arr |> should haveLength 0 + } + + [] + let ``TaskSeq-replicateUntilNoneAsync yields single element`` () = task { + let mutable called = false + + let comp () = task { + if not called then + called <- true + return Some 42 + else + return None + } + + let! arr = TaskSeq.replicateUntilNoneAsync comp |> TaskSeq.toArrayAsync + arr |> should equal [| 42 |] + } + + [] + let ``TaskSeq-replicateUntilNoneAsync with counter`` () = task { + let count = 100 + let mutable i = 0 + + let comp () = task { + if i < count then + i <- i + 1 + return Some i + else + return None + } + + let! arr = TaskSeq.replicateUntilNoneAsync comp |> TaskSeq.toArrayAsync + arr |> should haveLength count + arr[0] |> should equal 1 + arr[count - 1] |> should equal count + } diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.SplitAt.Tests.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.SplitAt.Tests.fs new file mode 100644 index 00000000..2dfc8a64 --- /dev/null +++ b/src/FSharp.Control.TaskSeq.Test/TaskSeq.SplitAt.Tests.fs @@ -0,0 +1,97 @@ +module TaskSeq.Tests.SplitAt + +open System + +open Xunit +open FsUnit.Xunit + +open FSharp.Control + +// +// TaskSeq.splitAt +// + +module EmptySeq = + [] + let ``Null source is invalid`` () = assertNullArg <| fun () -> TaskSeq.splitAt 0 null + + [] + let ``Negative count raises immediately`` () = + fun () -> TaskSeq.splitAt -1 (taskSeq { yield 1 }) |> Task.ignore + + |> should throwAsyncExact typeof + + [)>] + let ``TaskSeq-splitAt 0 on empty gives empty prefix and empty rest`` variant = task { + let! prefix, rest = Gen.getEmptyVariant variant |> TaskSeq.splitAt 0 + prefix |> should haveLength 0 + do! verifyEmpty rest + } + + [)>] + let ``TaskSeq-splitAt n on empty gives empty prefix and empty rest`` variant = task { + let! prefix, rest = Gen.getEmptyVariant variant |> TaskSeq.splitAt 5 + prefix |> should haveLength 0 + do! verifyEmpty rest + } + + +module Immutable = + [)>] + let ``TaskSeq-splitAt 0 returns empty prefix and full rest`` variant = task { + let ts = Gen.getSeqImmutable variant + let! prefix, rest = TaskSeq.splitAt 0 ts + prefix |> should haveLength 0 + let! restArr = TaskSeq.toArrayAsync rest + restArr |> should equal [| 1..10 |] + } + + [)>] + let ``TaskSeq-splitAt at length gives full prefix and empty rest`` variant = task { + let ts = Gen.getSeqImmutable variant + let! prefix, rest = TaskSeq.splitAt 10 ts + prefix |> should equal [| 1..10 |] + do! verifyEmpty rest + } + + [)>] + let ``TaskSeq-splitAt in the middle`` variant = task { + let ts = Gen.getSeqImmutable variant + let! prefix, rest = TaskSeq.splitAt 3 ts + prefix |> should equal [| 1; 2; 3 |] + let! restArr = TaskSeq.toArrayAsync rest + restArr |> should equal [| 4..10 |] + } + + [)>] + let ``TaskSeq-splitAt 1 returns singleton prefix`` variant = task { + let ts = Gen.getSeqImmutable variant + let! prefix, rest = TaskSeq.splitAt 1 ts + prefix |> should equal [| 1 |] + let! restArr = TaskSeq.toArrayAsync rest + restArr |> should equal [| 2..10 |] + } + + [] + let ``TaskSeq-splitAt beyond length gives full prefix and empty rest`` () = task { + let ts = taskSeq { + yield 1 + yield 2 + yield 3 + } + + let! prefix, rest = TaskSeq.splitAt 100 ts + prefix |> should equal [| 1; 2; 3 |] + do! verifyEmpty rest + } + + [] + let ``TaskSeq-splitAt prefix and rest together contain all elements`` () = task { + let data = [| 1..20 |] + let ts = TaskSeq.ofArray data + let splitPoint = 7 + let! prefix, rest = TaskSeq.splitAt splitPoint ts + let! restArr = TaskSeq.toArrayAsync rest + let combined = Array.append prefix restArr + combined |> should equal data + } diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.ThreadState.Tests.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.ThreadState.Tests.fs new file mode 100644 index 00000000..c62189fe --- /dev/null +++ b/src/FSharp.Control.TaskSeq.Test/TaskSeq.ThreadState.Tests.fs @@ -0,0 +1,149 @@ +module TaskSeq.Tests.ThreadState + +open Xunit +open FsUnit.Xunit + +open FSharp.Control + +// +// TaskSeq.threadState +// TaskSeq.threadStateAsync +// + +module EmptySeq = + [] + let ``Null source is invalid`` () = + assertNullArg + <| fun () -> TaskSeq.threadState (fun s _ -> 0, s) 0 null + + assertNullArg + <| fun () -> TaskSeq.threadStateAsync (fun s _ -> Task.fromResult (0, s)) 0 null + + [)>] + let ``TaskSeq-threadState on empty gives empty`` variant = + Gen.getEmptyVariant variant + |> TaskSeq.threadState (fun s _ -> 0, s) 0 + |> verifyEmpty + + [)>] + let ``TaskSeq-threadStateAsync on empty gives empty`` variant = + Gen.getEmptyVariant variant + |> TaskSeq.threadStateAsync (fun s _ -> Task.fromResult (0, s)) 0 + |> verifyEmpty + + +module Functionality = + [] + let ``TaskSeq-threadState produces running index`` () = task { + let ts = taskSeq { yield! [ "a"; "b"; "c" ] } + + let folder state item = (state, item), state + 1 + + let! result = TaskSeq.threadState folder 0 ts |> TaskSeq.toArrayAsync + result |> should equal [| (0, "a"); (1, "b"); (2, "c") |] + } + + [] + let ``TaskSeq-threadState running sum`` () = task { + let ts = taskSeq { yield! [ 1..5 ] } + + let folder acc x = acc + x, acc + x + + let! result = TaskSeq.threadState folder 0 ts |> TaskSeq.toArrayAsync + result |> should equal [| 1; 3; 6; 10; 15 |] + } + + [] + let ``TaskSeq-threadState state is threaded correctly`` () = task { + let ts = taskSeq { yield! [ 10; 20; 30 ] } + + let folder state x = x * state, state + 1 + + let! result = TaskSeq.threadState folder 1 ts |> TaskSeq.toArrayAsync + // state starts at 1: 10*1=10, state=2; 20*2=40, state=3; 30*3=90, state=4 + result |> should equal [| 10; 40; 90 |] + } + + [] + let ``TaskSeq-threadState with singleton`` () = task { + let ts = TaskSeq.singleton 42 + + let! result = + TaskSeq.threadState (fun s x -> x + s, s + 1) 10 ts + |> TaskSeq.toArrayAsync + + result |> should equal [| 52 |] + } + + [)>] + let ``TaskSeq-threadState produces correct length`` variant = task { + let ts = Gen.getSeqImmutable variant + + let! result = + TaskSeq.threadState (fun s x -> x, s + 1) 0 ts + |> TaskSeq.toArrayAsync + + result |> should haveLength 10 + result |> should equal [| 1..10 |] + } + + [] + let ``TaskSeq-threadStateAsync running sum`` () = task { + let ts = taskSeq { yield! [ 1..5 ] } + + let folder acc x = Task.fromResult (acc + x, acc + x) + + let! result = TaskSeq.threadStateAsync folder 0 ts |> TaskSeq.toArrayAsync + result |> should equal [| 1; 3; 6; 10; 15 |] + } + + [] + let ``TaskSeq-threadStateAsync and threadState produce same results for pure function`` () = task { + let ts = taskSeq { yield! [ 1..10 ] } + let ts2 = taskSeq { yield! [ 1..10 ] } + + let syncFolder acc x = x - acc, x + acc + let asyncFolder acc x = Task.fromResult (syncFolder acc x) + + let! syncResult = TaskSeq.threadState syncFolder 0 ts |> TaskSeq.toArrayAsync + + let! asyncResult = + TaskSeq.threadStateAsync asyncFolder 0 ts2 + |> TaskSeq.toArrayAsync + + syncResult |> should equal asyncResult + } + + [] + let ``TaskSeq-threadStateAsync with genuinely async folder`` () = task { + let ts = taskSeq { yield! [ 1..3 ] } + + let folder state x = task { + // Use a real async operation to verify the async path works + let! v = Task.fromResult (x * 10) + return v, state + x + } + + let! result = TaskSeq.threadStateAsync folder 0 ts |> TaskSeq.toArrayAsync + // state: 0; x=1: result=10, state=1; x=2: result=20, state=3; x=3: result=30, state=6 + result |> should equal [| 10; 20; 30 |] + } + + [] + let ``TaskSeq-threadState is equivalent to scan minus initial state`` () = task { + // threadState folder 0 [1;2;3] gives the running sums [1;3;6] + // scan (fun acc x -> acc + x) 0 [1;2;3] gives [0;1;3;6] — drop the head + let ts = taskSeq { yield! [ 1..5 ] } + let ts2 = taskSeq { yield! [ 1..5 ] } + + let! viaThread = + TaskSeq.threadState (fun acc x -> acc + x, acc + x) 0 ts + |> TaskSeq.toArrayAsync + + let! viaScan = + TaskSeq.scan (fun acc x -> acc + x) 0 ts2 + |> TaskSeq.skip 1 + |> TaskSeq.toArrayAsync + + viaThread |> should equal viaScan + } diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.ZipWith.Tests.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.ZipWith.Tests.fs new file mode 100644 index 00000000..c6c5b6ac --- /dev/null +++ b/src/FSharp.Control.TaskSeq.Test/TaskSeq.ZipWith.Tests.fs @@ -0,0 +1,174 @@ +module TaskSeq.Tests.ZipWith + +open Xunit +open FsUnit.Xunit + +open FSharp.Control + +// +// TaskSeq.zipWith +// TaskSeq.zipWithAsync +// TaskSeq.zipWith3 +// TaskSeq.zipWithAsync3 +// + +module EmptySeq = + [] + let ``Null source is invalid for zipWith`` () = + assertNullArg + <| fun () -> TaskSeq.zipWith (+) null TaskSeq.empty + + assertNullArg + <| fun () -> TaskSeq.zipWith (+) TaskSeq.empty null + + assertNullArg + <| fun () -> TaskSeq.zipWith (+) null (null: TaskSeq) + + [] + let ``Null source is invalid for zipWithAsync`` () = + assertNullArg + <| fun () -> TaskSeq.zipWithAsync (fun a b -> Task.fromResult (a + b)) null TaskSeq.empty + + assertNullArg + <| fun () -> TaskSeq.zipWithAsync (fun a b -> Task.fromResult (a + b)) TaskSeq.empty null + + [] + let ``Null source is invalid for zipWith3`` () = + assertNullArg + <| fun () -> TaskSeq.zipWith3 (fun a b c -> a) null TaskSeq.empty TaskSeq.empty + + assertNullArg + <| fun () -> TaskSeq.zipWith3 (fun a b c -> a) TaskSeq.empty null TaskSeq.empty + + assertNullArg + <| fun () -> TaskSeq.zipWith3 (fun a b c -> a) TaskSeq.empty TaskSeq.empty null + + [] + let ``Null source is invalid for zipWithAsync3`` () = + let f a b c = Task.fromResult (a + b + c) + + assertNullArg + <| fun () -> TaskSeq.zipWithAsync3 f null TaskSeq.empty TaskSeq.empty + + assertNullArg + <| fun () -> TaskSeq.zipWithAsync3 f TaskSeq.empty null TaskSeq.empty + + assertNullArg + <| fun () -> TaskSeq.zipWithAsync3 f TaskSeq.empty TaskSeq.empty null + + [)>] + let ``TaskSeq-zipWith with two empty gives empty`` variant = + TaskSeq.zipWith (+) (Gen.getEmptyVariant variant) (Gen.getEmptyVariant variant) + |> verifyEmpty + + [)>] + let ``TaskSeq-zipWith with one empty gives empty`` variant = + TaskSeq.zipWith (+) TaskSeq.empty (Gen.getEmptyVariant variant) + |> verifyEmpty + + [)>] + let ``TaskSeq-zipWith3 with empties gives empty`` variant = + TaskSeq.zipWith3 (fun a b c -> a + b + c) (Gen.getEmptyVariant variant) (Gen.getEmptyVariant variant) (Gen.getEmptyVariant variant) + |> verifyEmpty + + +module Immutable = + [)>] + let ``TaskSeq-zipWith applies mapping correctly`` variant = task { + let one = Gen.getSeqImmutable variant + let two = Gen.getSeqImmutable variant + let! result = TaskSeq.zipWith (+) one two |> TaskSeq.toArrayAsync + + result + |> should equal (Array.init 10 (fun i -> (i + 1) + (i + 1))) + } + + [)>] + let ``TaskSeq-zipWithAsync applies async mapping correctly`` variant = task { + let one = Gen.getSeqImmutable variant + let two = Gen.getSeqImmutable variant + + let! result = + TaskSeq.zipWithAsync (fun a b -> Task.fromResult (a * b)) one two + |> TaskSeq.toArrayAsync + + result + |> should equal (Array.init 10 (fun i -> (i + 1) * (i + 1))) + } + + [)>] + let ``TaskSeq-zipWith3 applies three-way mapping`` variant = task { + let s1 = Gen.getSeqImmutable variant + let s2 = Gen.getSeqImmutable variant + let s3 = Gen.getSeqImmutable variant + + let! result = + TaskSeq.zipWith3 (fun a b c -> a + b + c) s1 s2 s3 + |> TaskSeq.toArrayAsync + + result + |> should equal (Array.init 10 (fun i -> 3 * (i + 1))) + } + + [)>] + let ``TaskSeq-zipWithAsync3 applies async three-way mapping`` variant = task { + let s1 = Gen.getSeqImmutable variant + let s2 = Gen.getSeqImmutable variant + let s3 = Gen.getSeqImmutable variant + + let! result = + TaskSeq.zipWithAsync3 (fun a b c -> Task.fromResult (a + b + c)) s1 s2 s3 + |> TaskSeq.toArrayAsync + + result + |> should equal (Array.init 10 (fun i -> 3 * (i + 1))) + } + + [] + let ``TaskSeq-zipWith truncates to shorter sequence`` () = task { + let short = taskSeq { + yield 1 + yield 2 + } + + let long = taskSeq { yield! [ 10..20 ] } + let! result = TaskSeq.zipWith (+) short long |> TaskSeq.toArrayAsync + result |> should equal [| 11; 13 |] + } + + [] + let ``TaskSeq-zipWith string concatenation`` () = task { + let keys = taskSeq { + yield "a" + yield "b" + yield "c" + } + + let values = taskSeq { + yield 1 + yield 2 + yield 3 + } + + let! result = + TaskSeq.zipWith (fun k v -> sprintf "%s=%d" k v) keys values + |> TaskSeq.toArrayAsync + + result |> should equal [| "a=1"; "b=2"; "c=3" |] + } + + [] + let ``TaskSeq-zipWith is equivalent to zip-then-map`` () = task { + let s1 = taskSeq { yield! [ 1..5 ] } + let s2 = taskSeq { yield! [ 10..14 ] } + let! viaZipWith = TaskSeq.zipWith (+) s1 s2 |> TaskSeq.toArrayAsync + let s1b = taskSeq { yield! [ 1..5 ] } + let s2b = taskSeq { yield! [ 10..14 ] } + + let! viaZipMap = + TaskSeq.zip s1b s2b + |> TaskSeq.map (fun (a, b) -> a + b) + |> TaskSeq.toArrayAsync + + viaZipWith |> should equal viaZipMap + } diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fs b/src/FSharp.Control.TaskSeq/TaskSeq.fs index 28e1f136..4f59e0e6 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fs +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fs @@ -124,6 +124,9 @@ type TaskSeq private () = static member singleton(value: 'T) = Internal.singleton value static member replicate count value = Internal.replicate count value + static member replicateInfinite value = Internal.replicateInfinite value + static member replicateInfiniteAsync computation = Internal.replicateInfiniteAsync computation + static member replicateUntilNoneAsync computation = Internal.replicateUntilNoneAsync computation static member isEmpty source = Internal.isEmpty source @@ -401,6 +404,10 @@ type TaskSeq private () = Internal.tryLast source |> Task.map (Option.defaultWith Internal.raiseEmptySeq) + static member firstOrDefault defaultValue source = Internal.firstOrDefault defaultValue source + static member lastOrDefault defaultValue source = Internal.lastOrDefault defaultValue source + static member splitAt count source = Internal.splitAt count source + static member tryTail source = Internal.tryTail source static member tail source = @@ -478,6 +485,8 @@ type TaskSeq private () = static member distinctUntilChanged source = Internal.distinctUntilChanged source static member pairwise source = Internal.pairwise source static member chunkBySize chunkSize source = Internal.chunkBySize chunkSize source + static member chunkBy projection source = Internal.chunkBy projection source + static member chunkByAsync projection source = Internal.chunkByAsync projection source static member windowed windowSize source = Internal.windowed windowSize source static member forall predicate source = Internal.forall (Predicate predicate) source @@ -519,6 +528,10 @@ type TaskSeq private () = static member zip source1 source2 = Internal.zip source1 source2 static member zip3 source1 source2 source3 = Internal.zip3 source1 source2 source3 + static member zipWith mapping source1 source2 = Internal.zipWith mapping source1 source2 + static member zipWithAsync mapping source1 source2 = Internal.zipWithAsync mapping source1 source2 + static member zipWith3 mapping source1 source2 source3 = Internal.zipWith3 mapping source1 source2 source3 + static member zipWithAsync3 mapping source1 source2 source3 = Internal.zipWithAsync3 mapping source1 source2 source3 static member compareWith comparer source1 source2 = Internal.compareWith comparer source1 source2 static member compareWithAsync comparer source1 source2 = Internal.compareWithAsync comparer source1 source2 static member fold folder state source = Internal.fold (FolderAction folder) state source @@ -540,3 +553,5 @@ type TaskSeq private () = static member partitionAsync predicate source = Internal.partition (PredicateAsync predicate) source static member mapFold mapping state source = Internal.mapFold (MapFolderAction mapping) state source static member mapFoldAsync mapping state source = Internal.mapFold (AsyncMapFolderAction mapping) state source + static member threadState folder state source = Internal.threadState folder state source + static member threadStateAsync folder state source = Internal.threadStateAsync folder state source diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fsi b/src/FSharp.Control.TaskSeq/TaskSeq.fsi index ef343901..b9bf933b 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fsi +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fsi @@ -114,6 +114,39 @@ type TaskSeq = /// Thrown when is negative. static member replicate: count: int -> value: 'T -> TaskSeq<'T> + /// + /// Creates an infinite task sequence by repeating indefinitely. + /// The sequence never ends; use , , + /// or similar to bound consumption. + /// + /// + /// The value to repeat. + /// An infinite task sequence of . + static member replicateInfinite: value: 'T -> TaskSeq<'T> + + /// + /// Creates an infinite task sequence by repeatedly executing , + /// yielding each produced value indefinitely. + /// The sequence never ends; use , , + /// or similar to bound consumption. + /// If the computation is synchronous, consider using . + /// + /// + /// A function that produces the next value on each invocation. + /// An infinite task sequence of values produced by . + static member replicateInfiniteAsync: computation: (unit -> #Task<'T>) -> TaskSeq<'T> + + /// + /// Creates a task sequence by repeatedly executing until it returns + /// , yielding each value in order. + /// + /// + /// + /// A function that returns value to emit, or to end the sequence. + /// + /// A task sequence of values produced by until it returns . + static member replicateUntilNoneAsync: computation: (unit -> #Task<'T option>) -> TaskSeq<'T> + /// /// Returns if the task sequence contains no elements, otherwise. /// @@ -893,6 +926,28 @@ type TaskSeq = /// Thrown when the task sequence is empty. static member last: source: TaskSeq<'T> -> Task<'T> + /// + /// Returns the first element of the input task sequence given by , + /// or if the sequence is empty. + /// + /// + /// The value to return when the source sequence is empty. + /// The input task sequence. + /// The first element of the task sequence, or if empty. + /// Thrown when the input task sequence is null. + static member firstOrDefault: defaultValue: 'T -> source: TaskSeq<'T> -> Task<'T> + + /// + /// Returns the last element of the input task sequence given by , + /// or if the sequence is empty. + /// + /// + /// The value to return when the source sequence is empty. + /// The input task sequence. + /// The last element of the task sequence, or if empty. + /// Thrown when the input task sequence is null. + static member lastOrDefault: defaultValue: 'T -> source: TaskSeq<'T> -> Task<'T> + /// /// Returns the nth element of the input task sequence given by , /// or if the sequence does not contain enough elements. @@ -1101,6 +1156,27 @@ type TaskSeq = /// Thrown when is less than zero. static member truncate: count: int -> source: TaskSeq<'T> -> TaskSeq<'T> + /// + /// Splits the task sequence into a prefix array of at most elements and a task + /// sequence containing the remaining elements. The prefix is eagerly evaluated in a single pass; the + /// remaining sequence is lazy. If the source has fewer than elements, the prefix + /// array is shorter than and the remaining sequence is empty. + /// + /// + /// + /// The prefix array and the remaining task sequence share a single enumerator over . + /// For sequences backed by replayable data (arrays, lists, taskSeq builders, etc.) this is always safe. + /// For externally-managed resources (network streams, database cursors) the remaining sequence should be + /// consumed or disposed before any other operation on . + /// + /// + /// The maximum number of elements in the prefix. Must be non-negative. + /// The input task sequence. + /// A task returning a tuple (prefix, rest) where prefix is an array of the first elements and rest is the remaining task sequence. + /// Thrown when the input task sequence is null. + /// Thrown when is negative. + static member splitAt: count: int -> source: TaskSeq<'T> -> Task<'T[] * TaskSeq<'T>> + /// /// Returns a task sequence that, when iterated, yields elements of the underlying sequence while the /// given function returns , and then returns no further elements. @@ -1530,6 +1606,35 @@ type TaskSeq = /// Thrown when is not positive. static member chunkBySize: chunkSize: int -> source: TaskSeq<'T> -> TaskSeq<'T[]> + /// + /// Groups consecutive elements of the task sequence by a key derived from each element using + /// , yielding (key, elements[]) pairs. A new group is started + /// each time the key changes from one element to the next. Unlike , + /// only consecutive elements with the same key are merged. + /// If the function is asynchronous, consider using . + /// + /// + /// A function that computes the key for each element. + /// The input task sequence. + /// A task sequence of (key, elements[]) pairs for each run of equal keys. + /// Thrown when the input task sequence is null. + static member chunkBy: projection: ('T -> 'Key) -> source: TaskSeq<'T> -> TaskSeq<'Key * 'T[]> when 'Key: equality + + /// + /// Groups consecutive elements of the task sequence by a key derived from each element using the + /// asynchronous function , yielding (key, elements[]) pairs. + /// A new group is started each time the key changes from one element to the next. + /// Unlike , only consecutive elements with the same key are merged. + /// If the function is synchronous, consider using . + /// + /// + /// An asynchronous function that computes the key for each element. + /// The input task sequence. + /// A task sequence of (key, elements[]) pairs for each run of equal keys. + /// Thrown when the input task sequence is null. + static member chunkByAsync: + projection: ('T -> #Task<'Key>) -> source: TaskSeq<'T> -> TaskSeq<'Key * 'T[]> when 'Key: equality + /// /// Returns a task sequence of sliding windows of a given size over the source sequence. /// Each window is a fresh array of exactly consecutive elements. @@ -1572,6 +1677,75 @@ type TaskSeq = static member zip3: source1: TaskSeq<'T1> -> source2: TaskSeq<'T2> -> source3: TaskSeq<'T3> -> TaskSeq<'T1 * 'T2 * 'T3> + /// + /// Combines two task sequences and applies the given function to corresponding + /// element pairs. The sequences need not have equal lengths: when one is exhausted any remaining elements in + /// the other are ignored. + /// If the function is asynchronous, consider using . + /// + /// + /// A function to apply to each element pair. + /// The first input task sequence. + /// The second input task sequence. + /// The result task sequence of mapped values. + /// Thrown when either of the two input task sequences is null. + static member zipWith: mapping: ('T -> 'U -> 'V) -> source1: TaskSeq<'T> -> source2: TaskSeq<'U> -> TaskSeq<'V> + + /// + /// Combines two task sequences and applies the given asynchronous function to + /// corresponding element pairs. The sequences need not have equal lengths: when one is exhausted any remaining + /// elements in the other are ignored. + /// If the function is synchronous, consider using . + /// + /// + /// An asynchronous function to apply to each element pair. + /// The first input task sequence. + /// The second input task sequence. + /// The result task sequence of mapped values. + /// Thrown when either of the two input task sequences is null. + static member zipWithAsync: + mapping: ('T -> 'U -> #Task<'V>) -> source1: TaskSeq<'T> -> source2: TaskSeq<'U> -> TaskSeq<'V> + + /// + /// Combines three task sequences and applies the given function to corresponding + /// element triples. The sequences need not have equal lengths: when one is exhausted any remaining elements in + /// the others are ignored. + /// If the function is asynchronous, consider using . + /// + /// + /// A function to apply to each element triple. + /// The first input task sequence. + /// The second input task sequence. + /// The third input task sequence. + /// The result task sequence of mapped values. + /// Thrown when any of the three input task sequences is null. + static member zipWith3: + mapping: ('T1 -> 'T2 -> 'T3 -> 'V) -> + source1: TaskSeq<'T1> -> + source2: TaskSeq<'T2> -> + source3: TaskSeq<'T3> -> + TaskSeq<'V> + + /// + /// Combines three task sequences and applies the given asynchronous function to + /// corresponding element triples. The sequences need not have equal lengths: when one is exhausted any remaining + /// elements in the others are ignored. + /// If the function is synchronous, consider using . + /// + /// + /// An asynchronous function to apply to each element triple. + /// The first input task sequence. + /// The second input task sequence. + /// The third input task sequence. + /// The result task sequence of mapped values. + /// Thrown when any of the three input task sequences is null. + static member zipWithAsync3: + mapping: ('T1 -> 'T2 -> 'T3 -> #Task<'V>) -> + source1: TaskSeq<'T1> -> + source2: TaskSeq<'T2> -> + source3: TaskSeq<'T3> -> + TaskSeq<'V> + /// /// Applies a comparer function to corresponding elements of two task sequences, returning the result of the /// first comparison that is non-zero, or zero if all compared elements are equal. The sequences are compared @@ -1699,6 +1873,38 @@ type TaskSeq = source: TaskSeq<'T> -> Task<'Result[] * 'State> + /// + /// Applies the function to each element of the task sequence, threading a state + /// argument through the computation, and lazily yields each mapped result as a new task sequence. Unlike + /// , the results are streamed rather than collected into an array, and the + /// final state is not returned. + /// If the function is asynchronous, consider using . + /// + /// + /// A function that maps each element to a result while also updating the state. + /// The initial state. + /// The input task sequence. + /// A task sequence of mapped results, produced lazily in source order. + /// Thrown when the input task sequence is null. + static member threadState: + folder: ('State -> 'T -> 'Result * 'State) -> state: 'State -> source: TaskSeq<'T> -> TaskSeq<'Result> + + /// + /// Applies the asynchronous function to each element of the task sequence, threading + /// a state argument through the computation, and lazily yields each mapped result as a new task sequence. Unlike + /// , the results are streamed rather than collected into an array, and the + /// final state is not returned. + /// If the function is synchronous, consider using . + /// + /// + /// An asynchronous function that maps each element to a result while also updating the state. + /// The initial state. + /// The input task sequence. + /// A task sequence of mapped results, produced lazily in source order. + /// Thrown when the input task sequence is null. + static member threadStateAsync: + folder: ('State -> 'T -> #Task<'Result * 'State>) -> state: 'State -> source: TaskSeq<'T> -> TaskSeq<'Result> + /// /// Applies the function to each element of the task sequence, threading diff --git a/src/FSharp.Control.TaskSeq/TaskSeqInternal.fs b/src/FSharp.Control.TaskSeq/TaskSeqInternal.fs index 91937ec0..ffff487d 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeqInternal.fs +++ b/src/FSharp.Control.TaskSeq/TaskSeqInternal.fs @@ -152,6 +152,28 @@ module internal TaskSeqInternal = yield value } + let replicateInfinite value = taskSeq { + while true do + yield value + } + + let replicateInfiniteAsync (computation: unit -> #Task<'T>) = taskSeq { + while true do + let! value = computation () + yield value + } + + let replicateUntilNoneAsync (computation: unit -> #Task<'T option>) = taskSeq { + let mutable go = true + + while go do + let! result = computation () + + match result with + | Some value -> yield value + | None -> go <- false + } + /// Returns length unconditionally, or based on a predicate let lengthBy predicate (source: TaskSeq<_>) = checkNonNull (nameof source) source @@ -500,6 +522,30 @@ module internal TaskSeqInternal = return results.ToArray(), state } + let threadState (folder: 'State -> 'T -> 'U * 'State) initial (source: TaskSeq<'T>) : TaskSeq<'U> = + checkNonNull (nameof source) source + + taskSeq { + let mutable state = initial + + for item in source do + let result, newState = folder state item + state <- newState + yield result + } + + let threadStateAsync (folder: 'State -> 'T -> #Task<'U * 'State>) initial (source: TaskSeq<'T>) : TaskSeq<'U> = + checkNonNull (nameof source) source + + taskSeq { + let mutable state = initial + + for item in source do + let! (result, newState) = folder state item + state <- newState + yield result + } + let toResizeArrayAsync source = checkNonNull (nameof source) source @@ -585,6 +631,92 @@ module internal TaskSeqInternal = go <- step1 && step2 && step3 } + let zipWith (mapping: 'T -> 'U -> 'V) (source1: TaskSeq<'T>) (source2: TaskSeq<'U>) = + checkNonNull (nameof source1) source1 + checkNonNull (nameof source2) source2 + + taskSeq { + use e1 = source1.GetAsyncEnumerator CancellationToken.None + use e2 = source2.GetAsyncEnumerator CancellationToken.None + let mutable go = true + let! step1 = e1.MoveNextAsync() + let! step2 = e2.MoveNextAsync() + go <- step1 && step2 + + while go do + yield mapping e1.Current e2.Current + let! step1 = e1.MoveNextAsync() + let! step2 = e2.MoveNextAsync() + go <- step1 && step2 + } + + let zipWithAsync (mapping: 'T -> 'U -> #Task<'V>) (source1: TaskSeq<'T>) (source2: TaskSeq<'U>) = + checkNonNull (nameof source1) source1 + checkNonNull (nameof source2) source2 + + taskSeq { + use e1 = source1.GetAsyncEnumerator CancellationToken.None + use e2 = source2.GetAsyncEnumerator CancellationToken.None + let mutable go = true + let! step1 = e1.MoveNextAsync() + let! step2 = e2.MoveNextAsync() + go <- step1 && step2 + + while go do + let! result = mapping e1.Current e2.Current + yield result + let! step1 = e1.MoveNextAsync() + let! step2 = e2.MoveNextAsync() + go <- step1 && step2 + } + + let zipWith3 (mapping: 'T1 -> 'T2 -> 'T3 -> 'V) (source1: TaskSeq<'T1>) (source2: TaskSeq<'T2>) (source3: TaskSeq<'T3>) = + checkNonNull (nameof source1) source1 + checkNonNull (nameof source2) source2 + checkNonNull (nameof source3) source3 + + taskSeq { + use e1 = source1.GetAsyncEnumerator CancellationToken.None + use e2 = source2.GetAsyncEnumerator CancellationToken.None + use e3 = source3.GetAsyncEnumerator CancellationToken.None + let mutable go = true + let! step1 = e1.MoveNextAsync() + let! step2 = e2.MoveNextAsync() + let! step3 = e3.MoveNextAsync() + go <- step1 && step2 && step3 + + while go do + yield mapping e1.Current e2.Current e3.Current + let! step1 = e1.MoveNextAsync() + let! step2 = e2.MoveNextAsync() + let! step3 = e3.MoveNextAsync() + go <- step1 && step2 && step3 + } + + let zipWithAsync3 (mapping: 'T1 -> 'T2 -> 'T3 -> #Task<'V>) (source1: TaskSeq<'T1>) (source2: TaskSeq<'T2>) (source3: TaskSeq<'T3>) = + checkNonNull (nameof source1) source1 + checkNonNull (nameof source2) source2 + checkNonNull (nameof source3) source3 + + taskSeq { + use e1 = source1.GetAsyncEnumerator CancellationToken.None + use e2 = source2.GetAsyncEnumerator CancellationToken.None + use e3 = source3.GetAsyncEnumerator CancellationToken.None + let mutable go = true + let! step1 = e1.MoveNextAsync() + let! step2 = e2.MoveNextAsync() + let! step3 = e3.MoveNextAsync() + go <- step1 && step2 && step3 + + while go do + let! result = mapping e1.Current e2.Current e3.Current + yield result + let! step1 = e1.MoveNextAsync() + let! step2 = e2.MoveNextAsync() + let! step3 = e3.MoveNextAsync() + go <- step1 && step2 && step3 + } + let compareWith (comparer: 'T -> 'T -> int) (source1: TaskSeq<'T>) (source2: TaskSeq<'T>) = checkNonNull (nameof source1) source1 checkNonNull (nameof source2) source2 @@ -735,6 +867,52 @@ module internal TaskSeqInternal = |> Some } + let firstOrDefault defaultValue source = + tryHead source + |> Task.map (Option.defaultValue defaultValue) + + let lastOrDefault defaultValue source = + tryLast source + |> Task.map (Option.defaultValue defaultValue) + + let splitAt count (source: TaskSeq<'T>) = + checkNonNull (nameof source) source + + if count < 0 then + invalidArg (nameof count) $"The value must be non-negative, but was {count}." + + task { + use e = source.GetAsyncEnumerator CancellationToken.None + let first = ResizeArray<'T>(count) + let mutable i = 0 + let mutable go = true + + while go && i < count do + let! step = e.MoveNextAsync() + + if step then + first.Add e.Current + i <- i + 1 + else + go <- false + + // 'rest' captures 'e' from the outer task block, following the same pattern as tryTail. + let rest = taskSeq { + let mutable go2 = go + + if go2 then + let! step = e.MoveNextAsync() + go2 <- step + + while go2 do + yield e.Current + let! step = e.MoveNextAsync() + go2 <- step + } + + return first.ToArray(), rest + } + let tryItem index (source: TaskSeq<_>) = checkNonNull (nameof source) source @@ -1522,6 +1700,62 @@ module internal TaskSeqInternal = yield buffer.[0 .. count - 1] } + let chunkBy (projection: 'T -> 'Key) (source: TaskSeq<'T>) : TaskSeq<'Key * 'T[]> = + checkNonNull (nameof source) source + + taskSeq { + let mutable maybeCurrentKey = ValueNone + let mutable currentChunk = ResizeArray<'T>() + + for item in source do + let key = projection item + + match maybeCurrentKey with + | ValueNone -> + maybeCurrentKey <- ValueSome key + currentChunk.Add item + | ValueSome prevKey -> + if prevKey = key then + currentChunk.Add item + else + yield prevKey, currentChunk.ToArray() + currentChunk <- ResizeArray<'T>() + currentChunk.Add item + maybeCurrentKey <- ValueSome key + + match maybeCurrentKey with + | ValueNone -> () + | ValueSome lastKey -> yield lastKey, currentChunk.ToArray() + } + + let chunkByAsync (projection: 'T -> #Task<'Key>) (source: TaskSeq<'T>) : TaskSeq<'Key * 'T[]> = + checkNonNull (nameof source) source + + taskSeq { + let mutable maybeCurrentKey = ValueNone + let mutable currentChunk = ResizeArray<'T>() + + for item in source do + let! key = projection item + + match maybeCurrentKey with + | ValueNone -> + maybeCurrentKey <- ValueSome key + currentChunk.Add item + | ValueSome prevKey -> + if prevKey = key then + currentChunk.Add item + else + yield prevKey, currentChunk.ToArray() + currentChunk <- ResizeArray<'T>() + currentChunk.Add item + maybeCurrentKey <- ValueSome key + + match maybeCurrentKey with + | ValueNone -> () + | ValueSome lastKey -> yield lastKey, currentChunk.ToArray() + } + let windowed windowSize (source: TaskSeq<_>) = if windowSize <= 0 then invalidArg (nameof windowSize) $"The value must be positive, but was %i{windowSize}."