Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
### 4.9.0

* Performance: `filterAsync` — replaced `asyncSeq`-builder implementation with a direct optimised enumerator, reducing allocation and generator overhead.
* Performance: `chooseAsync` — fallback (non-`AsyncSeqOp`) path now uses a direct optimised enumerator instead of the `asyncSeq` builder.
* Performance: `foldAsync` — fallback (non-`AsyncSeqOp`) path now uses a direct loop instead of composing `scanAsync` + `lastOrDefault`, avoiding intermediate sequence allocations.
* Benchmarks: added `AsyncSeqFilterChooseFoldBenchmarks` and `AsyncSeqPipelineBenchmarks` benchmark classes to measure `filterAsync`, `chooseAsync`, `foldAsync`, `toArrayAsync`, and common multi-step pipelines.

### 4.8.0

* Added `AsyncSeq.mapFoldAsync` — maps each element using an asynchronous folder that also threads an accumulator state, returning both the array of results and the final state; mirrors `Seq.mapFold`.
Expand Down
75 changes: 64 additions & 11 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,56 @@
disposed <- true
source.Dispose()

// Enumerator backing filterAsync. It pulls from `source` directly instead of
// going through the asyncSeq computation builder.
//   source - upstream enumerator to pull elements from
//   f      - asynchronous predicate; an element is emitted only when it yields true
type private OptimizedFilterAsyncEnumerator<'T>(source: IAsyncSeqEnumerator<'T>, f: 'T -> Async<bool>) =
    // Guards against forwarding Dispose to the upstream enumerator more than once.
    let mutable disposed = false

    interface IAsyncSeqEnumerator<'T> with
        // Returns the next upstream element accepted by `f`, or None once the
        // upstream enumerator reports the end of the sequence.
        member _.MoveNext() =
            async {
                let mutable accepted: 'T option = None
                let mutable scanning = true
                while scanning do
                    let! pulled = source.MoveNext()
                    match pulled with
                    | Some candidate ->
                        let! passes = f candidate
                        if passes then
                            accepted <- Some candidate
                            scanning <- false
                    | None ->
                        // Upstream exhausted: stop with `accepted` still None.
                        scanning <- false
                return accepted
            }

        // Disposes the upstream enumerator on the first call; later calls do nothing.
        member _.Dispose() =
            if disposed then
                ()
            else
                disposed <- true
                source.Dispose()

// Enumerator backing the chooseAsync fallback path. It pulls from `source`
// directly instead of going through the asyncSeq computation builder.
//   source - upstream enumerator to pull elements from
//   f      - asynchronous chooser; an element is emitted only when it yields Some
type private OptimizedChooseAsyncEnumerator<'T, 'U>(source: IAsyncSeqEnumerator<'T>, f: 'T -> Async<'U option>) =
    // Guards against forwarding Dispose to the upstream enumerator more than once.
    let mutable disposed = false

    interface IAsyncSeqEnumerator<'U> with
        // Returns the next value for which `f` produces Some, or None once the
        // upstream enumerator reports the end of the sequence.
        member _.MoveNext() =
            async {
                let mutable picked: 'U option = None
                let mutable scanning = true
                while scanning do
                    let! pulled = source.MoveNext()
                    match pulled with
                    | Some item ->
                        let! outcome = f item
                        match outcome with
                        | None -> ()   // rejected: keep pulling from upstream
                        | Some projected ->
                            picked <- Some projected
                            scanning <- false
                    | None ->
                        // Upstream exhausted: stop with `picked` still None.
                        scanning <- false
                return picked
            }

        // Disposes the upstream enumerator on the first call; later calls do nothing.
        member _.Dispose() =
            if disposed then
                ()
            else
                disposed <- true
                source.Dispose()

let mapAsync f (source : AsyncSeq<'T>) : AsyncSeq<'TResult> =
match source with
| :? AsyncSeqOp<'T> as source -> source.MapAsync f
Expand Down Expand Up @@ -1008,12 +1058,7 @@
match source with
| :? AsyncSeqOp<'T> as source -> source.ChooseAsync f
| _ ->
asyncSeq {
for itm in source do
let! v = f itm
match v with
| Some v -> yield v
| _ -> () }
AsyncSeqImpl(fun () -> new OptimizedChooseAsyncEnumerator<'T, 'U>(source.GetEnumerator(), f) :> IAsyncSeqEnumerator<'U>) :> AsyncSeq<'U>

let ofSeqAsync (source:seq<Async<'T>>) : AsyncSeq<'T> =
asyncSeq {
Expand All @@ -1022,10 +1067,8 @@
yield v
}

// Keeps only the elements of `source` for which the asynchronous predicate `f`
// yields true.
// Implemented with OptimizedFilterAsyncEnumerator rather than the asyncSeq
// builder. The source enumerator is requested inside the factory function handed
// to AsyncSeqImpl, not at the time filterAsync itself is called.
let filterAsync f (source : AsyncSeq<'T>) : AsyncSeq<'T> =
    AsyncSeqImpl(fun () -> new OptimizedFilterAsyncEnumerator<'T>(source.GetEnumerator(), f) :> IAsyncSeqEnumerator<'T>) :> AsyncSeq<'T>

let tryLast (source : AsyncSeq<'T>) = async {
use ie = source.GetEnumerator()
Expand Down Expand Up @@ -1271,7 +1314,17 @@
// Asynchronously folds `source` into a single value.
//   f      - folder taking the current accumulator and the next element,
//            returning the new accumulator asynchronously
//   state  - initial accumulator; also the result when `source` yields no elements
//   source - sequence to consume
// Sources that implement AsyncSeqOp<'T> delegate to their own FoldAsync. Every
// other source is walked with a direct enumerator loop.
let foldAsync f (state:'State) (source : AsyncSeq<'T>) =
    match source with
    | :? AsyncSeqOp<'T> as source -> source.FoldAsync f state
    | _ -> async {
        // `use` disposes the enumerator when the async block finishes.
        use ie = source.GetEnumerator()
        let mutable st = state
        let! move = ie.MoveNext()
        let mutable b = move
        while b.IsSome do
            let! st' = f st b.Value
            st <- st'
            let! next = ie.MoveNext()
            b <- next
        return st }

// Synchronous-folder counterpart of foldAsync: each result of `f` is wrapped
// with async.Return and the work is handed to foldAsync.
let fold f (state:'State) (source : AsyncSeq<'T>) =
    let liftedFolder acc item = async.Return (f acc item)
    foldAsync liftedFolder state source
Expand Down Expand Up @@ -2397,7 +2450,7 @@

// Synchronous-projection counterpart of groupByAsync: the key function `p` is
// lifted with async.Return and the grouping is delegated to groupByAsync.
[<CompilerMessage("The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.", 9999)>]
let groupBy (p:'a -> 'k) (s:AsyncSeq<'a>) : AsyncSeq<'k * AsyncSeq<'a>> =
    let keyOf item = async.Return (p item)
    groupByAsync keyOf s

Check warning on line 2453 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.

Check warning on line 2453 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
#endif
#endif

Expand Down
116 changes: 79 additions & 37 deletions tests/FSharp.Control.AsyncSeq.Benchmarks/AsyncSeqBenchmarks.fs
Original file line number Diff line number Diff line change
Expand Up @@ -113,43 +113,85 @@ type AsyncSeqBuilderBenchmarks() =
|> AsyncSeq.iterAsync (fun _ -> async.Return())
|> Async.RunSynchronously

/// Benchmarks for the filterAsync, chooseAsync and foldAsync combinators
/// (the optimised direct-enumerator implementations).
[<MemoryDiagnoser>]
[<SimpleJob(RuntimeMoniker.Net80)>]
type AsyncSeqFilterChooseFoldBenchmarks() =

    /// Length of the replicated source sequence used by every benchmark in this class.
    [<Params(1000, 10000)>]
    member val ElementCount = 0 with get, set

    /// filterAsync where the predicate accepts every element (baseline).
    [<Benchmark(Baseline = true)>]
    member bench.FilterAsyncAllPass() =
        let source = AsyncSeq.replicate bench.ElementCount 1
        let kept = source |> AsyncSeq.filterAsync (fun _ -> async.Return true)
        kept
        |> AsyncSeq.iterAsync (fun _ -> async.Return())
        |> Async.RunSynchronously

    /// filterAsync where the predicate rejects every element, so the whole
    /// source is scanned without emitting anything.
    [<Benchmark>]
    member bench.FilterAsyncNonePass() =
        let source = AsyncSeq.replicate bench.ElementCount 1
        let kept = source |> AsyncSeq.filterAsync (fun _ -> async.Return false)
        kept
        |> AsyncSeq.iterAsync (fun _ -> async.Return())
        |> Async.RunSynchronously

    /// chooseAsync where the chooser returns Some for every element.
    [<Benchmark>]
    member bench.ChooseAsyncAllSelected() =
        let source = AsyncSeq.replicate bench.ElementCount 42
        let chosen = source |> AsyncSeq.chooseAsync (fun x -> async.Return (Some x))
        chosen
        |> AsyncSeq.iterAsync (fun _ -> async.Return())
        |> Async.RunSynchronously

    /// foldAsync summing every element; the total is discarded.
    [<Benchmark>]
    member bench.FoldAsync() =
        let source = AsyncSeq.replicate bench.ElementCount 1
        let total =
            source
            |> AsyncSeq.foldAsync (fun acc x -> async.Return (acc + x)) 0
            |> Async.RunSynchronously
        ignore total

/// Benchmarks for pipelines that chain several combinators together.
[<MemoryDiagnoser>]
[<SimpleJob(RuntimeMoniker.Net80)>]
type AsyncSeqPipelineBenchmarks() =

    /// Length of the replicated source sequence used by every benchmark in this class.
    [<Params(1000, 10000)>]
    member val ElementCount = 0 with get, set

    /// map → filter → fold pipeline (baseline): doubles each element, keeps
    /// the positive ones and sums them; the total is discarded.
    [<Benchmark(Baseline = true)>]
    member bench.MapFilterFold() =
        let source = AsyncSeq.replicate bench.ElementCount 1
        let doubled = source |> AsyncSeq.mapAsync (fun x -> async.Return (x * 2))
        let positives = doubled |> AsyncSeq.filterAsync (fun x -> async.Return (x > 0))
        let total =
            positives
            |> AsyncSeq.foldAsync (fun acc x -> async.Return (acc + x)) 0
            |> Async.RunSynchronously
        ignore total

    /// Collects the whole sequence into an array; the array is discarded.
    [<Benchmark>]
    member bench.ToArray() =
        let source = AsyncSeq.replicate bench.ElementCount 1
        let collected =
            source
            |> AsyncSeq.toArrayAsync
            |> Async.RunSynchronously
        ignore collected

/// Entry point for running benchmarks.
/// Delegates directly to BenchmarkSwitcher so all BenchmarkDotNet CLI options
/// (--filter, --job short, --exporters, etc.) work out of the box.
/// Examples:
/// dotnet run -c Release # run all
/// dotnet run -c Release -- --filter '*Filter*' # specific class
/// dotnet run -c Release -- --filter '*' --job short # quick smoke-run
module AsyncSeqBenchmarkRunner =

    /// Passes the raw command-line arguments to BenchmarkSwitcher for the assembly
    /// that contains AsyncSeqCoreBenchmarks, then returns exit code 0.
    /// The summaries returned by Run are discarded.
    [<EntryPoint>]
    let Main args =
        BenchmarkSwitcher
            .FromAssembly(typeof<AsyncSeqCoreBenchmarks>.Assembly)
            .Run(args)
        |> ignore
        0
Loading