Introduction to Penetration Testing for DevOps Professionals

A practical guide to penetration testing methodologies and tools for DevOps professionals.

February 5, 2024
DevHub Team

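This guide introduces the core techniques of web application penetration testing: a methodology to follow, tools to use, and best practices for identifying and exploiting vulnerabilities. Run every script below only against systems you are explicitly authorized to test.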

Penetration Testing Methodology

graph TB
    subgraph "Planning"
        Scope["Scope Definition"]
        Rules["Rules of Engagement"]
        Auth["Authorization"]
    end
    subgraph "Testing"
        Recon["Reconnaissance"]
        Scan["Vulnerability Scanning"]
        Exploit["Exploitation"]
    end
    subgraph "Analysis"
        Report["Reporting"]
        Remediation["Remediation"]
        Verify["Verification"]
    end
    Scope --> Rules
    Rules --> Auth
    Auth --> Recon
    Recon --> Scan
    Scan --> Exploit
    Exploit --> Report
    Report --> Remediation
    Remediation --> Verify
    style Scope fill:#3b82f6,stroke:#2563eb,color:white
    style Rules fill:#3b82f6,stroke:#2563eb,color:white
    style Auth fill:#3b82f6,stroke:#2563eb,color:white
    style Recon fill:#f1f5f9,stroke:#64748b
    style Scan fill:#f1f5f9,stroke:#64748b
    style Exploit fill:#f1f5f9,stroke:#64748b
    style Report fill:#f1f5f9,stroke:#64748b
    style Remediation fill:#f1f5f9,stroke:#64748b
    style Verify fill:#f1f5f9,stroke:#64748b
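
The planning phase in the diagram gates everything after it: no reconnaissance should start until scope and authorization are settled. One way a test script might enforce that is sketched below. The `Engagement` record, its field names, and `is_in_scope` are hypothetical and not part of any tool used in this article.

```python
from dataclasses import dataclass, field
from datetime import date
import ipaddress


@dataclass
class Engagement:
    """Hypothetical record of what the client has authorized."""
    domains: list[str] = field(default_factory=list)   # e.g. "example.com"
    networks: list[str] = field(default_factory=list)  # e.g. "203.0.113.0/24"
    start: date = date.min                              # first authorized day
    end: date = date.max                                # last authorized day

    def is_in_scope(self, host: str, today: date) -> bool:
        """Return True only if the test window is open and the host is authorized."""
        if not (self.start <= today <= self.end):
            return False
        try:
            addr = ipaddress.ip_address(host)
        except ValueError:
            # Not an IP literal, so treat it as a hostname and match the
            # domain itself or any subdomain of it (never a bare suffix).
            name = host.lower().rstrip(".")
            allowed = [d.lower() for d in self.domains]
            return any(name == d or name.endswith("." + d) for d in allowed)
        return any(addr in ipaddress.ip_network(n) for n in self.networks)
```

A script could call `is_in_scope` before every request and refuse to continue when it returns `False`, which keeps a typo in a target list from turning into an unauthorized scan.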

Understanding Web Application Security

Most web application vulnerabilities fall into five broad categories:

  1. Input Validation: SQL Injection, XSS
  2. Authentication: Weak Passwords, Session Management
  3. Authorization: Access Control, IDOR
  4. Data Protection: Encryption, Secure Communication
  5. Configuration: Security Headers, Error Handling
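
To make the first category concrete, here is a small self-contained sketch using Python's built-in `sqlite3` module. The table, the data, and the two lookup functions are made up for illustration. The only point is the difference between concatenating user input into a query and binding it as a parameter.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    """Create a throwaway in-memory database with two users."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
    conn.executemany(
        "INSERT INTO users (name) VALUES (?)", [("alice",), ("bob",)]
    )
    return conn


def find_user_unsafe(conn: sqlite3.Connection, name: str) -> list[str]:
    """Vulnerable: the input becomes part of the SQL text itself."""
    query = f"SELECT name FROM users WHERE name = '{name}'"
    return [row[0] for row in conn.execute(query)]


def find_user_safe(conn: sqlite3.Connection, name: str) -> list[str]:
    """Safe: the input is bound as a parameter and never parsed as SQL."""
    query = "SELECT name FROM users WHERE name = ?"
    return [row[0] for row in conn.execute(query, (name,))]
```

With the classic payload `' OR '1'='1`, `find_user_unsafe` matches every row in the table because the payload rewrites the `WHERE` clause, while `find_user_safe` returns an empty list because no user has that literal name.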

Implementation Guide

1. Reconnaissance

Gather information about the target:

# recon.py
import json
import subprocess
from typing import Dict, List


class Recon:
    def __init__(self, target: str):
        self.target = target
        self.results: Dict[str, List[str]] = {
            'subdomains': [],
            'technologies': [],
            'endpoints': []
        }

    def _run(self, cmd: List[str]) -> str:
        """Run an external tool and return its stdout as text."""
        # Passing a list (not a shell string) avoids quoting problems.
        completed = subprocess.run(cmd, capture_output=True, text=True)
        return completed.stdout

    def find_subdomains(self) -> None:
        """Find subdomains using amass."""
        try:
            # CLI flags differ between amass releases; check your version.
            output = self._run(
                ["amass", "enum", "-d", self.target, "-json", "/dev/stdout"]
            )
        except OSError as e:
            print(f"Error during subdomain enumeration: {e}")
            return
        for line in output.splitlines():
            try:
                result = json.loads(line)
            except json.JSONDecodeError:
                continue  # skip progress lines that are not JSON
            name = result.get('name') if isinstance(result, dict) else None
            if name and name not in self.results['subdomains']:
                self.results['subdomains'].append(name)

    def detect_technologies(self) -> None:
        """Detect technologies using wappalyzer."""
        for subdomain in self.results['subdomains']:
            # Handle errors per subdomain so one failure does not stop the loop.
            try:
                output = self._run(
                    ["wappalyzer", f"https://{subdomain}", "--json"]
                )
                if not output:
                    continue
                tech_data = json.loads(output)
                for tech in tech_data.get('technologies', []):
                    if tech['name'] not in self.results['technologies']:
                        self.results['technologies'].append(tech['name'])
            except (OSError, ValueError, KeyError) as e:
                print(f"Error during technology detection: {e}")

    def discover_endpoints(self) -> None:
        """Discover endpoints using gobuster."""
        wordlist = "/usr/share/wordlists/dirbuster/directory-list-2.3-medium.txt"
        try:
            output = self._run([
                "gobuster", "dir",
                "-u", f"https://{self.target}",
                "-w", wordlist,
                "-t", "50",
                "-q",
            ])
        except OSError as e:
            print(f"Error during endpoint discovery: {e}")
            return
        for line in output.splitlines():
            # Keep only paths that returned HTTP 200.
            if "Status: 200" in line:
                endpoint = line.split()[0]
                self.results['endpoints'].append(endpoint)

    def run_recon(self) -> Dict[str, List[str]]:
        """Run all reconnaissance tasks."""
        print(f"Starting reconnaissance for {self.target}")
        self.find_subdomains()
        self.detect_technologies()
        self.discover_endpoints()
        return self.results


if __name__ == "__main__":
    target = "example.com"
    recon = Recon(target)
    results = recon.run_recon()
    print(json.dumps(results, indent=2))
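
The endpoint discovery step keeps only lines containing "Status: 200", which drops redirects and access-denied paths that are often just as interesting to a tester. The sketch below parses the status code from each line instead. It assumes gobuster's `dir` mode prints lines shaped like `/admin (Status: 301) [Size: 178]`; the function name and the default set of interesting codes are illustrative choices, so check the format against your gobuster version.

```python
import re
from typing import Iterable, List, Tuple

# Assumed line shape: "/admin                (Status: 301) [Size: 178]"
_LINE = re.compile(r"^(?P<path>/\S*)\s+\(Status:\s*(?P<status>\d{3})\)")


def parse_gobuster(
    output: str,
    interesting: Iterable[int] = (200, 204, 301, 302, 401, 403),
) -> List[Tuple[str, int]]:
    """Return (path, status) pairs for lines whose status is in `interesting`."""
    wanted = set(interesting)
    found: List[Tuple[str, int]] = []
    for line in output.splitlines():
        match = _LINE.match(line.strip())
        if match is None:
            continue  # banner, progress, or otherwise unrecognized line
        status = int(match["status"])
        if status in wanted:
            found.append((match["path"], status))
    return found
```

A 401 or 403 on a path such as `/admin` confirms that the path exists, which makes it a candidate for the authentication and authorization tests later in this guide.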

2. Vulnerability Scanning

Create a custom vulnerability scanner:

# scanner.py
import asyncio
import json
import logging
from typing import Dict, List, Optional

import aiohttp


class VulnerabilityScanner:
    def __init__(self, target: str, endpoints: List[str]):
        self.target = target
        self.endpoints = endpoints
        self.vulnerabilities: List[Dict] = []
        self.session: Optional[aiohttp.ClientSession] = None

        # Configure logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    async def init_session(self) -> None:
        """Initialize aiohttp session."""
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=10)
        )

    async def close_session(self) -> None:
        """Close aiohttp session."""
        if self.session:
            await self.session.close()

    async def test_sql_injection(self, endpoint: str) -> None:
        """Test for SQL injection vulnerabilities."""
        # Non-destructive payloads only: never send DROP/DELETE statements.
        payloads = ["'", "1' OR '1'='1", "1' AND '1'='2"]
        # Heuristic: database error text in the body. Confirm hits manually.
        error_markers = ['sql syntax', 'mysql', 'postgresql']
        url = f"https://{self.target}{endpoint}"
        for payload in payloads:
            try:
                # params= lets aiohttp URL-encode the payload correctly.
                async with self.session.get(url, params={'id': payload}) as response:
                    text = (await response.text()).lower()
                    if any(marker in text for marker in error_markers):
                        self.vulnerabilities.append({
                            'type': 'SQL Injection',
                            'endpoint': endpoint,
                            'payload': payload,
                            'severity': 'High'
                        })
            except Exception as e:
                self.logger.error(f"Error testing SQL injection: {e}")

    async def test_xss(self, endpoint: str) -> None:
        """Test for Cross-Site Scripting vulnerabilities."""
        payloads = [
            "<script>alert(1)</script>",
            "<img src=x onerror=alert(1)>",
            "javascript:alert(1)"
        ]
        url = f"https://{self.target}{endpoint}"
        for payload in payloads:
            try:
                async with self.session.get(url, params={'q': payload}) as response:
                    text = await response.text()
                    # The payload coming back unescaped suggests reflected XSS.
                    if payload in text:
                        self.vulnerabilities.append({
                            'type': 'Cross-Site Scripting',
                            'endpoint': endpoint,
                            'payload': payload,
                            'severity': 'Medium'
                        })
            except Exception as e:
                self.logger.error(f"Error testing XSS: {e}")

    async def test_ssrf(self, endpoint: str) -> None:
        """Test for Server-Side Request Forgery vulnerabilities."""
        payloads = [
            "http://localhost",
            "file:///etc/passwd",
            "http://169.254.169.254/latest/meta-data/"
        ]
        url = f"https://{self.target}{endpoint}"
        for payload in payloads:
            try:
                async with self.session.get(url, params={'url': payload}) as response:
                    text = await response.text()
                    # 'root:' hints at /etc/passwd, 'ami-id' at cloud metadata.
                    if any(indicator in text for indicator in ['root:', 'ami-id']):
                        self.vulnerabilities.append({
                            'type': 'SSRF',
                            'endpoint': endpoint,
                            'payload': payload,
                            'severity': 'High'
                        })
            except Exception as e:
                self.logger.error(f"Error testing SSRF: {e}")

    async def scan_endpoint(self, endpoint: str) -> None:
        """Scan a single endpoint for vulnerabilities."""
        self.logger.info(f"Scanning endpoint: {endpoint}")
        await asyncio.gather(
            self.test_sql_injection(endpoint),
            self.test_xss(endpoint),
            self.test_ssrf(endpoint)
        )

    async def run_scan(self) -> List[Dict]:
        """Run vulnerability scan on all endpoints."""
        self.logger.info(f"Starting vulnerability scan for {self.target}")
        await self.init_session()
        try:
            await asyncio.gather(
                *[self.scan_endpoint(endpoint) for endpoint in self.endpoints]
            )
        finally:
            await self.close_session()
        return self.vulnerabilities


if __name__ == "__main__":
    target = "example.com"
    endpoints = ["/search", "/profile", "/upload"]

    async def main():
        scanner = VulnerabilityScanner(target, endpoints)
        vulnerabilities = await scanner.run_scan()
        print(json.dumps(vulnerabilities, indent=2))

    asyncio.run(main())

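The scanner starts every endpoint scan at once with `asyncio.gather`, which can flood a target when the endpoint list is long. A common way to bound that is an `asyncio.Semaphore`. The helper below is a sketch of that pattern; `gather_limited` and its default limit of 5 are illustrative and not part of the scanner.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def gather_limited(
    items: Iterable[T],
    worker: Callable[[T], Awaitable[R]],
    limit: int = 5,
) -> List[R]:
    """Run worker(item) for every item with at most `limit` running at once.

    Results come back in the same order as `items`, like asyncio.gather.
    """
    semaphore = asyncio.Semaphore(limit)

    async def guarded(item: T) -> R:
        # Each task waits here until one of the `limit` slots is free.
        async with semaphore:
            return await worker(item)

    return await asyncio.gather(*(guarded(item) for item in items))
```

In `run_scan`, the unbounded gather could be swapped for `await gather_limited(self.endpoints, self.scan_endpoint)`. Pick the limit from the rules of engagement rather than from what the target appears to tolerate.
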
3. API Security Testing

Test API endpoints for vulnerabilities:

# api_tester.py
import json
import time
from datetime import datetime, timedelta, timezone
from typing import Dict, List

import jwt  # PyJWT
import requests

TIMEOUT = 10  # seconds; requests has no default timeout


class APISecurityTester:
    def __init__(self, base_url: str):
        self.base_url = base_url
        self.session = requests.Session()
        self.vulnerabilities: List[Dict] = []

    def test_authentication(self, endpoint: str) -> None:
        """Test authentication vulnerabilities."""
        url = f"{self.base_url}{endpoint}"

        # Test missing authentication. Only a successful response counts:
        # a 403 or 404 is not evidence that authentication is missing.
        response = self.session.get(url, timeout=TIMEOUT)
        if 200 <= response.status_code < 300:
            self.vulnerabilities.append({
                'type': 'Missing Authentication',
                'endpoint': endpoint,
                'severity': 'High',
                'details': 'Endpoint accessible without authentication'
            })
            return  # the JWT test is meaningless on an open endpoint

        # Test weak JWT
        weak_jwt = self._generate_weak_jwt()
        headers = {'Authorization': f'Bearer {weak_jwt}'}
        response = self.session.get(url, headers=headers, timeout=TIMEOUT)
        if response.status_code == 200:
            self.vulnerabilities.append({
                'type': 'Weak JWT',
                'endpoint': endpoint,
                'severity': 'High',
                'details': 'Endpoint accepts weak JWT'
            })

    def test_authorization(self, endpoint: str) -> None:
        """Test authorization vulnerabilities."""
        # Test IDOR: sequential IDs that all return 200 deserve a manual look.
        for user_id in range(1, 5):
            response = self.session.get(
                f"{self.base_url}{endpoint}/{user_id}", timeout=TIMEOUT
            )
            if response.status_code == 200:
                self.vulnerabilities.append({
                    'type': 'IDOR',
                    'endpoint': f"{endpoint}/{user_id}",
                    'severity': 'High',
                    'details': 'Possible IDOR vulnerability'
                })

    def test_rate_limiting(self, endpoint: str, max_requests: int = 100,
                           window: int = 60) -> None:
        """Test rate limiting."""
        url = f"{self.base_url}{endpoint}"
        deadline = time.monotonic() + window
        requests_count = 0
        while time.monotonic() < deadline:
            response = self.session.get(url, timeout=TIMEOUT)
            requests_count += 1
            if response.status_code == 429:
                return  # the server is throttling us, so a limit exists
            if requests_count > max_requests and response.status_code == 200:
                self.vulnerabilities.append({
                    'type': 'Missing Rate Limiting',
                    'endpoint': endpoint,
                    'severity': 'Medium',
                    'details': f'Endpoint allowed {requests_count} requests per minute'
                })
                return

    def test_input_validation(self, endpoint: str) -> None:
        """Test input validation."""
        # Each attack maps to (payload, strings that suggest it worked).
        payloads = {
            'sql_injection': ("' OR '1'='1", ['sql syntax', 'sqlstate']),
            'command_injection': ("; cat /etc/passwd", ['root:']),
            'xss': ("<script>alert(1)</script>", ['<script>alert(1)</script>'])
        }
        for attack_type, (payload, indicators) in payloads.items():
            response = self.session.post(
                f"{self.base_url}{endpoint}",
                json={'input': payload},
                timeout=TIMEOUT
            )
            body = response.text.lower()
            if any(indicator in body for indicator in indicators):
                self.vulnerabilities.append({
                    'type': f'Input Validation - {attack_type}',
                    'endpoint': endpoint,
                    'severity': 'High',
                    'details': f'Endpoint may be vulnerable to {attack_type}'
                })

    def _generate_weak_jwt(self) -> str:
        """Generate a weak JWT token."""
        now = datetime.now(timezone.utc)
        payload = {
            'sub': '1234567890',
            'name': 'Test User',
            'iat': now,
            'exp': now + timedelta(days=1)
        }
        # Signed with a guessable secret on purpose.
        return jwt.encode(payload, 'weak_secret', algorithm='HS256')

    def run_tests(self, endpoints: List[str]) -> List[Dict]:
        """Run all security tests on specified endpoints."""
        for endpoint in endpoints:
            self.test_authentication(endpoint)
            self.test_authorization(endpoint)
            self.test_rate_limiting(endpoint)
            self.test_input_validation(endpoint)
        return self.vulnerabilities


if __name__ == "__main__":
    base_url = "https://api.example.com"
    endpoints = ["/users", "/orders", "/products"]
    tester = APISecurityTester(base_url)
    vulnerabilities = tester.run_tests(endpoints)
    print(json.dumps(vulnerabilities, indent=2))
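
A weak signing secret is only one way a JWT can be weak. Another quick check is to read a token's header, which is plain base64url-encoded JSON, and look for an `alg` of `none` or an empty signature. The sketch below does that with the standard library only; `jwt_header_issues` is an illustrative helper, and it inspects the token without verifying it.

```python
import base64
import json


def _b64url_decode(segment: str) -> bytes:
    """Decode a base64url segment, restoring the padding JWTs strip off."""
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def jwt_header_issues(token: str) -> list[str]:
    """Return a list of obvious header-level problems (empty if none found)."""
    parts = token.split(".")
    if len(parts) != 3:
        return ["not a three-part JWT"]
    try:
        header = json.loads(_b64url_decode(parts[0]))
    except ValueError:
        # Covers bad base64, bad UTF-8, and bad JSON.
        return ["header is not valid base64url-encoded JSON"]
    if not isinstance(header, dict):
        return ["header is not a JSON object"]

    issues = []
    alg = str(header.get("alg", "")).lower()
    if alg in ("", "none"):
        issues.append("alg is missing or 'none' (token is unsigned)")
    if parts[2] == "":
        issues.append("signature segment is empty")
    return issues
```

An empty result does not mean the token is safe. It only means these particular header-level red flags are absent, so the weak-secret test above is still worth running.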

4. Automated Security Testing

Integrate security testing into CI/CD:

# .github/workflows/security.yml
name: Security Testing

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]
  schedule:
    - cron: '0 0 * * *'

jobs:
  security-scan:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install safety bandit

      - name: Run dependency check
        run: safety check

      - name: Run SAST
        # --exit-zero keeps the job running so the later steps can build
        # the report; the final step decides whether the build fails.
        run: bandit -r . -f json -o sast-results.json --exit-zero

      - name: Run DAST
        # Assumes scanner.py has been extended to accept --target and --output.
        run: |
          python scanner.py --target ${{ secrets.TEST_TARGET }} \
            --output dast-results.json

      - name: Run API security tests
        # Assumes api_tester.py has been extended to accept --base-url and --output.
        run: |
          python api_tester.py --base-url ${{ secrets.API_BASE_URL }} \
            --output api-results.json

      - name: Process results
        run: |
          python process_results.py \
            --sast sast-results.json \
            --dast dast-results.json \
            --api api-results.json \
            --output security-report.html

      - name: Upload security report
        uses: actions/upload-artifact@v4
        with:
          name: security-report
          path: security-report.html

      - name: Check for critical vulnerabilities
        run: |
          python check_critical.py \
            --report security-report.html \
            --fail-on critical
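
The workflow ends with `check_critical.py`, which this article does not list. As a rough idea of what such a gate might do, the sketch below filters findings by a minimum severity. It is hypothetical throughout: it assumes findings are dictionaries with a `severity` key, like the ones the scanner and API tester produce, rather than parsing the HTML report, and the severity scale is an assumption.

```python
from typing import Dict, List

# Assumed severity scale, lowest to highest.
SEVERITY_RANK = {"low": 0, "medium": 1, "high": 2, "critical": 3}


def findings_at_or_above(findings: List[Dict], fail_on: str) -> List[Dict]:
    """Return the findings whose severity is at least `fail_on`."""
    try:
        threshold = SEVERITY_RANK[fail_on.lower()]
    except KeyError:
        raise ValueError(f"unknown severity level: {fail_on!r}") from None
    blocking = []
    for finding in findings:
        severity = str(finding.get("severity", "")).lower()
        # Findings with an unrecognized severity are ignored, not guessed at.
        if SEVERITY_RANK.get(severity, -1) >= threshold:
            blocking.append(finding)
    return blocking


def exit_code(findings: List[Dict], fail_on: str) -> int:
    """Return 1 if any finding should fail the build, else 0."""
    return 1 if findings_at_or_above(findings, fail_on) else 0
```

A CI script would load the JSON result files, call `exit_code`, and pass the result to `sys.exit` so that the job fails only when a finding meets the chosen threshold.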

5. Continuous Security Testing

Monitor for vulnerabilities continuously:

# monitor.py
import asyncio
import json
import logging
import smtplib
import time
from datetime import datetime, timezone
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Dict, List

import aiohttp


class SecurityMonitor:
    def __init__(self, config_path: str):
        self.config = self._load_config(config_path)
        self.previous_results: Dict = {}
        self.current_results: Dict = {}

        # Configure logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            filename=self.config['log_file']
        )
        self.logger = logging.getLogger(__name__)

    def _load_config(self, config_path: str) -> Dict:
        """Load configuration from file."""
        with open(config_path) as f:
            return json.load(f)

    async def check_endpoint(self, session: aiohttp.ClientSession,
                             endpoint: Dict) -> Dict:
        """Check a single endpoint for security issues."""
        url = endpoint['url']
        timestamp = datetime.now(timezone.utc).isoformat()
        try:
            # aiohttp responses have no .elapsed, so time the request here.
            started = time.monotonic()
            async with session.get(url) as response:
                await response.read()
                elapsed = time.monotonic() - started
                return {
                    'url': url,
                    'status': response.status,
                    # Lower-case the names: HTTP headers are case-insensitive.
                    'headers': {k.lower(): v for k, v in response.headers.items()},
                    'response_time': elapsed,
                    'timestamp': timestamp
                }
        except Exception as e:
            self.logger.error(f"Error checking {url}: {e}")
            return {'url': url, 'error': str(e), 'timestamp': timestamp}

    async def run_checks(self) -> None:
        """Run security checks on all endpoints."""
        async with aiohttp.ClientSession() as session:
            tasks = [
                self.check_endpoint(session, endpoint)
                for endpoint in self.config['endpoints']
            ]
            results = await asyncio.gather(*tasks)
        self.current_results = {result['url']: result for result in results}

    def analyze_results(self) -> List[Dict]:
        """Analyze results for security issues."""
        issues = []
        for url, current in self.current_results.items():
            previous = self.previous_results.get(url, {})

            # Check for status code changes
            if previous and current.get('status') != previous.get('status'):
                issues.append({
                    'type': 'Status Change',
                    'url': url,
                    'severity': 'Medium',
                    'details': f"Status changed from {previous.get('status')} "
                               f"to {current.get('status')}"
                })

            # Check security headers
            if 'headers' in current:
                required_headers = [
                    'Strict-Transport-Security',
                    'X-Content-Type-Options',
                    'X-Frame-Options',
                    'Content-Security-Policy'
                ]
                for header in required_headers:
                    if header.lower() not in current['headers']:
                        issues.append({
                            'type': 'Missing Security Header',
                            'url': url,
                            'severity': 'Medium',
                            'details': f"Missing {header} header"
                        })

            # Check response time anomalies
            if 'response_time' in current and 'response_time' in previous:
                threshold = self.config.get('response_time_threshold', 1.5)
                if current['response_time'] > previous['response_time'] * threshold:
                    issues.append({
                        'type': 'Response Time Anomaly',
                        'url': url,
                        'severity': 'Low',
                        'details': f"Response time rose from "
                                   f"{previous['response_time']:.2f}s to "
                                   f"{current['response_time']:.2f}s"
                    })
        return issues

    def send_alert(self, issues: List[Dict]) -> None:
        """Send email alert for security issues."""
        if not issues:
            return

        msg = MIMEMultipart()
        msg['Subject'] = 'Security Monitoring Alert'
        msg['From'] = self.config['email']['from']
        msg['To'] = self.config['email']['to']

        body = "The following security issues were detected:\n\n"
        for issue in issues:
            body += f"Type: {issue['type']}\n"
            body += f"URL: {issue['url']}\n"
            body += f"Severity: {issue['severity']}\n"
            body += f"Details: {issue['details']}\n\n"
        msg.attach(MIMEText(body, 'plain'))

        with smtplib.SMTP(self.config['email']['smtp_server']) as server:
            server.starttls()
            server.login(
                self.config['email']['username'],
                self.config['email']['password']
            )
            server.send_message(msg)

    async def monitor(self) -> None:
        """Run continuous security monitoring."""
        while True:
            try:
                await self.run_checks()
                issues = self.analyze_results()
                if issues:
                    self.logger.warning(f"Found {len(issues)} security issues")
                    # smtplib blocks, so run it off the event loop.
                    await asyncio.to_thread(self.send_alert, issues)
                self.previous_results = self.current_results.copy()
                await asyncio.sleep(self.config['check_interval'])
            except Exception as e:
                self.logger.error(f"Error in monitoring loop: {e}")
                await asyncio.sleep(60)  # Wait before retrying


if __name__ == "__main__":
    monitor = SecurityMonitor("config.json")
    asyncio.run(monitor.monitor())
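
The monitor reads `config.json`, but the file itself is not shown. The sketch below lists the keys the monitor code actually accesses and checks a configuration for missing ones. The key names come from the code above; every value in `EXAMPLE_CONFIG` is a placeholder, and `missing_keys` is an illustrative helper rather than part of the monitor.

```python
from typing import Dict, List

REQUIRED_TOP_LEVEL = ("log_file", "endpoints", "check_interval", "email")
REQUIRED_EMAIL = ("from", "to", "smtp_server", "username", "password")

# Placeholder values only; replace them with your own.
EXAMPLE_CONFIG: Dict = {
    "log_file": "monitor.log",
    "check_interval": 300,              # seconds between monitoring rounds
    "response_time_threshold": 1.5,     # optional; the monitor defaults to 1.5
    "endpoints": [
        {"url": "https://example.com/"},
        {"url": "https://example.com/login"},
    ],
    "email": {
        "from": "monitor@example.com",
        "to": "security@example.com",
        "smtp_server": "smtp.example.com",
        "username": "monitor@example.com",
        "password": "change-me",
    },
}


def missing_keys(config: Dict) -> List[str]:
    """Return dotted names of required keys that the config lacks."""
    missing = [key for key in REQUIRED_TOP_LEVEL if key not in config]
    if "email" in config:
        missing += [
            f"email.{key}" for key in REQUIRED_EMAIL if key not in config["email"]
        ]
    for index, endpoint in enumerate(config.get("endpoints", [])):
        if "url" not in endpoint:
            missing.append(f"endpoints[{index}].url")
    return missing
```

Running `missing_keys` on the loaded configuration at startup turns a `KeyError` deep inside the monitoring loop into a clear message before the first check runs.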

Best Practices

  1. Methodology
    • Follow a structured approach
    • Document findings
    • Stay within scope
    • Test regularly
  2. Tools
    • Keep tools updated
    • Validate results
    • Use multiple tools
    • Write custom scripts
  3. Reporting
    • Clear findings
    • Risk assessment
    • Remediation steps
    • Follow-up testing
  4. Legal
    • Authorization
    • Scope agreement
    • Data handling
    • Confidentiality
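
The reporting items above (clear findings, risk assessment, remediation steps) map naturally onto a small data structure. The sketch below orders findings by severity and renders them as plain text. The `Finding` fields, the severity scale, and the output layout are illustrative assumptions, not a standard report format.

```python
from dataclasses import dataclass
from typing import List

# Assumed scale: lower rank means more urgent.
_RANK = {"Critical": 0, "High": 1, "Medium": 2, "Low": 3}


@dataclass
class Finding:
    title: str
    severity: str      # one of the keys in _RANK
    endpoint: str
    remediation: str


def render_report(findings: List[Finding]) -> str:
    """Return a plain-text report with the most severe findings first."""
    # Unknown severities sort after all known ones.
    ordered = sorted(findings, key=lambda f: _RANK.get(f.severity, len(_RANK)))
    lines = []
    for number, finding in enumerate(ordered, start=1):
        lines.append(
            f"{number}. [{finding.severity}] {finding.title} ({finding.endpoint})"
        )
        lines.append(f"   Remediation: {finding.remediation}")
    return "\n".join(lines)
```

Keeping a remediation string on every finding makes it harder to ship a report that lists problems without saying how to fix them.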

Conclusion

Effective penetration testing requires:

  1. Structured methodology
  2. Comprehensive tools
  3. Clear reporting
  4. Legal compliance
  5. Regular updates

Remember to:

  • Follow best practices
  • Document everything
  • Test thoroughly
  • Update regularly
  • Monitor continuously

Additional Resources

  1. OWASP Testing Guide
  2. Burp Suite Documentation
  3. NIST Penetration Testing Guidelines
  4. Python Security Tools
  5. Web Security Academy