diff --git a/kr8s/_api.py b/kr8s/_api.py index efa42a85..47d1c720 100644 --- a/kr8s/_api.py +++ b/kr8s/_api.py @@ -6,6 +6,7 @@ import contextlib import copy import json +import logging import ssl import threading import warnings @@ -29,6 +30,7 @@ from ._objects import APIObject ALL = "all" +logger = logging.getLogger(__name__) class Api: @@ -508,19 +510,37 @@ async def async_watch( allow_unknown_type: bool = True, ) -> AsyncGenerator[tuple[str, APIObject]]: """Watch a Kubernetes resource.""" - async with self.async_get_kind( - kind, - namespace=namespace, - label_selector=label_selector, - field_selector=field_selector, - params={"resourceVersion": since} if since else None, - watch=True, - timeout=None, - allow_unknown_type=allow_unknown_type, - ) as (obj_cls, response): - async for line in response.aiter_lines(): - event = json.loads(line) - yield event["type"], obj_cls(event["object"], api=self) + while True: + restart_watch = False + async with self.async_get_kind( + kind, + namespace=namespace, + label_selector=label_selector, + field_selector=field_selector, + params={"resourceVersion": since} if since else None, + watch=True, + timeout=None, + allow_unknown_type=allow_unknown_type, + ) as (obj_cls, response): + logger.debug( + f"Starting watch of {kind}{' at resourceVersion ' + since if since else ''}" + ) + async for line in response.aiter_lines(): + event = json.loads(line) + if ( + event["object"]["kind"] == "Status" + and event["object"].get("code") == 410 + ): + restart_watch = True + logger.debug( + f"Got 410 Gone: Restarting watch of {kind} at resourceVersion {since}" + ) + break + obj = obj_cls(event["object"], api=self) + since = obj.metadata.resourceVersion + yield event["type"], obj + if not restart_watch: + return async def api_resources(self) -> list[dict]: """Get the Kubernetes API resources."""