Skip to content

Commit

Permalink
Fixed pydata and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
awicenec committed Mar 11, 2024
1 parent 3bdf3bd commit 21ba3e8
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 139 deletions.
74 changes: 21 additions & 53 deletions daliuge-engine/dlg/apps/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,7 @@ def copyRecursive(self, inputDrop):
self.copyRecursive(child)
else:
for outputDrop in self.outputs:
droputils.copyDropContents(
inputDrop, outputDrop, bufsize=self.bufsize
)
droputils.copyDropContents(inputDrop, outputDrop, bufsize=self.bufsize)


##
Expand Down Expand Up @@ -270,9 +268,7 @@ def run(self):
# At least one output should have been added
outs = self.outputs
if len(outs) < 1:
raise Exception(
"At least one output should have been added to %r" % self
)
raise Exception("At least one output should have been added to %r" % self)
marray = self.generateRandomArray()
if self._keep_array:
self.marray = marray
Expand All @@ -285,9 +281,7 @@ def generateRandomArray(self):
if self.integer:
# generate an array of self.size integers with numbers between
# self.low and self.high
marray = np.random.randint(
int(self.low), int(self.high), size=(self.size)
)
marray = np.random.randint(int(self.low), int(self.high), size=(self.size))
else:
# generate an array of self.size floats with numbers between
# self.low and self.high
Expand Down Expand Up @@ -353,9 +347,7 @@ def run(self):

outs = self.outputs
if len(outs) < 1:
raise Exception(
"At least one output should have been added to %r" % self
)
raise Exception("At least one output should have been added to %r" % self)
self.getInputArrays()
self._avg = self.averageArray()
for o in outs:
Expand All @@ -371,9 +363,7 @@ def getInputArrays(self):
"""
ins = self.inputs
if len(ins) < 1:
raise Exception(
"At least one input should have been added to %r" % self
)
raise Exception("At least one input should have been added to %r" % self)
marray = []
for inp in ins:
sarray = droputils.allDropContents(inp)
Expand Down Expand Up @@ -430,14 +420,15 @@ def readWriteData(self):
inputs = self.inputs
outputs = self.outputs
total_len = 0
for input in inputs:
total_len += input.len
# for input in inputs:
# total_len += input.size
for output in outputs:
output.len = total_len
for input in inputs:
d = droputils.allDropContents(input)
output.write(d)

# logger.debug(f">>> written {pickle.loads(d)} to {output}")

def run(self):
self.readWriteData()

Expand Down Expand Up @@ -494,17 +485,11 @@ class GenericNpyGatherApp(BarrierAppDROP):

def run(self):
if len(self.inputs) < 1:
raise Exception(
f"At least one input should have been added to {self}"
)
raise Exception(f"At least one input should have been added to {self}")
if len(self.outputs) < 1:
raise Exception(
f"At least one output should have been added to {self}"
)
raise Exception(f"At least one output should have been added to {self}")
if self.function not in self.functions:
raise Exception(
f"Function {self.function} not supported by {self}"
)
raise Exception(f"Function {self.function} not supported by {self}")

result = (
self.reduce_gather_inputs()
Expand Down Expand Up @@ -542,11 +527,8 @@ def gather_inputs(self):
for input in self.inputs:
data = drop_loaders.load_numpy(input)
# assign instead of gather for the first input
result = (
data
if result is None
else gather(result, data, allow_pickle=True)
)
# result = data if result is None else gather(result, data, allow_pickle=True)
result = data if result is None else gather(result, data)
return result


Expand Down Expand Up @@ -594,20 +576,14 @@ def run(self):
raise Exception("Only one input expected for %r" % self)
else: # the input is expected to be a vector. We'll use the first element
try:
phrase = str(
pickle.loads(droputils.allDropContents(ins[0]))[0]
)
phrase = str(pickle.loads(droputils.allDropContents(ins[0]))[0])
except _pickle.UnpicklingError:
phrase = str(
droputils.allDropContents(ins[0]), encoding="utf-8"
)
phrase = str(droputils.allDropContents(ins[0]), encoding="utf-8")
self.greeting = f"Hello {phrase}"

outs = self.outputs
if len(outs) < 1:
raise Exception(
"At least one output should have been added to %r" % self
)
raise Exception("At least one output should have been added to %r" % self)
for o in outs:
o.len = len(self.greeting.encode())
o.write(self.greeting.encode()) # greet across all outputs
Expand Down Expand Up @@ -655,9 +631,7 @@ def run(self):

outs = self.outputs
if len(outs) < 1:
raise Exception(
"At least one output should have been added to %r" % self
)
raise Exception("At least one output should have been added to %r" % self)
for o in outs:
o.len = len(content)
o.write(content) # send content to all outputs
Expand Down Expand Up @@ -791,9 +765,7 @@ def run(self):
raise err
for split_index in range(self.num_of_copies):
out_index = in_index * self.num_of_copies + split_index
drop_loaders.save_numpy(
self.outputs[out_index], result[split_index]
)
drop_loaders.save_numpy(self.outputs[out_index], result[split_index])


class SimpleBranch(BranchAppDrop, NullBarrierApp):
Expand Down Expand Up @@ -839,9 +811,7 @@ def readData(self):
data = pickle.loads(droputils.allDropContents(input))

# make sure we always have a ndarray with at least 1dim.
if type(data) not in (list, tuple) and not isinstance(
data, (np.ndarray)
):
if type(data) not in (list, tuple) and not isinstance(data, (np.ndarray)):
raise TypeError
if isinstance(data, np.ndarray) and data.ndim == 0:
data = np.array([data])
Expand Down Expand Up @@ -915,9 +885,7 @@ def run(self):
# At least one output should have been added
outs = self.outputs
if len(outs) < 1:
raise Exception(
"At least one output should have been added to %r" % self
)
raise Exception("At least one output should have been added to %r" % self)
self.marray = self.generateArray()
for o in outs:
d = pickle.dumps(self.marray)
Expand Down
24 changes: 11 additions & 13 deletions daliuge-engine/dlg/data/drops/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def parse_pydata(pd_dict: dict) -> bytes:
pydata = pickle.loads(pydata)
except:
raise
return pickle.dump(pydata)
return pickle.dumps(pydata)


##
Expand Down Expand Up @@ -110,20 +110,22 @@ def initialize(self, **kwargs):
"""
args = []
pydata = None
field_names = (
[f["name"] for f in kwargs["fields"]] if "fields" in kwargs else []
)
if "pydata" in kwargs and not (
"nodeAttributes" in kwargs and "pydata" in kwargs["nodeAttributes"]
"fields" in kwargs and "pydata" in field_names
): # means that it was passed directly
pydata = kwargs.pop("pydata")
logger.debug("pydata value provided: %s", pydata)
logger.debug("pydata value provided: %s, %s", pydata, kwargs)
try: # test whether given value is valid
_ = pickle.loads(base64.b64decode(pydata))
pydata = base64.b64decode(pydata)
except:
pydata = None
elif (
"nodeAttributes" in kwargs and "pydata" in kwargs["nodeAttributes"]
):
pydata = parse_pydata(kwargs["nodeAttributes"]["pydata"])
elif "fields" in kwargs and "pydata" in field_names:
data_pos = field_names.index("pydata")
pydata = parse_pydata(kwargs["fields"][data_pos])
args.append(pydata)
logger.debug("Loaded into memory: %s", pydata)
self._buf = io.BytesIO(*args)
Expand Down Expand Up @@ -192,9 +194,7 @@ def initialize(self, **kwargs):
pydata = base64.b64decode(pydata.encode("latin1"))
except:
pydata = None
elif (
"nodeAttributes" in kwargs and "pydata" in kwargs["nodeAttributes"]
):
elif "nodeAttributes" in kwargs and "pydata" in kwargs["nodeAttributes"]:
pydata = parse_pydata(kwargs["nodeAttributes"]["pydata"])
args.append(pydata)
self._buf = io.BytesIO(*args)
Expand All @@ -206,9 +206,7 @@ def getIO(self):
else:
# Using Drop without manager, just generate a random name.
sess_id = "".join(
random.choices(
string.ascii_uppercase + string.digits, k=10
)
random.choices(string.ascii_uppercase + string.digits, k=10)
)
return SharedMemoryIO(self.oid, sess_id)
else:
Expand Down
16 changes: 12 additions & 4 deletions daliuge-engine/dlg/data/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,13 +367,13 @@ class FileIO(DataIO):
A file-based implementation of DataIO
"""

_desc: io.BufferedReader
_desc: io.BufferedRWPair

def __init__(self, filename, **kwargs):
super().__init__()
self._fnm = filename

def _open(self, **kwargs) -> io.BufferedReader:
def _open(self, **kwargs) -> io.BufferedRWPair:
flag = "r" if self._mode is OpenMode.OPEN_READ else "w"
flag += "b"
return open(self._fnm, flag)
Expand Down Expand Up @@ -588,7 +588,11 @@ def _open(self, **kwargs):
# when finishArchive is called.
self._buf = b""
self._writtenDataSize = 0
return self._getClient()
client = self._getClient()
else:
client = self._getClient()
self._read_gen = client
return client

def _close(self, **kwargs):
if self._mode == OpenMode.OPEN_WRITE:
Expand All @@ -611,7 +615,11 @@ def _close(self, **kwargs):
response.close()

def _read(self, count=65536, **kwargs):
return self._desc.read(count)
try:
buf = self._read_gen.__next__()
except StopIteration:
buf = b""
return buf

def _write(self, data, **kwargs) -> int:
if self._is_length_unknown():
Expand Down
33 changes: 20 additions & 13 deletions daliuge-engine/dlg/ngaslite.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import http.client
import logging
import urllib.request
import requests
from xml.dom.minidom import parseString

logger = logging.getLogger(__name__)
Expand All @@ -44,12 +45,13 @@ def open(
mode=1,
mimeType="application/octet-stream",
length=-1,
chunksize=65536,
):
logger.debug(
"Opening NGAS drop %s with mode %d and length %s", fileId, mode, length
)
if mode == 1:
return retrieve(host, fileId, port=port, timeout=timeout)
return retrieve(host, fileId, port=port, timeout=timeout, chunksize=chunksize)
elif mode == 0:
return beginArchive(
host, fileId, port=port, timeout=timeout, mimeType=mimeType, length=length
Expand All @@ -60,22 +62,25 @@ def open(
return stat


def retrieve(host, fileId, port=7777, timeout=None):
def retrieve(host, fileId, port=7777, timeout=None, chunksize=65536):
"""
Retrieve the given fileId from the NGAS server located at `host`:`port`
This method returns a file-like object that supports the `read` operation,
and over which `close` must be invoked once no more data is read from it.
"""
url = "http://%s:%d/RETRIEVE?file_id=%s" % (host, port, fileId)
scheme = "http" if port != 443 else "https"
url = "%s://%s:%d/RETRIEVE?file_id=%s" % (scheme, host, port, fileId)
logger.debug("Issuing RETRIEVE request: %s", url)
conn = urllib.request.urlopen(url)
if conn.getcode() != http.HTTPStatus.OK:
resp = requests.request("GET", url, stream=True)
# conn = urllib.request.urlopen(url)
if resp.status_code != http.HTTPStatus.OK:
raise Exception(
"Error while RETRIEVE-ing %s from %s:%d: %d %s"
% (fileId, host, port, conn.getcode(), conn.msg)
% (fileId, host, port, resp.status_code, resp.msg)
)
return conn
read_gen = resp.iter_content(chunksize)
return read_gen


def beginArchive(
Expand Down Expand Up @@ -123,18 +128,20 @@ def fileStatus(host, port, fileId, timeout=10):
TODO: This needs to use file_version as well, else it might
return a value for a previous version.
"""
url = "http://%s:%d/STATUS?file_id=%s" % (host, port, fileId)
scheme = "http" if port != 443 else "https"
url = "%s://%s:%d/STATUS?file_id=%s" % (scheme, host, port, fileId)
logger.debug("Issuing STATUS request: %s", url)
try:
conn = urllib.request.urlopen(url, timeout=timeout)
except urllib.error.HTTPError:
conn = requests.request("GET", url)
# conn = urllib.request.urlopen(url, timeout=timeout)
except ConnectionError:
raise FileNotFoundError
if conn.getcode() != http.HTTPStatus.OK:
if conn.status_code != http.HTTPStatus.OK:
raise Exception(
"Error while getting STATUS %s from %s:%d: %d %s"
% (fileId, host, port, conn.getcode(), conn.msg)
% (fileId, host, port, conn.status_code, conn.text)
)
dom = parseString(conn.read().decode())
dom = parseString(conn.content.decode())
stat = dict(
dom.getElementsByTagName("NgamsStatus")[0]
.getElementsByTagName("DiskStatus")[0]
Expand Down
11 changes: 4 additions & 7 deletions daliuge-engine/test/apps/test_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,7 @@ def test_clientServer(self):
image="ubuntu:14.04",
command="cat %i0 > /dev/tcp/%containerIp[c]%/8000",
)
c = DockerApp(
"c", "c", image="ubuntu:14.04", command="nc -l 8000 > %o0"
)
c = DockerApp("c", "c", image="ubuntu:14.04", command="nc -l 8000 > %o0")
d = FileDROP("d", "d")

b.addInput(a)
Expand Down Expand Up @@ -181,16 +179,15 @@ def _ngas_and_fs_io(self, command):
a = NgasDROP(
"HelloWorld_out.txt", "HelloWorld_out.txt"
) # not a filesystem-related DROP, we can reference its URL in the command-line
a.ngasSrv = "ngas.ddns.net"
a.ngasSrv = "ngas.icrar.org"
a.ngasPort = 443
b = DockerApp("b", "b", image="ubuntu:14.04", command=command)
c = FileDROP("c", "c")
b.addInput(a)
b.addOutput(c)
with DROPWaiterCtx(self, c, 100):
a.setCompleted()
self.assertEqual(
a.dataURL.encode("utf8"), droputils.allDropContents(c)
)
self.assertEqual(a.dataURL.encode("utf8"), droputils.allDropContents(c))

def test_additional_bindings(self):
# Some additional stuff to bind into docker
Expand Down
Loading

0 comments on commit 21ba3e8

Please sign in to comment.