A copy-paste-runnable webhook receiver in Python + Flask, around 80 lines long. It demonstrates the four things you have to get right: raw-body capture, constant-time signature comparison, idempotency on x-klikit-event-id, and fast 2xx acknowledgement.
The source also lives in the partner-api repo at examples/webhook-receiver/python so you can clone it directly.

Run it

pip install -r requirements.txt
KLIKIT_WEBHOOK_SECRET=<your secret> python server.py
Listens on :8080 and exposes POST /webhooks/klikit.
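
To exercise the endpoint locally you need a request signed the same way the receiver verifies it. The sketch below builds one with the standard library only. It assumes the signature header carries the bare hex HMAC-SHA256 digest of the raw body, which is what server.py compares against. The function names, the event ID, and the sample payload are made up for illustration and are not part of the klikit API.

```python
"""Build a signed test request for the local receiver (illustrative sketch)."""
import hashlib
import hmac
import json
import urllib.request


def sign(secret: str, raw_body: bytes) -> str:
    # Same computation the receiver performs: HMAC-SHA256 over the raw bytes.
    return hmac.new(secret.encode("utf-8"), raw_body, hashlib.sha256).hexdigest()


def build_request(url: str, secret: str, event_id: str, event_type: str,
                  payload: dict) -> urllib.request.Request:
    # Serialize once and sign exactly the bytes that go on the wire.
    raw_body = json.dumps(payload).encode("utf-8")
    headers = {
        "content-type": "application/json",
        "x-klikit-signature": sign(secret, raw_body),
        "x-klikit-event-id": event_id,
        "x-klikit-event-type": event_type,
    }
    return urllib.request.Request(url, data=raw_body, headers=headers, method="POST")


def send(req: urllib.request.Request) -> tuple[int, str]:
    # urlopen raises urllib.error.HTTPError for 4xx/5xx responses.
    with urllib.request.urlopen(req, timeout=10) as resp:
        return resp.status, resp.read().decode("utf-8")
```

With the server running, passing the result of build_request("http://localhost:8080/webhooks/klikit", ...) to send() should return a 200. Sending the same event ID a second time exercises the duplicate path, and changing one byte of the secret exercises the 401 path.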

Source

server.py
"""Klikit webhook receiver — minimal reference (Python + Flask)."""
import hashlib
import hmac
import logging
import os
import threading

from flask import Flask, jsonify, request

SECRET = os.environ.get("KLIKIT_WEBHOOK_SECRET")
if not SECRET:
    raise SystemExit("KLIKIT_WEBHOOK_SECRET not set")

SECRET_BYTES = SECRET.encode("utf-8")

app = Flask(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")

# Replace with your own persistent store (Redis SETNX, Postgres unique index, etc).
_seen_event_ids: set[str] = set()
_seen_lock = threading.Lock()


@app.post("/webhooks/klikit")
def receive():
    sig_header = request.headers.get("x-klikit-signature")
    event_id = request.headers.get("x-klikit-event-id")
    event_type = request.headers.get("x-klikit-event-type")

    if not sig_header or not event_id or not event_type:
        return jsonify(error="missing klikit headers"), 400

    # IMPORTANT: compute HMAC over the raw body bytes, not the parsed JSON.
    raw_body = request.get_data(cache=True)
    expected = hmac.new(SECRET_BYTES, raw_body, hashlib.sha256).hexdigest()

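    # Compare as bytes: compare_digest raises TypeError on non-ASCII str input.
    if not hmac.compare_digest(sig_header.encode("utf-8"), expected.encode("utf-8")):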
        return jsonify(error="invalid signature"), 401

    with _seen_lock:
        if event_id in _seen_event_ids:
            return jsonify(status="duplicate-ignored"), 200
        _seen_event_ids.add(event_id)

    payload = request.get_json(silent=True) or {}

    # Process asynchronously so klikit's 10s timeout isn't tied to downstream work.
    threading.Thread(
        target=_handle_event, args=(event_type, payload), daemon=True
    ).start()

    return jsonify(status="received"), 200


def _handle_event(event_type: str, payload: dict) -> None:
    brand_id = payload.get("brand_id")
    branch_id = payload.get("branch_id")
    orders = payload.get("orders") or []
    logging.info(
        "[%s] brand=%s branch=%s orders=%d",
        event_type, brand_id, branch_id, len(orders),
    )

    if event_type == "klikit.order.created.v2":
        pass  # push to POS / kitchen display
    elif event_type == "klikit.order.status.updated":
        pass  # update order record
    elif event_type == "klikit.order.cart.updated":
        pass  # replace cart state
    else:
        logging.warning("unknown event type: %s", event_type)


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=int(os.environ.get("PORT", "8080")))
requirements.txt
flask>=3.0

Hardening for production

The reference above runs as-is, but three things need real-world replacements before you put it in production:
  • _seen_event_ids is in-memory. Replace it with Redis (SET key NX EX 86400) or a Postgres unique index. An in-memory set evaporates on every restart and is not shared between worker processes, letting duplicates back in.
  • threading.Thread runs inside the web server process, so in-flight events are lost if that process crashes or restarts. For real throughput, push to a durable queue (SQS, Celery, RQ) and process from a separate worker so an OOM in your processor can’t drop events.
  • Use a production WSGI server such as gunicorn. uvicorn is an ASGI server, so it can only run a Flask app behind a WSGI-to-ASGI adapter. Flask’s dev server is not suitable for live traffic.
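
The first point, deduplicating through a unique index, can be sketched as follows. To stay runnable without external services, this sketch uses the standard library's sqlite3 as a stand-in for Postgres. The table name seen_events and the helpers open_store and claim_event are hypothetical names chosen for illustration.

```python
"""Persistent idempotency check via a unique key (sqlite3 stand-in for Postgres)."""
import sqlite3


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    # Pass a file path in real use so the table survives restarts.
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS seen_events (event_id TEXT PRIMARY KEY)"
    )
    conn.commit()
    return conn


def claim_event(conn: sqlite3.Connection, event_id: str) -> bool:
    """Return True the first time event_id is seen, False for a duplicate."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO seen_events (event_id) VALUES (?)", (event_id,)
            )
        return True
    except sqlite3.IntegrityError:
        # The primary-key constraint rejected a repeated event ID.
        return False
```

In the receiver, a call like claim_event(conn, event_id) would take the place of the locked set lookup: the database enforces uniqueness, so the check and the insert are a single atomic step. On Postgres the same idea is usually written as INSERT ... ON CONFLICT DO NOTHING followed by a check of the affected row count. Note that a sqlite3 connection is bound by default to the thread that created it, so a threaded server would need one connection per thread.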