HTTP Source
Accept HTTP POST requests as a streaming data source. Ideal for webhooks, application event ingestion, and simple integrations without a message broker.
Quick Example
apiVersion: laminar.io/v1
kind: Table
spec:
  name: webhook_events
  connector: http_source
  config:
    port: 8080
    path: /events
  schema:
    format:
      json: {}
    fields:
      - field_name: event_type
        field_type:
          type:
            primitive: String
        nullable: false
      - field_name: payload
        field_type:
          type:
            primitive: Json
        nullable: true
      - field_name: timestamp
        field_type:
          type:
            primitive: String
        nullable: false

Send data to the running pipeline:
curl -X POST http://localhost:8080/events \
  -H "Content-Type: application/json" \
  -d '{"event_type": "user.created", "payload": {"user_id": 123}, "timestamp": "2024-01-15T10:30:00Z"}'

Configuration
Settings for the config block.
Required
| Property | Type | Description |
|---|---|---|
| port | integer | Port number to listen on (e.g., 8080). The Laminar worker binds to this port locally; expose it via Kubernetes Service/Ingress or Docker port mapping for external access. |
Optional
| Property | Type | Default | Description |
|---|---|---|---|
| path | string | / | HTTP endpoint path where POST requests are accepted (e.g., /events, /webhooks/stripe). Requests to other paths return 404. |
| bind_address | string | 0.0.0.0 | Network interface to bind to. Use 0.0.0.0 for all interfaces, 127.0.0.1 for localhost only. |
| buffer_size | integer | 1000 | Maximum HTTP requests to buffer in memory before returning 429 (backpressure). Default assumes ~100KB payloads, targeting ~100MB memory usage. Increase for smaller payloads, decrease for larger ones. |
| max_body_size | integer | 5242880 | Maximum request body size in bytes. Requests exceeding this limit receive 413 (Payload Too Large). Default is 5MB. |
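
The sizing guidance for buffer_size can be worked through as simple arithmetic. The sketch below assumes the heuristic implied by the documented default (memory budget divided by average payload size, in decimal units); `suggest_buffer_size` is a hypothetical helper for illustration, not something Laminar provides.

```python
def suggest_buffer_size(memory_budget_bytes: int, avg_payload_bytes: int) -> int:
    """Estimate buffer_size as memory budget divided by average payload size."""
    if memory_budget_bytes <= 0 or avg_payload_bytes <= 0:
        raise ValueError("both arguments must be positive")
    # Never go below 1, the smallest value the config schema allows.
    return max(1, memory_budget_bytes // avg_payload_bytes)


# Documented default: ~100 KB payloads against a ~100 MB budget.
print(suggest_buffer_size(100_000_000, 100_000))    # 1000
# Smaller 10 KB payloads allow a deeper buffer for the same budget.
print(suggest_buffer_size(100_000_000, 10_000))     # 10000
# Payloads near the 5 MB max_body_size default call for a much smaller buffer.
print(suggest_buffer_size(100_000_000, 5_242_880))  # 19
```

Actual memory use also depends on per-request overhead inside the worker, so treat the result as a starting point rather than a guarantee.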
Notes
- No connection profile required: HTTP source runs its own embedded server
- Parallelism must be 1: Only one worker instance can bind to a given port
Schema
Settings for the schema block. Schema is required for HTTP source.
Required
| Property | Type | Description |
|---|---|---|
format | object | Data format for deserialization. See Format Options below. |
fields | array | Field definitions for the expected payload structure. Each field requires field_name, field_type, and nullable. |
Optional
| Property | Type | Description |
|---|---|---|
framing | object | How to split incoming data into records. Use newline_delimited: {} for newline-delimited JSON. |
bad_data | object | How to handle malformed records. Use drop: {} to skip bad records, or fail: {} to stop the pipeline on errors. |
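
To illustrate what a body looks like under newline_delimited framing, here is a minimal Python sketch that serializes several records as one JSON object per line. `encode_ndjson` is a hypothetical client-side helper, and the assumption is that the source splits the body on newline characters as the framing description suggests.

```python
import json


def encode_ndjson(records: list[dict]) -> bytes:
    """Serialize records as newline-delimited JSON: one compact object per line."""
    # json.dumps escapes newlines inside string values as \n, so every
    # record is guaranteed to occupy exactly one line of the body.
    lines = [json.dumps(record, separators=(",", ":")) for record in records]
    return "\n".join(lines).encode("utf-8")


body = encode_ndjson([
    {"event_type": "a"},
    {"event_type": "b", "note": "line1\nline2"},
])
print(body.decode("utf-8"))
```

Pretty-printed JSON would break this framing, because each record must not contain a raw line break.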
Format Options
| Format | Configuration | Description |
|---|---|---|
| JSON | json: {} | JSON format (default) |
| Avro | avro: {} | Apache Avro format |
| Protobuf | protobuf: {} | Protocol Buffers format |
| Parquet | parquet: {} | Apache Parquet format |
| Raw String | raw_string: {} | Raw text, no parsing |
| Raw Bytes | raw_bytes: {} | Raw binary, no parsing |
Sending Data
| Method | Path | Body |
|---|---|---|
| POST | Configured path (default /) | Data matching schema format |
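
For application code, the request shape in the table above can be sketched with Python's standard library. `build_event_request` is a hypothetical helper; the base URL and path are taken from the Quick Example, and the size check simply mirrors the documented max_body_size default on the client side.

```python
import json
import urllib.request

MAX_BODY_SIZE = 5_242_880  # documented default for max_body_size, in bytes


def build_event_request(base_url: str, path: str, event: dict) -> urllib.request.Request:
    """Build (but do not send) a POST request carrying one JSON record."""
    body = json.dumps(event).encode("utf-8")
    # Client-side guard: the source would answer 413 for an oversized body.
    if len(body) > MAX_BODY_SIZE:
        raise ValueError(f"body is {len(body)} bytes, limit is {MAX_BODY_SIZE}")
    return urllib.request.Request(
        url=base_url.rstrip("/") + path,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


event = {"event_type": "user.created", "payload": {"user_id": 123},
         "timestamp": "2024-01-15T10:30:00Z"}
req = build_event_request("http://localhost:8080", "/events", event)
print(req.get_method(), req.full_url)

# To send it against a running pipeline:
# with urllib.request.urlopen(req, timeout=5) as resp:
#     print(resp.status)
```

Note that `urllib.request.urlopen` raises `urllib.error.HTTPError` for 4xx and 5xx responses, so callers need to catch it to inspect codes such as 429.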
Single Record
curl -X POST http://localhost:8080/events \
  -d '{"event_type": "order.placed", "order_id": 12345}'

Multiple Records (newline-delimited)
Configure framing in schema, then send multiple JSON objects:
schema:
  format:
    json: {}
  framing:
    newline_delimited: {}
  fields:
    # ...

curl -X POST http://localhost:8080/events \
  -d '{"event_type": "a"}
{"event_type": "b"}'

Response Codes
| Code | Status | Description |
|---|---|---|
| 200 | OK | Data accepted |
| 400 | Bad Request | Empty request body |
| 413 | Payload Too Large | Request body exceeds max_body_size limit |
| 429 | Too Many Requests | Buffer full (backpressure) - retry with backoff |
| 503 | Service Unavailable | Pipeline shutting down |
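
The "retry with backoff" advice for 429 can be sketched as a small client-side loop. This is one possible approach (exponential backoff without jitter), not a prescribed policy; `send_with_backoff`, its parameters, and the injected `send` callable are all hypothetical names chosen for illustration.

```python
import time
from typing import Callable


def send_with_backoff(
    send: Callable[[], int],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call send() until it returns something other than 429 or attempts run out.

    send() performs one POST and returns the HTTP status code.
    Returns the last status code seen.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    status = 429
    for attempt in range(max_attempts):
        status = send()
        if status != 429:
            return status  # accepted, or an error that retrying will not fix
        if attempt < max_attempts - 1:
            # Buffer full: wait 0.5s, 1s, 2s, ... before the next attempt.
            sleep(base_delay * (2 ** attempt))
    return status


# Simulated source that is briefly saturated, then accepts the record.
responses = iter([429, 429, 200])
delays: list[float] = []
print(send_with_backoff(lambda: next(responses), sleep=delays.append), delays)
```

Only 429 is retried here because it is the one code the table marks as retryable; whether to retry 503 depends on how the pipeline is restarted and is left to the caller.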
JSON Schema Reference
Connection Table Schema
{
  "type": "object",
  "properties": {
    "port": {
      "type": "integer",
      "description": "Port number to listen on",
      "minimum": 1,
      "maximum": 65535
    },
    "path": {
      "type": "string",
      "description": "HTTP endpoint path",
      "default": "/"
    },
    "bind_address": {
      "type": "string",
      "description": "Network interface to bind to",
      "default": "0.0.0.0"
    },
    "buffer_size": {
      "type": "integer",
      "description": "Maximum requests to buffer",
      "default": 1000,
      "minimum": 1
    },
    "max_body_size": {
      "type": "integer",
      "description": "Maximum request body size in bytes",
      "default": 5242880,
      "minimum": 1
    }
  },
  "required": ["port"]
}
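
As a rough illustration of what the schema above enforces, the following Python sketch checks a config block by hand and fills in the documented defaults. `resolve_config` is a hypothetical helper written for this page; it is not Laminar's validation code, and a real JSON Schema validator would not apply defaults on its own.

```python
DEFAULTS = {
    "path": "/",
    "bind_address": "0.0.0.0",
    "buffer_size": 1000,
    "max_body_size": 5242880,
}


def _is_int(value: object) -> bool:
    # bool is a subclass of int in Python, so exclude it explicitly.
    return isinstance(value, int) and not isinstance(value, bool)


def resolve_config(config: dict) -> dict:
    """Validate a config block against the schema's constraints and apply defaults."""
    if "port" not in config:
        raise ValueError("port is required")
    resolved = {**DEFAULTS, **config}
    if not _is_int(resolved["port"]) or not 1 <= resolved["port"] <= 65535:
        raise ValueError("port must be an integer between 1 and 65535")
    for key in ("buffer_size", "max_body_size"):
        if not _is_int(resolved[key]) or resolved[key] < 1:
            raise ValueError(f"{key} must be an integer >= 1")
    for key in ("path", "bind_address"):
        if not isinstance(resolved[key], str):
            raise ValueError(f"{key} must be a string")
    return resolved


print(resolve_config({"port": 8080, "path": "/events"}))
```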