diff --git a/README.md b/README.md index 53d4b6d..912178a 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,3 @@ -![ATAK Screenshot with PyTAK Logo.](https://takproto.readthedocs.io/en/latest/media/atak_screenshot_with_pytak_logo-x25.jpg) - # Send & Receive data from TAK with Python TAKProto is a Python module for encoding & decoding TAK Protocol Payloads, for use diff --git a/takproto/functions.py b/takproto/functions.py index bc20da6..2520aaf 100644 --- a/takproto/functions.py +++ b/takproto/functions.py @@ -46,15 +46,14 @@ from takproto.proto import TakMessage -def parse_proto(msg: Union[bytearray, str]) -> Optional[bytearray]: +def parse_proto(msg: bytearray) -> Optional[TakMessage]: """Parse TAK Protocol Version 1 Mesh & Stream message.""" parsed = None - if msg[:3] == DEFAULT_MESH_HEADER: parsed = parse_mesh(msg) - elif msg[0] == DEFAULT_PROTO_HEADER: + elif msg[0] in DEFAULT_PROTO_HEADER: parsed = parse_stream(msg) - elif msg[:5] == DEFAULT_XML_HEADER and isinstance(msg, str): + elif msg[:5] == DEFAULT_XML_HEADER: parsed = xml2message(msg) return parsed @@ -67,7 +66,7 @@ def parse_mesh(msg): return protobuf -def parse_stream(msg): +def parse_stream(msg) -> TakMessage: """Parse TAK Protocol Version 1 Stream message.""" bio = BytesIO(msg[1:]) msg = dpb.read(bio, TakMessage) @@ -77,14 +76,14 @@ def parse_stream(msg): def format_time(time: str) -> int: """Format timestamp as microseconds.""" try: - s_time = datetime.strptime(time, ISO_8601_UTC) + s_time = datetime.strptime(time + "+0000", ISO_8601_UTC + "%z") except ValueError: - s_time = datetime.strptime(time, W3C_XML_DATETIME) + s_time = datetime.strptime(time + "+0000", W3C_XML_DATETIME + "%z") return int(s_time.timestamp() * 1000) def xml2message( - xml: str, + xml: bytearray, ) -> ( TakMessage ): # NOQA pylint: disable=too-many-locals,too-many-branches,too-many-statements diff --git a/tests/test_functions.py b/tests/test_functions.py index 840b1d6..e253099 100644 --- a/tests/test_functions.py +++ b/tests/test_functions.py @@ -28,32 +28,36 @@ import takproto -__author__ = "Greg Albrecht " -__copyright__ = "Copyright 2023 Sensors & Signals LLC" -__license__ = "Apache License, Version 2.0" - - class TestFunctions(unittest.TestCase): - def test_format_timestamp(self): - """Test formatting timestamp to and from Protobuf format.""" + + def test_format_timestamp1(self): + """Test formatting timestamp to and from Protobuf format 1.""" t_time = "2020-02-08T18:10:44.000000Z" t_ts = 1581185444000 ts = takproto.format_time(t_time) self.assertEqual(ts, t_ts) + def test_format_timestamp2(self): + """Test formatting timestamp to and from Protobuf format 2.""" + t_time = "2020-02-08T18:10:44.000000Z" + t_ts = 1581185444000 t_ts2 = t_ts / 1000 time2 = datetime.fromtimestamp(t_ts2, timezone.utc).strftime( "%Y-%m-%dT%H:%M:%S.%fZ" ) self.assertEqual(time2, t_time) - def test_format_timestamp_without_subseconds(self): - """Test formatting timestamp to and from Protobuf format.""" + def test_format_timestamp_without_subseconds1(self): + """Test formatting timestamp to and from Protobuf format 1.""" t_time = "2020-02-08T18:10:44Z" t_ts = 1581185444000 ts = takproto.format_time(t_time) self.assertEqual(ts, t_ts) + def test_format_timestamp_without_subseconds2(self): + """Test formatting timestamp to and from Protobuf format 2.""" + t_time = "2020-02-08T18:10:44Z" + t_ts = 1581185444000 t_ts2 = t_ts / 1000 time2 = datetime.fromtimestamp(t_ts2, timezone.utc).strftime( "%Y-%m-%dT%H:%M:%SZ" @@ -153,6 +157,8 @@ def test_parse_proto_xml(self): t_ba = bytearray(t_xml, encoding="utf-8") parsed = takproto.parse_proto(t_ba) + print(parsed) + print(type(parsed)) cot_event = parsed.cotEvent self.assertEqual(cot_event.type, "a-f-G-E-V-C")