class Source
extends _StreamImpl<T, E>

The entry point for creating a Stream. Wraps an async generator so that yielded values become success results and any thrown error becomes a single error result.

Use Source.from to create a source from an existing AsyncIterable or async generator function. Use the withConcurrency and withBufferSize methods on the resulting source to configure concurrency and buffering.

Examples

Example 1

import { Source } from "anabranch";

const stream = Source.from<number, Error>(async function* () {
  yield 1;
  yield 2;
  yield 3;
});

const results = await stream.collect();

Type Parameters

Static Methods

from<T, E>(source: AsyncIterable<T>): Source<T, E>

Creates a Source from an existing AsyncIterable or async generator function. Each value emitted becomes a success result; any thrown error becomes an error result.
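
The wrapping described above can be pictured with a small standalone sketch. It does not use the library's internals: the Result shape and the helper names toResults and collectAll are assumptions made for illustration, and the library's own types may differ.

```typescript
// Hypothetical Result shape, assumed for illustration only.
type Result<T, E> =
  | { type: "success"; value: T }
  | { type: "error"; error: E };

// Wrap an async iterable: each yielded value becomes a success result,
// and a thrown error becomes one final error result.
async function* toResults<T, E>(
  source: AsyncIterable<T>,
): AsyncGenerator<Result<T, E>> {
  try {
    for await (const value of source) {
      yield { type: "success", value };
    }
  } catch (error) {
    // The cast is an assumption: nothing checks that the thrown value is an E.
    yield { type: "error", error: error as E };
  }
}

// Hypothetical helper that drains an async iterable into an array.
async function collectAll<T>(source: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const item of source) {
    out.push(item);
  }
  return out;
}
```

Under these assumptions, a generator that yields 1 and 2 and then throws produces two success results followed by one error result.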

from<T, E>(fn: () => AsyncGenerator<T>): Source<T, E>

fromArray<T>(items: T[]): Source<T, never>

Creates a Source from an array of values.

fromRange(start: number, end: number): Source<number, never>

Creates a Source that emits a range of numbers from start (inclusive) to end (exclusive). Each number is emitted as a success result.
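
To make the half-open interval concrete, here is a minimal standalone sketch of the same semantics. The function name range is hypothetical, and a step of 1 is an assumption, since the description does not state the step.

```typescript
// Hypothetical stand-in for the described behaviour:
// emits start, start + 1, ..., end - 1 (end itself is never emitted).
async function* range(start: number, end: number): AsyncGenerator<number> {
  for (let i = start; i < end; i++) {
    yield i;
  }
}
```

With this sketch, range(0, 3) yields 0, 1 and 2, and range(3, 3) yields nothing.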

fromResults<T, E>(source: () => AsyncGenerator<Result<T, E>>): Source<T, E>

Creates a Source from an async generator that yields Result values directly. This is useful when you want to yield both successes and errors from the source without terminating it on the first error.
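
The kind of generator this method is meant for might look like the sketch below. The Result shape and the parseAll function are assumptions for illustration, not the library's definitions. The point is that the generator reports a bad item as an error result and then keeps going.

```typescript
// Hypothetical Result shape, assumed for illustration only.
type Result<T, E> =
  | { type: "success"; value: T }
  | { type: "error"; error: E };

// Parse each input string as a number. A bad item is reported as an
// error result, and iteration continues with the next item.
async function* parseAll(
  inputs: string[],
): AsyncGenerator<Result<number, Error>> {
  for (const input of inputs) {
    const n = Number(input);
    if (Number.isNaN(n)) {
      yield { type: "error", error: new Error(`not a number: ${input}`) };
    } else {
      yield { type: "success", value: n };
    }
  }
}
```

For the inputs "1", "x", "3" this yields a success, an error, and another success, whereas a generator that throws on "x" would stop there.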

fromSchedule(cron: string, options?: { signal?: AbortSignal }): Source<Tick, never>

Creates a Source that yields a Tick on each cron schedule match. Supports 5-field (minute) and 6-field (second) cron expressions, day/month names (MON-FRI, JAN), and aliases (@daily, @hourly).

This is an in-process scheduler — timing is best-effort (typically within ~50ms). If a tick handler runs longer than the interval, the next tick fires after the handler completes (ticks are not queued). State is not persisted across process restarts.

fromTask<T, E>(task: Task<T, E>): Source<T, E>

Creates a Source from a Task.

Methods

withBufferSize(n: number): Source<T, E>

Sets the maximum number of buffered results before backpressure is applied to the stream. If the buffer is full, the stream will pause until there is space in the buffer for new results.
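
The backpressure idea can be sketched with a small bounded queue. This is not the library's implementation: the BoundedBuffer class and its push and pull methods are hypothetical, and they only illustrate how a producer pauses when the buffer is full.

```typescript
// Hypothetical bounded queue: push() waits while the buffer is full.
class BoundedBuffer<T> {
  private readonly capacity: number;
  private readonly items: T[] = [];
  private readonly waitingProducers: Array<() => void> = [];
  private readonly waitingConsumers: Array<(item: T) => void> = [];

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  get size(): number {
    return this.items.length;
  }

  async push(item: T): Promise<void> {
    // Hand the item straight to a waiting consumer, if there is one.
    const consumer = this.waitingConsumers.shift();
    if (consumer) {
      consumer(item);
      return;
    }
    // Backpressure: wait until a pull() frees a slot.
    while (this.items.length >= this.capacity) {
      await new Promise<void>((resolve) => this.waitingProducers.push(resolve));
    }
    this.items.push(item);
  }

  async pull(): Promise<T> {
    if (this.items.length > 0) {
      const item = this.items.shift() as T;
      // A slot is free again, so wake one paused producer.
      const wake = this.waitingProducers.shift();
      if (wake) wake();
      return item;
    }
    // Nothing buffered: wait for the next push().
    return new Promise<T>((resolve) => this.waitingConsumers.push(resolve));
  }
}
```

With a capacity of 2, a third push stays pending until a pull removes an item, which matches the pause-until-space behaviour described above.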

withConcurrency(n: number): Source<T, E>

Sets the maximum number of concurrent operations for the stream.
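
What a concurrency limit means in practice can be sketched with a small worker pool. The mapWithLimit helper below is hypothetical and is not how the library is implemented. It shows that at most limit calls are in flight at once.

```typescript
// Hypothetical helper: apply fn to every item, running at most `limit`
// calls at the same time. Results keep the order of the input items.
async function mapWithLimit<T, R>(
  items: T[],
  limit: number,
  fn: (item: T) => Promise<R>,
): Promise<R[]> {
  const results = new Array<R>(items.length);
  let next = 0;

  // Each worker repeatedly claims the next unprocessed index.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      results[index] = await fn(items[index]);
    }
  }

  const workerCount = Math.min(limit, items.length);
  const workers = Array.from({ length: workerCount }, () => worker());
  await Promise.all(workers);
  return results;
}
```

A higher limit lets more operations overlap; a limit of 1 degenerates to sequential processing.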