class Queue

Queue wrapper with Task/Stream semantics for error-tolerant message processing.

Examples

Basic usage

import { Queue, createInMemory } from "@anabranch/queue";

const connector = createInMemory();
const queue = await Queue.connect(connector).run();

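await queue.send("notifications", { userId: 123 }).run();

// Receive pending messages, then acknowledge each one by its id.
const { successes } = await queue.stream("notifications").partition();
for (const msg of successes) {
  await queue.ack("notifications", msg.id).run();
}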

await queue.close().run();

Stream with concurrent processing

const { successes, errors } = await queue.stream("notifications")
  .withConcurrency(5)
  .map(async (msg) => await sendEmail(msg.data))
  .tapErr((err) => logError(err))
  .partition();

Continuous streaming

const ac = new AbortController();

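// Keeps delivering messages until the signal is aborted.
queue.continuousStream("notifications", { signal: ac.signal })
  .withConcurrency(5)
  .tap(async (msg) => await processMessage(msg))
  .collect();

// Later, for example on shutdown, stop the stream.
ac.abort();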

Constructors

new Queue(adapter: QueueAdapter)

Static Methods

connect(connector)

Connect to a queue via a connector.

Methods

ack(
queue: string,
...ids: string[],
): Task<void, QueueAckFailed>

Acknowledge one or more messages as successfully processed.

close()

Release the connection back to its source (e.g., pool).

continuousStream<T>(
queue: string,
options?: { count?: number; signal?: AbortSignal; prefetch?: number; backoff?: { initialDelay?: number; multiplier?: number; maxDelay?: number; }; },
): Source<QueueMessage<T>, QueueReceiveFailed>

Continuously stream messages from a queue until stopped.

Uses the broker's native subscribe when the adapter supports it (StreamAdapter); otherwise falls back to a polling-based implementation. Errors from the adapter are emitted as error results, so they can be handled in the pipeline with .recover(), .tapErr(), and similar operators. Pass an AbortSignal to stop the stream.
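
The backoff option (initialDelay, multiplier, maxDelay) suggests an exponential schedule for the polling fallback. The source does not state how the delays are computed, so the following is only a sketch of conventional exponential backoff. The helper name backoffDelays, the default values, and the millisecond unit are assumptions for illustration and are not part of the library.

```typescript
// Hypothetical helper, not part of @anabranch/queue: lists the wait before
// each consecutive retry under conventional exponential backoff.
interface BackoffOptions {
  initialDelay?: number; // first delay (milliseconds assumed)
  multiplier?: number; // growth factor applied after each step
  maxDelay?: number; // upper bound on any single delay
}

function backoffDelays(options: BackoffOptions, attempts: number): number[] {
  // These defaults are illustrative assumptions, not the library's defaults.
  const { initialDelay = 100, multiplier = 2, maxDelay = 30_000 } = options;
  const delays: number[] = [];
  let delay = initialDelay;
  for (let i = 0; i < attempts; i++) {
    // Cap each delay at maxDelay, then grow the uncapped value.
    delays.push(Math.min(delay, maxDelay));
    delay *= multiplier;
  }
  return delays;
}
```

With { initialDelay: 100, multiplier: 2, maxDelay: 500 } and five attempts, this sketch yields 100, 200, 400, 500, 500.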

nack(
queue: string,
id: string,
options?: { requeue?: boolean; delay?: number; deadLetter?: boolean; },
): Task<void, QueueAckFailed | QueueMaxAttemptsExceeded>

Negatively acknowledge a message to indicate that processing failed.
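
One way a caller might choose the nack() options is to requeue with a growing delay while retries remain and dead-letter the message once they run out. The sketch below assumes that policy. nackOptionsFor, its parameters, and the millisecond unit for delay are hypothetical, and the exact semantics of requeue, delay, and deadLetter are not spelled out in the signature above.

```typescript
// Mirrors the options shape from the nack() signature.
interface NackOptions {
  requeue?: boolean;
  delay?: number;
  deadLetter?: boolean;
}

// Hypothetical policy helper, not part of the library.
function nackOptionsFor(
  attempt: number,
  maxAttempts: number,
  baseDelay = 1_000, // assumed to be milliseconds
): NackOptions {
  if (attempt >= maxAttempts) {
    // Out of retries: send to the dead-letter destination instead of requeueing.
    return { requeue: false, deadLetter: true };
  }
  // Retries remain: requeue and wait longer after each failed attempt.
  return { requeue: true, delay: baseDelay * attempt };
}
```

The returned object could then be passed as the third argument to nack().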

send<T>(
queue: string,
data: T,
options?: SendOptions,
): Task<string, QueueSendFailed>

Send a message to a queue.

sendBatch<T>(
queue: string,
data: T[],
options?: SendOptions & { parallel?: boolean; },
): Task<string[], QueueSendFailed>

Send multiple messages to a queue in batch.

stream<T>(
queue: string,
options?: { count?: number; },
): Source<QueueMessage<T>, QueueReceiveFailed>

Stream messages from a queue for memory-efficient concurrent processing.

Messages are delivered one at a time but can be processed concurrently using withConcurrency(). Errors are collected alongside successes, allowing the stream to continue processing while you decide how to handle failures later.
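
To make the "errors collected alongside successes" behaviour concrete, here is a minimal standalone sketch of bounded-concurrency processing that partitions results. It is not the library's implementation. mapPartitioned and Partitioned are hypothetical names standing in for the stream(...).withConcurrency(n).map(fn).partition() pipeline.

```typescript
interface Partitioned<T, E> {
  successes: T[];
  errors: E[];
}

// Hypothetical stand-in for a concurrent map followed by partition().
async function mapPartitioned<A, B>(
  items: A[],
  concurrency: number,
  fn: (item: A) => Promise<B>,
): Promise<Partitioned<B, unknown>> {
  const successes: B[] = [];
  const errors: unknown[] = [];
  let next = 0;

  // Each worker claims the next unprocessed item until none remain.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const item = items[next++];
      try {
        successes.push(await fn(item));
      } catch (err) {
        // Record the failure and keep processing the remaining items.
        errors.push(err);
      }
    }
  }

  // Never start more workers than there are items, and always at least one.
  const workerCount = Math.max(1, Math.min(concurrency, items.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return { successes, errors };
}
```

In this sketch a failing item does not stop the others: results arrive in completion order, and the caller inspects errors after the whole batch has finished.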