From 1c6acca5eb2449bbecf0d787008c33a84541d8b5 Mon Sep 17 00:00:00 2001 From: Ryan Bunney Date: Fri, 13 Sep 2024 12:21:14 +0800 Subject: [PATCH 01/12] Node Class: Setup new node class to improve consistency of computing node description throughout dlg-engine. --- daliuge-common/dlg/constants.py | 5 +- .../dlg/manager/composite_manager.py | 89 ++++++++++------- daliuge-engine/dlg/manager/manager_data.py | 96 +++++++++++++++++++ daliuge-engine/dlg/manager/node_manager.py | 17 +++- daliuge-engine/dlg/manager/session.py | 19 ++-- 5 files changed, 177 insertions(+), 49 deletions(-) create mode 100644 daliuge-engine/dlg/manager/manager_data.py diff --git a/daliuge-common/dlg/constants.py b/daliuge-common/dlg/constants.py index e9a141a00..6907e50c0 100644 --- a/daliuge-common/dlg/constants.py +++ b/daliuge-common/dlg/constants.py @@ -31,16 +31,15 @@ "NODE_DEFAULT_RPC_PORT": 6666, } +NODE_DEFAULT_HOSTNAME = "localhost" + # just for backwards compatibility NODE_DEFAULT_REST_PORT = DEFAULT_PORTS["NODE_DEFAULT_REST_PORT"] ISLAND_DEFAULT_REST_PORT = DEFAULT_PORTS["ISLAND_DEFAULT_REST_PORT"] MASTER_DEFAULT_REST_PORT = DEFAULT_PORTS["MASTER_DEFAULT_REST_PORT"] - REPLAY_DEFAULT_REST_PORT = DEFAULT_PORTS["REPLAY_DEFAULT_REST_PORT"] - DAEMON_DEFAULT_REST_PORT = DEFAULT_PORTS["DAEMON_DEFAULT_REST_PORT"] - # Others ports used by the Node Managers NODE_DEFAULT_EVENTS_PORT = DEFAULT_PORTS["NODE_DEFAULT_EVENTS_PORT"] NODE_DEFAULT_RPC_PORT = DEFAULT_PORTS["NODE_DEFAULT_RPC_PORT"] diff --git a/daliuge-engine/dlg/manager/composite_manager.py b/daliuge-engine/dlg/manager/composite_manager.py index 8941732cb..b1d4016ea 100644 --- a/daliuge-engine/dlg/manager/composite_manager.py +++ b/daliuge-engine/dlg/manager/composite_manager.py @@ -28,19 +28,21 @@ import multiprocessing.pool import threading +from dlg.manager.client import NodeManagerClient +from dlg.manager.drop_manager import DROPManager +from dlg.manager.manager_data import Node + from dlg import constants -from .client import NodeManagerClient from dlg.constants import ISLAND_DEFAULT_REST_PORT, NODE_DEFAULT_REST_PORT -from .drop_manager import DROPManager -from .. import graph_loader +from dlg import graph_loader from dlg.common.reproducibility.reproducibility import init_pg_repro_data -from ..ddap_protocol import DROPRel +from dlg.ddap_protocol import DROPRel from dlg.exceptions import ( InvalidGraphException, DaliugeException, SubManagerException, ) -from ..utils import portIsOpen +from dlg.utils import portIsOpen logger = logging.getLogger(__name__) @@ -156,12 +158,12 @@ def __init__( :param: dmCheckTimeout The timeout used before giving up and declaring a sub-DM as not-yet-present in a given host """ - if dmHosts and ":" in dmHosts[0]: + self._dmHosts = [Node(host) for host in dmHosts] + if self._dmHosts and self._dmHosts[0].rest_port_specified: dmPort = -1 self._dmPort = dmPort self._partitionAttr = partitionAttr self._subDmId = subDmId - self._dmHosts = dmHosts if dmHosts else [] self._graph = {} self._drop_rels = {} self._sessionIds = ( @@ -176,6 +178,7 @@ def __init__( # This list is different from the dmHosts, which are the machines that # are directly managed by this manager (which in turn could manage more # machines) + self._use_dmHosts = False self._nodes = [] self.startDMChecker() @@ -214,44 +217,56 @@ def _checkDM(self): @property def dmHosts(self): - return self._dmHosts[:] + return [str(n) for n in self._dmHosts[:]] - def addDmHost(self, host): - if not ":" in host: - host += f":{self._dmPort}" + def addDmHost(self, host_str: str): + host = Node(host_str) + # if not ":" in host: + # host += f":{self._dmPort}" if host not in self._dmHosts: self._dmHosts.append(host) logger.debug("Added sub-manager %s", host) else: logger.warning("Host %s already registered.", host) - def removeDmHost(self, host): + def removeDmHost(self, host_str): + host = Node(host_str) if host in self._dmHosts: self._dmHosts.remove(host) @property def nodes(self): - return self._nodes[:] + if self._use_dmHosts: + return [str(n) for n in self._dmHosts[:]] + else: + return self._nodes def add_node(self, node): - self._nodes.append(node) + if self._use_dmHosts: + return self._dmHosts.append(Node(node)) + else: + self._nodes.append(node) + def remove_node(self, node): - self._nodes.remove(node) + if self._use_dmHosts: + self._dmHosts.remove(Node(node)) + else: + self._nodes.remove(node) @property def dmPort(self): return self._dmPort - def check_dm(self, host, port=None, timeout=10): - if ":" in host: - host, port = host.split(":") - port = int(port) + def check_dm(self, host: Node, port=None, timeout=10): + host_name = host.host + if host.rest_port_specified: + port = host.port else: port = port or self._dmPort logger.debug("Checking DM presence at %s port %d", host, port) - dm_is_there = portIsOpen(host, port, timeout) + dm_is_there = portIsOpen(host_name, port, timeout) return dm_is_there def dmAt(self, host, port=None): @@ -268,21 +283,22 @@ def dmAt(self, host, port=None): def getSessionIds(self): return self._sessionIds - # - # Replication of commands to underlying drop managers - # If "collect" is given, then individual results are also kept in the given - # structure, which is either a dictionary or a list - # def _do_in_host(self, action, sessionId, exceptions, f, collect, port, iterable): + """ + Replication of commands to underlying drop managers + If "collect" is given, then individual results are also kept in the given + structure, which is either a dictionary or a list + """ + host = iterable if isinstance(iterable, (list, tuple)): - host = iterable[0] - if ":" in host: - host, port = host.split(":") - port = int(port) + host = iterable[0] # What's going on here? + # if ":" in host: + # host, port = host.split(":") + # port = int(port) try: - with self.dmAt(host, port) as dm: + with self.dmAt(host) as dm: res = f(dm, iterable, sessionId) if isinstance(collect, dict): @@ -295,8 +311,8 @@ def _do_in_host(self, action, sessionId, exceptions, f, collect, port, iterable) logger.exception( "Error while %s on host %s:%d, session %s", action, - host, - port, + host.host, + host.port, sessionId, ) @@ -398,7 +414,7 @@ def addGraphSpec(self, sessionId, graphSpec): ) raise InvalidGraphException(msg) - partition = dropSpec[self._partitionAttr] + partition = Node(dropSpec[self._partitionAttr]) if partition not in self._dmHosts: msg = ( f"Drop {dropSpec.get('oid', None)}'s {self._partitionAttr} {partition} " @@ -431,8 +447,8 @@ def addGraphSpec(self, sessionId, graphSpec): for rel in inter_partition_rels: # rhn = self._graph[rel.rhs]["node"].split(":")[0] # lhn = self._graph[rel.lhs]["node"].split(":")[0] - rhn = self._graph[rel.rhs]["node"] - lhn = self._graph[rel.lhs]["node"] + rhn = Node(self._graph[rel.rhs]["node"]) + lhn = Node(self._graph[rel.lhs]["node"]) drop_rels[lhn][rhn].append(rel) drop_rels[rhn][lhn].append(rel) @@ -593,7 +609,8 @@ def __init__(self, dmHosts: list[str] = None, pkeyPath=None, dmCheckTimeout=10): ) # In the case of the Data Island the dmHosts are the final nodes as well - self._nodes = dmHosts + self._use_dmHosts = True + # self._nodes = dmHosts logger.info("Created DataIslandManager for hosts: %r", self._dmHosts) diff --git a/daliuge-engine/dlg/manager/manager_data.py b/daliuge-engine/dlg/manager/manager_data.py new file mode 100644 index 000000000..db689f602 --- /dev/null +++ b/daliuge-engine/dlg/manager/manager_data.py @@ -0,0 +1,96 @@ +# +# ICRAR - International Centre for Radio Astronomy Research +# (c) UWA - The University of Western Australia, 2024 +# Copyright by UWA (in the framework of the ICRAR) +# All rights reserved +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, +# MA 02111-1307 USA +# + +""" +This module contains classes and helper-methods to support the various manager classes +""" + +from dataclasses import dataclass + +from dlg import constants + +from enum import IntEnum + + +class NodeProtocolPosition(IntEnum): + HOST = 0 + PORT = 1 + RPC_PORT = 2 + EVENTS_PORT = 3 + + +class Node: + """ + Class for encapsulating compute node information to standardise + inter-node communication. + """ + + def __init__(self, host: str): + chunks = host.split(':') + num_chunks = len(chunks) + self.host = constants.NODE_DEFAULT_HOSTNAME + self.port = constants.NODE_DEFAULT_REST_PORT + self.rpc_port = constants.NODE_DEFAULT_RPC_PORT + self.events_port = constants.NODE_DEFAULT_RPC_PORT + self._rest_port_specified = False + + if num_chunks >= 1: + self.host = chunks[NodeProtocolPosition.HOST] + if num_chunks >= 2: + self.port = int(chunks[NodeProtocolPosition.PORT]) + self._rest_port_specified = True + if num_chunks >= 3: + self.rpc_port = int(chunks[NodeProtocolPosition.RPC_PORT]) + if num_chunks >= 4: + self.events_port = int(chunks[NodeProtocolPosition.EVENTS_PORT]) + + def serialize(self): + """ + Convert to the expect string representation of our Node using the + following 'protocol': + + "host:port:rpc_port:event_port" + + :return: str + """ + return f"{self.host}:{self.port}:{self.rpc_port}:{self.events_port}" + + @property + def rest_port_specified(self): + """ + Returns True if we specified a Node REST port when passing the list of nodes to + the DIM at startup. + """ + return self._rest_port_specified + + def __str__(self): + """ + Make our serialized Node the string. + :return: str + """ + return self.serialize() + + def __eq__(self, other): + return str(self) == str(other) + + def __hash__(self): + return hash(str(self)) diff --git a/daliuge-engine/dlg/manager/node_manager.py b/daliuge-engine/dlg/manager/node_manager.py index 138d0900d..b8547bb83 100644 --- a/daliuge-engine/dlg/manager/node_manager.py +++ b/daliuge-engine/dlg/manager/node_manager.py @@ -238,6 +238,7 @@ class NodeManagerBase(DROPManager): def __init__( self, + events_port, dlm_check_period=0, dlm_cleanup_period=0, dlm_enable_replication=False, @@ -248,6 +249,7 @@ def __init__( use_processes=False, logdir=utils.getDlgLogsDir(), ): + self._events_port = events_port self._dlm = DataLifecycleManager( check_period=dlm_check_period, cleanup_period=dlm_cleanup_period, @@ -433,6 +435,7 @@ def add_node_subscriptions(self, sessionId, relationships): # Set up event channels subscriptions for nodesub in relationships: + # This needs to be changed events_port = constants.NODE_DEFAULT_EVENTS_PORT if type(nodesub) is tuple: host, events_port, _ = nodesub @@ -445,6 +448,18 @@ def add_node_subscriptions(self, sessionId, relationships): logger.debug("Sending subscription to %s", f"{host}:{events_port}") self.subscribe(host, events_port) + def _convert_relationships_to_nodes(self, relationships): + """ + Load JSON representation of relationships into Node classes. + + :param relationships: dict, relationships receveived through REST call + :return: list of Node classes + """ + nodes = [] + for nodesub in relationships: + relationships + + def has_method(self, sessionId, uid, mname): self._check_session_id(sessionId) return self._sessions[sessionId].has_method(uid, mname) @@ -654,7 +669,7 @@ def __init__( **kwargs, ): host = host or "localhost" - NodeManagerBase.__init__(self, *args, **kwargs) + NodeManagerBase.__init__(self, events_port, *args, **kwargs) EventMixIn.__init__(self, host, events_port) RpcMixIn.__init__(self, host, rpc_port) self.start() diff --git a/daliuge-engine/dlg/manager/session.py b/daliuge-engine/dlg/manager/session.py index ce252bbf3..5c560bd21 100644 --- a/daliuge-engine/dlg/manager/session.py +++ b/daliuge-engine/dlg/manager/session.py @@ -36,14 +36,15 @@ from dlg.common.reproducibility.reproducibility import init_runtime_repro_data from dlg.utils import createDirIfMissing -from .. import constants -from .. import droputils -from .. import graph_loader -from .. import rpc -from .. import utils -from ..common.reproducibility.constants import ReproducibilityFlags, ALL_RMODES -from ..ddap_protocol import DROPLinkType, DROPRel, DROPStates -from ..drop import ( +from dlg import constants +# from .. import constants +from dlg import droputils +from dlg import graph_loader +from dlg import rpc +from dlg import utils +from dlg.common.reproducibility.constants import ReproducibilityFlags, ALL_RMODES +from dlg.ddap_protocol import DROPLinkType, DROPRel, DROPStates +from dlg.drop import ( AbstractDROP, LINKTYPE_1TON_APPEND_METHOD, LINKTYPE_1TON_BACK_APPEND_METHOD, @@ -51,7 +52,7 @@ from ..apps.app_base import AppDROP, InputFiredAppDROP from ..data.drops.data_base import EndDROP -from ..exceptions import ( +from dlg.exceptions import ( InvalidSessionState, InvalidGraphException, NoDropException, From c094e838adbd2ac63ff9f2bb4d65bc998528baeb Mon Sep 17 00:00:00 2001 From: Ryan Bunney Date: Fri, 13 Sep 2024 16:48:02 +0800 Subject: [PATCH 02/12] Node Class: Update REST interface to hide JSON node representation - rest.py acts as the interface between the JSON/string representation of nodes, and the DropManagers, which should only use the Node class from now on. Note: This will not successfully deploy a graph, as the translator is not functional with the new information. --- .../dlg/manager/composite_manager.py | 44 ++++--- daliuge-engine/dlg/manager/manager_data.py | 8 +- daliuge-engine/dlg/manager/node_manager.py | 1 + daliuge-engine/dlg/manager/rest.py | 116 ++++++++++++------ daliuge-engine/dlg/manager/session.py | 1 + 5 files changed, 113 insertions(+), 57 deletions(-) diff --git a/daliuge-engine/dlg/manager/composite_manager.py b/daliuge-engine/dlg/manager/composite_manager.py index b1d4016ea..768223a1b 100644 --- a/daliuge-engine/dlg/manager/composite_manager.py +++ b/daliuge-engine/dlg/manager/composite_manager.py @@ -178,7 +178,7 @@ def __init__( # This list is different from the dmHosts, which are the machines that # are directly managed by this manager (which in turn could manage more # machines) - self._use_dmHosts = False + self.use_dm_hosts = False self._nodes = [] self.startDMChecker() @@ -217,7 +217,7 @@ def _checkDM(self): @property def dmHosts(self): - return [str(n) for n in self._dmHosts[:]] + return self._dmHosts[:] def addDmHost(self, host_str: str): host = Node(host_str) @@ -236,20 +236,31 @@ def removeDmHost(self, host_str): @property def nodes(self): - if self._use_dmHosts: - return [str(n) for n in self._dmHosts[:]] + if self.use_dm_hosts: + return self._dmHosts[:] else: return self._nodes - def add_node(self, node): - if self._use_dmHosts: - return self._dmHosts.append(Node(node)) + def add_node(self, node: Node): + if self.use_dm_hosts: + return self._dmHosts.append(node) else: self._nodes.append(node) + def get_node_from_json(self, node_str): + """ + Given a node str, return the Node we have stored + + Return: Node + + Raises: ValueError if there is no existing Node added to the CompositeManager + """ + + idx = self.nodes.index(Node(node_str)) + return self._nodes[idx] def remove_node(self, node): - if self._use_dmHosts: + if self.use_dm_hosts: self._dmHosts.remove(Node(node)) else: self._nodes.remove(node) @@ -269,16 +280,13 @@ def check_dm(self, host: Node, port=None, timeout=10): dm_is_there = portIsOpen(host_name, port, timeout) return dm_is_there - def dmAt(self, host, port=None): - if not self.check_dm(host, port): + def dmAt(self, host: Node): + if not self.check_dm(host): raise SubManagerException( - f"Manager expected but not running in {host}:{port}" + f"Manager expected but not running in {host.host}:{host.port}" ) - if not ":" in host: - port = port or self._dmPort - else: - host, port = host.split(":") - return NodeManagerClient(host, port, 10) + + return NodeManagerClient(host.host, host.port, 10) def getSessionIds(self): return self._sessionIds @@ -296,7 +304,7 @@ def _do_in_host(self, action, sessionId, exceptions, f, collect, port, iterable) # if ":" in host: # host, port = host.split(":") # port = int(port) - + print(host) try: with self.dmAt(host) as dm: res = f(dm, iterable, sessionId) @@ -322,6 +330,7 @@ def replicate(self, sessionId, f, action, collect=None, iterable=None, port=None """ thrExs = {} iterable = iterable or self._dmHosts + print(iterable) port = port or self._dmPort # logger.debug("Replicating command: %s on hosts: %s", f, iterable) self._tp.map( @@ -461,6 +470,7 @@ def addGraphSpec(self, sessionId, graphSpec): for partition in perPartition: if self._graph.get("reprodata") is not None: perPartition[partition].append(self._graph["reprodata"]) + logger.debug(f"perPartition: {perPartition.items()}") self.replicate( sessionId, self._addGraphSpec, diff --git a/daliuge-engine/dlg/manager/manager_data.py b/daliuge-engine/dlg/manager/manager_data.py index db689f602..1db895224 100644 --- a/daliuge-engine/dlg/manager/manager_data.py +++ b/daliuge-engine/dlg/manager/manager_data.py @@ -89,8 +89,14 @@ def __str__(self): """ return self.serialize() + def __repr__(self): + return str(self) + def __eq__(self, other): - return str(self) == str(other) + if isinstance(other, Node): + return hash(self) == hash(other) + if isinstance(other, str): + return hash(self) == hash(other) def __hash__(self): return hash(str(self)) diff --git a/daliuge-engine/dlg/manager/node_manager.py b/daliuge-engine/dlg/manager/node_manager.py index b8547bb83..8f358927f 100644 --- a/daliuge-engine/dlg/manager/node_manager.py +++ b/daliuge-engine/dlg/manager/node_manager.py @@ -435,6 +435,7 @@ def add_node_subscriptions(self, sessionId, relationships): # Set up event channels subscriptions for nodesub in relationships: + node = Node(nodesub) # This needs to be changed events_port = constants.NODE_DEFAULT_EVENTS_PORT if type(nodesub) is tuple: diff --git a/daliuge-engine/dlg/manager/rest.py b/daliuge-engine/dlg/manager/rest.py index a2c781690..da6b95d98 100644 --- a/daliuge-engine/dlg/manager/rest.py +++ b/daliuge-engine/dlg/manager/rest.py @@ -57,6 +57,7 @@ from ..restutils import RestClient, RestClientException from .session import generateLogFileName from dlg.common.deployment_methods import DeploymentMethods +from dlg.manager.manager_data import Node logger = logging.getLogger(__name__) @@ -355,6 +356,7 @@ def addGraphParts(self, sessionId): graph_parts = bottle.json_loads(json_content.read()) + # Do something about host Nodes in graph_parts? return self.dm.addGraphSpec(sessionId, graph_parts) # return {"graph_parts": graph_parts} @@ -442,11 +444,16 @@ def linkGraphParts(self, sessionId): @daliuge_aware def add_node_subscriptions(self, sessionId): + # TODO translate node information here logger.debug("NM REST call: add_subscriptions %s", bottle.request.json) if bottle.request.content_type != "application/json": bottle.response.status = 415 return - self.dm.add_node_subscriptions(sessionId, bottle.request.json) + subscriptions = self._parse_subscriptions(bottle.request.json) + self.dm.add_node_subscriptions(sessionId, subscriptions) + + def _parse_subscriptions(self, json_request): + return [Node(n) for n in json_request] @daliuge_aware def trigger_drops(self, sessionId): @@ -506,36 +513,62 @@ def initializeSpecifics(self, app): @daliuge_aware def getCMStatus(self): + """ + REST (GET): /api/ + + Return JSON-compatible list of Composite Manager nodes and sessions + """ return { - "hosts": self.dm.dmHosts, + "hosts": [str(n) for n in self.dm.dmHosts], "sessionIds": self.dm.getSessionIds(), } @daliuge_aware def getCMNodes(self): - return self.dm.nodes + """ + REST (GET): /api/nodes - def getAllCMNodes(self): + Return JSON-compatible list of Composite Manager nodes + """ + return [str(n) for n in self.dm.nodes] + + def _getAllCMNodes(self): return self.dm.nodes @daliuge_aware def addCMNode(self, node): + """ + REST (POST): "/api/node/" + + Add the posted node to the Composite Manager + + Converts from JSON to our ser + """ logger.debug("Adding node %s", node) - self.dm.add_node(node) + self.dm.add_node(Node(node)) @daliuge_aware def removeCMNode(self, node): + """ + REST (DELETE): "/api/node/" + + Add the posted node to the Composite Manager + + """ logger.debug("Removing node %s", node) self.dm.remove_node(node) @daliuge_aware def getNodeSessions(self, node): - port = None - if node not in self.dm.nodes: - raise Exception(f"{node} not in current list of nodes") - if node.find(":") > 0: - node, port = node.split(":") - with NodeManagerClient(host=node, port=port) as dm: + """ + REST (GET): "/api/node//sessions" + + Retrieve sessions for given node + """ + host_node = Node(node) + if host_node not in self.dm.nodes: + raise Exception(f"{host_node} not in current list of nodes") + with NodeManagerClient(host=host_node.host, port=host_node.port) as dm: return dm.sessions() def _tarfile_write(self, tar, headers, stream): @@ -562,9 +595,8 @@ def _tarfile_write(self, tar, headers, stream): def getLogFile(self, sessionId): fh = io.BytesIO() with tarfile.open(fileobj=fh, mode="w:gz") as tar: - for node in self.getAllCMNodes(): - node, port = node.split(":") - with NodeManagerClient(host=node, port=port) as dm: + for node in self._getAllCMNodes(): + with NodeManagerClient(host=node.host, port=node.port) as dm: try: stream, resp = dm.get_log_file(sessionId) self._tarfile_write(tar, resp, stream) @@ -581,36 +613,42 @@ def getLogFile(self, sessionId): return data @daliuge_aware - def getNodeSessionInformation(self, node, sessionId): - if node not in self.dm.nodes: - raise Exception(f"{node} not in current list of nodes") - node, port = node.split(":") - with NodeManagerClient(host=node, port=port) as dm: - return dm.session(sessionId) + def getNodeSessionInformation(self, node_str, sessionId): + try: + node = self.dm.get_node_from_json(node_str) + with NodeManagerClient(host=node.host, port=node.port) as dm: + return dm.session(sessionId) + except ValueError as e: + raise Exception(f"{node_str} not in current list of nodes:", e) + @daliuge_aware - def getNodeSessionStatus(self, node, sessionId): - if node not in self.dm.nodes: - raise Exception(f"{node} not in current list of nodes") - node, port = node.split(":") - with NodeManagerClient(host=node, port=port) as dm: - return dm.session_status(sessionId) + def getNodeSessionStatus(self, node_str, sessionId): + try: + node = self.dm.get_node_from_json(node_str) + with NodeManagerClient(host=node.host, port=node.port) as dm: + return dm.session_status(sessionId) + except ValueError as e: + raise Exception(f"{node_str} not in current list of nodes:", e) @daliuge_aware - def getNodeGraph(self, node, sessionId): - if node not in self.dm.nodes: - raise Exception(f"{node} not in current list of nodes") - node, port = node.split(":") - with NodeManagerClient(host=node, port=port) as dm: - return dm.graph(sessionId) + def getNodeGraph(self, node_str, sessionId): + try: + node = self.dm.get_node_from_json(node_str) + with NodeManagerClient(host=node.host, port=node.port) as dm: + return dm.graph(sessionId) + except ValueError as e: + raise Exception(f"{node_str} not in current list of nodes:", e) @daliuge_aware - def getNodeGraphStatus(self, node, sessionId): - if node not in self.dm.nodes: - raise Exception(f"{node} not in current list of nodes") - node, port = node.split(":") - with NodeManagerClient(host=node, port=port) as dm: - return dm.graph_status(sessionId) + def getNodeGraphStatus(self, node_str, sessionId): + try: + node = self.dm.get_node_from_json(node_str) + with NodeManagerClient(host=node.host, port=node.port) as dm: + return dm.graph_status(sessionId) + + except ValueError as e: + raise Exception(f"{node_str} not in current list of nodes:", e) # =========================================================================== # non-REST methods @@ -734,7 +772,7 @@ def getMMInfo(self, host): ) as c: return json.loads(c._GET("/managers/master").read()) - def getAllCMNodes(self): + def _getAllCMNodes(self): nodes = [] for node in self.dm.dmHosts: with DataIslandManagerClient(host=node) as dm: diff --git a/daliuge-engine/dlg/manager/session.py b/daliuge-engine/dlg/manager/session.py index 5c560bd21..a768983dd 100644 --- a/daliuge-engine/dlg/manager/session.py +++ b/daliuge-engine/dlg/manager/session.py @@ -512,6 +512,7 @@ def deliver_event(self, evt): drop.handleEvent(evt) def add_node_subscriptions(self, relationships): + # do we translate on the REST side? Probably best to do this actually. evt_consumer = ( DROPLinkType.CONSUMER, From a87b5f051630a94d6a6346bf27920466d38ffdee Mon Sep 17 00:00:00 2001 From: Ryan Bunney Date: Mon, 16 Sep 2024 18:25:14 +0800 Subject: [PATCH 03/12] Node Class: Successfully use Nodes class for non-partitioned example. - Test with HelloWorld-Simple and ArrayLoop graphs. - Unittests appear to work as well --- .../dlg/manager/composite_manager.py | 11 +++-- daliuge-engine/dlg/manager/manager_data.py | 15 +++---- daliuge-engine/dlg/manager/node_manager.py | 43 ++++++++----------- daliuge-engine/dlg/manager/rest.py | 27 ++++++------ daliuge-engine/dlg/manager/session.py | 6 +-- daliuge-engine/dlg/rpc.py | 16 ++++--- daliuge-engine/test/manager/test_dim.py | 17 ++++---- daliuge-engine/test/manager/test_dm.py | 10 +++-- 8 files changed, 76 insertions(+), 69 deletions(-) diff --git a/daliuge-engine/dlg/manager/composite_manager.py b/daliuge-engine/dlg/manager/composite_manager.py index 768223a1b..5af85a87e 100644 --- a/daliuge-engine/dlg/manager/composite_manager.py +++ b/daliuge-engine/dlg/manager/composite_manager.py @@ -281,6 +281,7 @@ def check_dm(self, host: Node, port=None, timeout=10): return dm_is_there def dmAt(self, host: Node): + assert(isinstance(host, Node)) if not self.check_dm(host): raise SubManagerException( f"Manager expected but not running in {host.host}:{host.port}" @@ -304,8 +305,12 @@ def _do_in_host(self, action, sessionId, exceptions, f, collect, port, iterable) # if ":" in host: # host, port = host.split(":") # port = int(port) - print(host) + if not isinstance(host, Node): + host = Node(host) try: + if not isinstance(host, Node): + host = Node(host) + with self.dmAt(host) as dm: res = f(dm, iterable, sessionId) @@ -577,7 +582,7 @@ def getGraph(self, sessionId): return allGraphs def _getSessionStatus(self, dm, host, sessionId): - return {host: dm.getSessionStatus(sessionId)} + return {str(host): dm.getSessionStatus(sessionId)} def getSessionStatus(self, sessionId): allStatus = {} @@ -619,7 +624,7 @@ def __init__(self, dmHosts: list[str] = None, pkeyPath=None, dmCheckTimeout=10): ) # In the case of the Data Island the dmHosts are the final nodes as well - self._use_dmHosts = True + self.use_dm_hosts = True # self._nodes = dmHosts logger.info("Created DataIslandManager for hosts: %r", self._dmHosts) diff --git a/daliuge-engine/dlg/manager/manager_data.py b/daliuge-engine/dlg/manager/manager_data.py index 1db895224..2157c61ce 100644 --- a/daliuge-engine/dlg/manager/manager_data.py +++ b/daliuge-engine/dlg/manager/manager_data.py @@ -34,9 +34,8 @@ class NodeProtocolPosition(IntEnum): HOST = 0 PORT = 1 - RPC_PORT = 2 - EVENTS_PORT = 3 - + EVENTS_PORT = 2 + RPC_PORT = 3 class Node: """ @@ -49,8 +48,8 @@ def __init__(self, host: str): num_chunks = len(chunks) self.host = constants.NODE_DEFAULT_HOSTNAME self.port = constants.NODE_DEFAULT_REST_PORT + self.events_port = constants.NODE_DEFAULT_EVENTS_PORT self.rpc_port = constants.NODE_DEFAULT_RPC_PORT - self.events_port = constants.NODE_DEFAULT_RPC_PORT self._rest_port_specified = False if num_chunks >= 1: @@ -59,20 +58,20 @@ def __init__(self, host: str): self.port = int(chunks[NodeProtocolPosition.PORT]) self._rest_port_specified = True if num_chunks >= 3: - self.rpc_port = int(chunks[NodeProtocolPosition.RPC_PORT]) - if num_chunks >= 4: self.events_port = int(chunks[NodeProtocolPosition.EVENTS_PORT]) + if num_chunks >= 4: + self.rpc_port = int(chunks[NodeProtocolPosition.RPC_PORT]) def serialize(self): """ Convert to the expect string representation of our Node using the following 'protocol': - "host:port:rpc_port:event_port" + "host:port:event_port:rpc_port" :return: str """ - return f"{self.host}:{self.port}:{self.rpc_port}:{self.events_port}" + return f"{self.host}:{self.port}:{self.events_port}:{self.rpc_port}" @property def rest_port_specified(self): diff --git a/daliuge-engine/dlg/manager/node_manager.py b/daliuge-engine/dlg/manager/node_manager.py index 8f358927f..e03a3c2f1 100644 --- a/daliuge-engine/dlg/manager/node_manager.py +++ b/daliuge-engine/dlg/manager/node_manager.py @@ -41,20 +41,22 @@ import typing from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Future -from .. import constants -from .drop_manager import DROPManager -from .session import Session - -from .. import rpc, utils -from ..ddap_protocol import DROPStates -from ..apps.app_base import AppDROP, DropRunner -from ..exceptions import ( +from dlg import constants +from dlg.manager.drop_manager import DROPManager +from dlg.manager.session import Session + +from dlg import rpc, utils +from dlg.ddap_protocol import DROPStates +from dlg.apps.app_base import AppDROP, DropRunner +from dlg.exceptions import ( NoSessionException, SessionAlreadyExistsException, DaliugeException, ) from ..lifecycle.dlm import DataLifecycleManager +from dlg.manager.manager_data import Node + logger = logging.getLogger(__name__) @@ -435,31 +437,20 @@ def add_node_subscriptions(self, sessionId, relationships): # Set up event channels subscriptions for nodesub in relationships: - node = Node(nodesub) + # node = Node(nodesub) # This needs to be changed - events_port = constants.NODE_DEFAULT_EVENTS_PORT + events_port = nodesub.events_port #constants.NODE_DEFAULT_EVENTS_PORT if type(nodesub) is tuple: host, events_port, _ = nodesub else: # TODO: we also have to unsubscribe from them at some point - if nodesub.find(":") > 0: - host, _ = nodesub.split(":") - else: - host = nodesub + # if nodesub.find(":") > 0: + # host, _ = nodesub.split(":") + # else: + host = nodesub logger.debug("Sending subscription to %s", f"{host}:{events_port}") self.subscribe(host, events_port) - def _convert_relationships_to_nodes(self, relationships): - """ - Load JSON representation of relationships into Node classes. - - :param relationships: dict, relationships receveived through REST call - :return: list of Node classes - """ - nodes = [] - for nodesub in relationships: - relationships - def has_method(self, sessionId, uid, mname): self._check_session_id(sessionId) @@ -546,7 +537,7 @@ def publish_event(self, evt): def subscribe(self, host, port): timeout = 5 finished_evt = threading.Event() - endpoint = "tcp://%s:%d" % (utils.zmq_safe(host), port) + endpoint = "tcp://%s:%d" % (utils.zmq_safe(host.host), port) self._subscriptions.put(ZMQPubSubMixIn.subscription(endpoint, finished_evt)) if not finished_evt.wait(timeout): raise DaliugeException( diff --git a/daliuge-engine/dlg/manager/rest.py b/daliuge-engine/dlg/manager/rest.py index da6b95d98..651184ad2 100644 --- a/daliuge-engine/dlg/manager/rest.py +++ b/daliuge-engine/dlg/manager/rest.py @@ -41,9 +41,9 @@ from bottle import static_file from dlg import constants -from .client import NodeManagerClient, DataIslandManagerClient -from .. import utils -from ..exceptions import ( +from dlg.manager.client import NodeManagerClient, DataIslandManagerClient +from dlg import utils +from dlg.exceptions import ( InvalidGraphException, InvalidSessionState, DaliugeException, @@ -53,9 +53,9 @@ InvalidRelationshipException, SubManagerException, ) -from ..restserver import RestServer -from ..restutils import RestClient, RestClientException -from .session import generateLogFileName +from dlg.restserver import RestServer +from dlg.restutils import RestClient, RestClientException +from dlg.manager.session import generateLogFileName from dlg.common.deployment_methods import DeploymentMethods from dlg.manager.manager_data import Node @@ -108,7 +108,7 @@ def fwrapper(*args, **kwargs): # logger.debug("Bottle sending back result: %s", jres[: min(len(jres), 80)]) return jres except Exception as e: - logger.exception("Error while fulfilling request") + logger.exception("Error while fulfilling request for func %s ",func) status, eargs = 500, () if isinstance(e, NotImplementedError): @@ -453,6 +453,7 @@ def add_node_subscriptions(self, sessionId): self.dm.add_node_subscriptions(sessionId, subscriptions) def _parse_subscriptions(self, json_request): + return [Node(n) for n in json_request] @daliuge_aware @@ -666,8 +667,8 @@ def visualizeDIM(self): dmType=self.dm.__class__.__name__, dmPort=self.dm.dmPort, serverUrl=serverUrl, - dmHosts=json.dumps(self.dm.dmHosts), - nodes=json.dumps(self.dm.nodes), + dmHosts=json.dumps([str(n) for n in self.dm.dmHosts]), + nodes=json.dumps([str(n) for n in self.dm.nodes]), selectedNode=selectedNode, ) @@ -717,7 +718,7 @@ def removeDIM(self, dim): @daliuge_aware def getNMs(self): - return {"nodes": self.dm.nodes} + return {"nodes": [str(n) for n in self.dm.nodes]} @daliuge_aware def startNM(self, host): @@ -774,7 +775,7 @@ def getMMInfo(self, host): def _getAllCMNodes(self): nodes = [] - for node in self.dm.dmHosts: - with DataIslandManagerClient(host=node) as dm: + for host in self.dm.dmHosts: + with DataIslandManagerClient(host=host.host, port=host.port) as dm: nodes += dm.nodes() - return nodes + return [str(n) for n in nodes] diff --git a/daliuge-engine/dlg/manager/session.py b/daliuge-engine/dlg/manager/session.py index a768983dd..fd9e89da3 100644 --- a/daliuge-engine/dlg/manager/session.py +++ b/daliuge-engine/dlg/manager/session.py @@ -531,9 +531,9 @@ def add_node_subscriptions(self, relationships): droprels = [DROPRel(*x) for x in droprels] # Sanitize the host/rpc_port info if needed - rpc_port = constants.NODE_DEFAULT_RPC_PORT - if type(host) is tuple: - host, _, rpc_port = host + rpc_port = host.rpc_port #constants.NODE_DEFAULT_RPC_PORT + # if type(host) is tuple: + # host, _, rpc_port = host # Store which drops should receive events from which remote drops dropsubs = collections.defaultdict(set) diff --git a/daliuge-engine/dlg/rpc.py b/daliuge-engine/dlg/rpc.py index e8921711d..8720e6070 100644 --- a/daliuge-engine/dlg/rpc.py +++ b/daliuge-engine/dlg/rpc.py @@ -35,6 +35,8 @@ import gevent import zerorpc +from dlg.manager.manager_data import Node + from . import utils logger = logging.getLogger(__name__) @@ -55,7 +57,7 @@ class RPCClientBase(RPCObject): def get_drop_attribute(self, hostname, port, session_id, uid, name): - hostname = hostname.split(":")[0] + # hostname = hostname.split(":")[0] logger.debug( "Getting attribute %s for drop %s of session %s at %s:%d", @@ -65,7 +67,7 @@ def get_drop_attribute(self, hostname, port, session_id, uid, name): hostname, port, ) - hostname = hostname.split(":")[0] + # hostname = hostname.split(":")[0] client, closer = self.get_rpc_client(hostname, port) @@ -140,8 +142,11 @@ def shutdown(self): def get_client_for_endpoint(self, host, port): - host = host.split(":")[0] - endpoint = (host, port) + # host = host.split(":")[0] + if isinstance(host, Node): + endpoint = (host.host, port) + else: + endpoint = (host, port) with self._zrpcclient_acquisition_lock: if endpoint in self._zrpcclients: @@ -183,7 +188,8 @@ def has_method(self, session_id, uid, name): return client def run_zrpcclient(self, host, port, req_queue): - host = host.split(":")[0] + if isinstance(host, Node): + host = host.host # split(":")[0] client = zerorpc.Client("tcp://%s:%d" % (host, port), context=self._context) forwarder = gevent.spawn(self.forward_requests, req_queue, client) diff --git a/daliuge-engine/test/manager/test_dim.py b/daliuge-engine/test/manager/test_dim.py index d7d6a7238..b1dc09fbe 100644 --- a/daliuge-engine/test/manager/test_dim.py +++ b/daliuge-engine/test/manager/test_dim.py @@ -28,15 +28,16 @@ import pkg_resources -from dlg import droputils -from dlg import utils +from dlg import utils, droputils +from dlg.testutils import ManagerStarter from dlg.common import tool from dlg.constants import ISLAND_DEFAULT_REST_PORT, NODE_DEFAULT_REST_PORT from dlg.ddap_protocol import DROPStates from dlg.manager.composite_manager import DataIslandManager from dlg.manager.session import SessionStates -from dlg.testutils import ManagerStarter -from test.manager import testutils +from dlg.manager.manager_data import Node + +from . import testutils hostname = "localhost" dim_host = f"{hostname}:{ISLAND_DEFAULT_REST_PORT}" @@ -210,8 +211,8 @@ def test_sessionStatus(self): def assertSessionStatus(sessionId, status): sessionStatus = self.dim.getSessionStatus(sessionId) self.assertEqual(1, len(sessionStatus)) - self.assertIn(nm_host, sessionStatus) - self.assertEqual(status, sessionStatus[nm_host]) + self.assertIn(str(Node(nm_host)), sessionStatus) + self.assertEqual(status, sessionStatus[Node(nm_host)]) self.assertEqual(status, self.dm.getSessionStatus(sessionId)) sessionId = "lala" @@ -379,7 +380,7 @@ def test_fullRound(self): self.assertEqual(0, len(sessions)) dimStatus = testutils.get(self, "", dimPort) self.assertEqual(1, len(dimStatus["hosts"])) - self.assertEqual(f"{hostname}:{nmPort}", dimStatus["hosts"][0]) + self.assertEqual(Node(f"{hostname}:{nmPort}"), dimStatus["hosts"][0]) self.assertEqual(0, len(dimStatus["sessionIds"])) # Create a session and check it exists @@ -389,7 +390,7 @@ def test_fullRound(self): sessions = testutils.get(self, "/sessions", dimPort) self.assertEqual(1, len(sessions)) self.assertEqual(sessionId, sessions[0]["sessionId"]) - nm_name = f"{hostname}:{nmPort}" + nm_name = str(Node(f"{hostname}:{nmPort}")) self.assertDictEqual( {nm_name: SessionStates.PRISTINE}, sessions[0]["status"] ) diff --git a/daliuge-engine/test/manager/test_dm.py b/daliuge-engine/test/manager/test_dm.py index 583697efc..aaee8bfbd 100644 --- a/daliuge-engine/test/manager/test_dm.py +++ b/daliuge-engine/test/manager/test_dm.py @@ -33,6 +33,7 @@ from dlg.ddap_protocol import DROPStates, DROPRel, DROPLinkType from dlg.apps.app_base import BarrierAppDROP from dlg.manager.node_manager import NodeManager +from dlg.manager.manager_data import Node try: from crc32c import crc32c # @UnusedImport @@ -107,8 +108,11 @@ def run(self): raise Exception("Sorry, we always fail") -def nm_conninfo(n): - return "localhost", 5553 + n, 6666 + n +def nm_conninfo(n, return_tuple=False): + if return_tuple: + return "localhost", 5553 + n, 6666 + n + else: + return Node(f"localhost:{8000}:{5553+n}:{6666+n}") class NMTestsMixIn(object): @@ -118,7 +122,7 @@ def __init__(self, *args, **kwargs): self.use_processes = False def _start_dm(self, threads=0, **kwargs): - host, events_port, rpc_port = nm_conninfo(len(self._dms)) + host, events_port, rpc_port = nm_conninfo(len(self._dms), return_tuple=True) nm = NodeManager( host=host, events_port=events_port, From 09960c1f862bb146f9fa17333b09dc2ffd40ed50 Mon Sep 17 00:00:00 2001 From: Ryan Bunney Date: Tue, 17 Sep 2024 15:32:59 +0800 Subject: [PATCH 04/12] Node Class: Finalised test cases. --- .../dlg/manager/composite_manager.py | 69 ++++++++----------- daliuge-engine/dlg/manager/manager_data.py | 4 +- daliuge-engine/test/manager/test_mm.py | 65 ++++++++++------- daliuge-engine/test/manager/test_rest.py | 7 +- 4 files changed, 74 insertions(+), 71 deletions(-) diff --git a/daliuge-engine/dlg/manager/composite_manager.py b/daliuge-engine/dlg/manager/composite_manager.py index 5af85a87e..0c175ab4c 100644 --- a/daliuge-engine/dlg/manager/composite_manager.py +++ b/daliuge-engine/dlg/manager/composite_manager.py @@ -107,7 +107,8 @@ def sanitize_relations(interDMRelations, graph): def group_by_node(uids, graph): uids_by_node = collections.defaultdict(list) for uid in uids: - uids_by_node[graph[uid]["node"]].append(uid) + uids_by_node[Node(graph[uid]["node"])].append(uid) + logger.info(uids_by_node) return uids_by_node @@ -135,13 +136,13 @@ class allows for multiple levels of hierarchy seamlessly. __metaclass__ = abc.ABCMeta def __init__( - self, - dmPort, - partitionAttr, - subDmId, - dmHosts: list[str] = None, - pkeyPath=None, - dmCheckTimeout=10, + self, + dmPort, + partitionAttr, + subDmId, + dmHosts: list[str] = None, + pkeyPath=None, + dmCheckTimeout=10, ): """ Creates a new CompositeManager. The sub-DMs it manages are to be located @@ -178,7 +179,7 @@ def __init__( # This list is different from the dmHosts, which are the machines that # are directly managed by this manager (which in turn could manage more # machines) - self.use_dm_hosts = False + self._use_dmHosts = False self._nodes = [] self.startDMChecker() @@ -217,7 +218,7 @@ def _checkDM(self): @property def dmHosts(self): - return self._dmHosts[:] + return [str(n) for n in self._dmHosts[:]] def addDmHost(self, host_str: str): host = Node(host_str) @@ -236,31 +237,19 @@ def removeDmHost(self, host_str): @property def nodes(self): - if self.use_dm_hosts: - return self._dmHosts[:] + if self._use_dmHosts: + return [str(n) for n in self._dmHosts[:]] else: return self._nodes - def add_node(self, node: Node): - if self.use_dm_hosts: - return self._dmHosts.append(node) + def add_node(self, node): + if self._use_dmHosts: + return self._dmHosts.append(Node(node)) else: self._nodes.append(node) - def get_node_from_json(self, node_str): - """ - Given a node str, return the Node we have stored - - Return: Node - - Raises: ValueError if there is no existing Node added to the CompositeManager - """ - - idx = self.nodes.index(Node(node_str)) - return self._nodes[idx] - def remove_node(self, node): - if self.use_dm_hosts: + if self._use_dmHosts: self._dmHosts.remove(Node(node)) else: self._nodes.remove(node) @@ -269,7 +258,7 @@ def remove_node(self, node): def dmPort(self): return self._dmPort - def check_dm(self, host: Node, port=None, timeout=10): + def check_dm(self, host: Node, port: int = None, timeout=10): host_name = host.host if host.rest_port_specified: port = host.port @@ -280,13 +269,15 @@ def check_dm(self, host: Node, port=None, timeout=10): dm_is_there = portIsOpen(host_name, port, timeout) return dm_is_there - def dmAt(self, host: Node): - assert(isinstance(host, Node)) + def dmAt(self, host): if not self.check_dm(host): raise SubManagerException( f"Manager expected but not running in {host.host}:{host.port}" ) - + # if not ":" in host: + # port = port or self._dmPort + # else: + # host, port = host.split(":") return NodeManagerClient(host.host, host.port, 10) def getSessionIds(self): @@ -305,12 +296,8 @@ def _do_in_host(self, action, sessionId, exceptions, f, collect, port, iterable) # if ":" in host: # host, port = host.split(":") # port = int(port) - if not isinstance(host, Node): - host = Node(host) - try: - if not isinstance(host, Node): - host = Node(host) + try: with self.dmAt(host) as dm: res = f(dm, iterable, sessionId) @@ -320,13 +307,14 @@ def _do_in_host(self, action, sessionId, exceptions, f, collect, port, iterable) collect.append(res) except Exception as e: - exceptions[host] = e + exceptions[str(host)] = e logger.exception( - "Error while %s on host %s:%d, session %s", + "Error while %s on host %s:%d, session %s, when executing %s", action, host.host, host.port, sessionId, + f ) def replicate(self, sessionId, f, action, collect=None, iterable=None, port=None): @@ -335,7 +323,6 @@ def replicate(self, sessionId, f, action, collect=None, iterable=None, port=None """ thrExs = {} iterable = iterable or self._dmHosts - print(iterable) port = port or self._dmPort # logger.debug("Replicating command: %s on hosts: %s", f, iterable) self._tp.map( @@ -475,7 +462,6 @@ def addGraphSpec(self, sessionId, graphSpec): for partition in perPartition: if self._graph.get("reprodata") is not None: perPartition[partition].append(self._graph["reprodata"]) - logger.debug(f"perPartition: {perPartition.items()}") self.replicate( sessionId, self._addGraphSpec, @@ -493,6 +479,7 @@ def _deploySession(self, dm, host, sessionId): def _triggerDrops(self, dm, host_and_uids, sessionId): host, uids = host_and_uids + dm.trigger_drops(sessionId, uids) logger.info( "Successfully triggered drops for session %s on %s", diff --git a/daliuge-engine/dlg/manager/manager_data.py b/daliuge-engine/dlg/manager/manager_data.py index 2157c61ce..8b5f8327d 100644 --- a/daliuge-engine/dlg/manager/manager_data.py +++ b/daliuge-engine/dlg/manager/manager_data.py @@ -88,8 +88,8 @@ def __str__(self): """ return self.serialize() - def __repr__(self): - return str(self) + # def __repr__(self): + # return str(self) def __eq__(self, other): if isinstance(other, Node): diff --git a/daliuge-engine/test/manager/test_mm.py b/daliuge-engine/test/manager/test_mm.py index e77328d61..d99ab5bbc 100644 --- a/daliuge-engine/test/manager/test_mm.py +++ b/daliuge-engine/test/manager/test_mm.py @@ -34,6 +34,7 @@ from dlg import constants from dlg.manager.composite_manager import MasterManager from dlg.manager.session import SessionStates +from dlg.manager.manager_data import Node from dlg.testutils import ManagerStarter from dlg.exceptions import NoSessionException from test.manager import testutils @@ -68,11 +69,13 @@ def add_test_reprodata(graph: list): class DimAndNMStarter(ManagerStarter): def setUp(self): super(DimAndNMStarter, self).setUp() - self.nm_info = self.start_nm_in_thread() - self.dim_info = self.start_dim_in_thread() + nm_port = constants.NODE_DEFAULT_REST_PORT + dim_port = constants.ISLAND_DEFAULT_REST_PORT + self.nm_info = self.start_nm_in_thread(port=nm_port) + self.dim_info = self.start_dim_in_thread(port=dim_port) self.nm = self.nm_info.manager self.dim = self.dim_info.manager - self.mm = MasterManager([f"{hostname}"]) + self.mm = MasterManager([f"{hostname}:{dim_port}"]) def tearDown(self): self.mm.shutdown() @@ -82,13 +85,15 @@ def tearDown(self): class TestMM(DimAndNMStarter, unittest.TestCase): def createSessionAndAddTypicalGraph(self, sessionId, sleepTime=0): + nm_node = Node(f"{hostname}:{constants.NODE_DEFAULT_REST_PORT}") + dim_node = Node(f"{hostname}:{constants.ISLAND_DEFAULT_REST_PORT}") graphSpec = [ { "oid": "A", "categoryType": "Data", "dropclass": "dlg.data.drops.memory.InMemoryDROP", - "island": f"{hostname}", - "node": f"{hostname}:{constants.NODE_DEFAULT_REST_PORT}", + "island": str(dim_node), + "node": str(nm_node), "consumers": ["B"], }, { @@ -97,15 +102,15 @@ def createSessionAndAddTypicalGraph(self, sessionId, sleepTime=0): "dropclass": "dlg.apps.simple.SleepAndCopyApp", "sleep_time": sleepTime, "outputs": ["C"], - "node": f"{hostname}:{constants.NODE_DEFAULT_REST_PORT}", - "island": f"{hostname}", + "node": str(nm_node), + "island": str(dim_node), }, { "oid": "C", "categoryType": "Data", "dropclass": "dlg.data.drops.memory.InMemoryDROP", - "island": f"{hostname}", - "node": f"{hostname}:{constants.NODE_DEFAULT_REST_PORT}", + "island": str(dim_node), + "node": str(nm_node), }, ] add_test_reprodata(graphSpec) @@ -141,14 +146,15 @@ def test_addGraphSpec(self): } ] self.assertRaises(Exception, self.mm.addGraphSpec, sessionId, graphSpec) - + nm_node = Node(f"{hostname}:{constants.NODE_DEFAULT_REST_PORT}") + dim_node = Node(f"{hostname}:{constants.ISLAND_DEFAULT_REST_PORT}") # No island specified graphSpec = [ { "oid": "A", "categoryType": "Data", "dropclass": "dlg.data.drops.memory.InMemoryDROP", - "node": f"{hostname}:{constants.NODE_DEFAULT_REST_PORT}", + "node": str(nm_node), } ] self.assertRaises(Exception, self.mm.addGraphSpec, sessionId, graphSpec) @@ -159,7 +165,7 @@ def test_addGraphSpec(self): "oid": "A", "categoryType": "Data", "dropclass": "dlg.data.drops.memory.InMemoryDROP", - "node": f"{hostname}:{constants.NODE_DEFAULT_REST_PORT}", + "node": str(nm_node), "island": "unknown_host", } ] @@ -171,8 +177,8 @@ def test_addGraphSpec(self): "oid": "A", "categoryType": "Data", "dropclass": "dlg.data.drops.memory.InMemoryDROP", - "node": f"{hostname}:{constants.NODE_DEFAULT_REST_PORT}", - "island": f"{hostname}", + "node": str(nm_node), + "island": str(dim_node) } ] self.mm.createSession(sessionId) @@ -221,20 +227,29 @@ def test_deployGraphWithCompletedDOs(self): def test_sessionStatus(self): def assertSessionStatus(sessionId, status): + """ + MasterManager -> DIM(s) -> NM(s) + + We expect the keys of the sessions status of MM to be the DIMs, and + the keys of the DIMs session status to be the NMs. + """ sessionStatusMM = self.mm.getSessionStatus(sessionId) sessionStatusDIM = self.dim.getSessionStatus(sessionId) sessionStatusNM = self.nm.getSessionStatus(sessionId) self.assertEqual(1, len(sessionStatusMM)) - self.assertIn(hostname, sessionStatusMM) + dimNode = str(Node(f"{hostname}:{constants.ISLAND_DEFAULT_REST_PORT}")) + self.assertIn( + dimNode, + sessionStatusMM + ) self.assertDictEqual( sessionStatusDIM, - sessionStatusMM[f"{hostname}"], + sessionStatusMM[dimNode] ) + nmNode = str(Node(f"{hostname}:{constants.NODE_DEFAULT_REST_PORT}")) self.assertEqual( sessionStatusNM, - sessionStatusMM[f"{hostname}"][ - f"{hostname}:{constants.NODE_DEFAULT_REST_PORT}" - ], + sessionStatusMM[dimNode][nmNode], ) self.assertEqual(sessionStatusNM, status) @@ -327,7 +342,7 @@ def test_fullRound(self): host = f"{dimStatus['hosts'][0].split(':', 1)[0]}:{restPort}" self.assertEqual(1, len(dimStatus["hosts"])) self.assertEqual( - f"{hostname}:{constants.NODE_DEFAULT_REST_PORT}", dimStatus["hosts"][0] + Node(f"{hostname}:{constants.NODE_DEFAULT_REST_PORT}"), dimStatus["hosts"][0] ) self.assertEqual(0, len(dimStatus["sessionIds"])) @@ -340,7 +355,7 @@ def test_fullRound(self): self.assertEqual(sessionId, sessions[0]["sessionId"]) self.assertDictEqual( { - f"{hostname}:{constants.NODE_DEFAULT_REST_PORT}": SessionStates.PRISTINE + Node(f"{hostname}:{constants.NODE_DEFAULT_REST_PORT}"): SessionStates.PRISTINE }, sessions[0]["status"], ) @@ -364,7 +379,7 @@ def test_fullRound(self): json.dumps(complexGraphSpec), ) self.assertEqual( - {f"{hostname}:8000": SessionStates.BUILDING}, + {Node(f"{hostname}:8000"): SessionStates.BUILDING}, testutils.get(self, "/sessions/%s/status" % (sessionId), restPort), ) @@ -377,7 +392,7 @@ def test_fullRound(self): mimeType="application/x-www-form-urlencoded", ) self.assertEqual( - {f"{hostname}:8000": SessionStates.RUNNING}, + {Node(f"{hostname}:8000"): SessionStates.RUNNING}, testutils.get(self, "/sessions/%s/status" % (sessionId), restPort), ) @@ -393,14 +408,14 @@ def test_fullRound(self): # it finished by polling the status of the session while SessionStates.RUNNING in [ testutils.get(self, "/sessions/%s/status" % (sessionId), restPort)[ - f"{hostname}:{constants.NODE_DEFAULT_REST_PORT}" + str(Node(f"{hostname}:{constants.NODE_DEFAULT_REST_PORT}")) ] ]: time.sleep(0.2) self.assertEqual( { - f"{hostname}:{constants.NODE_DEFAULT_REST_PORT}": SessionStates.FINISHED + str(Node(f"{hostname}:{constants.NODE_DEFAULT_REST_PORT}")): SessionStates.FINISHED }, testutils.get(self, "/sessions/%s/status" % (sessionId), restPort), ) diff --git a/daliuge-engine/test/manager/test_rest.py b/daliuge-engine/test/manager/test_rest.py index 9d373c195..576523123 100644 --- a/daliuge-engine/test/manager/test_rest.py +++ b/daliuge-engine/test/manager/test_rest.py @@ -31,6 +31,7 @@ from dlg.manager.client import NodeManagerClient, DataIslandManagerClient from dlg.manager.composite_manager import DataIslandManager from dlg.manager.node_manager import NodeManager +from dlg.manager.manager_data import Node from dlg.manager.rest import NMRestServer, CompositeManagerRestServer from dlg.restutils import RestClient @@ -194,15 +195,15 @@ def test_recursive(self): "oid": "a", "categoryType": "Application", "dropclass": "doesnt.exist", - "node": hostname, + "node": str(Node(hostname)), "reprodata": default_repro.copy(), }, default_graph_repro.copy(), ], ) ex = cm.exception - self.assertTrue(hostname in ex.args[0]) - self.assertTrue(isinstance(ex.args[0][hostname], InvalidGraphException)) + self.assertTrue(Node(hostname) in ex.args[0]) + self.assertTrue(isinstance(ex.args[0][Node(hostname)], InvalidGraphException)) def test_reprodata_get(self): """ From 03925222d269a13fe35f2aece66cc6a98c854018 Mon Sep 17 00:00:00 2001 From: Ryan Bunney Date: Wed, 18 Sep 2024 12:09:09 +0800 Subject: [PATCH 05/12] Node Class: Successful testing with distributed local case (ArrayLoop) --- daliuge-common/dlg/clients.py | 1 + daliuge-engine/dlg/manager/composite_manager.py | 16 +++++++++------- daliuge-engine/dlg/manager/rest.py | 7 ++++--- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/daliuge-common/dlg/clients.py b/daliuge-common/dlg/clients.py index 2cff59f71..d1fd9da39 100644 --- a/daliuge-common/dlg/clients.py +++ b/daliuge-common/dlg/clients.py @@ -233,6 +233,7 @@ def __init__( super(NodeManagerClient, self).__init__(host=host, port=port, timeout=timeout) def add_node_subscriptions(self, sessionId, node_subscriptions): + self._post_json( f"/sessions/{quote(sessionId)}/subscriptions", node_subscriptions ) diff --git a/daliuge-engine/dlg/manager/composite_manager.py b/daliuge-engine/dlg/manager/composite_manager.py index 0c175ab4c..b87c93f45 100644 --- a/daliuge-engine/dlg/manager/composite_manager.py +++ b/daliuge-engine/dlg/manager/composite_manager.py @@ -179,7 +179,7 @@ def __init__( # This list is different from the dmHosts, which are the machines that # are directly managed by this manager (which in turn could manage more # machines) - self._use_dmHosts = False + self.use_dmHosts = False self._nodes = [] self.startDMChecker() @@ -237,19 +237,19 @@ def removeDmHost(self, host_str): @property def nodes(self): - if self._use_dmHosts: + if self.use_dmHosts: return [str(n) for n in self._dmHosts[:]] else: return self._nodes def add_node(self, node): - if self._use_dmHosts: + if self.use_dmHosts: return self._dmHosts.append(Node(node)) else: self._nodes.append(node) def remove_node(self, node): - if self._use_dmHosts: + if self.use_dmHosts: self._dmHosts.remove(Node(node)) else: self._nodes.remove(node) @@ -296,6 +296,8 @@ def _do_in_host(self, action, sessionId, exceptions, f, collect, port, iterable) # if ":" in host: # host, port = host.split(":") # port = int(port) + if isinstance(host, str): + host = Node(host) try: with self.dmAt(host) as dm: @@ -448,8 +450,8 @@ def addGraphSpec(self, sessionId, graphSpec): for rel in inter_partition_rels: # rhn = self._graph[rel.rhs]["node"].split(":")[0] # lhn = self._graph[rel.lhs]["node"].split(":")[0] - rhn = Node(self._graph[rel.rhs]["node"]) - lhn = Node(self._graph[rel.lhs]["node"]) + rhn = (self._graph[rel.rhs]["node"]) + lhn = (self._graph[rel.lhs]["node"]) drop_rels[lhn][rhn].append(rel) drop_rels[rhn][lhn].append(rel) @@ -611,7 +613,7 @@ def __init__(self, dmHosts: list[str] = None, pkeyPath=None, dmCheckTimeout=10): ) # In the case of the Data Island the dmHosts are the final nodes as well - self.use_dm_hosts = True + self.use_dmHosts = True # self._nodes = dmHosts logger.info("Created DataIslandManager for hosts: %r", self._dmHosts) diff --git a/daliuge-engine/dlg/manager/rest.py b/daliuge-engine/dlg/manager/rest.py index 651184ad2..46e7e5446 100644 --- a/daliuge-engine/dlg/manager/rest.py +++ b/daliuge-engine/dlg/manager/rest.py @@ -444,7 +444,6 @@ def linkGraphParts(self, sessionId): @daliuge_aware def add_node_subscriptions(self, sessionId): - # TODO translate node information here logger.debug("NM REST call: add_subscriptions %s", bottle.request.json) if bottle.request.content_type != "application/json": bottle.response.status = 415 @@ -453,8 +452,10 @@ def add_node_subscriptions(self, sessionId): self.dm.add_node_subscriptions(sessionId, subscriptions) def _parse_subscriptions(self, json_request): - - return [Node(n) for n in json_request] + relationships = {} + for host, droprels in json_request.items(): + relationships[Node(host)] = droprels + return relationships @daliuge_aware def trigger_drops(self, sessionId): From d90d053e36dcd0c7722155a37d04675630efdd84 Mon Sep 17 00:00:00 2001 From: Ryan Bunney Date: Wed, 18 Sep 2024 15:56:55 +0800 Subject: [PATCH 06/12] Node Class: Testing daemon with more debug --- .github/workflows/run-unit-tests.yml | 2 +- daliuge-engine/dlg/manager/composite_manager.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/run-unit-tests.yml b/.github/workflows/run-unit-tests.yml index 3f93fe992..71d994a78 100644 --- a/.github/workflows/run-unit-tests.yml +++ b/.github/workflows/run-unit-tests.yml @@ -64,7 +64,7 @@ jobs: COVFILES="$COVFILES daliuge-engine/.coverage" echo "COVFILES=$COVFILES" >> $GITHUB_ENV cd daliuge-engine - py.test --cov --show-capture=no + py.test --cov --log-cli-level=DEBUG daliuge-engine/test/manager/test_daemon.py - name: Combine coverage run: coverage combine $COVFILES diff --git a/daliuge-engine/dlg/manager/composite_manager.py b/daliuge-engine/dlg/manager/composite_manager.py index b87c93f45..5cb4768c9 100644 --- a/daliuge-engine/dlg/manager/composite_manager.py +++ b/daliuge-engine/dlg/manager/composite_manager.py @@ -210,7 +210,7 @@ def _checkDM(self): break if not self.check_dm(host, self._dmPort, timeout=self._dmCheckTimeout): logger.error( - "Couldn't contact manager for host %s:%d, will try again later", + "Couldn't contact manager for host %s with dmPort %d, will try again later", host, self._dmPort, ) if self._dmCheckerEvt.wait(60): From 33e1f899d3a777853485f4f51b232c0e1690925c Mon Sep 17 00:00:00 2001 From: Ryan Bunney Date: Wed, 18 Sep 2024 15:58:43 +0800 Subject: [PATCH 07/12] Node Class: Testing daemon with more debug --- .github/workflows/run-unit-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/run-unit-tests.yml b/.github/workflows/run-unit-tests.yml index 71d994a78..87c5b8ba4 100644 --- a/.github/workflows/run-unit-tests.yml +++ b/.github/workflows/run-unit-tests.yml @@ -64,7 +64,7 @@ jobs: COVFILES="$COVFILES daliuge-engine/.coverage" echo "COVFILES=$COVFILES" >> $GITHUB_ENV cd daliuge-engine - py.test --cov --log-cli-level=DEBUG daliuge-engine/test/manager/test_daemon.py + py.test --cov --log-cli-level=DEBUG test/manager/test_daemon.py - name: Combine coverage run: coverage combine $COVFILES From 6441cc7544049470b234ead39a448ab01859929f Mon Sep 17 00:00:00 2001 From: Ryan Bunney Date: Wed, 18 Sep 2024 16:08:08 +0800 Subject: [PATCH 08/12] Node Class: Don't instantiate node twice --- daliuge-engine/dlg/manager/composite_manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/daliuge-engine/dlg/manager/composite_manager.py b/daliuge-engine/dlg/manager/composite_manager.py index 5cb4768c9..a61f1a991 100644 --- a/daliuge-engine/dlg/manager/composite_manager.py +++ b/daliuge-engine/dlg/manager/composite_manager.py @@ -242,9 +242,9 @@ def nodes(self): else: return self._nodes - def add_node(self, node): + def add_node(self, node: Node): if self.use_dmHosts: - return self._dmHosts.append(Node(node)) + return self._dmHosts.append(node) else: self._nodes.append(node) From bf55fb37afc661db9ca752c3ed5a7968d3eafd4e Mon Sep 17 00:00:00 2001 From: Ryan Bunney Date: Wed, 18 Sep 2024 16:12:50 +0800 Subject: [PATCH 09/12] Node Class: Attempt correct removal --- daliuge-engine/dlg/manager/composite_manager.py | 2 +- daliuge-engine/dlg/manager/rest.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/daliuge-engine/dlg/manager/composite_manager.py b/daliuge-engine/dlg/manager/composite_manager.py index a61f1a991..f01a38585 100644 --- a/daliuge-engine/dlg/manager/composite_manager.py +++ b/daliuge-engine/dlg/manager/composite_manager.py @@ -250,7 +250,7 @@ def add_node(self, node: Node): def remove_node(self, node): if self.use_dmHosts: - self._dmHosts.remove(Node(node)) + self._dmHosts.remove(node) else: self._nodes.remove(node) diff --git a/daliuge-engine/dlg/manager/rest.py b/daliuge-engine/dlg/manager/rest.py index 46e7e5446..a09dddba2 100644 --- a/daliuge-engine/dlg/manager/rest.py +++ b/daliuge-engine/dlg/manager/rest.py @@ -558,7 +558,7 @@ def removeCMNode(self, node): """ logger.debug("Removing node %s", node) - self.dm.remove_node(node) + self.dm.remove_node(Node(node)) @daliuge_aware def getNodeSessions(self, node): From 895c228236c86f73fd3d2f4b2ecc64d18ba531db Mon Sep 17 00:00:00 2001 From: Ryan Bunney Date: Wed, 18 Sep 2024 16:15:21 +0800 Subject: [PATCH 10/12] Node Class: Reinstate all tests --- .github/workflows/run-unit-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/run-unit-tests.yml b/.github/workflows/run-unit-tests.yml index 87c5b8ba4..3f93fe992 100644 --- a/.github/workflows/run-unit-tests.yml +++ b/.github/workflows/run-unit-tests.yml @@ -64,7 +64,7 @@ jobs: COVFILES="$COVFILES daliuge-engine/.coverage" echo "COVFILES=$COVFILES" >> $GITHUB_ENV cd daliuge-engine - py.test --cov --log-cli-level=DEBUG test/manager/test_daemon.py + py.test --cov --show-capture=no - name: Combine coverage run: coverage combine $COVFILES From 77fcc0038ce4a5207fad29e33c88ed51b53bd3ee Mon Sep 17 00:00:00 2001 From: Ryan Bunney Date: Thu, 26 Sep 2024 16:14:30 +0800 Subject: [PATCH 11/12] LIU-406: Code review updates --- .../dlg/manager/composite_manager.py | 20 +++--- daliuge-engine/dlg/manager/manager_data.py | 72 +++++++++++++------ daliuge-engine/dlg/manager/rest.py | 37 +++++----- daliuge-translator/setup.py | 3 + 4 files changed, 80 insertions(+), 52 deletions(-) diff --git a/daliuge-engine/dlg/manager/composite_manager.py b/daliuge-engine/dlg/manager/composite_manager.py index f01a38585..e4943ebb2 100644 --- a/daliuge-engine/dlg/manager/composite_manager.py +++ b/daliuge-engine/dlg/manager/composite_manager.py @@ -222,8 +222,6 @@ def dmHosts(self): def addDmHost(self, host_str: str): host = Node(host_str) - # if not ":" in host: - # host += f":{self._dmPort}" if host not in self._dmHosts: self._dmHosts.append(host) logger.debug("Added sub-manager %s", host) @@ -254,6 +252,16 @@ def remove_node(self, node): else: self._nodes.remove(node) + def get_node_from_json(self, node_str): + """ + Given a node str, return the Node we have stored + Return: Node + Raises: ValueError if there is no existing Node added to the CompositeManager + """ + + idx = self._nodes.index(Node(node_str)) + return self._nodes[idx] + @property def dmPort(self): return self._dmPort @@ -266,18 +274,14 @@ def check_dm(self, host: Node, port: int = None, timeout=10): port = port or self._dmPort logger.debug("Checking DM presence at %s port %d", host, port) - dm_is_there = portIsOpen(host_name, port, timeout) - return dm_is_there + return portIsOpen(host_name, port, timeout) def dmAt(self, host): if not self.check_dm(host): raise SubManagerException( f"Manager expected but not running in {host.host}:{host.port}" ) - # if not ":" in host: - # port = port or self._dmPort - # else: - # host, port = host.split(":") + return NodeManagerClient(host.host, host.port, 10) def getSessionIds(self): diff --git a/daliuge-engine/dlg/manager/manager_data.py b/daliuge-engine/dlg/manager/manager_data.py index 8b5f8327d..bde43642e 100644 --- a/daliuge-engine/dlg/manager/manager_data.py +++ b/daliuge-engine/dlg/manager/manager_data.py @@ -24,12 +24,12 @@ This module contains classes and helper-methods to support the various manager classes """ -from dataclasses import dataclass - +import logging from dlg import constants - from enum import IntEnum +logger = logging.getLogger(__name__) + class NodeProtocolPosition(IntEnum): HOST = 0 @@ -37,6 +37,7 @@ class NodeProtocolPosition(IntEnum): EVENTS_PORT = 2 RPC_PORT = 3 + class Node: """ Class for encapsulating compute node information to standardise @@ -44,23 +45,51 @@ class Node: """ def __init__(self, host: str): - chunks = host.split(':') - num_chunks = len(chunks) - self.host = constants.NODE_DEFAULT_HOSTNAME - self.port = constants.NODE_DEFAULT_REST_PORT - self.events_port = constants.NODE_DEFAULT_EVENTS_PORT - self.rpc_port = constants.NODE_DEFAULT_RPC_PORT - self._rest_port_specified = False - - if num_chunks >= 1: - self.host = chunks[NodeProtocolPosition.HOST] - if num_chunks >= 2: - self.port = int(chunks[NodeProtocolPosition.PORT]) - self._rest_port_specified = True - if num_chunks >= 3: - self.events_port = int(chunks[NodeProtocolPosition.EVENTS_PORT]) - if num_chunks >= 4: - self.rpc_port = int(chunks[NodeProtocolPosition.RPC_PORT]) + try: + chunks = host.split(':') + num_chunks = len(chunks) + self.host = constants.NODE_DEFAULT_HOSTNAME + self.port = constants.NODE_DEFAULT_REST_PORT + self.events_port = constants.NODE_DEFAULT_EVENTS_PORT + self.rpc_port = constants.NODE_DEFAULT_RPC_PORT + self._rest_port_specified = False + + if num_chunks >= 1: + self.host = chunks[NodeProtocolPosition.HOST] + if num_chunks >= 2: + self.port = self._validate_port(chunks[NodeProtocolPosition.PORT]) + self._rest_port_specified = True + if num_chunks >= 3: + self.events_port = self._validate_port( + chunks[NodeProtocolPosition.EVENTS_PORT]) + if num_chunks >= 4: + self.rpc_port = self._validate_port(chunks[NodeProtocolPosition.RPC_PORT]) + except AttributeError as e: + logger.exception("Node has been passed non-string object: ", e) + raise RuntimeError( + "Constructor has been passed non-string object and cannot" + "be converted to Node: type %s.", type(host), + ) + except ValueError as e: + logger.error("An issue has occured with translating node information", + exc_info=e) + + def _validate_port(self, port: str) -> int: + """ + Confirm the port provided is within the correct range of ports and returns it + as an integer. + + Param: + + Raises: Invalid port + + :return: int, integer representation of port passed to the command line. + """ + validated_port = int(port) + if not 1 <= validated_port <= 65535: + raise ValueError("Port numbers must be between 1 and 65535") + else: + return validated_port def serialize(self): """ @@ -88,9 +117,6 @@ def __str__(self): """ return self.serialize() - # def __repr__(self): - # return str(self) - def __eq__(self, other): if isinstance(other, Node): return hash(self) == hash(other) diff --git a/daliuge-engine/dlg/manager/rest.py b/daliuge-engine/dlg/manager/rest.py index a09dddba2..f15160a65 100644 --- a/daliuge-engine/dlg/manager/rest.py +++ b/daliuge-engine/dlg/manager/rest.py @@ -82,8 +82,8 @@ def fwrapper(*args, **kwargs): # logger.debug("CORS request comming from: %s", origin) # logger.debug("Request method: %s", bottle.request.method) if origin is None or re.match( - r"(http://dlg-trans.local:80[0-9][0-9]|https://dlg-trans.icrar.org)", - origin, + r"(http://dlg-trans.local:80[0-9][0-9]|https://dlg-trans.icrar.org)", + origin, ): pass elif re.match(r"http://((localhost)|(127.0.0.1)):80[0-9][0-9]", origin): @@ -108,7 +108,7 @@ def fwrapper(*args, **kwargs): # logger.debug("Bottle sending back result: %s", jres[: min(len(jres), 80)]) return jres except Exception as e: - logger.exception("Error while fulfilling request for func %s ",func) + logger.exception("Error while fulfilling request for func %s ", func) status, eargs = 500, () if isinstance(e, NotImplementedError): @@ -340,8 +340,8 @@ def addGraphParts(self, sessionId): # WARNING: TODO: Somehow, the content_type can be overwritten to 'text/plain' logger.debug("Graph content type: %s", bottle.request.content_type) if ( - "application/json" not in bottle.request.content_type - and "text/plain" not in bottle.request.content_type + "application/json" not in bottle.request.content_type + and "text/plain" not in bottle.request.content_type ): bottle.response.status = 415 return @@ -452,10 +452,9 @@ def add_node_subscriptions(self, sessionId): self.dm.add_node_subscriptions(sessionId, subscriptions) def _parse_subscriptions(self, json_request): - relationships = {} - for host, droprels in json_request.items(): - relationships[Node(host)] = droprels - return relationships + return {Node(host): droprels + for host, droprels + in json_request.items()} @daliuge_aware def trigger_drops(self, sessionId): @@ -569,7 +568,7 @@ def getNodeSessions(self, node): """ host_node = Node(node) if host_node not in self.dm.nodes: - raise Exception(f"{host_node} not in current list of nodes") + raise RuntimeError(f"{host_node} not in current list of nodes") with NodeManagerClient(host=host_node.host, port=host_node.port) as dm: return dm.sessions() @@ -579,8 +578,6 @@ def _tarfile_write(self, tar, headers, stream): m = Message() m.add_header("content-disposition", file_header) filename = m.get_params("filename") - # _, params = cgi.parse_header(file_header) - # filename = params["filename"] info = tarfile.TarInfo(filename) info.size = int(length) @@ -621,8 +618,7 @@ def getNodeSessionInformation(self, node_str, sessionId): with NodeManagerClient(host=node.host, port=node.port) as dm: return dm.session(sessionId) except ValueError as e: - raise Exception(f"{node_str} not in current list of nodes:", e) - + raise ValueError(f"{node_str} not in current list of nodes") from e @daliuge_aware def getNodeSessionStatus(self, node_str, sessionId): @@ -631,7 +627,7 @@ def getNodeSessionStatus(self, node_str, sessionId): with NodeManagerClient(host=node.host, port=node.port) as dm: return dm.session_status(sessionId) except ValueError as e: - raise Exception(f"{node_str} not in current list of nodes:", e) + raise ValueError(f"{node_str} not in current list of nodes") from e @daliuge_aware def getNodeGraph(self, node_str, sessionId): @@ -640,7 +636,7 @@ def getNodeGraph(self, node_str, sessionId): with NodeManagerClient(host=node.host, port=node.port) as dm: return dm.graph(sessionId) except ValueError as e: - raise Exception(f"{node_str} not in current list of nodes:", e) + raise ValueError(f"{node_str} not in current list of nodes") from e @daliuge_aware def getNodeGraphStatus(self, node_str, sessionId): @@ -648,9 +644,8 @@ def getNodeGraphStatus(self, node_str, sessionId): node = self.dm.get_node_from_json(node_str) with NodeManagerClient(host=node.host, port=node.port) as dm: return dm.graph_status(sessionId) - except ValueError as e: - raise Exception(f"{node_str} not in current list of nodes:", e) + raise ValueError(f"{node_str} not in current list of nodes") from e # =========================================================================== # non-REST methods @@ -697,7 +692,7 @@ def initializeSpecifics(self, app): @daliuge_aware def createDataIsland(self, host): with RestClient( - host=host, port=constants.DAEMON_DEFAULT_REST_PORT, timeout=10 + host=host, port=constants.DAEMON_DEFAULT_REST_PORT, timeout=10 ) as c: c._post_json("/managers/island/start", bottle.request.body.read()) self.dm.addDmHost(host) @@ -763,14 +758,14 @@ def getNMInfo(self, host): @daliuge_aware def getDIMInfo(self, host): with RestClient( - host=host, port=constants.DAEMON_DEFAULT_REST_PORT, timeout=10 + host=host, port=constants.DAEMON_DEFAULT_REST_PORT, timeout=10 ) as c: return json.loads(c._GET("/managers/island").read()) @daliuge_aware def getMMInfo(self, host): with RestClient( - host=host, port=constants.DAEMON_DEFAULT_REST_PORT, timeout=10 + host=host, port=constants.DAEMON_DEFAULT_REST_PORT, timeout=10 ) as c: return json.loads(c._GET("/managers/master").read()) diff --git a/daliuge-translator/setup.py b/daliuge-translator/setup.py index d8d58434d..f7738ef41 100644 --- a/daliuge-translator/setup.py +++ b/daliuge-translator/setup.py @@ -122,6 +122,8 @@ def package_files(directory): "wheel", ] +tests_requires = {"test": ['pytest']} + setup( name="daliuge-translator", version=get_version_info()[0], @@ -132,6 +134,7 @@ def package_files(directory): url="https://github.com/ICRAR/daliuge", license="LGPLv2+", install_requires=install_requires, + extras_require=tests_requires, packages=find_packages(), package_data={"dlg": src_files}, entry_points={"dlg.tool_commands": ["translator=dlg.translator.tool_commands"]}, From f973ba396fdf0772d7ee408dd4a7b79486a58292 Mon Sep 17 00:00:00 2001 From: Ryan Bunney Date: Thu, 26 Sep 2024 16:20:53 +0800 Subject: [PATCH 12/12] Node experiments: fix pylint errors --- daliuge-engine/dlg/manager/manager_data.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/daliuge-engine/dlg/manager/manager_data.py b/daliuge-engine/dlg/manager/manager_data.py index bde43642e..d382144fe 100644 --- a/daliuge-engine/dlg/manager/manager_data.py +++ b/daliuge-engine/dlg/manager/manager_data.py @@ -65,11 +65,10 @@ def __init__(self, host: str): if num_chunks >= 4: self.rpc_port = self._validate_port(chunks[NodeProtocolPosition.RPC_PORT]) except AttributeError as e: - logger.exception("Node has been passed non-string object: ", e) raise RuntimeError( "Constructor has been passed non-string object and cannot" "be converted to Node: type %s.", type(host), - ) + ) from e except ValueError as e: logger.error("An issue has occured with translating node information", exc_info=e)