Event log wrapper with Task/Stream semantics for event-sourced systems.
Provides high-level methods for appending events, consuming streams, and managing cursors. All operations return Tasks for composable error handling.
Basic usage
import { EventLog, createInMemory } from "@anabranch/eventlog";

const connector = createInMemory();
const log = await EventLog.connect(connector).run();

// Append an event
const eventId = await log.append("users", { userId: 123 }).run();

// Consume events as a stream
const { successes, errors } = await log
  .consume("users", "my-processor")
  .withConcurrency(5)
  .map(async (batch) => {
    for (const event of batch.events) {
      await processEvent(event.data);
    }
  })
  .partition();

await log.close().run();
EventLog(adapter: EventLogAdapter<Cursor>)
connect<Cursor = string>(connector: EventLogConnector<Cursor>): Task<EventLog<Cursor>, EventLogConnectionFailed>
Connect to an event log via a connector.
append<T>(): Task<string, EventLogAppendFailed>
Append an event to a topic.
Returns the event ID which can be used for logging or correlation.
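As a rough illustration of using the returned ID for correlation, the sketch below appends an event and embeds the ID in a log line. `SketchLog`, its `Task` interface, the `evt-N` ID format, and `registerUser` are hypothetical stand-ins written for this example, not the library's implementation; the `(topic, data)` argument order is assumed from the "Basic usage" example.

```typescript
// Hypothetical stand-in for illustration only; NOT the @anabranch/eventlog implementation.
// It mirrors just the documented shape: append(...) returns a Task whose run() yields an event ID.
interface Task<T> {
  run(): Promise<T>;
}

class SketchLog {
  private nextId = 1;
  readonly topics = new Map<string, { id: string; data: unknown }[]>();

  // Assumed argument order (topic, data), taken from the "Basic usage" example.
  append<T>(topic: string, data: T): Task<string> {
    return {
      run: async () => {
        // The ID format "evt-N" is invented for this sketch.
        const id = `evt-${this.nextId++}`;
        const events = this.topics.get(topic) ?? [];
        events.push({ id, data });
        this.topics.set(topic, events);
        return id;
      },
    };
  }
}

// Append an event and attach the returned ID to a log line for correlation.
async function registerUser(log: SketchLog, userId: number): Promise<string> {
  const eventId = await log.append("users", { userId }).run();
  return `user ${userId} registered (event ${eventId})`;
}
```

With the real library, the same pattern would presumably apply to the `eventId` returned by `log.append(...).run()`.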
close(): Task<void, EventLogCloseFailed>
Close the event log connection.
After closing, no further operations can be performed on this instance.
commit(): Task<void, EventLogCommitCursorFailed>
Commit a cursor to mark progress for a consumer group.
This is intended for administrative use cases where you can't commit in-band, preferably while you're
not actively consuming events. For example, you might skip ahead after downtime or reset to the beginning for reprocessing.
Prefer committing in-band, i.e. after processing each batch, by calling batch.commit().
After processing events, commit the cursor to resume from that position
on the next run. Cursors are obtained from batch.cursor in the consume
stream or from getCommittedCursor().
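The administrative skip-ahead case can be sketched with a small in-memory model of committed cursors. Everything here is an assumption made for illustration: `SketchCursorStore` and `skipToLatest` are hypothetical names, and a cursor is modeled as the numeric index of the last processed event, whereas the library's `Cursor` type defaults to `string` and its storage is adapter-specific.

```typescript
// Hypothetical in-memory cursor store, for illustration only.
// Assumption: a cursor is the index of the last processed event in a topic.
class SketchCursorStore {
  private readonly committed = new Map<string, number>();

  // Cursors are tracked per (topic, consumer group) pair.
  private key(topic: string, consumerGroup: string): string {
    return JSON.stringify([topic, consumerGroup]);
  }

  commit(topic: string, consumerGroup: string, cursor: number): void {
    this.committed.set(this.key(topic, consumerGroup), cursor);
  }

  // Returns null when nothing has been committed for this pair.
  getCommitted(topic: string, consumerGroup: string): number | null {
    return this.committed.get(this.key(topic, consumerGroup)) ?? null;
  }
}

// Administrative skip-ahead: mark every existing event as processed for one group.
// Does nothing when the topic is empty.
function skipToLatest(
  store: SketchCursorStore,
  topic: string,
  consumerGroup: string,
  eventCount: number,
): void {
  if (eventCount > 0) {
    store.commit(topic, consumerGroup, eventCount - 1);
  }
}
```

Because the key includes the consumer group, skipping ahead for one group leaves every other group's position untouched in this model.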
consume<T>(): Channel<EventBatch<T, Cursor>, EventLogConsumeFailed>
Consume events from a topic as a stream.
Returns a Channel that yields batches of events. Each batch includes
a cursor that can be committed to mark progress. Use stream methods
like withConcurrency(), map(), and partition() for processing.
Batches are delivered asynchronously as they become available. Use
take() to limit iterations or pass an AbortSignal in options to
cancel consumption.
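The two ways of stopping consumption can be illustrated with plain async generators. This is a sketch under stated assumptions, not the library's `Channel`: `SketchBatch`, `sketchConsume`, `takeBatches`, and `collectCursors` are hypothetical helpers, the cursor is modeled as a numeric index, and the real `take()` and abort behavior may differ in detail.

```typescript
// Hypothetical stand-ins; the real Channel and EventBatch types are not reproduced here.
interface SketchBatch<T> {
  events: T[];
  cursor: number; // assumed: index of the last event in the batch
}

// Yields batches of up to `size` events until the source is drained,
// checking the abort signal before producing each batch.
async function* sketchConsume<T>(
  source: T[],
  size: number,
  signal?: AbortSignal,
): AsyncGenerator<SketchBatch<T>> {
  for (let start = 0; start < source.length; start += size) {
    if (signal?.aborted) return;
    const events = source.slice(start, start + size);
    yield { events, cursor: start + events.length - 1 };
  }
}

// Stops after `limit` batches, similar in spirit to take() on a stream.
async function* takeBatches<T>(
  batches: AsyncIterable<T>,
  limit: number,
): AsyncGenerator<T> {
  if (limit <= 0) return;
  let seen = 0;
  for await (const batch of batches) {
    yield batch;
    if (++seen >= limit) return;
  }
}

// Drains a stream of batches and records each batch's cursor.
async function collectCursors(
  batches: AsyncIterable<SketchBatch<unknown>>,
): Promise<number[]> {
  const cursors: number[] = [];
  for await (const batch of batches) cursors.push(batch.cursor);
  return cursors;
}
```

In this model, limiting by count suits one-off jobs and tests, while an `AbortSignal` suits long-running consumers that must stop on shutdown.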
getCommittedCursor(topic: string, consumerGroup: string): Task<Cursor | null, EventLogGetCursorFailed>
Get the last committed cursor for a consumer group.
Returns null if no cursor has been committed yet. Use this to resume consumption from the last processed position.
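Handling the null case when resuming might look like the following sketch. It assumes, purely for illustration, that a cursor is the numeric index of the last processed event; `eventsToProcess` is a hypothetical helper, and real cursors are opaque values whose interpretation belongs to the adapter.

```typescript
// Hypothetical helper: picks the events a consumer group still has to process.
// Assumption: `committed` is the index of the last processed event, or null
// when the group has never committed a cursor.
function eventsToProcess<T>(events: readonly T[], committed: number | null): T[] {
  // null means no progress has been recorded, so start from the first event.
  const start = committed === null ? 0 : committed + 1;
  return events.slice(start);
}
```

Treating null explicitly, rather than coercing it to 0, keeps "never committed" distinct from "committed the first event" in this model.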