From ef9fdd5f7f3804eed8e8c8811b51835e14be9bcf Mon Sep 17 00:00:00 2001 From: Stefan Tatschner Date: Mon, 27 Jun 2022 09:11:44 +0200 Subject: [PATCH] fix: Adjust CAN Filter to deal with exended ids Co-authored-by: Tobias Specht --- docs/transports.md | 3 - src/gallia/transports/can.py | 144 +++++++++------------- src/gallia/udscan/core.py | 3 +- src/gallia/udscan/scanner/find_can_ids.py | 47 ++++--- src/gallia/udscan/scanner/find_xcp.py | 27 ++-- 5 files changed, 103 insertions(+), 121 deletions(-) diff --git a/docs/transports.md b/docs/transports.md index b2f473a6a..c70c97f94 100644 --- a/docs/transports.md +++ b/docs/transports.md @@ -43,9 +43,6 @@ isotp://can0?src_addr=0x6f4&dst_addr=0x654&rx_ext_address=0xf4&ext_address=0x54& ## can-raw -* `src_addr` (required): The ISOTP source address as int. -* `dst_addr` (required): The ISOTP destination address as int. -* `is_extended` (optional): Use extended CAN identifiers. * `is_fd` (optional): Use CAN-FD frames. Example: diff --git a/src/gallia/transports/can.py b/src/gallia/transports/can.py index ee2441f9e..d0507d62e 100644 --- a/src/gallia/transports/can.py +++ b/src/gallia/transports/can.py @@ -203,6 +203,16 @@ class CANMessage(Message): # type: ignore CANFD_BRS = 0x01 CANFD_ESI = 0x02 + @property + def arbitration_id_repr(self) -> str: + if self.is_extended_id: + return f"{self.arbitration_id:08x}" + return f"{self.arbitration_id:03x}" + + @property + def msg_repr(self) -> str: + return f"{self.arbitration_id_repr}#{self.data.hex()}" + def _compose_arbitration_id(self) -> int: can_id = self.arbitration_id if self.is_extended_id: @@ -269,20 +279,12 @@ def unpack(cls, frame: bytes) -> CANMessage: _CAN_RAW_SPEC_TYPE = TypedDict( "_CAN_RAW_SPEC_TYPE", { - "src_addr": Optional[int], - "dst_addr": Optional[int], - "is_extended": bool, "is_fd": bool, - "bind": bool, }, ) spec_can_raw = { - "src_addr": (_int_spec(None), False), - "dst_addr": (_int_spec(None), False), - "is_extended": (_bool_spec(False), False), "is_fd": (_bool_spec(False), False), - "bind": (_bool_spec(False), False), } @@ -293,13 +295,10 @@ class RawCANTransport(BaseTransport, scheme="can-raw", spec=spec_can_raw): def __init__(self, target: TargetURI) -> None: super().__init__(target) self.args = cast(_CAN_RAW_SPEC_TYPE, self._args) - self.connected = False assert target.hostname is not None, "empty interface" self.interface = target.hostname self._sock: s.socket - self.src_addr: int - self.dst_addr: int async def connect(self, timeout: Optional[float] = None) -> None: self._sock = s.socket(s.PF_CAN, s.SOCK_RAW, s.CAN_RAW) @@ -308,111 +307,90 @@ async def connect(self, timeout: Optional[float] = None) -> None: self._sock.setsockopt(s.SOL_CAN_RAW, s.CAN_RAW_FD_FRAMES, 1) self._sock.setblocking(False) - if self.args["bind"]: - self.bind() - - def set_filter(self, can_ids: list[int], inv_filter: bool = False) -> None: - if not can_ids: + def set_filter(self, can_ids: list[dict], inv_filter: bool = False) -> None: + if len(can_ids) == 0: return - filter_mask = s.CAN_EFF_MASK if self.args["is_extended"] else s.CAN_SFF_MASK data = b"" for can_id in can_ids: + id_ = can_id["arbitration_id"] + filter_mask = s.CAN_EFF_MASK if can_id["is_extended_id"] else s.CAN_SFF_MASK if inv_filter: - can_id |= self.CAN_INV_FILTER - data += struct.pack("@II", can_id, filter_mask) + id_ |= self.CAN_INV_FILTER + data += struct.pack("@II", id_, filter_mask) self._sock.setsockopt(s.SOL_CAN_RAW, s.CAN_RAW_FILTER, data) if inv_filter: self._sock.setsockopt(s.SOL_CAN_RAW, s.CAN_RAW_JOIN_FILTERS, 1) - def bind(self) -> None: - if self.args["src_addr"] is None or self.args["dst_addr"] is None: - raise RuntimeError("no src_addr/dst_addr set") - - self.set_filter([self.args["src_addr"]]) - self.src_addr = self.args["src_addr"] - self.dst_addr = self.args["dst_addr"] - self.connected = True - - async def read( - self, timeout: Optional[float] = None, tags: Optional[list[str]] = None - ) -> bytes: - if not self.connected or not self.src_addr: - raise RuntimeError("transport is not connected; set bind=true") - _, data = await self.recvfrom(timeout, tags) - return data + async def flush_receiver(self, timeout: float = 1.0) -> None: + while True: + try: + await self.read_frame(timeout) + except asyncio.TimeoutError: + break - async def write( + async def read_frame( self, - data: bytes, timeout: Optional[float] = None, tags: Optional[list[str]] = None, - ) -> int: - if not self.connected or not self.dst_addr: - raise RuntimeError("transport is not connected; set bind=true") - return await self.sendto(data, self.dst_addr, timeout=timeout, tags=tags) + ) -> CANMessage: + loop = asyncio.get_running_loop() + can_frame = await asyncio.wait_for( + loop.sock_recv(self._sock, self.BUFSIZE), timeout + ) + msg = CANMessage.unpack(can_frame) + + self.logger.log_read(msg.msg_repr, tags=tags) + return msg - async def sendto( + async def write_frame( self, - data: bytes, - dst: int, + msg: CANMessage, timeout: Optional[float] = None, tags: Optional[list[str]] = None, - ) -> int: - msg = CANMessage( - arbitration_id=dst, - data=data, - is_extended_id=self.args["is_extended"], - is_fd=self.args["is_fd"], - check=True, - ) - if self.args["is_extended"]: - self.logger.log_write(f"{dst:08x}#{data.hex()}", tags=tags) - else: - self.logger.log_write(f"{dst:03x}#{data.hex()}", tags=tags) + ) -> None: + self.logger.log_write(msg.msg_repr, tags=tags) loop = asyncio.get_running_loop() await asyncio.wait_for(loop.sock_sendall(self._sock, msg.pack()), timeout) - return len(data) - async def recvfrom( - self, timeout: Optional[float] = None, tags: Optional[list[str]] = None - ) -> tuple[int, bytes]: - loop = asyncio.get_running_loop() - can_frame = await asyncio.wait_for( - loop.sock_recv(self._sock, self.BUFSIZE), timeout - ) - msg = CANMessage.unpack(can_frame) + async def read( + self, + timeout: Optional[float] = None, + tags: Optional[list[str]] = None, + ) -> bytes: + pass - if msg.is_extended_id: - self.logger.log_read( - f"{msg.arbitration_id:08x}#{msg.data.hex()}", tags=tags - ) - else: - self.logger.log_read( - f"{msg.arbitration_id:03x}#{msg.data.hex()}", tags=tags - ) - return msg.arbitration_id, msg.data + async def write( + self, + data: bytes, + timeout: Optional[float] = None, + tags: Optional[list[str]] = None, + ) -> int: + pass async def close(self) -> None: - pass + self._sock.close() async def reconnect(self, timeout: Optional[float] = None) -> None: pass - async def get_idle_traffic(self, sniff_time: float) -> list[int]: + async def get_idle_traffic(self, sniff_time: float) -> list[dict]: """Listen to traffic on the bus and return list of IDs which are seen in the specified period of time. The output of this function can be used as input to set_filter. """ - addr_idle: list[int] = list() + idle_msgs: list[dict] = [] t1 = time.time() while time.time() - t1 < sniff_time: try: - addr, _ = await self.recvfrom(timeout=1) - if addr not in addr_idle: - self.logger.log_info(f"Received a message from {addr:03x}") - addr_idle.append(addr) + msg = await self.read_frame(timeout=1) + msg_dict = { + "is_extended_id": msg.is_extended_id, + "arbitration_id": msg.arbitration_id, + } + if msg_dict not in idle_msgs: + self.logger.log_info(f"Received a message {msg.msg_repr}") + idle_msgs.append(msg_dict) except asyncio.TimeoutError: continue - addr_idle.sort() - return addr_idle + return idle_msgs diff --git a/src/gallia/udscan/core.py b/src/gallia/udscan/core.py index 8d941f90a..f30f19d49 100644 --- a/src/gallia/udscan/core.py +++ b/src/gallia/udscan/core.py @@ -28,7 +28,7 @@ from gallia.penlab import Dumpcap, PowerSupply, PowerSupplyURI from gallia.penlog import Logger from gallia.transports.base import BaseTransport, TargetURI -from gallia.transports.can import ISOTPTransport, RawCANTransport +from gallia.transports.can import ISOTPTransport from gallia.transports.doip import DoIPTransport from gallia.transports.tcp import TCPLineSepTransport from gallia.uds.ecu import ECU @@ -59,7 +59,6 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: def load_transport(target: TargetURI) -> BaseTransport: transports = [ ISOTPTransport, - RawCANTransport, DoIPTransport, TCPLineSepTransport, ] diff --git a/src/gallia/udscan/scanner/find_can_ids.py b/src/gallia/udscan/scanner/find_can_ids.py index e0c00fec7..a9cd3b3c5 100644 --- a/src/gallia/udscan/scanner/find_can_ids.py +++ b/src/gallia/udscan/scanner/find_can_ids.py @@ -8,7 +8,7 @@ from typing import Optional from gallia.transports.base import TargetURI -from gallia.transports.can import ISOTPTransport, RawCANTransport +from gallia.transports.can import ISOTPTransport, RawCANTransport, CANMessage from gallia.uds.core.service import UDSRequest, NegativeResponse from gallia.uds.core.client import UDSClient from gallia.udscan.core import DiscoveryScanner @@ -55,6 +55,11 @@ def add_parser(self) -> None: default=0.01, help="set sleeptime between loop iterations", ) + self.parser.add_argument( + "--extended-ids", + action="store_true", + help="use extended can identifiers", + ) self.parser.add_argument( "--extended-addr", action="store_true", @@ -110,7 +115,7 @@ async def query_description(self, target_list: list[TargetURI], did: int) -> Non if isinstance(resp, NegativeResponse): self.logger.log_summary(f"could not read did: {resp}") else: - self.logger.log_summary(f"response was: {resp}") + self.logger.log_summary(f"response was: {resp.data_records}") except Exception as e: self.logger.log_summary(f"reading description failed: {g_repr(e)}") @@ -155,10 +160,11 @@ async def main(self, args: Namespace) -> None: sniff_time: int = args.sniff_time self.logger.log_summary(f"Recording idle bus communication for {sniff_time}s") - addr_idle = await transport.get_idle_traffic(sniff_time) + idle_msgs = await transport.get_idle_traffic(sniff_time) - self.logger.log_summary(f"Found {len(addr_idle)} CAN Addresses on idle Bus") - transport.set_filter(addr_idle, inv_filter=True) + self.logger.log_summary(f"Found {len(idle_msgs)} CAN Messages on idle Bus") + transport.set_filter(idle_msgs, inv_filter=True) + await transport.flush_receiver(2) req = UDSRequest.parse_dynamic(args.pdu) pdu = self.build_isotp_frame(req, padding=args.padding) @@ -166,17 +172,22 @@ async def main(self, args: Namespace) -> None: for ID in range(args.start, args.stop + 1): await asyncio.sleep(args.sleep) - dst_addr = args.tester_addr if args.extended_addr else ID + arbitration_id = args.tester_addr if args.extended_addr else ID if args.extended_addr: pdu = self.build_isotp_frame(req, ID, padding=args.padding) self.logger.log_info(f"Testing ID {can_id_repr(ID)}") is_broadcast = False - await transport.sendto(pdu, timeout=0.1, dst=dst_addr) + msg = CANMessage( + arbitration_id=arbitration_id, + data=pdu, + is_extended_id=args.extended_ids, + ) + await transport.write_frame(msg, timeout=0.1) try: - addr, _ = await transport.recvfrom(timeout=0.1) - if addr == ID: + msg = await transport.read_frame(timeout=0.1) + if msg.arbitration_id == ID: self.logger.log_info( f"The same CAN ID {can_id_repr(ID)} answered. Skipping…" ) @@ -188,12 +199,12 @@ async def main(self, args: Namespace) -> None: # The recv buffer needs to be flushed to avoid # wrong results... try: - new_addr, _ = await transport.recvfrom(timeout=0.1) - if new_addr != addr: + new_msg = await transport.read_frame(timeout=0.1) + if new_msg.arbitration_id != msg.arbitration_id: is_broadcast = True self.logger.log_summary( f"seems that broadcast was triggered on CAN ID {can_id_repr(ID)}, " - f"got answer from {can_id_repr(new_addr)}" + f"got answer from {can_id_repr(new_msg.arbitration_id)}" ) else: self.logger.log_info( @@ -203,26 +214,24 @@ async def main(self, args: Namespace) -> None: if is_broadcast: self.logger.log_summary( f"seems that broadcast was triggered on CAN ID {can_id_repr(ID)}, " - f"got answer from {can_id_repr(addr)}" + f"got answer from {can_id_repr(new_msg.arbitration_id)}" ) else: self.logger.log_summary( - f"found endpoint on CAN ID [src:dst]: {can_id_repr(ID)}:{can_id_repr(addr)}" + f"found endpoint [src:dst]: {can_id_repr(ID)}:{can_id_repr(msg.arbitration_id)}" ) target_args = {} target_args["is_fd"] = str(transport.args["is_fd"]).lower() - target_args["is_extended"] = str( - transport.args["is_extended"] - ).lower() + target_args["is_extended"] = str(msg.is_extended_id).lower() if args.extended_addr: target_args["ext_address"] = hex(ID) target_args["rx_ext_address"] = args.tester_addr & 0xFF target_args["src_addr"] = args.tester_addr - target_args["dst_addr"] = hex(addr) + target_args["dst_addr"] = hex(msg.arbitration_id) else: target_args["src_addr"] = hex(ID) - target_args["dst_addr"] = hex(addr) + target_args["dst_addr"] = hex(msg.arbitration_id) if args.padding is not None: target_args["tx_padding"] = f"{args.padding}" diff --git a/src/gallia/udscan/scanner/find_xcp.py b/src/gallia/udscan/scanner/find_xcp.py index 6601002f4..10cf2ae7e 100644 --- a/src/gallia/udscan/scanner/find_xcp.py +++ b/src/gallia/udscan/scanner/find_xcp.py @@ -13,7 +13,7 @@ from gallia.penlog import Logger from gallia.transports.base import TargetURI -from gallia.transports.can import RawCANTransport +from gallia.transports.can import RawCANTransport, CANMessage from gallia.utils import bytes_repr, can_id_repr, g_repr @@ -225,33 +225,32 @@ async def test_can(self, args: Namespace) -> None: sniff_time: int = args.sniff_time self.logger.log_summary( - f"Listening to idle bus communication for {sniff_time}s..." + f"Listening to idle bus communication for {sniff_time}s…" ) addr_idle = await transport.get_idle_traffic(sniff_time) self.logger.log_summary(f"Found {len(addr_idle)} CAN Addresses on idle Bus") transport.set_filter(addr_idle, inv_filter=True) - # flush receive queue - await transport.get_idle_traffic(2) + await transport.flush_receiver() for can_id in range(0x800): self.logger.log_info(f"Testing CAN ID: {can_id_repr(can_id)}") pdu = bytes([0xFF, 0x00]) - await transport.sendto(pdu, can_id, timeout=0.1) + msg = CANMessage(arbitration_id=can_id, data=pdu) + await transport.write_frame(msg, timeout=0.1) try: while True: - master, data = await transport.recvfrom(timeout=0.1) - if data[0] == 0xFF: - msg = ( - f"Found XCP endpoint [master:slave]: CAN: {can_id_repr(master)}:{can_id_repr(can_id)} " - f"data: {bytes_repr(data)}" + msg = await transport.read_frame(timeout=0.1) + if msg.data[0] == 0xFF: + self.logger.log_summary( + f"Found endpoint [master:slave]: {msg.msg_repr} " + f"data: {bytes_repr(msg.data)}" ) - self.logger.log_summary(msg) - endpoints.append((can_id, master)) + endpoints.append((can_id, msg)) else: self.logger.log_info( - f"Received non XCP answer for CAN-ID {can_id_repr(can_id)}: {can_id_repr(master)}:" - f"{bytes_repr(data)}" + f"Received non XCP answer on {msg.msg_repr}:" + f"{bytes_repr(msg.data)}" ) except asyncio.TimeoutError: pass