Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
### 4.11.0

* Design parity with FSharp.Control.TaskSeq (#277, batch 2):
* Added `AsyncSeq.tryTail` β€” returns `None` if the sequence is empty; otherwise returns `Some` of the tail. Safe counterpart to `tail`. Mirrors `TaskSeq.tryTail`.
* Added `AsyncSeq.where` / `AsyncSeq.whereAsync` β€” aliases for `filter` / `filterAsync`, mirroring the naming convention in `TaskSeq` and F# 8 collection expressions.
* Added `AsyncSeq.lengthBy` / `AsyncSeq.lengthByAsync` β€” counts elements satisfying a predicate. Mirrors `TaskSeq.lengthBy` / `TaskSeq.lengthByAsync`.
* Added `AsyncSeq.compareWith` / `AsyncSeq.compareWithAsync` β€” lexicographically compares two async sequences using a comparison function. Mirrors `TaskSeq.compareWith` / `TaskSeq.compareWithAsync`.
* Added `AsyncSeq.takeWhileInclusiveAsync` β€” async variant of the existing `takeWhileInclusive`. Mirrors `TaskSeq.takeWhileInclusiveAsync`.
* Added `AsyncSeq.skipWhileInclusive` / `AsyncSeq.skipWhileInclusiveAsync` β€” skips elements while predicate holds and also skips the first non-matching boundary element. Mirrors `TaskSeq.skipWhileInclusive` / `TaskSeq.skipWhileInclusiveAsync`.
* Added `AsyncSeq.appendSeq` β€” appends a synchronous `seq<'T>` after an async sequence. Mirrors `TaskSeq.appendSeq`.
* Added `AsyncSeq.prependSeq` β€” prepends a synchronous `seq<'T>` before an async sequence. Mirrors `TaskSeq.prependSeq`.
* Added `AsyncSeq.delay` β€” defers sequence creation to enumeration time by calling a factory function each time `GetAsyncEnumerator` is called. Mirrors `TaskSeq.delay`.
* Added `AsyncSeq.collectAsync` β€” like `collect` but the mapping function is asynchronous (`'T -> Async<AsyncSeq<'U>>`). Mirrors `TaskSeq.collectAsync`.
* Added `AsyncSeq.partition` / `AsyncSeq.partitionAsync` β€” splits a sequence into two arrays using a (optionally async) predicate; the first array contains matching elements, the second non-matching. Mirrors `TaskSeq.partition` / `TaskSeq.partitionAsync`.

### 4.10.0

* Added `AsyncSeq.withCancellation` β€” returns a new `AsyncSeq` that passes the given `CancellationToken` to `GetAsyncEnumerator`, overriding whatever token would otherwise be supplied. Mirrors `TaskSeq.withCancellation` and is useful when consuming sequences from libraries (e.g. Entity Framework) that accept a cancellation token through `GetAsyncEnumerator`. Part of ongoing design-parity work with FSharp.Control.TaskSeq (see #277).
Expand Down
121 changes: 119 additions & 2 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -492,8 +492,8 @@
| Some enum -> dispose enum
| None -> () }) :> AsyncSeq<'T>

let inline delay (f: unit -> AsyncSeq<'T>) : AsyncSeq<'T> =
AsyncGenerator.delay f
let delay (f: unit -> AsyncSeq<'T>) : AsyncSeq<'T> =
AsyncSeqImpl(fun () -> (f()).GetEnumerator()) :> AsyncSeq<'T>

let bindAsync (f:'T -> AsyncSeq<'U>) (inp:Async<'T>) : AsyncSeq<'U> =
AsyncSeqImpl(fun () ->
Expand Down Expand Up @@ -703,6 +703,9 @@
let collect (f: 'T -> AsyncSeq<'U>) (inp: AsyncSeq<'T>) : AsyncSeq<'U> =
AsyncSeqImpl(fun () -> new OptimizedCollectEnumerator<'T, 'U>(f, inp) :> IAsyncSeqEnumerator<'U>) :> AsyncSeq<'U>

let collectAsync (mapping: 'T -> Async<AsyncSeq<'U>>) (source: AsyncSeq<'T>) : AsyncSeq<'U> =
collect (fun x -> bindAsync id (mapping x)) source

// let collect (f: 'T -> AsyncSeq<'U>) (inp: AsyncSeq<'T>) : AsyncSeq<'U> =
// AsyncGenerator.collect f inp

Expand Down Expand Up @@ -787,6 +790,12 @@
dispose e
| _ -> () }) :> AsyncSeq<'T>

let appendSeq (seq2: seq<'T>) (source: AsyncSeq<'T>) : AsyncSeq<'T> =
append source (ofSeq seq2)

let prependSeq (seq1: seq<'T>) (source: AsyncSeq<'T>) : AsyncSeq<'T> =
append (ofSeq seq1) source

// Optimized iterAsync implementation to reduce allocations
type internal OptimizedIterAsyncEnumerator<'T>(enumerator: IAsyncSeqEnumerator<'T>, f: 'T -> Async<unit>) =
let mutable disposed = false
Expand Down Expand Up @@ -1311,6 +1320,33 @@
let forallAsync f (source : AsyncSeq<'T>) =
source |> existsAsync (fun v -> async { let! b = f v in return not b }) |> Async.map not

let compareWithAsync (comparer: 'T -> 'T -> Async<int>) (source1: AsyncSeq<'T>) (source2: AsyncSeq<'T>) : Async<int> = async {
use ie1 = source1.GetEnumerator()
use ie2 = source2.GetEnumerator()
let! m1 = ie1.MoveNext()
let! m2 = ie2.MoveNext()
let b1 = ref m1
let b2 = ref m2
let result = ref 0
let isDone = ref false
while not isDone.Value do
match b1.Value, b2.Value with
| None, None -> isDone := true
| None, Some _ -> result := -1; isDone := true
| Some _, None -> result := 1; isDone := true
| Some v1, Some v2 ->
let! c = comparer v1 v2
if c <> 0 then result := c; isDone := true
else
let! n1 = ie1.MoveNext()
let! n2 = ie2.MoveNext()
b1 := n1
b2 := n2
return result.Value }

let compareWith (comparer: 'T -> 'T -> int) (source1: AsyncSeq<'T>) (source2: AsyncSeq<'T>) : Async<int> =
compareWithAsync (fun a b -> comparer a b |> async.Return) source1 source2

let foldAsync f (state:'State) (source : AsyncSeq<'T>) =
match source with
| :? AsyncSeqOp<'T> as source -> source.FoldAsync f state
Expand Down Expand Up @@ -1368,6 +1404,12 @@
let length (source : AsyncSeq<'T>) =
fold (fun st _ -> st + 1L) 0L source

let lengthByAsync (predicate: 'T -> Async<bool>) (source: AsyncSeq<'T>) : Async<int64> =
foldAsync (fun acc x -> async { let! ok = predicate x in return if ok then acc + 1L else acc }) 0L source

let lengthBy (predicate: 'T -> bool) (source: AsyncSeq<'T>) : Async<int64> =
lengthByAsync (predicate >> async.Return) source

let inline sum (source : AsyncSeq<'T>) : Async<'T> =
(LanguagePrimitives.GenericZero, source) ||> fold (+)

Expand Down Expand Up @@ -1498,6 +1540,12 @@
let filter f (source : AsyncSeq<'T>) =
filterAsync (f >> async.Return) source

let where (predicate: 'T -> bool) (source: AsyncSeq<'T>) : AsyncSeq<'T> =
filter predicate source

let whereAsync (predicate: 'T -> Async<bool>) (source: AsyncSeq<'T>) : AsyncSeq<'T> =
filterAsync predicate source

let except (excluded : seq<'T>) (source : AsyncSeq<'T>) : AsyncSeq<'T> =
let s = System.Collections.Generic.HashSet(excluded)
source |> filter (fun x -> not (s.Contains(x)))
Expand Down Expand Up @@ -1848,6 +1896,24 @@
interface System.IDisposable with
member _.Dispose() = en.Dispose() }) :> AsyncSeq<'a>

let takeWhileInclusiveAsync (predicate: 'T -> Async<bool>) (source: AsyncSeq<'T>) : AsyncSeq<'T> =
AsyncSeqImpl(fun () ->
let en = source.GetEnumerator()
let fin = ref false
{ new IAsyncSeqEnumerator<'T> with
member _.MoveNext() = async {
if !fin then return None
else
let! next = en.MoveNext()
match next with
| None -> return None
| Some a ->
let! ok = predicate a
if ok then return Some a
else fin := true; return Some a }
interface System.IDisposable with
member _.Dispose() = en.Dispose() }) :> AsyncSeq<'T>

let skipWhileAsync p (source : AsyncSeq<'T>) : AsyncSeq<_> = asyncSeq {
use ie = source.GetEnumerator()
let! move = ie.MoveNext()
Expand All @@ -1865,6 +1931,27 @@
let! moven = ie.MoveNext()
b := moven }

let skipWhileInclusiveAsync (predicate: 'T -> Async<bool>) (source: AsyncSeq<'T>) : AsyncSeq<'T> = asyncSeq {
use ie = source.GetEnumerator()
let! move = ie.MoveNext()
let b = ref move
let doneSkipping = ref false
while b.Value.IsSome do
let v = b.Value.Value
if doneSkipping.Value then
yield v
let! moven = ie.MoveNext()
b := moven
else
let! test = predicate v
if not test then
doneSkipping := true // skip this boundary element; do not yield it
let! moven = ie.MoveNext()
b := moven }

let skipWhileInclusive (predicate: 'T -> bool) (source: AsyncSeq<'T>) : AsyncSeq<'T> =
skipWhileInclusiveAsync (predicate >> async.Return) source

#if !FABLE_COMPILER
let skipUntilSignal (signal:Async<unit>) (source:AsyncSeq<'T>) : AsyncSeq<'T> = asyncSeq {
use ie = source.GetEnumerator()
Expand Down Expand Up @@ -1930,6 +2017,25 @@

let tail (source : AsyncSeq<'T>) : AsyncSeq<'T> = skip 1 source

let tryTail (source: AsyncSeq<'T>) : Async<AsyncSeq<'T> option> = async {
let ie = source.GetEnumerator()
let! first = ie.MoveNext()
match first with
| None ->
ie.Dispose()
return None
| Some _ ->
return Some (asyncSeq {
try
let! next = ie.MoveNext()
let b = ref next
while b.Value.IsSome do
yield b.Value.Value
let! moven = ie.MoveNext()
b := moven
finally
ie.Dispose() }) }

/// Splits an async sequence at the given index, returning the first `count` elements as an array
/// and the remaining elements as a new AsyncSeq. The source is enumerated once.
let splitAt (count: int) (source: AsyncSeq<'T>) : Async<'T array * AsyncSeq<'T>> = async {
Expand Down Expand Up @@ -1976,6 +2082,17 @@
let toArraySynchronously (source:AsyncSeq<'T>) = toArrayAsync source |> Async.RunSynchronously
#endif

let partitionAsync (predicate: 'T -> Async<bool>) (source: AsyncSeq<'T>) : Async<'T[] * 'T[]> = async {
let trues = ResizeArray<'T>()
let falses = ResizeArray<'T>()
do! source |> iterAsync (fun x -> async {
let! ok = predicate x
(if ok then trues else falses).Add(x) })
return trues.ToArray(), falses.ToArray() }

let partition (predicate: 'T -> bool) (source: AsyncSeq<'T>) : Async<'T[] * 'T[]> =
partitionAsync (predicate >> async.Return) source

let concatSeq (source:AsyncSeq<#seq<'T>>) : AsyncSeq<'T> = asyncSeq {
use ie = source.GetEnumerator()
let! move = ie.MoveNext()
Expand Down Expand Up @@ -2450,7 +2567,7 @@

[<CompilerMessage("The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.", 9999)>]
let groupBy (p:'a -> 'k) (s:AsyncSeq<'a>) : AsyncSeq<'k * AsyncSeq<'a>> =
groupByAsync (p >> async.Return) s

Check warning on line 2570 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.

Check warning on line 2570 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
#endif
#endif

Expand Down
61 changes: 61 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@ module AsyncSeq =
/// all elements of the second asynchronous sequence.
val append : seq1:AsyncSeq<'T> -> seq2:AsyncSeq<'T> -> AsyncSeq<'T>

/// Yields all elements of the source asynchronous sequence and then all elements of the
/// synchronous sequence appended at the end.
val appendSeq : seq2:seq<'T> -> source:AsyncSeq<'T> -> AsyncSeq<'T>

/// Yields all elements of the synchronous sequence first and then all elements of the
/// source asynchronous sequence.
val prependSeq : seq1:seq<'T> -> source:AsyncSeq<'T> -> AsyncSeq<'T>

/// Returns a new async sequence whose enumeration calls the factory function each time
/// it is enumerated. Useful for deferring the creation of a sequence until enumeration begins.
val delay : f:(unit -> AsyncSeq<'T>) -> AsyncSeq<'T>

/// Computation builder that allows creating of asynchronous
/// sequences using the 'asyncSeq { ... }' syntax
type AsyncSeqBuilder =
Expand Down Expand Up @@ -125,6 +137,10 @@ module AsyncSeq =
/// the 'for' keyword in asyncSeq computation).
val collect : mapping:('T -> AsyncSeq<'TResult>) -> source:AsyncSeq<'T> -> AsyncSeq<'TResult>

/// Like AsyncSeq.collect but the mapping function is asynchronous. For every input element
/// it calls the specified async function and iterates over all elements of the returned sequence.
val collectAsync : mapping:('T -> Async<AsyncSeq<'TResult>>) -> source:AsyncSeq<'T> -> AsyncSeq<'TResult>

/// Builds a new asynchronous sequence whose elements are generated by
/// applying the specified function to all elements of the input sequence.
///
Expand Down Expand Up @@ -389,12 +405,26 @@ module AsyncSeq =
/// Asynchronously determine if the async predicate returns true for all values in the sequence
val forallAsync : predicate:('T -> Async<bool>) -> source:AsyncSeq<'T> -> Async<bool>

/// Compares two async sequences lexicographically using the given synchronous comparison function.
/// Returns a negative integer if source1 < source2, 0 if equal, and a positive integer if source1 > source2.
val compareWith : comparer:('T -> 'T -> int) -> source1:AsyncSeq<'T> -> source2:AsyncSeq<'T> -> Async<int>

/// Compares two async sequences lexicographically using the given asynchronous comparison function.
/// Returns a negative integer if source1 < source2, 0 if equal, and a positive integer if source1 > source2.
val compareWithAsync : comparer:('T -> 'T -> Async<int>) -> source1:AsyncSeq<'T> -> source2:AsyncSeq<'T> -> Async<int>

/// Return an asynchronous sequence which, when iterated, includes an integer indicating the index of each element in the sequence.
val indexed : source:AsyncSeq<'T> -> AsyncSeq<int64 * 'T>

/// Asynchronously determine the number of elements in the sequence
val length : source:AsyncSeq<'T> -> Async<int64>

/// Asynchronously returns the number of elements in the sequence for which the predicate returns true.
val lengthBy : predicate:('T -> bool) -> source:AsyncSeq<'T> -> Async<int64>

/// Asynchronously returns the number of elements in the sequence for which the async predicate returns true.
val lengthByAsync : predicate:('T -> Async<bool>) -> source:AsyncSeq<'T> -> Async<int64>

/// Same as AsyncSeq.scanAsync, but the specified function is synchronous.
val scan : folder:('State -> 'T -> 'State) -> state:'State -> source:AsyncSeq<'T> -> AsyncSeq<'State>

Expand All @@ -414,6 +444,13 @@ module AsyncSeq =
/// and processes the input element immediately.
val filter : predicate:('T -> bool) -> source:AsyncSeq<'T> -> AsyncSeq<'T>

/// Alias for AsyncSeq.filter. Returns elements for which the predicate returns true.
/// Mirrors the naming convention in TaskSeq and FSharp.Core.
val where : predicate:('T -> bool) -> source:AsyncSeq<'T> -> AsyncSeq<'T>

/// Alias for AsyncSeq.filterAsync. Returns elements for which the async predicate returns true.
val whereAsync : predicate:('T -> Async<bool>) -> source:AsyncSeq<'T> -> AsyncSeq<'T>

/// Returns a new asynchronous sequence containing only elements that are not present
/// in the given excluded collection. Uses a HashSet for O(1) lookup. Mirrors Seq.except.
val except : excluded:seq<'T> -> source:AsyncSeq<'T> -> AsyncSeq<'T> when 'T : equality
Expand Down Expand Up @@ -593,11 +630,23 @@ module AsyncSeq =
/// Does return the first element that predicate fails
val takeWhileInclusive : predicate:('T -> bool) -> source:AsyncSeq<'T> -> AsyncSeq<'T>

/// Returns elements from an asynchronous sequence while the specified async predicate holds,
/// and also returns the first element for which the predicate returns false (inclusive).
val takeWhileInclusiveAsync : predicate:('T -> Async<bool>) -> source:AsyncSeq<'T> -> AsyncSeq<'T>

/// Skips elements from an asynchronous sequence while the specified
/// predicate holds and then returns the rest of the sequence. The
/// predicate is evaluated asynchronously.
val skipWhile : predicate:('T -> bool) -> source:AsyncSeq<'T> -> AsyncSeq<'T>

/// Skips elements from an asynchronous sequence while the predicate holds AND also skips the
/// first element for which the predicate returns false (the boundary element), then returns the rest.
val skipWhileInclusive : predicate:('T -> bool) -> source:AsyncSeq<'T> -> AsyncSeq<'T>

/// Skips elements from an asynchronous sequence while the async predicate holds AND also skips the
/// first element for which the predicate returns false (the boundary element), then returns the rest.
val skipWhileInclusiveAsync : predicate:('T -> Async<bool>) -> source:AsyncSeq<'T> -> AsyncSeq<'T>

/// Returns the first N elements of an asynchronous sequence
/// does not cast an exception if count is larger than the sequence length.
val take : count:int -> source:AsyncSeq<'T> -> AsyncSeq<'T>
Expand All @@ -613,6 +662,10 @@ module AsyncSeq =
/// Returns an empty sequence if the source is empty.
val tail : source:AsyncSeq<'T> -> AsyncSeq<'T>

/// Returns None if the source sequence is empty; otherwise returns Some of an async sequence
/// containing all elements except the first. The source is enumerated once.
val tryTail : source:AsyncSeq<'T> -> Async<AsyncSeq<'T> option>

/// Splits an async sequence at the given index. Returns an async computation that yields
/// the first `count` elements as an array and the remaining elements as a new AsyncSeq.
/// The source is enumerated once; the returned AsyncSeq lazily produces the remainder.
Expand All @@ -632,6 +685,14 @@ module AsyncSeq =
val toArraySynchronously : source:AsyncSeq<'T> -> 'T []
#endif

/// Splits the sequence into two arrays: the first contains elements for which the predicate
/// returns true, the second contains elements for which it returns false. Mirrors Seq.partition.
val partition : predicate:('T -> bool) -> source:AsyncSeq<'T> -> Async<'T [] * 'T []>

/// Splits the sequence into two arrays using an async predicate: the first contains elements
/// for which the predicate returns true, the second contains elements for which it returns false.
val partitionAsync : predicate:('T -> Async<bool>) -> source:AsyncSeq<'T> -> Async<'T [] * 'T []>

/// Flattens an AsyncSeq of synchronous sequences.
val concatSeq : source:AsyncSeq<#seq<'T>> -> AsyncSeq<'T>

Expand Down
Loading
Loading