EPSS-Driven CVE Patch Prioritization for Kubernetes Workloads

Problem

A container image scan across a typical Kubernetes cluster produces hundreds to thousands of CVE findings. A medium-sized cluster with 50 distinct images might yield 2,000+ CVE findings after a tool like Trivy or Grype runs. The question is not “which CVEs exist?” — it is “which CVEs do we patch first?”

CVSS (Common Vulnerability Scoring System) is the default answer, but it has known limitations in this context:

CVSS measures theoretical severity, not exploitation likelihood. A CVSS 9.8 (Critical) CVE in a library that is never called via a reachable code path is less dangerous than a CVSS 7.5 CVE in a library with a public exploit actively used by threat actors. CVSS does not know about real-world exploitation activity.

CVE volume makes CVSS-only triage unsustainable. With thousands of CVEs published every month across all software (around 40,000 in 2024), most security teams cannot remediate every Critical CVE within a meaningful SLA. A realistic prioritization system must select a manageable subset: the CVEs that represent actual risk.

EPSS is designed for this gap. The Exploit Prediction Scoring System, maintained by FIRST, publishes a daily-updated probability (0 to 1) that a given CVE will be exploited in the wild within the next 30 days, together with a percentile that ranks the CVE against all other scored CVEs. An EPSS of 0.95 means the model estimates a 95% chance of exploitation in that 30-day window. A CVSS Critical CVE with EPSS 0.001 is unlikely to see active exploitation.
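Because EPSS scores are probabilities, they can be combined across the findings of a single image. The sketch below estimates the chance that at least one of an image's CVEs is exploited within 30 days as 1 minus the product of (1 - p) over all findings. It assumes the per-CVE probabilities are independent, which is a simplification; the function name and the sample scores are illustrative, not real EPSS data.

```python
import math

def prob_any_exploited(epss_scores: list[float]) -> float:
    """Probability that at least one CVE is exploited, assuming independence."""
    # P(none exploited) is the product of each CVE's "not exploited" probability.
    p_none = math.prod(1.0 - p for p in epss_scores)
    return 1.0 - p_none

# Illustrative scores only, not real EPSS data.
image_scores = [0.5, 0.5, 0.0]
print(f"{prob_any_exploited(image_scores):.2f}")  # prints 0.75
```

An image with many low-EPSS findings can carry more aggregate exposure than its individual scores suggest, which is one reason to look at per-image totals as well as per-CVE tiers.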

EPSS + CVSS together. The recommended approach: use CVSS to identify severity, and EPSS to prioritize within each severity tier. A CVSS Critical + EPSS > 0.1 CVE is a different urgency level from a CVSS Critical + EPSS 0.0005 CVE.

CISA KEV as the override. CISA’s Known Exploited Vulnerabilities catalog lists CVEs that are already being actively exploited. Any CVE in the KEV catalog is immediate priority regardless of its EPSS or CVSS score.

Target systems: any Kubernetes cluster with container image vulnerability scanning; security teams responsible for CVE remediation SLAs; platform teams managing multiple clusters with varying image update cadences.


Threat Model

Risk 1 — Misallocated remediation effort. The team spends two weeks patching 50 CVSS Critical CVEs across images. The CVEs being actively exploited in production workloads (higher EPSS) are not in the top 50 by CVSS. An attacker exploits an unpatched CVE with EPSS 0.8 while the team patches CVEs with EPSS 0.001.

Risk 2 — Alert fatigue collapses the programme. The vulnerability scanner produces 3,000 findings per week. The team cannot triage them and begins ignoring the scanner output. A KEV-listed CVE is not acted on because it is lost in the noise. EPSS-based filtering would have surfaced it immediately.

Risk 3 — SLA breach on exploited CVE. Cyber insurance requires critical CVEs to be patched within 7 days. A CVE is classified as High (CVSS 7.5) but has EPSS 0.94 and is added to KEV two days after disclosure. CVSS-only tracking would not have triggered the 7-day SLA — EPSS+KEV would.


Configuration / Implementation

Step 1 — Fetch EPSS scores for your CVE findings

#!/bin/bash
# fetch-epss-scores.sh
# Fetch EPSS scores for a list of CVE IDs from the FIRST EPSS API

CVE_LIST_FILE="${1:?Usage: $0 <cve-list-file>}"  # One CVE ID per line

# FIRST EPSS API — free, no authentication required
EPSS_API="https://api.first.org/data/1.0/epss"

# Fetch scores in batches of 100 (API limit per request)
# xargs groups up to 100 IDs per line; blank lines are skipped so no
# empty entries or trailing commas end up in the query string.
grep -v '^[[:space:]]*$' "$CVE_LIST_FILE" | \
xargs -n 100 | \
tr ' ' ',' | \
while IFS= read -r batch; do
    curl -s "${EPSS_API}?cve=${batch}" | \
        jq -r '.data[] | "\(.cve),\(.epss),\(.percentile)"'
done

# Output format: CVE-ID,EPSS-score,EPSS-percentile
# Example: CVE-2024-1234,0.94320,0.99876

Step 2 — Integrate EPSS into Trivy scan output

#!/usr/bin/env python3
# scripts/trivy-epss-enrichment.py
# Enriches Trivy JSON output with EPSS scores and produces prioritized output

import json
import sys
import urllib.request
import urllib.parse

def fetch_epss_scores(cve_ids: list[str]) -> dict[str, dict]:
    """Fetch EPSS scores for a list of CVE IDs."""
    scores = {}
    
    # Batch requests (100 CVEs per request)
    batch_size = 100
    for i in range(0, len(cve_ids), batch_size):
        batch = cve_ids[i:i + batch_size]
        cve_param = ",".join(batch)
        url = f"https://api.first.org/data/1.0/epss?cve={urllib.parse.quote(cve_param)}"
        
        try:
            with urllib.request.urlopen(url, timeout=15) as resp:
                data = json.loads(resp.read())
                for item in data.get("data", []):
                    scores[item["cve"]] = {
                        "epss": float(item.get("epss", 0)),
                        "percentile": float(item.get("percentile", 0))
                    }
        except Exception as e:
            print(f"Warning: EPSS fetch failed for batch: {e}", file=sys.stderr)
    
    return scores

def fetch_kev_catalog() -> set[str]:
    """Fetch CISA Known Exploited Vulnerabilities catalog."""
    url = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"
    try:
        with urllib.request.urlopen(url, timeout=15) as resp:
            data = json.loads(resp.read())
            return {v["cveID"] for v in data.get("vulnerabilities", [])}
    except Exception as e:
        print(f"Warning: KEV fetch failed: {e}", file=sys.stderr)
        return set()

def prioritize_finding(cvss_score: float, epss_score: float, in_kev: bool) -> str:
    """Assign a remediation priority tier based on CVSS + EPSS + KEV."""
    if in_kev:
        return "P0-KEV"  # Immediate — actively exploited
    if cvss_score >= 9.0 and epss_score >= 0.1:
        return "P1-CRITICAL"  # Critical severity + high exploitation probability
    if cvss_score >= 7.0 and epss_score >= 0.3:
        return "P1-CRITICAL"  # High severity + very high exploitation probability
    if cvss_score >= 9.0:
        return "P2-HIGH"  # Critical severity but low exploitation probability
    if epss_score >= 0.5:
        return "P2-HIGH"  # High exploitation probability regardless of CVSS
    if cvss_score >= 7.0:
        return "P3-MEDIUM"
    return "P4-LOW"

def process_trivy_output(trivy_json: dict) -> list[dict]:
    """Process Trivy JSON output and add EPSS enrichment."""
    all_cves = []
    
    for result in trivy_json.get("Results", []):
        for vuln in result.get("Vulnerabilities", []):
            cve_id = vuln.get("VulnerabilityID", "")
            if cve_id.startswith("CVE-"):
                all_cves.append({
                    "cve_id": cve_id,
                    "package": vuln.get("PkgName", ""),
                    "installed_version": vuln.get("InstalledVersion", ""),
                    "fixed_version": vuln.get("FixedVersion", ""),
                    "cvss_score": vuln.get("CVSS", {}).get("nvd", {}).get("V3Score", 0.0),
                    "severity": vuln.get("Severity", "UNKNOWN"),
                    "target": result.get("Target", ""),
                    "title": vuln.get("Title", ""),
                })
    
    if not all_cves:
        return []
    
    # Fetch EPSS scores
    cve_ids = list({v["cve_id"] for v in all_cves})
    print(f"Fetching EPSS scores for {len(cve_ids)} unique CVEs...", file=sys.stderr)
    epss_scores = fetch_epss_scores(cve_ids)
    
    # Fetch KEV catalog
    print("Fetching CISA KEV catalog...", file=sys.stderr)
    kev_cves = fetch_kev_catalog()
    
    # Enrich findings
    enriched = []
    for finding in all_cves:
        cve_id = finding["cve_id"]
        epss_data = epss_scores.get(cve_id, {"epss": 0.0, "percentile": 0.0})
        in_kev = cve_id in kev_cves
        
        finding["epss_score"] = epss_data["epss"]
        finding["epss_percentile"] = epss_data["percentile"]
        finding["in_kev"] = in_kev
        finding["priority"] = prioritize_finding(
            finding["cvss_score"], finding["epss_score"], in_kev
        )
        enriched.append(finding)
    
    # Sort by priority tier then EPSS score
    priority_order = {"P0-KEV": 0, "P1-CRITICAL": 1, "P2-HIGH": 2, "P3-MEDIUM": 3, "P4-LOW": 4}
    enriched.sort(key=lambda x: (priority_order.get(x["priority"], 99), -x["epss_score"]))
    
    return enriched

if __name__ == "__main__":
    trivy_json = json.load(sys.stdin)
    findings = process_trivy_output(trivy_json)
    
    print(f"\n{'Priority':<15} {'CVE':<20} {'CVSS':>6} {'EPSS':>6} {'KEV':>5} {'Package':<30} {'Fixed'}")
    print("-" * 110)
    
    for f in findings:
        kev_flag = "YES" if f["in_kev"] else "no"
        fixed = f["fixed_version"] or "no fix"
        print(f"{f['priority']:<15} {f['cve_id']:<20} {f['cvss_score']:>6.1f} "
              f"{f['epss_score']:>6.4f} {kev_flag:>5} {f['package']:<30} {fixed}")
    
    # Summary statistics
    p0 = sum(1 for f in findings if f["priority"] == "P0-KEV")
    p1 = sum(1 for f in findings if f["priority"] == "P1-CRITICAL")
    p2 = sum(1 for f in findings if f["priority"] == "P2-HIGH")
    
    print(f"\nSummary: {p0} P0-KEV, {p1} P1-CRITICAL, {p2} P2-HIGH, {len(findings)} total")
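The enrichment script reads only the NVD V3 score, so a finding with no NVD entry gets a CVSS of 0.0 and lands in P4-LOW unless EPSS or KEV lifts it. One possible fallback, sketched below, takes the highest `V3Score` from any source in Trivy's `CVSS` map and otherwise maps the `Severity` label to a representative score. The helper name `best_cvss_score` and the label-to-score mapping are assumptions for illustration, not part of Trivy.

```python
# Representative score per severity label. This mapping is an assumption:
# each value is the lower bound of the matching CVSS v3 qualitative band.
SEVERITY_FLOOR = {"CRITICAL": 9.0, "HIGH": 7.0, "MEDIUM": 4.0, "LOW": 0.1}

def best_cvss_score(vuln: dict) -> float:
    """Return the highest V3Score across sources, else a severity-based floor."""
    cvss_by_source = vuln.get("CVSS") or {}
    scores = [
        entry["V3Score"]
        for entry in cvss_by_source.values()
        if isinstance(entry, dict) and entry.get("V3Score")
    ]
    if scores:
        return float(max(scores))
    # No numeric score from any source: fall back to the severity label.
    return SEVERITY_FLOOR.get(str(vuln.get("Severity", "")).upper(), 0.0)

# Hypothetical finding that only carries a vendor score.
sample = {"Severity": "HIGH", "CVSS": {"redhat": {"V3Score": 7.5}}}
print(best_cvss_score(sample))  # prints 7.5
```

Taking the maximum across sources errs on the side of urgency; a team that trusts one vendor's scoring more than another's might prefer an ordered list of sources instead.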

Step 3 — Run enriched scanning in CI

# .github/workflows/epss-vulnerability-scan.yml

name: EPSS-Enriched Container Vulnerability Scan

on:
  push:
    paths: ["**/Dockerfile", "**/*.dockerfile"]
  schedule:
    - cron: "0 6 * * *"  # Daily scan for new CVEs against existing images

jobs:
  vulnerability-scan:
    runs-on: ubuntu-latest
    
    steps:
      - uses: actions/checkout@v4
      
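      # IMAGE_REF is not defined anywhere in this workflow; set it in an
      # `env:` block or as a repository variable. This step also assumes the
      # trivy CLI has already been installed on the runner.
      - name: Scan image with Trivy
        run: |
          trivy image \
            --format json \
            --output trivy-results.json \
            --ignore-unfixed \
            ${{ env.IMAGE_REF }}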
      
      - name: Enrich with EPSS and KEV
        run: |
          python3 scripts/trivy-epss-enrichment.py < trivy-results.json \
            > epss-enriched-report.txt
          cat epss-enriched-report.txt
      
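      - name: Fail on P0-KEV or P1-CRITICAL findings
        run: |
          # Anchor the match to the start of the line. The report's "Summary:"
          # line always names both tiers, so an unanchored grep would fail
          # every run even with zero findings.
          if grep -Eq '^(P0-KEV|P1-CRITICAL)' epss-enriched-report.txt; then
            echo "FAIL: KEV-listed or high-EPSS critical CVEs found"
            grep -E '^(P0-KEV|P1-CRITICAL)' epss-enriched-report.txt
            exit 1
          fi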
      
      - name: Upload enriched report
        uses: actions/upload-artifact@v4
        if: always()
        with:
          name: epss-vulnerability-report
          path: epss-enriched-report.txt

Step 4 — Set EPSS-aware SLAs

# vulnerability-sla-policy.yaml
# Define patch SLAs based on EPSS + CVSS + KEV combination

patch_sla_policy:
  version: "1.0"
  
  tiers:
    - priority: "P0-KEV"
      description: "CVE in CISA Known Exploited Vulnerabilities catalog"
      sla_hours: 24
      escalation: "immediate — page on-call security"
      
    - priority: "P1-CRITICAL"
      description: "CVSS >= 9.0 AND EPSS >= 0.1, OR CVSS >= 7.0 AND EPSS >= 0.3"
      sla_days: 7
      escalation: "security team ticket, tracked daily"
      
    - priority: "P2-HIGH"
      description: "CVSS >= 9.0 with low EPSS, OR EPSS >= 0.5 with any CVSS"
      sla_days: 30
      escalation: "standard sprint backlog"
      
    - priority: "P3-MEDIUM"
      description: "CVSS >= 7.0 with low EPSS"
      sla_days: 90
      escalation: "next quarterly base image update"
      
    - priority: "P4-LOW"
      description: "All other CVEs"
      sla_days: 180
      escalation: "accepted risk; review at next major version update"
  
  # When there is no fix available, document acceptance with review date
  no_fix_policy:
    document_acceptance: true
    review_interval_days: 30
    auto_escalate_on_kev: true  # Override acceptance if CVE enters KEV
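To make the policy enforceable, each finding needs a concrete deadline. The sketch below turns a priority tier and a detection time into a due date and a breach check. The `SLA_HOURS` table mirrors the tiers in the policy file above; the function names and the sample timestamp are hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Mirrors the tiers in vulnerability-sla-policy.yaml, expressed in hours.
SLA_HOURS = {
    "P0-KEV": 24,
    "P1-CRITICAL": 7 * 24,
    "P2-HIGH": 30 * 24,
    "P3-MEDIUM": 90 * 24,
    "P4-LOW": 180 * 24,
}

def sla_deadline(priority: str, detected_at: datetime) -> datetime:
    """Return the remediation deadline for a finding detected at detected_at."""
    return detected_at + timedelta(hours=SLA_HOURS[priority])

def is_breached(priority: str, detected_at: datetime, now: datetime) -> bool:
    """True when the SLA window has already elapsed."""
    return now > sla_deadline(priority, detected_at)

# Illustrative detection time.
detected = datetime(2024, 3, 1, 9, 0, tzinfo=timezone.utc)
print(sla_deadline("P1-CRITICAL", detected).isoformat())
# prints 2024-03-08T09:00:00+00:00
```

When a finding changes tier, for example on entering KEV, one reasonable choice is to restart the clock from the time of the tier change rather than from the original detection time; the policy should state which applies.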

Step 5 — Dashboard and alerting for EPSS changes

#!/usr/bin/env python3
# scripts/epss-drift-monitor.py
# Alert when EPSS score for a tracked CVE rises significantly

import json
import urllib.request
from datetime import datetime
from pathlib import Path

STATE_FILE = Path("/var/lib/epss-monitor/tracked-cves.json")

def load_tracked_cves() -> dict:
    if STATE_FILE.exists():
        return json.loads(STATE_FILE.read_text())
    return {}

def save_state(state: dict):
    STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
    STATE_FILE.write_text(json.dumps(state, indent=2))

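def fetch_epss(cve_ids: list[str], batch_size: int = 100) -> dict:
    """Fetch current EPSS scores in batches so no tracked CVE is dropped."""
    scores = {}
    for i in range(0, len(cve_ids), batch_size):
        cve_param = ",".join(cve_ids[i:i + batch_size])
        url = f"https://api.first.org/data/1.0/epss?cve={cve_param}"
        with urllib.request.urlopen(url, timeout=15) as resp:
            data = json.loads(resp.read())
        for item in data.get("data", []):
            scores[item["cve"]] = float(item["epss"])
    return scores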

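def check_epss_changes(tracked_cves: list[str], threshold: float = 0.1):
    """Alert when an EPSS score rises by at least threshold since the last run.

    A CVE seen for the first time has no stored baseline, so its previous
    score is treated as 0.0 and its full current score counts as the rise.
    """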
    state = load_tracked_cves()
    current_scores = fetch_epss(tracked_cves)
    alerts = []
    
    for cve_id, current_score in current_scores.items():
        previous_score = state.get(cve_id, {}).get("epss", 0.0)
        delta = current_score - previous_score
        
        if delta >= threshold:
            alerts.append({
                "cve": cve_id,
                "previous": previous_score,
                "current": current_score,
                "delta": delta,
                "alert": f"EPSS for {cve_id} rose from {previous_score:.4f} to {current_score:.4f} (+{delta:.4f})"
            })
        
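        state[cve_id] = {
            "epss": current_score,
            # Timezone-aware timestamp; datetime.utcnow() is deprecated
            # since Python 3.12 and returns a naive value.
            "last_updated": datetime.now().astimezone().isoformat()
        }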
    
    save_state(state)
    return alerts

# Run daily and alert when unpatched CVEs see rising EPSS
if __name__ == "__main__":
    # Load list of unpatched CVEs from your vulnerability tracking system
    tracked = ["CVE-2024-1234", "CVE-2024-5678"]  # Replace with your tracked CVEs
    alerts = check_epss_changes(tracked, threshold=0.05)
    
    for alert in alerts:
        print(f"EPSS_RISE: {alert['alert']}")
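The drift monitor alerts on the absolute rise in EPSS, which can miss a small increase that still pushes a finding across a tier boundary. A complementary check, sketched below, re-evaluates the tier with the old and new scores and flags any escalation. The `tier` function repeats the thresholds from `prioritize_finding` without the KEV override; the names `tier`, `RANK`, and `tier_escalated` are hypothetical.

```python
def tier(cvss: float, epss: float) -> str:
    """Same thresholds as prioritize_finding, without the KEV override."""
    if cvss >= 9.0 and epss >= 0.1:
        return "P1-CRITICAL"
    if cvss >= 7.0 and epss >= 0.3:
        return "P1-CRITICAL"
    if cvss >= 9.0 or epss >= 0.5:
        return "P2-HIGH"
    if cvss >= 7.0:
        return "P3-MEDIUM"
    return "P4-LOW"

# Lower rank means more urgent.
RANK = {"P1-CRITICAL": 1, "P2-HIGH": 2, "P3-MEDIUM": 3, "P4-LOW": 4}

def tier_escalated(cvss: float, previous_epss: float, current_epss: float) -> bool:
    """True when the new EPSS score moves the finding to a more urgent tier."""
    return RANK[tier(cvss, current_epss)] < RANK[tier(cvss, previous_epss)]

# A rise of only 0.04 stays under a 0.05 delta threshold, yet it crosses
# the 0.3 boundary for a CVSS 7.5 finding (P3-MEDIUM to P1-CRITICAL).
print(tier_escalated(7.5, 0.28, 0.32))  # prints True
```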

Expected Behaviour

| Scenario | CVSS-only prioritization | EPSS + CVSS + KEV prioritization |
|---|---|---|
| 2,000 CVE findings from cluster scan | Sort by CVSS; top 50 are “Critical” | P0-KEV: ~5 findings; P1-CRITICAL: ~20; actionable subset is clear |
| CVSS Critical CVE, EPSS 0.0003 | Immediate remediation required | P2-HIGH; 30-day SLA (theoretical risk, low exploitation probability) |
| CVSS High CVE enters CISA KEV | Not differentiated from other Highs | P0-KEV override: 24-hour SLA, on-call paged |
| EPSS score rises from 0.01 to 0.8 overnight | Not detected; no re-prioritization | EPSS drift monitor alerts; CVE re-prioritized to P1 |
| Team patches CVEs in CVSS order | May patch low-exploitation-probability CVEs first | Patches CVEs actively exploited in the wild first |

Trade-offs

| Aspect | Benefit | Cost | Mitigation |
|---|---|---|---|
| EPSS-based prioritization | Focuses effort on exploitation-likely CVEs | EPSS is probabilistic; a low-EPSS CVE may still be exploited | Use EPSS to order the work, not to dismiss findings; all CVEs should eventually be fixed |
| KEV as P0 override | Guarantees immediate response to active exploitation | KEV lags real-world exploitation by days | Supplement KEV with threat intelligence feeds for faster detection |
| Longer SLA for CVSS Critical + low EPSS | Reduces unnecessary urgency for theoretical CVEs | Stakeholders expect Critical CVEs to be patched in 7 days | Document and communicate the EPSS-based policy; get explicit acceptance |
| Daily EPSS score refresh | Catches rising exploitation probability early | Dependency on the EPSS API; scores are unavailable during an API outage | Cache last-known scores; alert on stale data older than 48 hours |
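The caching mitigation in the last row can be sketched as a small file cache that records when scores were fetched and reports staleness after 48 hours. This is a minimal sketch; the function names, the JSON layout, and the cache path used in the usage note are assumptions, not part of the scripts above.

```python
import json
import time
from pathlib import Path
from typing import Optional

MAX_AGE_SECONDS = 48 * 3600  # staleness limit taken from the mitigation above

def save_cache(path: Path, scores: dict, now: Optional[float] = None) -> None:
    """Write scores together with the time they were fetched."""
    fetched_at = time.time() if now is None else now
    path.write_text(json.dumps({"fetched_at": fetched_at, "scores": scores}))

def load_cache(path: Path, now: Optional[float] = None) -> tuple[dict, bool]:
    """Return (scores, is_stale). A missing cache is empty and stale."""
    if not path.exists():
        return {}, True
    payload = json.loads(path.read_text())
    age = (time.time() if now is None else now) - payload["fetched_at"]
    return payload["scores"], age > MAX_AGE_SECONDS
```

A caller would try the live API first, call `save_cache(Path("epss-cache.json"), scores)` on success, and on failure use `load_cache`, falling back to CVSS-only ordering and raising an alert when `is_stale` is true.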

Failure Modes

| Failure | Symptom | Detection | Recovery |
|---|---|---|---|
| EPSS API unreachable during CI scan | EPSS enrichment step fails; scan may block deployment | CI step failure; alert on API timeout | Cache yesterday’s EPSS scores locally; fall back to CVSS-only if cache is stale |
| CVE added to KEV after EPSS-based deferral | Deferred CVE is now actively exploited | Daily KEV sync alerts on new additions; cross-reference against tracked CVEs | Automated KEV subscription that re-triggers triage for all tracked deferred CVEs |
| EPSS score not available for a new CVE | New CVE has no EPSS score (scores are typically assigned within 24h) | EPSS shows as 0.0000 in enriched output, the enrichment script's default for a missing score | Default to CVSS-only for CVEs with no EPSS data; recheck in 48 hours |
| Team ignores P3/P4 backlog indefinitely | Low-priority CVEs accumulate; some eventually enter KEV | KEV check on all tracked CVEs, not just P0-P2 | Run daily KEV check across the full tracked CVE inventory regardless of priority tier |
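The daily KEV cross-reference described in the second and fourth rows reduces to a set comparison between two catalog snapshots and the tracked inventory. The sketch below assumes yesterday's and today's KEV CVE IDs are already loaded as sets (for example from `fetch_kev_catalog`); the function name and the sample IDs are illustrative.

```python
def newly_listed_in_kev(
    tracked_cves: set[str], previous_kev: set[str], current_kev: set[str]
) -> set[str]:
    """Tracked CVEs that appear in today's KEV snapshot but not yesterday's."""
    return (current_kev - previous_kev) & tracked_cves

# Illustrative IDs only.
tracked = {"CVE-2024-1234", "CVE-2024-5678"}
yesterday = {"CVE-2023-0001"}
today = {"CVE-2023-0001", "CVE-2024-5678", "CVE-2024-9999"}
print(sorted(newly_listed_in_kev(tracked, yesterday, today)))
# prints ['CVE-2024-5678']
```

Passing the full tracked inventory, including P3 and P4 findings and accepted no-fix risks, keeps deferred CVEs from slipping through when they later enter KEV.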