Examples

Basic send with Task semantics

import { Queue, createInMemory } from "@anabranch/queue";

const connector = createInMemory();
const queue = await Queue.connect(connector).run();

const messageId = await queue
  .send("notifications", { userId: 123, type: "welcome" })
  .run();

Stream messages with concurrent processing and error collection

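import { Queue, createInMemory } from "@anabranch/queue";

// sendEmail and logError are application-defined helpers.
const connector = createInMemory();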
const queue = await Queue.connect(connector).run();

const { successes, errors } = await queue
  .stream("notifications")
  .withConcurrency(5)
  .map(async (msg) => await sendEmail(msg.data))
  .tapErr((err) => logError(err))
  .partition();
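
The pipeline above keeps up to five handlers in flight and then splits the outcomes into successes and errors. The sketch below illustrates that idea in plain TypeScript without the library. `mapPartition` and `Partitioned` are hypothetical names used only for illustration, not the package's implementation, and the successes come back in completion order rather than input order.

```typescript
// Hypothetical helper: run `fn` over `items` with at most `limit` calls in
// flight, collecting resolved values and thrown errors separately.
interface Partitioned<T> {
  successes: T[];
  errors: unknown[];
}

async function mapPartition<A, B>(
  items: readonly A[],
  limit: number,
  fn: (item: A) => Promise<B>,
): Promise<Partitioned<B>> {
  const successes: B[] = [];
  const errors: unknown[] = [];
  let next = 0;

  // Each worker claims the next unprocessed index until the input is exhausted.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const item = items[next++];
      try {
        successes.push(await fn(item));
      } catch (err) {
        // A failure is recorded and the worker moves on to the next item.
        errors.push(err);
      }
    }
  }

  const workerCount = Math.max(1, Math.min(limit, items.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return { successes, errors };
}
```

A failing item never stops the run here. Whether the library's `partition()` behaves the same way in every detail is an assumption.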

Delayed messages with visibility timeout

import { Queue, createInMemory } from "@anabranch/queue";

const connector = createInMemory({ visibilityTimeout: 60_000 });
const queue = await Queue.connect(connector).run();

// `reminder` is an application-defined payload.
await queue.send("notifications", reminder, { delayMs: 30_000 }).run();
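
To make the interplay between a send delay and a visibility timeout concrete, here is a minimal model in plain TypeScript. It is a sketch under assumptions: `TinyQueue`, its methods, and the injected clock are hypothetical and do not reflect how the in-memory connector is implemented.

```typescript
// Hypothetical model of delayed delivery and visibility timeouts.
interface Stored<T> {
  id: number;
  data: T;
  visibleAt: number; // earliest time (ms) at which the message may be received
}

class TinyQueue<T> {
  private messages: Stored<T>[] = [];
  private nextId = 1;
  private readonly visibilityTimeoutMs: number;
  private readonly now: () => number;

  constructor(visibilityTimeoutMs: number, now: () => number = () => Date.now()) {
    this.visibilityTimeoutMs = visibilityTimeoutMs;
    this.now = now;
  }

  // A delayed message is stored with a `visibleAt` in the future.
  send(data: T, delayMs = 0): number {
    const id = this.nextId++;
    this.messages.push({ id, data, visibleAt: this.now() + delayMs });
    return id;
  }

  // Receiving hides the message for the visibility timeout instead of removing it.
  receive(): Stored<T> | undefined {
    const t = this.now();
    const msg = this.messages.find((m) => m.visibleAt <= t);
    if (msg) msg.visibleAt = t + this.visibilityTimeoutMs;
    return msg;
  }

  // Acknowledging removes the message so it is never redelivered.
  ack(id: number): void {
    this.messages = this.messages.filter((m) => m.id !== id);
  }
}
```

In this model a message sent with a 30-second delay cannot be received for 30 seconds, and once received it stays hidden for the visibility timeout unless it is acknowledged first.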

Dead letter queue with max attempts

import { Queue, createInMemory } from "@anabranch/queue";

const connector = createInMemory({
  queues: {
    orders: {
      maxAttempts: 3,
      deadLetterQueue: "orders-dlq",
    },
  },
});
const queue = await Queue.connect(connector).run();

// `msg` is a message previously received from the "orders" queue.
await queue.nack("orders", msg.id, { deadLetter: true }).run();
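
The sketch below models how a maximum attempt count and a dead letter queue might interact. It is a plain TypeScript illustration under assumptions: `DeadLetteringQueue` and `Attempted` are hypothetical names, and the connector's real bookkeeping may differ.

```typescript
// Hypothetical model: count deliveries and route exhausted messages to a DLQ.
interface Attempted<T> {
  id: string;
  data: T;
  attempts: number;
}

class DeadLetteringQueue<T> {
  readonly pending: Attempted<T>[] = [];
  readonly deadLetters: Attempted<T>[] = [];
  private readonly maxAttempts: number;

  constructor(maxAttempts: number) {
    this.maxAttempts = maxAttempts;
  }

  send(id: string, data: T): void {
    this.pending.push({ id, data, attempts: 0 });
  }

  // Take the next message and count this delivery as an attempt.
  receive(): Attempted<T> | undefined {
    const msg = this.pending.shift();
    if (msg) msg.attempts += 1;
    return msg;
  }

  // Negative acknowledgment: requeue the message unless dead-lettering is
  // forced or the attempt budget is used up.
  nack(msg: Attempted<T>, options: { deadLetter?: boolean } = {}): void {
    if (options.deadLetter || msg.attempts >= this.maxAttempts) {
      this.deadLetters.push(msg);
    } else {
      this.pending.push(msg);
    }
  }
}
```

With `maxAttempts` set to 3, the third failed delivery moves the message to the dead letter list, while passing `{ deadLetter: true }` moves it there immediately.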

Classes

QueueAborted(message?: string, queue?: string)

Operation was aborted via an AbortSignal.

QueueBufferFull(queue: string)

Operation failed because the queue's internal buffer is full.

QueueConfigError(message: string, queue?: string)

Configuration or capability error.

QueueError(message: string, queue?: string, messageId?: string)

Base error for all queue-related failures.
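
Because every failure shares one base error, callers can narrow with `instanceof`, checking specific subclasses before the base class. The sketch below uses local stand-in classes (the `Sketch` prefix marks them as hypothetical) that mirror the constructor signatures listed here. It assumes the specific errors extend the base error. The error messages and `describeFailure` are invented for illustration.

```typescript
// Local stand-ins mirroring the listed constructor signatures; the real
// classes come from the package.
class SketchQueueError extends Error {
  readonly queue?: string;
  readonly messageId?: string;

  constructor(message: string, queue?: string, messageId?: string) {
    super(message);
    this.name = new.target.name;
    this.queue = queue;
    this.messageId = messageId;
  }
}

class SketchQueueBufferFull extends SketchQueueError {
  constructor(queue: string) {
    super(`Buffer full for queue "${queue}"`, queue);
  }
}

class SketchQueueAborted extends SketchQueueError {
  constructor(message = "Operation aborted", queue?: string) {
    super(message, queue);
  }
}

// Check the most specific classes first, then fall back to the base class.
function describeFailure(err: unknown): string {
  if (err instanceof SketchQueueBufferFull) return `retry later: ${err.queue}`;
  if (err instanceof SketchQueueAborted) return "cancelled";
  if (err instanceof SketchQueueError) return `queue error: ${err.message}`;
  return "unknown error";
}
```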

Source<T, E>(resultSource: () => AsyncGenerator<Result<T, E>>, concurrency?: number, bufferSize?: number)

The entry point for creating a Stream. Wraps an async generator so that yielded values become success results and any thrown error becomes a single error result.
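
The wrapping behavior described above can be sketched with a plain async generator. This is an illustration under assumptions: the `Result` shape and the `toResults` helper below are hypothetical, and the package's own types may differ.

```typescript
// Hypothetical result shape; the package's own Result type may differ.
type Result<T, E> =
  | { type: "success"; value: T }
  | { type: "error"; error: E };

// Wrap a generator: each yielded value becomes a success result, and a thrown
// error becomes one final error result instead of rejecting the consumer's loop.
async function* toResults<T, E = unknown>(
  source: () => AsyncGenerator<T>,
): AsyncGenerator<Result<T, E>> {
  try {
    for await (const value of source()) {
      yield { type: "success", value };
    }
  } catch (error) {
    // The cast is unchecked: the caller asserts the error type E.
    yield { type: "error", error: error as E };
  }
}
```

A consumer can then iterate with `for await` and branch on `type`, so a failure in the underlying generator arrives as data rather than as an exception.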

Task<T, E>(task: (signal?: AbortSignal) => Promise<T>)

A single async task with error-aware utilities like retries and timeouts.
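
To show what retries and timeouts mean for a function of this shape, here is a minimal sketch in plain TypeScript. `TaskFn`, `runWithRetry`, and `runWithTimeout` are hypothetical helpers written for illustration; they are not methods of `Task`, whose own utilities may work differently.

```typescript
// Hypothetical helpers operating on the same function shape that Task wraps.
type TaskFn<T> = (signal?: AbortSignal) => Promise<T>;

// Re-run the task until it succeeds or `attempts` runs have failed.
async function runWithRetry<T>(
  task: TaskFn<T>,
  attempts: number,
  signal?: AbortSignal,
): Promise<T> {
  let lastError: unknown = new Error("attempts must be at least 1");
  for (let i = 0; i < attempts; i++) {
    try {
      return await task(signal);
    } catch (err) {
      lastError = err;
    }
  }
  throw lastError;
}

// Reject if the task does not settle within `ms`, and signal it to abort.
async function runWithTimeout<T>(task: TaskFn<T>, ms: number): Promise<T> {
  const controller = new AbortController();
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => {
      controller.abort();
      reject(new Error(`Timed out after ${ms} ms`));
    }, ms);
  });
  try {
    return await Promise.race([task(controller.signal), timeout]);
  } finally {
    // Always clear the timer so a finished task leaves nothing pending.
    clearTimeout(timer);
  }
}
```

Passing the `AbortSignal` into the task lets cooperative code stop early on timeout. A task that ignores the signal keeps running in the background even though the caller has already received the timeout error.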

Functions

createInMemory(options?: InMemoryOptions): InMemoryConnector

Creates an in-memory queue connector using a simple message store.

Interfaces

NackOptions

Options for negative acknowledgment.

QueueAdapter

Queue adapter interface for low-level queue operations.

QueueConnector

Connector that produces connected QueueAdapter instances.

QueueMessage

A message delivered from a queue, in a shape that does not depend on the underlying broker.

QueueOptions

Queue configuration options.

SendOptions

Options for sending a message with delay or scheduling.

Stream

An asynchronous stream that carries errors alongside successful values. Errors are collected rather than thrown, so processing can continue while failures are handled separately.

StreamAdapter

Extended adapter interface for broker-native streaming. Implement this if your broker has push-based message consumption (e.g., RabbitMQ channels, Kafka consumer groups, SQS long polling).