Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
* Performance: `filterAsync` β€” replaced `asyncSeq`-builder implementation with a direct optimised enumerator, reducing allocation and generator overhead.
* Performance: `chooseAsync` β€” fallback (non-`AsyncSeqOp`) path now uses a direct optimised enumerator instead of the `asyncSeq` builder.
* Performance: `foldAsync` β€” fallback (non-`AsyncSeqOp`) path now uses a direct loop instead of composing `scanAsync` + `lastOrDefault`, avoiding intermediate sequence allocations.
* Benchmarks: added `AsyncSeqFilterChooseFoldBenchmarks` and `AsyncSeqPipelineBenchmarks` benchmark classes to measure `filterAsync`, `chooseAsync`, `foldAsync`, `toArrayAsync`, and common multi-step pipelines.
* Performance: `take` β€” replaced `asyncSeq`-builder implementation with a direct optimised enumerator (`OptimizedTakeEnumerator`), eliminating generator-machinery overhead for this common slicing operation.
* Performance: `skip` β€” replaced `asyncSeq`-builder implementation with a direct optimised enumerator (`OptimizedSkipEnumerator`), eliminating generator-machinery overhead for this common slicing operation.
* Benchmarks: added `AsyncSeqFilterChooseFoldBenchmarks`, `AsyncSeqPipelineBenchmarks`, and `AsyncSeqSliceBenchmarks` benchmark classes.

### 4.8.0

Expand Down
83 changes: 56 additions & 27 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,56 @@ module AsyncSeq =
disposed <- true
source.Dispose()

// Optimized take enumerator: stops after yielding `count` elements without asyncSeq builder overhead
type private OptimizedTakeEnumerator<'T>(source: IAsyncSeqEnumerator<'T>, count: int) =
let mutable disposed = false
let mutable remaining = count

interface IAsyncSeqEnumerator<'T> with
member _.MoveNext() = async {
if remaining <= 0 then return None
else
let! result = source.MoveNext()
match result with
| None -> return None
| Some value ->
remaining <- remaining - 1
return Some value }

member _.Dispose() =
if not disposed then
disposed <- true
source.Dispose()

// Optimized skip enumerator: discards the first `count` elements without asyncSeq builder overhead
type private OptimizedSkipEnumerator<'T>(source: IAsyncSeqEnumerator<'T>, count: int) =
let mutable disposed = false
let mutable toSkip = count
let mutable exhausted = false

interface IAsyncSeqEnumerator<'T> with
member _.MoveNext() = async {
if exhausted then return None
else
// Drain skipped elements on the first call (toSkip > 0 only initially)
let mutable doneSkipping = false
while toSkip > 0 && not doneSkipping do
let! result = source.MoveNext()
match result with
| None ->
toSkip <- 0
exhausted <- true
doneSkipping <- true
| Some _ ->
toSkip <- toSkip - 1
if exhausted then return None
else return! source.MoveNext() }

member _.Dispose() =
if not disposed then
disposed <- true
source.Dispose()

let mapAsync f (source : AsyncSeq<'T>) : AsyncSeq<'TResult> =
match source with
| :? AsyncSeqOp<'T> as source -> source.MapAsync f
Expand Down Expand Up @@ -1897,36 +1947,15 @@ module AsyncSeq =
let skipWhile p (source : AsyncSeq<'T>) =
skipWhileAsync (p >> async.Return) source

let take count (source : AsyncSeq<'T>) : AsyncSeq<_> = asyncSeq {
if (count < 0) then invalidArg "count" "must be non-negative"
use ie = source.GetEnumerator()
let n = ref count
if n.Value > 0 then
let! move = ie.MoveNext()
let b = ref move
while b.Value.IsSome do
yield b.Value.Value
n := n.Value - 1
if n.Value > 0 then
let! moven = ie.MoveNext()
b := moven
else b := None }
let take count (source : AsyncSeq<'T>) : AsyncSeq<_> =
if count < 0 then invalidArg "count" "must be non-negative"
AsyncSeqImpl(fun () -> new OptimizedTakeEnumerator<'T>(source.GetEnumerator(), count) :> IAsyncSeqEnumerator<'T>) :> AsyncSeq<'T>

let truncate count source = take count source

let skip count (source : AsyncSeq<'T>) : AsyncSeq<_> = asyncSeq {
if (count < 0) then invalidArg "count" "must be non-negative"
use ie = source.GetEnumerator()
let! move = ie.MoveNext()
let b = ref move
let n = ref count
while b.Value.IsSome do
if n.Value = 0 then
yield b.Value.Value
else
n := n.Value - 1
let! moven = ie.MoveNext()
b := moven }
let skip count (source : AsyncSeq<'T>) : AsyncSeq<_> =
if count < 0 then invalidArg "count" "must be non-negative"
AsyncSeqImpl(fun () -> new OptimizedSkipEnumerator<'T>(source.GetEnumerator(), count) :> IAsyncSeqEnumerator<'T>) :> AsyncSeq<'T>

let tail (source : AsyncSeq<'T>) : AsyncSeq<'T> = skip 1 source

Expand Down
35 changes: 35 additions & 0 deletions tests/FSharp.Control.AsyncSeq.Benchmarks/AsyncSeqBenchmarks.fs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,41 @@ type AsyncSeqPipelineBenchmarks() =
|> Async.RunSynchronously
|> ignore

/// Benchmarks for take and skip β€” common slicing operations
[<MemoryDiagnoser>]
[<SimpleJob(RuntimeMoniker.Net80)>]
type AsyncSeqSliceBenchmarks() =

[<Params(1000, 10000)>]
member val ElementCount = 0 with get, set

/// Benchmark take: stops after N elements
[<Benchmark(Baseline = true)>]
member this.Take() =
AsyncSeq.replicateInfinite 1
|> AsyncSeq.take this.ElementCount
|> AsyncSeq.iterAsync (fun _ -> async.Return())
|> Async.RunSynchronously

/// Benchmark skip then iterate remaining elements
[<Benchmark>]
member this.Skip() =
let skipCount = this.ElementCount / 2
AsyncSeq.replicate this.ElementCount 1
|> AsyncSeq.skip skipCount
|> AsyncSeq.iterAsync (fun _ -> async.Return())
|> Async.RunSynchronously

/// Benchmark skip then take (common pagination pattern)
[<Benchmark>]
member this.SkipThenTake() =
let page = this.ElementCount / 10
AsyncSeq.replicate this.ElementCount 1
|> AsyncSeq.skip page
|> AsyncSeq.take page
|> AsyncSeq.iterAsync (fun _ -> async.Return())
|> Async.RunSynchronously

/// Entry point for running benchmarks.
/// Delegates directly to BenchmarkSwitcher so all BenchmarkDotNet CLI options
/// (--filter, --job short, --exporters, etc.) work out of the box.
Expand Down
56 changes: 56 additions & 0 deletions tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3662,3 +3662,59 @@ let ``AsyncSeq.insertAt raises ArgumentException when index exceeds length`` ()
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously |> ignore)
|> ignore

[<Test>]
let ``AsyncSeq.take more than length returns all elements`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.take 10
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 1; 2; 3 |], result)

[<Test>]
let ``AsyncSeq.take raises ArgumentException for negative count`` () =
Assert.Throws<System.ArgumentException>(fun () ->
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.take -1
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously |> ignore)
|> ignore

[<Test>]
let ``AsyncSeq.take from infinite sequence`` () =
let result =
AsyncSeq.replicateInfinite 7
|> AsyncSeq.take 5
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 7; 7; 7; 7; 7 |], result)

[<Test>]
let ``AsyncSeq.skip more than length returns empty`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.skip 10
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([||], result)

[<Test>]
let ``AsyncSeq.skip raises ArgumentException for negative count`` () =
Assert.Throws<System.ArgumentException>(fun () ->
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.skip -1
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously |> ignore)
|> ignore

[<Test>]
let ``AsyncSeq.take then skip roundtrip`` () =
let source = [| 1..20 |]
let result =
AsyncSeq.ofSeq source
|> AsyncSeq.skip 5
|> AsyncSeq.take 10
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 6..15 |], result)