From db5c936a44a7910c1ca8102439f6b0e3c4959186 Mon Sep 17 00:00:00 2001 From: Alexander Tiderko Date: Thu, 21 Dec 2023 18:12:30 +0100 Subject: [PATCH] added save/get files to crossbar --- .../crossbar/file_interface.py | 46 ++- .../fkie_node_manager_daemon/file_servicer.py | 166 ++++++--- .../fkie_node_manager_daemon/file_servicer.py | 345 ++++++++++++------ .../launch_servicer.py | 1 - 4 files changed, 398 insertions(+), 160 deletions(-) diff --git a/fkie_multimaster_pylib/fkie_multimaster_pylib/crossbar/file_interface.py b/fkie_multimaster_pylib/fkie_multimaster_pylib/crossbar/file_interface.py index ec7de15c..34fe1e79 100644 --- a/fkie_multimaster_pylib/fkie_multimaster_pylib/crossbar/file_interface.py +++ b/fkie_multimaster_pylib/fkie_multimaster_pylib/crossbar/file_interface.py @@ -31,6 +31,7 @@ # POSSIBILITY OF SUCH DAMAGE. import json +import os class RosPackage: @@ -43,12 +44,12 @@ def __str__(self): class PathItem: - ''' + """ :param str path: absolute path of the file or directory :param float mtime: time of last modification of path. The return value is a number giving the number of seconds since the epoch :param int size: size, in bytes, of path :param str path_type: one of types {file, dir, symlink, package} - ''' + """ def __init__(self, path: str, mtime: float, size: int, path_type: str) -> None: self.path = path @@ -61,15 +62,22 @@ def __str__(self): class LogPathItem: - ''' + """ :param str node: complete node name :param str screen_log: the absolute path to the screen log file. :param bool screen_log_exists: False if the file does not exists. :param str ros_log: the absolute path to the ros log file. :param bool ros_log_exists: False if the file does not exists. - ''' + """ - def __init__(self, node: str, screen_log: str = '', screen_log_exists: bool = False, ros_log: str = '', ros_log_exists: bool = False) -> None: + def __init__( + self, + node: str, + screen_log: str = "", + screen_log_exists: bool = False, + ros_log: str = "", + ros_log_exists: bool = False, + ) -> None: self.node = node self.screen_log = screen_log self.screen_log_exists = screen_log_exists @@ -78,3 +86,31 @@ def __init__(self, node: str, screen_log: str = '', screen_log_exists: bool = Fa def __str__(self): return json.dumps(dict(self), ensure_ascii=False) + + +class FileItem: + """ + :param str path: absolute path of the file or directory + :param float mtime: time of last modification of path. The return value is a number giving the number of seconds since the epoch + :param int size: size, in bytes, of path + :param str value: content of the file + """ + + def __init__( + self, + path: str, + mtime: float = 0, + size: int = 0, + value: str = "", + encoding="utf-8", + ) -> None: + self.path = path + self.fileName = os.path.split(path)[-1] + self.mtime = mtime + self.size = size + self.extension = path.rsplit(".", 1)[-1] + self.value = value + self.encoding = encoding + + def __str__(self): + return json.dumps(dict(self), ensure_ascii=False) diff --git a/fkie_node_manager_daemon/fkie_node_manager_daemon/file_servicer.py b/fkie_node_manager_daemon/fkie_node_manager_daemon/file_servicer.py index f83b7236..892a72e0 100644 --- a/fkie_node_manager_daemon/fkie_node_manager_daemon/file_servicer.py +++ b/fkie_node_manager_daemon/fkie_node_manager_daemon/file_servicer.py @@ -18,18 +18,19 @@ # limitations under the License. -import glob +from io import FileIO import os import re import asyncio from autobahn import wamp import json -import os +from types import SimpleNamespace from typing import List from fkie_multimaster_pylib import ros_pkg from fkie_multimaster_pylib.crossbar.base_session import CrossbarBaseSession from fkie_multimaster_pylib.crossbar.base_session import SelfEncoder +from fkie_multimaster_pylib.crossbar.file_interface import FileItem from fkie_multimaster_pylib.crossbar.file_interface import RosPackage from fkie_multimaster_pylib.crossbar.file_interface import PathItem from fkie_multimaster_pylib.crossbar.file_interface import LogPathItem @@ -39,122 +40,199 @@ class FileServicer(CrossbarBaseSession): - FILE_CHUNK_SIZE = 1024 - def __init__(self, loop: asyncio.AbstractEventLoop, realm: str = 'ros', port: int = 11911): + def __init__( + self, loop: asyncio.AbstractEventLoop, realm: str = "ros", port: int = 11911 + ): Log.info("Create ROS2 file manager servicer") CrossbarBaseSession.__init__(self, loop, realm, port) # TODO: clear cache after detected change or time? self.CB_DIR_CACHE = {} def stop(self): - ''' - ''' + """ """ self.shutdown() - @wamp.register('ros.packages.get_list') + @wamp.register("ros.packages.get_list") def getPackageList(self, clear_cache: bool = False) -> List[RosPackage]: - Log.info( - f"{self.__class__.__name__}: Request to [ros.packages.get_list]") + Log.info(f"{self.__class__.__name__}: Request to [ros.packages.get_list]") clear_cache = False if clear_cache: try: from roslaunch import substitution_args import rospkg + substitution_args._rospack = rospkg.RosPack() except Exception as err: Log.warn( - f"{self.__class__.__name__}: Cannot reset package cache: {err}") + f"{self.__class__.__name__}: Cannot reset package cache: {err}" + ) package_list: List[RosPackage] = [] # fill the input fields ret = ros_pkg.get_packages(None) for name, path in ret.items(): - package = RosPackage( - name=name, path=os.path.join(path, 'share', name)) + package = RosPackage(name=name, path=os.path.join(path, "share", name)) package_list.append(package) return json.dumps(package_list, cls=SelfEncoder) - @wamp.register('ros.path.get_log_paths') + @wamp.register("ros.path.get_log_paths") def getLogPaths(self, nodes: List[str]) -> List[LogPathItem]: Log.info( - f"{self.__class__.__name__}: Request to [ros.path.get_log_paths] for {nodes}") + f"{self.__class__.__name__}: Request to [ros.path.get_log_paths] for {nodes}" + ) result = [] for node in nodes: namespace = None node_name = node - namespace_search = re.search('/(.*)/', node_name) + namespace_search = re.search("/(.*)/", node_name) if namespace_search is not None: - namespace = f'/{namespace_search.group(1)}' - node_name = node.replace(f'/{namespace}/', '') + namespace = f"/{namespace_search.group(1)}" + node_name = node.replace(f"/{namespace}/", "") screen_log = get_logfile( - node=node_name, for_new_screen=True, namespace=namespace) + node=node_name, for_new_screen=True, namespace=namespace + ) ros_log = get_ros_logfile(node) - log_path_item = LogPathItem(node, - screen_log=screen_log, - screen_log_exists=os.path.exists( - screen_log), - ros_log=ros_log, - ros_log_exists=os.path.exists(ros_log)) + log_path_item = LogPathItem( + node, + screen_log=screen_log, + screen_log_exists=os.path.exists(screen_log), + ros_log=ros_log, + ros_log_exists=os.path.exists(ros_log), + ) result.append(log_path_item) return json.dumps(result, cls=SelfEncoder) - @wamp.register('ros.path.get_list') + @wamp.register("ros.path.get_list") def getPathList(self, inputPath: str) -> List[PathItem]: Log.info( - f"{self.__class__.__name__}: Request to [ros.path.get_list] for {inputPath}") + f"{self.__class__.__name__}: Request to [ros.path.get_list] for {inputPath}" + ) path_list: List[PathItem] = [] # list the path dirlist = os.listdir(inputPath) for cfile in dirlist: - path = os.path.normpath('%s%s%s' % (inputPath, os.path.sep, cfile)) + path = os.path.normpath("%s%s%s" % (inputPath, os.path.sep, cfile)) if os.path.isfile(path): - path_list.append(PathItem(path=path, mtime=os.path.getmtime( - path), size=os.path.getsize(path), path_type='file')) + path_list.append( + PathItem( + path=path, + mtime=os.path.getmtime(path), + size=os.path.getsize(path), + path_type="file", + ) + ) elif path in self.CB_DIR_CACHE: - path_list.append(PathItem(path=path, mtime=os.path.getmtime( - path), size=os.path.getsize(path), path_type=self.CB_DIR_CACHE[path])) + path_list.append( + PathItem( + path=path, + mtime=os.path.getmtime(path), + size=os.path.getsize(path), + path_type=self.CB_DIR_CACHE[path], + ) + ) elif os.path.isdir(path): try: fileList = os.listdir(path) file_type = None if ros_pkg.is_package(fileList): - file_type = 'package' + file_type = "package" else: - file_type = 'dir' + file_type = "dir" self.CB_DIR_CACHE[path] = file_type - path_list.append(PathItem(path=path, mtime=os.path.getmtime( - path), size=os.path.getsize(path), path_type=file_type)) + path_list.append( + PathItem( + path=path, + mtime=os.path.getmtime(path), + size=os.path.getsize(path), + path_type=file_type, + ) + ) except Exception as _: pass return json.dumps(path_list, cls=SelfEncoder) - def _glob(self, inputPath: str, recursive: bool = True, withHidden: bool = False, filter: List[str] = []) -> List[PathItem]: + def _glob( + self, + inputPath: str, + recursive: bool = True, + withHidden: bool = False, + filter: List[str] = [], + ) -> List[PathItem]: path_list: List[PathItem] = [] dir_list: List[str] = [] for name in os.listdir(inputPath): - if not withHidden and name.startswith('.'): + if not withHidden and name.startswith("."): continue filename = os.path.join(inputPath, name) if os.path.isfile(filename): - path_list.append(PathItem(path=filename, mtime=os.path.getmtime( - filename), size=os.path.getsize(filename), path_type='file')) + path_list.append( + PathItem( + path=filename, + mtime=os.path.getmtime(filename), + size=os.path.getsize(filename), + path_type="file", + ) + ) elif os.path.isdir(filename) and recursive: if name not in filter: dir_list.append(filename) # glob the directories at the end for filename in dir_list: - path_list.extend(self._glob( - inputPath=filename, recursive=recursive, withHidden=withHidden, filter=filter)) + path_list.extend( + self._glob( + inputPath=filename, + recursive=recursive, + withHidden=withHidden, + filter=filter, + ) + ) return path_list - @wamp.register('ros.path.get_list_recursive') + @wamp.register("ros.path.get_list_recursive") def getPathListRecursive(self, inputPath: str) -> List[PathItem]: Log.info( - f"{self.__class__.__name__}: Request to [ros.path.get_list_recursive] for {inputPath}") + f"{self.__class__.__name__}: Request to [ros.path.get_list_recursive] for {inputPath}" + ) path_list: List[PathItem] = self._glob( - inputPath, recursive=True, withHidden=False, filter=['node_modules']) + inputPath, recursive=True, withHidden=False, filter=["node_modules"] + ) return json.dumps(path_list, cls=SelfEncoder) + + @wamp.register("ros.file.get") + def getFileContent(self, requestPath: str) -> FileItem: + Log.info("Request to [ros.file.get] for %s" % requestPath) + with FileIO(requestPath, "r") as outfile: + mTime = os.path.getmtime(requestPath) + fSize = os.path.getsize(requestPath) + content = outfile.readall() + encoding = "utf-8" + try: + content = content.decode(encoding) + except: + content = content.hex() + encoding = "hex" + return json.dumps( + FileItem(requestPath, mTime, fSize, content, encoding), cls=SelfEncoder + ) + + @wamp.register("ros.file.save") + def saveFileContent(self, request_json: FileItem) -> int: + # Covert input dictionary into a proper python object + file = json.loads( + json.dumps(request_json), object_hook=lambda d: SimpleNamespace(**d) + ) + Log.info("Request to [ros.file.save] for %s" % file.path) + with FileIO(file.path, "w+") as outfile: + content = file.value + if file.encoding == "utf-8": + content = content.encode("utf-8") + elif file.encoding == "hex": + content = bytes.fromhex(content) + else: + raise TypeError(f"unknown encoding {file.encoding}") + bytesWritten = outfile.write(content) + return json.dumps(bytesWritten, cls=SelfEncoder) diff --git a/fkie_node_manager_daemon/src/fkie_node_manager_daemon/file_servicer.py b/fkie_node_manager_daemon/src/fkie_node_manager_daemon/file_servicer.py index 7411923e..b93efe12 100644 --- a/fkie_node_manager_daemon/src/fkie_node_manager_daemon/file_servicer.py +++ b/fkie_node_manager_daemon/src/fkie_node_manager_daemon/file_servicer.py @@ -33,14 +33,13 @@ from io import FileIO import os -import rospy import shutil -import glob import re import json import asyncio from autobahn import wamp +from types import SimpleNamespace import fkie_multimaster_msgs.grpc.file_pb2_grpc as fms_grpc import fkie_multimaster_msgs.grpc.file_pb2 as fms @@ -50,6 +49,7 @@ from fkie_multimaster_pylib import settings from fkie_multimaster_pylib.crossbar.base_session import CrossbarBaseSession from fkie_multimaster_pylib.crossbar.base_session import SelfEncoder +from fkie_multimaster_pylib.crossbar.file_interface import FileItem from fkie_multimaster_pylib.crossbar.file_interface import RosPackage from fkie_multimaster_pylib.crossbar.file_interface import PathItem from fkie_multimaster_pylib.crossbar.file_interface import LogPathItem @@ -63,24 +63,29 @@ from typing import List -OK = fms.ReturnStatus.StatusType.Value('OK') -ERROR = fms.ReturnStatus.StatusType.Value('ERROR') -IO_ERROR = fms.ReturnStatus.StatusType.Value('IO_ERROR') -OS_ERROR = fms.ReturnStatus.StatusType.Value('OS_ERROR') -CHANGED_FILE = fms.ReturnStatus.StatusType.Value('CHANGED_FILE') -REMOVED_FILE = fms.ReturnStatus.StatusType.Value('REMOVED_FILE') -PATH_PACKAGE = fms.PathObj.PathType.Value('PACKAGE') -PATH_DIR = fms.PathObj.PathType.Value('DIR') -PATH_FILE = fms.PathObj.PathType.Value('FILE') -PATH_SYMLINK = fms.PathObj.PathType.Value('SYMLINK') -MANIFEST_FILE = 'manifest.xml' +OK = fms.ReturnStatus.StatusType.Value("OK") +ERROR = fms.ReturnStatus.StatusType.Value("ERROR") +IO_ERROR = fms.ReturnStatus.StatusType.Value("IO_ERROR") +OS_ERROR = fms.ReturnStatus.StatusType.Value("OS_ERROR") +CHANGED_FILE = fms.ReturnStatus.StatusType.Value("CHANGED_FILE") +REMOVED_FILE = fms.ReturnStatus.StatusType.Value("REMOVED_FILE") +PATH_PACKAGE = fms.PathObj.PathType.Value("PACKAGE") +PATH_DIR = fms.PathObj.PathType.Value("DIR") +PATH_FILE = fms.PathObj.PathType.Value("FILE") +PATH_SYMLINK = fms.PathObj.PathType.Value("SYMLINK") +MANIFEST_FILE = "manifest.xml" class FileServicer(fms_grpc.FileServiceServicer, CrossbarBaseSession): - FILE_CHUNK_SIZE = 1024 - def __init__(self, loop: asyncio.AbstractEventLoop, realm: str = 'ros', port: int = 11911, test_env=False): + def __init__( + self, + loop: asyncio.AbstractEventLoop, + realm: str = "ros", + port: int = 11911, + test_env=False, + ): Log.info("Create file manger servicer") fms_grpc.FileServiceServicer.__init__(self) CrossbarBaseSession.__init__(self, loop, realm, port, test_env=test_env) @@ -88,20 +93,20 @@ def __init__(self, loop: asyncio.AbstractEventLoop, realm: str = 'ros', port: in self.CB_DIR_CACHE = {} self._peers = {} -# def _terminated(self): -# Log.info("terminated context") -# -# def _register_callback(self, context): -# if (context.peer() not in self._peers): -# Log.info("Add callback to peer context @%s" % context.peer()) -# if context.add_callback(self._terminated): -# pass -# # self._peers[context.peer()] = context + # def _terminated(self): + # Log.info("terminated context") + # + # def _register_callback(self, context): + # if (context.peer() not in self._peers): + # Log.info("Add callback to peer context @%s" % context.peer()) + # if context.add_callback(self._terminated): + # pass + # # self._peers[context.peer()] = context def GetFileContent(self, request, context): result = fms.GetFileContentReply() try: - with FileIO(request.path, 'r') as outfile: + with FileIO(request.path, "r") as outfile: result.file.path = xml.interpret_path(request.path) a = os.path.getmtime(request.path) result.file.mtime = a @@ -124,10 +129,45 @@ def GetFileContent(self, request, context): result.status.error_file = utf8(ioe.filename) yield result + @wamp.register("ros.file.get") + def getFileContent(self, requestPath: str) -> FileItem: + Log.info("Request to [ros.file.get] for %s" % requestPath) + with FileIO(requestPath, "r") as outfile: + mTime = os.path.getmtime(requestPath) + fSize = os.path.getsize(requestPath) + content = outfile.readall() + encoding = "utf-8" + try: + content = content.decode(encoding) + except: + content = content.hex() + encoding = "hex" + return json.dumps( + FileItem(requestPath, mTime, fSize, content, encoding), cls=SelfEncoder + ) + + @wamp.register("ros.file.save") + def saveFileContent(self, request_json: FileItem) -> int: + # Covert input dictionary into a proper python object + file = json.loads( + json.dumps(request_json), object_hook=lambda d: SimpleNamespace(**d) + ) + Log.info("Request to [ros.file.save] for %s" % file.path) + with FileIO(file.path, "w+") as outfile: + content = file.value + if file.encoding == "utf-8": + content = content.encode("utf-8") + elif file.encoding == "hex": + content = bytes.fromhex(content) + else: + raise TypeError(f"unknown encoding {file.encoding}") + bytesWritten = outfile.write(content) + return json.dumps(bytesWritten, cls=SelfEncoder) + def SaveFileContent(self, request_iterator, context): result = fms.SaveFileContentReply() try: - path = '' + path = "" dest_size = 0 curr_size = 0 first = True @@ -138,21 +178,25 @@ def SaveFileContent(self, request_iterator, context): pkg_path = ros_pkg.get_path(chunk.file.package) if pkg_path: path = os.path.join( - pkg_path, chunk.file.path.lstrip(os.path.sep)) + pkg_path, chunk.file.path.lstrip(os.path.sep) + ) else: path = chunk.file.path result = fms.SaveFileContentReply() if first: if os.path.exists(path): # checks for mtime - if chunk.overwrite or chunk.file.mtime == os.path.getmtime(path): - file_tmp = FileIO("%s.tmp" % path, 'w') + if chunk.overwrite or chunk.file.mtime == os.path.getmtime( + path + ): + file_tmp = FileIO("%s.tmp" % path, "w") dest_size = chunk.file.size else: result.status.code = CHANGED_FILE result.status.error_code = file_item.EFILE_CHANGED result.status.error_msg = utf8( - "file was changed in meantime") + "file was changed in meantime" + ) result.status.error_file = utf8(path) elif chunk.overwrite or chunk.file.mtime == 0: # mtime == 0 stands for create a new file @@ -160,13 +204,12 @@ def SaveFileContent(self, request_iterator, context): os.makedirs(os.path.dirname(path)) except OSError: pass - file_tmp = FileIO("%s.tmp" % path, 'w') + file_tmp = FileIO("%s.tmp" % path, "w") dest_size = chunk.file.size else: result.status.code = REMOVED_FILE result.status.error_code = file_item.EFILE_REMOVED - result.status.error_msg = utf8( - "file was removed in meantime") + result.status.error_msg = utf8("file was removed in meantime") result.status.error_file = utf8(path) first = False if result.status.code == 0: @@ -177,8 +220,7 @@ def SaveFileContent(self, request_iterator, context): written = len(chunk.file.data) if written != len(chunk.file.data): result.status.code = ERROR - result.status.error_msg = utf8( - "error while write to tmp file") + result.status.error_msg = utf8("error while write to tmp file") result.status.error_file = utf8(path) yield result curr_size += written @@ -231,16 +273,16 @@ def Rename(self, request, context): result.error_file = utf8(request.old) return result - def _gen_save_content_list(self, path, content, mtime, package=''): + def _gen_save_content_list(self, path, content, mtime, package=""): send_content = content while send_content: chunk = send_content # split into small parts on big files if len(chunk) > self.FILE_CHUNK_SIZE: - chunk = send_content[0:self.FILE_CHUNK_SIZE] - send_content = send_content[self.FILE_CHUNK_SIZE:] + chunk = send_content[0 : self.FILE_CHUNK_SIZE] + send_content = send_content[self.FILE_CHUNK_SIZE :] else: - send_content = '' + send_content = "" msg = fms.SaveFileContentRequest() msg.overwrite = mtime == 0 msg.file.path = path @@ -259,10 +301,9 @@ def CopyFileTo(self, request, context): pname, ppath = ros_pkg.get_name(dest_path) if pname: # we need relative package path without leading slash - prest = dest_path.replace(ppath, '').lstrip(os.path.sep) - with FileIO(path, 'r') as outfile: - mtime = 0.0 if request.overwrite else os.path.getmtime( - path) + prest = dest_path.replace(ppath, "").lstrip(os.path.sep) + with FileIO(path, "r") as outfile: + mtime = 0.0 if request.overwrite else os.path.getmtime(path) content = outfile.read() # get channel to the remote grpc server # TODO: get secure channel, if available @@ -270,8 +311,10 @@ def CopyFileTo(self, request, context): if channel is not None: # save file on remote server fs = fms_grpc.FileServiceStub(channel) - response_stream = fs.SaveFileContent(self._gen_save_content_list( - prest, content, mtime, pname), timeout=settings.GRPC_TIMEOUT) + response_stream = fs.SaveFileContent( + self._gen_save_content_list(prest, content, mtime, pname), + timeout=settings.GRPC_TIMEOUT, + ) for response in response_stream: if response.status.code == OK: result.code = OK @@ -284,12 +327,14 @@ def CopyFileTo(self, request, context): else: result.code = ERROR result.error_msg = utf8( - "can not establish insecure channel to '%s'" % dest_uri) + "can not establish insecure channel to '%s'" % dest_uri + ) result.error_file = utf8(request.path) else: result.code = ERROR result.error_msg = utf8( - "no package found! Only launch files from packages can be copied!") + "no package found! Only launch files from packages can be copied!" + ) result.error_file = utf8(request.path) except OSError as ose: result.code = OS_ERROR @@ -315,7 +360,7 @@ def ListPath(self, request, context): path_list = [] if not request.path: # list ROS root items - for p in os.getenv('ROS_PACKAGE_PATH').split(':'): + for p in os.getenv("ROS_PACKAGE_PATH").split(":"): try: path = os.path.normpath(p) fileList = os.listdir(path) @@ -325,8 +370,14 @@ def ListPath(self, request, context): else: file_type = PATH_DIR self.DIR_CACHE[path] = file_type - path_list.append(fms.PathObj(path=path, mtime=os.path.getmtime( - path), size=os.path.getsize(path), type=file_type)) + path_list.append( + fms.PathObj( + path=path, + mtime=os.path.getmtime(path), + size=os.path.getsize(path), + type=file_type, + ) + ) except Exception as _: pass else: @@ -334,14 +385,27 @@ def ListPath(self, request, context): # list the path dirlist = os.listdir(request.path) for cfile in dirlist: - path = os.path.normpath('%s%s%s' % ( - request.path, os.path.sep, cfile)) + path = os.path.normpath( + "%s%s%s" % (request.path, os.path.sep, cfile) + ) if os.path.isfile(path): - path_list.append(fms.PathObj(path=path, mtime=os.path.getmtime( - path), size=os.path.getsize(path), type=PATH_FILE)) + path_list.append( + fms.PathObj( + path=path, + mtime=os.path.getmtime(path), + size=os.path.getsize(path), + type=PATH_FILE, + ) + ) elif path in self.DIR_CACHE: - path_list.append(fms.PathObj(path=path, mtime=os.path.getmtime( - path), size=os.path.getsize(path), type=self.DIR_CACHE[path])) + path_list.append( + fms.PathObj( + path=path, + mtime=os.path.getmtime(path), + size=os.path.getsize(path), + type=self.DIR_CACHE[path], + ) + ) elif os.path.isdir(path): try: fileList = os.listdir(path) @@ -351,8 +415,14 @@ def ListPath(self, request, context): else: file_type = PATH_DIR self.DIR_CACHE[path] = file_type - path_list.append(fms.PathObj(path=path, mtime=os.path.getmtime( - path), size=os.path.getsize(path), type=file_type)) + path_list.append( + fms.PathObj( + path=path, + mtime=os.path.getmtime(path), + size=os.path.getsize(path), + type=file_type, + ) + ) except Exception as _: pass except OSError as ose: @@ -366,59 +436,98 @@ def ListPath(self, request, context): result.items.extend(path_list) return result - @wamp.register('ros.path.get_list') + @wamp.register("ros.path.get_list") def getPathList(self, inputPath: str) -> List[PathItem]: - Log.info('Request to [ros.path.get_list] for %s' % inputPath) + Log.info("Request to [ros.path.get_list] for %s" % inputPath) path_list: List[PathItem] = [] # list the path dirlist = os.listdir(inputPath) for cfile in dirlist: - path = os.path.normpath('%s%s%s' % (inputPath, os.path.sep, cfile)) + path = os.path.normpath("%s%s%s" % (inputPath, os.path.sep, cfile)) if os.path.isfile(path): - path_list.append(PathItem(path=path, mtime=os.path.getmtime( - path), size=os.path.getsize(path), path_type='file')) + path_list.append( + PathItem( + path=path, + mtime=os.path.getmtime(path), + size=os.path.getsize(path), + path_type="file", + ) + ) elif path in self.CB_DIR_CACHE: - path_list.append(PathItem(path=path, mtime=os.path.getmtime( - path), size=os.path.getsize(path), path_type=self.CB_DIR_CACHE[path])) + path_list.append( + PathItem( + path=path, + mtime=os.path.getmtime(path), + size=os.path.getsize(path), + path_type=self.CB_DIR_CACHE[path], + ) + ) elif os.path.isdir(path): try: fileList = os.listdir(path) file_type = None if ros_pkg.is_package(fileList): - file_type = 'package' + file_type = "package" else: - file_type = 'dir' + file_type = "dir" self.CB_DIR_CACHE[path] = file_type - path_list.append(PathItem(path=path, mtime=os.path.getmtime( - path), size=os.path.getsize(path), path_type=file_type)) + path_list.append( + PathItem( + path=path, + mtime=os.path.getmtime(path), + size=os.path.getsize(path), + path_type=file_type, + ) + ) except Exception as _: pass return json.dumps(path_list, cls=SelfEncoder) - def _glob(self, inputPath: str, recursive: bool = True, withHidden: bool = False, filter: List[str] = []) -> List[PathItem]: + def _glob( + self, + inputPath: str, + recursive: bool = True, + withHidden: bool = False, + filter: List[str] = [], + ) -> List[PathItem]: path_list: List[PathItem] = [] dir_list: List[str] = [] for name in os.listdir(inputPath): - if not withHidden and name.startswith('.'): + if not withHidden and name.startswith("."): continue filename = os.path.join(inputPath, name) if os.path.isfile(filename): - path_list.append(PathItem(path=filename, mtime=os.path.getmtime( - filename), size=os.path.getsize(filename), path_type='file')) + path_list.append( + PathItem( + path=filename, + mtime=os.path.getmtime(filename), + size=os.path.getsize(filename), + path_type="file", + ) + ) elif os.path.isdir(filename) and recursive: if name not in filter: dir_list.append(filename) # glob the directories at the end for filename in dir_list: - path_list.extend(self._glob(inputPath=filename, recursive=recursive, withHidden=withHidden, filter=filter)) + path_list.extend( + self._glob( + inputPath=filename, + recursive=recursive, + withHidden=withHidden, + filter=filter, + ) + ) return path_list - - @wamp.register('ros.path.get_list_recursive') - def getPathListRecursive(self, inputPath: str, filter=['node_modules']) -> List[PathItem]: - Log.info( - 'Request to [ros.path.get_list_recursive] for %s' % inputPath) - path_list: List[PathItem] = self._glob(inputPath, recursive=True, withHidden=False, filter=['node_modules']) + @wamp.register("ros.path.get_list_recursive") + def getPathListRecursive( + self, inputPath: str, filter=["node_modules"] + ) -> List[PathItem]: + Log.info("Request to [ros.path.get_list_recursive] for %s" % inputPath) + path_list: List[PathItem] = self._glob( + inputPath, recursive=True, withHidden=False, filter=["node_modules"] + ) return json.dumps(path_list, cls=SelfEncoder) def ListPackages(self, request, context): @@ -426,14 +535,16 @@ def ListPackages(self, request, context): try: from roslaunch import substitution_args import rospkg + substitution_args._rospack = rospkg.RosPack() except Exception as err: Log.warn("Cannot reset package cache: %s" % utf8(err)) result = fms.ListPackagesReply() try: # fill the input fields - root_paths = [os.path.normpath(p) for p in os.getenv( - "ROS_PACKAGE_PATH").split(':')] + root_paths = [ + os.path.normpath(p) for p in os.getenv("ROS_PACKAGE_PATH").split(":") + ] for p in root_paths: ret = ros_pkg.get_packages(p) for name, path in ret.items(): @@ -445,21 +556,23 @@ def ListPackages(self, request, context): result.status.error_msg = utf8(err) return result - @wamp.register('ros.packages.get_list') + @wamp.register("ros.packages.get_list") def getPackageList(self, clear_cache: bool = False) -> List[RosPackage]: - Log.info('Request to [ros.packages.get_list]') + Log.info("Request to [ros.packages.get_list]") clear_cache = False if clear_cache: try: from roslaunch import substitution_args import rospkg + substitution_args._rospack = rospkg.RosPack() except Exception as err: Log.warn("Cannot reset package cache: %s" % utf8(err)) package_list: List[RosPackage] = [] # fill the input fields - root_paths = [os.path.normpath(p) for p in os.getenv( - "ROS_PACKAGE_PATH").split(':')] + root_paths = [ + os.path.normpath(p) for p in os.getenv("ROS_PACKAGE_PATH").split(":") + ] packages = [] for p in root_paths: ret = ros_pkg.get_packages(p) @@ -470,28 +583,30 @@ def getPackageList(self, clear_cache: bool = False) -> List[RosPackage]: packages.append(name) return json.dumps(package_list, cls=SelfEncoder) - @wamp.register('ros.path.get_log_paths') + @wamp.register("ros.path.get_log_paths") def getLogPaths(self, nodes: List[str]) -> List[LogPathItem]: - Log.info('Request to [ros.path.get_log_paths] for %s' % nodes) + Log.info("Request to [ros.path.get_log_paths] for %s" % nodes) result = [] for node in nodes: namespace = None node_name = node - namespace_search = re.search('/(.*)/', node_name) + namespace_search = re.search("/(.*)/", node_name) if namespace_search is not None: - namespace = f'/{namespace_search.group(1)}' - node_name = node.replace(f'/{namespace}/', '') + namespace = f"/{namespace_search.group(1)}" + node_name = node.replace(f"/{namespace}/", "") screen_log = get_logfile( - node=node_name, for_new_screen=True, namespace=namespace) + node=node_name, for_new_screen=True, namespace=namespace + ) ros_log = get_ros_logfile(node) - log_path_item = LogPathItem(node, - screen_log=screen_log, - screen_log_exists=os.path.exists( - screen_log), - ros_log=ros_log, - ros_log_exists=os.path.exists(ros_log)) + log_path_item = LogPathItem( + node, + screen_log=screen_log, + screen_log_exists=os.path.exists(screen_log), + ros_log=ros_log, + ros_log_exists=os.path.exists(ros_log), + ) result.append(log_path_item) return json.dumps(result, cls=SelfEncoder) @@ -511,11 +626,16 @@ def _get_binaries(self, path, binaries): if os.path.isdir(path): fileList = os.listdir(path) for f in fileList: - if f and f[0] != '.' and f not in ['build'] and not f.endswith('.cfg') and not f.endswith('.so'): + if ( + f + and f[0] != "." + and f not in ["build"] + and not f.endswith(".cfg") + and not f.endswith(".so") + ): self._get_binaries(os.path.join(path, f), binaries) elif os.path.isfile(path) and os.access(path, os.X_OK): - binaries.append(fms.PathObj( - path=path, mtime=os.path.getmtime(path))) + binaries.append(fms.PathObj(path=path, mtime=os.path.getmtime(path))) def GetPackageBinaries(self, request, context): result = fms.PathList() @@ -525,12 +645,17 @@ def GetPackageBinaries(self, request, context): self._get_binaries(path, binaries) # find binaries in catkin workspace from catkin.find_in_workspaces import find_in_workspaces as catkin_find - search_paths = catkin_find(search_dirs=[ - 'libexec', 'share'], project=request.name, first_matching_workspace_only=True) + + search_paths = catkin_find( + search_dirs=["libexec", "share"], + project=request.name, + first_matching_workspace_only=True, + ) for p in search_paths: self._get_binaries(p, binaries) except Exception: import traceback + print(traceback.format_exc()) pass for b in binaries: @@ -581,10 +706,10 @@ def Delete(self, request, context): def _is_in_ros_root(self, path): # list ROS root items - for p in os.getenv('ROS_PACKAGE_PATH').split(':'): + for p in os.getenv("ROS_PACKAGE_PATH").split(":"): root = os.path.abspath(p) if path.startswith(root): - rest = path.replace(root, '').strip(os.path.sep) + rest = path.replace(root, "").strip(os.path.sep) if rest: return True return False @@ -596,14 +721,14 @@ def _contains_packages(self, path): if PACKAGE_FILE in files or MANIFEST_FILE in files: del dirs[:] return True - elif 'rospack_nosubdirs' in files: + elif "rospack_nosubdirs" in files: del dirs[:] continue # leaf # small optimization - elif '.svn' in dirs: - dirs.remove('.svn') - elif '.git' in dirs: - dirs.remove('.git') + elif ".svn" in dirs: + dirs.remove(".svn") + elif ".git" in dirs: + dirs.remove(".git") return False def New(self, request, context): diff --git a/fkie_node_manager_daemon/src/fkie_node_manager_daemon/launch_servicer.py b/fkie_node_manager_daemon/src/fkie_node_manager_daemon/launch_servicer.py index 052298b4..57172cd5 100644 --- a/fkie_node_manager_daemon/src/fkie_node_manager_daemon/launch_servicer.py +++ b/fkie_node_manager_daemon/src/fkie_node_manager_daemon/launch_servicer.py @@ -188,7 +188,6 @@ def __init__( self._loaded_files = dict() # dictionary of (CfgId: LaunchConfig) self._monitor_servicer = monitor_servicer self._watchdog_observer.start() - print(self.get_msg_struct("fkie_multimaster_msgs/LinkStatesStamped")) def _terminated(self): Log.info("terminated launch context")