Work in Progress: This page is under development. Use the feedback button on the bottom right to help us improve it.

HTTP Source

Accept HTTP POST requests as a streaming data source. Ideal for webhooks, application event ingestion, and simple integrations without a message broker.

Quick Example

apiVersion: laminar.io/v1
kind: Table
spec:
  name: webhook_events
  connector: http_source
  config:
    port: 8080
    path: /events
  schema:
    format:
      json: {}
    fields:
      - field_name: event_type
        field_type:
          type:
            primitive: String
        nullable: false
      - field_name: payload
        field_type:
          type:
            primitive: Json
        nullable: true
      - field_name: timestamp
        field_type:
          type:
            primitive: String
        nullable: false

Send data to the running pipeline:

curl -X POST http://localhost:8080/events \
  -H "Content-Type: application/json" \
  -d '{"event_type": "user.created", "payload": {"user_id": 123}, "timestamp": "2024-01-15T10:30:00Z"}'

Configuration

Settings for the config block.

Required

PropertyTypeDescription
portintegerPort number to listen on (e.g., 8080). The Laminar worker binds to this port locally; expose it via Kubernetes Service/Ingress or Docker port mapping for external access.

Optional

PropertyTypeDefaultDescription
pathstring/HTTP endpoint path where POST requests are accepted (e.g., /events, /webhooks/stripe). Requests to other paths return 404.
bind_addressstring0.0.0.0Network interface to bind to. Use 0.0.0.0 for all interfaces, 127.0.0.1 for localhost only.
buffer_sizeinteger1000Maximum HTTP requests to buffer in memory before returning 429 (backpressure). Default assumes ~100KB payloads, targeting ~100MB memory usage. Increase for smaller payloads, decrease for larger ones.
max_body_sizeinteger5242880Maximum request body size in bytes. Requests exceeding this limit receive 413 (Payload Too Large). Default is 5MB.

Notes

  • No connection profile required: HTTP source runs its own embedded server
  • Parallelism must be 1: Only one worker instance can bind to a given port

Schema

Settings for the schema block. Schema is required for HTTP source.

Required

PropertyTypeDescription
formatobjectData format for deserialization. See Format Options below.
fieldsarrayField definitions for the expected payload structure. Each field requires field_name, field_type, and nullable.

Optional

PropertyTypeDescription
framingobjectHow to split incoming data into records. Use newline_delimited: {} for newline-delimited JSON.
bad_dataobjectHow to handle malformed records. Use drop: {} to skip bad records, or fail: {} to stop the pipeline on errors.

Format Options

FormatConfigurationDescription
JSONjson: {}JSON format (default)
Avroavro: {}Apache Avro format
Protobufprotobuf: {}Protocol Buffers format
Parquetparquet: {}Apache Parquet format
Raw Stringraw_string: {}Raw text, no parsing
Raw Bytesraw_bytes: {}Raw binary, no parsing

Sending Data

MethodPathBody
POSTConfigured path (default /)Data matching schema format

Single Record

curl -X POST http://localhost:8080/events \
  -d '{"event_type": "order.placed", "order_id": 12345}'

Multiple Records (newline-delimited)

Configure framing in schema, then send multiple JSON objects:

schema:
  format:
    json: {}
  framing:
    newline_delimited: {}
  fields:
    # ...
curl -X POST http://localhost:8080/events \
  -d '{"event_type": "a"}
{"event_type": "b"}'

Response Codes

CodeStatusDescription
200OKData accepted
400Bad RequestEmpty request body
413Payload Too LargeRequest body exceeds max_body_size limit
429Too Many RequestsBuffer full (backpressure) - retry with backoff
503Service UnavailablePipeline shutting down

JSON Schema Reference

Connection Table Schema
{
  "type": "object",
  "properties": {
    "port": {
      "type": "integer",
      "description": "Port number to listen on",
      "minimum": 1,
      "maximum": 65535
    },
    "path": {
      "type": "string",
      "description": "HTTP endpoint path",
      "default": "/"
    },
    "bind_address": {
      "type": "string",
      "description": "Network interface to bind to",
      "default": "0.0.0.0"
    },
    "buffer_size": {
      "type": "integer",
      "description": "Maximum requests to buffer",
      "default": 1000,
      "minimum": 1
    },
    "max_body_size": {
      "type": "integer",
      "description": "Maximum request body size in bytes",
      "default": 5242880,
      "minimum": 1
    }
  },
  "required": ["port"]
}