AWS Kinesis
Amazon Kinesis connector for reading from and writing to Kinesis streams. Uses AWS IAM credentials from the environment.
Quick Example
apiVersion: laminar.io/v1
kind: Table
spec:
  name: kinesis_events
  connector: kinesis
  config:
    stream_name: user-events
    aws_region: us-east-1
    type:
      offset: latest
  schema:
    format:
      json: {}
    fields:
      - field_name: user_id
        field_type:
          type:
            primitive: Int64
        nullable: false
      - field_name: event_type
        field_type:
          type:
            primitive: Utf8
        nullable: false
Configuration
Required
| Property | Type | Description |
|---|---|---|
| stream_name | string | Kinesis stream name |
| type.offset | string | Where to start reading: latest or earliest (required for sources) |
Optional
| Property | Type | Default | Description |
|---|---|---|---|
| aws_region | string | from env | AWS region (e.g., us-east-1) |
Offset Modes
| Mode | Behavior |
|---|---|
| latest | Start from newest records (skip existing) |
| earliest | Start from oldest available records |
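To replay data already in the stream, only the offset value needs to change. The fragment below is a sketch that assumes the same nesting as the Quick Example; how far back earliest reaches depends on the stream's retention period.

```yaml
# Sketch only: assumes the config layout shown in the Quick Example.
config:
  stream_name: user-events
  aws_region: us-east-1
  type:
    offset: earliest   # begin at the oldest record still retained by the stream
```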
Authentication
Kinesis uses AWS IAM credentials from the environment. No connection profile is needed.
Supported credential sources:
- Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
- IAM instance profile (EC2/ECS)
- IAM role for service accounts (EKS)
JSON Schema Reference
Connection Table Schema
{
  "type": "object",
  "properties": {
    "stream_name": {"type": "string"},
    "aws_region": {"type": "string"},
    "type": {
      "oneOf": [
        {
          "title": "Source",
          "required": ["offset"],
          "properties": {
            "offset": {"type": "string", "enum": ["latest", "earliest"]}
          }
        },
        {
          "title": "Sink",
          "properties": {
            "records_per_batch": {"type": "integer", "maximum": 500},
            "batch_max_buffer_size": {"type": "integer", "maximum": 4000000},
            "batch_flush_interval_millis": {"type": "integer"}
          }
        }
      ]
    }
  },
  "required": ["stream_name", "type"]
}
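The Sink variant in the schema has no example elsewhere on this page. The following is a minimal sketch of a sink table. It assumes that sink options nest under type the same way offset does for a source, and that the spec follows the Quick Example layout. The table name, stream name, and flush interval are illustrative values, not documented defaults.

```yaml
# Sketch only: nesting and values are assumptions based on the schema above.
apiVersion: laminar.io/v1
kind: Table
spec:
  name: kinesis_output              # hypothetical table name
  connector: kinesis
  config:
    stream_name: processed-events   # hypothetical stream name
    aws_region: us-east-1
    type:
      records_per_batch: 500              # schema maximum
      batch_max_buffer_size: 4000000      # schema maximum
      batch_flush_interval_millis: 1000   # illustrative value
  schema:
    format:
      json: {}
    fields:
      - field_name: user_id
        field_type:
          type:
            primitive: Int64
        nullable: false
```

All three sink properties are optional in the schema, so any of them could presumably be omitted. The 500-record ceiling matches the per-request record limit of the Kinesis PutRecords API.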