Queue wrapper with Task/Stream semantics for error-tolerant message processing.
Basic usage
import { Queue, createInMemory } from "@anabranch/queue";

const connector = createInMemory();
const queue = await Queue.connect(connector).run();

await queue.send("notifications", { userId: 123 }).run();

// `msg` is a QueueMessage received earlier, e.g. from queue.stream("notifications").
await queue.ack("notifications", msg.id).run();

await queue.close().run();
Queue(adapter: QueueAdapter)
connect(connector: QueueConnector): Task<Queue, QueueConnectionFailed>
Connect to a queue via a connector.
ack(queue: string, ...ids: string[]): Task<void, QueueAckFailed>
Acknowledge one or more messages as successfully processed.
close(): Task<void, QueueCloseFailed>
Release the connection back to its source (e.g., pool).
continuousStream<T>(queue: string, options?: { count?: number; signal?: AbortSignal; prefetch?: number; backoff?: { initialDelay?: number; multiplier?: number; maxDelay?: number } }): Source<QueueMessage<T>, QueueReceiveFailed>
Continuous stream that delivers messages until stopped.
Uses the broker's native subscribe when the adapter supports it (StreamAdapter); otherwise falls back to a polling-based implementation. Errors from the adapter are emitted as error results, which allows pipeline-style error handling with .recover(), .tapErr(), etc. Pass an AbortSignal in options.signal to stop the stream.
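The polling fallback and the `backoff` options can be pictured with a small stand-alone model. This is a minimal sketch, not the library's implementation: it assumes the delay starts at `initialDelay`, grows by `multiplier` after each consecutive empty poll, and is capped at `maxDelay`. The names `backoffDelay`, `sleep`, `poll`, and `demoPolling`, the default values, and the millisecond unit are all hypothetical.

```typescript
// Hypothetical sketch: none of these helpers come from @anabranch/queue.
interface BackoffOptions {
  initialDelay?: number; // assumed ms before the first retry (assumed default: 100)
  multiplier?: number; // assumed growth factor per empty poll (assumed default: 2)
  maxDelay?: number; // assumed upper bound in ms (assumed default: 5000)
}

// Delay before the next poll, given the number of consecutive empty polls so far.
function backoffDelay(emptyPolls: number, options: BackoffOptions = {}): number {
  const { initialDelay = 100, multiplier = 2, maxDelay = 5000 } = options;
  return Math.min(initialDelay * Math.pow(multiplier, emptyPolls), maxDelay);
}

// Resolves after `ms`, or as soon as the signal aborts.
function sleep(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise<void>((resolve) => {
    if (signal.aborted) {
      resolve();
      return;
    }
    const onAbort = () => {
      clearTimeout(timer);
      resolve();
    };
    const timer = setTimeout(() => {
      signal.removeEventListener("abort", onAbort);
      resolve();
    }, ms);
    signal.addEventListener("abort", onAbort, { once: true });
  });
}

// Polls `receive` until the signal aborts, backing off while the queue is empty.
async function* poll<T>(
  receive: () => Promise<T[]>,
  signal: AbortSignal,
  backoff: BackoffOptions = {},
): AsyncGenerator<T> {
  let emptyPolls = 0;
  while (!signal.aborted) {
    const batch = await receive();
    if (batch.length === 0) {
      await sleep(backoffDelay(emptyPolls++, backoff), signal);
      continue;
    }
    emptyPolls = 0; // a non-empty batch resets the backoff schedule
    for (const item of batch) yield item;
  }
}

// Usage: consume three messages from a fake source, then abort.
async function demoPolling(): Promise<string[]> {
  const pending: string[][] = [["a", "b"], [], ["c"]];
  const controller = new AbortController();
  const seen: string[] = [];
  const receive = async () => pending.shift() ?? [];
  for await (const msg of poll(receive, controller.signal, { initialDelay: 1 })) {
    seen.push(msg);
    if (seen.length === 3) controller.abort();
  }
  return seen;
}

demoPolling().then((seen) => console.log(seen)); // [ 'a', 'b', 'c' ]
```

Aborting inside the consumer loop ends the generator at its next `while` check, which mirrors the "use an AbortSignal to stop" behaviour described above.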
nack(queue: string, id: string, options?: { requeue?: boolean; delay?: number; deadLetter?: boolean }): Task<void, QueueAckFailed | QueueMaxAttemptsExceeded>
Negative acknowledgment - indicates processing failure.
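A consumer typically has to decide which `nack` options to pass after a failure. The following is a minimal sketch of one possible consumer-side policy: retry with a growing delay, then dead-letter once a retry budget is spent. Only the option names (`requeue`, `delay`, `deadLetter`) come from the signature above. The helper `nackOptionsFor`, its `attempt`, `maxAttempts`, and `baseDelayMs` parameters, and the assumption that `delay` is in milliseconds are hypothetical.

```typescript
// Option shape copied from the nack() signature above.
interface NackOptions {
  requeue?: boolean;
  delay?: number; // assumed to be milliseconds
  deadLetter?: boolean;
}

// Hypothetical policy helper: not part of @anabranch/queue.
// `attempt` is the 1-based number of times the message has been tried so far.
function nackOptionsFor(
  attempt: number,
  maxAttempts = 3,
  baseDelayMs = 1000,
): NackOptions {
  if (attempt >= maxAttempts) {
    // Retry budget spent: route the message to the dead-letter destination.
    return { deadLetter: true };
  }
  // Otherwise requeue, waiting longer after each failed attempt.
  return { requeue: true, delay: baseDelayMs * attempt };
}

console.log(nackOptionsFor(1)); // { requeue: true, delay: 1000 }
console.log(nackOptionsFor(3)); // { deadLetter: true }
```

The returned object could then be passed as the third argument to `nack`, for example `queue.nack("notifications", msg.id, nackOptionsFor(attempt)).run()`, where `attempt` is tracked by the caller.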
send<T>(): Task<string, QueueSendFailed>
Send a message to a queue.
sendBatch<T>(): Task<string[], QueueSendFailed>
Send multiple messages to a queue in batch.
stream<T>(queue: string, options?: { count?: number }): Source<QueueMessage<T>, QueueReceiveFailed>
Stream messages from a queue for memory-efficient concurrent processing.
Messages are delivered one at a time but can be processed concurrently
using withConcurrency(). Errors are collected alongside successes,
allowing the stream to continue processing while you decide how to handle
failures later.
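The idea of collecting errors alongside successes while processing concurrently can be modelled without the library. This is a minimal sketch under stated assumptions, not how `withConcurrency()` is implemented: the `Result` type, `processConcurrently`, and `demoConcurrency` are hypothetical names, and results are recorded in completion order.

```typescript
// Hypothetical stand-in for a success-or-error result; not the library's type.
type Result<T, E> =
  | { type: "success"; value: T }
  | { type: "error"; error: E };

// Runs `handler` over `items` with at most `limit` calls in flight.
// A failing item is recorded as an error result instead of stopping the run.
async function processConcurrently<T, R>(
  items: Iterable<T>,
  limit: number,
  handler: (item: T) => Promise<R>,
): Promise<Result<R, unknown>[]> {
  const iterator = items[Symbol.iterator]();
  const results: Result<R, unknown>[] = [];

  // Each worker pulls the next item from the shared iterator until it is drained.
  async function worker(): Promise<void> {
    for (let next = iterator.next(); !next.done; next = iterator.next()) {
      try {
        const value = await handler(next.value);
        results.push({ type: "success", value });
      } catch (error) {
        results.push({ type: "error", error });
      }
    }
  }

  await Promise.all(Array.from({ length: limit }, () => worker()));
  return results;
}

// Usage: even-numbered items fail, odd-numbered items succeed.
async function demoConcurrency(): Promise<{ successes: number; failures: number }> {
  const results = await processConcurrently([1, 2, 3, 4], 2, async (n) => {
    if (n % 2 === 0) throw new Error(`cannot process ${n}`);
    return n * 10;
  });
  return {
    successes: results.filter((r) => r.type === "success").length,
    failures: results.filter((r) => r.type === "error").length,
  };
}

demoConcurrency().then((summary) => console.log(summary)); // { successes: 2, failures: 2 }
```

Because failures are stored as values, the caller can inspect them after the run and decide whether to acknowledge, retry, or dead-letter each message.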