Work in Progress: This page is under development. Use the feedback button on the bottom right to help us improve it.

Kafka

Apache Kafka connector for reading from and writing to Kafka topics. Supports SASL authentication, AWS MSK IAM, and Schema Registry integration.

Quick Example

apiVersion: laminar.io/v1
kind: Profile
spec:
  name: my-kafka
  connector: kafka
  config:
    bootstrap_servers: localhost:9092
    authentication: {}
 
---
apiVersion: laminar.io/v1
kind: Table
spec:
  name: events
  connector: kafka
  connection_profile_id: my-kafka
  config:
    topic: user-events
    type:
      source_config:
        offset: latest
  schema:
    format:
      json: {}
    fields:
      - field_name: user_id
        field_type:
          type:
            primitive: Int64
        nullable: false
      - field_name: event_type
        field_type:
          type:
            primitive: Utf8
        nullable: false

Configuration

Required

| Property | Type | Description |
| --- | --- | --- |
| bootstrap_servers | string | Comma-separated list of Kafka brokers (e.g., broker1:9092,broker2:9092) |
| authentication | object | Authentication method (see below) |

Authentication Options

No Authentication

authentication: {}

SASL

authentication:
  protocol: SASL_SSL          # or SASL_PLAINTEXT
  mechanism: SCRAM-SHA-256    # or SCRAM-SHA-512, PLAIN
  username: your-username
  password: your-password

AWS MSK IAM

authentication:
  region: us-east-1

Optional

| Property | Type | Description |
| --- | --- | --- |
| schema_registry.endpoint | string | Confluent Schema Registry URL |
| schema_registry.api_key | string | Schema Registry API key |
| schema_registry.api_secret | string | Schema Registry API secret |
| connection_properties | object | Additional librdkafka properties |

JSON Schema Reference

Connection Profile Schema
{
  "type": "object",
  "properties": {
    "bootstrap_servers": {
      "type": "string",
      "description": "Comma-separated list of Kafka servers to connect to"
    },
    "authentication": {
      "oneOf": [
        {"type": "object", "title": "None"},
        {
          "type": "object",
          "title": "SASL",
          "required": ["protocol", "mechanism", "username", "password"],
          "properties": {
            "protocol": {"type": "string"},
            "mechanism": {"type": "string"},
            "username": {"type": "string"},
            "password": {"type": "string"}
          }
        },
        {
          "type": "object",
          "title": "AWS_MSK_IAM",
          "required": ["region"],
          "properties": {
            "region": {"type": "string"}
          }
        }
      ]
    },
    "schema_registry": {
      "oneOf": [
        {"type": "object", "title": "None"},
        {
          "type": "object",
          "title": "Confluent Schema Registry",
          "required": ["endpoint"],
          "properties": {
            "endpoint": {"type": "string"},
            "api_key": {"type": "string"},
            "api_secret": {"type": "string"}
          }
        }
      ]
    },
    "connection_properties": {"type": "object"}
  },
  "required": ["bootstrap_servers", "authentication"]
}
Connection Table Schema
{
  "type": "object",
  "properties": {
    "topic": {"type": "string"},
    "type": {
      "oneOf": [
        {
          "type": "object",
          "title": "Source",
          "required": ["source_config"],
          "properties": {
            "source_config": {
              "type": "object",
              "required": ["offset"],
              "properties": {
                "offset": {"type": "string", "enum": ["latest", "earliest", "group"]},
                "read_mode": {"type": "string", "enum": ["read_uncommitted", "read_committed"]},
                "group_id": {"type": "string"},
                "group_id_prefix": {"type": "string"}
              }
            }
          }
        },
        {
          "type": "object",
          "title": "Sink",
          "required": ["sink_config"],
          "properties": {
            "sink_config": {
              "type": "object",
              "required": ["commit_mode"],
              "properties": {
                "commit_mode": {"type": "string", "enum": ["at_least_once", "exactly_once"]},
                "key_field": {"type": "string"},
                "timestamp_field": {"type": "string"}
              }
            }
          }
        }
      ]
    },
    "client_configs": {"type": "object"},
    "value_subject": {"type": "string"}
  },
  "required": ["topic", "type"]
}