Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support mixed-mode rx #17

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions takproto/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

DEFAULT_PROTO_HEADER = bytearray(b"\xbf")
DEFAULT_MESH_HEADER = bytearray(b"\xbf\x01\xbf")
DEFAULT_XML_HEADER = bytearray(b"<?xml")

W3C_XML_DATETIME: str = "%Y-%m-%dT%H:%M:%S.%fZ"
ISO_8601_UTC = W3C_XML_DATETIME # Issue 7: Not technically correct.
Expand Down
19 changes: 14 additions & 5 deletions takproto/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
ISO_8601_UTC,
DEFAULT_MESH_HEADER,
DEFAULT_PROTO_HEADER,
DEFAULT_XML_HEADER,
TAKProtoVer,
)
from takproto.proto import TakMessage
Expand All @@ -50,6 +51,8 @@ def parse_proto(msg: bytearray) -> Optional[bytearray]:
parsed = parse_mesh(msg)
elif msg[0] in DEFAULT_PROTO_HEADER:
parsed = parse_stream(msg)
elif msg[:5] == DEFAULT_XML_HEADER:
parsed = xml2message(msg)
return parsed


Expand All @@ -70,13 +73,12 @@ def parse_stream(msg):

def format_time(time: str) -> int:
"""Format timestamp as microseconds."""
s_time = datetime.strptime(time + "+0000", ISO_8601_UTC + "%z")
fmt = ISO_8601_UTC if "." in time else ISO_8601_UTC.replace(".%f", "")
s_time = datetime.strptime(time + "+0000", fmt + "%z")
return int(s_time.timestamp() * 1000)


def xml2proto(
xml: str, protover: Optional[TAKProtoVer] = None
): # NOQA pylint: disable=too-many-locals,too-many-branches,too-many-statements
def xml2message(xml: str) -> TakMessage: # NOQA pylint: disable=too-many-locals,too-many-branches,too-many-statements
"""Convert plain XML CoT to Protobuf."""
event = ET.fromstring(xml)
tak_message = TakMessage()
Expand Down Expand Up @@ -128,7 +130,8 @@ def xml2proto(
# If this is a GeoChat message, write the contents of <detail> in xmlDetail.
if uid and "GeoChat." in uid:
pattern = "<detail>(.*?)</detail>"
xmldetailstr = re.search(pattern, xml).group(1)
target = ET.tostring(detail).decode("utf-8")
xmldetailstr = re.search(pattern, target).group(1)
new_detail.xmlDetail = xmldetailstr
else:
# Add unknown elements to xmlDetail field.
Expand Down Expand Up @@ -193,6 +196,12 @@ def xml2proto(
if attrib_val:
setattr(new_detail.track, attrib, float(attrib_val))

return tak_message

def xml2proto(
xml: str, protover: Optional[TAKProtoVer] = None
):
tak_message = xml2message(xml)
output = msg2proto(tak_message, protover)
return output

Expand Down
36 changes: 34 additions & 2 deletions tests/test_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,19 @@ def test_format_timestamp(self):
)
self.assertEqual(time2, t_time)

def test_format_timestamp_without_subseconds(self):
"""Test formatting timestamp to and from Protobuf format."""
t_time = "2020-02-08T18:10:44Z"
t_ts = 1581185444000
ts = takproto.format_time(t_time)
self.assertEqual(ts, t_ts)

t_ts2 = t_ts / 1000
time2 = datetime.fromtimestamp(t_ts2, timezone.utc).strftime(
"%Y-%m-%dT%H:%M:%SZ"
)
self.assertEqual(time2, t_time)

def test_xml2proto_default(self):
"""Test encoding XML string as Protobuf bytearray."""
t_xml = """<?xml version='1.0' encoding='UTF-8' standalone='yes'?>
Expand Down Expand Up @@ -84,7 +97,7 @@ def test_xml2proto_mesh(self):
self.assertEqual(bytes(buf), bytes(t_ba))

def test_parse_proto_mesh(self):
"""Test encoding CoT XML string as TAK Protocol Version 1 Mesh Protobuf."""
"""Test deserializing TAK Protocol Version 1 Mesh bytes to TakMessage Protobuf."""
t_ba = bytearray(
b'\xbf\x01\xbf\x12\xb0\x02\n\x0ba-f-G-E-V-C*$aa0b0312-b5cd-4c2c-bbbc-9c4c702162610\xa0\xd1\xfc\xaf\x82.8\xa0\xd1\xfc\xaf\x82.@\x98\xa4\xfe\xaf\x82.J\x03h-eQ3\x98T\xa7b\xfdE@Y}*~\xbe\xf3\x84P\xc0aW\\\x1c\x95\x9b\xc4:@i\x00\x00\x00\xe0\xcf\x12cAq\x00\x00\x00\xe0\xcf\x12cAz\xb3\x01\n/<uid Droid="Eliopoli HQ" /><another test="1" />\x12$\n\x15192.168.1.10:4242:tcp\x12\x0bEliopoli HQ\x1a\x0c\n\x06Yellow\x12\x02HQ*\x02\x08d2F\n\x11LENOVO 20QV0007US\x12\nWinTAK-CIV\x1a\x19Microsoft Windows 10 Home"\n1.10.0.137:\x00'
)
Expand Down Expand Up @@ -115,7 +128,7 @@ def test_xml2proto_stream(self):
self.assertEqual(bytes(buf), bytes(t_ba))

def test_parse_proto_stream(self):
"""Test encoding CoT XML string as TAK Protocol Version 1 Stream Protobuf."""
"""Test deserializing TAK Protocol Version 1 Stream bytes to TakMessage Protobuf."""
t_ba = bytearray(
b'\xbf\xb3\x02\x12\xb0\x02\n\x0ba-f-G-E-V-C*$aa0b0312-b5cd-4c2c-bbbc-9c4c702162610\xa0\xd1\xfc\xaf\x82.8\xa0\xd1\xfc\xaf\x82.@\x98\xa4\xfe\xaf\x82.J\x03h-eQ3\x98T\xa7b\xfdE@Y}*~\xbe\xf3\x84P\xc0aW\\\x1c\x95\x9b\xc4:@i\x00\x00\x00\xe0\xcf\x12cAq\x00\x00\x00\xe0\xcf\x12cAz\xb3\x01\n/<uid Droid="Eliopoli HQ" /><another test="1" />\x12$\n\x15192.168.1.10:4242:tcp\x12\x0bEliopoli HQ\x1a\x0c\n\x06Yellow\x12\x02HQ*\x02\x08d2F\n\x11LENOVO 20QV0007US\x12\nWinTAK-CIV\x1a\x19Microsoft Windows 10 Home"\n1.10.0.137:\x00'
)
Expand All @@ -130,3 +143,22 @@ def test_parse_proto_stream(self):
'<uid Droid="Eliopoli HQ" /><another test="1" />',
)
self.assertEqual(cot_event.detail.contact.callsign, "Eliopoli HQ")

def test_parse_proto_xml(self):
"""Test deserializing CoT XML bytes to TakMessage Protobuf."""
t_xml = """<?xml version='1.0' encoding='UTF-8' standalone='yes'?>
<event version='2.0' uid='aa0b0312-b5cd-4c2c-bbbc-9c4c70216261' type='a-f-G-E-V-C' time='2020-02-08T18:10:44.000Z' start='2020-02-08T18:10:44.000Z' stale='2020-02-08T18:11:11.000Z' how='h-e'><point lat='43.97957317' lon='-66.07737696' hae='26.767999' ce='9999999.0' le='9999999.0' /><detail><uid Droid='Eliopoli HQ'/><contact callsign='Eliopoli HQ' endpoint='192.168.1.10:4242:tcp'/><__group name='Yellow' role='HQ'/><status battery='100'/><takv platform='WinTAK-CIV' device='LENOVO 20QV0007US' os='Microsoft Windows 10 Home' version='1.10.0.137'/><track speed='0.00000000' course='0.00000000'/><another test="1"/></detail></event>
"""

t_ba = bytearray(t_xml, encoding="utf-8")

parsed = takproto.parse_proto(t_ba)
cot_event = parsed.cotEvent

self.assertEqual(cot_event.type, "a-f-G-E-V-C")
self.assertEqual(cot_event.uid, "aa0b0312-b5cd-4c2c-bbbc-9c4c70216261")
self.assertEqual(
cot_event.detail.xmlDetail,
'<uid Droid="Eliopoli HQ" /><another test="1" />',
)
self.assertEqual(cot_event.detail.contact.callsign, "Eliopoli HQ")
Loading