Basic send with Task semantics
import { Queue, createInMemory } from "@anabranch/queue";

const connector = createInMemory();
const queue = await Queue.connect(connector).run();

const messageId = await queue
  .send("notifications", { userId: 123, type: "welcome" })
  .run();
Stream messages with concurrent processing and error collection
import { Queue, createInMemory } from "@anabranch/queue";

const connector = createInMemory();
const queue = await Queue.connect(connector).run();

// sendEmail and logError are application-defined helpers.
const { successes, errors } = await queue
  .stream("notifications")
  .withConcurrency(5)
  .map(async (msg) => await sendEmail(msg.data))
  .tapErr((err) => logError(err))
  .partition();
Delayed messages with visibility timeout
import { Queue, createInMemory } from "@anabranch/queue";

const connector = createInMemory({ visibilityTimeout: 60_000 });
const queue = await Queue.connect(connector).run();

// reminder is an application-defined payload.
await queue.send("notifications", reminder, { delayMs: 30_000 }).run();
Dead letter queue with max attempts
import { Queue, createInMemory } from "@anabranch/queue";

const connector = createInMemory({
  queues: {
    orders: {
      maxAttempts: 3,
      deadLetterQueue: "orders-dlq",
    },
  },
});
const queue = await Queue.connect(connector).run();

// msg is a message previously received from the "orders" queue.
await queue.nack("orders", msg.id, { deadLetter: true }).run();
Queue wrapper with Task/Stream semantics for error-tolerant message processing.
-
ack(queue: string, ...ids: string[]): Task<void, QueueAckFailed>
Acknowledge one or more messages as successfully processed.
-
close(): Task<void, QueueCloseFailed>
Release the connection back to its source (e.g., pool).
-
connect(connector: QueueConnector): Task<Queue, QueueConnectionFailed>
Connect to a queue via a connector.
-
continuousStream<T>(queue: string, options?: { count?: number; signal?: AbortSignal; prefetch?: number; backoff?: { initialDelay?: number; multiplier?: number; maxDelay?: number; }; }): Source<QueueMessage<T>, QueueReceiveFailed>
Continuous stream that polls for messages until stopped.
-
nack(queue: string, id: string, options?: { requeue?: boolean; delay?: number; deadLetter?: boolean; }): Task<void, QueueAckFailed | QueueMaxAttemptsExceeded>
Negative acknowledgment: indicates that processing failed.
-
send<T>(queue: string, data: T, options?: SendOptions): Task<string, QueueSendFailed>
Send a message to a queue.
-
sendBatch<T>(queue: string, data: T[], options?: SendOptions & { parallel?: boolean; }): Task<string[], QueueSendFailed>
Send multiple messages to a queue in batch.
-
stream<T>(queue: string, options?: { count?: number; }): Source<QueueMessage<T>, QueueReceiveFailed>
Stream messages from a queue for memory-efficient concurrent processing.
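The backoff option accepted by continuousStream lists initialDelay, multiplier and maxDelay. The following is a minimal sketch of how such options could translate into a sequence of poll delays, assuming a conventional exponential schedule capped at maxDelay. The function name backoffDelay and the default values are hypothetical; this reference does not state the library's actual schedule or defaults.

```typescript
// Hypothetical illustration; not the library's implementation.
interface BackoffOptions {
  initialDelay?: number; // milliseconds
  multiplier?: number;
  maxDelay?: number; // milliseconds
}

// Delay to wait after `emptyPolls` consecutive empty polls (starting at 0).
function backoffDelay(emptyPolls: number, options: BackoffOptions = {}): number {
  // Defaults are assumptions chosen for this example only.
  const { initialDelay = 100, multiplier = 2, maxDelay = 5_000 } = options;
  const delay = initialDelay * Math.pow(multiplier, emptyPolls);
  return Math.min(delay, maxDelay);
}

const delays = [0, 1, 2, 3, 4].map((n) =>
  backoffDelay(n, { initialDelay: 100, multiplier: 2, maxDelay: 1_000 })
);
// delays is [100, 200, 400, 800, 1000]
```

Under this assumption the delay grows while the queue stays empty and would reset to initialDelay once a poll returns messages.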
Operation was aborted via an AbortSignal.
Acknowledgment operation failed.
Operation failed because the queue's internal buffer is full.
Connection close operation failed.
Configuration or capability error.
Connection establishment failed.
Base error for all queue-related failures.
Message exceeded maximum delivery attempts.
Negative acknowledgment operation failed.
Message receive operation failed.
Message send operation failed.
The entry point for creating a Stream. Wraps an async generator so that yielded values become success results and any thrown error becomes a single error result.
-
from<T, E>(source: AsyncIterable<T>): Source<T, E>
Creates a Source from an existing AsyncIterable or async generator function. Each value emitted becomes a success result; any thrown error becomes an error result.
-
fromArray<T>(items: T[]): Source<T, never>
Creates a Source from an array of values.
-
fromRange(start: number, end: number): Source<number, never>
Creates a Source that emits a range of numbers from start (inclusive) to end (exclusive). Each number is emitted as a success result.
-
fromResults<T, E>(source: () => AsyncGenerator<Result<T, E>>): Source<T, E>
Creates a Source from an async generator that yields Result values directly. This is useful when you want to yield both successes and errors from the source without terminating it on the first error.
-
fromSchedule(cron: string, options?: { signal?: AbortSignal; }): Source<Tick, never>
Creates a Source that yields a Tick on each cron schedule match. Supports 5-field (minute) and 6-field (second) cron expressions, day/month names (MON-FRI, JAN), and aliases (@daily, @hourly).
-
fromTask<T, E>(task: Task<T, E>): Source<T, E>
Creates a Source from a Task.
-
withBufferSize(n: number): Source<T, E>
Sets the maximum number of buffered results before backpressure is applied to the stream. If the buffer is full, the stream will pause until there is space in the buffer for new results.
-
withConcurrency(n: number): Source<T, E>
Sets the maximum number of concurrent operations for the stream.
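The wrapping behaviour described for Source (yielded values become success results, a thrown error becomes a single error result) can be sketched without the library. The Result shape and the names toResults, flaky and collectResults below are assumptions made for illustration; the library's own Result type may be shaped differently.

```typescript
// Hypothetical Result shape used only for this sketch.
type Result<T, E> =
  | { type: "success"; value: T }
  | { type: "error"; error: E };

// Wraps an async iterable: each yielded value becomes a success result,
// and a thrown error becomes one final error result instead of propagating.
async function* toResults<T, E = unknown>(
  source: AsyncIterable<T>,
): AsyncGenerator<Result<T, E>> {
  try {
    for await (const value of source) {
      yield { type: "success", value };
    }
  } catch (error) {
    yield { type: "error", error: error as E };
  }
}

// Example source that fails after emitting two values.
async function* flaky(): AsyncGenerator<number> {
  yield 1;
  yield 2;
  throw new Error("boom");
}

async function collectResults(): Promise<Result<number, Error>[]> {
  const out: Result<number, Error>[] = [];
  for await (const result of toResults<number, Error>(flaky())) {
    out.push(result);
  }
  return out;
}
```

Here collectResults() resolves with two success results followed by one error result, rather than rejecting.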
A single async task with error-aware utilities like retries and timeouts.
-
acquireRelease<R, T, E>(unnamed 0: { acquire: (signal?: AbortSignal) => Promise<R>; release: (resource: R) => Promise<void>; use: (resource: R) => Task<T, E>; }): Task<T, E>
Acquires a resource, runs a task that uses it, and releases it regardless of success or failure. Useful for resource lifecycle management when the use computation is a composed Task chain.
-
all<T, E>(tasks: Task<T, E>[]): Task<T[], E>
Runs multiple tasks and collects results. Rejects on the first failure.
-
allSettled<T, E>(tasks: Task<T, E>[]): Task<Result<T, E>[], never>
Runs multiple tasks and collects all results without throwing.
-
chain(arr: []): Task<void, never>
Runs tasks sequentially, passing the successful value from one to the next. If any task fails, the chain is short-circuited and the error is returned.
-
delayWithSignal(ms: number, signal?: AbortSignal): Promise<void>
-
flatMap<U, E2 = E>(fn: (value: T) => Task<U, E2>): Task<U, E | E2>
Chains another task based on the successful value.
-
flatMapErr<F>(fn: (error: E) => Task<T, F>): Task<T, F>
Chains another task based on the error value.
-
map<U, E2 = E>(fn: (value: T) => Promisable<U>): Task<U, E | E2>
Maps the successful value. Errors are passed through unchanged.
-
mapErr<F>(fn: (error: E) => Promisable<F>): Task<T, F>
Maps the error value. Successful values are passed through unchanged.
-
mergeExternalSignals(outer: AbortSignal, inner: AbortSignal): { signal: AbortSignal; cleanup: () => void; }
-
mergeSignals(outer: AbortSignal, inner?: AbortSignal): { signal: AbortSignal; cleanup: () => void; }
-
of<R, E>(task: (signal?: AbortSignal) => Promisable<R>): Task<R, E>
Creates a Task from a sync or async function. The function receives an optional AbortSignal that is active when the task is run with a signal via withSignal or run.
-
race<T, E>(tasks: Task<T, E>[]): Task<Result<T, E[]>, never>
Runs tasks concurrently and resolves with the first settled result.
-
recover<U>(fn: (error: E) => Promisable<U>): Task<T | U, never>
Recovers from errors by mapping them to a successful value.
-
recoverWhen<E2 extends E, U>(guard: (error: E) => error is E2, fn: (error: E2) => Promisable<U>): Task<T | U, Exclude<E, E2>>
Recovers from specific error types by mapping them to a successful value.
-
result(): Promise<Result<T, E>>
Executes the task and returns a tagged result instead of throwing.
-
retry(options: { attempts: number; delay?: number | ((attempt: number, error: E) => number); when?: (error: E) => boolean; }): Task<T, E>
Retries the task when the predicate returns true.
-
run(): Promise<T>
Executes the task and resolves the value or throws the error.
- runTask: (signal?: AbortSignal) => Promise<T>
- runWithSignal(signal?: AbortSignal): Promise<T>
-
tap(fn: (value: T) => Promisable<void>): Task<T, E>
Runs a side effect on the successful value.
-
tapErr(fn: (error: E) => Promisable<void>): Task<T, E>
Runs a side effect on the error value.
-
timeout(ms: number, error?: E): Task<T, E>
Fails if the task does not complete within the specified time.
-
withSignal(signal: AbortSignal): Task<T, E>
Wraps the task with an external abort signal.
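The retry options listed above (attempts, delay, when) can be illustrated with a plain promise-based loop. This is a sketch under stated assumptions, not the library's implementation: attempts is treated as the total number of tries, the delay callback is assumed to receive the attempt number and the error, and the names RetryOptions, retryAsync and flakyFetch are hypothetical.

```typescript
// Hypothetical stand-in for the retry options; field names follow this reference.
interface RetryOptions<E> {
  attempts: number; // assumed to mean total tries, including the first
  delay?: number | ((attempt: number, error: E) => number); // milliseconds
  when?: (error: E) => boolean; // retry only while this returns true
}

const sleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

async function retryAsync<T, E = unknown>(
  fn: () => Promise<T>,
  options: RetryOptions<E>,
): Promise<T> {
  const { attempts, delay, when } = options;
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      const error = err as E;
      const retryable = when ? when(error) : true;
      // Give up when the predicate rejects the error or no tries remain.
      if (!retryable || attempt === attempts) throw error;
      const wait = typeof delay === "function" ? delay(attempt, error) : delay ?? 0;
      if (wait > 0) await sleep(wait);
    }
  }
  throw new Error("attempts must be at least 1");
}

// Example operation that succeeds on its third call.
let calls = 0;
const flakyFetch = async (): Promise<string> => {
  calls++;
  if (calls < 3) throw new Error("transient");
  return "ok";
};
```

Calling retryAsync(flakyFetch, { attempts: 5 }) resolves with "ok" after three calls; a when predicate that returns false makes the first failure final.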
Creates an in-memory queue connector using a simple message store.
Options for negative acknowledgment.
-
deadLetter: boolean
Explicit dead letter queue target
-
delay: number
Delay before the message is requeued
-
requeue: boolean
Requeue the message instead of dead-letter routing
Queue adapter interface for low-level queue operations.
-
ack(queue: string, ...ids: string[]): Promise<void>
Acknowledge that one or more messages have been successfully processed. The broker should remove these messages from the queue.
-
close(): Promise<void>
Release any resources held by this adapter instance. Does not necessarily terminate the underlying connection (which is managed by the Connector).
-
nack(queue: string, id: string, options?: NackOptions): Promise<void>
Indicate that processing of a message failed.
-
receive<T>(queue: string, count?: number): Promise<QueueMessage<T>[]>
Retrieve one or more messages from the specified queue.
-
send<T>(queue: string, data: T, options?: SendOptions): Promise<string>
Send a single message to the specified queue.
-
sendBatch<T>(queue: string, data: T[], options?: SendOptions): Promise<string[]>
Send a batch of messages to the specified queue. Implementation should optimize this operation if possible (e.g. using pipelines or batch APIs).
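To make the adapter contract above concrete, here is a minimal sketch of an array-backed adapter. The interface is transcribed from this reference with a reduced set of fields; the class ArrayQueueAdapter, its requeue-by-default behaviour on nack, and the demo function are hypothetical, and delays, dead-lettering and visibility timeouts are deliberately left out.

```typescript
// Types transcribed from this reference (subset of fields only).
interface QueueMessage<T> { id: string; data: T; attempt: number; timestamp: number; }
interface NackOptions { requeue?: boolean; delay?: number; deadLetter?: boolean; }
interface SendOptions { delayMs?: number; }

interface QueueAdapter {
  send<T>(queue: string, data: T, options?: SendOptions): Promise<string>;
  sendBatch<T>(queue: string, data: T[], options?: SendOptions): Promise<string[]>;
  receive<T>(queue: string, count?: number): Promise<QueueMessage<T>[]>;
  ack(queue: string, ...ids: string[]): Promise<void>;
  nack(queue: string, id: string, options?: NackOptions): Promise<void>;
  close(): Promise<void>;
}

// Hypothetical adapter: keeps ready and in-flight messages in memory.
class ArrayQueueAdapter implements QueueAdapter {
  private ready = new Map<string, QueueMessage<unknown>[]>();
  private inFlight = new Map<string, QueueMessage<unknown>>();
  private nextId = 1;

  async send<T>(queue: string, data: T): Promise<string> {
    const id = String(this.nextId++);
    this.list(queue).push({ id, data, attempt: 0, timestamp: Date.now() });
    return id;
  }

  async sendBatch<T>(queue: string, data: T[]): Promise<string[]> {
    const ids: string[] = [];
    for (const item of data) ids.push(await this.send(queue, item));
    return ids;
  }

  async receive<T>(queue: string, count = 1): Promise<QueueMessage<T>[]> {
    const batch = this.list(queue).splice(0, count);
    for (const message of batch) {
      message.attempt += 1; // count this delivery
      this.inFlight.set(`${queue}:${message.id}`, message);
    }
    return batch as QueueMessage<T>[];
  }

  async ack(queue: string, ...ids: string[]): Promise<void> {
    for (const id of ids) this.inFlight.delete(`${queue}:${id}`);
  }

  async nack(queue: string, id: string, options: NackOptions = {}): Promise<void> {
    const key = `${queue}:${id}`;
    const message = this.inFlight.get(key);
    if (!message) return;
    this.inFlight.delete(key);
    // Assumption: requeue unless explicitly disabled; delay and deadLetter are ignored.
    if (options.requeue !== false) this.list(queue).push(message);
  }

  async close(): Promise<void> {
    this.ready.clear();
    this.inFlight.clear();
  }

  private list(queue: string): QueueMessage<unknown>[] {
    let messages = this.ready.get(queue);
    if (!messages) {
      messages = [];
      this.ready.set(queue, messages);
    }
    return messages;
  }
}

// Usage: a nacked message is redelivered with a higher attempt count.
async function demo(): Promise<number[]> {
  const adapter = new ArrayQueueAdapter();
  await adapter.sendBatch("jobs", ["a", "b"]);
  const [first] = await adapter.receive<string>("jobs");
  if (!first) throw new Error("queue was empty");
  await adapter.nack("jobs", first.id); // goes back to the end of the queue
  const rest = await adapter.receive<string>("jobs", 10);
  await adapter.ack("jobs", ...rest.map((m) => m.id));
  return rest.map((m) => m.attempt); // [1, 2]
}
```

A real adapter would also honor SendOptions and the dead-letter settings; the sketch only shows the send, receive, ack and nack lifecycle.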
Connector that produces connected QueueAdapter instances.
-
connect(signal?: AbortSignal): Promise<QueueAdapter>
Acquire a connected adapter.
-
end(): Promise<void>
Close all connections and clean up resources. After calling end(), the connector cannot be used to create new adapters.
Message envelope returned by queue adapter operations, in a queue-agnostic shape.
-
attempt: number
Number of times this message has been delivered
-
data: T
Message payload.
-
id: string
Unique message identifier
-
metadata: { [key: string]: unknown; headers?: Record<string, string>; }
Optional metadata from the broker
-
timestamp: number
Timestamp when the message was first enqueued
Queue configuration options.
-
deadLetterOptions: { maxAttempts?: number; delay?: number; }
Dead letter queue specific options
-
deadLetterQueue: string
Default dead letter queue for failed messages
-
maxAttempts: number
Maximum delivery attempts before routing to dead letter queue
-
visibilityTimeout: number
Message visibility timeout (milliseconds) - time between delivery and ACK/NACK
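The interaction between maxAttempts and deadLetterQueue can be sketched as a small routing decision. This is an illustration under assumptions, not the library's logic: attempt is taken to count deliveries so far starting at 1, a message with no configured dead letter queue is assumed to be dropped once attempts are exhausted, and the names Routing and routeFailedMessage are hypothetical.

```typescript
// Hypothetical helper; option names mirror the fields listed above.
interface DeadLetterSettings {
  maxAttempts?: number;
  deadLetterQueue?: string;
}

type Routing =
  | { action: "redeliver"; queue: string }
  | { action: "dead-letter"; queue: string }
  | { action: "drop" };

// Decides where a message goes after a failed delivery.
function routeFailedMessage(
  queue: string,
  attempt: number,
  settings: DeadLetterSettings,
): Routing {
  const { maxAttempts, deadLetterQueue } = settings;
  // No limit configured, or attempts remain: deliver again from the same queue.
  if (maxAttempts === undefined || attempt < maxAttempts) {
    return { action: "redeliver", queue };
  }
  // Attempts exhausted: route to the dead letter queue when one is configured.
  return deadLetterQueue
    ? { action: "dead-letter", queue: deadLetterQueue }
    : { action: "drop" };
}

const settings = { maxAttempts: 3, deadLetterQueue: "orders-dlq" };
const second = routeFailedMessage("orders", 2, settings); // redeliver to "orders"
const third = routeFailedMessage("orders", 3, settings); // dead-letter to "orders-dlq"
```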
Options for sending a message with delay or scheduling.
-
deadLetterQueue: string
Override the default dead letter queue for this message
-
delayMs: number
Delay in milliseconds before the message becomes available
-
headers: Record<string, string>
Arbitrary key-value pairs to attach to the message
-
priority: number
Message priority (higher = more important, if supported)
-
scheduledAt: Date
Explicit scheduled delivery time
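Both delayMs and scheduledAt determine when a message becomes available. A minimal sketch of that calculation follows, assuming scheduledAt takes precedence when both are set and that times in the past mean "available now". The precedence rule and the names SendTiming and availableAt are assumptions; this reference does not say how the library resolves the two options.

```typescript
// Hypothetical subset of SendOptions relevant to timing.
interface SendTiming {
  delayMs?: number;
  scheduledAt?: Date;
}

// Returns the epoch time (ms) at which the message becomes available.
function availableAt(now: number, options: SendTiming = {}): number {
  // Assumption: an explicit schedule wins over a relative delay.
  if (options.scheduledAt) {
    return Math.max(now, options.scheduledAt.getTime());
  }
  return now + Math.max(0, options.delayMs ?? 0);
}

const now = 1_000;
const delayed = availableAt(now, { delayMs: 30_000 }); // 31000
const scheduled = availableAt(now, { scheduledAt: new Date(5_000), delayMs: 10 }); // 5000
```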
Handles errors in asynchronous streams by collecting them alongside successful values, so that data processing can continue and failures can be handled where it is convenient.
-
_getSource(): () => AsyncGenerator<Result<T, E>>
Internal method used to enable things like zip. There's probably no reason to call this.
-
chunks(size: number): Stream<T[], E>
Collects consecutive successful values into fixed-size arrays. Errors pass through without breaking the current chunk.
-
collect(): Promise<T[]>
Collects all successful values emitted by the stream into an array. If any errors were collected during the stream processing, they will be thrown as an AggregateError containing all collected errors.
-
errors(): AsyncIterable<E>
Returns an async iterable of all errors collected during the stream processing. If any successful values were emitted during the stream processing, they will be ignored in this iterable.
-
filter<U extends T>(fn: (value: T, arrivalIndex: number) => value is U): Stream<U, E>
Similar to Array.prototype.filter, but works on the stream of results. If the provided function throws an error or returns a rejected promise, the error will be collected and emitted as an error result in the stream.
-
filterErr<F extends E>(fn: (error: E, errorIndex: number) => error is F): Stream<T, F>
Similar to Array.prototype.filter, but works on the stream of errors. If the provided function throws an error or returns a rejected promise, the error will be collected and emitted as an error result in the stream.
-
flatMap<U, E2 = E>(fn: (value: T, arrivalIndex: number) => Promisable<AsyncIterable<U> | Iterable<U>>): Stream<U, E | E2>
Similar to Array.prototype.flatMap, but works on the stream of results. If the provided function throws an error or returns a rejected promise, the error will be collected and emitted as an error result in the stream.
-
flatten<U>(this: Stream<Iterable<U> | AsyncIterable<U>, E>): Stream<U, E>
Flattens a stream of iterables into a stream of individual values. If the provided function throws an error or returns a rejected promise, the error will be collected and emitted as an error result in the stream. Handles concurrency via the flatMap method, so if the inner iterables are produced out of order due to concurrency, the flattened results will reflect that order.
-
fold<U>(fn: (acc: U, value: T, arrivalIndex: number) => Promisable<U>, initialValue: U): Promise<U>
Similar to Array.prototype.reduce, but works on the stream of results. If the provided function throws an error or returns a rejected promise, the error will be collected and emitted as an error result in the stream.
-
foldErr<F>(fn: (acc: F, error: E, errorIndex: number) => Promisable<F>, initialValue: F): Promise<F>
Similar to Array.prototype.reduce, but works on the stream of errors. If the provided function throws an error or returns a rejected promise, the new error will be collected and emitted as an error result in the stream.
-
map<U, E2 = E>(fn: (value: T, arrivalIndex: number) => Promisable<U>): Stream<U, E | E2>
Similar to Array.prototype.map, but works on the stream of results. If the provided function throws an error or returns a rejected promise, the error will be collected and emitted as an error result in the stream.
-
mapErr<F>(fn: (error: E, errorIndex: number) => Promisable<F>): Stream<T, F>
Similar to Array.prototype.map, but works on the stream of errors. If the provided function throws an error or returns a rejected promise, the new error will be collected and emitted as an error result in the stream.
-
merge(other: Stream<T, E>): Stream<T, E>
Merges two streams by interleaving their results. Both streams must have compatible error types. The merged stream yields values from either stream as they become available.
-
partition(): Promise<{ successes: T[]; errors: E[]; }>
Collects all results into separate successes and errors arrays. Unlike collect(), this never throws.
-
recover<U>(fn: (error: E, errorIndex: number) => Promisable<U>): Stream<T | U, never>
Recovers from all errors by applying the provided function to transform them into successful values. This allows you to handle all errors gracefully while still collecting successful values in the stream.
-
recoverWhen<E2 extends E, U>(guard: (error: E, errorIndex: number) => error is E2, fn: (error: E2, recoveryIndex: number) => Promisable<U>): Stream<T | U, Exclude<E, E2>>
Recovers from specific error types by applying the provided function to transform them into successful values. This allows you to handle specific errors gracefully while still collecting other errors in the stream.
-
scan<U>(fn: (acc: U, value: T, arrivalIndex: number) => Promisable<U>, initialValue: U): Stream<U, E>
Like fold but emits the running accumulator after each successful value, allowing downstream operations to react to intermediate states.
-
scanErr<F>(fn: (acc: F, error: E, errorIndex: number) => Promisable<F>, initialValue: F): Stream<T, F>
Like scan but works on the stream of errors. Emits the running accumulator after each error, allowing downstream operations to react to intermediate error states. If the provided function throws an error or returns a rejected promise, the new error will be collected and emitted as an error result in the stream.
-
splitBy<K extends string | number | symbol>(keys: readonly K[], cb: (value: T, arrivalIndex: number) => Promisable<K>, bufferSize: number, options?: SplitOptions): Record<K, Stream<T, E | MissingKeyError | PumpError | NoKeysError>>
Splits the stream into separate streams based on computed keys. Each result is sent to the stream corresponding to its computed key. If a result's key does not match any of the provided keys, an error result is emitted for that value. The split streams share the same source, so they are not independent; if one split stream is slower than the others, it will cause backpressure on the source stream and all other splits until it catches up. Use with caution to avoid unintended bottlenecks.
-
splitN(n: 0 | 1, bufferSize: number, options?: SplitOptions): [Stream<T, E | PumpError>]
Splits the stream into n separate streams that each receive the same results.
-
successes(): AsyncIterable<T>
Returns an async iterable of all successful values emitted by the stream. If any errors were collected during the stream processing, they will be ignored in this iterable.
-
take(n: number): Stream<T, E>
Limits the stream to at most n successful values. Errors pass through without counting against the limit. After n successes are yielded, the stream stops immediately (any pending errors from earlier in the pipeline may still be yielded before stopping).
-
takeWhile(fn: (value: T, arrivalIndex: number) => Promisable<boolean>): Stream<T, E>
Yields successful values while the predicate returns true. Once the predicate returns false, iteration stops. Errors pass through until the stream is stopped. If the predicate throws, an error result is emitted and iteration stops.
-
tap(fn: (value: T, arrivalIndex: number) => Promisable<void>): Stream<T, E>
Runs a side-effect function on each successful value without transforming it. If the provided function throws an error or returns a rejected promise, the error will be collected and emitted as an error result in place of the original value.
-
tapErr(fn: (error: E, arrivalIndex: number) => Promisable<void>): Stream<T, E>
Runs a side-effect function on each error without transforming it. If the provided function throws an error or returns a rejected promise, the new error replaces the original.
-
throwOn<E2 extends E>(guard: (error: E) => error is E2): Stream<T, Exclude<E, E2>>
Throws the specified error types if they are encountered in the stream. This allows you to handle specific errors immediately while continuing to process other errors.
-
toArray(): Promise<Result<T, E>[]>
Collects all results emitted by the stream into an array of Result objects, which can represent either successful values or errors. This method allows you to see the full outcome of the stream processing, including both successes and errors, without throwing an aggregate error.
-
zip<U, F>(other: Stream<U, F>): Stream<[T, U], E | F>
Combines this stream with another into tuples, yielding one result per pair of values. The shorter stream determines when zipping completes.
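The difference between keeping results tagged (toArray) and separating them (partition) can be shown on a plain array. The sketch below assumes a simple tagged Result shape and a hypothetical helper partitionResults; it illustrates the idea only and does not reproduce the library's types or streaming behaviour.

```typescript
// Hypothetical Result shape used only for this sketch.
type Result<T, E> =
  | { type: "success"; value: T }
  | { type: "error"; error: E };

// Separates tagged results into successes and errors without throwing.
function partitionResults<T, E>(
  results: Result<T, E>[],
): { successes: T[]; errors: E[] } {
  const successes: T[] = [];
  const errors: E[] = [];
  for (const result of results) {
    if (result.type === "success") successes.push(result.value);
    else errors.push(result.error);
  }
  return { successes, errors };
}

const outcome = partitionResults<number, string>([
  { type: "success", value: 1 },
  { type: "error", error: "bad input" },
  { type: "success", value: 3 },
]);
// outcome.successes is [1, 3]; outcome.errors is ["bad input"]
```

Because errors are returned as data, the caller decides afterwards whether a non-empty errors array should fail the whole batch.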
Extended adapter interface for broker-native streaming. Implement this if your broker has push-based message consumption (e.g., RabbitMQ channels, Kafka consumer groups, SQS long polling).