From 5b586640fa00fcfe960a247d9c9d2985e45dc171 Mon Sep 17 00:00:00 2001 From: Boyuan Deng Date: Fri, 15 Sep 2023 19:29:52 -0400 Subject: [PATCH 1/5] Tentative proposal. --- comm/base_comm.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/comm/base_comm.py b/comm/base_comm.py index eeddfdc..bffaa4e 100644 --- a/comm/base_comm.py +++ b/comm/base_comm.py @@ -166,6 +166,7 @@ class CommManager: def __init__(self): self.comms = {} + self.closed_comms = {} self.targets = {} def register_target(self, target_name, f): @@ -255,6 +256,22 @@ def comm_msg(self, stream, ident, msg): comm_id = content["comm_id"] comm = self.get_comm(comm_id) if comm is None: + if comm_id in self.closed_comms: + # We receive communication to a closed comm + try: + closed_comm = self.closed_comms[comm_id] + closed_comm.publish_msg( + "comm_close", + data={}, + metadata=None, + buffers=None, + ) + except Exception: + logger.error( + """Could not send comm_close for a closed comm to reply + for incoming communication""", + exc_info=True, + ) return try: @@ -271,6 +288,7 @@ def comm_close(self, stream, ident, msg): return self.comms[comm_id]._closed = True + self.closed_comms[comm_id] = comm del self.comms[comm_id] try: From 4ab4a7ef6d89ad2eea46fb5376af307c48302ac5 Mon Sep 17 00:00:00 2001 From: Boyuan Deng Date: Thu, 5 Oct 2023 04:16:27 -0400 Subject: [PATCH 2/5] Addressing Jason's comments. --- comm/base_comm.py | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/comm/base_comm.py b/comm/base_comm.py index bffaa4e..5735905 100644 --- a/comm/base_comm.py +++ b/comm/base_comm.py @@ -256,22 +256,20 @@ def comm_msg(self, stream, ident, msg): comm_id = content["comm_id"] comm = self.get_comm(comm_id) if comm is None: - if comm_id in self.closed_comms: - # We receive communication to a closed comm - try: - closed_comm = self.closed_comms[comm_id] - closed_comm.publish_msg( - "comm_close", - data={}, - metadata=None, - buffers=None, - ) - except Exception: - logger.error( - """Could not send comm_close for a closed comm to reply - for incoming communication""", - exc_info=True, - ) + try: + from comm import create_comm + closed_comm = create_comm( + comm_id=comm_id, + primary=False, + target_name=None, + ) + closed_comm.close() + except Exception: + logger.error( + """Could not send comm_close for a closed comm to reply + for incoming communication""", + exc_info=True, + ) return try: From 7b46e372a887f239feea0973a657bab4848ace74 Mon Sep 17 00:00:00 2001 From: Boyuan Deng Date: Thu, 5 Oct 2023 11:30:21 -0400 Subject: [PATCH 3/5] Cleaning previous commits. --- comm/base_comm.py | 1 - 1 file changed, 1 deletion(-) diff --git a/comm/base_comm.py b/comm/base_comm.py index 5735905..e1105f2 100644 --- a/comm/base_comm.py +++ b/comm/base_comm.py @@ -286,7 +286,6 @@ def comm_close(self, stream, ident, msg): return self.comms[comm_id]._closed = True - self.closed_comms[comm_id] = comm del self.comms[comm_id] try: From 079c231718203f259bf44b5fa32f7cb5ea3f83df Mon Sep 17 00:00:00 2001 From: Boyuan Deng Date: Thu, 5 Oct 2023 11:30:53 -0400 Subject: [PATCH 4/5] More cleaning. --- comm/base_comm.py | 1 - 1 file changed, 1 deletion(-) diff --git a/comm/base_comm.py b/comm/base_comm.py index e1105f2..55f9c2e 100644 --- a/comm/base_comm.py +++ b/comm/base_comm.py @@ -166,7 +166,6 @@ class CommManager: def __init__(self): self.comms = {} - self.closed_comms = {} self.targets = {} def register_target(self, target_name, f): From 363d166a8d9facbd90e75ce9698ade015d6da764 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 4 Nov 2023 16:39:03 +0000 Subject: [PATCH 5/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- comm/base_comm.py | 1 + 1 file changed, 1 insertion(+) diff --git a/comm/base_comm.py b/comm/base_comm.py index 2f8916b..623fcd0 100644 --- a/comm/base_comm.py +++ b/comm/base_comm.py @@ -286,6 +286,7 @@ def comm_msg(self, stream: ZMQStream, ident: str, msg: MessageType) -> None: if comm is None: try: from comm import create_comm + closed_comm = create_comm( comm_id=comm_id, primary=False,