Basic usage with Task semantics
Basic usage with Task semantics
import { EventLog, createInMemory } from "@anabranch/eventlog"; const connector = createInMemory(); const log = await EventLog.connect(connector).run(); const eventId = await log.append("users", { action: "created", userId: 123 }).run(); const events = await log.list("users").run();
Consuming events as a stream with auto-commit
Consuming events as a stream with auto-commit
const connector = createInMemory(); const log = await EventLog.connect(connector).run(); const { successes, errors } = await log .consume("users", "my-processor") .withConcurrency(5) .map(async (batch) => { for (const event of batch.events) { await processEvent(event.data); } }) .partition();
Event log wrapper with Task/Stream semantics for event-sourced systems.
-
append<T>(): Task<string, EventLogAppendFailed>topic: string,data: T,options?: AppendOptions
Append an event to a topic.
-
close(): Task<void, EventLogCloseFailed>
Close the event log connection.
-
commit(): Task<void, EventLogCommitCursorFailed>topic: string,consumerGroup: string,cursor: Cursor
Commit a cursor to mark progress for a consumer group.
-
connect<Cursor = string>(connector: EventLogConnector<Cursor>): Task<EventLog<Cursor>, EventLogConnectionFailed>
Connect to an event log via a connector.
-
consume<T>(): Channel<EventBatch<T, Cursor>, EventLogConsumeFailed>topic: string,consumerGroup: string,options?: ConsumeOptions<Cursor>
Consume events from a topic as a stream.
-
getCommittedCursor(): Task<Cursor | null, EventLogGetCursorFailed>topic: string,consumerGroup: string
Get the last committed cursor for a consumer group.
Error thrown when an append operation fails.
Error thrown when closing an event log connection fails.
Error thrown when committing a cursor fails.
Error thrown when an event log connection cannot be established.
Error thrown when consuming events fails.
Error thrown when getting a cursor fails.
The entry point for creating a Stream. Wraps an async generator so that yielded values become success results and any thrown error becomes a single error result.
-
from<T, E>(source: AsyncIterable<T>): Source<T, E>
Creates a Source from an existing
AsyncIterableor async generator function. Each value emitted becomes a success result; any thrown error becomes an error result. -
fromArray<T>(items: T[]): Source<T, never>
Creates a Source from an array of values.
-
fromRange(): Source<number, never>start: number,end: number
Creates a Source that emits a range of numbers from
start(inclusive) toend(exclusive). Each number is emitted as a success result. -
fromResults<T, E>(source: () => AsyncGenerator<Result<T, E>>): Source<T, E>
Creates a Source from an async generator that yields Result values directly. This is useful when you want to yield both successes and errors from the source without terminating it on the first error.
-
fromSchedule(): Source<Tick, never>cron: string,options?: { signal?: AbortSignal; }
Creates a Source that yields a Tick on each cron schedule match. Supports 5-field (minute) and 6-field (second) cron expressions, day/month names (MON-FRI, JAN), and aliases (@daily, @hourly).
-
fromTask<T, E>(task: Task<T, E>): Source<T, E>
Creates a Source from a Task.
-
withBufferSize(n: number): Source<T, E>
Sets the maximum number of buffered results before backpressure is applied to the stream. If the buffer is full, the stream will pause until there is space in the buffer for new results.
-
withConcurrency(n: number): Source<T, E>
Sets the maximum number of concurrent operations for the stream.
A single async task with error-aware utilities like retries and timeouts.
-
acquireRelease<R, T, E>(unnamed 0: { acquire: (signal?: AbortSignal) => Promise<R>; release: (resource: R) => Promise<void>; use: (resource: R) => Task<T, E>; }): Task<T, E>
Acquires a resource, runs a task that uses it, and releases it regardless of success or failure. Useful for resource lifecycle management when the use computation is a composed Task chain.
-
all<T, E>(tasks: Task<T, E>[]): Task<T[], E>
Runs multiple tasks and collects results. Rejects on the first failure.
-
allSettled<T, E>(tasks: Task<T, E>[]): Task<Result<T, E>[], never>
Runs multiple tasks and collects all results without throwing.
-
chain(arr: []): Task<void, never>
Runs tasks sequentially, passing the successful value from one to the next. If any task fails, the chain is short-circuited and the error is returned.
-
delayWithSignal(): Promise<void>ms: number,signal?: AbortSignal
-
flatMap<U, E2 = E>(fn: (value: T) => Task<U, E2>): Task<U, E | E2>
Chains another task based on the successful value.
-
flatMapErr<F>(fn: (error: E) => Task<T, F>): Task<T, F>
Chains another task based on the error value.
-
map<U, E2 = E>(fn: (value: T) => Promisable<U>): Task<U, E | E2>
Maps the successful value. Errors are passed through unchanged.
-
mapErr<F>(fn: (error: E) => Promisable<F>): Task<T, F>
Maps the error value. Successful values are passed through unchanged.
-
mergeExternalSignals(): { signal: AbortSignal; cleanup: () => void; }outer: AbortSignal,inner: AbortSignal
-
mergeSignals(): { signal: AbortSignal; cleanup: () => void; }outer: AbortSignal,inner?: AbortSignal
-
of<R, E>(task: (signal?: AbortSignal) => Promisable<R>): Task<R, E>
Creates a Task from a sync or async function. The function receives an optional AbortSignal that is active when the task is run with a signal via withSignal or run.
-
race<T, E>(tasks: Task<T, E>[]): Task<Result<T, E[]>, never>
Runs tasks concurrently and resolves with the first settled result.
-
recover<U>(fn: (error: E) => Promisable<U>): Task<T | U, never>
Recovers from errors by mapping them to a successful value.
-
recoverWhen<E2 extends E, U>(): Task<T | U, Exclude<E, E2>>guard: (error: E) => error is E2,fn: (error: E2) => Promisable<U>
Recovers from specific error types by mapping them to a successful value.
-
result(): Promise<Result<T, E>>
Executes the task and returns a tagged result instead of throwing.
-
retry(options: { attempts: number; delay?: number | (() => number); when?: (error: E) => boolean; }): Task<T, E>attempt: number,error: E
Retries the task when the predicate returns true.
-
run(): Promise<T>
Executes the task and resolves the value or throws the error.
- runTask: (signal?: AbortSignal) => Promise<T>
- runWithSignal(signal?: AbortSignal): Promise<T>
-
tap(fn: (value: T) => Promisable<void>): Task<T, E>
Runs a side effect on the successful value.
-
tapErr(fn: (error: E) => Promisable<void>): Task<T, E>
Runs a side effect on the error value.
-
timeout(): Task<T, E>ms: number,error?: E
Fails if the task does not complete within the specified time.
-
withSignal(signal: AbortSignal): Task<T, E>
Wraps the task with an external abort signal.
Options for appending events.
-
metadata: Record<string, unknown>
Custom metadata to attach to the event.
-
partitionKey: string
Key for partitioning and ordering. Events with the same key are ordered.
-
timestamp: number
Custom timestamp in milliseconds. Defaults to Date.now().
Options for consuming events.
-
batchSize: number
Maximum number of events per batch. Defaults to adapter-specific value.
-
bufferSize: number
Maximum number of batches to buffer. Defaults to adapter-specific value. When the buffer is full, new batches will be dropped and onError will be called with an EventLogConsumeFailed error.
-
cursor: Cursor | null
Cursor to resume from. If null, starts from the beginning.
-
signal: AbortSignal
Abort signal to cancel consumption.
A single event in the event log.
-
data: T
The event data payload.
-
id: string
Unique identifier for this event.
-
metadata: Record<string, unknown>
Optional metadata associated with the event.
-
partitionKey: string
Partition key for ordering guarantees.
-
sequenceNumber: string
Monotonically increasing sequence number within the topic. Represented as a string to support bigint while remaining serializable.
-
timestamp: number
Unix timestamp in milliseconds when the event was created.
-
topic: string
The topic this event belongs to.
A batch of events delivered to a consumer.
-
commit(): Promise<void>
Commit this batch's cursor to mark progress.
-
consumerGroup: string
The consumer group that received this batch.
-
cursor: Cursor
Cursor representing the position after this batch. Use for manual commits.
-
events: Event<T>[]
Events in this batch.
-
topic: string
The topic this batch was received from.
Low-level adapter interface for event log implementations.
-
append<T>(): Promise<string>topic: string,data: T,options?: AppendOptions
Append an event to a topic.
-
close(): Promise<void>
Close the adapter and release resources.
-
commitCursor(): Promise<void>topic: string,consumerGroup: string,cursor: Cursor
Commit a cursor for a consumer group.
-
consume<T>(): { close: () => Promise<void>; }topic: string,consumerGroup: string,onBatch: (batch: EventBatch<T, Cursor>) => Promisable<void>,onError: (error: EventLogConsumeFailed) => Promisable<void>,options?: ConsumeOptions<Cursor>
Consume events from a topic.
-
getCursor(): Promise<Cursor | null>topic: string,consumerGroup: string
Get the last committed cursor for a consumer group.
Factory for creating event log connections.
-
connect(signal?: AbortSignal): Promise<EventLogAdapter<Cursor>>
Connect to the event log.
-
end(): Promise<void>
End the connector and release all resources.
Configuration options for event log implementations.
-
defaultPartitionKey: string
Default partition key for events without explicit keys.
Creates an in-memory event log connector for testing and development.
Configuration options for in-memory event log.
A TypeScript library that provides a powerful and flexible way to handle errors in asynchronous streams. It allows you to collect and manage errors alongside successful values in a stream, enabling you to process data while gracefully handling any issues that may arise.
-
_getSource(): () => AsyncGenerator<Result<T, E>>
Internal method used to enable things like
zipThere's probably no reason to call this -
chunks(size: number): Stream<T[], E>
Collects consecutive successful values into fixed-size arrays. Errors pass through without breaking the current chunk.
-
collect(): Promise<T[]>
Collects all successful values emitted by the stream into an array. If any errors were collected during the stream processing, they will be thrown as an
AggregateErrorcontaining all collected errors. -
errors(): AsyncIterable<E>
Returns an async iterable of all errors collected during the stream processing. If any successful values were emitted during the stream processing, they will be ignored in this iterable.
-
filter<U extends T>(fn: () => value is U): Stream<U, E>value: T,arrivalIndex: number
Similar to
Array.prototype.filter, but works on the stream of results. If the provided function throws an error or returns a rejected promise, the error will be collected and emitted as an error result in the stream. -
filterErr<F extends E>(fn: () => error is F): Stream<T, F>error: E,errorIndex: number
Similar to
Array.prototype.filter, but works on the stream of errors. If the provided function throws an error or returns a rejected promise, the error will be collected and emitted as an error result in the stream. -
flatMap<U, E2 = E>(fn: () => Promisable<AsyncIterable<U> | Iterable<U>>): Stream<U, E | E2>value: T,arrivalIndex: number
Similar to
Array.prototype.flatMap, but works on the stream of results. If the provided function throws an error or returns a rejected promise, the error will be collected and emitted as an error result in the stream. -
flatten<U>(this: Stream<Iterable<U> | AsyncIterable<U>, E>): Stream<U, E>
Flattens a stream of iterables into a stream of individual values. If the provided function throws an error or returns a rejected promise, the error will be collected and emitted as an error result in the stream. Handles concurrency via the
flatMapmethod, so if the inner iterables are produced out of order due to concurrency, the flattened results will reflect that order. -
fold<U>(): Promise<U>fn: () => Promisable<U>,acc: U,value: T,arrivalIndex: numberinitialValue: U
Similar to
Array.prototype.reduce, but works on the stream of results. If the provided function throws an error or returns a rejected promise, the error will be collected and emitted as an error result in the stream. -
foldErr<F>(): Promise<F>fn: () => Promisable<F>,acc: F,error: E,errorIndex: numberinitialValue: F
Similar to
Array.prototype.reduce, but works on the stream of errors. If the provided function throws an error or returns a rejected promise, the new error will be collected and emitted as an error result in the stream. -
map<U, E2 = E>(fn: () => Promisable<U>): Stream<U, E | E2>value: T,arrivalIndex: number
Similar to
Array.prototype.map, but works on the stream of results. If the provided function throws an error or returns a rejected promise, the error will be collected and emitted as an error result in the stream. -
mapErr<F>(fn: () => Promisable<F>): Stream<T, F>error: E,errorIndex: number
Similar to
Array.prototype.map, but works on the stream of errors. If the provided function throws an error or returns a rejected promise, the new error will be collected and emitted as an error result in the stream. -
merge(other: Stream<T, E>): Stream<T, E>
Merges two streams by interleaving their results. Both streams must have compatible error types. The merged stream yields values from either stream as they become available.
-
partition(): Promise<{ successes: T[]; errors: E[]; }>
Collects all results into separate
successesanderrorsarrays. Unlikecollect(), this never throws. -
recover<U>(fn: () => Promisable<U>): Stream<T | U, never>error: E,errorIndex: number
Recovers from all errors by applying the provided function to transform them into successful values. This allows you to handle all errors gracefully while still collecting successful values in the stream.
-
recoverWhen<E2 extends E, U>(): Stream<T | U, Exclude<E, E2>>guard: () => error is E2,error: E,errorIndex: numberfn: () => Promisable<U>error: E2,recoveryIndex: number
Recovers from specific error types by applying the provided function to transform them into successful values. This allows you to handle specific errors gracefully while still collecting other errors in the stream.
-
scan<U>(): Stream<U, E>fn: () => Promisable<U>,acc: U,value: T,arrivalIndex: numberinitialValue: U
Like
foldbut emits the running accumulator after each successful value, allowing downstream operations to react to intermediate states. -
scanErr<F>(): Stream<T, F>fn: () => Promisable<F>,acc: F,error: E,errorIndex: numberinitialValue: F
Like
scanbut works on the stream of errors. Emits the running accumulator after each error, allowing downstream operations to react to intermediate error states. If the provided function throws an error or returns a rejected promise, the new error will be collected and emitted as an error result in the stream. -
splitBy<K extends string | number | symbol>(): Record<keys: readonly K[],cb: () => Promisable<K>,value: T,arrivalIndex: numberbufferSize: number,options?: SplitOptions>K,Stream<T, E | MissingKeyError | PumpError | NoKeysError>
Splits the stream into separate streams based on computed keys. Each result is sent to the stream corresponding to its computed key. If a result's key does not match any of the provided keys, an error result is emitted for that value. The split streams share the same source, so they are not independent; if one split stream is slower than the others, it will cause backpressure on the source stream and all other splits until it catches up. Use with caution to avoid unintended bottlenecks.
-
splitN(): [Stream<T, E | PumpError>]n: 0 | 1,bufferSize: number,options?: SplitOptions
Splits the stream into
nseparate streams that each receive the same results. -
successes(): AsyncIterable<T>
Returns an async iterable of all successful values emitted by the stream. If any errors were collected during the stream processing, they will be ignored in this iterable.
-
take(n: number): Stream<T, E>
Limits the stream to at most
nsuccessful values. Errors pass through without counting against the limit. Afternsuccesses are yielded, the stream stops immediately (any pending errors from earlier in the pipeline may still be yielded before stopping). -
takeWhile(fn: () => Promisable<boolean>): Stream<T, E>value: T,arrivalIndex: number
Yields successful values while the predicate returns true. Once the predicate returns false, iteration stops. Errors pass through until the stream is stopped. If the predicate throws, an error result is emitted and iteration stops.
-
tap(fn: () => Promisable<void>): Stream<T, E>value: T,arrivalIndex: number
Runs a side-effect function on each successful value without transforming it. If the provided function throws an error or returns a rejected promise, the error will be collected and emitted as an error result in place of the original value.
-
tapErr(fn: () => Promisable<void>): Stream<T, E>error: E,arrivalIndex: number
Runs a side-effect function on each error without transforming it. If the provided function throws an error or returns a rejected promise, the new error replaces the original.
-
throwOn<E2 extends E>(guard: (error: E) => error is E2): Stream<T, Exclude<E, E2>>
Throws the specified error types if they are encountered in the stream. This allows you to handle specific errors immediately while continuing to process other errors.
-
toArray(): Promise<Result<T, E>[]>
Collects all results emitted by the stream into an array of
Resultobjects, which can represent either successful values or errors. This method allows you to see the full outcome of the stream processing, including both successes and errors, without throwing an aggregate error. -
zip<U, F>(other: Stream<U, F>): Stream<[T, U], E | F>
Combines this stream with another into tuples, yielding one result per pair of values. The shorter stream determines when zipping completes.