NIST AI RMF 1.0 Technical Security Controls for Production AI Systems
The Problem
The NIST AI Risk Management Framework (AI RMF 1.0, published January 2023) provides a vocabulary and process structure for managing risks from AI systems, but it is deliberately technology-agnostic. The framework describes what organisations should do — govern AI deployment decisions, map the context and harms, measure risk indicators, and manage identified risks — without specifying which technical controls implement each activity.
This creates a gap for practitioners. Security and platform engineers tasked with “implementing the AI RMF” often produce governance documentation (policies, roles, impact assessments) while leaving the technical substrate — model access controls, output monitoring, drift detection, incident response automation — either absent or handled ad hoc by ML engineers who are not thinking in terms of the RMF. When an audit or regulatory review asks “how does your organisation detect and respond to harmful AI outputs?”, the answer is usually a policy document with no underlying technical evidence.
The gap matters because the AI RMF is increasingly referenced in regulatory contexts. The EU AI Act (in force since August 2024, with most remaining obligations applying from August 2026) overlaps substantially with RMF concepts for high-risk AI systems, and the FTC’s algorithmic accountability guidance cites RMF practices. Organisations that have operationalised the RMF technically, with tooling, metrics, and runbooks, are audit-ready; those that have only the documentation are not.
This guide maps the RMF’s four core functions to specific technical controls, organised by function and sub-function. Each control is described with an implementation example and a measurable indicator.
Target systems: Production LLM-based applications (RAG pipelines, chat interfaces, decision-support systems); model serving infrastructure (vLLM, TGI, Bedrock, Azure OpenAI); organisations subject to EU AI Act high-risk requirements or seeking SOC 2 coverage for AI components.
Threat Model
For the purposes of AI RMF technical implementation, the relevant risk classes are:
1. Model output risk (the AI system produces harmful, biased, or incorrect outputs). Objective: detect and contain outputs before they cause harm. Technical control: output monitoring with configurable blocklists and semantic classifiers.
2. Data pipeline risk (training or retrieval data is poisoned, biased, or unauthorised). Objective: validate data provenance and content before it influences model behaviour. Technical control: data lineage tracking and content scanning at ingestion.
3. Access and misuse risk (authorised or unauthorised users extract sensitive information or misuse the AI system). Objective: enforce least-privilege access to model capabilities; detect misuse patterns. Technical control: API authentication, per-user rate limiting, and anomaly detection on usage patterns.
4. Operational drift risk (model behaviour changes over time due to model updates, prompt changes, or distribution shift in inputs). Objective: detect when AI system behaviour has deviated from its tested baseline. Technical control: automated evaluation pipelines and performance regression alerting.
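The output-monitoring control named under risk class 1 can be sketched as a two-stage check: a cheap blocklist pass followed by a pluggable classifier. This is a minimal sketch under stated assumptions, not a reference implementation. `OutputScreen`, `ScreenResult`, the example blocklist pattern, and the `keyword_classifier` stand-in are all hypothetical names; a production system would substitute a real semantic classifier.

```python
import re
from dataclasses import dataclass, field
from typing import Callable

# A classifier takes text and returns the harm categories it detected.
Classifier = Callable[[str], list[str]]


@dataclass
class ScreenResult:
    harmful: bool
    categories: list[str] = field(default_factory=list)


class OutputScreen:
    """Two-stage output check: regex blocklist first, then a classifier."""

    def __init__(self, blocklist: dict[str, str], classifier: Classifier):
        # blocklist maps a category name to a regex pattern (hypothetical format)
        self._patterns = {
            cat: re.compile(pat, re.IGNORECASE) for cat, pat in blocklist.items()
        }
        self._classifier = classifier

    def check(self, text: str) -> ScreenResult:
        categories = [c for c, p in self._patterns.items() if p.search(text)]
        # Merge classifier categories with blocklist hits, without duplicates
        for cat in self._classifier(text):
            if cat not in categories:
                categories.append(cat)
        return ScreenResult(harmful=bool(categories), categories=categories)


def keyword_classifier(text: str) -> list[str]:
    """Stand-in for a semantic classifier; replace with a real model call."""
    return ["self_harm"] if "hurt myself" in text.lower() else []


screen = OutputScreen(
    blocklist={"credential_leak": r"AKIA[0-9A-Z]{16}"},
    classifier=keyword_classifier,
)
```

Returning the detected categories rather than a bare boolean lets the same result feed both the blocking decision and the structured logs used by the MAP and MEASURE functions.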
Hardening Configuration
GOVERN Function: Technical Controls
G1: Establish AI risk policies with technical enforcement points
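# policy-as-code for AI deployment (example: OPA policy)
# Enforces that every AI system in production has:
# - An impact assessment on record
# - Output monitoring enabled
# - An incident response runbook
package ai.governance

# rego.v1 syntax (`contains` / `if`) works on OPA 0.59+ and is the default in OPA 1.0
import rego.v1

deny contains msg if {
    input.resource.type == "ai_model_deployment"
    not input.resource.annotations["ai.rmf/impact-assessment-date"]
    msg := sprintf("Deployment %v lacks required impact assessment annotation",
        [input.resource.name])
}

deny contains msg if {
    input.resource.type == "ai_model_deployment"
    # Compare against "true": a presence-only check would accept the value "false"
    not input.resource.annotations["ai.rmf/output-monitoring-enabled"] == "true"
    msg := sprintf("Deployment %v does not have output monitoring enabled",
        [input.resource.name])
}

deny contains msg if {
    input.resource.type == "ai_model_deployment"
    # Annotation key is illustrative; use whatever key your runbook tooling sets
    not input.resource.annotations["ai.rmf/incident-runbook-url"]
    msg := sprintf("Deployment %v lacks an incident response runbook annotation",
        [input.resource.name])
}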
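The same requirements can also be checked as a CI pre-flight step, so that a missing annotation is caught before the manifest reaches the cluster. The sketch below assumes the two annotation keys used in the policy above; the staleness rule and `MAX_ASSESSMENT_AGE_DAYS` are assumptions added for illustration and are not part of that policy.

```python
from datetime import date, timedelta

REQUIRED_ANNOTATIONS = (
    "ai.rmf/impact-assessment-date",
    "ai.rmf/output-monitoring-enabled",
)
MAX_ASSESSMENT_AGE_DAYS = 365  # Assumed review interval; not part of the policy above


def check_annotations(name: str, annotations: dict[str, str], today: date) -> list[str]:
    """Return human-readable violations for one deployment (empty list = pass)."""
    violations = [
        f"Deployment {name} lacks required annotation {key}"
        for key in REQUIRED_ANNOTATIONS
        if key not in annotations
    ]
    raw_date = annotations.get("ai.rmf/impact-assessment-date")
    if raw_date is not None:
        try:
            assessed = date.fromisoformat(raw_date)
        except ValueError:
            violations.append(f"Deployment {name} has an unparseable assessment date")
        else:
            if today - assessed > timedelta(days=MAX_ASSESSMENT_AGE_DAYS):
                violations.append(f"Deployment {name} has a stale impact assessment")
    return violations
```

Passing `today` explicitly keeps the function deterministic, which makes it straightforward to unit test in the same pipeline that runs it.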
G2: Assign AI risk owners with technical accountability
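# Record owner and risk metadata on every AI-related resource (models, endpoints, data stores)
# Use annotations rather than labels: Kubernetes label values cannot contain "@" or URLs,
# and the G1 policy reads annotations
kubectl annotate deployment my-llm-api --overwrite \
    ai.rmf/owner="team-platform@example.com" \
    ai.rmf/risk-tier="high" \
    ai.rmf/impact-assessment-date="2026-06-01" \
    ai.rmf/output-monitoring-enabled="true" \
    ai.rmf/incident-runbook-url="https://wiki.internal/runbooks/ai-harm-response" \
    ai.rmf/last-evaluated="2026-06-01"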
MAP Function: Technical Controls
M1: Document AI system boundaries and data flows
Use automated discovery to enumerate AI system components:
# Scan for AI-related deployments across namespaces
# Image-name matching is a heuristic; any() emits each deployment once even if
# several of its containers match
kubectl get deployments --all-namespaces -o json | \
    jq '.items[]
        | select(any(.spec.template.spec.containers[];
                     .image | test("vllm|tgi|ollama|openai|anthropic|llama")))
        | {namespace: .metadata.namespace, name: .metadata.name,
           images: [.spec.template.spec.containers[].image]}'

# Document model artifact provenance
gcloud artifacts docker images list \
    REGION-docker.pkg.dev/PROJECT_ID/ai-models/ \
    --include-tags --format=json | \
    jq '.[] | {image: .package, tag: .tags, created: .createTime}'
M2: Map AI system inputs and outputs for harm identification
# Structured logging for AI system I/O; feeds MAP/MEASURE functions
import structlog

log = structlog.get_logger()


async def log_ai_interaction(
    request_id: str,
    user_id_hash: str,            # Never log raw user IDs
    input_length: int,
    input_categories: list[str],  # Output of a content classifier
    output_length: int,
    output_categories: list[str], # Harmful content categories detected
    model_id: str,
    latency_ms: float,
) -> None:
    log.info(
        "ai_interaction",
        request_id=request_id,
        user_id_hash=user_id_hash,
        input_length=input_length,
        input_categories=input_categories,
        output_length=output_length,
        output_categories=output_categories,
        model_id=model_id,
        latency_ms=latency_ms,
        rmf_function="MAP",
    )
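The `user_id_hash` field above needs care: a plain unsalted hash of a low-entropy identifier (sequential IDs, email addresses) can be reversed by enumerating candidates. One option is a keyed hash, sketched below. `hash_user_id` is a hypothetical helper, and how the secret key is stored and rotated is assumed to be handled elsewhere.

```python
import hashlib
import hmac


def hash_user_id(user_id: str, secret_key: bytes) -> str:
    """Derive a stable pseudonymous identifier using HMAC-SHA256."""
    digest = hmac.new(secret_key, user_id.encode("utf-8"), hashlib.sha256)
    # Truncated for log readability; 16 hex characters = 64 bits
    return digest.hexdigest()[:16]
```

The same user always maps to the same value under a given key, so per-user misuse patterns remain visible in the logs, while rotating the key breaks linkability with older logs.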
MEASURE Function: Technical Controls
ME1: Automated evaluation pipelines (continuous testing)
# evaluation_pipeline.py: runs on every model deployment
# Implements MEASURE function continuous evaluation
# call_model() and is_harmful() are deployment-specific helpers that are not shown here
from datasets import load_dataset

EVAL_DATASET = "internal-eval-set-v3"
METRICS = {
    "harmful_output_rate": 0.001,    # Must be below 0.1%
    "factual_accuracy": 0.92,        # Must be above 92%
    "response_refusal_rate": 0.05,   # Must be below 5% (avoid over-refusal)
    "latency_p99_ms": 2000,          # Must be below 2s
}
# Metrics where lower is better; every other metric must meet or exceed its threshold
LOWER_IS_BETTER = {"harmful_output_rate", "latency_p99_ms", "response_refusal_rate"}


async def run_evaluation(model_endpoint: str) -> dict[str, float]:
    dataset = load_dataset(EVAL_DATASET, split="test")
    if len(dataset) == 0:
        raise ValueError(f"Evaluation dataset {EVAL_DATASET} is empty")
    results: dict[str, float] = {}
    harmful_count = 0
    for sample in dataset:
        response = await call_model(model_endpoint, sample["prompt"])
        if is_harmful(response):
            harmful_count += 1
    results["harmful_output_rate"] = harmful_count / len(dataset)
    # ... other metrics
    return results


def check_metrics_pass(results: dict[str, float]) -> bool:
    passed = True
    for metric, threshold in METRICS.items():
        if metric not in results:
            # A metric that was never computed must fail the gate, not be skipped
            print(f"FAIL: {metric} was not computed")
            passed = False
        elif metric in LOWER_IS_BETTER and results[metric] > threshold:
            print(f"FAIL: {metric}={results[metric]} exceeds threshold {threshold}")
            passed = False
        elif metric not in LOWER_IS_BETTER and results[metric] < threshold:
            print(f"FAIL: {metric}={results[metric]} below threshold {threshold}")
            passed = False
    # Report every failing metric before returning, rather than stopping at the first
    return passed
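A 0.1% threshold for `harmful_output_rate` is only meaningful if the evaluation set is large enough to resolve it. As a rough guide, and assuming samples are independent, the sketch below computes how many samples must pass with zero harmful outputs before a true rate at or above the threshold can be ruled out at a given confidence level. `min_samples_for_zero_failures` is a hypothetical helper; for a 0.1% threshold at 95% confidence the answer is on the order of 3,000 samples.

```python
import math


def min_samples_for_zero_failures(max_rate: float, confidence: float = 0.95) -> int:
    """Smallest n such that observing 0 failures in n independent trials rules out
    a true failure rate >= max_rate at the given confidence level."""
    if not 0 < max_rate < 1 or not 0 < confidence < 1:
        raise ValueError("max_rate and confidence must be in (0, 1)")
    # Solve (1 - max_rate)^n <= 1 - confidence for n
    return math.ceil(math.log(1 - confidence) / math.log(1 - max_rate))
```

An evaluation set of a few hundred prompts cannot distinguish a 0.1% harmful rate from a 0.5% one, so a passing result on a small set is weak evidence.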
ME2: Drift detection for production inputs
# Monitor distribution of inputs to detect drift that may indicate misuse
# or a change in user population that invalidates impact assessments
# classify_content() and alert() are deployment-specific helpers that are not shown here
from collections import Counter


class InputDriftMonitor:
    def __init__(self, baseline_distribution: dict[str, float]):
        # Maps category -> expected fraction of category labels
        self._baseline = baseline_distribution
        self._current_window: Counter = Counter()
        self._window_size = 1000

    def record(self, input_text: str) -> None:
        # Use content classifier categories, not raw text
        categories = classify_content(input_text)
        for cat in categories:
            self._current_window[cat] += 1
        if sum(self._current_window.values()) >= self._window_size:
            self._check_drift()
            self._current_window.clear()

    def _check_drift(self) -> None:
        total = sum(self._current_window.values())
        # Include categories absent from the baseline: a new category is itself drift
        for category in set(self._baseline) | set(self._current_window):
            baseline_fraction = self._baseline.get(category, 0.0)
            current_fraction = self._current_window[category] / total
            # Alert if the fraction has more than doubled or more than halved.
            # (Comparing abs(current - baseline) to baseline can never fire on a decrease.)
            if (current_fraction > 2 * baseline_fraction
                    or current_fraction < baseline_fraction / 2):
                alert(
                    "ai_input_distribution_drift",
                    category=category,
                    baseline=baseline_fraction,
                    current=current_fraction,
                )
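Per-category ratio checks produce one alert per category, which can be noisy. A single summary score is sometimes easier to threshold. The sketch below uses the Population Stability Index (PSI), which is a different statistic from the per-category check above and is shown only as an alternative. The `epsilon` smoothing value is an assumption; a commonly quoted rule of thumb treats PSI above roughly 0.25 as a significant shift, but that cut-off should be validated against your own traffic.

```python
import math


def population_stability_index(
    baseline: dict[str, float], current: dict[str, float], epsilon: float = 1e-6
) -> float:
    """PSI = sum over categories of (cur - base) * ln(cur / base)."""
    psi = 0.0
    for category in set(baseline) | set(current):
        # epsilon avoids log(0) and division by zero for categories missing on one side
        base = max(baseline.get(category, 0.0), epsilon)
        cur = max(current.get(category, 0.0), epsilon)
        psi += (cur - base) * math.log(cur / base)
    return psi
```

Every term in the sum is non-negative, so the score grows with any shift, whether a category becomes more or less common.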
ME3: Bias and fairness metrics
# For AI systems making decisions that affect users, measure outcome disparities
# across demographic groups (using proxy attributes if direct attributes unavailable)
def compute_demographic_parity(decisions: list[dict]) -> dict:
    """
    Compute acceptance rate by demographic group proxy.
    RMF MEASURE function: track fairness metrics over time.
    """
    if not decisions:
        # Nothing to measure; avoids division by zero below
        return {}
    by_group: dict[str, list[int]] = {}
    for decision in decisions:
        # user_region is a proxy attribute; substitute the attribute relevant to your system
        group = decision.get("user_region", "unknown")
        outcome = 1 if decision["ai_decision"] == "approve" else 0
        by_group.setdefault(group, []).append(outcome)
    parity = {}
    overall_rate = sum(sum(v) for v in by_group.values()) / sum(len(v) for v in by_group.values())
    for group, outcomes in by_group.items():
        group_rate = sum(outcomes) / len(outcomes)
        parity[group] = {
            "approval_rate": group_rate,
            "disparity_from_overall": group_rate - overall_rate,
        }
    return parity
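Absolute differences in approval rate are hard to threshold when base rates vary. A ratio-based view is a common complement: each group's approval rate divided by the highest group's rate. The sketch below flags groups whose ratio falls under 0.8, following the "four-fifths" screening heuristic from US employment guidelines. Both function names are hypothetical, and whether that heuristic is appropriate for a given system or jurisdiction is an assumption to confirm with legal and domain experts.

```python
def disparate_impact_ratios(approval_rates: dict[str, float]) -> dict[str, float]:
    """Ratio of each group's approval rate to the highest group's rate."""
    if not approval_rates:
        return {}
    reference = max(approval_rates.values())
    if reference == 0:
        # No group has any approvals, so there is no disparity to measure
        return {group: 1.0 for group in approval_rates}
    return {group: rate / reference for group, rate in approval_rates.items()}


def groups_below_threshold(
    approval_rates: dict[str, float], threshold: float = 0.8
) -> list[str]:
    """Groups whose ratio to the best-treated group is under the threshold."""
    ratios = disparate_impact_ratios(approval_rates)
    return sorted(g for g, r in ratios.items() if r < threshold)
```

The input is a plain mapping of group to approval rate, so it can be built from the `approval_rate` values that `compute_demographic_parity` returns.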
MANAGE Function: Technical Controls
MN1: Incident response automation for AI harms
# Prometheus alerting rule: trigger AI harm incident response
# Alertmanager routes the firing alert into the MANAGE function automated response
groups:
  - name: ai-harm-detection
    rules:
      - alert: AiHarmfulOutputRateHigh
        # Aggregate both sides by deployment so the label sets match when dividing
        expr: |
          sum by (deployment) (rate(ai_harmful_outputs_total[5m])) /
          sum by (deployment) (rate(ai_requests_total[5m])) > 0.01
        for: 2m
        labels:
          severity: critical
          rmf_function: MANAGE
        annotations:
          summary: "AI harmful output rate exceeds 1%"
          runbook_url: "https://wiki.internal/runbooks/ai-harm-response"
          action: |
            1. Page on-call ML engineer and security engineer.
            2. If rate > 5%: trigger circuit breaker to divert traffic to fallback model.
            3. Sample 50 recent harmful outputs for root cause analysis.
            4. Open incident ticket with model ID, time range, and sample.
MN2: Circuit breaker for AI model endpoints
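# Implement circuit breaker pattern for AI endpoints
# RMF MANAGE: ability to contain AI harm quickly
# is_harmful() is a deployment-specific helper that is not shown here
import asyncio
from enum import Enum

import structlog

log = structlog.get_logger()


class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # AI endpoint blocked; fallback active
    HALF_OPEN = "half_open"  # Testing recovery


class AICircuitBreaker:
    def __init__(self, failure_threshold: float = 0.02, recovery_timeout: int = 300):
        self.state = CircuitState.CLOSED
        self._failure_threshold = failure_threshold
        self._failure_count = 0
        self._request_count = 0
        self._recovery_timeout = recovery_timeout

    async def call(self, fn, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            return await self._fallback_response()
        try:
            result = await fn(*args, **kwargs)
        except Exception:
            self._record_failure()
            raise
        if is_harmful(result):
            self._record_failure()
            # Do not hand a response flagged as harmful back to the caller
            return await self._fallback_response()
        self._record_success()
        return result

    def _record_success(self) -> None:
        self._request_count += 1
        if self.state == CircuitState.HALF_OPEN:
            # A clean trial request closes the circuit again
            self.state = CircuitState.CLOSED

    def _record_failure(self) -> None:
        self._failure_count += 1
        self._request_count += 1
        if self.state == CircuitState.OPEN:
            return  # Already open; a late in-flight failure must not reschedule recovery
        rate = self._failure_count / self._request_count
        over_threshold = rate > self._failure_threshold and self._request_count > 100
        # A failed trial request in HALF_OPEN reopens the circuit immediately
        if self.state == CircuitState.HALF_OPEN or over_threshold:
            self.state = CircuitState.OPEN
            log.warning("ai_circuit_breaker_open", failure_rate=rate)
            asyncio.get_running_loop().call_later(
                self._recovery_timeout, self._attempt_recovery
            )

    def _attempt_recovery(self) -> None:
        # Start from clean counters and let trial traffic through
        self._failure_count = 0
        self._request_count = 0
        self.state = CircuitState.HALF_OPEN

    async def _fallback_response(self):
        # Placeholder: route to a fallback model or return a static safe message
        return {"fallback": True, "message": "Service temporarily degraded"}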
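The breaker above computes its failure rate from cumulative counters, so a long healthy history dilutes a sudden spike: after 100,000 clean requests, several thousand harmful outputs are needed before a 2% threshold is crossed. One way to make the breaker react to recent behaviour is to track the rate over a fixed window of the latest requests. The sketch below is one possible approach; `SlidingWindowRate` and its default window size are hypothetical.

```python
from collections import deque


class SlidingWindowRate:
    """Failure rate over the most recent `window_size` requests."""

    def __init__(self, window_size: int = 500):
        if window_size <= 0:
            raise ValueError("window_size must be positive")
        # A deque with maxlen drops the oldest outcome automatically on append
        self._outcomes: deque = deque(maxlen=window_size)
        self._failures = 0

    def record(self, failed: bool) -> None:
        if len(self._outcomes) == self._outcomes.maxlen and self._outcomes[0]:
            # The outcome about to be evicted was a failure
            self._failures -= 1
        self._outcomes.append(failed)
        if failed:
            self._failures += 1

    @property
    def count(self) -> int:
        return len(self._outcomes)

    @property
    def rate(self) -> float:
        return self._failures / len(self._outcomes) if self._outcomes else 0.0
```

A breaker could call `record(True)` or `record(False)` per request and compare `rate` to its threshold once `count` reaches a minimum sample size, which bounds how long a spike can go unnoticed.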
MN3: Model version rollback automation
#!/bin/bash
# rollback-ai-model.sh: called by incident response automation
# Kubernetes deployment rollback for AI model updates
# Triggered by MANAGE function incident response
set -euo pipefail

DEPLOYMENT_NAME="${1:?usage: rollback-ai-model.sh DEPLOYMENT_NAME [NAMESPACE]}"
NAMESPACE="${2:-ai-production}"

# Record current state before rollback
kubectl rollout history "deployment/${DEPLOYMENT_NAME}" -n "${NAMESPACE}" > \
    "/tmp/rollback-state-$(date +%Y%m%d%H%M%S).txt"

# Rollback to previous version
kubectl rollout undo "deployment/${DEPLOYMENT_NAME}" -n "${NAMESPACE}"

# Wait for rollback to complete
kubectl rollout status "deployment/${DEPLOYMENT_NAME}" -n "${NAMESPACE}" --timeout=120s

# Verify harmful output rate drops
# The 5m rate window still includes pre-rollback traffic, so treat this first reading as indicative
sleep 30
QUERY="sum(rate(ai_harmful_outputs_total{deployment=\"${DEPLOYMENT_NAME}\"}[5m])) / sum(rate(ai_requests_total{deployment=\"${DEPLOYMENT_NAME}\"}[5m]))"
# promtool requires the Prometheus server URL; localhost:9090 assumes the default port inside the pod
RATE=$(kubectl exec -n monitoring deployment/prometheus -- \
    promtool query instant http://localhost:9090 "${QUERY}")
echo "Post-rollback harmful output rate: ${RATE}"
Expected Behaviour After Hardening
| RMF Function | Before | After |
|---|---|---|
| GOVERN — deployment gates | No policy enforcement; any image can deploy | OPA policy rejects deployments without impact assessment annotation |
| MAP — I/O documentation | Manual wiki pages, often stale | Automated discovery + structured I/O logging feeds up-to-date model registry |
| MEASURE — evaluation | Ad hoc manual testing at release time | Automated eval pipeline runs on every deployment; failures block promotion |
| MEASURE — drift detection | None; no alert if input population shifts | Distribution drift monitor alerts within 1,000 requests |
| MANAGE — incident response | Manual paging; no containment automation | AlertManager triggers runbook; circuit breaker activates at 2% harmful rate |
Verification:
# Confirm governance policy is enforced
# (requires the OPA policy to be wired into admission control, e.g. as a validating webhook)
kubectl apply --dry-run=server -f deploy-without-annotations.yaml
# Expected: OPA policy denial
# Confirm evaluation pipeline ran on last deployment
kubectl get job -n ai-ci -l rmf.function=MEASURE --sort-by=.metadata.creationTimestamp | tail -1
# Expected: a recently run job showing 1/1 completions
# Confirm circuit breaker status
# (assumes the breaker publishes its state to this ConfigMap; the MN2 example keeps state in process memory)
kubectl get configmap ai-circuit-breaker-state -n ai-production -o yaml
# Expected: state: closed (healthy)
Trade-offs and Operational Considerations
| Aspect | Benefit | Cost | Mitigation |
|---|---|---|---|
| Automated evaluation pipelines | Continuous evidence of MEASURE compliance | Eval dataset must be maintained; can become a gaming target | Rotate eval sets regularly; keep a held-out test set that CI never sees |
| Policy-as-code deployment gates | Audit-ready; provable governance | Annotation drift if teams forget to update; gates can slow emergency deploys | Automate annotation from CI; document break-glass procedure |
| Circuit breaker for AI harms | Fast containment of harmful output events | Fallback model may have lower capability; users see degraded service | Define fallback model in advance; test fallback path regularly |
| Structured I/O logging | Feeds MAP and MEASURE; supports retrospective analysis | Logs contain potentially sensitive model outputs | Apply data minimisation: log categories and lengths, not raw content |
| Drift detection | Early warning of population or model shift | Baseline must be established; alert tuning required | Run drift monitor in alert-only mode for 30 days before enabling paging |
Failure Modes
| Failure | Symptom | Detection | Recovery |
|---|---|---|---|
| Eval pipeline fails without blocking deployment | Harmful outputs reach production undetected | Post-hoc harm metrics spike; retrospective review | Make eval pipeline a required CI gate; fail deployment on eval pipeline failure |
| OPA policy too strict | Emergency deployments blocked | Deployment queue backs up; engineers bypass gate | Add break-glass annotation with time-limit; log all break-glass uses |
| Circuit breaker trips on false positive (benign unusual inputs) | Legitimate traffic routed to fallback model | User-facing capability degradation; circuit breaker status dashboard | Tune harmful output classifier; add human-review override for circuit breaker |
| Drift monitor baseline is wrong | Persistent false alerts or missed real drift | Alert noise leads to suppression; real drift missed | Re-baseline from a representative 30-day production window |
| Structured I/O logs contain PII despite filtering | Privacy incident if logs exfiltrated | PII scanner on log output; privacy audit | Re-evaluate I/O logging data minimisation; add PII scrubbing at log sink |
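The last recovery step in the table, PII scrubbing at the log sink, can be sketched as a log processor that redacts matching substrings before an event is rendered. The patterns below are deliberately simple and only illustrative; `scrub_text` and `scrub_pii` are hypothetical names, and real PII detection needs much broader coverage than two regular expressions.

```python
import re

# Deliberately simple patterns for illustration; not exhaustive PII detection
_PII_PATTERNS = {
    "email": re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"),
    "ipv4": re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b"),
}


def scrub_text(value: str) -> str:
    """Replace each pattern match with a typed redaction marker."""
    for label, pattern in _PII_PATTERNS.items():
        value = pattern.sub(f"[REDACTED_{label.upper()}]", value)
    return value


def scrub_pii(logger, method_name, event_dict):
    """Processor with the (logger, method_name, event_dict) signature structlog uses."""
    for key, value in event_dict.items():
        if isinstance(value, str):
            event_dict[key] = scrub_text(value)
        elif isinstance(value, list):
            event_dict[key] = [scrub_text(v) if isinstance(v, str) else v for v in value]
    return event_dict
```

With structlog, a processor like this would be placed in the `processors` list passed to `structlog.configure`, ahead of the renderer, so that every event is scrubbed regardless of which call site produced it.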