Kafka
Apache Kafka connector for reading from and writing to Kafka topics. Supports SASL authentication, AWS MSK IAM, and Schema Registry integration.
Quick Example
apiVersion: laminar.io/v1
kind: Profile
spec:
  name: my-kafka
  connector: kafka
  config:
    bootstrap_servers: localhost:9092
    authentication: {}
---
apiVersion: laminar.io/v1
kind: Table
spec:
  name: events
  connector: kafka
  connection_profile_id: my-kafka
  config:
    topic: user-events
    type:
      source_config:
        offset: latest
  schema:
    format:
      json: {}
    fields:
      - field_name: user_id
        field_type:
          type:
            primitive: Int64
        nullable: false
      - field_name: event_type
        field_type:
          type:
            primitive: Utf8
        nullable: false
Configuration
Required
| Property | Type | Description |
|---|---|---|
| bootstrap_servers | string | Comma-separated list of Kafka brokers (e.g., broker1:9092,broker2:9092) |
| authentication | object | Authentication method (see below) |
Authentication Options
No Authentication
authentication: {}
SASL
authentication:
  protocol: SASL_SSL # or SASL_PLAINTEXT
  mechanism: SCRAM-SHA-256 # or SCRAM-SHA-512, PLAIN
  username: your-username
  password: your-password
AWS MSK IAM
authentication:
  region: us-east-1
Optional
| Property | Type | Description |
|---|---|---|
| schema_registry.endpoint | string | Confluent Schema Registry URL |
| schema_registry.api_key | string | Schema Registry API key |
| schema_registry.api_secret | string | Schema Registry API secret |
| connection_properties | object | Additional librdkafka properties |
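
As a rough sketch of how the optional properties might combine with SASL authentication, the profile below follows the layout of the Quick Example. The profile name, registry URL, and credentials are placeholders. Nesting `endpoint`, `api_key`, and `api_secret` under a `schema_registry` object is inferred from the JSON schema, and the string-valued `connection_properties` entries are an assumption (the keys themselves are standard librdkafka settings).

```yaml
apiVersion: laminar.io/v1
kind: Profile
spec:
  name: my-kafka-sasl                 # hypothetical profile name
  connector: kafka
  config:
    bootstrap_servers: broker1:9092,broker2:9092
    authentication:
      protocol: SASL_SSL
      mechanism: SCRAM-SHA-512
      username: your-username
      password: your-password
    schema_registry:
      endpoint: https://schema-registry.example.com   # placeholder URL
      api_key: your-api-key
      api_secret: your-api-secret
    connection_properties:
      # Assumed to be passed through to librdkafka unchanged
      socket.keepalive.enable: "true"
      reconnect.backoff.max.ms: "10000"
```

Only `bootstrap_servers` and `authentication` are required, so `schema_registry` and `connection_properties` can be dropped from this sketch independently.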
JSON Schema Reference
Connection Profile Schema
{
  "type": "object",
  "properties": {
    "bootstrap_servers": {
      "type": "string",
      "description": "Comma-separated list of Kafka servers to connect to"
    },
    "authentication": {
      "oneOf": [
        {"type": "object", "title": "None"},
        {
          "type": "object",
          "title": "SASL",
          "required": ["protocol", "mechanism", "username", "password"],
          "properties": {
            "protocol": {"type": "string"},
            "mechanism": {"type": "string"},
            "username": {"type": "string"},
            "password": {"type": "string"}
          }
        },
        {
          "type": "object",
          "title": "AWS_MSK_IAM",
          "required": ["region"],
          "properties": {
            "region": {"type": "string"}
          }
        }
      ]
    },
    "schema_registry": {
      "oneOf": [
        {"type": "object", "title": "None"},
        {
          "type": "object",
          "title": "Confluent Schema Registry",
          "required": ["endpoint"],
          "properties": {
            "endpoint": {"type": "string"},
            "api_key": {"type": "string"},
            "api_secret": {"type": "string"}
          }
        }
      ]
    },
    "connection_properties": {"type": "object"}
  },
  "required": ["bootstrap_servers", "authentication"]
}
Connection Table Schema
{
  "type": "object",
  "properties": {
    "topic": {"type": "string"},
    "type": {
      "oneOf": [
        {
          "type": "object",
          "title": "Source",
          "required": ["source_config"],
          "properties": {
            "source_config": {
              "type": "object",
              "required": ["offset"],
              "properties": {
                "offset": {"type": "string", "enum": ["latest", "earliest", "group"]},
                "read_mode": {"type": "string", "enum": ["read_uncommitted", "read_committed"]},
                "group_id": {"type": "string"},
                "group_id_prefix": {"type": "string"}
              }
            }
          }
        },
        {
          "type": "object",
          "title": "Sink",
          "required": ["sink_config"],
          "properties": {
            "sink_config": {
              "type": "object",
              "required": ["commit_mode"],
              "properties": {
                "commit_mode": {"type": "string", "enum": ["at_least_once", "exactly_once"]},
                "key_field": {"type": "string"},
                "timestamp_field": {"type": "string"}
              }
            }
          }
        }
      ]
    },
    "client_configs": {"type": "object"},
    "value_subject": {"type": "string"}
  },
  "required": ["topic", "type"]
}
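
The Quick Example only exercises the `source_config` branch with `offset: latest`. The two `config` fragments below are a minimal sketch of the other options the schema allows, meant to sit under a Table's `spec` as in the Quick Example. The group ID, output topic, and the `event_time` field are hypothetical names. The comments on what each setting does are inferred from the property names and Kafka conventions, not confirmed by this page.

```yaml
# Source fragment: start from a consumer group's offsets
config:
  topic: user-events
  type:
    source_config:
      offset: group                    # presumably resumes from the group's committed offsets
      group_id: my-consumer-group      # hypothetical group name
      read_mode: read_committed        # presumably skips records from uncommitted transactions
---
# Sink fragment: write to a topic
config:
  topic: user-events-out               # hypothetical output topic
  type:
    sink_config:
      commit_mode: exactly_once        # the only required sink property; or at_least_once
      key_field: user_id               # presumably the field used as the Kafka message key
      timestamp_field: event_time      # hypothetical field; presumably sets the message timestamp
```

Because `type` is a `oneOf`, a table declares either `source_config` or `sink_config`, not both.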