Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 35 additions & 31 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2332,43 +2332,47 @@

let traverseOptionAsync (f:'T -> Async<'U option>) (source:AsyncSeq<'T>) : Async<AsyncSeq<'U> option> = async {
use ie = source.GetEnumerator()
let! move = ie.MoveNext()
let b = ref move
let! first = ie.MoveNext()
let mutable current = first
let mutable failed = false
let buffer = ResizeArray<_>()
let fail = ref false
while b.Value.IsSome && not fail.Value do
let! vOpt = f b.Value.Value
match vOpt with
| Some v -> buffer.Add v
| None -> b := None; fail := true
let! moven = ie.MoveNext()
b := moven
if fail.Value then
return None
while current.IsSome && not failed do
let! vOpt = f current.Value
match vOpt with
| Some v ->
buffer.Add v
let! next = ie.MoveNext()
current <- next
| None ->
failed <- true
if failed then
return None
else
let res = buffer.ToArray()
return Some (asyncSeq { for v in res do yield v })
}
let res = buffer.ToArray()
return Some (asyncSeq { for v in res do yield v })
}

let traverseChoiceAsync (f:'T -> Async<Choice<'U, 'e>>) (source:AsyncSeq<'T>) : Async<Choice<AsyncSeq<'U>, 'e>> = async {
use ie = source.GetEnumerator()
let! move = ie.MoveNext()
let b = ref move
let! first = ie.MoveNext()
let mutable current = first
let mutable failWith = ValueNone
let buffer = ResizeArray<_>()
let fail = ref None
while b.Value.IsSome && fail.Value.IsNone do
let! vOpt = f b.Value.Value
match vOpt with
| Choice1Of2 v -> buffer.Add v
| Choice2Of2 err -> b := None; fail := Some err
let! moven = ie.MoveNext()
b := moven
match fail.Value with
| Some err -> return Choice2Of2 err
| None ->
let res = buffer.ToArray()
return Choice1Of2 (asyncSeq { for v in res do yield v })
}
while current.IsSome && failWith.IsNone do
let! vOpt = f current.Value
match vOpt with
| Choice1Of2 v ->
buffer.Add v
let! next = ie.MoveNext()
current <- next
| Choice2Of2 err ->
failWith <- ValueSome err
match failWith with
| ValueSome err -> return Choice2Of2 err
| ValueNone ->
let res = buffer.ToArray()
return Choice1Of2 (asyncSeq { for v in res do yield v })
}

#if (NETSTANDARD || NET)
#if !FABLE_COMPILER
Expand Down Expand Up @@ -2450,7 +2454,7 @@

[<CompilerMessage("The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.", 9999)>]
let groupBy (p:'a -> 'k) (s:AsyncSeq<'a>) : AsyncSeq<'k * AsyncSeq<'a>> =
groupByAsync (p >> async.Return) s

Check warning on line 2457 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.

Check warning on line 2457 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
#endif
#endif

Expand Down
47 changes: 47 additions & 0 deletions tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,53 @@
Assert.AreEqual("oh no", e)
Assert.True(([1;2] = (seen |> List.ofSeq)))

[<Test>]
let ``AsyncSeq.traverseOptionAsync returns Some sequence when all elements succeed``() =
let s = [1;2;3] |> AsyncSeq.ofSeq
let f i = Some (i * 10) |> async.Return
let r = AsyncSeq.traverseOptionAsync f s |> Async.RunSynchronously
match r with
| None -> Assert.Fail("Expected Some")
| Some result ->
let values = result |> AsyncSeq.toListAsync |> Async.RunSynchronously
Assert.AreEqual([10;20;30], values)

[<Test>]
let ``AsyncSeq.traverseOptionAsync does not read past failing element``() =
let readCount = ref 0
let s = asyncSeq {
for i in 1..10 do
incr readCount
yield i
}
let f i = (if i <= 3 then Some i else None) |> async.Return
let _r = AsyncSeq.traverseOptionAsync f s |> Async.RunSynchronously
// f returns None on element 4; only elements 1..4 should be read from source
Assert.AreEqual(4, readCount.Value)

[<Test>]
let ``AsyncSeq.traverseChoiceAsync returns Choice1Of2 sequence when all elements succeed``() =
let s = [1;2;3] |> AsyncSeq.ofSeq
let f i = Choice1Of2 (i * 10) |> async.Return
let r = AsyncSeq.traverseChoiceAsync f s |> Async.RunSynchronously
match r with
| Choice2Of2 _ -> Assert.Fail("Expected Choice1Of2")
| Choice1Of2 result ->
let values = result |> AsyncSeq.toListAsync |> Async.RunSynchronously
Assert.AreEqual([10;20;30], values)

[<Test>]
let ``AsyncSeq.traverseChoiceAsync does not read past failing element``() =
let readCount = ref 0
let s = asyncSeq {
for i in 1..10 do
incr readCount
yield i
}
let f i = (if i <= 3 then Choice1Of2 i else Choice2Of2 "stop") |> async.Return
let _r = AsyncSeq.traverseChoiceAsync f s |> Async.RunSynchronously
// f returns Choice2Of2 on element 4; only elements 1..4 should be read from source
Assert.AreEqual(4, readCount.Value)

[<Test>]
let ``AsyncSeq.toBlockingSeq does not hung forever and rethrows exception``() =
Expand Down Expand Up @@ -2000,7 +2047,7 @@
let actual =
ls
|> AsyncSeq.ofSeq
|> AsyncSeq.groupBy p

Check warning on line 2050 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand All @@ -2009,7 +2056,7 @@
let expected = asyncSeq { raise (exn("test")) }
let actual =
asyncSeq { raise (exn("test")) }
|> AsyncSeq.groupBy (fun i -> i % 3)

Check warning on line 2059 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand Down
Loading