Skip to content

Commit

Permalink
Merge pull request #18 from sei-jmattson/parse-proto-xml
Browse files Browse the repository at this point in the history
Support mixed-mode rx
  • Loading branch information
ampledata authored Jun 22, 2024
2 parents 24bf0fa + 86726c1 commit 5d8b25f
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 6 deletions.
1 change: 1 addition & 0 deletions takproto/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

DEFAULT_PROTO_HEADER = bytearray(b"\xbf")
DEFAULT_MESH_HEADER = bytearray(b"\xbf\x01\xbf")
DEFAULT_XML_HEADER = bytearray(b"<?xml")

W3C_XML_DATETIME: str = "%Y-%m-%dT%H:%M:%S.%fZ"
ISO_8601_UTC: str = "%Y-%m-%dT%H:%M:%SZ"
Expand Down
16 changes: 12 additions & 4 deletions takproto/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
W3C_XML_DATETIME,
DEFAULT_MESH_HEADER,
DEFAULT_PROTO_HEADER,
DEFAULT_XML_HEADER,
TAKProtoVer,
)
from takproto.proto import TakMessage
Expand All @@ -51,6 +52,8 @@ def parse_proto(msg: bytearray) -> Optional[bytearray]:
parsed = parse_mesh(msg)
elif msg[0] in DEFAULT_PROTO_HEADER:
parsed = parse_stream(msg)
elif msg[:5] == DEFAULT_XML_HEADER:
parsed = xml2message(msg)
return parsed


Expand Down Expand Up @@ -78,9 +81,7 @@ def format_time(time: str) -> int:
return int(s_time.timestamp() * 1000)


def xml2proto(
xml: str, protover: Optional[TAKProtoVer] = None
): # NOQA pylint: disable=too-many-locals,too-many-branches,too-many-statements
def xml2message(xml: str) -> TakMessage: # NOQA pylint: disable=too-many-locals,too-many-branches,too-many-statements
"""Convert plain XML CoT to Protobuf."""
event = ET.fromstring(xml)
tak_message = TakMessage()
Expand Down Expand Up @@ -132,7 +133,8 @@ def xml2proto(
# If this is a GeoChat message, write the contents of <detail> in xmlDetail.
if uid and "GeoChat." in uid:
pattern = "<detail>(.*?)</detail>"
xmldetailstr = re.search(pattern, xml).group(1)
target = ET.tostring(detail).decode("utf-8")
xmldetailstr = re.search(pattern, target).group(1)
new_detail.xmlDetail = xmldetailstr
else:
# Add unknown elements to xmlDetail field.
Expand Down Expand Up @@ -197,6 +199,12 @@ def xml2proto(
if attrib_val:
setattr(new_detail.track, attrib, float(attrib_val))

return tak_message

def xml2proto(
xml: str, protover: Optional[TAKProtoVer] = None
):
tak_message = xml2message(xml)
output = msg2proto(tak_message, protover)
return output

Expand Down
23 changes: 21 additions & 2 deletions tests/test_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def test_xml2proto_mesh(self):
self.assertEqual(bytes(buf), bytes(t_ba))

def test_parse_proto_mesh(self):
"""Test encoding CoT XML string as TAK Protocol Version 1 Mesh Protobuf."""
"""Test deserializing TAK Protocol Version 1 Mesh bytes to TakMessage Protobuf."""
t_ba = bytearray(
b'\xbf\x01\xbf\x12\xb0\x02\n\x0ba-f-G-E-V-C*$aa0b0312-b5cd-4c2c-bbbc-9c4c702162610\xa0\xd1\xfc\xaf\x82.8\xa0\xd1\xfc\xaf\x82.@\x98\xa4\xfe\xaf\x82.J\x03h-eQ3\x98T\xa7b\xfdE@Y}*~\xbe\xf3\x84P\xc0aW\\\x1c\x95\x9b\xc4:@i\x00\x00\x00\xe0\xcf\x12cAq\x00\x00\x00\xe0\xcf\x12cAz\xb3\x01\n/<uid Droid="Eliopoli HQ" /><another test="1" />\x12$\n\x15192.168.1.10:4242:tcp\x12\x0bEliopoli HQ\x1a\x0c\n\x06Yellow\x12\x02HQ*\x02\x08d2F\n\x11LENOVO 20QV0007US\x12\nWinTAK-CIV\x1a\x19Microsoft Windows 10 Home"\n1.10.0.137:\x00'
)
Expand Down Expand Up @@ -128,7 +128,7 @@ def test_xml2proto_stream(self):
self.assertEqual(bytes(buf), bytes(t_ba))

def test_parse_proto_stream(self):
"""Test encoding CoT XML string as TAK Protocol Version 1 Stream Protobuf."""
"""Test deserializing TAK Protocol Version 1 Stream bytes to TakMessage Protobuf."""
t_ba = bytearray(
b'\xbf\xb3\x02\x12\xb0\x02\n\x0ba-f-G-E-V-C*$aa0b0312-b5cd-4c2c-bbbc-9c4c702162610\xa0\xd1\xfc\xaf\x82.8\xa0\xd1\xfc\xaf\x82.@\x98\xa4\xfe\xaf\x82.J\x03h-eQ3\x98T\xa7b\xfdE@Y}*~\xbe\xf3\x84P\xc0aW\\\x1c\x95\x9b\xc4:@i\x00\x00\x00\xe0\xcf\x12cAq\x00\x00\x00\xe0\xcf\x12cAz\xb3\x01\n/<uid Droid="Eliopoli HQ" /><another test="1" />\x12$\n\x15192.168.1.10:4242:tcp\x12\x0bEliopoli HQ\x1a\x0c\n\x06Yellow\x12\x02HQ*\x02\x08d2F\n\x11LENOVO 20QV0007US\x12\nWinTAK-CIV\x1a\x19Microsoft Windows 10 Home"\n1.10.0.137:\x00'
)
Expand All @@ -143,3 +143,22 @@ def test_parse_proto_stream(self):
'<uid Droid="Eliopoli HQ" /><another test="1" />',
)
self.assertEqual(cot_event.detail.contact.callsign, "Eliopoli HQ")

def test_parse_proto_xml(self):
"""Test deserializing CoT XML bytes to TakMessage Protobuf."""
t_xml = """<?xml version='1.0' encoding='UTF-8' standalone='yes'?>
<event version='2.0' uid='aa0b0312-b5cd-4c2c-bbbc-9c4c70216261' type='a-f-G-E-V-C' time='2020-02-08T18:10:44.000Z' start='2020-02-08T18:10:44.000Z' stale='2020-02-08T18:11:11.000Z' how='h-e'><point lat='43.97957317' lon='-66.07737696' hae='26.767999' ce='9999999.0' le='9999999.0' /><detail><uid Droid='Eliopoli HQ'/><contact callsign='Eliopoli HQ' endpoint='192.168.1.10:4242:tcp'/><__group name='Yellow' role='HQ'/><status battery='100'/><takv platform='WinTAK-CIV' device='LENOVO 20QV0007US' os='Microsoft Windows 10 Home' version='1.10.0.137'/><track speed='0.00000000' course='0.00000000'/><another test="1"/></detail></event>
"""

t_ba = bytearray(t_xml, encoding="utf-8")

parsed = takproto.parse_proto(t_ba)
cot_event = parsed.cotEvent

self.assertEqual(cot_event.type, "a-f-G-E-V-C")
self.assertEqual(cot_event.uid, "aa0b0312-b5cd-4c2c-bbbc-9c4c70216261")
self.assertEqual(
cot_event.detail.xmlDetail,
'<uid Droid="Eliopoli HQ" /><another test="1" />',
)
self.assertEqual(cot_event.detail.contact.callsign, "Eliopoli HQ")

0 comments on commit 5d8b25f

Please sign in to comment.