Data Loss Prevention for Cloud Environments: Classification, Egress Controls, and Monitoring

Data Loss Prevention for Cloud Environments: Classification, Egress Controls, and Monitoring

Problem

Data loss in cloud environments follows a small number of common patterns that are well-understood and preventable:

  • Publicly accessible S3 buckets or GCS buckets. A bucket containing PII, financial records, or configuration files is created with public ACL or a bucket policy that grants access to *. Discovery takes minutes using tools like bucket-finder or GrayhatWarfare. This pattern caused the vast majority of cloud data breaches from 2018–2023.
  • Overpermissive IAM granting cross-account access. A resource-based policy allows any AWS account ("Principal": "*") to read an S3 bucket or an RDS snapshot. An attacker who discovers the resource ARN copies the data without owning any credentials.
  • Sensitive data in environment variables and logs. Database passwords, API keys, and PII that appear in application logs are shipped to a SIEM or log aggregator and become accessible to anyone with log access — a much broader audience than intended.
  • API endpoints that return more data than needed. A GraphQL or REST endpoint returns full records including fields (SSN, card number, date of birth) that the client doesn’t need. Mass assignment or excessive data exposure leaks data to any authenticated user.
  • Bulk export by a malicious insider. An employee with legitimate data access runs a mass query or export before leaving the organisation. Without monitoring for bulk data access patterns, the export goes undetected.
  • Unencrypted data in transit between services. Internal service-to-service communication uses HTTP instead of HTTPS. A compromised host on the network captures traffic containing PII.

DLP in cloud environments is not a single product — it is a set of controls at different layers: data classification, storage access enforcement, network egress filtering, and detection of anomalous access patterns.

Target systems: AWS Macie + S3 + IAM; GCP DLP API + GCS + VPC Service Controls; Azure Information Protection; Kubernetes NetworkPolicy for pod egress; CASB integration for SaaS egress.

Threat Model

  • Adversary 1 — Public storage bucket discovery: An attacker enumerates S3 bucket names using known naming patterns (company name + environment). They discover a bucket with public read access and download all contents — customer PII, financial records, internal documents.
  • Adversary 2 — Overpermissive API exfiltration: An authenticated user (employee, contractor, API key holder) queries an API that returns full PII records. They extract all records through repeated API calls. Without rate limiting or bulk access detection, this proceeds undetected.
  • Adversary 3 — Misconfigured cross-account access: An S3 bucket’s resource policy was intended to allow access from a specific partner account but was incorrectly written to allow *. A third party who discovers the ARN copies the data.
  • Adversary 4 — Sensitive data in log egress: Application logs containing credit card numbers or SSNs are shipped to a third-party log aggregator. The log aggregator is compromised. All logged PII is exposed.
  • Adversary 5 — Insider bulk export: A departing employee with legitimate database access runs SELECT * against customer tables and exports the result. The export is not logged or is logged but never reviewed.
  • Access level: Adversary 1 needs no credentials. Adversary 2 needs API credentials. Adversary 3 needs the S3 ARN. Adversaries 4 and 5 have legitimate access.
  • Objective: Extract customer PII, financial data, intellectual property, or configuration secrets for financial gain, competitive advantage, or extortion.
  • Blast radius: A single misconfigured S3 bucket can expose millions of records. A bulk insider export can copy an entire customer database.

Configuration

Step 1: Data Classification

Classify data before applying controls — controls must match data sensitivity:

# data-classification-policy.yaml
classification_levels:
  - level: "public"
    description: "Marketing content, public documentation"
    controls:
      encryption_at_rest: optional
      encryption_in_transit: optional
      access_logging: no
      public_access: permitted

  - level: "internal"
    description: "Internal business data; no PII"
    controls:
      encryption_at_rest: required
      encryption_in_transit: required
      access_logging: recommended
      public_access: prohibited

  - level: "confidential"
    description: "PII, financial data, health data, credentials"
    controls:
      encryption_at_rest: required (CMK)
      encryption_in_transit: required (TLS 1.2+)
      access_logging: required
      public_access: prohibited
      cross_account_access: prohibited except approved partners
      bulk_export: requires approval and logging
      retention: defined (delete after N days)

  - level: "restricted"
    description: "Trade secrets, regulated data (HIPAA PHI, PCI PAN)"
    controls:
      encryption_at_rest: required (CMK, FIPS 140-2)
      encryption_in_transit: required (TLS 1.2+, certificate pinning)
      access_logging: required (tamper-evident)
      public_access: prohibited
      cross_account_access: prohibited
      egress: restricted to approved destinations only
      bulk_export: prohibited without CISO approval

Step 2: Automated Data Discovery with AWS Macie

AWS Macie scans S3 buckets for sensitive data and access configuration issues:

# Enable Macie for the account.
aws macie2 enable-macie

# Create a classification job to scan all S3 buckets.
aws macie2 create-classification-job \
  --job-type SCHEDULED \
  --schedule-frequency DAILY \
  --name "full-account-dlp-scan" \
  --s3-job-definition '{
    "bucketDefinitions": [],
    "scoping": {
      "includes": {
        "and": [{"simpleScopeTerm": {"comparator": "EQ", "key": "OBJECT_SIZE", "values": ["1"]}}]
      }
    }
  }' \
  --sampling-percentage 100

# Enable automated findings for public bucket access.
aws macie2 create-findings-filter \
  --name "public-bucket-access" \
  --action ARCHIVE \
  --finding-criteria '{
    "criterion": {
      "category": {"eq": ["CLASSIFICATION"]},
      "severity.description": {"eq": ["High", "Critical"]}
    }
  }'
# Process Macie findings and alert on sensitive data discoveries.
import boto3
import json

def process_macie_findings():
    macie = boto3.client('macie2')
    findings = macie.list_findings(
        findingCriteria={
            'criterion': {
                'severity.description': {'eq': ['HIGH', 'CRITICAL']},
                'archived': {'eq': ['false']}
            }
        }
    )

    for finding_id in findings['findingIds']:
        detail = macie.get_findings(findingIds=[finding_id])['findings'][0]
        if detail['category'] == 'CLASSIFICATION':
            alert({
                'severity': detail['severity']['description'],
                'bucket': detail['resourcesAffected']['s3Bucket']['name'],
                'sensitive_data_types': [
                    t['name'] for t in
                    detail.get('classificationDetails', {})
                          .get('result', {})
                          .get('sensitiveData', [])
                ],
                'public_access': detail['resourcesAffected']['s3Bucket'].get('publicAccess', {}),
            })

Step 3: Storage Access Controls — Prevent Public Exposure

Enforce no-public-access at the account and bucket level:

# AWS: Block public access for all buckets in the account.
aws s3control put-public-access-block \
  --account-id $ACCOUNT_ID \
  --public-access-block-configuration \
    BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true

# Apply to individual bucket (belt-and-suspenders).
aws s3api put-public-access-block \
  --bucket my-sensitive-bucket \
  --public-access-block-configuration \
    BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true
# Terraform: enforce via SCP (Service Control Policy) at the organization level.
resource "aws_organizations_policy" "block_public_s3" {
  name = "block-public-s3-access"
  type = "SERVICE_CONTROL_POLICY"

  content = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect   = "Deny"
      Action   = ["s3:PutBucketPublicAccessBlock"]
      Resource = "*"
      Condition = {
        "ForAnyValue:StringEquals" = {
          "s3:PublicAccessBlockConfigurationValues" = [
            "FALSE"    # Deny any attempt to enable public access.
          ]
        }
      }
    }]
  })
}

Step 4: S3 Access Logging and Anomaly Detection

Enable server access logging for all buckets containing sensitive data:

# Enable S3 access logging — all requests logged to a separate audit bucket.
aws s3api put-bucket-logging \
  --bucket sensitive-data-bucket \
  --bucket-logging-status '{
    "LoggingEnabled": {
      "TargetBucket": "audit-logs-bucket",
      "TargetPrefix": "s3-access/sensitive-data-bucket/"
    }
  }'

# Enable CloudTrail for S3 data events (object-level operations).
aws cloudtrail put-event-selectors \
  --trail-name main-trail \
  --event-selectors '[{
    "ReadWriteType": "All",
    "IncludeManagementEvents": true,
    "DataResources": [{
      "Type": "AWS::S3::Object",
      "Values": ["arn:aws:s3:::sensitive-data-bucket/"]
    }]
  }]'

Detect bulk access patterns with Athena:

-- Athena query to detect bulk S3 downloads from a single source.
-- Run daily against CloudTrail logs.
SELECT
    userIdentity.arn AS accessor,
    bucket_name,
    COUNT(*) AS request_count,
    SUM(CAST(bytes_sent AS BIGINT)) AS total_bytes
FROM cloudtrail_logs
WHERE
    eventname = 'GetObject'
    AND eventtime > date_sub(NOW(), INTERVAL 24 HOUR)
GROUP BY userIdentity.arn, bucket_name
HAVING COUNT(*) > 1000   -- Alert on >1000 GetObject requests in 24h from one identity.
ORDER BY request_count DESC;

Step 5: Network Egress Controls for PII Data

Restrict where sensitive data can go at the network layer:

# VPC Service Controls (GCP) — prevent data exfiltration from the security perimeter.
# Data in a perimeter cannot leave even with valid credentials.

from google.cloud import accesscontextmanager_v1

client = accesscontextmanager_v1.AccessContextManagerClient()

# Create a VPC Service Controls perimeter.
perimeter = {
    "name": "accessPolicies/POLICY_ID/servicePerimeters/production-perimeter",
    "title": "Production Data Perimeter",
    "status": {
        "resources": ["projects/PROJECT_NUMBER"],
        "restricted_services": [
            "storage.googleapis.com",    # GCS.
            "bigquery.googleapis.com",   # BigQuery.
            "cloudsql.googleapis.com",   # Cloud SQL.
        ],
        "vpc_accessible_services": {
            "enable_restriction": True,
            "allowed_services": ["storage.googleapis.com"]
        }
    }
}

For Kubernetes pods with access to sensitive data:

# NetworkPolicy: restrict egress from pods handling PII.
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: pii-processor-egress
  namespace: payments
spec:
  podSelector:
    matchLabels:
      handles-pii: "true"
  policyTypes:
    - Egress
  egress:
    # Only allow egress to specific approved databases and APIs.
    - to:
        - podSelector:
            matchLabels:
              app: payments-db
      ports:
        - port: 5432
    - to:
        - namespaceSelector:
            matchLabels:
              kubernetes.io/metadata.name: kube-system
      ports:
        - port: 53      # DNS.
          protocol: UDP
    # Block all other egress — no sending PII to external endpoints.

Step 6: Log Scrubbing to Prevent PII in Logs

Prevent sensitive data from entering the log pipeline:

# log_scrubber/scrubber.py
import re
from typing import Any

# Patterns for common sensitive data types.
PII_PATTERNS = {
    "credit_card": re.compile(r'\b(?:\d{4}[\s-]?){3}\d{4}\b'),
    "ssn": re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
    "email": re.compile(r'\b[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\b'),
    "api_key": re.compile(r'(?i)(api[_-]?key|token|secret|password)["\s:=]+["\']?([a-z0-9_\-]{20,})["\']?'),
    "ip_internal": re.compile(r'\b10\.\d{1,3}\.\d{1,3}\.\d{1,3}\b'),  # Internal IPs.
}

def scrub_log_entry(log_entry: dict[str, Any]) -> dict[str, Any]:
    """Remove or redact PII from a log entry before shipping."""
    scrubbed = {}
    for key, value in log_entry.items():
        if isinstance(value, str):
            scrubbed[key] = _scrub_string(value)
        elif isinstance(value, dict):
            scrubbed[key] = scrub_log_entry(value)
        else:
            scrubbed[key] = value
    return scrubbed

def _scrub_string(text: str) -> str:
    for data_type, pattern in PII_PATTERNS.items():
        if data_type == "api_key":
            text = pattern.sub(r'\1=[REDACTED]', text)
        else:
            text = pattern.sub(f'[{data_type.upper()}-REDACTED]', text)
    return text

OpenTelemetry Collector filter processor for log scrubbing in the pipeline:

# otel-collector-config.yaml
processors:
  transform/scrub_pii:
    log_statements:
      - context: log
        statements:
          # Redact credit card patterns in log body.
          - replace_pattern(body, "\\b(?:\\d{4}[\\s-]?){3}\\d{4}\\b", "[CC-REDACTED]")
          # Redact SSN patterns.
          - replace_pattern(body, "\\b\\d{3}-\\d{2}-\\d{4}\\b", "[SSN-REDACTED]")
          # Drop logs with high PII density (>3 redacted fields = likely a record dump).
          - >
            where count_substrings(body, "REDACTED") > 3

Step 7: Bulk Access Detection and Response

# dlp_monitor/bulk_access_detector.py
from dataclasses import dataclass
from datetime import datetime, UTC, timedelta
from collections import defaultdict

@dataclass
class AccessEvent:
    accessor: str
    resource: str
    bytes_accessed: int
    timestamp: datetime
    operation: str

class BulkAccessDetector:
    def __init__(self, window_seconds: int = 3600, threshold_bytes: int = 100_000_000):
        self.window = timedelta(seconds=window_seconds)
        self.threshold_bytes = threshold_bytes    # 100 MB.
        self.access_log: dict[str, list[AccessEvent]] = defaultdict(list)

    def record_access(self, event: AccessEvent):
        key = f"{event.accessor}:{event.resource}"
        self.access_log[key].append(event)
        self._clean_old_events(key, event.timestamp)
        self._check_threshold(key, event)

    def _clean_old_events(self, key: str, now: datetime):
        cutoff = now - self.window
        self.access_log[key] = [
            e for e in self.access_log[key] if e.timestamp > cutoff
        ]

    def _check_threshold(self, key: str, latest: AccessEvent):
        total_bytes = sum(e.bytes_accessed for e in self.access_log[key])
        if total_bytes > self.threshold_bytes:
            self._alert_bulk_access(key, total_bytes, latest)

    def _alert_bulk_access(self, key: str, bytes_accessed: int, event: AccessEvent):
        accessor, resource = key.split(":", 1)
        raise_security_alert(
            severity="HIGH",
            title=f"Bulk data access detected",
            details={
                "accessor": accessor,
                "resource": resource,
                "bytes_accessed_1h": bytes_accessed,
                "threshold_bytes": self.threshold_bytes,
                "latest_event": event,
            }
        )

Step 8: Telemetry

dlp_sensitive_data_findings_total{bucket, data_type, severity}    counter
dlp_public_bucket_count{}                                          gauge
dlp_bulk_access_alerts_total{accessor, resource}                   counter
dlp_log_pii_redactions_total{data_type, service}                   counter
dlp_egress_policy_blocks_total{destination, policy}                counter
dlp_cross_account_access_events_total{source_account, bucket}      counter

Alert on:

  • dlp_public_bucket_count > 0 — a bucket is publicly accessible; immediate investigation and remediation.
  • dlp_sensitive_data_findings_total{severity="CRITICAL"} — Macie found critical PII in an unexpected location; review data placement.
  • dlp_bulk_access_alerts_total — an identity accessed >100 MB of data in an hour; possible insider threat or compromised credential.
  • dlp_cross_account_access_events_total from unexpected source_account — unexpected cross-account data access; verify resource policy.
  • dlp_egress_policy_blocks_total spike — something is attempting to send data to blocked destinations; investigate the source.

Expected Behaviour

Signal No DLP DLP controls in place
Publicly misconfigured bucket Discoverable by anyone; no alert Blocked by account-level public access block; Macie detects if bypassed
Bulk export by insider Undetected Bulk access detector fires after threshold crossed
PII in log pipeline Shipped to SIEM, accessible to all log readers Scrubbed before shipping; REDACTED markers instead
Cross-account data access Allowed by misconfigured policy SCP blocks disabling public access block; Macie alerts on cross-account findings
Pod sending data to external endpoint Permitted if firewall allows NetworkPolicy blocks all egress except approved destinations

Trade-offs

Aspect Benefit Cost Mitigation
Account-level public access block Prevents all public buckets Breaks any intentional public static hosting Use a separate AWS account or GCS project for public content
Log PII scrubbing Prevents PII in SIEM Scrubber must be maintained; false negatives possible Combine with Macie log scanning; treat as defence-in-depth
VPC Service Controls (GCP) Strong perimeter; exfiltration resistant Complex configuration; can break legitimate cross-project access Test thoroughly; use dry-run mode first
Bulk access detection threshold Catches bulk export May alert on legitimate ETL or backup jobs Allowlist known high-volume service accounts; tune threshold per accessor type

Failure Modes

Failure Symptom Detection Recovery
Macie job fails No DLP scan results; findings stop No new findings despite expected activity; job status = failed Restart Macie job; check IAM permissions for Macie service role
Log scrubber pattern mismatch New PII format not redacted Manual audit of log samples; Macie log scanning Add pattern to scrubber; retroactively remove any leaked PII from SIEM
NetworkPolicy too restrictive Pod cannot reach approved database Application errors; pod logs show connection refused Check NetworkPolicy selector labels; verify egress rules match pod labels
Bulk access false positive on ETL job Alert fires every night during backup Review alert history; identify recurring accessor/resource pair Allowlist the ETL service account with higher threshold
Public access block bypassed via pre-signed URL Sensitive data accessible via signed URL Macie does not detect pre-signed URL access Monitor CloudTrail for GeneratePresignedUrl calls on sensitive buckets