From cace6e899801946056327666148db3f02cdaf7cd Mon Sep 17 00:00:00 2001 From: Raman Gupta <7243222+raman325@users.noreply.github.com> Date: Fri, 5 Apr 2024 00:54:28 -0400 Subject: [PATCH] Add support for virtual locks (#103) --- README.md | 12 +-- .../lock_code_manager/__init__.py | 11 ++- .../lock_code_manager/helpers.py | 1 - .../lock_code_manager/manifest.json | 9 ++- .../lock_code_manager/providers/__init__.py | 2 + .../lock_code_manager/providers/_base.py | 49 ++++++----- .../lock_code_manager/providers/virtual.py | 81 +++++++++++++++++++ .../lock_code_manager/providers/zwave_js.py | 5 +- tests/_base/test_provider.py | 2 +- tests/common.py | 2 +- tests/virtual/__init__.py | 1 + tests/virtual/test_provider.py | 55 +++++++++++++ 12 files changed, 190 insertions(+), 40 deletions(-) create mode 100644 custom_components/lock_code_manager/providers/virtual.py create mode 100644 tests/virtual/__init__.py create mode 100644 tests/virtual/test_provider.py diff --git a/README.md b/README.md index 9ce0ee8..9cdc9b1 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,8 @@ Features: Locks from the following integrations are currently supported: -- Z-Wave JS +- Z-Wave +- [Virtual](https://github.com/twrecked/hass-virtual) custom integration. See the [Wiki page on this integration](https://github.com/raman325/lock_code_manager/wiki/Virtual-integration) for more details on why it was built and how it works. The code was written to make it (I think) easy to add support for locks in other integrations. Check the [Wiki](https://github.com/raman325/lock_code_manager/wiki) if you want to learn more about that and take a stab at it. Contributors welcome! @@ -20,10 +21,11 @@ The code was written to make it (I think) easy to add support for locks in other The best way to install this integration is via HACS. -1. Add this repository as a custom integration repository in HACS -2. Go to Settings > Devices & Services > Add Integration -3. Select Lock Code Manager -4. Follow the prompts - additional information about the configuration options is available in the Wiki +1. Set up your locks as entities to your Home Assistant instance through the corresponding integration (e.g. Z-Wave) +2. Add this repository as a custom integration repository in HACS +3. Go to Settings > Devices & Services > Add Integration +4. Select Lock Code Manager +5. Follow the prompts - additional information about the configuration options are available in the Wiki ## Learn More diff --git a/custom_components/lock_code_manager/__init__.py b/custom_components/lock_code_manager/__init__.py index d02b6e1..fae28e1 100644 --- a/custom_components/lock_code_manager/__init__.py +++ b/custom_components/lock_code_manager/__init__.py @@ -213,7 +213,10 @@ async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> b async def async_unload_lock( - hass: HomeAssistant, config_entry: ConfigEntry, lock_entity_id: str | None = None + hass: HomeAssistant, + config_entry: ConfigEntry, + lock_entity_id: str | None = None, + remove_permanently: bool = False, ): """Unload lock.""" hass_data = hass.data[DOMAIN] @@ -227,7 +230,7 @@ async def async_unload_lock( for entry in hass.config_entries.async_entries(DOMAIN) ): lock: BaseLock = hass_data[CONF_LOCKS].pop(lock_entity_id) - await lock.async_unload() + await lock.async_unload(remove_permanently) hass_data[entry_id][CONF_LOCKS].pop(lock_entity_id) @@ -355,7 +358,9 @@ async def async_update_listener(hass: HomeAssistant, config_entry: ConfigEntry) dev_reg.async_update_device( lock.device_entry.id, remove_config_entry_id=entry_id ) - await async_unload_lock(hass, config_entry, lock_entity_id) + await async_unload_lock( + hass, config_entry, lock_entity_id=lock_entity_id, remove_permanently=True + ) # Notify any existing entities that additional locks have been added then create # slot PIN sensors for the new locks diff --git a/custom_components/lock_code_manager/helpers.py b/custom_components/lock_code_manager/helpers.py index 745014e..ece6ab5 100644 --- a/custom_components/lock_code_manager/helpers.py +++ b/custom_components/lock_code_manager/helpers.py @@ -29,7 +29,6 @@ def async_create_lock_instance( lock_entry = ent_reg.async_get(lock_entity_id) assert lock_entry lock_config_entry = hass.config_entries.async_get_entry(lock_entry.config_entry_id) - assert lock_config_entry lock = INTEGRATIONS_CLASS_MAP[lock_entry.platform]( hass, dev_reg, ent_reg, lock_config_entry, lock_entry ) diff --git a/custom_components/lock_code_manager/manifest.json b/custom_components/lock_code_manager/manifest.json index 70ecb70..2466aaf 100644 --- a/custom_components/lock_code_manager/manifest.json +++ b/custom_components/lock_code_manager/manifest.json @@ -6,11 +6,16 @@ "frontend", "hacs", "lovelace", + "virtual", "zwave_js" ], - "codeowners": ["@raman325"], + "codeowners": [ + "@raman325" + ], "config_flow": true, - "dependencies": ["http"], + "dependencies": [ + "http" + ], "documentation": "https://github.com/raman325/lock_code_manager", "iot_class": "local_polling", "issue_tracker": "https://github.com/raman325/lock_code_manager/issues", diff --git a/custom_components/lock_code_manager/providers/__init__.py b/custom_components/lock_code_manager/providers/__init__.py index 60a526a..6973cd4 100644 --- a/custom_components/lock_code_manager/providers/__init__.py +++ b/custom_components/lock_code_manager/providers/__init__.py @@ -7,8 +7,10 @@ from __future__ import annotations from ._base import BaseLock +from .virtual import VirtualLock from .zwave_js import ZWaveJSLock INTEGRATIONS_CLASS_MAP: dict[str, type[BaseLock]] = { + "virtual": VirtualLock, "zwave_js": ZWaveJSLock, } diff --git a/custom_components/lock_code_manager/providers/_base.py b/custom_components/lock_code_manager/providers/_base.py index 74a0316..c4c0300 100644 --- a/custom_components/lock_code_manager/providers/_base.py +++ b/custom_components/lock_code_manager/providers/_base.py @@ -47,7 +47,7 @@ class BaseLock: hass: HomeAssistant = field(repr=False) dev_reg: dr.DeviceRegistry = field(repr=False) ent_reg: er.EntityRegistry = field(repr=False) - lock_config_entry: ConfigEntry = field(repr=False) + lock_config_entry: ConfigEntry | None = field(repr=False) lock: er.RegistryEntry device_entry: dr.DeviceEntry | None = field(default=None, init=False) @@ -94,13 +94,13 @@ async def async_setup(self) -> None: """Set up lock.""" await self.hass.async_add_executor_job(self.setup) - def unload(self) -> None: + def unload(self, remove_permanently: bool) -> None: """Unload lock.""" pass - async def async_unload(self) -> None: + async def async_unload(self, remove_permanently: bool) -> None: """Unload lock.""" - await self.hass.async_add_executor_job(self.unload) + await self.hass.async_add_executor_job(self.unload, remove_permanently) def is_connection_up(self) -> bool: """Return whether connection to lock is up.""" @@ -297,24 +297,23 @@ def async_fire_code_slot_event( elif isinstance(source_data, dict): extra_data = source_data - self.hass.bus.async_fire( - EVENT_LOCK_STATE_CHANGED, - event_data={ - ATTR_NOTIFICATION_SOURCE: notification_source, - ATTR_ENTITY_ID: lock_entity_id, - ATTR_DEVICE_ID: lock_device_id, - ATTR_LCM_CONFIG_ENTRY_ID: config_entry_id, - ATTR_LOCK_CONFIG_ENTRY_ID: self.lock_config_entry.entry_id, - ATTR_STATE: ( - state.state - if (state := self.hass.states.get(lock_entity_id)) - else "" - ), - ATTR_ACTION_TEXT: action_text, - ATTR_CODE_SLOT: code_slot or 0, - ATTR_CODE_SLOT_NAME: name_state.state if name_state else "", - ATTR_FROM: from_state, - ATTR_TO: to_state, - ATTR_EXTRA_DATA: extra_data, - }, - ) + event_data = { + ATTR_NOTIFICATION_SOURCE: notification_source, + ATTR_ENTITY_ID: lock_entity_id, + ATTR_DEVICE_ID: lock_device_id, + ATTR_LCM_CONFIG_ENTRY_ID: config_entry_id, + ATTR_STATE: ( + state.state if (state := self.hass.states.get(lock_entity_id)) else "" + ), + ATTR_ACTION_TEXT: action_text, + ATTR_CODE_SLOT: code_slot or 0, + ATTR_CODE_SLOT_NAME: name_state.state if name_state else "", + ATTR_FROM: from_state, + ATTR_TO: to_state, + ATTR_EXTRA_DATA: extra_data, + } + + if self.lock_config_entry: + event_data[ATTR_LOCK_CONFIG_ENTRY_ID] = self.lock_config_entry.entry_id + + self.hass.bus.async_fire(EVENT_LOCK_STATE_CHANGED, event_data=event_data) diff --git a/custom_components/lock_code_manager/providers/virtual.py b/custom_components/lock_code_manager/providers/virtual.py new file mode 100644 index 0000000..ed42133 --- /dev/null +++ b/custom_components/lock_code_manager/providers/virtual.py @@ -0,0 +1,81 @@ +"""Module for Virtual locks.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +import logging +from typing import TypedDict + +from homeassistant.exceptions import HomeAssistantError +from homeassistant.helpers.storage import Store + +from ..const import DOMAIN +from ._base import BaseLock + +_LOGGER = logging.getLogger(__name__) + + +class CodeSlotData(TypedDict): + """Type for code slot data.""" + + code: int | str + name: str | None + + +@dataclass(repr=False, eq=False) +class VirtualLock(BaseLock): + """Class to represent Virtual lock.""" + + _store: Store[dict[str, CodeSlotData]] = field(init=False, repr=False) + _data: dict[str, CodeSlotData] = field(default_factory=dict, init=False, repr=False) + + @property + def domain(self) -> str: + """Return integration domain.""" + return "virtual" + + async def async_setup(self) -> None: + """Set up lock.""" + self._store = Store( + self.hass, 1, f"{self.domain}_{DOMAIN}_{self.lock.entity_id}" + ) + await self.async_hard_refresh_codes() + + async def async_unload(self, remove_permanently: bool) -> None: + """Unload lock.""" + if remove_permanently: + await self._store.async_remove() + else: + await self._store.async_save(self._data) + + async def async_is_connection_up(self) -> bool: + """Return whether connection to lock is up.""" + return True + + async def async_hard_refresh_codes(self) -> None: + """ + Perform hard refresh of all codes. + + Needed for integrations where usercodes are cached and may get out of sync with + the lock. + """ + self._data = data if (data := await self._store.async_load()) else {} + + async def async_set_usercode( + self, code_slot: int, usercode: int | str, name: str | None = None + ) -> None: + """Set a usercode on a code slot.""" + self._data[str(code_slot)] = CodeSlotData(code=usercode, name=name) + + async def async_clear_usercode(self, code_slot: int) -> None: + """Clear a usercode on a code slot.""" + if str(code_slot) not in self._data: + raise HomeAssistantError(f"Code slot {code_slot} not found") + self._data.pop(str(code_slot)) + + async def async_get_usercodes(self) -> dict[int, int | str]: + """Get dictionary of code slots and usercodes.""" + return { + int(slot_num): code_slot["code"] + for slot_num, code_slot in self._data.items() + } diff --git a/custom_components/lock_code_manager/providers/zwave_js.py b/custom_components/lock_code_manager/providers/zwave_js.py index f4bdee4..971e8ce 100644 --- a/custom_components/lock_code_manager/providers/zwave_js.py +++ b/custom_components/lock_code_manager/providers/zwave_js.py @@ -30,7 +30,7 @@ ZWAVE_JS_NOTIFICATION_EVENT, ) from homeassistant.components.zwave_js.helpers import async_get_node_from_entity_id -from homeassistant.config_entries import ConfigEntryState +from homeassistant.config_entries import ConfigEntry, ConfigEntryState from homeassistant.const import ( ATTR_DEVICE_ID, ATTR_ENTITY_ID, @@ -72,6 +72,7 @@ class ZWaveJSLock(BaseLock): """Class to represent ZWave JS lock.""" + lock_config_entry: ConfigEntry = field(repr=False) _listeners: list[Callable[[], None]] = field(init=False, default_factory=list) @property @@ -135,7 +136,7 @@ async def async_setup(self) -> None: ) ) - async def async_unload(self) -> None: + async def async_unload(self, remove_permanently: bool) -> None: """Unload lock.""" for listener in self._listeners: listener() diff --git a/tests/_base/test_provider.py b/tests/_base/test_provider.py index c0712bf..cfa8be2 100644 --- a/tests/_base/test_provider.py +++ b/tests/_base/test_provider.py @@ -22,7 +22,7 @@ async def test_base(hass: HomeAssistant): er.RegistryEntry("lock.test", "blah", "blah"), ) assert await lock.async_setup() is None - assert await lock.async_unload() is None + assert await lock.async_unload(False) is None assert lock.usercode_scan_interval == timedelta(minutes=1) with pytest.raises(NotImplementedError): lock.domain diff --git a/tests/common.py b/tests/common.py index 39dda5c..954a9a8 100644 --- a/tests/common.py +++ b/tests/common.py @@ -66,7 +66,7 @@ def setup(self) -> None: {"codes": {1: "1234", 2: "5678"}, "service_calls": defaultdict(list)}, ) - def unload(self) -> None: + def unload(self, remove_permanently: bool) -> None: """Unload lock.""" self.hass.data[LOCK_DATA].pop(self.lock.entity_id) if not self.hass.data[LOCK_DATA]: diff --git a/tests/virtual/__init__.py b/tests/virtual/__init__.py new file mode 100644 index 0000000..ab79f08 --- /dev/null +++ b/tests/virtual/__init__.py @@ -0,0 +1 @@ +"""Virtual lock tests.""" diff --git a/tests/virtual/test_provider.py b/tests/virtual/test_provider.py new file mode 100644 index 0000000..32e76d3 --- /dev/null +++ b/tests/virtual/test_provider.py @@ -0,0 +1,55 @@ +"""Test the Virtual lock platform.""" + +from datetime import timedelta + +import pytest +from pytest_homeassistant_custom_component.common import MockConfigEntry + +from homeassistant.core import HomeAssistant +from homeassistant.exceptions import HomeAssistantError +from homeassistant.helpers import device_registry as dr, entity_registry as er + +from custom_components.lock_code_manager.providers.virtual import VirtualLock + + +async def test_door_lock(hass: HomeAssistant): + """Test a lock entity.""" + lock = VirtualLock( + hass, + dr.async_get(hass), + er.async_get(hass), + MockConfigEntry(), + er.RegistryEntry("lock.test", "blah", "blah"), + ) + assert await lock.async_setup() is None + assert lock.usercode_scan_interval == timedelta(minutes=1) + assert lock.domain == "virtual" + assert await lock.async_is_connection_up() + assert lock._data == {} + await lock.async_hard_refresh_codes() + assert lock._data == {} + # we should not be able to clear a usercode that does not exist + with pytest.raises(HomeAssistantError): + await lock.async_clear_usercode(1) + + # we should be able to set a usercode and see it in the data + await lock.async_set_usercode(1, 1, "test") + assert lock._data["1"] == {"code": 1, "name": "test"} + await lock.async_get_usercodes() + assert lock._data["1"] == {"code": 1, "name": "test"} + + # if we unload without removing permanently, the data should be saved + assert await lock.async_unload(False) is None + assert await lock.async_setup() is None + assert lock._data["1"] == {"code": 1, "name": "test"} + + # we can clear a valid usercode + await lock.async_set_usercode(2, 2, "test2") + assert lock._data["2"] == {"code": 2, "name": "test2"} + await lock.async_clear_usercode(2) + assert "2" not in lock._data + + # if we unload with removing permanently, the data should be removed + assert await lock.async_unload(True) is None + assert await lock.async_setup() is None + assert not lock._data