class EventLog

Event log wrapper with Task/Stream semantics for event-sourced systems.

Provides high-level methods for appending events, consuming streams, and managing cursors. All operations return Tasks for composable error handling.
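The examples on this page call .run() before anything executes, which suggests that an operation is described first and performed later. The sketch below models that idea under stated assumptions: LazyTask, Result, and this map() are hypothetical names invented for illustration and are not the library's Task type or API.

```typescript
// Illustrative model of a lazy, composable task. NOT the library's Task.
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };

class LazyTask<T, E> {
  private readonly thunk: () => Promise<Result<T, E>>;

  constructor(thunk: () => Promise<Result<T, E>>) {
    this.thunk = thunk;
  }

  // Transform the success value; an error passes through unchanged.
  map<U>(fn: (value: T) => U): LazyTask<U, E> {
    return new LazyTask<U, E>(async (): Promise<Result<U, E>> => {
      const result = await this.thunk();
      return result.ok ? { ok: true, value: fn(result.value) } : result;
    });
  }

  // Nothing executes until run() is called.
  run(): Promise<Result<T, E>> {
    return this.thunk();
  }
}
```

In this model, building and transforming a task has no side effects; the underlying work starts only when run() is called, and a failure travels through map() untouched.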

Examples

Basic usage

import { EventLog, createInMemory } from "@anabranch/eventlog";

const connector = createInMemory();
const log = await EventLog.connect(connector).run();

// Append an event
const eventId = await log.append("users", { userId: 123 }).run();

// Consume events as a stream
const { successes, errors } = await log
  .consume("users", "my-processor")
  .withConcurrency(5)
  .map(async (batch) => {
    for (const event of batch.events) {
      await processEvent(event.data);
    }
  })
  .partition();

await log.close().run();

Manual cursor management

// Get current cursor position
const cursor = await log.getCommittedCursor("users", "my-processor").run();

// Save a cursor after processing; `batch` here is a batch yielded by consume()
await log.commit("users", "my-processor", batch.cursor).run();

Constructors

new EventLog(adapter: EventLogAdapter<Cursor>)

Type Parameters

Cursor = string

Static Methods

connect<Cursor = string>(connector: EventLogConnector<Cursor>): Task<EventLog<Cursor>, EventLogConnectionFailed>

Connect to an event log via a connector.

Methods

append<T>(
  topic: string,
  data: T,
  options?: AppendOptions,
): Task<string, EventLogAppendFailed>

Append an event to a topic.

Returns the event ID, which can be used for logging or correlation.

close(): Task<void, EventLogCloseFailed>

Close the event log connection.

After closing, no further operations can be performed on this instance.
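Because a closed instance cannot be reused, it can help to tie the close call to the end of a unit of work. The following is a minimal sketch, assuming only what the examples above show: close() returns something with a run() method that yields a promise. The helper withLog and the interfaces Runnable and ClosableLog are hypothetical and not part of the library.

```typescript
// Structural stand-ins for the parts of EventLog this helper relies on.
interface Runnable<T> {
  run(): Promise<T>;
}

interface ClosableLog {
  close(): Runnable<void>;
}

// Hypothetical helper: run `work`, then close the log whether or not
// `work` succeeded.
async function withLog<L extends ClosableLog, R>(
  log: L,
  work: (log: L) => Promise<R>,
): Promise<R> {
  try {
    return await work(log);
  } finally {
    await log.close().run();
  }
}
```

One caveat of this design: if close itself fails inside finally, that failure replaces any error thrown by the work function.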

commit(
  topic: string,
  consumerGroup: string,
  cursor: Cursor,
): Task<void, EventLogCommitCursorFailed>

Commit a cursor to mark progress for a consumer group.

Use this for administrative cases where you cannot commit in-band, ideally while you are not actively consuming events. For example, you might skip ahead after downtime or reset to the beginning for reprocessing. During normal consumption, prefer committing in-band by calling batch.commit() after processing each batch.

A committed cursor marks the position from which the consumer group resumes on its next run. Obtain cursors from batch.cursor in the consume stream or from getCommittedCursor().
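The effect of committing can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not the library's implementation: CursorStore and readBatch are hypothetical, and the cursor is assumed to be a numeric index of the next unread event, whereas the real Cursor type defaults to string.

```typescript
// Illustrative in-memory model of committed cursors. NOT the library's code.
type Cursor = number; // assumption: index of the next event to read

class CursorStore {
  private committed = new Map<string, Cursor>();

  private key(topic: string, consumerGroup: string): string {
    return JSON.stringify([topic, consumerGroup]);
  }

  commit(topic: string, consumerGroup: string, cursor: Cursor): void {
    this.committed.set(this.key(topic, consumerGroup), cursor);
  }

  // Returns null when the group has never committed for this topic.
  get(topic: string, consumerGroup: string): Cursor | null {
    return this.committed.get(this.key(topic, consumerGroup)) ?? null;
  }
}

// Read up to `batchSize` events, starting at the committed position
// (or at the beginning when nothing has been committed).
function readBatch<T>(
  events: readonly T[],
  store: CursorStore,
  topic: string,
  consumerGroup: string,
  batchSize: number,
): { events: T[]; cursor: Cursor } {
  const start = store.get(topic, consumerGroup) ?? 0;
  const slice = events.slice(start, start + batchSize);
  return { events: slice, cursor: start + slice.length };
}
```

In this model, reading a batch without committing returns the same events on the next read, committing batch.cursor advances the group, and committing an earlier cursor (such as 0) replays events, which mirrors the administrative reset described above.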

consume<T>(
  topic: string,
  consumerGroup: string,
  options?: ConsumeOptions<Cursor>,
): Channel<EventBatch<T, Cursor>, EventLogConsumeFailed>

Consume events from a topic as a stream.

Returns a Channel that yields batches of events. Each batch includes a cursor that can be committed to mark progress. Use stream methods like withConcurrency(), map(), and partition() for processing.

Batches are delivered asynchronously as they become available. Use take() to limit iterations or pass an AbortSignal in options to cancel consumption.
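The two ways of stopping a stream, limiting the number of batches and cancelling via a signal, can be sketched with plain async generators. This is an illustrative stand-in and not the library's Channel: consumeBatches, take, Batch, and AbortLike are hypothetical names, and the numeric cursor is an assumption.

```typescript
// Illustrative stand-in for a batch stream. NOT the library's Channel.
interface Batch<T> {
  events: T[];
  cursor: number; // assumption: index of the next unread event
}

// Minimal shape of an abort signal; a standard AbortSignal satisfies it.
interface AbortLike {
  readonly aborted: boolean;
}

// Yield batches until the source is exhausted or the signal is aborted.
async function* consumeBatches<T>(
  source: readonly T[],
  batchSize: number,
  signal?: AbortLike,
): AsyncGenerator<Batch<T>> {
  for (let start = 0; start < source.length; start += batchSize) {
    if (signal?.aborted) return;
    // Await a resolved promise so each batch is delivered asynchronously.
    await Promise.resolve();
    const events = source.slice(start, start + batchSize);
    yield { events, cursor: start + events.length };
  }
}

// Stop after at most `limit` items, in the spirit of take() on a stream.
async function* take<T>(
  stream: AsyncIterable<T>,
  limit: number,
): AsyncGenerator<T> {
  if (limit <= 0) return;
  let seen = 0;
  for await (const item of stream) {
    yield item;
    seen += 1;
    if (seen >= limit) return;
  }
}
```

A consumer would iterate with for await, for example over take(consumeBatches(source, 2), 3), or abort the signal from inside the loop to stop before the next batch is produced.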

getCommittedCursor(
  topic: string,
  consumerGroup: string,
): Task<Cursor | null, EventLogGetCursorFailed>

Get the last committed cursor for a consumer group.

Returns null if no cursor has been committed yet. Use this to resume consumption from the last processed position.
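Since the result is Cursor | null, callers need to handle the never-committed case explicitly. The sketch below shows one way to do that; resolveStart and StartPosition are hypothetical helpers, and what a fresh consumer group should do (start from the beginning, from the end, or something else) is a policy decision this page does not specify.

```typescript
// Hypothetical helper: turn a possibly-null committed cursor into an
// explicit starting decision.
type StartPosition<C> =
  | { kind: "fresh" } // the group has never committed
  | { kind: "resume"; cursor: C };

function resolveStart<C>(committed: C | null): StartPosition<C> {
  // Compare against null explicitly so that falsy cursors such as ""
  // or 0 are still treated as real positions.
  return committed === null
    ? { kind: "fresh" }
    : { kind: "resume", cursor: committed };
}
```

The explicit null comparison matters if a cursor value can be falsy: a check like `if (!cursor)` would wrongly treat such a cursor as "never committed".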