EventLog.prototype.consume<T>(topic: string,consumerGroup: string,options?: ConsumeOptions<Cursor>,): Channel<EventBatch<T, Cursor>, EventLogConsumeFailed>
Consume events from a topic as a stream.
Returns a Channel that yields batches of events. Each batch includes
a cursor that can be committed to mark progress. Use stream methods
like withConcurrency(), map(), and partition() for processing.
Batches are delivered asynchronously as they become available. Use
take() to limit iterations or pass an AbortSignal in options to
cancel consumption.
Example 1
Example 1
const ac = new AbortController(); await log.consume("users", "processor-1", { signal: ac.signal }) .withConcurrency(10) .map(async (batch) => { for (const event of batch.events) { await processUser(event.data); } await batch.commit(); // Mark progress }) .partition(); ac.abort(); // Stop consumption
Channel<EventBatch<T, Cursor>, EventLogConsumeFailed>