Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added security access subfunction scanner #530

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions docs/uds/scan_modes.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,76 @@ For discovering available subFunctions the following error codes indicate the su

Each identifier or subFunction which responds with a different error code is considered available.

## Security Access Level Scan

This type of scan searches for available security access levels (SAs) within a UDS server on specified sessions. Security access levels are used to restrict access to certain UDS services and subfunctions. By identifying the available SAs, an attacker might gain insights into the ECU's security mechanisms and potentially exploit them to elevate privileges.

The `gallia` tool offers a class `SALevelScanner` to perform this security access level scan.

### Usage

The scanner can be invoked using the following command:

```
gallia scan uds security-access [OPTIONS]
```

**Arguments:**

* `--target <TARGET_URI>`: URI specifying the connection details to the target ECU (e.g., `isotp://vcan0?is_fd=false&is_extended=false&src_addr=0x701&dst_addr=0x700`).
* `--sessions <SESSION_ID>` (optional): Restricts the scan to specific sessions (space-separated list, e.g., `--sessions 1 2 3`). If not specified, only the current session is scanned.
* `--check-session` (optional): Additionally verifies the current session before each SA level test (only applicable if `--sessions` is used).
* `--scan-response-ids` (optional): Includes ID information in scan results for messages with the reply flag set.
* `--auto-reset` (optional): Resets the ECU with the `UDS ECU Reset` service before every request.
* `--skip <SKIP_SPEC>` (optional): Skips specific subfunctions per session. Refer to the following section for details on the skip specification format.

**Skip Specification Format**

The `--skip` argument allows you to exclude specific subfunctions from the scan on a per-session basis. The format for specifying skips is:

```
<SESSION_ID>:<SUBFUNCTION_RANGES>
```

* `<SESSION_ID>`: The diagnostic session ID (hexadecimal value).
* `<SUBFUNCTION_RANGES>`: Comma-separated list of subfunction ranges or individual subfunctions to skip. Each range or subfunction is specified as a hexadecimal value. A range can be defined using a hyphen (`-`) between the start and end subfunction values (inclusive).

Here are some examples of valid skip specifications:

* `0x01:0x0F` - Skips all subfunctions from 0x01 to 0x0F (inclusive) in session 0x01.
* `0x10-0x2F` - Skips subfunctions from 0x10 to 0x2F (inclusive) in the current session.
* `0x01:0x05,0x10` - Skips subfunctions 0x01 to 0x05 and 0x10 in the current session.
* `0x01:0x0F,0x10-0x2F:0x03` - Skips subfunctions 0x01 to 0x0F, 0x11 to 0x2F (inclusive), and 0x03 in session 0x01.

**Examples**

* Scan all available sessions for security access levels:

```
gallia scan uds security-access
```

* Scan sessions 0x01 and 0x02, verify the session before each test, and skip subfunctions 0x01 to 0x0A in session 0x01:

```
gallia scan uds security-access --sessions 0x01,0x02 --check-session --skip 0x01:0x0A
```

* Scan all sessions, include reply IDs in scan results, and reset the ECU before each request:

```
gallia scan uds security-access --scan-response-ids --auto-reset
```

### Scan Process

The `SALevelScanner` performs the following steps during a security access level scan:

1. **Parses command-line arguments:** The scanner processes the provided options and arguments to determine the target sessions, skip specifications, and other configuration settings.
2. **Iterates through sessions:**
* If no specific sessions are provided (`--sessions` not used), the scanner iterates through all available sessions. Otherwise, it focuses on the specified sessions.
3. **Session change (optional):** For each session included in the scan, the scanner attempts to establish the desired session using the `UDS SetSession` service. If session verification is enabled (`--check-session`), the scanner additionally verifies the current session before proceeding. In case of errors during session change, the scanner logs a warning and moves to the next session (if applicable).

## Memory Scan

TODO
2 changes: 2 additions & 0 deletions src/gallia/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from gallia.commands.scan.uds.memory import MemoryFunctionsScanner
from gallia.commands.scan.uds.reset import ResetScanner
from gallia.commands.scan.uds.sa_dump_seeds import SASeedsDumper
from gallia.commands.scan.uds.sa_levels import SALevelScanner
from gallia.commands.scan.uds.services import ServicesScanner
from gallia.commands.scan.uds.sessions import SessionsScanner
from gallia.commands.script.vecu import VirtualECU
Expand All @@ -38,6 +39,7 @@
ReadByIdentifierPrimitive,
ResetScanner,
SASeedsDumper,
SALevelScanner,
ScanIdentifiers,
SessionsScanner,
ServicesScanner,
Expand Down
218 changes: 218 additions & 0 deletions src/gallia/commands/scan/uds/sa_levels.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
# SPDX-FileCopyrightText: AISEC Pentesting Team
#
# SPDX-License-Identifier: Apache-2.0

import reprlib
from argparse import BooleanOptionalAction, Namespace
from typing import Any

from gallia.command import UDSScanner
from gallia.log import get_logger
from gallia.services.uds import (
NegativeResponse,
UDSErrorCodes,
UDSRequestConfig,
UDSResponse,
)
from gallia.services.uds.core.exception import MalformedResponse, UDSException
from gallia.services.uds.core.utils import g_repr
from gallia.utils import ParseSkips, auto_int

logger = get_logger("gallia.scan.sa_levels")


class SALevelScanner(UDSScanner):
"""
This class implements a scanner for Security Access Levels (SA Levels) within the Unified Diagnostic Service (UDS) protocol.

It allows scanning for available security access levels on a UDS Server in specified sessions.

**Methods:**

* `main(self, args: Namespace) -> None` (async):
* The main entry point for the security access level scan.
* See docstring for details.
* `perform_scan(self, args: Namespace, session: int | None = None) -> dict[int, Any]` (async):
* Performs a security access level scan for a specific session and returns the results.
* See docstring for details.
"""

COMMAND = "security-access"
SHORT_HELP = "scan available security access levels"
EPILOG = "https://fraunhofer-aisec.github.io/gallia/uds/scan_modes.html#security-access-level-scan"

def configure_parser(self) -> None:
self.parser.add_argument(
"--sessions",
nargs="*",
type=auto_int,
default=None,
help="Set list of sessions to be tested; current if None",
)
self.parser.add_argument(
"--check-session",
action="store_true",
default=False,
help="check current session; only takes affect if --sessions is given",
)
self.parser.add_argument(
"--scan-response-ids",
default=False,
action=BooleanOptionalAction,
help="Include IDs in scan with reply flag set",
)
self.parser.add_argument(
"--auto-reset",
action="store_true",
default=False,
help="Reset ECU with UDS ECU Reset before every request",
)
self.parser.add_argument(
"--skip",
nargs="+",
default={},
type=str,
action=ParseSkips,
help="""
The subfunctions to be skipped per session.
A session specific skip is given by <session id>:<subfunctions>
where <subfunctions> is a comma separated list of single subfunctions or subfunction ranges using a dash.
Examples:
- 0x01:0xf3
- 0x10-0x2f
- 0x01:0xf3,0x10-0x2f
Multiple session specific skips are separated by space.
Only takes affect if --sessions is given.
""",
)

async def main(self, args: Namespace) -> None: # TODO: method identical to services_scan, unify?
"""
The main entry point for the security access level scan.

Performs the following steps:
* Parses command-line arguments.
* Iterates through specified sessions or the default session (0).
* For each session:
* Attempts to change to the session.
* Performs a security access level scan for all subfunctions.
* Leaves the session.
* Logs the scan results.

Args:
args (Namespace): The parsed command-line arguments.
"""

self.result: list[tuple[int, int]] = []
self.ecu.max_retry = 1
found: dict[int, dict[int, Any]] = {}

if args.sessions is None:
found[0] = await self.perform_scan(args)
else:
sessions = [s for s in args.sessions if s not in args.skip or args.skip[s] is not None]
logger.info(f"testing sessions {g_repr(sessions)}")

# TODO: Unified shortened output necessary here
logger.info(f"skipping subfunctions {reprlib.repr(args.skip)}")

for session in sessions:
logger.info(f"Changing to session {g_repr(session)}")
try:
resp: UDSResponse = await self.ecu.set_session(
session, UDSRequestConfig(tags=["preparation"])
)
except (
UDSException,
RuntimeError,
) as e: # FIXME why catch RuntimeError?
logger.warning(
f"Could not complete session change to {g_repr(session)}: {g_repr(e)}; skipping session"
)
continue
if isinstance(resp, NegativeResponse):
logger.warning(
f"Could not complete session change to {g_repr(session)}: {resp}; skipping session"
)
continue

logger.result(f"scanning in session {g_repr(session)}")

found[session] = await self.perform_scan(args, session)

await self.ecu.leave_session(session, sleep=args.power_cycle_sleep)

for key, value in found.items():
logger.result(f"Available SA levels in session 0x{key:02X}:")
for subfunc, response in value.items():
self.result.append((key, subfunc))
logger.result(f" SA Level [{g_repr(subfunc)}]: {response}")

async def perform_scan(self, args: Namespace, session: None | int = None) -> dict[int, Any]:
"""
Performs a security access level scan for a specific session and returns the results.

Scans all subfunctions (except explicitly skipped ones) with different payload lengths
to test ECU behavior.

Args:
args (Namespace): The parsed command-line arguments.
session (int, optional): The session to scan. Defaults to None (default session).

Returns:
dict[int, Any]: A dictionary containing the scan results for each subfunction.
Keys are subfunction IDs, values are the corresponding UDSResponse objects.
"""

result: dict[int, Any] = {}

# First subfunction is 0x01
subfunc = 0x00
while subfunc < 0x7F:
subfunc += 1
if subfunc % 2 == 0: # Scanning only odd subfunctions (requests), even are responses
continue

if subfunc & 0x40 and not args.scan_response_ids:
continue

if session in args.skip and subfunc in args.skip[session]:
logger.info(f"{g_repr(subfunc)}: skipped")
continue

if session is not None and args.check_session:
if not await self.ecu.check_and_set_session(session):
logger.error(
f"Aborting scan on session {g_repr(session)}; current subfunc was {g_repr(subfunc)}"
)
break

for length_payload in [0, 1, 2, 3, 5]:
try:
resp = await self.ecu.security_access_request_seed(security_access_type = subfunc, security_access_data_record = bytes(length_payload), suppress_response = False, config=UDSRequestConfig(tags=["ANALYZE"]))
except TimeoutError:
logger.info(f"{g_repr(subfunc)}: timeout")
continue
except MalformedResponse as e:
logger.warning(f"{g_repr(subfunc)}: {e!r} occurred, this needs to be investigated!")
continue

if isinstance(resp, NegativeResponse) and resp.response_code in [
UDSErrorCodes.serviceNotSupported,
UDSErrorCodes.serviceNotSupportedInActiveSession,
]:
logger.info(f"{g_repr(subfunc)}: not supported [{resp}]")
break

if isinstance(resp, NegativeResponse) and resp.response_code in [
UDSErrorCodes.incorrectMessageLengthOrInvalidFormat,
UDSErrorCodes.subFunctionNotSupported,
UDSErrorCodes.subFunctionNotSupportedInActiveSession,
]:
continue

logger.result(f"SA Level {g_repr(subfunc)}: available in session {g_repr(session)}: {resp}")
result[subfunc] = resp
break

return result