method EventLog.prototype.consume
EventLog.prototype.consume<T>(
  topic: string,
  consumerGroup: string,
  options?: ConsumeOptions<Cursor>,
): Channel<EventBatch<T, Cursor>, EventLogConsumeFailed>

Consume events from a topic as a stream.

Returns a Channel that yields batches of events. Each batch includes a cursor that can be committed to mark progress. Use stream methods like withConcurrency(), map(), and partition() for processing.

Batches are delivered asynchronously as they become available. Use take() to limit the number of batches consumed, or pass an AbortSignal as options.signal to cancel consumption.
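
As a rough illustration of these two ways to stop a stream, the sketch below replaces log.consume() with a hypothetical async generator, fakeConsume, and uses a hand-written take() helper. Neither is the library's code, and the real Channel methods may differ in detail; the point is only to show bounded consumption versus signal-based cancellation.

```typescript
// Hypothetical stand-ins for illustration; none of these names come from the library.
interface Batch<T> {
  events: T[];
  cursor: number;
}

// Yields one single-event batch per tick until the signal is aborted.
async function* fakeConsume(signal?: AbortSignal): AsyncGenerator<Batch<string>> {
  let cursor = 0;
  while (!signal?.aborted) {
    await new Promise((resolve) => setTimeout(resolve, 1));
    cursor += 1;
    yield { events: [`event-${cursor}`], cursor };
  }
}

// Stops after `limit` items, similar in spirit to a take()-style operator.
async function* take<T>(source: AsyncIterable<T>, limit: number): AsyncGenerator<T> {
  if (limit <= 0) return;
  let seen = 0;
  for await (const item of source) {
    yield item;
    seen += 1;
    if (seen >= limit) return;
  }
}

// Bounded consumption: read exactly three batches, then stop.
async function readThree(): Promise<number[]> {
  const cursors: number[] = [];
  for await (const batch of take(fakeConsume(), 3)) {
    cursors.push(batch.cursor);
  }
  return cursors;
}

// Cancellable consumption: abort after the second batch has been handled.
async function readUntilAborted(): Promise<number[]> {
  const ac = new AbortController();
  const cursors: number[] = [];
  for await (const batch of fakeConsume(ac.signal)) {
    cursors.push(batch.cursor);
    if (batch.cursor === 2) ac.abort();
  }
  return cursors;
}
```

take() fits when the amount of work is known up front; an AbortSignal fits when the decision to stop comes from outside the loop, such as a shutdown handler.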

Examples

Process batches concurrently and commit progress

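const ac = new AbortController();

// Start consuming without awaiting yet, so the stream can be aborted later.
const done = log.consume("users", "processor-1", { signal: ac.signal })
  .withConcurrency(10)
  .map(async (batch) => {
    for (const event of batch.events) {
      await processUser(event.data);
    }
    await batch.commit(); // Mark progress
  })
  .partition();

// Later, for example on shutdown:
ac.abort(); // Stop consumption
await done; // Wait for in-flight batches to finish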
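
To make the effect of a concurrency limit such as withConcurrency(10) more concrete, here is a minimal sketch of a bounded-concurrency map over an array. mapWithConcurrency and slowDouble are hypothetical helpers written for this illustration, not part of the library, and the library's own scheduling may differ.

```typescript
// Hypothetical helper; the library's withConcurrency() may behave differently.
// Assumes limit >= 1.
async function mapWithConcurrency<T, R>(
  items: T[],
  limit: number,
  fn: (item: T) => Promise<R>,
): Promise<R[]> {
  const results = new Array<R>(items.length);
  let next = 0;

  // Each worker repeatedly claims the next unprocessed index.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const index = next;
      next += 1;
      results[index] = await fn(items[index]);
    }
  }

  // Start at most `limit` workers; results keep the input order.
  const workers = Array.from({ length: Math.min(limit, items.length) }, () => worker());
  await Promise.all(workers);
  return results;
}

// Track how many calls are in flight to observe the limit.
let inFlight = 0;
let peak = 0;

async function slowDouble(n: number): Promise<number> {
  inFlight += 1;
  peak = Math.max(peak, inFlight);
  await new Promise((resolve) => setTimeout(resolve, 5));
  inFlight -= 1;
  return n * 2;
}
```

Calling mapWithConcurrency([1, 2, 3, 4, 5], 2, slowDouble) runs at most two calls at once while still returning results in input order.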

Resume from a saved cursor

const cursor = await log.getCommittedCursor("users", "processor-1").run();
const stream = log.consume("users", "processor-1", { cursor });
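
The following sketch models how committing and resuming might fit together, using a hypothetical in-memory log. InMemoryLog, its synchronous consume(), and the numeric Cursor are assumptions made for this illustration only; the real Cursor type is opaque here and the real API is asynchronous.

```typescript
// Hypothetical in-memory model; not the library's implementation.
type Cursor = number; // assumption: index of the next event to read

interface Batch<T> {
  events: T[];
  cursor: Cursor;
  commit(): void;
}

class InMemoryLog<T> {
  private readonly events: T[];
  private readonly committed = new Map<string, Cursor>();

  constructor(events: T[]) {
    this.events = events;
  }

  // Returns the last committed cursor for a group, or undefined if none.
  getCommittedCursor(group: string): Cursor | undefined {
    return this.committed.get(group);
  }

  // Yields batches of up to `size` events, starting at `from`.
  *consume(group: string, from: Cursor = 0, size = 2): Generator<Batch<T>> {
    for (let start = from; start < this.events.length; start += size) {
      const events = this.events.slice(start, start + size);
      const cursor = start + events.length;
      yield { events, cursor, commit: () => { this.committed.set(group, cursor); } };
    }
  }
}

const memoryLog = new InMemoryLog(["a", "b", "c", "d", "e"]);

// First run: process and commit one batch, then stop (simulating a restart).
for (const batch of memoryLog.consume("processor-1")) {
  batch.commit();
  break;
}

// Second run: resume from the committed cursor instead of the beginning.
const saved = memoryLog.getCommittedCursor("processor-1");
const resumed: string[] = [];
for (const batch of memoryLog.consume("processor-1", saved)) {
  resumed.push(...batch.events);
  batch.commit();
}
```

Because the cursor is committed only after a batch has been handled, a consumer that stops before committing would see that batch again on resume, which is the usual trade-off of committing after processing.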

Type Parameters

T

Parameters

topic: string
consumerGroup: string
options: ConsumeOptions<Cursor> (optional)

Return Type

Channel<EventBatch<T, Cursor>, EventLogConsumeFailed>