Application Security Logging: Structured Events, PII Redaction, and SIEM Integration
Problem
Application logs are the record of what your system did, to whom, and with what result. In a security incident, they answer: which account was accessed, when did the attacker authenticate, what data did they read, and which API calls preceded the breach.
Most application logging is built for debugging, not security:
- Authentication events log the wrong fields. A successful login logs "User logged in" with no username, IP, or session ID. A failed login logs nothing, or logs the attempted username in plaintext — useful for debugging, but not for SIEM correlation.
- Authorisation decisions are unlogged. When a user is denied access to a resource, the event is silently dropped. An attacker probing access controls leaves no trace.
- PII leaks into logs. Request bodies containing credit card numbers, SSNs, or passwords are logged verbatim. These logs are stored in SIEM systems, S3 buckets, and shipped to third-party log aggregators — each an exposure point.
- No correlation ID across services. A user action that touches five microservices produces five unconnected log entries. Reconstructing the full request path during an incident requires hours of manual correlation.
- Log levels swallow security events. Security-relevant events (auth failures, privilege escalation, data exports) are logged at DEBUG or INFO and purged after 7 days. The attacker’s activity is gone before the investigation starts.
Target systems: Any application producing logs; structuring applies equally to Go, Python, Node.js, Java; SIEM targets: Elasticsearch/OpenSearch, Splunk, Google Chronicle, Datadog. Specific examples use Go’s slog, Python’s structlog, and OpenTelemetry log exporter.
Threat Model
- Adversary 1 — Credential stuffing: An attacker attempts thousands of username/password combinations against the login endpoint. Without per-IP failed-auth logging, the attack is invisible until accounts are compromised.
- Adversary 2 — IDOR (Insecure Direct Object Reference): An attacker increments a user ID in API calls to read other users’ data. Without logging which user ID was accessed alongside the requesting user ID, the probe is undetectable in logs.
- Adversary 3 — Privilege escalation: An attacker with a low-privilege account calls an admin API endpoint. Without logging the denied authorisation decision with the caller’s identity, the probe is invisible.
- Adversary 4 — Data exfiltration: An attacker with legitimate API access exports large volumes of records. Without logging response sizes and record counts, bulk export is indistinguishable from normal usage.
- Adversary 5 — Log injection: An attacker submits input containing newlines or JSON-breaking characters to manipulate structured log output and forge false entries. Structured logging with proper escaping prevents this.
- Access level: Adversaries 1 and 2 have API access. Adversary 3 has authenticated user access. Adversary 4 has legitimate but over-scoped API access. Adversary 5 controls input to a logged field.
- Objective: Exfiltrate data without detection, probe access controls without generating alerts, cover tracks by manipulating logs.
- Blast radius: Without security-grade application logging, an attacker has a wide window to probe, escalate, and exfiltrate with no trace in the logs. With structured security events, every significant action creates an auditable record correlated to a session and user identity.
Configuration
Step 1: Define the Security Event Schema
Agree on a schema for all security-relevant events before writing any code. Every event must carry:
{
"timestamp": "2026-04-30T12:34:56.789Z",
"level": "INFO",
"event_type": "auth.login.success",
"trace_id": "4bf92f3577b34da6a3ce929d0e0e4736",
"span_id": "00f067aa0ba902b7",
"service": "payments-api",
"version": "v2.3.1",
"user_id": "usr_abc123",
"session_id": "sess_xyz789",
"ip": "203.0.113.42",
"user_agent": "Mozilla/5.0 ...",
"result": "success",
"duration_ms": 45
}
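One way to keep services from silently dropping envelope fields is to pin the schema down as a type. A minimal Python sketch using TypedDict, with field names copied from the example event above (the class name is illustrative):

```python
from typing import TypedDict

class SecurityEvent(TypedDict):
    """Required envelope for every security-relevant log event."""
    timestamp: str    # RFC 3339 / ISO 8601, UTC
    level: str
    event_type: str   # dot-separated, e.g. "auth.login.success"
    trace_id: str
    span_id: str
    service: str
    version: str
    user_id: str
    session_id: str
    ip: str
    user_agent: str
    result: str       # "success" | "failure" | "denied"
    duration_ms: int

# The example event from above, now type-checked against the schema.
event: SecurityEvent = {
    "timestamp": "2026-04-30T12:34:56.789Z",
    "level": "INFO",
    "event_type": "auth.login.success",
    "trace_id": "4bf92f3577b34da6a3ce929d0e0e4736",
    "span_id": "00f067aa0ba902b7",
    "service": "payments-api",
    "version": "v2.3.1",
    "user_id": "usr_abc123",
    "session_id": "sess_xyz789",
    "ip": "203.0.113.42",
    "user_agent": "Mozilla/5.0",
    "result": "success",
    "duration_ms": 45,
}
```

A static checker (mypy, pyright) will then flag any event constructed without a required field.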
Event type taxonomy (use dot-separated hierarchical names for SIEM filtering):
auth.login.success
auth.login.failure
auth.logout
auth.mfa.challenge.success
auth.mfa.challenge.failure
auth.password.reset.requested
auth.session.expired
authz.access.granted
authz.access.denied
authz.role.assigned
authz.role.removed
data.read # record count and resource type
data.write
data.delete
data.export # bulk operations; include record count
api.rate_limit.exceeded
api.input.validation.failure
admin.user.created
admin.user.suspended
admin.config.changed
Step 2: Instrument Authentication Events (Go/slog)
package security
import (
"context"
"log/slog"
"net/http"
"time"
)
type SecurityLogger struct {
logger *slog.Logger
}
func (s *SecurityLogger) LogAuthEvent(ctx context.Context, r *http.Request, eventType string, attrs ...slog.Attr) {
base := []slog.Attr{
slog.String("event_type", eventType),
slog.String("trace_id", traceIDFromContext(ctx)),
slog.String("span_id", spanIDFromContext(ctx)),
slog.String("ip", clientIP(r)),
slog.String("user_agent", r.Header.Get("User-Agent")),
slog.String("method", r.Method),
slog.String("path", r.URL.Path),
slog.Time("timestamp", time.Now().UTC()),
}
base = append(base, attrs...)
// Always use WARN or higher for security events; they must survive log retention filters.
s.logger.LogAttrs(ctx, slog.LevelWarn, "security_event", base...)
}
// Usage in auth handler:
func (h *AuthHandler) Login(w http.ResponseWriter, r *http.Request) {
start := time.Now()
username := r.FormValue("username") // Validated; not logged directly.
userID, sessionID, err := h.authenticate(r)
if err != nil {
h.security.LogAuthEvent(r.Context(), r, "auth.login.failure",
slog.String("result", "failure"),
slog.String("reason", classifyAuthError(err)), // "invalid_password", "account_locked", etc.
slog.String("user_id_hash", hashUserID(username)), // Hashed, not plaintext.
slog.Duration("duration", time.Since(start)),
)
// Return generic error to client; specific reason only in logs.
http.Error(w, "Authentication failed", http.StatusUnauthorized)
return
}
h.security.LogAuthEvent(r.Context(), r, "auth.login.success",
slog.String("result", "success"),
slog.String("user_id", userID),
slog.String("session_id", sessionID),
slog.Duration("duration", time.Since(start)),
)
}
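The handler above assumes a classifyAuthError helper. One way to build it, shown here as a Python sketch with illustrative error and reason names, is a mapping from internal error types to a small, stable set of reason strings, so logs carry detail while the client only ever sees a generic failure:

```python
class InvalidPassword(Exception): pass
class AccountLocked(Exception): pass
class UnknownUser(Exception): pass

# Map internal auth errors to low-cardinality reason strings for the "reason" field.
_REASONS = {
    InvalidPassword: "invalid_password",
    AccountLocked: "account_locked",
    UnknownUser: "unknown_user",
}

def classify_auth_error(err: Exception) -> str:
    """Return a stable reason string; never the raw error message, which may
    echo user input or leak internal details into the log stream."""
    return _REASONS.get(type(err), "internal_error")
```

Keeping the reason set small also keeps SIEM aggregations over the field cheap and predictable.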
Step 3: Instrument Authorisation Events (Python/structlog)
import structlog
import functools
from typing import Callable
security_log = structlog.get_logger("security")
def require_permission(permission: str):
"""Decorator that logs authorisation decisions for every protected endpoint."""
def decorator(fn: Callable):
@functools.wraps(fn)
def wrapper(request, *args, **kwargs):
user = request.user
resource_id = kwargs.get("id") or kwargs.get("resource_id") or kwargs.get("payment_id")
if not user.has_permission(permission):
security_log.warning(
"authz.access.denied",
user_id=user.id,
permission=permission,
resource_id=resource_id,
resource_type=fn.__name__,
ip=get_client_ip(request),
trace_id=request.headers.get("X-Trace-ID"),
result="denied",
)
raise PermissionDenied()
result = fn(request, *args, **kwargs)
security_log.info(
"authz.access.granted",
user_id=user.id,
permission=permission,
resource_id=resource_id,
resource_type=fn.__name__,
trace_id=request.headers.get("X-Trace-ID"),
result="granted",
)
return result
return wrapper
return decorator
# Usage:
@require_permission("payments:read")
def get_payment(request, payment_id: str):
payment = Payment.objects.get(id=payment_id, owner=request.user)
security_log.info(
"data.read",
user_id=request.user.id,
resource_type="payment",
resource_id=payment_id,
record_count=1,
trace_id=request.headers.get("X-Trace-ID"),
)
return payment
Log bulk data access with record counts (detects exfiltration):
def export_payments(request):
queryset = Payment.objects.filter(owner=request.user)
count = queryset.count()
security_log.warning(
"data.export",
user_id=request.user.id,
resource_type="payment",
record_count=count,
filter_params=safe_filter_repr(request.GET), # Sanitised, not raw query string.
trace_id=request.headers.get("X-Trace-ID"),
result="success" if count <= MAX_EXPORT else "blocked",
)
if count > MAX_EXPORT:
raise PermissionDenied("Export limit exceeded")
return queryset
Step 4: PII Redaction Before Logging
Never log raw PII. Redact at the point of log creation, not post-hoc:
import hashlib, re
CARD_PATTERN = re.compile(r'\b(?:\d{4}[\s-]?){3}\d{4}\b')
SSN_PATTERN = re.compile(r'\b\d{3}-\d{2}-\d{4}\b')
EMAIL_PATTERN = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')
def redact_pii(value: str) -> str:
"""Replace known PII patterns with redaction markers."""
value = CARD_PATTERN.sub('[CARD_REDACTED]', value)
value = SSN_PATTERN.sub('[SSN_REDACTED]', value)
value = EMAIL_PATTERN.sub('[EMAIL_REDACTED]', value)
return value
def hash_user_identifier(identifier: str, salt: str) -> str:
"""Pseudonymise user identifiers for logging. Same input always produces
the same hash — sufficient for correlation without storing plaintext."""
return hashlib.sha256(f"{salt}:{identifier}".encode()).hexdigest()[:16]
# structlog processor to automatically redact PII from all log messages.
def pii_redact_processor(logger, method, event_dict):
for key in ["event", "error", "query", "input"]:  # structlog stores the message under "event".
if key in event_dict and isinstance(event_dict[key], str):
event_dict[key] = redact_pii(event_dict[key])
return event_dict
structlog.configure(
processors=[
pii_redact_processor,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer(),
]
)
Fields that must never be logged in plaintext:
- Passwords or password hashes
- Full credit card numbers (last 4 digits only)
- SSNs, national ID numbers
- Full email addresses (use hashed version for correlation)
- API keys and tokens (log key ID, not value)
- Authentication cookies or session tokens
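For fields where partial disclosure is allowed (card last four, API key IDs), a small masking helper keeps the useful fragment and drops the rest. A sketch implementing the two rules above (function names are illustrative):

```python
def mask_card(pan: str) -> str:
    """Keep only the last 4 digits of a card number; redact entirely if too short."""
    digits = "".join(ch for ch in pan if ch.isdigit())
    return f"****{digits[-4:]}" if len(digits) >= 4 else "[CARD_REDACTED]"

def api_key_log_ref(api_key: str) -> str:
    """Log a short key prefix as an identifier; never the secret value itself."""
    return f"key_{api_key[:8]}..." if len(api_key) > 8 else "[KEY_REDACTED]"
```

Example: mask_card("4111 1111 1111 1111") yields "****1111", enough for a support agent to confirm which card was used without the log store ever holding the full PAN.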
Step 5: Correlation IDs Across Microservices
Every request must carry a trace ID that flows through all service calls:
// Middleware: inject a trace ID on every incoming request if not present.
func TraceMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
traceID := r.Header.Get("X-Trace-ID")
if traceID == "" {
traceID = generateTraceID()
}
// Forward to downstream services.
r.Header.Set("X-Trace-ID", traceID)
// Add to context for log extraction.
ctx := context.WithValue(r.Context(), traceIDKey, traceID)
// Add to response so clients can reference it.
w.Header().Set("X-Trace-ID", traceID)
next.ServeHTTP(w, r.WithContext(ctx))
})
}
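The same pattern on the Python side: bind the trace ID once per request so every later log call in that request can pick it up. A minimal stdlib sketch using contextvars (structlog's contextvars processors follow the same idea; the header name matches the middleware above):

```python
import uuid
from contextvars import ContextVar

# Request-scoped trace ID; a log processor reads it when emitting each event.
_trace_id: ContextVar[str] = ContextVar("trace_id", default="")

def bind_trace_id(headers: dict) -> str:
    """Take the incoming X-Trace-ID, or mint a fresh one, and bind it for this request."""
    trace_id = headers.get("X-Trace-ID") or uuid.uuid4().hex
    _trace_id.set(trace_id)
    return trace_id

def current_trace_id() -> str:
    """Read the trace ID bound to the current request context."""
    return _trace_id.get()
```

Because ContextVar is task-local, concurrent requests in an async server do not clobber each other's IDs.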
Use OpenTelemetry trace context (W3C traceparent header) for interoperability:
import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
)
// Extract trace context from incoming request.
ctx := otel.GetTextMapPropagator().Extract(r.Context(), propagation.HeaderCarrier(r.Header))
// Inject into outgoing requests to downstream services.
otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(req.Header))
Step 6: Ship to SIEM with Retention Tiers
Security events must be retained longer than debug logs:
# Filebeat/Vector pipeline: separate security events from debug logs.
vector:
sources:
app_logs:
type: file
include: ["/var/log/app/*.json"]
transforms:
security_events:
type: filter
inputs: [app_logs]
condition: .level == "WARN" || .level == "ERROR" || .event_type != null
debug_logs:
type: filter
inputs: [app_logs]
condition: .level == "DEBUG" || .level == "INFO"
sinks:
siem:
type: elasticsearch
inputs: [security_events]
endpoint: https://siem.internal:9200
index: security-events-%Y.%m.%d
# Retention: 365 days via ILM.
debug_store:
type: s3
inputs: [debug_logs]
bucket: app-debug-logs
# Retention: 14 days via S3 lifecycle.
Set minimum log level for security events to WARN so they survive log-level filtering:
// Security events always log at WARN regardless of the application's log level setting.
s.logger.LogAttrs(ctx, slog.LevelWarn, "security_event", ...)
Step 7: SIEM Alert Rules
With structured events in the SIEM, write precise alert queries:
# Elasticsearch: credential stuffing detection.
# Alert if more than 10 auth.login.failure events from the same IP in 5 minutes.
{
"query": {
"bool": {
"must": [
{"term": {"event_type": "auth.login.failure"}},
{"range": {"@timestamp": {"gte": "now-5m"}}}
]
}
},
"aggs": {
"by_ip": {
"terms": {"field": "ip", "min_doc_count": 10}
}
}
}
# IDOR probe: one user reading many different users' resources.
# Alert if user X reads > 50 distinct resource IDs belonging to other users.
{
"query": {
"bool": {
"must": [
{"term": {"event_type": "data.read"}},
{"range": {"@timestamp": {"gte": "now-1h"}}}
],
"must_not": [
{"term": {"user_id": "${resource_owner_id}"}}
]
}
},
"aggs": {
"by_user": {"terms": {"field": "user_id", "min_doc_count": 50}}
}
}
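The aggregation behind the first query can be unit-tested outside the SIEM. A sketch that counts auth.login.failure events per IP over a window, mirroring the terms aggregation with min_doc_count (thresholds match the rule above; the function name is illustrative):

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

def stuffing_suspects(events: list[dict], window_minutes: int = 5, threshold: int = 10) -> set[str]:
    """Return IPs with >= threshold login failures inside the window."""
    cutoff = datetime.now(timezone.utc) - timedelta(minutes=window_minutes)
    failures = Counter(
        e["ip"] for e in events
        if e.get("event_type") == "auth.login.failure"
        and datetime.fromisoformat(e["timestamp"]) >= cutoff
    )
    return {ip for ip, n in failures.items() if n >= threshold}
```

Running this against fixture events in CI verifies the alert logic before it is translated into the SIEM's query language.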
Step 8: Telemetry
security_event_total{event_type, result, service} counter
security_event_pii_redacted_total{field, service} counter
auth_failure_rate{ip, user_id_hash} gauge
authz_denial_rate{user_id, resource_type} gauge
data_export_record_count{user_id} histogram
log_correlation_gap_total{service} counter (events missing trace_id)
Alert on:
- auth_failure_rate per IP > 10/5min — credential stuffing.
- authz_denial_rate per user > 5/min — access control probing.
- data_export_record_count > threshold — bulk exfiltration candidate.
- log_correlation_gap_total > 0 — events missing trace IDs; correlation gaps in incident investigation.
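Emitting these metrics is a one-line increment on the logging path. A sketch using in-process counters as stand-ins (a production service would use a metrics client such as prometheus_client with the same label sets):

```python
from collections import Counter

# Stand-ins for security_event_total and log_correlation_gap_total.
security_event_total: Counter = Counter()       # keyed by (event_type, result, service)
log_correlation_gap_total: Counter = Counter()  # keyed by (service,)

def record_security_event(event: dict, service: str) -> None:
    """Update telemetry as each structured security event is logged."""
    security_event_total[(event["event_type"], event.get("result", "n/a"), service)] += 1
    # Count events that arrived without a trace ID: each one is a correlation gap.
    if not event.get("trace_id"):
        log_correlation_gap_total[(service,)] += 1
```

Wiring this into the same wrapper that emits the log event guarantees the metric and the log record can never disagree.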
Expected Behaviour
| Signal | Unstructured debug logs | Structured security logging |
|---|---|---|
| Failed login | ERROR login failed (no IP, no user) | JSON event with IP, user hash, reason, duration |
| Authorisation denial | Not logged | authz.access.denied with user, resource, permission |
| Bulk data export | Not distinguishable | data.export with record count; alert if > threshold |
| PII in request body | Logged verbatim | Redacted before reaching log sink |
| Cross-service correlation | Not possible | Trace ID follows request through all services |
| SIEM retention | Mixed with debug (short retention) | Security events on 365-day retention tier |
Trade-offs
| Aspect | Benefit | Cost | Mitigation |
|---|---|---|---|
| WARN level for security events | Survive log-level filtering | More noise in WARN logs | Filter SIEM on event_type field rather than log level for precision. |
| Structured JSON logging | Machine-parseable; SIEM-ready | Larger log volume than text | Compress at sink; structured logs compress well (repeated field names). |
| PII redaction at source | No PII reaches log sinks | May lose some debugging context | Use pseudonymised hash for correlation; store raw only in encrypted audit store if needed. |
| Trace ID propagation | Full request lineage | Requires middleware in every service | Add once to HTTP middleware and gRPC interceptor; flows automatically thereafter. |
| Record count in data.read logs | Enables exfiltration detection | Slightly more work per data access | Add to data access abstraction layer once; applies to all callers. |
Failure Modes
| Failure | Symptom | Detection | Recovery |
|---|---|---|---|
| Security events dropped at WARN level filter | Auth failures invisible in SIEM | security_event_total drops; SIEM shows no auth events | Check log pipeline log-level configuration; ensure WARN and above flows to SIEM. |
| PII redaction regex too narrow | New PII format reaches logs | Periodic PII scan of log sample | Expand regex patterns; add test cases for new PII formats. |
| Trace ID missing from downstream service | Correlation gaps in investigation | log_correlation_gap_total counter rises | Add propagation middleware to the service that’s dropping trace IDs. |
| Log injection via unescaped user input | Forged log entries in SIEM | Suspicious JSON-breaking characters in log fields | Use JSON logger (not string interpolation); JSON escapes automatically. |
| SIEM ingestion lag | Security events arrive > 5 minutes late | SIEM ingest lag metric | Check Vector/Filebeat pipeline throughput; scale ingestion workers. |
| Bulk export not logged due to streaming | Large exports bypass record count logging | No data.export events despite high data volume | Log count before streaming; or log a checkpoint every N records. |
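The checkpoint mitigation in the last row can be a thin generator wrapper around the streamed rows. A sketch with an illustrative checkpoint interval (the callback would emit a data.export event with the running record_count):

```python
from typing import Callable, Iterable, Iterator

def with_export_checkpoints(
    rows: Iterable[dict],
    log_checkpoint: Callable[[int], None],
    every: int = 10_000,
) -> Iterator[dict]:
    """Yield rows unchanged, emitting a checkpoint every `every` records and a
    final count at stream end, so bulk exports stay visible even when the
    total is unknown up front."""
    count = 0
    for row in rows:
        count += 1
        if count % every == 0:
            log_checkpoint(count)
        yield row
    log_checkpoint(count)
```

In the export handler above, log_checkpoint would call security_log.warning("data.export", record_count=count, ...), so a partially completed or aborted export still leaves a trail.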