Zero Trust AI Gateway Patterns for Enterprise

Architectural patterns for securing LLM traffic at the enterprise perimeter: prompt injection filtering, PII redaction, token quota enforcement, and audit pipeline design.

By Keith Rose

Problem Statement

Enterprise adoption of LLMs introduces a new trust boundary between internal applications and external model providers. Unlike a traditional API gateway, which inspects headers, routes, and payload schemas, an AI Gateway must also validate semantic content, enforce token economics, and prevent data exfiltration through crafted prompts.

Reference Architecture

┌──────────────┐   ┌─────────────────────────────┐   ┌──────────────┐
│   Client     │──▶│      AI Gateway Layer       │──▶│  LLM Backend │
│  (Internal)  │   │  • Prompt Firewall          │   │ (OpenAI/     │
└──────────────┘   │  • PII Redaction            │   │  Anthropic/  │
                   │  • Rate Limit (Token)       │   │  Internal)   │
                   │  • Semantic Cache           │   └──────────────┘
                   │  • Audit Pipeline           │
                   └─────────────────────────────┘

Pattern 1: Prompt Injection Firewall

Deploy a lightweight classifier before the main LLM call to detect jailbreak and indirect injection attempts.

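# Gateway middleware (Python/FastAPI example)
import asyncio

from fastapi import HTTPException
from transformers import pipeline

# The model ID and the "INJECTION" label are taken as given; verify both against
# the model card of the classifier you actually deploy.
_classifier = pipeline(
    "text-classification",
    model="google/judge-prompt-injection-v1",
    device="cuda",
)

# PromptRequest is the application's own request model (e.g., a Pydantic class
# exposing a `messages` list); it must be defined or imported before this function.
async def prompt_firewall(request: PromptRequest) -> PromptRequest:
    # Run the blocking classifier in a worker thread so the event loop is not stalled.
    # Only the latest message is scored here; retrieved documents and tool outputs
    # need the same check to cover indirect injection.
    result = (await asyncio.to_thread(_classifier, request.messages[-1].content))[0]
    if result["label"] == "INJECTION" and result["score"] > 0.85:
        raise HTTPException(status_code=400, detail="Prompt injection detected")
    return request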

Production considerations:

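  • Use ONNX Runtime or TensorRT to bring classification latency toward the sub-10ms range
  • Maintain an allowlist of vetted internal system prompts that bypass the firewall; match on exact content so user-supplied text cannot inherit the bypass
  • Log all blocked attempts to SIEM with full prompt context, and restrict access to those logs because blocked prompts can contain sensitive data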
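
The allowlist and logging considerations above could be sketched as follows. This is a minimal illustration, not a reference implementation: the helper names (`allowlist_prompt`, `is_allowlisted`, `blocked_event`) and the `prompt_injection_blocked` event type are assumptions made for the example, and the in-process set stands in for whatever configuration store the gateway actually uses.

```python
import hashlib
import json
from datetime import datetime, timezone

# Hypothetical allowlist: SHA-256 digests of vetted internal system prompts.
_ALLOWLISTED_DIGESTS: set[str] = set()


def _digest(prompt: str) -> str:
    return hashlib.sha256(prompt.encode("utf-8")).hexdigest()


def allowlist_prompt(prompt: str) -> None:
    """Register a vetted system prompt by digest, not by raw text."""
    _ALLOWLISTED_DIGESTS.add(_digest(prompt))


def is_allowlisted(prompt: str) -> bool:
    """Exact-match check: any appended or altered text changes the digest."""
    return _digest(prompt) in _ALLOWLISTED_DIGESTS


def blocked_event(user_id: str, prompt: str, score: float) -> str:
    """Serialize a blocked attempt as one JSON line for a SIEM forwarder."""
    return json.dumps({
        "event_type": "prompt_injection_blocked",  # assumed event name
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "user_id": user_id,
        "prompt_injection_score": score,
        "prompt": prompt,  # full context, so access to this log should be restricted
    })
```

Matching on a digest of the whole prompt keeps the bypass narrow: a request that embeds an allowlisted prompt inside longer text still goes through the classifier.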

Pattern 2: PII Redaction Pipeline

Use NER, backed by deterministic pattern matching for structured identifiers, to strip sensitive entities before egress to third-party LLMs.

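import spacy

_nlp = spacy.load("en_core_web_lg")
# The stock model emits PERSON, ORG, and GPE. EMAIL, PHONE, SSN, and CREDIT_CARD
# are not produced by it; they only match if an EntityRuler or similar component
# is added to the pipeline, or if regex detection runs alongside.
_PII_LABELS = {"PERSON", "ORG", "GPE", "EMAIL", "PHONE", "SSN", "CREDIT_CARD"}

def redact_pii(text: str) -> tuple[str, list[dict]]:
    """Replace PII entities with indexed placeholders such as [PERSON_0]."""
    doc = _nlp(text)
    pieces, annotations, cursor = [], [], 0
    for ent in doc.ents:  # doc.ents is ordered and non-overlapping
        if ent.label_ not in _PII_LABELS:
            continue
        placeholder = f"[{ent.label_}_{len(annotations)}]"
        # Splice by character offsets so only this exact span is replaced;
        # str.replace would also rewrite unrelated occurrences of the same text.
        pieces.append(text[cursor:ent.start_char])
        pieces.append(placeholder)
        annotations.append({"type": ent.label_, "original": ent.text})
        cursor = ent.end_char
    pieces.append(text[cursor:])
    return "".join(pieces), annotations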
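
spaCy's stock English pipelines emit OntoNotes-style labels such as PERSON, ORG, and GPE, but not EMAIL or SSN, so structured identifiers are usually caught with patterns. The sketch below shows one way to do that with the standard library. The two regular expressions are deliberately simple illustrations and are assumptions of this example; production rules would need validation, locale awareness, and more entity types.

```python
import re

# Illustrative patterns only; not exhaustive and not validated against real data.
_PATTERNS = {
    "EMAIL": re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"),
    "SSN": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
}


def find_pattern_pii(text: str) -> list[tuple[int, int, str]]:
    """Return (start, end, label) character spans, sorted by start offset."""
    spans = []
    for label, pattern in _PATTERNS.items():
        for match in pattern.finditer(text):
            spans.append((match.start(), match.end(), label))
    return sorted(spans)
```

The returned spans use character offsets, so they can be merged with NER entity offsets before placeholders are assigned.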

On the return path, swap each placeholder in the model response back to its original value, using the per-session placeholder map held in a stateful session store (Redis/AWS Secrets Manager).
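
The return path can be sketched as below, assuming the annotation list produced at redaction time has already been loaded from the session store. The function name `restore_pii` is hypothetical, and the sketch relies on the placeholder format `[LABEL_index]`, where the index is the entry's position in the annotation list.

```python
import re

# Matches placeholders such as [PERSON_0] or [CREDIT_CARD_2].
_PLACEHOLDER = re.compile(r"\[([A-Z_]+)_(\d+)\]")


def restore_pii(text: str, annotations: list[dict]) -> str:
    """Swap placeholders back to the stored original values."""

    def _swap(match: re.Match) -> str:
        label, index = match.group(1), int(match.group(2))
        # Only restore when both the index and the label agree with the stored
        # entry, so text the model invented is never expanded by accident.
        if index < len(annotations) and annotations[index]["type"] == label:
            return annotations[index]["original"]
        return match.group(0)  # unknown placeholder: leave untouched

    return _PLACEHOLDER.sub(_swap, text)
```

Placeholders that do not match a stored entry are left in place rather than dropped, which makes mismatches visible in the response instead of silently losing text.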

Pattern 3: Token Quota Enforcement

Implement token-bucket rate limiting per user and per model family.

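-- OpenResty/Lua token bucket (Redis-backed)
-- Assumes `redis` is an already-connected lua-resty-redis client in scope.
local CAPACITY = 100000            -- per-minute token budget
local REFILL_RATE = CAPACITY / 60  -- tokens per second (about 1667)

local function check_token_quota(user_id, requested_tokens)
    -- Append the model family to the key to enforce per-family budgets as well.
    local key = "token_quota:" .. user_id
    local now = ngx.now() * 1000   -- milliseconds

    -- lua-resty-redis returns ngx.null for a missing key, which is truthy,
    -- so an `or` default never applies; compare explicitly instead.
    local bucket = redis:get(key)
    local last_update = redis:get(key .. ":ts")
    if not bucket or bucket == ngx.null then bucket = CAPACITY end  -- new users start full
    if not last_update or last_update == ngx.null then last_update = now end

    local delta = now - tonumber(last_update)
    local new_bucket = math.min(CAPACITY, tonumber(bucket) + (delta / 1000 * REFILL_RATE))

    if new_bucket < requested_tokens then
        return ngx.HTTP_TOO_MANY_REQUESTS
    end

    -- This read-modify-write is not atomic across workers; move it into a
    -- Redis-side script (EVAL) if concurrent requests per user are expected.
    redis:set(key, new_bucket - requested_tokens)
    redis:set(key .. ":ts", now)
    return ngx.HTTP_OK
end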
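
To make the per-user and per-model-family keying concrete, here is a minimal in-memory sketch of the same token-bucket logic in Python. It is an illustration under stated assumptions: the class and function names are hypothetical, the bucket state lives in a process-local dict rather than Redis, and the clock is passed in so the behavior is easy to reason about.

```python
import time
from typing import Optional


class TokenBucket:
    """Token bucket that starts full and refills continuously."""

    def __init__(self, capacity: float, refill_rate: float, now: float) -> None:
        self.capacity = capacity
        self.refill_rate = refill_rate  # tokens per second
        self.tokens = capacity
        self.updated_at = now

    def try_consume(self, requested: float, now: float) -> bool:
        # Credit tokens earned since the last update, capped at capacity.
        elapsed = max(0.0, now - self.updated_at)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.updated_at = now
        if requested > self.tokens:
            return False
        self.tokens -= requested
        return True


# One bucket per (user, model family) pair.
_buckets: dict[tuple[str, str], TokenBucket] = {}


def check_token_quota(
    user_id: str,
    model_family: str,
    requested_tokens: float,
    now: Optional[float] = None,
    capacity: float = 100_000.0,  # per-minute budget, as in the Lua example
) -> bool:
    now = time.monotonic() if now is None else now
    key = (user_id, model_family)
    bucket = _buckets.get(key)
    if bucket is None:
        bucket = _buckets[key] = TokenBucket(capacity, capacity / 60.0, now)
    return bucket.try_consume(requested_tokens, now)
```

For example, a user who spends 60,000 tokens on one model family is refused a second 60,000-token request in the same instant, while a request against a different model family draws from its own bucket.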

Pattern 4: Semantic Caching Layer

Reduce redundant inference costs by caching responses keyed on embedding vectors.

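import numpy as np
import redis
from redis.commands.search.query import Query
from sentence_transformers import SentenceTransformer

_embedder = SentenceTransformer("all-MiniLM-L6-v2")
_cache = redis.Redis(host="redis", port=6379, db=0)

async def semantic_cache_lookup(prompt: str, threshold: float = 0.94) -> str | None:
    # Assumes the index's `embedding` field is FLOAT32 with COSINE distance and a
    # dimension matching the embedder (384 for all-MiniLM-L6-v2).
    embedding = _embedder.encode(prompt, convert_to_numpy=True).astype(np.float32)
    # Nearest-neighbor search via Redis vector search (default index name)
    query = (
        Query("*=>[KNN 1 @embedding $vec AS score]")
        .sort_by("score")
        .return_fields("response", "score")
        .dialect(2)
    )
    # Both calls above and below are blocking; offload them if the event loop matters.
    results = _cache.ft().search(query, query_params={"vec": embedding.tobytes()})
    if not results.docs:
        return None
    # Redis reports cosine *distance* (0 means identical), so convert it to a
    # similarity before comparing against the threshold.
    similarity = 1.0 - float(results.docs[0].score)
    return results.docs[0].response if similarity >= threshold else None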
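
The threshold logic is easier to see without the vector database in the way. The following standard-library sketch keeps embeddings in a list and does a linear scan; the class name `InMemorySemanticCache` is hypothetical, and a real deployment would use an indexed vector store rather than this brute-force search.

```python
import math
from typing import Optional


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity in [-1, 1]; returns 0.0 if either vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


class InMemorySemanticCache:
    def __init__(self, threshold: float = 0.94) -> None:
        self.threshold = threshold
        self._entries: list[tuple[list[float], str]] = []

    def put(self, embedding: list[float], response: str) -> None:
        self._entries.append((embedding, response))

    def get(self, embedding: list[float]) -> Optional[str]:
        # Find the most similar stored prompt, then apply the threshold once.
        best_score, best_response = -1.0, None
        for stored, response in self._entries:
            score = cosine_similarity(embedding, stored)
            if score > best_score:
                best_score, best_response = score, response
        return best_response if best_score >= self.threshold else None
```

A higher threshold lowers the risk of returning an answer to a prompt that only looks similar, at the cost of fewer cache hits; the right value depends on the embedding model and the traffic.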

Audit and Compliance

All gateway traffic must emit structured audit events:

{
  "event_type": "llm_request",
  "timestamp": "2024-10-05T14:32:01Z",
  "user_id": "u_12345",
  "session_id": "sess_abc",
  "model": "gpt-4-turbo",
  "tokens_prompt": 1240,
  "tokens_completion": 380,
  "latency_ms": 820,
  "pii_redacted": true,
  "prompt_injection_score": 0.02,
  "cache_hit": false
}

Stream these events to an immutable log store (e.g., AWS Kinesis → S3 with Object Lock enabled) for compliance retention.
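
One way to keep emitted events consistent with the schema above is to build them from a typed record and serialize one JSON object per line. The sketch below assumes nothing about the transport: `LLMAuditEvent`, `utc_timestamp`, and `serialize_event` are hypothetical names, and the resulting string would be handed to whatever producer client the gateway uses.

```python
import json
from dataclasses import asdict, dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class LLMAuditEvent:
    """Mirrors the audit event fields shown above."""
    user_id: str
    session_id: str
    model: str
    tokens_prompt: int
    tokens_completion: int
    latency_ms: int
    pii_redacted: bool
    prompt_injection_score: float
    cache_hit: bool
    timestamp: str
    event_type: str = "llm_request"


def utc_timestamp() -> str:
    """Current time in the same UTC format as the example event."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def serialize_event(event: LLMAuditEvent) -> str:
    """Compact single-line JSON with sorted keys, suitable for line-based streams."""
    return json.dumps(asdict(event), sort_keys=True, separators=(",", ":"))
```

A frozen dataclass means a missing or misspelled field fails at construction time rather than producing a malformed record in the log store.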

Vendor Landscape

Product                 Deployment          Strength
LiteLLM Proxy           Self-hosted         Universal model routing
Portkey                 SaaS/self-hosted    Cost tracking & fallbacks
Cloudflare AI Gateway   Edge                DDoS + caching at PoP
Kong AI Gateway         Self-hosted         Enterprise policy engine

Conclusion

A hardened AI Gateway is not optional for regulated enterprises. By combining prompt firewalls, PII redaction, token quotas, and semantic caching, organizations can safely operationalize LLMs without sacrificing data sovereignty.