diff --git a/daliuge-engine/dlg/apps/simple.py b/daliuge-engine/dlg/apps/simple.py
index 56695cbe8..089997d45 100644
--- a/daliuge-engine/dlg/apps/simple.py
+++ b/daliuge-engine/dlg/apps/simple.py
@@ -190,9 +190,7 @@ def copyRecursive(self, inputDrop):
                 self.copyRecursive(child)
         else:
             for outputDrop in self.outputs:
-                droputils.copyDropContents(
-                    inputDrop, outputDrop, bufsize=self.bufsize
-                )
+                droputils.copyDropContents(inputDrop, outputDrop, bufsize=self.bufsize)
 
 
 ##
@@ -270,9 +268,7 @@ def run(self):
         # At least one output should have been added
         outs = self.outputs
         if len(outs) < 1:
-            raise Exception(
-                "At least one output should have been added to %r" % self
-            )
+            raise Exception("At least one output should have been added to %r" % self)
         marray = self.generateRandomArray()
         if self._keep_array:
             self.marray = marray
@@ -285,9 +281,7 @@ def generateRandomArray(self):
         if self.integer:
             # generate an array of self.size integers with numbers between
             # slef.low and self.high
-            marray = np.random.randint(
-                int(self.low), int(self.high), size=(self.size)
-            )
+            marray = np.random.randint(int(self.low), int(self.high), size=(self.size))
         else:
             # generate an array of self.size floats with numbers between
             # self.low and self.high
@@ -353,9 +347,7 @@ def run(self):
 
         outs = self.outputs
         if len(outs) < 1:
-            raise Exception(
-                "At least one output should have been added to %r" % self
-            )
+            raise Exception("At least one output should have been added to %r" % self)
         self.getInputArrays()
         self._avg = self.averageArray()
         for o in outs:
@@ -371,9 +363,7 @@ def getInputArrays(self):
         """
         ins = self.inputs
         if len(ins) < 1:
-            raise Exception(
-                "At least one input should have been added to %r" % self
-            )
+            raise Exception("At least one input should have been added to %r" % self)
         marray = []
         for inp in ins:
             sarray = droputils.allDropContents(inp)
@@ -430,14 +420,15 @@ def readWriteData(self):
         inputs = self.inputs
         outputs = self.outputs
         total_len = 0
-        for input in inputs:
-            total_len += input.len
+        # for input in inputs:
+        #     total_len += input.size
         for output in outputs:
-            output.len = total_len
             for input in inputs:
                 d = droputils.allDropContents(input)
                 output.write(d)
+            # logger.debug(f">>> written {pickle.loads(d)} to {output}")
+
 
 
     def run(self):
         self.readWriteData()
@@ -494,17 +485,11 @@ class GenericNpyGatherApp(BarrierAppDROP):
 
     def run(self):
         if len(self.inputs) < 1:
-            raise Exception(
-                f"At least one input should have been added to {self}"
-            )
+            raise Exception(f"At least one input should have been added to {self}")
         if len(self.outputs) < 1:
-            raise Exception(
-                f"At least one output should have been added to {self}"
-            )
+            raise Exception(f"At least one output should have been added to {self}")
         if self.function not in self.functions:
-            raise Exception(
-                f"Function {self.function} not supported by {self}"
-            )
+            raise Exception(f"Function {self.function} not supported by {self}")
 
         result = (
             self.reduce_gather_inputs()
@@ -542,11 +527,8 @@ def gather_inputs(self):
         for input in self.inputs:
             data = drop_loaders.load_numpy(input)
             # assign instead of gather for the first input
-            result = (
-                data
-                if result is None
-                else gather(result, data, allow_pickle=True)
-            )
+            # result = data if result is None else gather(result, data, allow_pickle=True)
+            result = data if result is None else gather(result, data)
         return result
 
 
@@ -594,20 +576,14 @@ def run(self):
             raise Exception("Only one input expected for %r" % self)
         else:  # the input is expected to be a vector. We'll use the first element
             try:
-                phrase = str(
-                    pickle.loads(droputils.allDropContents(ins[0]))[0]
-                )
+                phrase = str(pickle.loads(droputils.allDropContents(ins[0]))[0])
             except _pickle.UnpicklingError:
-                phrase = str(
-                    droputils.allDropContents(ins[0]), encoding="utf-8"
-                )
+                phrase = str(droputils.allDropContents(ins[0]), encoding="utf-8")
         self.greeting = f"Hello {phrase}"
 
         outs = self.outputs
         if len(outs) < 1:
-            raise Exception(
-                "At least one output should have been added to %r" % self
-            )
+            raise Exception("At least one output should have been added to %r" % self)
         for o in outs:
             o.len = len(self.greeting.encode())
             o.write(self.greeting.encode())  # greet across all outputs
@@ -655,9 +631,7 @@ def run(self):
 
         outs = self.outputs
         if len(outs) < 1:
-            raise Exception(
-                "At least one output should have been added to %r" % self
-            )
+            raise Exception("At least one output should have been added to %r" % self)
         for o in outs:
             o.len = len(content)
             o.write(content)  # send content to all outputs
@@ -791,9 +765,7 @@ def run(self):
                 raise err
             for split_index in range(self.num_of_copies):
                 out_index = in_index * self.num_of_copies + split_index
-                drop_loaders.save_numpy(
-                    self.outputs[out_index], result[split_index]
-                )
+                drop_loaders.save_numpy(self.outputs[out_index], result[split_index])
 
 
 class SimpleBranch(BranchAppDrop, NullBarrierApp):
@@ -839,9 +811,7 @@ def readData(self):
             data = pickle.loads(droputils.allDropContents(input))
 
             # make sure we always have a ndarray with at least 1dim.
-            if type(data) not in (list, tuple) and not isinstance(
-                data, (np.ndarray)
-            ):
+            if type(data) not in (list, tuple) and not isinstance(data, (np.ndarray)):
                 raise TypeError
             if isinstance(data, np.ndarray) and data.ndim == 0:
                 data = np.array([data])
@@ -915,9 +885,7 @@ def run(self):
         # At least one output should have been added
         outs = self.outputs
         if len(outs) < 1:
-            raise Exception(
-                "At least one output should have been added to %r" % self
-            )
+            raise Exception("At least one output should have been added to %r" % self)
         self.marray = self.generateArray()
         for o in outs:
             d = pickle.dumps(self.marray)
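The `gather_inputs` hunk above drops the `allow_pickle` keyword (a `numpy.load` option, not accepted by gather functions such as `np.concatenate`) while keeping the assign-first, gather-after accumulation. A minimal standalone sketch of that pattern, with plain numpy standing in for the DROP machinery and for whatever callable the app resolves from `self.functions`:

```python
import numpy as np

def gather(a, b):
    # two-argument stand-in matching the gather(result, data) call above;
    # np.concatenate itself takes a sequence, hence the small wrapper
    return np.concatenate((a, b))

def gather_all(arrays):
    result = None
    for data in arrays:
        # assign instead of gather for the first input
        result = data if result is None else gather(result, data)
    return result

print(gather_all([np.ones(2), np.zeros(2)]))  # -> [1. 1. 0. 0.]
```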
diff --git a/daliuge-engine/dlg/data/drops/memory.py b/daliuge-engine/dlg/data/drops/memory.py
index d27934507..148b4ee40 100644
--- a/daliuge-engine/dlg/data/drops/memory.py
+++ b/daliuge-engine/dlg/data/drops/memory.py
@@ -72,7 +72,7 @@ def parse_pydata(pd_dict: dict) -> bytes:
             pydata = pickle.loads(pydata)
         except:
             raise
-    return pickle.dump(pydata)
+    return pickle.dumps(pydata)
 
 
 ##
@@ -110,20 +110,22 @@ def initialize(self, **kwargs):
         """
         args = []
         pydata = None
+        field_names = (
+            [f["name"] for f in kwargs["fields"]] if "fields" in kwargs else []
+        )
         if "pydata" in kwargs and not (
-            "nodeAttributes" in kwargs and "pydata" in kwargs["nodeAttributes"]
+            "fields" in kwargs and "pydata" in field_names
         ):  # means that is was passed directly
             pydata = kwargs.pop("pydata")
-            logger.debug("pydata value provided: %s", pydata)
+            logger.debug("pydata value provided: %s, %s", pydata, kwargs)
             try:  # test whether given value is valid
                 _ = pickle.loads(base64.b64decode(pydata))
                 pydata = base64.b64decode(pydata)
             except:
                 pydata = None
-        elif (
-            "nodeAttributes" in kwargs and "pydata" in kwargs["nodeAttributes"]
-        ):
-            pydata = parse_pydata(kwargs["nodeAttributes"]["pydata"])
+        elif "fields" in kwargs and "pydata" in field_names:
+            data_pos = field_names.index("pydata")
+            pydata = parse_pydata(kwargs["fields"][data_pos])
         args.append(pydata)
         logger.debug("Loaded into memory: %s", pydata)
         self._buf = io.BytesIO(*args)
@@ -192,9 +194,7 @@ def initialize(self, **kwargs):
                 pydata = base64.b64decode(pydata.encode("latin1"))
             except:
                 pydata = None
-        elif (
-            "nodeAttributes" in kwargs and "pydata" in kwargs["nodeAttributes"]
-        ):
+        elif "nodeAttributes" in kwargs and "pydata" in kwargs["nodeAttributes"]:
             pydata = parse_pydata(kwargs["nodeAttributes"]["pydata"])
         args.append(pydata)
         self._buf = io.BytesIO(*args)
@@ -206,9 +206,7 @@ def getIO(self):
             else:
                 # Using Drop without manager, just generate a random name.
                 sess_id = "".join(
-                    random.choices(
-                        string.ascii_uppercase + string.digits, k=10
-                    )
+                    random.choices(string.ascii_uppercase + string.digits, k=10)
                 )
                 return SharedMemoryIO(self.oid, sess_id)
         else:
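For context, the `initialize` hunk above switches the fallback from `nodeAttributes` to the `fields` list. A toy illustration of the lookup, assuming only what the hunk shows (a list of dicts with at least a `"name"` key; the dict contents here are invented):

```python
# hypothetical kwargs, shaped like what a translated graph node might pass;
# "gARdlC4=" is just base64 of a pickled empty list
kwargs = {"fields": [{"name": "pydata", "value": "gARdlC4="}]}

field_names = [f["name"] for f in kwargs["fields"]] if "fields" in kwargs else []
if "fields" in kwargs and "pydata" in field_names:
    data_pos = field_names.index("pydata")
    pd_dict = kwargs["fields"][data_pos]  # this dict is handed to parse_pydata()
    print(pd_dict["name"])  # -> pydata
```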
diff --git a/daliuge-engine/dlg/data/io.py b/daliuge-engine/dlg/data/io.py
index 51def3f1d..96294adbf 100644
--- a/daliuge-engine/dlg/data/io.py
+++ b/daliuge-engine/dlg/data/io.py
@@ -367,13 +367,13 @@ class FileIO(DataIO):
     A file-based implementation of DataIO
     """
 
-    _desc: io.BufferedReader
+    _desc: io.BufferedRWPair
 
     def __init__(self, filename, **kwargs):
         super().__init__()
         self._fnm = filename
 
-    def _open(self, **kwargs) -> io.BufferedReader:
+    def _open(self, **kwargs) -> io.BufferedRWPair:
         flag = "r" if self._mode is OpenMode.OPEN_READ else "w"
         flag += "b"
         return open(self._fnm, flag)
@@ -588,7 +588,11 @@ def _open(self, **kwargs):
             # when finishArchive is called.
             self._buf = b""
             self._writtenDataSize = 0
-        return self._getClient()
+            client = self._getClient()
+        else:
+            client = self._getClient()
+            self._read_gen = client
+        return client
 
     def _close(self, **kwargs):
         if self._mode == OpenMode.OPEN_WRITE:
@@ -611,7 +615,11 @@ def _close(self, **kwargs):
             response.close()
 
     def _read(self, count=65536, **kwargs):
-        return self._desc.read(count)
+        try:
+            buf = self._read_gen.__next__()
+        except StopIteration:
+            buf = b""
+        return buf
 
     def _write(self, data, **kwargs) -> int:
         if self._is_length_unknown():
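The `_read` above now pulls from the chunk generator stored by `_open` instead of calling `read()` on a connection; note that the `count` argument is effectively ignored, since the chunk size was fixed when the stream was opened. A self-contained sketch of that adapter (names are illustrative):

```python
def make_reader(chunks):
    """Wrap an iterator of byte chunks in a file-like read()."""
    gen = iter(chunks)

    def _read(count=65536):  # count is ignored: chunk size is set at open time
        try:
            return next(gen)
        except StopIteration:
            return b""  # the EOF convention read() callers expect

    return _read

read = make_reader([b"Hello ", b"World"])
assert read() == b"Hello " and read() == b"World" and read() == b""
```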
""" - url = "http://%s:%d/RETRIEVE?file_id=%s" % (host, port, fileId) + scheme = "http" if port != 443 else "https" + url = "%s://%s:%d/RETRIEVE?file_id=%s" % (scheme, host, port, fileId) logger.debug("Issuing RETRIEVE request: %s", url) - conn = urllib.request.urlopen(url) - if conn.getcode() != http.HTTPStatus.OK: + resp = requests.request("GET", url, stream=True) + # conn = urllib.request.urlopen(url) + if resp.status_code != http.HTTPStatus.OK: raise Exception( "Error while RETRIEVE-ing %s from %s:%d: %d %s" - % (fileId, host, port, conn.getcode(), conn.msg) + % (fileId, host, port, resp.status_code, resp.msg) ) - return conn + read_gen = resp.iter_content(chunksize) + return read_gen def beginArchive( @@ -123,18 +128,20 @@ def fileStatus(host, port, fileId, timeout=10): TODO: This needs to use file_version as well, else it might return a value for a previous version. """ - url = "http://%s:%d/STATUS?file_id=%s" % (host, port, fileId) + scheme = "http" if port != 443 else "https" + url = "%s://%s:%d/STATUS?file_id=%s" % (scheme, host, port, fileId) logger.debug("Issuing STATUS request: %s", url) try: - conn = urllib.request.urlopen(url, timeout=timeout) - except urllib.error.HTTPError: + conn = requests.request("GET", url) + # conn = urllib.request.urlopen(url, timeout=timeout) + except ConnectionError: raise FileNotFoundError - if conn.getcode() != http.HTTPStatus.OK: + if conn.status_code != http.HTTPStatus.OK: raise Exception( "Error while getting STATUS %s from %s:%d: %d %s" - % (fileId, host, port, conn.getcode(), conn.msg) + % (fileId, host, port, conn.status_code, conn.text) ) - dom = parseString(conn.read().decode()) + dom = parseString(conn.content.decode()) stat = dict( dom.getElementsByTagName("NgamsStatus")[0] .getElementsByTagName("DiskStatus")[0] diff --git a/daliuge-engine/test/apps/test_docker.py b/daliuge-engine/test/apps/test_docker.py index f88163307..26beb7db1 100644 --- a/daliuge-engine/test/apps/test_docker.py +++ b/daliuge-engine/test/apps/test_docker.py @@ -120,9 +120,7 @@ def test_clientServer(self): image="ubuntu:14.04", command="cat %i0 > /dev/tcp/%containerIp[c]%/8000", ) - c = DockerApp( - "c", "c", image="ubuntu:14.04", command="nc -l 8000 > %o0" - ) + c = DockerApp("c", "c", image="ubuntu:14.04", command="nc -l 8000 > %o0") d = FileDROP("d", "d") b.addInput(a) @@ -181,16 +179,15 @@ def _ngas_and_fs_io(self, command): a = NgasDROP( "HelloWorld_out.txt", "HelloWorld_out.txt" ) # not a filesystem-related DROP, we can reference its URL in the command-line - a.ngasSrv = "ngas.ddns.net" + a.ngasSrv = "ngas.icrar.org" + a.ngasPort = 443 b = DockerApp("b", "b", image="ubuntu:14.04", command=command) c = FileDROP("c", "c") b.addInput(a) b.addOutput(c) with DROPWaiterCtx(self, c, 100): a.setCompleted() - self.assertEqual( - a.dataURL.encode("utf8"), droputils.allDropContents(c) - ) + self.assertEqual(a.dataURL.encode("utf8"), droputils.allDropContents(c)) def test_additional_bindings(self): # Some additional stuff to bind into docker diff --git a/daliuge-engine/test/apps/test_pyfunc.py b/daliuge-engine/test/apps/test_pyfunc.py index a3848c403..e584ba091 100644 --- a/daliuge-engine/test/apps/test_pyfunc.py +++ b/daliuge-engine/test/apps/test_pyfunc.py @@ -29,6 +29,7 @@ from dlg import droputils, drop_loaders from dlg.apps import pyfunc +from dlg.apps.simple_functions import string2json from dlg.ddap_protocol import DROPStates, DROPRel, DROPLinkType from dlg.data.drops.memory import InMemoryDROP from dlg.droputils import DROPWaiterCtx @@ -69,14 +70,10 @@ def 
diff --git a/daliuge-engine/test/apps/test_docker.py b/daliuge-engine/test/apps/test_docker.py
index f88163307..26beb7db1 100644
--- a/daliuge-engine/test/apps/test_docker.py
+++ b/daliuge-engine/test/apps/test_docker.py
@@ -120,9 +120,7 @@ def test_clientServer(self):
             image="ubuntu:14.04",
             command="cat %i0 > /dev/tcp/%containerIp[c]%/8000",
         )
-        c = DockerApp(
-            "c", "c", image="ubuntu:14.04", command="nc -l 8000 > %o0"
-        )
+        c = DockerApp("c", "c", image="ubuntu:14.04", command="nc -l 8000 > %o0")
         d = FileDROP("d", "d")
 
         b.addInput(a)
@@ -181,16 +179,15 @@ def _ngas_and_fs_io(self, command):
         a = NgasDROP(
             "HelloWorld_out.txt", "HelloWorld_out.txt"
         )  # not a filesystem-related DROP, we can reference its URL in the command-line
-        a.ngasSrv = "ngas.ddns.net"
+        a.ngasSrv = "ngas.icrar.org"
+        a.ngasPort = 443
         b = DockerApp("b", "b", image="ubuntu:14.04", command=command)
         c = FileDROP("c", "c")
         b.addInput(a)
         b.addOutput(c)
         with DROPWaiterCtx(self, c, 100):
             a.setCompleted()
-        self.assertEqual(
-            a.dataURL.encode("utf8"), droputils.allDropContents(c)
-        )
+        self.assertEqual(a.dataURL.encode("utf8"), droputils.allDropContents(c))
 
     def test_additional_bindings(self):
         # Some additional stuff to bind into docker
diff --git a/daliuge-engine/test/apps/test_pyfunc.py b/daliuge-engine/test/apps/test_pyfunc.py
index a3848c403..e584ba091 100644
--- a/daliuge-engine/test/apps/test_pyfunc.py
+++ b/daliuge-engine/test/apps/test_pyfunc.py
@@ -29,6 +29,7 @@
 
 from dlg import droputils, drop_loaders
 from dlg.apps import pyfunc
+from dlg.apps.simple_functions import string2json
 from dlg.ddap_protocol import DROPStates, DROPRel, DROPLinkType
 from dlg.data.drops.memory import InMemoryDROP
 from dlg.droputils import DROPWaiterCtx
@@ -69,14 +70,10 @@ def _PyFuncApp(oid, uid, f, **kwargs):
     if isinstance(f, str):
         fname = f = "test.apps.test_pyfunc." + f
     fw_kwargs = {
-        k: v
-        for k, v in kwargs.items()
-        if k in ["input_parser", "output_parser"]
+        k: v for k, v in kwargs.items() if k in ["input_parser", "output_parser"]
     }
     input_kws = [
-        {k: v}
-        for k, v in kwargs.items()
-        if k not in ["input_parser", "output_parser"]
+        {k: v} for k, v in kwargs.items() if k not in ["input_parser", "output_parser"]
     ]
     fcode, fdefaults = pyfunc.serialize_func(f)
     return pyfunc.PyFuncApp(
@@ -129,9 +126,7 @@ def inner_function(x, y):
 
         _PyFuncApp("a", "a", inner_function)
 
-    def test_pickle_func(
-        self, f=lambda x: x, input_data="hello", output_data="hello"
-    ):
+    def test_pickle_func(self, f=lambda x: x, input_data="hello", output_data="hello"):
         a = InMemoryDROP("a", "a")
         b = _PyFuncApp("b", "b", f)
         c = InMemoryDROP("c", "c")
@@ -173,11 +168,31 @@ def test_eval_func(self, f=lambda x: x, input_data=None, output_data=None):
             eval(droputils.allDropContents(c).decode("utf-8"), {}, {}),
         )
 
+    def test_string2json_func(self, f=string2json, input_data=None, output_data=None):
+        input_data = '["a", "b", "c"]' if input_data is None else input_data
+        output_data = ["a", "b", "c"] if output_data is None else output_data
+
+        a = InMemoryDROP("a", "a")
+        b = _PyFuncApp(
+            "b",
+            "b",
+            f,
+        )
+        c = InMemoryDROP("c", "c")
+
+        b.addInput(a)
+        b.addOutput(c)
+
+        with DROPWaiterCtx(self, c, 5):
+            drop_loaders.save_pickle(a, input_data)
+            a.setCompleted()
+        for drop in a, b, c:
+            self.assertEqual(DROPStates.COMPLETED, drop.status)
+        numpy.testing.assert_equal(output_data, drop_loaders.load_pickle(c))
+
     def test_npy_func(self, f=lambda x: x, input_data=None, output_data=None):
         input_data = numpy.ones([2, 2]) if input_data is None else input_data
-        output_data = (
-            numpy.ones([2, 2]) if output_data is None else output_data
-        )
+        output_data = numpy.ones([2, 2]) if output_data is None else output_data
 
         a = InMemoryDROP("a", "a")
         b = _PyFuncApp(
@@ -211,9 +226,7 @@ def _test_simple_functions(self, f, input_data, output_data):
 
         for drop in a, b, c:
             self.assertEqual(DROPStates.COMPLETED, drop.status)
-        self.assertEqual(
-            output_data, pickle.loads(droputils.allDropContents(c))
-        )
+        self.assertEqual(output_data, pickle.loads(droputils.allDropContents(c)))
 
     def test_func1(self):
         """Checks that func1 in this module works when wrapped"""
@@ -300,9 +313,7 @@ def _do_test(func, expected_out, *args, **kwargs):
         for i in range(n_args):
             logger.debug(f"adding arg input: {args[i]}")
             si = arg_names[i]
-            arg_inputs.append(
-                InMemoryDROP(si, si, pydata=translate(args[i]))
-            )
+            arg_inputs.append(InMemoryDROP(si, si, pydata=translate(args[i])))
         i = n_args
         for name, value in kwargs.items():
             si = name  # use keyword name
@@ -315,9 +326,7 @@ def _do_test(func, expected_out, *args, **kwargs):
         a = InMemoryDROP("a", "a", pydata=translate(1))
         output = InMemoryDROP("o", "o")
         kwargs = {inp.uid: inp.oid for inp in arg_inputs}
-        kwargs.update(
-            {name: vals[0] for name, vals in kwarg_inputs.items()}
-        )
+        kwargs.update({name: vals[0] for name, vals in kwarg_inputs.items()})
         kwargs["a"] = a.oid
         app = _PyFuncApp(
             "f",
@@ -459,9 +468,7 @@ def test_input_in_remote_nm(self):
         ]
         rels = [DROPRel("A", DROPLinkType.INPUT, "B")]
         a_data = os.urandom(32)
-        c_data = self._test_runGraphInTwoNMs(
-            g1, g2, rels, pickle.dumps(a_data), None
-        )
+        c_data = self._test_runGraphInTwoNMs(g1, g2, rels, pickle.dumps(a_data), None)
         self.assertEqual(a_data, pickle.loads(c_data))
 
     def test_output_in_remote_nm(self):
@@ -497,7 +504,5 @@ def test_output_in_remote_nm(self):
         ]
         rels = [DROPRel("B", DROPLinkType.PRODUCER, "C")]
         a_data = os.urandom(32)
-        c_data = self._test_runGraphInTwoNMs(
-            g1, g2, rels, pickle.dumps(a_data), None
-        )
+        c_data = self._test_runGraphInTwoNMs(g1, g2, rels, pickle.dumps(a_data), None)
         self.assertEqual(a_data, pickle.loads(c_data))
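The new `test_string2json_func` exercises `string2json` from `dlg.apps.simple_functions`. Judging only from the test's input and expected output, it behaves like this minimal stand-in (an assumption, not the module's actual source):

```python
import json

def string2json(string):
    # hypothetical stand-in: '["a", "b", "c"]' -> ["a", "b", "c"]
    return json.loads(string)

assert string2json('["a", "b", "c"]') == ["a", "b", "c"]
```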
diff --git a/daliuge-engine/test/apps/test_simple.py b/daliuge-engine/test/apps/test_simple.py
index 4810a81b0..2a1c4b009 100644
--- a/daliuge-engine/test/apps/test_simple.py
+++ b/daliuge-engine/test/apps/test_simple.py
@@ -144,9 +144,7 @@ def test_helloworldapp(self):
         h.addOutput(b)
         b.addProducer(h)
         h.execute()
-        self.assertEqual(
-            h.greeting.encode("utf8"), droputils.allDropContents(b)
-        )
+        self.assertEqual(h.greeting.encode("utf8"), droputils.allDropContents(b))
 
     def test_parallelHelloWorld(self):
         m0 = InMemoryDROP("m0", "m0")
@@ -178,11 +176,13 @@ def test_parallelHelloWorld(self):
     # @unittest.skip
     def test_ngasio(self):
         nd_in = NgasDROP("HelloWorld_out.txt", "HelloWorld_out.txt")
-        nd_in.ngasSrv = "ngas.ddns.net"
+        nd_in.ngasSrv = "ngas.icrar.org"
+        nd_in.ngasPort = 443
         b = CopyApp("b", "b")
         did = "HelloWorld-%f" % time.time()
         nd_out = NgasDROP(did, did, len=11)
-        nd_out.ngasSrv = "ngas.ddns.net"
+        nd_out.ngasSrv = "ngas.icrar.org"
+        nd_out.ngasPort = 443
         nd_out.len = nd_in.size
         d = CopyApp("d", "d")
         i = InMemoryDROP("i", "i")
@@ -193,7 +193,7 @@ def test_ngasio(self):
         d.addInput(nd_out)
         i.addProducer(d)
         # b.addOutput(i)
-        self._test_graph_runs((nd_in, b, nd_out, i, d), nd_in, i, timeout=10)
+        self._test_graph_runs((nd_in, b, nd_out, i, d), nd_in, i, timeout=60)
         self.assertEqual(b"Hello World", droputils.allDropContents(i))
 
     def test_genericScatter(self):
@@ -237,9 +237,7 @@ def test_genericNpyScatter_multi(self):
         c = InMemoryDROP("c", "c")
         drop_loaders.save_numpy(b, data1_in)
         drop_loaders.save_numpy(c, data2_in)
-        s = GenericNpyScatterApp(
-            "s", "s", num_of_copies=2, scatter_axes="[0,0]"
-        )
+        s = GenericNpyScatterApp("s", "s", num_of_copies=2, scatter_axes="[0,0]")
         s.addInput(b)
         s.addInput(c)
         o1 = InMemoryDROP("o1", "o1")
@@ -289,18 +287,13 @@ def test_multi_listappendthrashing(self, size=1000, parallel=True):
         X = AverageArraysApp("X", "X")
         Z = InMemoryDROP("Z", "Z")
         drops = [ListAppendThrashingApp(x, x, size=size) for x in drop_ids]
-        mdrops = [
-            InMemoryDROP(chr(65 + x), chr(65 + x)) for x in range(max_threads)
-        ]
+        mdrops = [InMemoryDROP(chr(65 + x), chr(65 + x)) for x in range(max_threads)]
         if parallel:
             # a bit of magic to get the app drops using the processes
             _ = [drop.__setattr__("_tp", threadpool) for drop in drops]
             _ = [drop.__setattr__("_tp", threadpool) for drop in mdrops]
             _ = [drop.__setattr__("_sessID", session_id) for drop in mdrops]
-            _ = [
-                memory_manager.register_drop(drop.uid, session_id)
-                for drop in mdrops
-            ]
+            _ = [memory_manager.register_drop(drop.uid, session_id) for drop in mdrops]
             X.__setattr__("_tp", threadpool)
             Z.__setattr__("_tp", threadpool)
             Z.__setattr__("_sessID", session_id)
@@ -310,9 +303,7 @@ def test_multi_listappendthrashing(self, size=1000, parallel=True):
         _ = [d.addOutput(m) for d, m in zip(drops, mdrops)]
         _ = [X.addInput(m) for m in mdrops]
         X.addOutput(Z)
-        logger.info(
-            f"Number of inputs/outputs: {len(X.inputs)}, {len(X.outputs)}"
-        )
+        logger.info(f"Number of inputs/outputs: {len(X.inputs)}, {len(X.outputs)}")
         self._test_graph_runs([S, X, Z] + drops + mdrops, S, Z, timeout=200)
         # Need to run our 'copy' of the averaging APP
         num_array = []
@@ -348,9 +339,7 @@ def test_speedup(self):
         st = time.time()
         self.test_multi_listappendthrashing(size=size, parallel=True)
         t2 = time.time() - st
-        logger.info(
-            f"Speedup: {t1 / t2:.2f} from {cpu_count(logical=False)} cores"
-        )
+        logger.info(f"Speedup: {t1 / t2:.2f} from {cpu_count(logical=False)} cores")
         # TODO: This is unpredictable, but maybe we can do something meaningful.
         # self.assertAlmostEqual(t1/cpu_count(logical=False), t2, 1)
         # How about this? We only need to see some type of speedup