diff --git a/baddns/cli.py b/baddns/cli.py index eeb2870..f756a01 100644 --- a/baddns/cli.py +++ b/baddns/cli.py @@ -72,10 +72,12 @@ def validate_modules(arg_value, pattern=re.compile(r"^[a-zA-Z0-9_]+(,[a-zA-Z0-9_ return arg_value -async def execute_module(ModuleClass, target, custom_nameservers, signatures, silent=False): +async def execute_module(ModuleClass, target, custom_nameservers, signatures, silent=False, direct_mode=False): findings = None try: - module_instance = ModuleClass(target, custom_nameservers=custom_nameservers, signatures=signatures, cli=True) + module_instance = ModuleClass( + target, custom_nameservers=custom_nameservers, signatures=signatures, cli=True, direct_mode=direct_mode + ) except BadDNSSignatureException as e: log.error(f"Error loading signatures: {e}") raise BadDNSCLIException(f"Error loading signatures: {e}") @@ -126,6 +128,7 @@ async def _main(): ) parser.add_argument("-d", "--debug", action="store_true", help="Enable debug logging") + parser.add_argument("-D", "--direct", action="store_true", help="Enable direct mode") parser.add_argument("target", nargs="?", type=validate_target, help="subdomain to analyze") args = parser.parse_args() @@ -154,15 +157,26 @@ async def _main(): # Get all available modules all_modules = get_all_modules() - # If the user provided the -m or --modules argument, filter the modules accordingly - if args.modules: - included_module_names = [name.strip().upper() for name in args.modules.split(",")] - modules_to_execute = [module for module in all_modules if module.name.upper() in included_module_names] - else: - modules_to_execute = all_modules # Default to all modules if -m is not provided - log.info( - f"Running with all modules [{', '.join([module.name for module in modules_to_execute])}] (-m to specify)" + direct_mode = False + + # if direct mode was specified, only the CNAME module will run + if args.direct: + log.warning( + "Direct mode specified. Only the CNAME module is enabled. Positive results may not be immediately exploitable without corresponding DNS records pointing to it (e.g., CNAME), or some other external resource which may try to interact with it" ) + modules_to_execute = [module for module in all_modules if module.name.upper() == "CNAME"] + direct_mode = True + + else: + # If the user provided the -m or --modules argument, filter the modules accordingly + if args.modules: + included_module_names = [name.strip().upper() for name in args.modules.split(",")] + modules_to_execute = [module for module in all_modules if module.name.upper() in included_module_names] + else: + modules_to_execute = all_modules # Default to all modules if -m is not provided + log.info( + f"Running with all modules [{', '.join([module.name for module in modules_to_execute])}] (-m to specify)" + ) custom_signatures = None if args.custom_signatures: @@ -177,7 +191,9 @@ async def _main(): signatures = load_signatures(signatures_dir=custom_signatures) for ModuleClass in modules_to_execute: - await execute_module(ModuleClass, args.target, custom_nameservers, signatures, silent=silent) + await execute_module( + ModuleClass, args.target, custom_nameservers, signatures, silent=silent, direct_mode=direct_mode + ) def main(): diff --git a/baddns/lib/dnsmanager.py b/baddns/lib/dnsmanager.py index 6ddb31f..b7771e4 100644 --- a/baddns/lib/dnsmanager.py +++ b/baddns/lib/dnsmanager.py @@ -116,6 +116,7 @@ async def do_resolve(self, target, rdatatype): result_cname = r[0] cname_chain.append(result_cname) target = result_cname + try: r = self.process_answer(await self.dns_client.resolve(target, "CNAME"), "CNAME") if len(r) == 0: diff --git a/baddns/lib/whoismanager.py b/baddns/lib/whoismanager.py index 43b3550..adc6fea 100644 --- a/baddns/lib/whoismanager.py +++ b/baddns/lib/whoismanager.py @@ -15,14 +15,18 @@ def __init__(self, target): async def dispatchWHOIS(self): ext = tldextract.extract(self.target) - log.debug(f"Extracted base domain [{ext.registered_domain}] from [{self.target}]") - log.debug(f"Submitting WHOIS query for {ext.registered_domain}") + if ext.registered_domain == "" or ext.registered_domain == None: + registered_domain = self.target + else: + registered_domain = ext.registered_domain + log.debug(f"Extracted base domain [{registered_domain}] from [{self.target}]") + log.debug(f"Submitting WHOIS query for {registered_domain}") try: - w = await asyncio.to_thread(whois.whois, ext.registered_domain, quiet=True) - log.debug(f"Got response to whois request for {ext.registered_domain}") + w = await asyncio.to_thread(whois.whois, registered_domain, quiet=True) + log.debug(f"Got response to whois request for {registered_domain}") self.whois_result = {"type": "response", "data": w} except whois.parser.PywhoisError as e: - log.debug(f"Got PywhoisError for whois request for {ext.registered_domain}") + log.debug(f"Got PywhoisError for whois request for {registered_domain}") self.whois_result = {"type": "error", "data": str(e)} except Exception as e: log.debug(f"Got unknown error from whois: {str(e)}") diff --git a/tests/cli_test.py b/tests/cli_test.py index 44da484..64fbf3f 100644 --- a/tests/cli_test.py +++ b/tests/cli_test.py @@ -64,7 +64,7 @@ def test_cli_cname_http(monkeypatch, capsys, mocker, httpx_mock, configure_mock_ "bad.dns", ], ) - mock_data = {"bad.dns": {"CNAME": ["baddns.bigcartel.com"]}, "baddns.bigcartel.com": {"A": "127.0.0.1"}} + mock_data = {"bad.dns": {"CNAME": ["baddns.bigcartel.com"]}, "baddns.bigcartel.com": {"A": ["127.0.0.1"]}} mock_resolver = configure_mock_resolver(mock_data) mocker.patch.object(dns.asyncresolver, "Resolver", return_value=mock_resolver) @@ -78,3 +78,29 @@ def test_cli_cname_http(monkeypatch, capsys, mocker, httpx_mock, configure_mock_ captured = capsys.readouterr() assert "Vulnerable!" in captured.out assert "Bigcartel Takeover Detection" in captured.out + + +def test_cli_direct(monkeypatch, capsys, mocker, httpx_mock, configure_mock_resolver): + monkeypatch.setattr( + "sys.argv", + [ + "python", + "--direct", + "bad.dns", + ], + ) + mock_data = {"bad.dns": {"A": ["127.0.0.1"]}} + mock_resolver = configure_mock_resolver(mock_data) + mocker.patch.object(dns.asyncresolver, "Resolver", return_value=mock_resolver) + + httpx_mock.add_response( + url="http://bad.dns/", + status_code=200, + text="The specified bucket does not exist", + ) + + cli.main() + captured = capsys.readouterr() + assert "Direct mode specified. Only the CNAME module is enabled" in captured.err + assert "Vulnerable!" in captured.out + assert "AWS Bucket Takeover Detection" in captured.out