class Source
extends _StreamImpl<T, E>

The entry point for creating a Stream. Wraps an async generator so that yielded values become success results and any thrown error becomes a single error result.

Use Source.from to create a source from an existing AsyncIterable or an async generator function. Use the withConcurrency and withBufferSize methods on the resulting source to configure concurrency and buffering.

Examples

Example 1

import { Source } from "anabranch";

const stream = Source.from<number, Error>(async function* () {
  yield 1;
  yield 2;
  yield 3;
});

const results = await stream.collect();

Static Methods

from<T, E>(source: AsyncIterable<T>): Source<T, E>

Creates a Source from an existing AsyncIterable or async generator function. Each value emitted becomes a success result; any thrown error becomes an error result.
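The mapping from yielded values and thrown errors to results can be pictured with a small standalone sketch. It does not use the library. The `Result` shape and the helpers `toResults` and `collectAll` are hypothetical names chosen for illustration, and the library's actual types and internals may differ.

```typescript
// Assumed Result shape, for illustration only; the library's type may differ.
type Result<T, E> =
  | { type: "success"; value: T }
  | { type: "error"; error: E };

// Hypothetical helper (not part of anabranch): yielded values become
// success results and a thrown error becomes one final error result.
async function* toResults<T, E>(
  fn: () => AsyncGenerator<T>,
): AsyncGenerator<Result<T, E>> {
  try {
    for await (const value of fn()) {
      yield { type: "success", value };
    }
  } catch (error) {
    // A generator cannot resume after throwing, so this is the last result.
    yield { type: "error", error: error as E };
  }
}

// Hypothetical helper: drains an async iterable into an array.
async function collectAll<T>(source: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const item of source) out.push(item);
  return out;
}

// Two values are yielded, then the generator throws.
async function demoFrom(): Promise<Result<number, Error>[]> {
  return collectAll(
    toResults<number, Error>(async function* () {
      yield 1;
      yield 2;
      throw new Error("boom");
    }),
  );
}
```

In this sketch, `demoFrom()` resolves to two success results followed by one error result; nothing is produced after the throw.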

from<T, E>(fn: () => AsyncGenerator<T>): Source<T, E>

fromArray<T>(items: T[]): Source<T, never>

Creates a Source from an array of values. Each item becomes a success result, and the error type is never.

fromResults<T, E>(source: () => AsyncGenerator<Result<T, E>>): Source<T, E>

Creates a Source from an async generator that yields Result values directly. This is useful when you want to yield both successes and errors from the source without terminating it on the first error.
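As a rough illustration of why yielding results directly is useful, the standalone sketch below has a generator report a bad input as an error result and then carry on with the next input. The `Result` shape and the names `parseAll` and `demoFromResults` are assumptions made for this example, not the library's definitions.

```typescript
// Assumed Result shape, for illustration only; the library's type may differ.
type Result<T, E> =
  | { type: "success"; value: T }
  | { type: "error"; error: E };

// Hypothetical producer: yields one Result per input instead of throwing,
// so a bad input does not end the sequence.
async function* parseAll(
  inputs: string[],
): AsyncGenerator<Result<number, Error>> {
  for (const input of inputs) {
    const n = Number(input);
    if (Number.isNaN(n)) {
      yield { type: "error", error: new Error(`not a number: ${input}`) };
    } else {
      yield { type: "success", value: n };
    }
  }
}

// Drains the generator to show that values after the error still arrive.
async function demoFromResults(): Promise<Result<number, Error>[]> {
  const out: Result<number, Error>[] = [];
  for await (const r of parseAll(["1", "x", "3"])) out.push(r);
  return out;
}
```

Going by the signature above, a generator of this kind would be handed to the library as a function, for example `Source.fromResults(() => parseAll(inputs))`, provided its yielded objects match the library's real Result type.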

fromTask<T, E>(task: Task<T, E>): Source<T, E>

Creates a Source from a Task.

Methods

withBufferSize(n: number): Source<T, E>

Sets the maximum number of buffered results before backpressure is applied to the stream. If the buffer is full, the stream will pause until there is space in the buffer for new results.
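The backpressure behaviour described here can be sketched with a generic bounded queue. This is not the library's implementation: `BoundedBuffer`, `push`, `take`, and `demoBackpressure` are hypothetical names, and the sketch only shows the general idea of a producer pausing while the buffer is full.

```typescript
// Hypothetical bounded queue (not part of anabranch).
class BoundedBuffer<T> {
  private items: T[] = [];
  private waitingProducers: Array<() => void> = [];
  private waitingConsumers: Array<() => void> = [];
  private capacity: number;

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  // Resolves once the item is stored; waits while the buffer is full.
  async push(item: T): Promise<void> {
    while (this.items.length >= this.capacity) {
      await new Promise<void>((resolve) => this.waitingProducers.push(resolve));
    }
    this.items.push(item);
    this.waitingConsumers.shift()?.(); // wake one waiting consumer, if any
  }

  // Resolves with the oldest item; waits while the buffer is empty.
  async take(): Promise<T> {
    while (this.items.length === 0) {
      await new Promise<void>((resolve) => this.waitingConsumers.push(resolve));
    }
    const item = this.items.shift() as T;
    this.waitingProducers.shift()?.(); // space freed: wake one producer
    return item;
  }
}

async function demoBackpressure(): Promise<{
  pushedBeforeConsume: number;
  received: number[];
}> {
  const buffer = new BoundedBuffer<number>(2);
  let pushed = 0;

  // The producer tries to push five items as fast as it can.
  const producer = (async () => {
    for (let i = 1; i <= 5; i++) {
      await buffer.push(i);
      pushed++;
    }
  })();

  // Delay the consumer so the producer runs into the full buffer.
  await new Promise<void>((resolve) => setTimeout(resolve, 10));
  const pushedBeforeConsume = pushed;

  const received: number[] = [];
  for (let i = 0; i < 5; i++) received.push(await buffer.take());
  await producer;
  return { pushedBeforeConsume, received };
}
```

With a capacity of 2, only two pushes complete before the consumer starts; the remaining pushes finish one at a time as `take` frees space, and items still arrive in order.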

withConcurrency(n: number): Source<T, E>

Sets the maximum number of concurrent operations for the stream.
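What a concurrency limit means in practice can be shown with a generic worker-pool sketch. It assumes the limit bounds how many asynchronous operations are in flight at once. `mapWithLimit` and `demoConcurrency` are hypothetical helpers and say nothing about how the library schedules work internally.

```typescript
// Hypothetical helper (not part of anabranch): applies fn to every item
// with at most `limit` calls in flight at any moment.
async function mapWithLimit<T, U>(
  items: T[],
  limit: number,
  fn: (item: T) => Promise<U>,
): Promise<U[]> {
  const results: U[] = new Array(items.length);
  let next = 0;

  // Each worker repeatedly claims the next unprocessed index.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      results[index] = await fn(items[index]);
    }
  }

  const workerCount = Math.min(limit, items.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

async function demoConcurrency(): Promise<{ results: number[]; peak: number }> {
  let inFlight = 0;
  let peak = 0; // highest number of simultaneous calls observed

  const results = await mapWithLimit([1, 2, 3, 4, 5], 2, async (n) => {
    inFlight++;
    peak = Math.max(peak, inFlight);
    await new Promise<void>((resolve) => setTimeout(resolve, 5));
    inFlight--;
    return n * 10;
  });

  return { results, peak };
}
```

In this sketch results keep their input order because each worker writes to the index it claimed; whether the library preserves order under concurrency is not stated in this section.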