Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert the evaluation from Redis pubsub to Redis Streams #677

2 changes: 1 addition & 1 deletion pytest.ini
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

[pytest]
addopts = --import-mode=importlib
addopts = --import-mode=importlib --ignore=python/services/evaluationservice --ignore=python/gui --ignore=scripts/test_django.py
consider_namespace_packages = true
; environment variables that will be added before tests are run
; key=value pairs with no spaces
Expand Down
2 changes: 0 additions & 2 deletions python/gui/MaaS/static/maas/js/ready_evaluation.js
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,6 @@ async function submit_evaluation(event) {
editorData.view.setOption("readonly", true);
}

$(event.target).button("disable");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this accidentally committed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's a fix for annoying behavior I encountered while working. You can submit an evaluation, but the button becomes permanently disabled. I removed the behavior. It will allow you to rapid fire identical evaluations (not ideal), but if something fails and you only need to change a part of the config, you no longer need to refresh the page.


$("#tabs").tabs("option", "active", 1);
}

Expand Down
101 changes: 81 additions & 20 deletions python/gui/forwarding/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Defines a websocket consumer that does nothing other than pass messages directly to and from another websocket
to another service
"""
import json
import typing
import pathlib
import asyncio
Expand All @@ -21,15 +22,38 @@
LOGGER = logging.ConfiguredLogger()


ASGIView = typing.Callable[
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use a protocol instead?

class ASGIView(typing.Protocol):
    def __call__(
        self,
        scope: typing.Mapping,
        receive: typing.Callable[[bool | None, float | None], typing.Mapping],
        send: typing.Callable[[typing.Mapping], None],
    ) -> typing.Coroutine[typing.Any, typing.Any, None]: ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the benefit of using a protocol in this instance? The purpose is to describe a hint for an expected signature for type hinting, not a promise for an object structure.

Would changing its name to ASGIViewSignature make more sense?

[
typing.Mapping,
typing.Callable[[typing.Optional[bool], typing.Optional[float]], typing.Mapping],
typing.Callable[[typing.Mapping], None]
], typing.Coroutine[typing.Any, typing.Any, None]
]
"""
The signature for a function that yields an ASGI View

Args:
scope: HTTP Scope information
receive: A function to retrieve data from a queue
send: A function to send data through the socket
"""


class ForwardingSocket(SocketConsumer):
"""
A WebSocket Consumer that simply passes messages to and from another connection
"""
@classmethod
def asgi_from_configuration(
cls,
configuration: ForwardingConfiguration
) -> typing.Coroutine[typing.Any, typing.Any, None]:
def asgi_from_configuration(cls, configuration: ForwardingConfiguration) -> ASGIView:
"""
Create an asgi view with parameters provided by a ForwardingConfiguration

Args:
configuration: A configuration dictating where to forward socket messages

Returns:
A view function that requests may route to
"""
interface = cls.as_asgi(
target_host_name=configuration.name,
target_host_url=configuration.url,
Expand Down Expand Up @@ -101,10 +125,16 @@ def target_host_port(self) -> typing.Optional[typing.Union[str, int]]:

@property
def uses_ssl(self) -> bool:
"""
Whether the websocket connection is using SSL
"""
return self.__use_ssl

@property
def certificate_path(self) -> typing.Optional[str]:
"""
The path to an SSL certificate to use if SSL is to be employed
"""
return self._certificate_path

@property
Expand Down Expand Up @@ -139,31 +169,50 @@ def target_connection_url(self) -> str:

@property
def ssl_context(self) -> typing.Optional[ssl.SSLContext]:
if not self.__use_ssl:
return None

if not self._ssl_context:
"""
Get the SSL context to use if SSL is to be employed when connecting to a remote websocket
"""
if self.__use_ssl and not self._ssl_context:
self._ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
if not self._certificate_path:
raise ValueError(
f"An SSL certificate is required to connect to {self.__target_host_name} as configured, "
f"but none was given."
)
elif not os.path.exists(self._certificate_path):

if not os.path.exists(self._certificate_path):
raise ValueError(
f"The SSL Certificate needed to connect to {self.__target_host_name} was not "
f"found at {self._certificate_path}"
)
elif os.path.isfile(self._certificate_path):

if os.path.isfile(self._certificate_path):
self._ssl_context.load_verify_locations(cafile=self._certificate_path)
else:
self._ssl_context.load_verify_locations(capath=self._certificate_path)

return self._ssl_context

async def _connect_to_target(self):
"""
Connect to a remote websocket
"""
self.__connection = await connect_to_socket(uri=self.target_connection_url, ssl=self.ssl_context)

try:
# The connection was created, but throw an error if it can't be connected through
await self.__connection.ping()
except BaseException as ping_exception:
message = "Connection to remote server could not be established"
error_message = {
"event": "error",
"data": {
"message": message
}
}
await self.send(json.dumps(error_message))
raise Exception(message) from ping_exception

if self.__listen_task is None:
self.__listen_task = asyncio.create_task(self.listen(), name=f"ListenTo{self.__target_host_name}")

Expand All @@ -174,10 +223,22 @@ async def connect(self):
await super().accept()
await self._connect_to_target()

async def get_connection(self) -> WebSocketClientProtocol:
"""
Get a connection to the remote websocket

Returns:
A socket connection that facilitates sending and receiving messages
"""
if self.__connection is None or self.__connection.closed:
await self._connect_to_target()
return self.__connection

async def disconnect(self, code):
"""
Handler for when a client disconnects
"""
LOGGER.debug(f"Received signal for {self} to disconnect with code {code}")
# Attempt to cancel the task. This is mostly a safety measure. Cancelling here is preferred but a
# failure isn't the end of the world
if self.__listen_task is not None and not self.__listen_task.done():
Expand Down Expand Up @@ -233,14 +294,13 @@ async def listen(self):
"""
Listen for messages from the target connection and send them through the caller connection
"""
if self.__connection is None:
await self._connect_to_target()

async for message in self.__connection:
if isinstance(message, bytes):
await self.send(bytes_data=message)
else:
await self.send(message)
while True:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this in unbounded, we should add the return type hint of -> typing.NoReturn

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a bad idea. Might also be handy to detail how this is being managed and why to make it clear that this is a dependent operation that will be interrupted and won't necessarily create an infinite blocking loop.

connection = await self.get_connection()
async for message in connection:
if isinstance(message, bytes):
await self.send(bytes_data=message)
else:
await self.send(message)

async def receive(self, text_data: str = None, bytes_data: bytes = None, **kwargs):
"""
Expand All @@ -253,10 +313,11 @@ async def receive(self, text_data: str = None, bytes_data: bytes = None, **kwarg
bytes_data: Bytes data sent over the socket
**kwargs:
"""
connection = await self.get_connection()
if bytes_data and not text_data:
await self.__connection.send(bytes_data)
await connection.send(bytes_data)
elif text_data:
await self.__connection.send(text_data)
await connection.send(text_data)
else:
LOGGER.warn("A message was received from the client but not text or bytes data was received.")

Expand Down
2 changes: 1 addition & 1 deletion python/lib/core/dmod/core/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.19.0'
__version__ = '0.20.0'
165 changes: 0 additions & 165 deletions python/lib/core/dmod/core/common/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,171 +238,6 @@ def __contains__(self, obj: object) -> bool:
return obj in self.__data


class _OccurrenceTracker(typing.Generic[_T]):
"""
Keeps track of occurrences of a type of value that have been encountered within a duration
"""
def __init__(self, key: _T, duration: timedelta, threshold: int, on_filled: typing.Callable[[_T], typing.Any]):
self.__key = key
self.__duration = duration
self.__threshold = threshold
self.__occurences: typing.List[datetime] = []
self.__on_filled = on_filled

def value_encountered(self):
"""
Inform the tracker that the value has been encountered again
"""
self.update_occurrences()
self.__occurences.append(datetime.now())
if len(self.__occurences) >= self.__threshold:
self.__on_filled(self.__key)

def update_occurrences(self) -> int:
"""
Update the list of occurrences to include only those within the current duration

Returns:
The number of occurrences still being tracked
"""
cutoff: datetime = datetime.now() - self.__duration
self.__occurences = [
occurrence
for occurrence in self.__occurences
if occurrence > cutoff
]
return len(self.__occurences)

@property
def key(self):
"""
The identifier that is being tracked
"""
return self.__key

def __len__(self):
return len(self.__occurences)

def __str__(self):
if len(self.__occurences) == 0:
occurrences_details = f"No Occurences within the last {self.__duration.total_seconds()} seconds."
else:
occurrences_details = (f"{len(self.__occurences)} occurrences since "
f"{self.__occurences[0].strftime('%Y-%m-%d %H:%M:%S')}")
return f"{self.key}: {occurrences_details}"


class TimedOccurrenceWatcher:
"""
Keeps track of the amount of occurrences of items within a range of time
"""
MINIMUM_TRACKING_SECONDS: typing.Final[float] = 0.1
"""
The lowest number of seconds to watch for multiple occurrences. Only acting when multiple occurrences are tracked
in under 100ms would create a scenario where the watcher will most likely never trigger an action, rendering
this the wrong tool for the job.
"""

@staticmethod
def default_key_function(obj: object) -> type:
"""
The function used to find a common identifier for an object if one is not provided
"""
return type(obj)

def __init__(
self,
duration: timedelta,
threshold: int,
on_filled: typing.Callable[[_T], typing.Any],
key_function: typing.Callable[[_VT], _KT] = None
):
if not isinstance(duration, timedelta):
raise ValueError(f"Cannot create a {self.__class__.__name__} - {duration} is not a timedelta object")

if duration.total_seconds() < self.MINIMUM_TRACKING_SECONDS:
raise ValueError(
f"Cannot create a {self.__class__.__name__} - the duration is too short ({duration.total_seconds()}s)"
)

self.__duration = duration

if not isinstance(key_function, typing.Callable):
key_function = self.default_key_function

self.__key_function = key_function
self.__entries: typing.Dict[uuid.UUID, _OccurrenceTracker] = {}
self.__threshold = threshold
self.__on_filled = on_filled

def value_encountered(self, value: _T):
"""
Add an occurrence of an object to track

Args:
value: The item to track
"""
self.__update_trackers()
self._get_tracker(value).value_encountered()

def _get_tracker(self, value: _T) -> _OccurrenceTracker[_T]:
"""
Get an occurrence tracker for the given value

Args:
value: The value to track

Returns:
A tracker for the value
"""
key = self.__key_function(value)

for tracker in self.__entries.values():
if tracker.key == key:
return tracker

new_tracker = _OccurrenceTracker(
key=key,
duration=self.__duration,
threshold=self.__threshold,
on_filled=self.__on_filled
)
self.__entries[uuid.uuid1()] = new_tracker
return new_tracker

def __update_trackers(self):
"""
Update the amount of items in each tracker

If a tracker becomes empty it will be removed
"""
for tracker_id, tracker in self.__entries.items():
amount_left = tracker.update_occurrences()
if amount_left == 0:
del self.__entries[tracker_id]

@property
def size(self) -> int:
"""
The number of items encountered within the duration
"""
self.__update_trackers()
return sum(len(tracker) for tracker in self.__entries.values())

@property
def duration(self) -> timedelta:
"""
The amount of time to track items for
"""
return self.__duration

def __str__(self):
return f"{self.__class__.__name__}: {self.size} items within the last {self.duration.total_seconds()} Seconds"

def __repr__(self):
return self.__str__()


class EventfulMap(abc.ABC, typing.MutableMapping[_KT, _VT], typing.Generic[_KT, _VT]):
@abc.abstractmethod
def get_handlers(self) -> typing.Dict[CollectionEvent, typing.MutableSequence[typing.Callable]]:
Expand Down
1 change: 1 addition & 0 deletions python/lib/core/dmod/core/context/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ def end_scope(self):
"""
Override to add extra logic for when this scope is supposed to reach its end
"""
self.logger.warning(f"Ending scope '{self.__scope_id}' for: {self}")
self.drop_references()
self.__scope_closed()

Expand Down
Loading
Loading