Security
How to Use AWS Security Hub for Centralized Threat Management
A comprehensive guide to implementing and leveraging AWS Security Hub for centralized security monitoring and threat management.
February 8, 2024
DevHub Team
6 min read
Learn how to implement AWS Security Hub for centralized security management, automated security checks, and compliance monitoring across AWS accounts. This comprehensive guide covers setup, integration, and best practices.
AWS Security Hub Architecture
graph TB
subgraph "Security Hub"
SH["Security Hub"]
Findings["Findings"]
Insights["Insights"]
Controls["Security Controls"]
end
subgraph "Security Services"
GD["GuardDuty"]
Inspector["Inspector"]
Macie["Macie"]
IAM["IAM Access Analyzer"]
end
subgraph "Response"
EventBridge["EventBridge"]
Lambda["Lambda"]
SNS["SNS"]
end
GD --> SH
Inspector --> SH
Macie --> SH
IAM --> SH
SH --> Findings
Findings --> Insights
Controls --> Findings
Findings --> EventBridge
EventBridge --> Lambda
Lambda --> SNS
style SH fill:#3b82f6,stroke:#2563eb,color:white
style Findings fill:#3b82f6,stroke:#2563eb,color:white
style Insights fill:#3b82f6,stroke:#2563eb,color:white
style Controls fill:#3b82f6,stroke:#2563eb,color:white
style GD fill:#f1f5f9,stroke:#64748b
style Inspector fill:#f1f5f9,stroke:#64748b
style Macie fill:#f1f5f9,stroke:#64748b
style IAM fill:#f1f5f9,stroke:#64748b
style EventBridge fill:#f1f5f9,stroke:#64748b
style Lambda fill:#f1f5f9,stroke:#64748b
style SNS fill:#f1f5f9,stroke:#64748b
Understanding AWS Security Hub
AWS Security Hub provides:
- Centralized Security Management: Single pane of glass for security findings
- Automated Security Checks: Continuous monitoring and assessment
- Compliance Monitoring: Track compliance with security standards
- Integration: Works with AWS security services and third-party tools
- Automated Response: Trigger actions based on security findings
Implementation Guide
1. Initial Setup
Enable Security Hub using AWS SDK:
// Security Hub service import { SecurityHubClient, EnableSecurityHubCommand, EnableStandardsCommand, GetEnabledStandardsCommand, BatchEnableStandardsCommand, StandardsSubscriptionRequest } from '@aws-sdk/client-securityhub'; interface SecurityHubConfig { region: string; standards: string[]; } class SecurityHubService { private client: SecurityHubClient; constructor(config: SecurityHubConfig) { this.client = new SecurityHubClient({ region: config.region }); } async enableSecurityHub(): Promise<void> { try { const command = new EnableSecurityHubCommand({ EnableDefaultStandards: true, Tags: { Environment: 'Production', Service: 'SecurityHub' } }); await this.client.send(command); console.log('Security Hub enabled successfully'); } catch (error) { console.error('Error enabling Security Hub:', error); throw error; } } async enableSecurityStandards(standards: StandardsSubscriptionRequest[]): Promise<void> { try { const command = new BatchEnableStandardsCommand({ StandardsSubscriptionRequests: standards }); await this.client.send(command); console.log('Security standards enabled successfully'); } catch (error) { console.error('Error enabling security standards:', error); throw error; } } async getEnabledStandards(): Promise<void> { try { const command = new GetEnabledStandardsCommand({}); const response = await this.client.send(command); console.log('Enabled standards:', response.StandardsSubscriptions); } catch (error) { console.error('Error getting enabled standards:', error); throw error; } } } // Usage example async function main() { const securityHub = new SecurityHubService({ region: 'us-west-2', standards: [ 'arn:aws:securityhub:::ruleset/cis-aws-foundations-benchmark/v/1.2.0', 'arn:aws:securityhub:us-west-2::standards/aws-foundational-security-best-practices/v/1.0.0' ] }); // Enable Security Hub await securityHub.enableSecurityHub(); // Enable security standards const standards: StandardsSubscriptionRequest[] = [ { StandardsArn: 'arn:aws:securityhub:::ruleset/cis-aws-foundations-benchmark/v/1.2.0', EnabledRegions: ['us-west-2'] }, { StandardsArn: 'arn:aws:securityhub:us-west-2::standards/aws-foundational-security-best-practices/v/1.0.0', EnabledRegions: ['us-west-2'] } ]; await securityHub.enableSecurityStandards(standards); // Get enabled standards await securityHub.getEnabledStandards(); }
2. Processing Findings
Process Security Hub findings:
# Security Hub findings processor import boto3 import json from datetime import datetime, timezone from typing import Dict, List, Optional class SecurityHubProcessor: def __init__(self, region: str): self.client = boto3.client('securityhub', region_name=region) self.sns = boto3.client('sns', region_name=region) def get_findings(self, filters: Optional[Dict] = None) -> List[Dict]: """Get Security Hub findings with optional filters.""" try: if filters is None: filters = { 'RecordState': [{'Value': 'ACTIVE', 'Comparison': 'EQUALS'}], 'WorkflowStatus': [{'Value': 'NEW', 'Comparison': 'EQUALS'}] } response = self.client.get_findings(Filters=filters) return response['Findings'] except Exception as e: print(f"Error getting findings: {str(e)}") return [] def update_finding_status(self, finding_id: str, status: str) -> bool: """Update the workflow status of a finding.""" try: self.client.update_findings( FindingIdentifiers=[ { 'Id': finding_id, 'ProductArn': finding_id.split('/')[0] } ], WorkflowStatus=status ) return True except Exception as e: print(f"Error updating finding status: {str(e)}") return False def create_insight(self, name: str, filters: Dict, group_by_attribute: str) -> str: """Create a custom insight based on findings.""" try: response = self.client.create_insight( Name=name, Filters=filters, GroupByAttribute=group_by_attribute ) return response['InsightArn'] except Exception as e: print(f"Error creating insight: {str(e)}") return '' def process_high_severity_findings(self, sns_topic_arn: str) -> None: """Process high severity findings and send notifications.""" filters = { 'SeverityLabel': [{'Value': 'HIGH', 'Comparison': 'EQUALS'}], 'RecordState': [{'Value': 'ACTIVE', 'Comparison': 'EQUALS'}], 'WorkflowStatus': [{'Value': 'NEW', 'Comparison': 'EQUALS'}] } findings = self.get_findings(filters) for finding in findings: # Extract relevant information finding_id = finding['Id'] title = finding['Title'] description = finding['Description'] resource_id = finding['Resources'][0]['Id'] # Create notification message message = { 'finding_id': finding_id, 'title': title, 'description': description, 'resource_id': resource_id, 'severity': 'HIGH', 'timestamp': datetime.now(timezone.utc).isoformat() } # Send notification self.sns.publish( TopicArn=sns_topic_arn, Message=json.dumps(message), Subject='High Severity Security Finding' ) # Update finding status self.update_finding_status(finding_id, 'NOTIFIED') def create_compliance_report(self) -> Dict: """Generate a compliance report based on findings.""" try: # Get compliance-related findings filters = { 'ComplianceStatus': [{'Value': 'FAILED', 'Comparison': 'EQUALS'}], 'RecordState': [{'Value': 'ACTIVE', 'Comparison': 'EQUALS'}] } findings = self.get_findings(filters) # Aggregate findings by standard standards_summary = {} for finding in findings: standard = finding.get('Compliance', {}).get('SecurityControlId', 'Unknown') if standard not in standards_summary: standards_summary[standard] = { 'failed': 0, 'resources': set() } standards_summary[standard]['failed'] += 1 for resource in finding['Resources']: standards_summary[standard]['resources'].add(resource['Id']) # Format report report = { 'generated_at': datetime.now(timezone.utc).isoformat(), 'total_findings': len(findings), 'standards_summary': { standard: { 'failed_controls': info['failed'], 'affected_resources': len(info['resources']) } for standard, info in standards_summary.items() } } return report except Exception as e: print(f"Error generating compliance report: {str(e)}") return {} # Usage example def main(): processor = SecurityHubProcessor('us-west-2') # Process high severity findings processor.process_high_severity_findings( 'arn:aws:sns:us-west-2:123456789012:security-alerts' ) # Create custom insight filters = { 'SeverityLabel': [{'Value': 'HIGH', 'Comparison': 'EQUALS'}], 'ResourceType': [{'Value': 'AwsIamUser', 'Comparison': 'EQUALS'}] } processor.create_insight( 'High Severity IAM Findings', filters, 'ResourceId' ) # Generate compliance report report = processor.create_compliance_report() print(json.dumps(report, indent=2)) if __name__ == '__main__': main()
Security Hub Findings Flow
graph TB
subgraph "Finding Sources"
GD["GuardDuty Finding"]
Inspector["Inspector Finding"]
Custom["Custom Finding"]
end
subgraph "Processing"
Ingest["Ingest Finding"]
Enrich["Enrich Data"]
Analyze["Analyze Severity"]
end
subgraph "Response"
High["High Severity"]
Medium["Medium Severity"]
Low["Low Severity"]
end
GD --> Ingest
Inspector --> Ingest
Custom --> Ingest
Ingest --> Enrich
Enrich --> Analyze
Analyze --> High
Analyze --> Medium
Analyze --> Low
High --> Alert["Send Alert"]
Medium --> Ticket["Create Ticket"]
Low --> Log["Log Finding"]
style GD fill:#3b82f6,stroke:#2563eb,color:white
style Inspector fill:#3b82f6,stroke:#2563eb,color:white
style Custom fill:#3b82f6,stroke:#2563eb,color:white
style Ingest fill:#f1f5f9,stroke:#64748b
style Enrich fill:#f1f5f9,stroke:#64748b
style Analyze fill:#f1f5f9,stroke:#64748b
style High fill:#f1f5f9,stroke:#64748b
style Medium fill:#f1f5f9,stroke:#64748b
style Low fill:#f1f5f9,stroke:#64748b
style Alert fill:#f1f5f9,stroke:#64748b
style Ticket fill:#f1f5f9,stroke:#64748b
style Log fill:#f1f5f9,stroke:#64748b
Best Practices
1. Security Score
- Monitor security score regularly
- Track score trends
- Address findings promptly
- Document improvements
- Regular assessments
2. Compliance
- Enable relevant standards
- Regular compliance checks
- Document exceptions
- Track remediation
- Maintain evidence
3. Integration
- Enable all relevant services
- Configure automated responses
- Set up notifications
- Monitor integration health
- Regular testing
4. Monitoring
- Set up dashboards
- Configure alerts
- Track metrics
- Regular reviews
- Incident response
Conclusion
Effective Security Hub implementation requires:
- Proper configuration
- Regular monitoring
- Automated responses
- Compliance tracking
- Continuous improvement
Remember to:
- Monitor regularly
- Respond promptly
- Document changes
- Test responses
- Update configurations
Additional Resources
AWS
Security Hub
Threat Management
Monitoring