How to Use AWS Security Hub for Centralized Threat Management
A comprehensive guide to implementing and leveraging AWS Security Hub for centralized security monitoring and threat management.
February 8, 2024
DevHub Team
Learn how to implement AWS Security Hub for centralized security management, automated security checks, and compliance monitoring across AWS accounts. This guide covers setup, integration, and best practices.
AWS Security Hub Architecture
    graph TB
      subgraph "Security Hub"
        SH["Security Hub"]
        Findings["Findings"]
        Insights["Insights"]
        Controls["Security Controls"]
      end
      
      subgraph "Security Services"
        GD["GuardDuty"]
        Inspector["Inspector"]
        Macie["Macie"]
        IAM["IAM Access Analyzer"]
      end
      
      subgraph "Response"
        EventBridge["EventBridge"]
        Lambda["Lambda"]
        SNS["SNS"]
      end
      
      GD --> SH
      Inspector --> SH
      Macie --> SH
      IAM --> SH
      
      SH --> Findings
      Findings --> Insights
      Controls --> Findings
      
      Findings --> EventBridge
      EventBridge --> Lambda
      Lambda --> SNS
      
      style SH fill:#3b82f6,stroke:#2563eb,color:white
      style Findings fill:#3b82f6,stroke:#2563eb,color:white
      style Insights fill:#3b82f6,stroke:#2563eb,color:white
      style Controls fill:#3b82f6,stroke:#2563eb,color:white
      style GD fill:#f1f5f9,stroke:#64748b
      style Inspector fill:#f1f5f9,stroke:#64748b
      style Macie fill:#f1f5f9,stroke:#64748b
      style IAM fill:#f1f5f9,stroke:#64748b
      style EventBridge fill:#f1f5f9,stroke:#64748b
      style Lambda fill:#f1f5f9,stroke:#64748b
      style SNS fill:#f1f5f9,stroke:#64748b
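
The Findings to EventBridge to Lambda path in the diagram can be sketched as follows. This is a minimal illustration, not a definitive implementation: `build_event_pattern` and `lambda_handler` are hypothetical names, and the handler only summarizes findings rather than publishing to SNS. The pattern relies on the `aws.securityhub` source and the `Security Hub Findings - Imported` detail type that Security Hub uses for finding events.

```python
import json
from typing import Any, Dict, List


def build_event_pattern(severity_labels: List[str]) -> Dict[str, Any]:
    """EventBridge pattern matching imported findings with the given severity labels."""
    return {
        "source": ["aws.securityhub"],
        "detail-type": ["Security Hub Findings - Imported"],
        "detail": {"findings": {"Severity": {"Label": severity_labels}}},
    }


def lambda_handler(event: Dict[str, Any], context: Any) -> List[Dict[str, str]]:
    """Summarize each finding carried in the event's detail.findings list."""
    summaries = []
    for finding in event.get("detail", {}).get("findings", []):
        summaries.append({
            "id": finding.get("Id", ""),
            "title": finding.get("Title", ""),
            "severity": finding.get("Severity", {}).get("Label", "UNKNOWN"),
        })
    return summaries


if __name__ == "__main__":
    # Print a pattern that could be pasted into an EventBridge rule
    print(json.dumps(build_event_pattern(["HIGH", "CRITICAL"]), indent=2))
```

In a real deployment the handler would typically forward each summary to an SNS topic, as the diagram shows.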
    
Understanding AWS Security Hub
AWS Security Hub provides:
- Centralized Security Management: Single pane of glass for security findings
- Automated Security Checks: Continuous monitoring and assessment
- Compliance Monitoring: Track compliance with security standards
- Integration: Works with AWS security services and third-party tools
- Automated Response: Trigger actions based on security findings
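
Third-party and in-house tools integrate by sending findings in the AWS Security Finding Format (ASFF). The sketch below builds a minimal ASFF finding and, optionally, submits it with `batch_import_findings`. The scanner name `custom-scanner`, the function names, and the chosen finding type are assumptions made for illustration; the product ARN follows the default pattern an account uses for its own custom findings.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List


def build_custom_finding(
    account_id: str,
    region: str,
    finding_id: str,
    title: str,
    description: str,
    resource_id: str,
    severity_label: str = "MEDIUM",
) -> Dict[str, Any]:
    """Build a minimal ASFF finding for a hypothetical in-house scanner."""
    now = datetime.now(timezone.utc).isoformat()
    return {
        "SchemaVersion": "2018-10-08",
        "Id": finding_id,
        # Default product ARN that an account uses for its own custom findings
        "ProductArn": f"arn:aws:securityhub:{region}:{account_id}:product/{account_id}/default",
        "GeneratorId": "custom-scanner",  # hypothetical generator name
        "AwsAccountId": account_id,
        "Types": ["Software and Configuration Checks"],
        "CreatedAt": now,
        "UpdatedAt": now,
        "Severity": {"Label": severity_label},
        "Title": title,
        "Description": description,
        "Resources": [{"Type": "Other", "Id": resource_id}],
    }


def import_findings(findings: List[Dict[str, Any]], region: str) -> int:
    """Send findings to Security Hub and return how many were rejected."""
    import boto3  # imported here so the builder above works without AWS access

    client = boto3.client("securityhub", region_name=region)
    response = client.batch_import_findings(Findings=findings)
    return response["FailedCount"]
```

Keeping the builder separate from the API call makes it easy to validate finding payloads in unit tests before anything is sent to AWS.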
Implementation Guide
1. Initial Setup
Enable Security Hub using the AWS SDK for JavaScript v3 (TypeScript):

    // Security Hub service
    import {
      SecurityHubClient,
      EnableSecurityHubCommand,
      GetEnabledStandardsCommand,
      BatchEnableStandardsCommand,
      StandardsSubscriptionRequest
    } from '@aws-sdk/client-securityhub';

    interface SecurityHubConfig {
      region: string;
      standards: string[];
    }

    class SecurityHubService {
      private client: SecurityHubClient;

      constructor(config: SecurityHubConfig) {
        // Security Hub is a regional service; this client targets one region
        this.client = new SecurityHubClient({ region: config.region });
      }

      async enableSecurityHub(): Promise<void> {
        try {
          const command = new EnableSecurityHubCommand({
            // Standards are enabled explicitly in enableSecurityStandards()
            EnableDefaultStandards: false,
            Tags: {
              Environment: 'Production',
              Service: 'SecurityHub'
            }
          });
          await this.client.send(command);
          console.log('Security Hub enabled successfully');
        } catch (error) {
          console.error('Error enabling Security Hub:', error);
          throw error;
        }
      }

      async enableSecurityStandards(standards: StandardsSubscriptionRequest[]): Promise<void> {
        try {
          const command = new BatchEnableStandardsCommand({
            StandardsSubscriptionRequests: standards
          });
          await this.client.send(command);
          console.log('Security standards enabled successfully');
        } catch (error) {
          console.error('Error enabling security standards:', error);
          throw error;
        }
      }

      async getEnabledStandards(): Promise<void> {
        try {
          const command = new GetEnabledStandardsCommand({});
          const response = await this.client.send(command);
          console.log('Enabled standards:', response.StandardsSubscriptions);
        } catch (error) {
          console.error('Error getting enabled standards:', error);
          throw error;
        }
      }
    }

    // Usage example
    async function main() {
      const standardsArns = [
        'arn:aws:securityhub:::ruleset/cis-aws-foundations-benchmark/v/1.2.0',
        'arn:aws:securityhub:us-west-2::standards/aws-foundational-security-best-practices/v/1.0.0'
      ];

      const securityHub = new SecurityHubService({
        region: 'us-west-2',
        standards: standardsArns
      });

      // Enable Security Hub
      await securityHub.enableSecurityHub();

      // Enable security standards; each request only needs the standard's ARN,
      // and the standard is enabled in the client's region
      const standards: StandardsSubscriptionRequest[] = standardsArns.map(
        (arn) => ({ StandardsArn: arn })
      );
      await securityHub.enableSecurityStandards(standards);

      // Get enabled standards
      await securityHub.getEnabledStandards();
    }

    main().catch((error) => {
      console.error(error);
      process.exit(1);
    });
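
Because Security Hub and its standards are enabled per region, a multi-region rollout needs a region-specific ARN for most standards. The Python sketch below builds subscription requests for the two standards used above; the helper names are hypothetical, and only these two ARN formats are taken from this guide.

```python
from typing import Dict, List

# CIS v1.2.0 uses a region-less "ruleset" ARN
CIS_1_2_0_ARN = "arn:aws:securityhub:::ruleset/cis-aws-foundations-benchmark/v/1.2.0"


def fsbp_arn(region: str) -> str:
    """ARN of AWS Foundational Security Best Practices v1.0.0 in one region."""
    return (
        f"arn:aws:securityhub:{region}::standards/"
        "aws-foundational-security-best-practices/v/1.0.0"
    )


def subscription_requests(regions: List[str]) -> Dict[str, List[Dict[str, str]]]:
    """Map each region to the StandardsSubscriptionRequests to send there."""
    return {
        region: [{"StandardsArn": CIS_1_2_0_ARN}, {"StandardsArn": fsbp_arn(region)}]
        for region in regions
    }


if __name__ == "__main__":
    for region, requests in subscription_requests(["us-west-2", "eu-west-1"]).items():
        print(region, requests)
```

Each region's list would then be passed to `batch_enable_standards` on a client created for that region.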
2. Processing Findings
Retrieve, triage, and report on Security Hub findings with boto3:

    # Security Hub findings processor
    import boto3
    import json
    from datetime import datetime, timezone
    from typing import Dict, List, Optional

    class SecurityHubProcessor:
        def __init__(self, region: str):
            self.client = boto3.client('securityhub', region_name=region)
            self.sns = boto3.client('sns', region_name=region)

        def get_findings(self, filters: Optional[Dict] = None) -> List[Dict]:
            """Get Security Hub findings with optional filters."""
            try:
                if filters is None:
                    filters = {
                        'RecordState': [{'Value': 'ACTIVE', 'Comparison': 'EQUALS'}],
                        'WorkflowStatus': [{'Value': 'NEW', 'Comparison': 'EQUALS'}]
                    }
                # Paginate so findings beyond the first page are not dropped
                paginator = self.client.get_paginator('get_findings')
                findings = []
                for page in paginator.paginate(Filters=filters):
                    findings.extend(page['Findings'])
                return findings
            except Exception as e:
                print(f"Error getting findings: {str(e)}")
                return []

        def update_finding_status(self, finding_id: str, product_arn: str, status: str) -> bool:
            """Update the workflow status of a finding."""
            try:
                # BatchUpdateFindings identifies a finding by its Id and ProductArn
                self.client.batch_update_findings(
                    FindingIdentifiers=[
                        {'Id': finding_id, 'ProductArn': product_arn}
                    ],
                    Workflow={'Status': status}
                )
                return True
            except Exception as e:
                print(f"Error updating finding status: {str(e)}")
                return False

        def create_insight(self, name: str, filters: Dict, group_by_attribute: str) -> str:
            """Create a custom insight based on findings."""
            try:
                response = self.client.create_insight(
                    Name=name,
                    Filters=filters,
                    GroupByAttribute=group_by_attribute
                )
                return response['InsightArn']
            except Exception as e:
                print(f"Error creating insight: {str(e)}")
                return ''

        def process_high_severity_findings(self, sns_topic_arn: str) -> None:
            """Process high severity findings and send notifications."""
            filters = {
                'SeverityLabel': [{'Value': 'HIGH', 'Comparison': 'EQUALS'}],
                'RecordState': [{'Value': 'ACTIVE', 'Comparison': 'EQUALS'}],
                'WorkflowStatus': [{'Value': 'NEW', 'Comparison': 'EQUALS'}]
            }
            findings = self.get_findings(filters)

            for finding in findings:
                # Extract relevant information
                finding_id = finding['Id']
                title = finding['Title']
                description = finding['Description']
                resource_id = finding['Resources'][0]['Id']

                # Create notification message
                message = {
                    'finding_id': finding_id,
                    'title': title,
                    'description': description,
                    'resource_id': resource_id,
                    'severity': 'HIGH',
                    'timestamp': datetime.now(timezone.utc).isoformat()
                }

                # Send notification
                self.sns.publish(
                    TopicArn=sns_topic_arn,
                    Message=json.dumps(message),
                    Subject='High Severity Security Finding'
                )

                # Mark the finding as NOTIFIED so it is not alerted on again
                self.update_finding_status(finding_id, finding['ProductArn'], 'NOTIFIED')

        def create_compliance_report(self) -> Dict:
            """Generate a compliance report based on findings."""
            try:
                # Get compliance-related findings
                filters = {
                    'ComplianceStatus': [{'Value': 'FAILED', 'Comparison': 'EQUALS'}],
                    'RecordState': [{'Value': 'ACTIVE', 'Comparison': 'EQUALS'}]
                }
                findings = self.get_findings(filters)

                # Aggregate failed findings by security control ID
                standards_summary = {}
                for finding in findings:
                    standard = finding.get('Compliance', {}).get('SecurityControlId', 'Unknown')
                    if standard not in standards_summary:
                        standards_summary[standard] = {'failed': 0, 'resources': set()}
                    standards_summary[standard]['failed'] += 1
                    for resource in finding['Resources']:
                        standards_summary[standard]['resources'].add(resource['Id'])

                # Format report (sets are converted to counts so it is JSON-serializable)
                report = {
                    'generated_at': datetime.now(timezone.utc).isoformat(),
                    'total_findings': len(findings),
                    'standards_summary': {
                        standard: {
                            'failed_controls': info['failed'],
                            'affected_resources': len(info['resources'])
                        }
                        for standard, info in standards_summary.items()
                    }
                }
                return report
            except Exception as e:
                print(f"Error generating compliance report: {str(e)}")
                return {}

    # Usage example
    def main():
        processor = SecurityHubProcessor('us-west-2')

        # Process high severity findings
        processor.process_high_severity_findings(
            'arn:aws:sns:us-west-2:123456789012:security-alerts'
        )

        # Create custom insight
        filters = {
            'SeverityLabel': [{'Value': 'HIGH', 'Comparison': 'EQUALS'}],
            'ResourceType': [{'Value': 'AwsIamUser', 'Comparison': 'EQUALS'}]
        }
        processor.create_insight(
            'High Severity IAM Findings',
            filters,
            'ResourceId'
        )

        # Generate compliance report
        report = processor.create_compliance_report()
        print(json.dumps(report, indent=2))

    if __name__ == '__main__':
        main()
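
When many findings need the same workflow change, updating them one call at a time is slow. The sketch below groups finding identifiers into batches before calling `batch_update_findings`, assuming the documented limit of 100 identifiers per call. The helper names are hypothetical.

```python
from typing import Any, Dict, Iterator, List

MAX_IDENTIFIERS_PER_CALL = 100  # assumed BatchUpdateFindings limit per request


def finding_identifiers(findings: List[Dict[str, Any]]) -> List[Dict[str, str]]:
    """Reduce full findings to the Id/ProductArn pairs the update API expects."""
    return [{"Id": f["Id"], "ProductArn": f["ProductArn"]} for f in findings]


def chunked(items: List[Any], size: int) -> Iterator[List[Any]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def mark_notified(findings: List[Dict[str, Any]], region: str) -> int:
    """Set workflow status to NOTIFIED in batches; return the processed count."""
    import boto3  # imported here so the helpers above work without AWS access

    client = boto3.client("securityhub", region_name=region)
    processed = 0
    for batch in chunked(finding_identifiers(findings), MAX_IDENTIFIERS_PER_CALL):
        response = client.batch_update_findings(
            FindingIdentifiers=batch,
            Workflow={"Status": "NOTIFIED"},
        )
        processed += len(response.get("ProcessedFindings", []))
    return processed
```

Comparing the returned count with the number of findings passed in gives a quick check for unprocessed updates.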
Security Hub Findings Flow
    graph TB
      subgraph "Finding Sources"
        GD["GuardDuty Finding"]
        Inspector["Inspector Finding"]
        Custom["Custom Finding"]
      end
      
      subgraph "Processing"
        Ingest["Ingest Finding"]
        Enrich["Enrich Data"]
        Analyze["Analyze Severity"]
      end
      
      subgraph "Response"
        High["High Severity"]
        Medium["Medium Severity"]
        Low["Low Severity"]
      end
      
      GD --> Ingest
      Inspector --> Ingest
      Custom --> Ingest
      
      Ingest --> Enrich
      Enrich --> Analyze
      
      Analyze --> High
      Analyze --> Medium
      Analyze --> Low
      
      High --> Alert["Send Alert"]
      Medium --> Ticket["Create Ticket"]
      Low --> Log["Log Finding"]
      
      style GD fill:#3b82f6,stroke:#2563eb,color:white
      style Inspector fill:#3b82f6,stroke:#2563eb,color:white
      style Custom fill:#3b82f6,stroke:#2563eb,color:white
      style Ingest fill:#f1f5f9,stroke:#64748b
      style Enrich fill:#f1f5f9,stroke:#64748b
      style Analyze fill:#f1f5f9,stroke:#64748b
      style High fill:#f1f5f9,stroke:#64748b
      style Medium fill:#f1f5f9,stroke:#64748b
      style Low fill:#f1f5f9,stroke:#64748b
      style Alert fill:#f1f5f9,stroke:#64748b
      style Ticket fill:#f1f5f9,stroke:#64748b
      style Log fill:#f1f5f9,stroke:#64748b
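
The severity routing in this flow can be sketched as a small dispatch function. The diagram only covers High, Medium, and Low; treating CRITICAL like High and INFORMATIONAL like Low is an assumption made here, as are the function name and the action labels.

```python
from typing import Any, Dict


def route_finding(finding: Dict[str, Any]) -> str:
    """Return the response action for a finding based on its severity label."""
    label = finding.get("Severity", {}).get("Label", "INFORMATIONAL")
    if label in ("CRITICAL", "HIGH"):
        return "alert"   # High Severity -> Send Alert
    if label == "MEDIUM":
        return "ticket"  # Medium Severity -> Create Ticket
    return "log"         # Low Severity (and anything else) -> Log Finding


if __name__ == "__main__":
    sample = {"Id": "example", "Severity": {"Label": "MEDIUM"}}
    print(route_finding(sample))
```

Returning an action name rather than performing the action keeps the routing rule easy to test and lets the caller decide how alerts, tickets, and logs are delivered.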
    
Best Practices
1. Security Score
- Review the security score on a regular schedule
- Track score trends over time
- Address failed controls and findings promptly
- Document the changes that improved the score
- Run regular security assessments
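
To track trends outside the console, a score can be approximated from control statuses. The sketch below assumes the score is the share of passed controls among controls that have a PASSED or FAILED status; this is a simplification for illustration and may not match the console value exactly. The function names are hypothetical.

```python
from typing import Iterable, List


def security_score(control_statuses: Iterable[str]) -> float:
    """Percentage of PASSED controls among those with a PASSED or FAILED status."""
    scored = [status for status in control_statuses if status in ("PASSED", "FAILED")]
    if not scored:
        return 0.0
    passed = sum(1 for status in scored if status == "PASSED")
    return round(100.0 * passed / len(scored), 1)


def score_deltas(scores: List[float]) -> List[float]:
    """Change between consecutive score snapshots, oldest first."""
    return [later - earlier for earlier, later in zip(scores, scores[1:])]


if __name__ == "__main__":
    print(security_score(["PASSED", "PASSED", "FAILED", "NO_DATA"]))
    print(score_deltas([70.0, 75.0, 72.5]))
```

A negative delta between two snapshots is a useful trigger for reviewing which controls started failing.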
2. Compliance
- Enable the standards relevant to your workloads
- Run compliance checks regularly
- Document exceptions and the reason for each
- Track remediation progress
- Maintain evidence for audits
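
One way to document an exception is to suppress the finding and attach a note explaining why. The sketch below builds the arguments for `batch_update_findings`; the function name and the rule that a reason is mandatory are assumptions for illustration.

```python
from typing import Any, Dict


def build_suppression_request(
    finding_id: str,
    product_arn: str,
    reason: str,
    updated_by: str,
) -> Dict[str, Any]:
    """Build batch_update_findings arguments that suppress one finding with a note."""
    if not reason.strip():
        raise ValueError("A documented reason is required to suppress a finding")
    return {
        "FindingIdentifiers": [{"Id": finding_id, "ProductArn": product_arn}],
        "Note": {"Text": reason, "UpdatedBy": updated_by},
        "Workflow": {"Status": "SUPPRESSED"},
    }
```

The result can be passed directly to a Security Hub client, for example `client.batch_update_findings(**request)`, so the exception and its justification are stored with the finding.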
3. Integration
- Enable all relevant security services
- Configure automated responses
- Set up notifications
- Monitor integration health
- Test integrations regularly
4. Monitoring
- Set up dashboards
- Configure alerts
- Track metrics
- Review findings regularly
- Keep the incident response process ready
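
A simple metric for dashboards and alerts is the number of active findings per severity. The sketch below counts findings by their ASFF severity label; the function name is hypothetical, and defaulting a missing label to INFORMATIONAL is an assumption.

```python
from collections import Counter
from typing import Any, Dict, List

SEVERITY_ORDER = ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFORMATIONAL"]


def count_by_severity(findings: List[Dict[str, Any]]) -> Dict[str, int]:
    """Count findings per severity label, including labels with zero findings."""
    counts = Counter(
        finding.get("Severity", {}).get("Label", "INFORMATIONAL")
        for finding in findings
    )
    return {label: counts.get(label, 0) for label in SEVERITY_ORDER}


if __name__ == "__main__":
    sample = [
        {"Severity": {"Label": "HIGH"}},
        {"Severity": {"Label": "HIGH"}},
        {"Severity": {"Label": "LOW"}},
    ]
    print(count_by_severity(sample))
```

Emitting zero counts for every label keeps dashboard series continuous even when a severity has no findings.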
Conclusion
Effective Security Hub implementation requires:
- Proper configuration
- Regular monitoring
- Automated responses
- Compliance tracking
- Continuous improvement
Remember to:
- Monitor regularly
- Respond promptly
- Document changes
- Test responses
- Update configurations