Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix CAN Filter to deal with exended ids #185

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions docs/transports.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ isotp://can0?src_addr=0x6f4&dst_addr=0x654&rx_ext_address=0xf4&ext_address=0x54&

## can-raw

* `src_addr` (required): The ISOTP source address as int.
* `dst_addr` (required): The ISOTP destination address as int.
* `is_extended` (optional): Use extended CAN identifiers.
* `is_fd` (optional): Use CAN-FD frames.

Example:
Expand Down
144 changes: 61 additions & 83 deletions src/gallia/transports/can.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,16 @@ class CANMessage(Message): # type: ignore
CANFD_BRS = 0x01
CANFD_ESI = 0x02

@property
def arbitration_id_repr(self) -> str:
if self.is_extended_id:
return f"{self.arbitration_id:08x}"
return f"{self.arbitration_id:03x}"

@property
def msg_repr(self) -> str:
return f"{self.arbitration_id_repr}#{self.data.hex()}"

def _compose_arbitration_id(self) -> int:
can_id = self.arbitration_id
if self.is_extended_id:
Expand Down Expand Up @@ -269,20 +279,12 @@ def unpack(cls, frame: bytes) -> CANMessage:
_CAN_RAW_SPEC_TYPE = TypedDict(
"_CAN_RAW_SPEC_TYPE",
{
"src_addr": Optional[int],
"dst_addr": Optional[int],
"is_extended": bool,
"is_fd": bool,
"bind": bool,
},
)

spec_can_raw = {
"src_addr": (_int_spec(None), False),
"dst_addr": (_int_spec(None), False),
"is_extended": (_bool_spec(False), False),
"is_fd": (_bool_spec(False), False),
"bind": (_bool_spec(False), False),
}


Expand All @@ -293,13 +295,10 @@ class RawCANTransport(BaseTransport, scheme="can-raw", spec=spec_can_raw):
def __init__(self, target: TargetURI) -> None:
super().__init__(target)
self.args = cast(_CAN_RAW_SPEC_TYPE, self._args)
self.connected = False

assert target.hostname is not None, "empty interface"
self.interface = target.hostname
self._sock: s.socket
self.src_addr: int
self.dst_addr: int

async def connect(self, timeout: Optional[float] = None) -> None:
self._sock = s.socket(s.PF_CAN, s.SOCK_RAW, s.CAN_RAW)
Expand All @@ -308,111 +307,90 @@ async def connect(self, timeout: Optional[float] = None) -> None:
self._sock.setsockopt(s.SOL_CAN_RAW, s.CAN_RAW_FD_FRAMES, 1)
self._sock.setblocking(False)

if self.args["bind"]:
self.bind()

def set_filter(self, can_ids: list[int], inv_filter: bool = False) -> None:
if not can_ids:
def set_filter(self, can_ids: list[dict], inv_filter: bool = False) -> None:
if len(can_ids) == 0:
return
filter_mask = s.CAN_EFF_MASK if self.args["is_extended"] else s.CAN_SFF_MASK
data = b""
for can_id in can_ids:
id_ = can_id["arbitration_id"]
filter_mask = s.CAN_EFF_MASK if can_id["is_extended_id"] else s.CAN_SFF_MASK
if inv_filter:
can_id |= self.CAN_INV_FILTER
data += struct.pack("@II", can_id, filter_mask)
id_ |= self.CAN_INV_FILTER
data += struct.pack("@II", id_, filter_mask)
self._sock.setsockopt(s.SOL_CAN_RAW, s.CAN_RAW_FILTER, data)
if inv_filter:
self._sock.setsockopt(s.SOL_CAN_RAW, s.CAN_RAW_JOIN_FILTERS, 1)

def bind(self) -> None:
if self.args["src_addr"] is None or self.args["dst_addr"] is None:
raise RuntimeError("no src_addr/dst_addr set")

self.set_filter([self.args["src_addr"]])
self.src_addr = self.args["src_addr"]
self.dst_addr = self.args["dst_addr"]
self.connected = True

async def read(
self, timeout: Optional[float] = None, tags: Optional[list[str]] = None
) -> bytes:
if not self.connected or not self.src_addr:
raise RuntimeError("transport is not connected; set bind=true")
_, data = await self.recvfrom(timeout, tags)
return data
async def flush_receiver(self, timeout: float = 1.0) -> None:
while True:
try:
await self.read_frame(timeout)
except asyncio.TimeoutError:
break

async def write(
async def read_frame(
self,
data: bytes,
timeout: Optional[float] = None,
tags: Optional[list[str]] = None,
) -> int:
if not self.connected or not self.dst_addr:
raise RuntimeError("transport is not connected; set bind=true")
return await self.sendto(data, self.dst_addr, timeout=timeout, tags=tags)
) -> CANMessage:
loop = asyncio.get_running_loop()
can_frame = await asyncio.wait_for(
loop.sock_recv(self._sock, self.BUFSIZE), timeout
)
msg = CANMessage.unpack(can_frame)

self.logger.log_read(msg.msg_repr, tags=tags)
return msg

async def sendto(
async def write_frame(
self,
data: bytes,
dst: int,
msg: CANMessage,
timeout: Optional[float] = None,
tags: Optional[list[str]] = None,
) -> int:
msg = CANMessage(
arbitration_id=dst,
data=data,
is_extended_id=self.args["is_extended"],
is_fd=self.args["is_fd"],
check=True,
)
if self.args["is_extended"]:
self.logger.log_write(f"{dst:08x}#{data.hex()}", tags=tags)
else:
self.logger.log_write(f"{dst:03x}#{data.hex()}", tags=tags)
) -> None:
self.logger.log_write(msg.msg_repr, tags=tags)

loop = asyncio.get_running_loop()
await asyncio.wait_for(loop.sock_sendall(self._sock, msg.pack()), timeout)
return len(data)

async def recvfrom(
self, timeout: Optional[float] = None, tags: Optional[list[str]] = None
) -> tuple[int, bytes]:
loop = asyncio.get_running_loop()
can_frame = await asyncio.wait_for(
loop.sock_recv(self._sock, self.BUFSIZE), timeout
)
msg = CANMessage.unpack(can_frame)
async def read(
self,
timeout: Optional[float] = None,
tags: Optional[list[str]] = None,
) -> bytes:
pass

if msg.is_extended_id:
self.logger.log_read(
f"{msg.arbitration_id:08x}#{msg.data.hex()}", tags=tags
)
else:
self.logger.log_read(
f"{msg.arbitration_id:03x}#{msg.data.hex()}", tags=tags
)
return msg.arbitration_id, msg.data
async def write(
self,
data: bytes,
timeout: Optional[float] = None,
tags: Optional[list[str]] = None,
) -> int:
pass

async def close(self) -> None:
pass
self._sock.close()

async def reconnect(self, timeout: Optional[float] = None) -> None:
pass

async def get_idle_traffic(self, sniff_time: float) -> list[int]:
async def get_idle_traffic(self, sniff_time: float) -> list[dict]:
"""Listen to traffic on the bus and return list of IDs
which are seen in the specified period of time.
The output of this function can be used as input to set_filter.
"""
addr_idle: list[int] = list()
idle_msgs: list[dict] = []
t1 = time.time()
while time.time() - t1 < sniff_time:
try:
addr, _ = await self.recvfrom(timeout=1)
if addr not in addr_idle:
self.logger.log_info(f"Received a message from {addr:03x}")
addr_idle.append(addr)
msg = await self.read_frame(timeout=1)
msg_dict = {
"is_extended_id": msg.is_extended_id,
"arbitration_id": msg.arbitration_id,
}
if msg_dict not in idle_msgs:
self.logger.log_info(f"Received a message {msg.msg_repr}")
idle_msgs.append(msg_dict)
except asyncio.TimeoutError:
continue
addr_idle.sort()
return addr_idle
return idle_msgs
3 changes: 1 addition & 2 deletions src/gallia/udscan/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from gallia.penlab import Dumpcap, PowerSupply, PowerSupplyURI
from gallia.penlog import Logger
from gallia.transports.base import BaseTransport, TargetURI
from gallia.transports.can import ISOTPTransport, RawCANTransport
from gallia.transports.can import ISOTPTransport
from gallia.transports.doip import DoIPTransport
from gallia.transports.tcp import TCPLineSepTransport
from gallia.uds.ecu import ECU
Expand Down Expand Up @@ -59,7 +59,6 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
def load_transport(target: TargetURI) -> BaseTransport:
transports = [
ISOTPTransport,
RawCANTransport,
DoIPTransport,
TCPLineSepTransport,
]
Expand Down
47 changes: 28 additions & 19 deletions src/gallia/udscan/scanner/find_can_ids.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from typing import Optional

from gallia.transports.base import TargetURI
from gallia.transports.can import ISOTPTransport, RawCANTransport
from gallia.transports.can import ISOTPTransport, RawCANTransport, CANMessage
from gallia.uds.core.service import UDSRequest, NegativeResponse
from gallia.uds.core.client import UDSClient
from gallia.udscan.core import DiscoveryScanner
Expand Down Expand Up @@ -55,6 +55,11 @@ def add_parser(self) -> None:
default=0.01,
help="set sleeptime between loop iterations",
)
self.parser.add_argument(
"--extended-ids",
action="store_true",
help="use extended can identifiers",
)
self.parser.add_argument(
"--extended-addr",
action="store_true",
Expand Down Expand Up @@ -110,7 +115,7 @@ async def query_description(self, target_list: list[TargetURI], did: int) -> Non
if isinstance(resp, NegativeResponse):
self.logger.log_summary(f"could not read did: {resp}")
else:
self.logger.log_summary(f"response was: {resp}")
self.logger.log_summary(f"response was: {resp.data_records}")
except Exception as e:
self.logger.log_summary(f"reading description failed: {g_repr(e)}")

Expand Down Expand Up @@ -155,28 +160,34 @@ async def main(self, args: Namespace) -> None:

sniff_time: int = args.sniff_time
self.logger.log_summary(f"Recording idle bus communication for {sniff_time}s")
addr_idle = await transport.get_idle_traffic(sniff_time)
idle_msgs = await transport.get_idle_traffic(sniff_time)

self.logger.log_summary(f"Found {len(addr_idle)} CAN Addresses on idle Bus")
transport.set_filter(addr_idle, inv_filter=True)
self.logger.log_summary(f"Found {len(idle_msgs)} CAN Messages on idle Bus")
transport.set_filter(idle_msgs, inv_filter=True)
await transport.flush_receiver(2)

req = UDSRequest.parse_dynamic(args.pdu)
pdu = self.build_isotp_frame(req, padding=args.padding)

for ID in range(args.start, args.stop + 1):
await asyncio.sleep(args.sleep)

dst_addr = args.tester_addr if args.extended_addr else ID
arbitration_id = args.tester_addr if args.extended_addr else ID
if args.extended_addr:
pdu = self.build_isotp_frame(req, ID, padding=args.padding)

self.logger.log_info(f"Testing ID {can_id_repr(ID)}")
is_broadcast = False

await transport.sendto(pdu, timeout=0.1, dst=dst_addr)
msg = CANMessage(
arbitration_id=arbitration_id,
data=pdu,
is_extended_id=args.extended_ids,
)
await transport.write_frame(msg, timeout=0.1)
try:
addr, _ = await transport.recvfrom(timeout=0.1)
if addr == ID:
msg = await transport.read_frame(timeout=0.1)
if msg.arbitration_id == ID:
self.logger.log_info(
f"The same CAN ID {can_id_repr(ID)} answered. Skipping…"
)
Expand All @@ -188,12 +199,12 @@ async def main(self, args: Namespace) -> None:
# The recv buffer needs to be flushed to avoid
# wrong results...
try:
new_addr, _ = await transport.recvfrom(timeout=0.1)
if new_addr != addr:
new_msg = await transport.read_frame(timeout=0.1)
if new_msg.arbitration_id != msg.arbitration_id:
is_broadcast = True
self.logger.log_summary(
f"seems that broadcast was triggered on CAN ID {can_id_repr(ID)}, "
f"got answer from {can_id_repr(new_addr)}"
f"got answer from {can_id_repr(new_msg.arbitration_id)}"
)
else:
self.logger.log_info(
Expand All @@ -203,26 +214,24 @@ async def main(self, args: Namespace) -> None:
if is_broadcast:
self.logger.log_summary(
f"seems that broadcast was triggered on CAN ID {can_id_repr(ID)}, "
f"got answer from {can_id_repr(addr)}"
f"got answer from {can_id_repr(new_msg.arbitration_id)}"
)
else:
self.logger.log_summary(
f"found endpoint on CAN ID [src:dst]: {can_id_repr(ID)}:{can_id_repr(addr)}"
f"found endpoint [src:dst]: {can_id_repr(ID)}:{can_id_repr(msg.arbitration_id)}"
)
target_args = {}
target_args["is_fd"] = str(transport.args["is_fd"]).lower()
target_args["is_extended"] = str(
transport.args["is_extended"]
).lower()
target_args["is_extended"] = str(msg.is_extended_id).lower()

if args.extended_addr:
target_args["ext_address"] = hex(ID)
target_args["rx_ext_address"] = args.tester_addr & 0xFF
target_args["src_addr"] = args.tester_addr
target_args["dst_addr"] = hex(addr)
target_args["dst_addr"] = hex(msg.arbitration_id)
else:
target_args["src_addr"] = hex(ID)
target_args["dst_addr"] = hex(addr)
target_args["dst_addr"] = hex(msg.arbitration_id)

if args.padding is not None:
target_args["tx_padding"] = f"{args.padding}"
Expand Down
Loading