function createKafka
createKafka(options: KafkaOptions): KafkaConnector

Creates a Kafka event log connector for production use.

Supports Apache Kafka, Confluent Cloud, Redpanda, and other Kafka-compatible services. Topics are auto-created on first append if they don't exist.

Examples

Basic usage

import { EventLog } from "@anabranch/eventlog";
import { createKafka } from "@anabranch/eventlog-kafka";

const connector = createKafka({
  brokers: ["localhost:9092"],
  clientId: "my-app",
  consumer: {
    groupId: "my-consumer-group",
  },
});

const log = await EventLog.connect(connector).run();

With SASL authentication (Confluent Cloud)

const connector = createKafka({
  brokers: ["broker.confluent.cloud:9092"],
  clientId: "my-app",
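  // Confluent Cloud uses SASL/PLAIN: the API key is the username
  // and the API secret is the password.
  sasl: {
    mechanism: "plain",
    username: process.env.KAFKA_API_KEY,
    password: process.env.KAFKA_API_SECRET,
  },
  // Confluent Cloud requires TLS-encrypted connections.
  ssl: true,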
  consumer: {
    groupId: "my-consumer-group",
  },
});
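
Environment variables can be undefined at runtime, so it may help to fail fast before the connector is created. The following is a minimal sketch, not part of @anabranch/eventlog-kafka: `requireEnv` and `saslFromEnv` are hypothetical helpers, and the shape of the returned object simply mirrors the `sasl` block in the example above.

```typescript
// Hypothetical helpers (assumptions, not part of the library).
type Env = Record<string, string | undefined>;

// Returns the value of `name`, or throws if it is missing or empty.
function requireEnv(env: Env, name: string): string {
  const value = env[name];
  if (value === undefined || value === "") {
    throw new Error(`Missing required environment variable: ${name}`);
  }
  return value;
}

// Builds an object shaped like the `sasl` option shown above.
// The variable names match the example; adjust them to your deployment.
function saslFromEnv(env: Env) {
  return {
    mechanism: "plain" as const,
    username: requireEnv(env, "KAFKA_API_KEY"),
    password: requireEnv(env, "KAFKA_API_SECRET"),
  };
}
```

In a Node.js application this could be used as `sasl: saslFromEnv(process.env)`, so a missing credential surfaces as a clear startup error rather than a failed broker handshake.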

Handle malformed messages

const connector = createKafka({
  brokers: ["localhost:9092"],
  clientId: "my-app",
  consumer: { groupId: "my-group" },
  onMalformedMessage: (topic, partition, offset, raw) => {
    console.error(`Malformed message at ${topic}[${partition}]@${offset}: ${raw}`);
  },
});
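
Logging the full raw payload can be noisy when messages are large. As an alternative, here is a sketch of a handler that keeps a bounded, truncated record of recent malformed messages for later inspection. `createMalformedCollector` and `MalformedRecord` are hypothetical names, and the argument types are assumptions inferred from the example above, not taken from the library's type definitions.

```typescript
// Hypothetical record type; field types are assumed, not confirmed by the library.
interface MalformedRecord {
  topic: string;
  partition: number;
  offset: string | number;
  preview: string;
}

// Creates a handler with the same argument order as onMalformedMessage above,
// plus the array it writes to.
function createMalformedCollector(maxRecords = 100, maxPreview = 200) {
  const records: MalformedRecord[] = [];

  const onMalformedMessage = (
    topic: string,
    partition: number,
    offset: string | number,
    raw: unknown,
  ): void => {
    // Generic string conversion, truncated so large payloads stay small.
    const preview = String(raw).slice(0, maxPreview);
    records.push({ topic, partition, offset, preview });
    // Drop the oldest entry once the buffer exceeds its limit.
    if (records.length > maxRecords) {
      records.shift();
    }
  };

  return { records, onMalformedMessage };
}
```

The returned `onMalformedMessage` could then be passed in the options object in place of the inline callback, with `records` exposed to a health check or debug endpoint.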

Parameters

options: KafkaOptions

Return Type

KafkaConnector