Security
Introduction to Penetration Testing for DevOps Professionals
A practical guide to penetration testing methodologies and tools for DevOps professionals.
February 5, 2024
DevHub Team
10 min read
Learn practical techniques for web application penetration testing, including the methodology, tools, and best practices for identifying and exploiting vulnerabilities in systems you are explicitly authorized to test.
Penetration Testing Methodology
graph TB
    %% Phase 1: agree on what may be tested before touching the target.
    subgraph "Planning"
        Scope["Scope Definition"]
        Rules["Rules of Engagement"]
        Auth["Authorization"]
    end

    %% Phase 2: hands-on testing against the authorized target.
    subgraph "Testing"
        Recon["Reconnaissance"]
        Scan["Vulnerability Scanning"]
        Exploit["Exploitation"]
    end

    %% Phase 3: turn findings into fixes and confirm the fixes hold.
    subgraph "Analysis"
        Report["Reporting"]
        Remediation["Remediation"]
        Verify["Verification"]
    end

    %% The workflow is strictly linear, so the edges are chained.
    Scope --> Rules --> Auth --> Recon --> Scan --> Exploit --> Report --> Remediation --> Verify

    %% Planning nodes are highlighted; every other node uses the neutral palette.
    classDef planning fill:#3b82f6,stroke:#2563eb,color:white
    classDef phase fill:#f1f5f9,stroke:#64748b
    class Scope,Rules,Auth planning
    class Recon,Scan,Exploit,Report,Remediation,Verify phase
Understanding Web Application Security
Web applications face various security challenges:
- Input Validation: SQL Injection, XSS
- Authentication: Weak Passwords, Session Management
- Authorization: Access Control, IDOR
- Data Protection: Encryption, Secure Communication
- Configuration: Security Headers, Error Handling
Implementation Guide
1. Reconnaissance
Gather information about the target:
# recon.py
"""Reconnaissance helpers that wrap the amass, wappalyzer and gobuster CLIs.

Only run this against targets you are explicitly authorized to test.
"""
import json
import subprocess
from typing import Dict, List, Optional


class Recon:
    """Collect subdomains, technologies and endpoints for a single target.

    Findings accumulate in ``self.results`` under the keys ``'subdomains'``,
    ``'technologies'`` and ``'endpoints'``.
    """

    WORDLIST = "/usr/share/wordlists/dirbuster/directory-list-2.3-medium.txt"

    def __init__(self, target: str, timeout: Optional[float] = None):
        """Store the target and prepare empty result lists.

        Args:
            target: Domain handed to the external tools (e.g. ``example.com``).
            timeout: Optional per-command limit in seconds. ``None`` (the
                default) lets each tool run until it exits.
        """
        self.target = target
        self.timeout = timeout
        self.results: Dict[str, List[str]] = {
            'subdomains': [],
            'technologies': [],
            'endpoints': []
        }

    def _run_tool(self, args: List[str], task: str) -> str:
        """Run an external command and return its stdout as text.

        The command is passed as an argument list (never through a shell or
        ``str.split``), so whitespace in the target cannot turn into extra
        arguments. ``subprocess.run`` drains both pipes and waits for the
        child, which avoids blocking on a full stderr pipe.

        Args:
            args: Executable followed by its arguments.
            task: Human-readable task name used in the error message.

        Returns:
            The captured stdout, or ``""`` when the command could not be run
            (missing binary, timeout, invalid argument).
        """
        try:
            completed = subprocess.run(
                args,
                capture_output=True,
                text=True,
                errors="replace",  # tool output is not guaranteed to be UTF-8
                timeout=self.timeout,
                check=False,
            )
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            print(f"Error during {task}: {e}")
            return ""
        return completed.stdout or ""

    @staticmethod
    def _parse_amass_output(output: str) -> List[str]:
        """Extract unique ``name`` values from JSON-lines output.

        Lines that are blank, not valid JSON, or lack a ``name`` key are
        skipped so one bad line cannot discard the remaining results.
        """
        names: List[str] = []
        for raw_line in output.splitlines():
            line = raw_line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                continue
            if not isinstance(record, dict) or 'name' not in record:
                continue
            if record['name'] not in names:
                names.append(record['name'])
        return names

    @staticmethod
    def _parse_wappalyzer_output(output: str) -> List[str]:
        """Extract unique technology names from a wappalyzer JSON document.

        Returns an empty list when the output is empty or not the expected
        JSON object.
        """
        try:
            data = json.loads(output)
        except json.JSONDecodeError:
            return []
        if not isinstance(data, dict):
            return []
        names: List[str] = []
        for tech in data.get('technologies') or []:
            name = tech.get('name') if isinstance(tech, dict) else None
            if name and name not in names:
                names.append(name)
        return names

    @staticmethod
    def _parse_gobuster_output(output: str) -> List[str]:
        """Return the first token of every line that reports ``Status: 200``."""
        return [
            line.split()[0]
            for line in output.splitlines()
            if "Status: 200" in line
        ]

    def find_subdomains(self) -> None:
        """Find subdomains using amass and add new ones to the results."""
        output = self._run_tool(
            ["amass", "enum", "-d", self.target, "-json", "/dev/stdout"],
            "subdomain enumeration",
        )
        known = self.results['subdomains']
        for name in self._parse_amass_output(output):
            if name not in known:
                known.append(name)

    def detect_technologies(self) -> None:
        """Detect technologies on every known subdomain using wappalyzer.

        Each subdomain is probed independently, so a failure on one host does
        not prevent the remaining hosts from being checked.
        """
        known = self.results['technologies']
        for subdomain in self.results['subdomains']:
            output = self._run_tool(
                ["wappalyzer", f"https://{subdomain}", "--json"],
                "technology detection",
            )
            for name in self._parse_wappalyzer_output(output):
                if name not in known:
                    known.append(name)

    def discover_endpoints(self) -> None:
        """Discover endpoints on the target using gobuster."""
        output = self._run_tool(
            [
                "gobuster", "dir",
                "-u", f"https://{self.target}",
                "-w", self.WORDLIST,
                "-t", "50",
                "-q",
            ],
            "endpoint discovery",
        )
        self.results['endpoints'].extend(self._parse_gobuster_output(output))

    def run_recon(self) -> Dict[str, List[str]]:
        """Run all reconnaissance tasks in order and return the results."""
        print(f"Starting reconnaissance for {self.target}")
        self.find_subdomains()
        self.detect_technologies()
        self.discover_endpoints()
        return self.results


if __name__ == "__main__":
    target = "example.com"
    recon = Recon(target)
    results = recon.run_recon()
    print(json.dumps(results, indent=2))
2. Vulnerability Scanning
Create a custom vulnerability scanner:
# scanner.py
"""Asynchronous probe for SQL injection, XSS and SSRF indicators.

Only run this against targets you are explicitly authorized to test.
"""
import argparse
import asyncio
import json
import logging
import re
from typing import Callable, Dict, List, Optional, Sequence

import aiohttp


class VulnerabilityScanner:
    """Send test payloads to a list of endpoints and record suspicious responses.

    Findings are appended to ``self.vulnerabilities`` as dicts with the keys
    ``type``, ``endpoint``, ``payload`` and ``severity``.
    """

    def __init__(self, target: str, endpoints: List[str], timeout: float = 30.0):
        """Store scan parameters and configure logging.

        Args:
            target: Host name; requests go to ``https://{target}{endpoint}``.
            endpoints: Paths to probe, each starting with ``/``.
            timeout: Total time limit in seconds for a single request.
        """
        self.target = target
        self.endpoints = endpoints
        self.timeout = timeout
        self.vulnerabilities: List[Dict] = []
        self.session: Optional[aiohttp.ClientSession] = None

        # Configure logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    async def init_session(self) -> None:
        """Initialize aiohttp session."""
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=self.timeout)
        )

    async def close_session(self) -> None:
        """Close aiohttp session."""
        if self.session:
            await self.session.close()

    async def _probe(
        self,
        endpoint: str,
        param: str,
        payloads: Sequence[str],
        is_vulnerable: Callable[[str, str], bool],
        vuln_type: str,
        severity: str,
        label: str,
    ) -> None:
        """Send each payload in one query parameter and record matching responses.

        Args:
            endpoint: Path to probe.
            param: Query parameter that carries the payload.
            payloads: Payload strings to try, one request each.
            is_vulnerable: Called with ``(response_text, payload)``; returns
                True when the response indicates a finding.
            vuln_type: Value stored under ``'type'`` in the finding.
            severity: Value stored under ``'severity'`` in the finding.
            label: Test name used in error log messages.
        """
        if self.session is None:
            self.logger.error(f"Error testing {label}: session is not initialized")
            return

        url = f"https://{self.target}{endpoint}"
        for payload in payloads:
            try:
                # ``params`` percent-encodes the payload; interpolating it into
                # the URL would break on characters such as '&' or '#'.
                async with self.session.get(url, params={param: payload}) as response:
                    text = await response.text()
            except Exception as e:
                # Best-effort scan: a failed request must not abort the others.
                self.logger.error(f"Error testing {label}: {e}")
                continue

            if is_vulnerable(text, payload):
                self.vulnerabilities.append({
                    'type': vuln_type,
                    'endpoint': endpoint,
                    'payload': payload,
                    'severity': severity
                })

    async def test_sql_injection(self, endpoint: str) -> None:
        """Test for SQL injection vulnerabilities.

        A response counts as suspicious when it mentions a database keyword.
        This is a coarse heuristic and findings should be verified manually.
        """
        indicators = ('sql', 'mysql', 'postgresql')

        def mentions_database(body: str, _payload: str) -> bool:
            lowered = body.lower()
            return any(marker in lowered for marker in indicators)

        await self._probe(
            endpoint,
            'id',
            ["'", "1' OR '1'='1", "1; DROP TABLE users"],
            mentions_database,
            'SQL Injection',
            'High',
            'SQL injection',
        )

    async def test_xss(self, endpoint: str) -> None:
        """Test for Cross-Site Scripting by looking for the payload echoed verbatim."""
        await self._probe(
            endpoint,
            'q',
            [
                "<script>alert(1)</script>",
                "<img src=x onerror=alert(1)>",
                "javascript:alert(1)"
            ],
            lambda body, payload: payload in body,
            'Cross-Site Scripting',
            'Medium',
            'XSS',
        )

    async def test_ssrf(self, endpoint: str) -> None:
        """Test for Server-Side Request Forgery vulnerabilities.

        The indicators match content of ``/etc/passwd`` (``root:``) and of the
        cloud metadata listing (``ami-id``) that the payloads try to fetch.
        """
        indicators = ('root:', 'ami-id')
        await self._probe(
            endpoint,
            'url',
            [
                "http://localhost",
                "file:///etc/passwd",
                "http://169.254.169.254/latest/meta-data/"
            ],
            lambda body, _payload: any(marker in body for marker in indicators),
            'SSRF',
            'High',
            'SSRF',
        )

    async def scan_endpoint(self, endpoint: str) -> None:
        """Scan a single endpoint for vulnerabilities."""
        self.logger.info(f"Scanning endpoint: {endpoint}")
        await asyncio.gather(
            self.test_sql_injection(endpoint),
            self.test_xss(endpoint),
            self.test_ssrf(endpoint)
        )

    async def run_scan(self) -> List[Dict]:
        """Run vulnerability scan on all endpoints and return the findings."""
        self.logger.info(f"Starting vulnerability scan for {self.target}")
        await self.init_session()
        try:
            await asyncio.gather(
                *[self.scan_endpoint(endpoint) for endpoint in self.endpoints]
            )
        finally:
            # Always release the connection pool, even if a scan task failed.
            await self.close_session()
        return self.vulnerabilities


def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse command-line options; every option defaults to the demo values."""
    parser = argparse.ArgumentParser(description="Probe endpoints for common web vulnerabilities")
    parser.add_argument("--target", default="example.com",
                        help="host name to scan (default: example.com)")
    parser.add_argument("--endpoints", nargs="+",
                        default=["/search", "/profile", "/upload"],
                        help="paths to probe")
    parser.add_argument("--output",
                        help="write the JSON report to this file instead of stdout")
    return parser.parse_args(argv)


async def main(argv: Optional[Sequence[str]] = None) -> None:
    """Run a scan and emit the findings as JSON."""
    args = parse_args(argv)
    scanner = VulnerabilityScanner(args.target, args.endpoints)
    vulnerabilities = await scanner.run_scan()
    report = json.dumps(vulnerabilities, indent=2)
    if args.output:
        with open(args.output, "w", encoding="utf-8") as f:
            f.write(report)
    else:
        print(report)


if __name__ == "__main__":
    asyncio.run(main())
3. API Security Testing
Test API endpoints for vulnerabilities:
# api_tester.py
"""Basic authentication, authorization, rate-limit and input checks for an API.

Only run this against APIs you are explicitly authorized to test.
"""
import argparse
import json
import time
from datetime import datetime, timedelta, timezone
from typing import Callable, Dict, List, Optional, Sequence, Tuple

import jwt
import requests


class APISecurityTester:
    """Run a fixed set of security checks against API endpoints.

    Findings are appended to ``self.vulnerabilities`` as dicts with the keys
    ``type``, ``endpoint``, ``severity`` and ``details``.
    """

    RATE_LIMIT_WINDOW_SECONDS = 60   # how long the rate-limit probe may run
    RATE_LIMIT_THRESHOLD = 100       # successful requests tolerated before flagging

    def __init__(self, base_url: str, timeout: float = 10.0):
        """Create a tester bound to one API.

        Args:
            base_url: Scheme and host, e.g. ``https://api.example.com``;
                endpoint paths are appended to it.
            timeout: Per-request timeout in seconds. requests has no default
                timeout, so without one a stalled server blocks forever.
        """
        self.base_url = base_url
        self.timeout = timeout
        self.session = requests.Session()
        self.vulnerabilities: List[Dict] = []

    def test_authentication(self, endpoint: str) -> None:
        """Test authentication vulnerabilities.

        Flags the endpoint when an unauthenticated request succeeds (2xx).
        Other statuses such as 403 or 404 are not treated as missing
        authentication. The weak-JWT check only runs when the unauthenticated
        request was rejected; on a public endpoint a 200 would say nothing
        about the token.
        """
        url = f"{self.base_url}{endpoint}"

        # Test missing authentication
        response = self.session.get(url, timeout=self.timeout)
        if 200 <= response.status_code < 300:
            self.vulnerabilities.append({
                'type': 'Missing Authentication',
                'endpoint': endpoint,
                'severity': 'High',
                'details': 'Endpoint accessible without authentication'
            })
            return

        # Test weak JWT
        weak_jwt = self._generate_weak_jwt()
        headers = {'Authorization': f'Bearer {weak_jwt}'}
        response = self.session.get(url, headers=headers, timeout=self.timeout)
        if response.status_code == 200:
            self.vulnerabilities.append({
                'type': 'Weak JWT',
                'endpoint': endpoint,
                'severity': 'High',
                'details': 'Endpoint accepts weak JWT'
            })

    def test_authorization(self, endpoint: str) -> None:
        """Test authorization vulnerabilities.

        Requests ``{endpoint}/1`` through ``{endpoint}/4`` and records every
        200 response as a *possible* IDOR; each finding needs manual review.
        """
        # Test IDOR
        for user_id in range(1, 5):
            resource = f"{endpoint}/{user_id}"
            response = self.session.get(
                f"{self.base_url}{resource}", timeout=self.timeout
            )
            if response.status_code == 200:
                self.vulnerabilities.append({
                    'type': 'IDOR',
                    'endpoint': resource,
                    'severity': 'High',
                    'details': 'Possible IDOR vulnerability'
                })

    def test_rate_limiting(self, endpoint: str) -> None:
        """Test rate limiting.

        Sends requests back-to-back for up to ``RATE_LIMIT_WINDOW_SECONDS``.
        Stops as soon as the server answers 429 (rate limiting works) or once
        more than ``RATE_LIMIT_THRESHOLD`` requests have been sent and the
        latest one still returned 200 (flagged as missing rate limiting).
        """
        url = f"{self.base_url}{endpoint}"
        started = time.monotonic()  # monotonic: unaffected by wall-clock changes
        requests_count = 0

        while time.monotonic() - started < self.RATE_LIMIT_WINDOW_SECONDS:
            response = self.session.get(url, timeout=self.timeout)
            requests_count += 1

            if response.status_code == 429:
                # Throttling kicked in; no reason to keep hammering the server.
                return

            if requests_count > self.RATE_LIMIT_THRESHOLD and response.status_code == 200:
                elapsed = time.monotonic() - started
                self.vulnerabilities.append({
                    'type': 'Missing Rate Limiting',
                    'endpoint': endpoint,
                    'severity': 'Medium',
                    'details': (
                        f'Endpoint allowed {requests_count} requests '
                        f'in {elapsed:.1f}s without throttling'
                    )
                })
                break

    def test_input_validation(self, endpoint: str) -> None:
        """Test input validation.

        Posts each payload as ``{'input': payload}`` and inspects the response
        body for a sign that the payload took effect: a database error message,
        ``/etc/passwd`` content, or the script tag echoed back verbatim. These
        are heuristics; confirm findings manually.
        """
        sql_errors = ('sql syntax', 'syntax error', 'mysql', 'postgresql', 'sqlite')

        checks: Dict[str, Tuple[str, Callable[[str, str], bool]]] = {
            'sql_injection': (
                "' OR '1'='1",
                lambda body, _p: any(marker in body.lower() for marker in sql_errors),
            ),
            'command_injection': (
                "; cat /etc/passwd",
                lambda body, _p: 'root:' in body,
            ),
            'xss': (
                "<script>alert(1)</script>",
                lambda body, p: p in body,
            ),
        }

        for attack_type, (payload, is_vulnerable) in checks.items():
            response = self.session.post(
                f"{self.base_url}{endpoint}",
                json={'input': payload},
                timeout=self.timeout,
            )
            if is_vulnerable(response.text, payload):
                self.vulnerabilities.append({
                    'type': f'Input Validation - {attack_type}',
                    'endpoint': endpoint,
                    'severity': 'High',
                    'details': f'Endpoint vulnerable to {attack_type}'
                })

    def _generate_weak_jwt(self) -> str:
        """Generate a JWT signed with a guessable secret (``weak_secret``, HS256)."""
        now = datetime.now(timezone.utc)
        payload = {
            'sub': '1234567890',
            'name': 'Test User',
            'iat': now,
            'exp': now + timedelta(days=1)
        }
        token = jwt.encode(payload, 'weak_secret', algorithm='HS256')
        # PyJWT 1.x returns bytes, 2.x returns str; normalize for the header.
        return token.decode('ascii') if isinstance(token, bytes) else token

    def run_tests(self, endpoints: List[str]) -> List[Dict]:
        """Run all security tests on specified endpoints and return the findings."""
        for endpoint in endpoints:
            self.test_authentication(endpoint)
            self.test_authorization(endpoint)
            self.test_rate_limiting(endpoint)
            self.test_input_validation(endpoint)
        return self.vulnerabilities


def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse command-line options; every option defaults to the demo values."""
    parser = argparse.ArgumentParser(description="Run basic API security checks")
    parser.add_argument("--base-url", default="https://api.example.com",
                        help="API base URL (default: https://api.example.com)")
    parser.add_argument("--endpoints", nargs="+",
                        default=["/users", "/orders", "/products"],
                        help="endpoint paths to test")
    parser.add_argument("--output",
                        help="write the JSON report to this file instead of stdout")
    return parser.parse_args(argv)


if __name__ == "__main__":
    args = parse_args()
    tester = APISecurityTester(args.base_url)
    vulnerabilities = tester.run_tests(args.endpoints)
    report = json.dumps(vulnerabilities, indent=2)
    if args.output:
        with open(args.output, "w", encoding="utf-8") as f:
            f.write(report)
    else:
        print(report)
4. Automated Security Testing
Integrate security testing into CI/CD:
# .github/workflows/security.yml
# Runs dependency, static (SAST), dynamic (DAST) and API security checks on
# every push and pull request to main, and once a day at 00:00 UTC.
name: Security Testing

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]
  schedule:
    - cron: '0 0 * * *'

# The job only reads the repository; do not grant the token anything more.
permissions:
  contents: read

jobs:
  security-scan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install safety bandit

      - name: Run dependency check
        run: safety check

      # --exit-zero: bandit otherwise fails the step on any finding, which
      # would stop the job before results are aggregated and gated below.
      - name: Run SAST
        run: bandit -r . -f json -o sast-results.json --exit-zero

      # Secrets are passed through the environment instead of being
      # interpolated into the script text, so their content is never parsed
      # as shell syntax.
      - name: Run DAST
        env:
          TEST_TARGET: ${{ secrets.TEST_TARGET }}
        run: |
          python scanner.py --target "$TEST_TARGET" \
            --output dast-results.json

      - name: Run API security tests
        env:
          API_BASE_URL: ${{ secrets.API_BASE_URL }}
        run: |
          python api_tester.py --base-url "$API_BASE_URL" \
            --output api-results.json

      - name: Process results
        run: |
          python process_results.py \
            --sast sast-results.json \
            --dast dast-results.json \
            --api api-results.json \
            --output security-report.html

      # upload-artifact v3 is deprecated and no longer runs on github.com.
      - name: Upload security report
        uses: actions/upload-artifact@v4
        with:
          name: security-report
          path: security-report.html

      - name: Check for critical vulnerabilities
        run: |
          python check_critical.py \
            --report security-report.html \
            --fail-on critical
5. Continuous Security Testing
Monitor for vulnerabilities continuously:
# monitor.py
"""Poll configured endpoints and e-mail an alert when their security posture changes."""
import asyncio
import aiohttp
from typing import Dict, List
import logging
import json
import time
from datetime import datetime
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart


class SecurityMonitor:
    """Periodically check endpoints and report differences between runs.

    The JSON config file is read for the keys ``log_file``, ``endpoints``
    (a list of objects with a ``url``), ``check_interval``, ``email``
    (``from``, ``to``, ``smtp_server``, ``username``, ``password``) and the
    optional ``response_time_threshold``.
    """

    REQUIRED_HEADERS = (
        'Strict-Transport-Security',
        'X-Content-Type-Options',
        'X-Frame-Options',
        'Content-Security-Policy',
    )

    def __init__(self, config_path: str):
        """Load the config at ``config_path`` and set up file logging."""
        self.config = self._load_config(config_path)
        self.previous_results: Dict = {}
        self.current_results: Dict = {}

        # Configure logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            filename=self.config['log_file']
        )
        self.logger = logging.getLogger(__name__)

    def _load_config(self, config_path: str) -> Dict:
        """Load configuration from file."""
        with open(config_path) as f:
            return json.load(f)

    async def check_endpoint(self, session: aiohttp.ClientSession, endpoint: Dict) -> Dict:
        """Check a single endpoint for security issues.

        Returns:
            On success a dict with ``url``, ``status``, ``headers``,
            ``response_time`` (seconds until the response headers arrived)
            and ``timestamp``; on failure a dict with ``url``, ``error`` and
            ``timestamp``.
        """
        url = endpoint['url']
        # aiohttp responses have no ``elapsed`` attribute (that is a requests
        # feature), so the request is timed here.
        started = time.perf_counter()
        try:
            async with session.get(url) as response:
                elapsed = time.perf_counter() - started
                return {
                    'url': url,
                    'status': response.status,
                    'headers': dict(response.headers),
                    'response_time': elapsed,
                    'timestamp': datetime.utcnow().isoformat()
                }
        except Exception as e:
            self.logger.error(f"Error checking {url}: {e}")
            return {
                'url': url,
                'error': str(e),
                'timestamp': datetime.utcnow().isoformat()
            }

    async def run_checks(self) -> None:
        """Run security checks on all endpoints concurrently.

        Stores the results in ``self.current_results`` keyed by URL.
        """
        async with aiohttp.ClientSession() as session:
            tasks = [
                self.check_endpoint(session, endpoint)
                for endpoint in self.config['endpoints']
            ]
            results = await asyncio.gather(*tasks)

        self.current_results = {result['url']: result for result in results}

    def analyze_results(self) -> List[Dict]:
        """Analyze results for security issues.

        Compares ``self.current_results`` with ``self.previous_results`` and
        returns one issue dict (``type``, ``url``, ``severity``, ``details``)
        per status change, missing security header and response-time increase.
        """
        issues = []
        threshold = self.config.get('response_time_threshold', 1.5)

        for url, current in self.current_results.items():
            previous = self.previous_results.get(url, {})

            # Check for status code changes
            if previous and current.get('status') != previous.get('status'):
                issues.append({
                    'type': 'Status Change',
                    'url': url,
                    'severity': 'Medium',
                    'details': f"Status changed from {previous.get('status')} to {current.get('status')}"
                })

            # Check security headers
            if 'headers' in current:
                # Header names are case-insensitive, but the stored copy is a
                # plain dict, so compare on lowercased names.
                present = {name.lower() for name in current['headers']}
                for header in self.REQUIRED_HEADERS:
                    if header.lower() not in present:
                        issues.append({
                            'type': 'Missing Security Header',
                            'url': url,
                            'severity': 'Medium',
                            'details': f"Missing {header} header"
                        })

            # Check response time anomalies
            if 'response_time' in current and 'response_time' in previous:
                if current['response_time'] > previous['response_time'] * threshold:
                    issues.append({
                        'type': 'Response Time Anomaly',
                        'url': url,
                        'severity': 'Low',
                        'details': "Response time increased significantly"
                    })

        return issues

    def send_alert(self, issues: List[Dict]) -> None:
        """Send email alert for security issues.

        Does nothing when ``issues`` is empty. This method blocks on SMTP
        network I/O, so asynchronous callers should run it in an executor.
        """
        if not issues:
            return

        msg = MIMEMultipart()
        msg['Subject'] = 'Security Monitoring Alert'
        msg['From'] = self.config['email']['from']
        msg['To'] = self.config['email']['to']

        sections = [
            f"Type: {issue['type']}\n"
            f"URL: {issue['url']}\n"
            f"Severity: {issue['severity']}\n"
            f"Details: {issue['details']}\n\n"
            for issue in issues
        ]
        body = "The following security issues were detected:\n\n" + "".join(sections)
        msg.attach(MIMEText(body, 'plain'))

        with smtplib.SMTP(self.config['email']['smtp_server']) as server:
            server.starttls()
            server.login(
                self.config['email']['username'],
                self.config['email']['password']
            )
            server.send_message(msg)

    async def monitor(self) -> None:
        """Run continuous security monitoring.

        Loops forever: check, analyze, alert, then sleep for
        ``check_interval`` seconds. After an unexpected error it logs the
        error and retries in 60 seconds.
        """
        loop = asyncio.get_running_loop()
        while True:
            try:
                await self.run_checks()
                issues = self.analyze_results()

                if issues:
                    self.logger.warning(f"Found {len(issues)} security issues")
                    # smtplib is blocking; keep it off the event loop thread.
                    await loop.run_in_executor(None, self.send_alert, issues)

                self.previous_results = self.current_results.copy()
                await asyncio.sleep(self.config['check_interval'])

            except Exception as e:
                self.logger.error(f"Error in monitoring loop: {e}")
                await asyncio.sleep(60)  # Wait before retrying


if __name__ == "__main__":
    monitor = SecurityMonitor("config.json")
    asyncio.run(monitor.monitor())
Best Practices
- Methodology
  - Follow a structured approach
  - Document all findings
  - Stay within the agreed scope
  - Test regularly
- Tools
  - Keep tools updated
  - Validate tool results manually
  - Use multiple tools
  - Write custom scripts where needed
- Reporting
  - Present clear findings
  - Include a risk assessment
  - Provide remediation steps
  - Schedule follow-up testing
- Legal
  - Obtain written authorization
  - Agree on the scope in advance
  - Handle collected data responsibly
  - Maintain confidentiality
Conclusion
Effective penetration testing requires:
- Structured methodology
- Comprehensive tools
- Clear reporting
- Legal compliance
- Regular updates
Remember to:
- Follow best practices
- Document everything
- Test thoroughly
- Update regularly
- Monitor continuously
Additional Resources
Penetration Testing
Security Testing
DevSecOps
Tools