method Queue.prototype.continuousStream
Queue.prototype.continuousStream<T>(
queue: string,
options?: { count?: number; signal?: AbortSignal; prefetch?: number; backoff?: { initialDelay?: number; multiplier?: number; maxDelay?: number; }; },
): Source<QueueMessage<T>, QueueReceiveFailed>

Continuous stream that polls for messages until stopped.

Uses broker-native subscribe if available (StreamAdapter), otherwise falls back to polling-based implementation. Errors from the adapter are emitted as error results, allowing pipeline-style error handling with .recover(), .tapErr(), etc. Use an AbortSignal to stop.

Examples

Example 1

const ac = new AbortController();

queue.continuousStream("notifications", { signal: ac.signal, count: 5 })
  .withConcurrency(10)
  .tap(async (msg) => await processMessage(msg))
  .tapErr((err) => console.error("Receive failed:", err.message))
  .collect();

ac.abort();

Type Parameters

Parameters

queue: string
optional
options: { count?: number; signal?: AbortSignal; prefetch?: number; backoff?: { initialDelay?: number; multiplier?: number; maxDelay?: number; }; }

Return Type

Usage

import { Queue } from ".";