Skip to content

Commit

Permalink
Improve mqtt client
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre0512 committed Mar 29, 2024
1 parent bef55f7 commit 11da4eb
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 99 deletions.
11 changes: 1 addition & 10 deletions pyhon/appliance.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import re
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Dict, Any, TYPE_CHECKING, List, TypeVar, overload, Callable
from typing import Optional, Dict, Any, TYPE_CHECKING, List, TypeVar, overload

from pyhon import diagnose, exceptions
from pyhon.appliances.base import ApplianceBase
Expand Down Expand Up @@ -43,7 +43,6 @@ def __init__(
self._additional_data: Dict[str, Any] = {}
self._last_update: Optional[datetime] = None
self._default_setting = HonParameter("", {}, "")
self._notify_function: Optional[Callable[[Any], None]] = None

try:
self._extra: Optional[ApplianceBase] = importlib.import_module(
Expand Down Expand Up @@ -313,11 +312,3 @@ def sync_parameter(self, main: Parameter, target: Parameter) -> None:
elif isinstance(target, HonParameterEnum):
target.values = main.values
target.value = main.value

def subscribe(self, notify_function: Callable[[Any], None]) -> None:
self._notify_function = notify_function

def notify(self) -> None:
self.sync_params_to_command("settings")
if self._notify_function:
self._notify_function(self.attributes)
7 changes: 0 additions & 7 deletions pyhon/connection/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@
from typing import Dict, Optional, Any, List, no_type_check, Type

from aiohttp import ClientSession
from awscrt import mqtt5
from typing_extensions import Self

from pyhon import const, exceptions
from pyhon.appliance import HonAppliance
from pyhon.connection import mqtt
from pyhon.connection.auth import HonAuth
from pyhon.connection.handler.anonym import HonAnonymousConnectionHandler
from pyhon.connection.handler.hon import HonConnectionHandler
Expand Down Expand Up @@ -40,7 +38,6 @@ def __init__(
self._hon_handler: Optional[HonConnectionHandler] = None
self._hon_anonymous_handler: Optional[HonAnonymousConnectionHandler] = None
self._session: Optional[ClientSession] = session
self._mqtt_client: mqtt5.Client | None = None

async def __aenter__(self) -> Self:
return await self.create()
Expand Down Expand Up @@ -269,10 +266,6 @@ async def translation_keys(self, language: str = "en") -> Dict[str, Any]:
result: Dict[str, Any] = await response.json()
return result

async def subscribe_mqtt(self, appliances: list[HonAppliance]) -> None:
if not self._mqtt_client:
self._mqtt_client = await mqtt.start(self, appliances)

async def close(self) -> None:
if self._hon_handler is not None:
await self._hon_handler.close()
Expand Down
195 changes: 116 additions & 79 deletions pyhon/connection/mqtt.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import json
import logging
import secrets
Expand All @@ -10,91 +11,127 @@
from pyhon.appliance import HonAppliance

if TYPE_CHECKING:
from pyhon import HonAPI
from pyhon import Hon

_LOGGER = logging.getLogger(__name__)

appliances: list[HonAppliance] = []


def on_lifecycle_stopped(lifecycle_stopped_data: mqtt5.LifecycleStoppedData) -> None:
_LOGGER.info("Lifecycle Stopped: %s", str(lifecycle_stopped_data))


def on_lifecycle_connection_success(
lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData,
) -> None:
_LOGGER.info(
"Lifecycle Connection Success: %s", str(lifecycle_connect_success_data)
)


def on_lifecycle_attempting_connect(
lifecycle_attempting_connect_data: mqtt5.LifecycleAttemptingConnectData,
) -> None:
_LOGGER.info(
"Lifecycle Attempting Connect - %s", str(lifecycle_attempting_connect_data)
)


def on_lifecycle_connection_failure(
lifecycle_connection_failure_data: mqtt5.LifecycleConnectFailureData,
) -> None:
_LOGGER.info(
"Lifecycle Connection Failure - %s", str(lifecycle_connection_failure_data)
)

class MQTTClient:
def __init__(self, hon: "Hon"):
self._client: mqtt5.Client | None = None
self._hon = hon
self._api = hon.api
self._appliances = hon.appliances
self._connection = False
self._watchdog_task: asyncio.Task[None] | None = None

@property
def client(self) -> mqtt5.Client:
if self._client is not None:
return self._client
raise AttributeError("Client is not set")

async def create(self) -> "MQTTClient":
await self._start()
self._subscribe_appliances()
return self

def _on_lifecycle_stopped(
self, lifecycle_stopped_data: mqtt5.LifecycleStoppedData
) -> None:
_LOGGER.info("Lifecycle Stopped: %s", str(lifecycle_stopped_data))

def _on_lifecycle_connection_success(
self,
lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData,
) -> None:
self._connection = True
_LOGGER.info(
"Lifecycle Connection Success: %s", str(lifecycle_connect_success_data)
)

def on_lifecycle_disconnection(
lifecycle_disconnect_data: mqtt5.LifecycleDisconnectData,
) -> None:
_LOGGER.info("Lifecycle Disconnection - %s", str(lifecycle_disconnect_data))
def _on_lifecycle_attempting_connect(
self,
lifecycle_attempting_connect_data: mqtt5.LifecycleAttemptingConnectData,
) -> None:
_LOGGER.info(
"Lifecycle Attempting Connect - %s", str(lifecycle_attempting_connect_data)
)

def _on_lifecycle_connection_failure(
self,
lifecycle_connection_failure_data: mqtt5.LifecycleConnectFailureData,
) -> None:
_LOGGER.info(
"Lifecycle Connection Failure - %s", str(lifecycle_connection_failure_data)
)

def on_publish_received(data: mqtt5.PublishReceivedData) -> None:
if not (data and data.publish_packet and data.publish_packet.payload):
return
payload = json.loads(data.publish_packet.payload.decode())
topic = data.publish_packet.topic
if topic and "appliancestatus" in topic:
def _on_lifecycle_disconnection(
self,
lifecycle_disconnect_data: mqtt5.LifecycleDisconnectData,
) -> None:
self._connection = False
_LOGGER.info("Lifecycle Disconnection - %s", str(lifecycle_disconnect_data))

def _on_publish_received(self, data: mqtt5.PublishReceivedData) -> None:
if not (data and data.publish_packet and data.publish_packet.payload):
return
payload = json.loads(data.publish_packet.payload.decode())
topic = data.publish_packet.topic
appliance = next(
a for a in appliances if topic in a.info["topics"]["subscribe"]
a for a in self._appliances if topic in a.info["topics"]["subscribe"]
)
if topic and "appliancestatus" in topic:
for parameter in payload["parameters"]:
appliance.attributes["parameters"][parameter["parName"]].update(
parameter
)
appliance.sync_params_to_command("settings")
self._hon.notify()
elif topic and "connected" in topic:
_LOGGER.info("Connected %s", appliance.nick_name)
elif topic and "disconnected" in topic:
_LOGGER.info("Disconnected %s", appliance.nick_name)
elif topic and "discovery" in topic:
_LOGGER.info("Discovered %s", appliance.nick_name)
_LOGGER.info("%s - %s", topic, payload)

async def _start(self) -> None:
self._client = mqtt5_client_builder.websockets_with_custom_authorizer(
endpoint=const.AWS_ENDPOINT,
auth_authorizer_name=const.AWS_AUTHORIZER,
auth_authorizer_signature=await self._api.load_aws_token(),
auth_token_key_name="token",
auth_token_value=self._api.auth.id_token,
client_id=f"{const.MOBILE_ID}_{secrets.token_hex(8)}",
on_lifecycle_stopped=self._on_lifecycle_stopped,
on_lifecycle_connection_success=self._on_lifecycle_connection_success,
on_lifecycle_attempting_connect=self._on_lifecycle_attempting_connect,
on_lifecycle_connection_failure=self._on_lifecycle_connection_failure,
on_lifecycle_disconnection=self._on_lifecycle_disconnection,
on_publish_received=self._on_publish_received,
)
for parameter in payload["parameters"]:
appliance.attributes["parameters"][parameter["parName"]].update(parameter)
appliance.notify()
_LOGGER.info("%s - %s", topic, payload)


async def create_mqtt_client(api: "HonAPI") -> mqtt5.Client:
client: mqtt5.Client = mqtt5_client_builder.websockets_with_custom_authorizer(
endpoint=const.AWS_ENDPOINT,
auth_authorizer_name=const.AWS_AUTHORIZER,
auth_authorizer_signature=await api.load_aws_token(),
auth_token_key_name="token",
auth_token_value=api.auth.id_token,
client_id=f"{const.MOBILE_ID}_{secrets.token_hex(8)}",
on_lifecycle_stopped=on_lifecycle_stopped,
on_lifecycle_connection_success=on_lifecycle_connection_success,
on_lifecycle_attempting_connect=on_lifecycle_attempting_connect,
on_lifecycle_connection_failure=on_lifecycle_connection_failure,
on_lifecycle_disconnection=on_lifecycle_disconnection,
on_publish_received=on_publish_received,
)
client.start()
return client


def subscribe(client: mqtt5.Client, appliance: HonAppliance) -> None:
for topic in appliance.info.get("topics", {}).get("subscribe", []):
client.subscribe(mqtt5.SubscribePacket([mqtt5.Subscription(topic)])).result(10)
_LOGGER.info("Subscribed to topic %s", topic)


async def start(api: "HonAPI", app: list[HonAppliance]) -> mqtt5.Client:
client = await create_mqtt_client(api)
global appliances # pylint: disable=global-statement
appliances = app
for appliance in appliances:
subscribe(client, appliance)
return client
self.client.start()

def _subscribe_appliances(self) -> None:
for appliance in self._appliances:
self._subscribe(appliance)

def _subscribe(self, appliance: HonAppliance) -> None:
for topic in appliance.info.get("topics", {}).get("subscribe", []):
self.client.subscribe(
mqtt5.SubscribePacket([mqtt5.Subscription(topic)])
).result(10)
_LOGGER.info("Subscribed to topic %s", topic)

async def start_watchdog(self) -> None:
if not self._watchdog_task or self._watchdog_task.done():
await asyncio.create_task(self._watchdog())

async def _watchdog(self) -> None:
while True:
await asyncio.sleep(5)
if not self._connection:
_LOGGER.info("Restart mqtt connection")
await self._start()
self._subscribe_appliances()
15 changes: 13 additions & 2 deletions pyhon/hon.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
import logging
from pathlib import Path
from types import TracebackType
from typing import List, Optional, Dict, Any, Type
from typing import List, Optional, Dict, Any, Type, Callable

from aiohttp import ClientSession
from typing_extensions import Self

from pyhon.appliance import HonAppliance
from pyhon.connection.api import HonAPI
from pyhon.connection.api import TestAPI
from pyhon.connection.mqtt import MQTTClient
from pyhon.exceptions import NoAuthenticationException

_LOGGER = logging.getLogger(__name__)
Expand All @@ -33,6 +34,8 @@ def __init__(
self._test_data_path: Path = test_data_path or Path().cwd()
self._mobile_id: str = mobile_id
self._refresh_token: str = refresh_token
self._mqtt_client: MQTTClient | None = None
self._notify_function: Optional[Callable[[Any], None]] = None

async def __aenter__(self) -> Self:
return await self.create()
Expand Down Expand Up @@ -120,7 +123,15 @@ async def setup(self) -> None:
api = TestAPI(test_data)
for appliance in await api.load_appliances():
await self._create_appliance(appliance, api)
await self.api.subscribe_mqtt(self.appliances)
if not self._mqtt_client:
self._mqtt_client = await MQTTClient(self).create()

def subscribe_updates(self, notify_function: Callable[[Any], None]) -> None:
self._notify_function = notify_function

def notify(self) -> None:
if self._notify_function:
self._notify_function(None)

async def close(self) -> None:
await self.api.close()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

setup(
name="pyhOn",
version="0.17.1",
version="0.17.2",
author="Andre Basche",
description="Control hOn devices with python",
long_description=long_description,
Expand Down

0 comments on commit 11da4eb

Please sign in to comment.