How to Use AWS Security Hub for Centralized Threat Management
Security

How to Use AWS Security Hub for Centralized Threat Management

A comprehensive guide to implementing and leveraging AWS Security Hub for centralized security monitoring and threat management.

February 8, 2024
DevHub Team
6 min read

Learn how to implement AWS Security Hub for centralized security management, automated security checks, and compliance monitoring across AWS accounts. This comprehensive guide covers setup, integration, and best practices.

AWS Security Hub Architecture

graph TB subgraph "Security Hub" SH["Security Hub"] Findings["Findings"] Insights["Insights"] Controls["Security Controls"] end subgraph "Security Services" GD["GuardDuty"] Inspector["Inspector"] Macie["Macie"] IAM["IAM Access Analyzer"] end subgraph "Response" EventBridge["EventBridge"] Lambda["Lambda"] SNS["SNS"] end GD --> SH Inspector --> SH Macie --> SH IAM --> SH SH --> Findings Findings --> Insights Controls --> Findings Findings --> EventBridge EventBridge --> Lambda Lambda --> SNS style SH fill:#3b82f6,stroke:#2563eb,color:white style Findings fill:#3b82f6,stroke:#2563eb,color:white style Insights fill:#3b82f6,stroke:#2563eb,color:white style Controls fill:#3b82f6,stroke:#2563eb,color:white style GD fill:#f1f5f9,stroke:#64748b style Inspector fill:#f1f5f9,stroke:#64748b style Macie fill:#f1f5f9,stroke:#64748b style IAM fill:#f1f5f9,stroke:#64748b style EventBridge fill:#f1f5f9,stroke:#64748b style Lambda fill:#f1f5f9,stroke:#64748b style SNS fill:#f1f5f9,stroke:#64748b

Understanding AWS Security Hub

AWS Security Hub provides:

  1. Centralized Security Management: Single pane of glass for security findings
  2. Automated Security Checks: Continuous monitoring and assessment
  3. Compliance Monitoring: Track compliance with security standards
  4. Integration: Works with AWS security services and third-party tools
  5. Automated Response: Trigger actions based on security findings

Implementation Guide

1. Initial Setup

Enable Security Hub using AWS SDK:

// Security Hub service import { SecurityHubClient, EnableSecurityHubCommand, EnableStandardsCommand, GetEnabledStandardsCommand, BatchEnableStandardsCommand, StandardsSubscriptionRequest } from '@aws-sdk/client-securityhub'; interface SecurityHubConfig { region: string; standards: string[]; } class SecurityHubService { private client: SecurityHubClient; constructor(config: SecurityHubConfig) { this.client = new SecurityHubClient({ region: config.region }); } async enableSecurityHub(): Promise<void> { try { const command = new EnableSecurityHubCommand({ EnableDefaultStandards: true, Tags: { Environment: 'Production', Service: 'SecurityHub' } }); await this.client.send(command); console.log('Security Hub enabled successfully'); } catch (error) { console.error('Error enabling Security Hub:', error); throw error; } } async enableSecurityStandards(standards: StandardsSubscriptionRequest[]): Promise<void> { try { const command = new BatchEnableStandardsCommand({ StandardsSubscriptionRequests: standards }); await this.client.send(command); console.log('Security standards enabled successfully'); } catch (error) { console.error('Error enabling security standards:', error); throw error; } } async getEnabledStandards(): Promise<void> { try { const command = new GetEnabledStandardsCommand({}); const response = await this.client.send(command); console.log('Enabled standards:', response.StandardsSubscriptions); } catch (error) { console.error('Error getting enabled standards:', error); throw error; } } } // Usage example async function main() { const securityHub = new SecurityHubService({ region: 'us-west-2', standards: [ 'arn:aws:securityhub:::ruleset/cis-aws-foundations-benchmark/v/1.2.0', 'arn:aws:securityhub:us-west-2::standards/aws-foundational-security-best-practices/v/1.0.0' ] }); // Enable Security Hub await securityHub.enableSecurityHub(); // Enable security standards const standards: StandardsSubscriptionRequest[] = [ { StandardsArn: 'arn:aws:securityhub:::ruleset/cis-aws-foundations-benchmark/v/1.2.0', EnabledRegions: ['us-west-2'] }, { StandardsArn: 'arn:aws:securityhub:us-west-2::standards/aws-foundational-security-best-practices/v/1.0.0', EnabledRegions: ['us-west-2'] } ]; await securityHub.enableSecurityStandards(standards); // Get enabled standards await securityHub.getEnabledStandards(); }

2. Processing Findings

Process Security Hub findings:

# Security Hub findings processor import boto3 import json from datetime import datetime, timezone from typing import Dict, List, Optional class SecurityHubProcessor: def __init__(self, region: str): self.client = boto3.client('securityhub', region_name=region) self.sns = boto3.client('sns', region_name=region) def get_findings(self, filters: Optional[Dict] = None) -> List[Dict]: """Get Security Hub findings with optional filters.""" try: if filters is None: filters = { 'RecordState': [{'Value': 'ACTIVE', 'Comparison': 'EQUALS'}], 'WorkflowStatus': [{'Value': 'NEW', 'Comparison': 'EQUALS'}] } response = self.client.get_findings(Filters=filters) return response['Findings'] except Exception as e: print(f"Error getting findings: {str(e)}") return [] def update_finding_status(self, finding_id: str, status: str) -> bool: """Update the workflow status of a finding.""" try: self.client.update_findings( FindingIdentifiers=[ { 'Id': finding_id, 'ProductArn': finding_id.split('/')[0] } ], WorkflowStatus=status ) return True except Exception as e: print(f"Error updating finding status: {str(e)}") return False def create_insight(self, name: str, filters: Dict, group_by_attribute: str) -> str: """Create a custom insight based on findings.""" try: response = self.client.create_insight( Name=name, Filters=filters, GroupByAttribute=group_by_attribute ) return response['InsightArn'] except Exception as e: print(f"Error creating insight: {str(e)}") return '' def process_high_severity_findings(self, sns_topic_arn: str) -> None: """Process high severity findings and send notifications.""" filters = { 'SeverityLabel': [{'Value': 'HIGH', 'Comparison': 'EQUALS'}], 'RecordState': [{'Value': 'ACTIVE', 'Comparison': 'EQUALS'}], 'WorkflowStatus': [{'Value': 'NEW', 'Comparison': 'EQUALS'}] } findings = self.get_findings(filters) for finding in findings: # Extract relevant information finding_id = finding['Id'] title = finding['Title'] description = finding['Description'] resource_id = finding['Resources'][0]['Id'] # Create notification message message = { 'finding_id': finding_id, 'title': title, 'description': description, 'resource_id': resource_id, 'severity': 'HIGH', 'timestamp': datetime.now(timezone.utc).isoformat() } # Send notification self.sns.publish( TopicArn=sns_topic_arn, Message=json.dumps(message), Subject='High Severity Security Finding' ) # Update finding status self.update_finding_status(finding_id, 'NOTIFIED') def create_compliance_report(self) -> Dict: """Generate a compliance report based on findings.""" try: # Get compliance-related findings filters = { 'ComplianceStatus': [{'Value': 'FAILED', 'Comparison': 'EQUALS'}], 'RecordState': [{'Value': 'ACTIVE', 'Comparison': 'EQUALS'}] } findings = self.get_findings(filters) # Aggregate findings by standard standards_summary = {} for finding in findings: standard = finding.get('Compliance', {}).get('SecurityControlId', 'Unknown') if standard not in standards_summary: standards_summary[standard] = { 'failed': 0, 'resources': set() } standards_summary[standard]['failed'] += 1 for resource in finding['Resources']: standards_summary[standard]['resources'].add(resource['Id']) # Format report report = { 'generated_at': datetime.now(timezone.utc).isoformat(), 'total_findings': len(findings), 'standards_summary': { standard: { 'failed_controls': info['failed'], 'affected_resources': len(info['resources']) } for standard, info in standards_summary.items() } } return report except Exception as e: print(f"Error generating compliance report: {str(e)}") return {} # Usage example def main(): processor = SecurityHubProcessor('us-west-2') # Process high severity findings processor.process_high_severity_findings( 'arn:aws:sns:us-west-2:123456789012:security-alerts' ) # Create custom insight filters = { 'SeverityLabel': [{'Value': 'HIGH', 'Comparison': 'EQUALS'}], 'ResourceType': [{'Value': 'AwsIamUser', 'Comparison': 'EQUALS'}] } processor.create_insight( 'High Severity IAM Findings', filters, 'ResourceId' ) # Generate compliance report report = processor.create_compliance_report() print(json.dumps(report, indent=2)) if __name__ == '__main__': main()

Security Hub Findings Flow

graph TB subgraph "Finding Sources" GD["GuardDuty Finding"] Inspector["Inspector Finding"] Custom["Custom Finding"] end subgraph "Processing" Ingest["Ingest Finding"] Enrich["Enrich Data"] Analyze["Analyze Severity"] end subgraph "Response" High["High Severity"] Medium["Medium Severity"] Low["Low Severity"] end GD --> Ingest Inspector --> Ingest Custom --> Ingest Ingest --> Enrich Enrich --> Analyze Analyze --> High Analyze --> Medium Analyze --> Low High --> Alert["Send Alert"] Medium --> Ticket["Create Ticket"] Low --> Log["Log Finding"] style GD fill:#3b82f6,stroke:#2563eb,color:white style Inspector fill:#3b82f6,stroke:#2563eb,color:white style Custom fill:#3b82f6,stroke:#2563eb,color:white style Ingest fill:#f1f5f9,stroke:#64748b style Enrich fill:#f1f5f9,stroke:#64748b style Analyze fill:#f1f5f9,stroke:#64748b style High fill:#f1f5f9,stroke:#64748b style Medium fill:#f1f5f9,stroke:#64748b style Low fill:#f1f5f9,stroke:#64748b style Alert fill:#f1f5f9,stroke:#64748b style Ticket fill:#f1f5f9,stroke:#64748b style Log fill:#f1f5f9,stroke:#64748b

Best Practices

1. Security Score

  • Monitor security score regularly
  • Track score trends
  • Address findings promptly
  • Document improvements
  • Regular assessments

2. Compliance

  • Enable relevant standards
  • Regular compliance checks
  • Document exceptions
  • Track remediation
  • Maintain evidence

3. Integration

  • Enable all relevant services
  • Configure automated responses
  • Set up notifications
  • Monitor integration health
  • Regular testing

4. Monitoring

  • Set up dashboards
  • Configure alerts
  • Track metrics
  • Regular reviews
  • Incident response

Conclusion

Effective Security Hub implementation requires:

  1. Proper configuration
  2. Regular monitoring
  3. Automated responses
  4. Compliance tracking
  5. Continuous improvement

Remember to:

  • Monitor regularly
  • Respond promptly
  • Document changes
  • Test responses
  • Update configurations

Additional Resources

  1. AWS Security Hub Documentation
  2. Security Standards
  3. Integration Guide
  4. Best Practices
  5. API Reference
AWS
Security Hub
Threat Management
Monitoring