Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python 3.10 migration #1706

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/env
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
FLUTTER=3.24.2
PYVER=3.12.6
PYVER=3.12.7
50 changes: 27 additions & 23 deletions helper/helper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from queue import Queue
from threading import Thread, Event
from typing import Callable, Dict, List
from typing import Callable

import json
import logging
Expand Down Expand Up @@ -52,20 +52,21 @@ def _handle_incoming(event, recv, error, cmd_queue):
if not request:
break
try:
kind = request["kind"]
if kind == "signal":
# Cancel signals are handled here, the rest forwarded
if request["status"] == "cancel":
event.set()
else:
# Ignore other signals
logger.error("Unhandled signal: %r", request)
elif kind == "command":
cmd_queue.join() # Wait for existing command to complete
event.clear() # Reset event for next command
cmd_queue.put(request)
else:
error("invalid-command", "Unsupported request type")
match request["kind"]:
case "signal":
# Cancel signals are handled here, the rest forwarded
if request["status"] == "cancel":
logger.debug("Got cancel signal!")
event.set()
else:
# Ignore other signals
logger.error("Unhandled signal: %r", request)
case "command":
cmd_queue.join() # Wait for existing command to complete
event.clear() # Reset event for next command
cmd_queue.put(request)
case _:
error("invalid-command", "Unsupported request type")
except KeyError as e:
error("invalid-command", str(e))
except RpcException as e:
Expand All @@ -78,14 +79,14 @@ def _handle_incoming(event, recv, error, cmd_queue):


def process(
send: Callable[[Dict], None],
recv: Callable[[], Dict],
handler: Callable[[str, List, Dict, Event, Callable[[str], None]], RpcResponse],
send: Callable[[dict], None],
recv: Callable[[], dict],
handler: Callable[[str, list, dict, Event, Callable[[str], None]], RpcResponse],
) -> None:
def error(status: str, message: str, body: Dict = {}):
def error(status: str, message: str, body: dict = {}):
send(dict(kind="error", status=status, message=message, body=body))

def signal(status: str, body: Dict = {}):
def signal(status: str, body: dict = {}):
send(dict(kind="signal", status=status, body=body))

def success(response: RpcResponse):
Expand Down Expand Up @@ -121,8 +122,8 @@ def success(response: RpcResponse):


def run_rpc(
send: Callable[[Dict], None],
recv: Callable[[], Dict],
send: Callable[[dict], None],
recv: Callable[[], dict],
) -> None:
process(send, recv, RootNode())

Expand Down Expand Up @@ -171,7 +172,10 @@ def send(data):
def recv():
line = b""
while not line.endswith(b"\n"):
chunk = sock.recv(1024)
try:
chunk = sock.recv(1024)
except ConnectionError:
return None
if not chunk:
return None
line += chunk
Expand Down
18 changes: 16 additions & 2 deletions helper/helper/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from yubikit.core import InvalidPinError
from functools import partial

import inspect
import logging

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -127,7 +128,20 @@ def __call__(self, action, target, params, event, signal, traversed=None):
action, target[1:], params, event, signal, traversed
)
elif action in self.list_actions():
response = self.get_action(action)(params, event, signal)
action_f = self.get_action(action)
args = inspect.signature(action_f).parameters
# Decode any serialized bytes parameters
for key, param in args.items():
if param.annotation in (bytes, bytes | None):
value = params.get(key, None)
if value is not None:
params[key] = decode_bytes(value)
# Add event and signal if requested
if "event" in args:
params["event"] = event
if "signal" in args:
params["signal"] = signal
response = action_f(**params)
elif action in self.list_children():
traversed += [action]
response = self.get_child(action)(
Expand Down Expand Up @@ -224,7 +238,7 @@ def get_child(self, name):
return self._child

@action
def get(self, params, event, signal):
def get(self):
return dict(
data=self.get_data(),
actions=self.list_actions(),
Expand Down
26 changes: 13 additions & 13 deletions helper/helper/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
from hashlib import sha256
from dataclasses import asdict
from typing import Mapping, Tuple
from typing import Mapping

import os
import sys
Expand Down Expand Up @@ -113,19 +113,19 @@ def nfc(self):
return self._readers

@action
def diagnose(self, *ignored):
def diagnose(self):
return dict(diagnostics=get_diagnostics())

@action(closes_child=False)
def logging(self, params, event, signal):
level = LOG_LEVEL[params["level"].upper()]
set_log_level(level)
logger.info(f"Log level set to: {level.name}")
def logging(self, level: str):
lvl = LOG_LEVEL[level.upper()]
set_log_level(lvl)
logger.info(f"Log level set to: {lvl.name}")
return dict()

@action(closes_child=False)
def qr(self, params, event, signal):
return dict(result=scan_qr(params.get("image")))
def qr(self, image: str | None = None):
return dict(result=scan_qr(image))


def _id_from_fingerprint(fp):
Expand All @@ -142,7 +142,7 @@ def __init__(self):
self._reader_mapping = {}

@action(closes_child=False)
def scan(self, *ignored):
def scan(self):
return self.list_children()

def list_children(self):
Expand Down Expand Up @@ -173,7 +173,7 @@ def create_child(self, name):

class _ScanDevices:
def __init__(self):
self._state: Tuple[Mapping[PID, int], int] = ({}, 0)
self._state: tuple[Mapping[PID, int], int] = ({}, 0)
self._caching = False

def __call__(self):
Expand Down Expand Up @@ -225,7 +225,7 @@ def close(self):
super().close()

@action(closes_child=False)
def scan(self, *ignored):
def scan(self):
return self.get_data()

def get_data(self):
Expand Down Expand Up @@ -460,8 +460,8 @@ def _refresh_data(self):
return dict(present=False, status="no-card")

@action(closes_child=False)
def get(self, params, event, signal):
return super().get(params, event, signal)
def get(self):
return super().get()

@child
def ccid(self):
Expand Down
29 changes: 11 additions & 18 deletions helper/helper/fido.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def _prepare_reset_usb(device, event, signal):
raise TimeoutException()

@action
def reset(self, params, event, signal):
def reset(self, event, signal):
target = _ctap_id(self.ctap)
device = self.ctap.device
if isinstance(device, CtapPcscDevice):
Expand Down Expand Up @@ -206,8 +206,7 @@ def reset(self, params, event, signal):
return RpcResponse(dict(), ["device_info", "device_closed"])

@action(condition=lambda self: self._info.options["clientPin"])
def unlock(self, params, event, signal):
pin = params.pop("pin")
def unlock(self, pin: str):
permissions = ClientPin.PERMISSION(0)
if CredentialManagement.is_supported(self._info):
permissions |= ClientPin.PERMISSION.CREDENTIAL_MGMT
Expand All @@ -227,25 +226,21 @@ def unlock(self, params, event, signal):
return _handle_pin_error(e, self.client_pin)

@action
def set_pin(self, params, event, signal):
def set_pin(self, new_pin: str, pin: str | None = None):
has_pin = self.ctap.get_info().options["clientPin"]
try:
if has_pin:
self.client_pin.change_pin(
params.pop("pin"),
params.pop("new_pin"),
)
assert pin # nosec
self.client_pin.change_pin(pin, new_pin)
else:
self.client_pin.set_pin(
params.pop("new_pin"),
)
self.client_pin.set_pin(new_pin)
self._info = self.ctap.get_info()
return RpcResponse(dict(), ["device_info"])
except CtapError as e:
return _handle_pin_error(e, self.client_pin)

@action(condition=lambda self: Config.is_supported(self._info))
def enable_ep_attestation(self, params, event, signal):
def enable_ep_attestation(self):
if self._info.options["clientPin"] and not self._token:
raise AuthRequiredException()
config = Config(self.ctap, self.client_pin.protocol, self._token)
Expand Down Expand Up @@ -343,7 +338,7 @@ def get_data(self):
return self.data

@action
def delete(self, params, event, signal):
def delete(self):
self.credman.delete_cred(self.data["credential_id"])
self.refresh_rps()
return dict()
Expand Down Expand Up @@ -378,8 +373,7 @@ def create_child(self, name):
return super().create_child(name)

@action
def add(self, params, event, signal):
name = params.get("name", None)
def add(self, event, signal, name: str | None = None):
enroller = self.bio.enroll()
template_id = None
while template_id is None:
Expand Down Expand Up @@ -410,15 +404,14 @@ def get_data(self):
return dict(template_id=self.template_id, name=self.name)

@action
def rename(self, params, event, signal):
name = params.pop("name")
def rename(self, name: str):
self.bio.set_name(self.template_id, name)
self.name = name
self.refresh()
return dict()

@action
def delete(self, params, event, signal):
def delete(self):
self.bio.remove_enrollment(self.template_id)
self.refresh()
return dict()
53 changes: 38 additions & 15 deletions helper/helper/management.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,23 @@
# limitations under the License.

from .base import RpcResponse, RpcNode, action
from yubikit.core import require_version, NotSupportedError, TRANSPORT, Connection
from yubikit.core import (
require_version,
NotSupportedError,
TRANSPORT,
USB_INTERFACE,
Connection,
)
from yubikit.core.smartcard import SmartCardConnection
from yubikit.core.otp import OtpConnection
from yubikit.core.fido import FidoConnection
from yubikit.management import ManagementSession, DeviceConfig, Mode, CAPABILITY
from yubikit.management import (
ManagementSession,
DeviceConfig,
Mode,
CAPABILITY,
DEVICE_FLAG,
)
from ykman.device import list_all_devices
from dataclasses import asdict
from time import sleep
Expand Down Expand Up @@ -75,15 +87,21 @@ def _await_reboot(self, serial, usb_enabled):
logger.warning("Timed out waiting for device")

@action
def configure(self, params, event, signal):
reboot = params.pop("reboot", False)
cur_lock_code = bytes.fromhex(params.pop("cur_lock_code", "")) or None
new_lock_code = bytes.fromhex(params.pop("new_lock_code", "")) or None
def configure(
self,
reboot: bool = False,
cur_lock_code: bytes | None = None,
new_lock_code: bytes | None = None,
enabled_capabilities: dict = {},
auto_eject_timeout: int | None = None,
challenge_response_timeout: int | None = None,
device_flags: int | None = None,
):
config = DeviceConfig(
params.pop("enabled_capabilities", {}),
params.pop("auto_eject_timeout", None),
params.pop("challenge_response_timeout", None),
params.pop("device_flags", None),
enabled_capabilities,
auto_eject_timeout,
challenge_response_timeout,
DEVICE_FLAG(device_flags) if device_flags else None,
)
serial = self.session.read_device_info().serial
self.session.write_device_config(config, reboot, cur_lock_code, new_lock_code)
Expand All @@ -95,17 +113,22 @@ def configure(self, params, event, signal):
return RpcResponse(dict(), flags)

@action
def set_mode(self, params, event, signal):
def set_mode(
self,
interfaces: int,
challenge_response_timeout: int = 0,
auto_eject_timeout: int | None = None,
):
self.session.set_mode(
Mode(params["interfaces"]),
params.pop("challenge_response_timeout", 0),
params.pop("auto_eject_timeout"),
Mode(USB_INTERFACE(interfaces)),
challenge_response_timeout,
auto_eject_timeout,
)
return dict()

@action(
condition=lambda self: issubclass(self._connection_type, SmartCardConnection)
)
def device_reset(self, params, event, signal):
def device_reset(self):
self.session.device_reset()
return RpcResponse(dict(), ["device_info"])
Loading
Loading