Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
### 4.10.0

* Added `AsyncSeq.withCancellation` β€” returns a new `AsyncSeq` that passes the given `CancellationToken` to `GetAsyncEnumerator`, overriding whatever token would otherwise be supplied. Mirrors `TaskSeq.withCancellation` and is useful when consuming sequences from libraries (e.g. Entity Framework) that accept a cancellation token through `GetAsyncEnumerator`. Part of ongoing design-parity work with FSharp.Control.TaskSeq (see #277).

### 4.9.0

* Performance: `filterAsync` β€” replaced `asyncSeq`-builder implementation with a direct optimised enumerator, reducing allocation and generator overhead.
Expand Down
7 changes: 7 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2450,7 +2450,7 @@

[<CompilerMessage("The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.", 9999)>]
let groupBy (p:'a -> 'k) (s:AsyncSeq<'a>) : AsyncSeq<'k * AsyncSeq<'a>> =
groupByAsync (p >> async.Return) s

Check warning on line 2453 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.

Check warning on line 2453 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
#endif
#endif

Expand Down Expand Up @@ -2522,6 +2522,13 @@
(emptyAsync fillChannelTask)
}

/// Returns a new AsyncSeq that passes the given CancellationToken to GetAsyncEnumerator,
/// overriding whatever token would otherwise be used. Useful when consuming sequences from
/// libraries (such as Entity Framework) that accept a CancellationToken through GetAsyncEnumerator.
let withCancellation (cancellationToken: CancellationToken) (source: AsyncSeq<'T>) : AsyncSeq<'T> =
{ new IAsyncEnumerable<'T> with
member _.GetAsyncEnumerator(_ct) = source.GetAsyncEnumerator(cancellationToken) }

#endif


Expand Down
8 changes: 8 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,14 @@ module AsyncSeq =
/// Transforms an async seq to a new one that fetches values ahead of time to improve throughput.
val prefetch<'T> : numberToPrefetch: int -> source: AsyncSeq<'T> -> AsyncSeq<'T>

/// <summary>
/// Returns a new <c>AsyncSeq</c> that passes the given <c>CancellationToken</c> to
/// <c>GetAsyncEnumerator</c>, overriding whatever token would otherwise be used when iterating.
/// This is useful when consuming sequences from libraries such as Entity Framework that
/// accept a <c>CancellationToken</c> through <c>GetAsyncEnumerator</c>.
/// </summary>
val withCancellation<'T> : cancellationToken: System.Threading.CancellationToken -> source: AsyncSeq<'T> -> AsyncSeq<'T>

#endif


Expand Down
57 changes: 57 additions & 0 deletions tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2000,7 +2000,7 @@
let actual =
ls
|> AsyncSeq.ofSeq
|> AsyncSeq.groupBy p

Check warning on line 2003 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand All @@ -2009,7 +2009,7 @@
let expected = asyncSeq { raise (exn("test")) }
let actual =
asyncSeq { raise (exn("test")) }
|> AsyncSeq.groupBy (fun i -> i % 3)

Check warning on line 2012 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand Down Expand Up @@ -3662,3 +3662,60 @@
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously |> ignore)
|> ignore

// ===== withCancellation =====

[<Test>]
let ``AsyncSeq.withCancellation passes token to enumerator`` () =
use cts = new System.Threading.CancellationTokenSource()
let receivedToken = ref System.Threading.CancellationToken.None
let source =
{ new System.Collections.Generic.IAsyncEnumerable<int> with
member _.GetAsyncEnumerator(ct) =
receivedToken.Value <- ct
(AsyncSeq.ofSeq [1; 2; 3]).GetAsyncEnumerator(ct) }
source
|> AsyncSeq.withCancellation cts.Token
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
|> ignore
Assert.AreEqual(cts.Token, receivedToken.Value)

[<Test>]
let ``AsyncSeq.withCancellation overrides incoming token`` () =
use cts1 = new System.Threading.CancellationTokenSource()
use cts2 = new System.Threading.CancellationTokenSource()
let receivedToken = ref System.Threading.CancellationToken.None
let source : System.Collections.Generic.IAsyncEnumerable<int> =
{ new System.Collections.Generic.IAsyncEnumerable<int> with
member _.GetAsyncEnumerator(ct) =
receivedToken.Value <- ct
(AsyncSeq.ofSeq [1; 2; 3]).GetAsyncEnumerator(ct) }
let wrapped = source |> AsyncSeq.withCancellation cts1.Token
// Enumerate with cts2's token - withCancellation should still pass cts1's token
let e = wrapped.GetAsyncEnumerator(cts2.Token)
e.MoveNextAsync().AsTask() |> Async.AwaitTask |> Async.RunSynchronously |> ignore
e.DisposeAsync() |> ignore
Assert.AreEqual(cts1.Token, receivedToken.Value)

[<Test>]
let ``AsyncSeq.withCancellation preserves sequence values`` () =
use cts = new System.Threading.CancellationTokenSource()
let result =
AsyncSeq.ofSeq [1; 2; 3; 4; 5]
|> AsyncSeq.withCancellation cts.Token
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 1; 2; 3; 4; 5 |], result)

[<Test>]
let ``AsyncSeq.withCancellation with cancelled token raises OperationCanceledException`` () =
use cts = new System.Threading.CancellationTokenSource()
cts.Cancel()
Assert.Catch<System.OperationCanceledException>(fun () ->
AsyncSeq.ofSeq [1; 2; 3]
|> AsyncSeq.withCancellation cts.Token
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
|> ignore)
|> ignore
2 changes: 1 addition & 1 deletion version.props
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<Project>
<PropertyGroup>
<Version>4.8.0</Version>
<Version>4.10.0</Version>
</PropertyGroup>
</Project>
Loading