method Queue.prototype.stream
Queue.prototype.stream<T>(
  queue: string,
  options?: { count?: number; },
): Source<QueueMessage<T>, QueueReceiveFailed>

Stream messages from a queue for memory-efficient concurrent processing.

Messages are delivered one at a time, but you can process them concurrently with withConcurrency(). Errors are collected alongside successes, so the stream keeps processing and you can decide how to handle failures afterwards.

Examples

Example 1

const { successes, errors } = await queue.stream("notifications")
  .withConcurrency(10)
  .map(async (msg) => await sendNotification(msg.data))
  .tapErr((err) => console.error("Failed:", err))
  .partition();
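
The idea described above (bounded concurrency, with failures collected next to successes rather than aborting the run) can be illustrated without the library. The following is a minimal sketch under stated assumptions: `processWithConcurrency` and `Partitioned` are hypothetical names introduced here for illustration, they are not part of this package, and the library's own `Source` may work differently internally.

```typescript
// Hypothetical result shape for illustration; not the library's actual types.
interface Partitioned<T, E> {
  successes: T[];
  errors: E[];
}

// Run `handler` over `items` with at most `limit` calls in flight.
// A failing item is recorded in `errors` and processing continues.
async function processWithConcurrency<I, O>(
  items: readonly I[],
  limit: number,
  handler: (item: I) => Promise<O>,
): Promise<Partitioned<O, unknown>> {
  const result: Partitioned<O, unknown> = { successes: [], errors: [] };
  let next = 0; // index of the next item to hand out

  // Each worker pulls the next unclaimed item until none are left.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const item = items[next++] as I;
      try {
        result.successes.push(await handler(item));
      } catch (err) {
        result.errors.push(err); // record the failure and keep going
      }
    }
  }

  // Never start more workers than there are items, and always at least one.
  const workerCount = Math.max(1, Math.min(limit, items.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return result;
}
```

Because each worker awaits its handler before claiming another item, the number of in-flight calls never exceeds `limit`. Results are pushed in completion order, so `successes` is not guaranteed to match the input order.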

Type Parameters

T

Parameters

queue: string
options?: { count?: number; }

Return Type

Source<QueueMessage<T>, QueueReceiveFailed>

Usage

import { Queue } from ".";