Merge pull request #283 from ICRAR/node-experiments
Set up new Node class to improve consistency of node usage in `dlg-engine`
myxie authored Oct 22, 2024
2 parents 0fafa38 + 4efc36a commit 4be953f
Showing 13 changed files with 418 additions and 194 deletions.
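The diff below imports a new `Node` class from `dlg.manager.manager_data` and uses it as a hashable wrapper around `"host"` / `"host:port"` strings: it is constructed from a string and exposes `.host`, `.port`, `.rest_port_specified`, equality, hashing and `str()`. The class definition itself sits in one of the changed files not shown on this page, so the following is only a minimal sketch of that interface, inferred from its usage in `composite_manager.py`; the real implementation may differ in details such as its default port or equality rules.

```python
# Hypothetical sketch of the Node interface, inferred from its usage in the
# diff below; the real class lives in daliuge-engine/dlg/manager/manager_data.py
# (one of the 13 changed files, not shown in this excerpt).
from dlg.constants import NODE_DEFAULT_REST_PORT


class Node:
    """Hashable wrapper around a "host" or "host:port" manager address."""

    def __init__(self, host: str):
        parts = host.split(":")
        self.host = parts[0]
        # Remember whether an explicit REST port was supplied;
        # CompositeManager.__init__() and check_dm() consult this flag.
        self.rest_port_specified = len(parts) > 1
        self.port = int(parts[1]) if self.rest_port_specified else NODE_DEFAULT_REST_PORT

    def __eq__(self, other):
        # The real class may compare on host alone; (host, port) is an assumption.
        return isinstance(other, Node) and (self.host, self.port) == (other.host, other.port)

    def __hash__(self):
        # Hashing makes Nodes usable as dict keys, e.g. in group_by_node().
        return hash((self.host, self.port))

    def __str__(self):
        return f"{self.host}:{self.port}"
```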
1 change: 1 addition & 0 deletions daliuge-common/dlg/clients.py
@@ -233,6 +233,7 @@ def __init__(
super(NodeManagerClient, self).__init__(host=host, port=port, timeout=timeout)

def add_node_subscriptions(self, sessionId, node_subscriptions):

self._post_json(
f"/sessions/{quote(sessionId)}/subscriptions", node_subscriptions
)
5 changes: 2 additions & 3 deletions daliuge-common/dlg/constants.py
@@ -31,16 +31,15 @@
"NODE_DEFAULT_RPC_PORT": 6666,
}

NODE_DEFAULT_HOSTNAME = "localhost"

# just for backwards compatibility
NODE_DEFAULT_REST_PORT = DEFAULT_PORTS["NODE_DEFAULT_REST_PORT"]
ISLAND_DEFAULT_REST_PORT = DEFAULT_PORTS["ISLAND_DEFAULT_REST_PORT"]
MASTER_DEFAULT_REST_PORT = DEFAULT_PORTS["MASTER_DEFAULT_REST_PORT"]

REPLAY_DEFAULT_REST_PORT = DEFAULT_PORTS["REPLAY_DEFAULT_REST_PORT"]

DAEMON_DEFAULT_REST_PORT = DEFAULT_PORTS["DAEMON_DEFAULT_REST_PORT"]


# Others ports used by the Node Managers
NODE_DEFAULT_EVENTS_PORT = DEFAULT_PORTS["NODE_DEFAULT_EVENTS_PORT"]
NODE_DEFAULT_RPC_PORT = DEFAULT_PORTS["NODE_DEFAULT_RPC_PORT"]
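The new `NODE_DEFAULT_HOSTNAME` constant gives callers a single default host to pair with the existing default REST port. A minimal, hypothetical usage sketch (the actual call sites are outside this excerpt):

```python
from dlg.constants import NODE_DEFAULT_HOSTNAME, NODE_DEFAULT_REST_PORT

# Hypothetical: build the default "host:port" address string that a Node
# could be constructed from when no explicit host is configured.
default_address = f"{NODE_DEFAULT_HOSTNAME}:{NODE_DEFAULT_REST_PORT}"
print(default_address)  # "localhost:<default REST port>"
```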
141 changes: 83 additions & 58 deletions daliuge-engine/dlg/manager/composite_manager.py
@@ -28,19 +28,21 @@
import multiprocessing.pool
import threading

from dlg.manager.client import NodeManagerClient
from dlg.manager.drop_manager import DROPManager
from dlg.manager.manager_data import Node

from dlg import constants
from .client import NodeManagerClient
from dlg.constants import ISLAND_DEFAULT_REST_PORT, NODE_DEFAULT_REST_PORT
from .drop_manager import DROPManager
from .. import graph_loader
from dlg import graph_loader
from dlg.common.reproducibility.reproducibility import init_pg_repro_data
from ..ddap_protocol import DROPRel
from dlg.ddap_protocol import DROPRel
from dlg.exceptions import (
InvalidGraphException,
DaliugeException,
SubManagerException,
)
from ..utils import portIsOpen
from dlg.utils import portIsOpen

logger = logging.getLogger(__name__)

@@ -105,7 +107,8 @@ def sanitize_relations(interDMRelations, graph)
def group_by_node(uids, graph):
uids_by_node = collections.defaultdict(list)
for uid in uids:
uids_by_node[graph[uid]["node"]].append(uid)
uids_by_node[Node(graph[uid]["node"])].append(uid)
logger.info(uids_by_node)
return uids_by_node


@@ -133,13 +136,13 @@ class allows for multiple levels of hierarchy seamlessly.
__metaclass__ = abc.ABCMeta

def __init__(
self,
dmPort,
partitionAttr,
subDmId,
dmHosts: list[str] = None,
pkeyPath=None,
dmCheckTimeout=10,
self,
dmPort,
partitionAttr,
subDmId,
dmHosts: list[str] = None,
pkeyPath=None,
dmCheckTimeout=10,
):
"""
Creates a new CompositeManager. The sub-DMs it manages are to be located
@@ -156,12 +159,12 @@ def __init__(
:param: dmCheckTimeout The timeout used before giving up and declaring
a sub-DM as not-yet-present in a given host
"""
if dmHosts and ":" in dmHosts[0]:
self._dmHosts = [Node(host) for host in dmHosts]
if self._dmHosts and self._dmHosts[0].rest_port_specified:
dmPort = -1
self._dmPort = dmPort
self._partitionAttr = partitionAttr
self._subDmId = subDmId
self._dmHosts = dmHosts if dmHosts else []
self._graph = {}
self._drop_rels = {}
self._sessionIds = (
@@ -176,6 +179,7 @@ def __init__(
# This list is different from the dmHosts, which are the machines that
# are directly managed by this manager (which in turn could manage more
# machines)
self.use_dmHosts = False
self._nodes = []

self.startDMChecker()
@@ -206,83 +210,101 @@ def _checkDM(self):
break
if not self.check_dm(host, self._dmPort, timeout=self._dmCheckTimeout):
logger.error(
"Couldn't contact manager for host %s:%d, will try again later",
"Couldn't contact manager for host %s with dmPort %d, will try again later",
host, self._dmPort,
)
if self._dmCheckerEvt.wait(60):
break

@property
def dmHosts(self):
return self._dmHosts[:]
return [str(n) for n in self._dmHosts[:]]

def addDmHost(self, host):
if not ":" in host:
host += f":{self._dmPort}"
def addDmHost(self, host_str: str):
host = Node(host_str)
if host not in self._dmHosts:
self._dmHosts.append(host)
logger.debug("Added sub-manager %s", host)
else:
logger.warning("Host %s already registered.", host)

def removeDmHost(self, host):
def removeDmHost(self, host_str):
host = Node(host_str)
if host in self._dmHosts:
self._dmHosts.remove(host)

@property
def nodes(self):
return self._nodes[:]
if self.use_dmHosts:
return [str(n) for n in self._dmHosts[:]]
else:
return self._nodes

def add_node(self, node):
self._nodes.append(node)
def add_node(self, node: Node):
if self.use_dmHosts:
return self._dmHosts.append(node)
else:
self._nodes.append(node)

def remove_node(self, node):
self._nodes.remove(node)
if self.use_dmHosts:
self._dmHosts.remove(node)
else:
self._nodes.remove(node)

def get_node_from_json(self, node_str):
"""
Given a node str, return the Node we have stored
Return: Node
Raises: ValueError if there is no existing Node added to the CompositeManager
"""

idx = self._nodes.index(Node(node_str))
return self._nodes[idx]

@property
def dmPort(self):
return self._dmPort

def check_dm(self, host, port=None, timeout=10):
if ":" in host:
host, port = host.split(":")
port = int(port)
def check_dm(self, host: Node, port: int = None, timeout=10):
host_name = host.host
if host.rest_port_specified:
port = host.port
else:
port = port or self._dmPort

logger.debug("Checking DM presence at %s port %d", host, port)
dm_is_there = portIsOpen(host, port, timeout)
return dm_is_there
return portIsOpen(host_name, port, timeout)

def dmAt(self, host, port=None):
if not self.check_dm(host, port):
def dmAt(self, host):
if not self.check_dm(host):
raise SubManagerException(
f"Manager expected but not running in {host}:{port}"
f"Manager expected but not running in {host.host}:{host.port}"
)
if not ":" in host:
port = port or self._dmPort
else:
host, port = host.split(":")
return NodeManagerClient(host, port, 10)

return NodeManagerClient(host.host, host.port, 10)

def getSessionIds(self):
return self._sessionIds

#
# Replication of commands to underlying drop managers
# If "collect" is given, then individual results are also kept in the given
# structure, which is either a dictionary or a list
#
def _do_in_host(self, action, sessionId, exceptions, f, collect, port, iterable):
"""
Replication of commands to underlying drop managers
If "collect" is given, then individual results are also kept in the given
structure, which is either a dictionary or a list
"""

host = iterable
if isinstance(iterable, (list, tuple)):
host = iterable[0]
if ":" in host:
host, port = host.split(":")
port = int(port)
host = iterable[0] # What's going on here?
# if ":" in host:
# host, port = host.split(":")
# port = int(port)
if isinstance(host, str):
host = Node(host)

try:
with self.dmAt(host, port) as dm:
with self.dmAt(host) as dm:
res = f(dm, iterable, sessionId)

if isinstance(collect, dict):
@@ -291,13 +313,14 @@ def _do_in_host(self, action, sessionId, exceptions, f, collect, port, iterable)
collect.append(res)

except Exception as e:
exceptions[host] = e
exceptions[str(host)] = e
logger.exception(
"Error while %s on host %s:%d, session %s",
"Error while %s on host %s:%d, session %s, when executing %s",
action,
host,
port,
host.host,
host.port,
sessionId,
f
)

def replicate(self, sessionId, f, action, collect=None, iterable=None, port=None):
@@ -398,7 +421,7 @@ def addGraphSpec(self, sessionId, graphSpec):
)
raise InvalidGraphException(msg)

partition = dropSpec[self._partitionAttr]
partition = Node(dropSpec[self._partitionAttr])
if partition not in self._dmHosts:
msg = (
f"Drop {dropSpec.get('oid', None)}'s {self._partitionAttr} {partition} "
@@ -431,8 +454,8 @@ def addGraphSpec(self, sessionId, graphSpec):
for rel in inter_partition_rels:
# rhn = self._graph[rel.rhs]["node"].split(":")[0]
# lhn = self._graph[rel.lhs]["node"].split(":")[0]
rhn = self._graph[rel.rhs]["node"]
lhn = self._graph[rel.lhs]["node"]
rhn = (self._graph[rel.rhs]["node"])
lhn = (self._graph[rel.lhs]["node"])
drop_rels[lhn][rhn].append(rel)
drop_rels[rhn][lhn].append(rel)

@@ -462,6 +485,7 @@ def _deploySession(self, dm, host, sessionId):

def _triggerDrops(self, dm, host_and_uids, sessionId):
host, uids = host_and_uids

dm.trigger_drops(sessionId, uids)
logger.info(
"Successfully triggered drops for session %s on %s",
@@ -551,7 +575,7 @@ def getGraph(self, sessionId):
return allGraphs

def _getSessionStatus(self, dm, host, sessionId):
return {host: dm.getSessionStatus(sessionId)}
return {str(host): dm.getSessionStatus(sessionId)}

def getSessionStatus(self, sessionId):
allStatus = {}
@@ -593,7 +617,8 @@ def __init__(self, dmHosts: list[str] = None, pkeyPath=None, dmCheckTimeout=10):
)

# In the case of the Data Island the dmHosts are the final nodes as well
self._nodes = dmHosts
self.use_dmHosts = True
# self._nodes = dmHosts
logger.info("Created DataIslandManager for hosts: %r", self._dmHosts)


(diff for the remaining 10 changed files not shown)
