interface KafkaOptions
extends KafkaConfig

Configuration options for the Kafka event log connector.

Extends KafkaJS configuration with additional event log options.

Examples

Basic configuration

const connector = createKafka({
  brokers: ["localhost:9092"],
  clientId: "my-app",
  consumer: {
    groupId: "my-consumer-group",
  },
});

With SASL authentication

const connector = createKafka({
  brokers: ["broker.confluent.cloud:9092"],
  clientId: "my-app",
  sasl: {
    mechanism: "plain",
    username: "apiKey",
    password: "apiSecret",
  },
  ssl: true,
  consumer: {
    groupId: "my-consumer-group",
  },
});

Properties

optional
onMalformedMessage: (
  topic: string,
  partition: number,
  offset: string,
  raw: string,
) => void

Callback invoked for each message that cannot be parsed.

Called when a message cannot be deserialized. If not provided, malformed messages are silently skipped.
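As a minimal sketch of a handler with this signature, the function below records where a malformed message came from and a truncated preview of its content. The MalformedRecord type, the in-memory malformed array, and the 80-character preview limit are assumptions made for illustration, not part of the connector. The final call only simulates the connector invoking the handler.

```typescript
// Hypothetical record shape for illustration; not part of the connector API.
interface MalformedRecord {
  topic: string;
  partition: number;
  offset: string;
  preview: string;
}

// Collected in memory here; an application might instead log these
// or forward them to a dead-letter destination.
const malformed: MalformedRecord[] = [];

// Matches the documented onMalformedMessage signature.
function onMalformedMessage(
  topic: string,
  partition: number,
  offset: string,
  raw: string,
): void {
  // Truncate the raw content so one oversized message does not flood the record.
  const preview = raw.length > 80 ? raw.slice(0, 80) + "..." : raw;
  malformed.push({ topic, partition, offset, preview });
}

// Simulated invocation, standing in for the connector calling the handler.
onMalformedMessage("orders", 0, "42", "{not valid json");
```

A function like this would be passed as the onMalformedMessage option alongside brokers, clientId, and consumer, so that malformed messages are recorded rather than silently skipped.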

optional
producer: ProducerConfig
optional
admin: AdminConfig & { dummyMaxWaitTimeInMs?: number; }
consumer: Omit<ConsumerConfig, "groupId">

Kafka consumer configuration. Must include groupId.