Skip to content

Commit

Permalink
Merge pull request #204 from obsidianforensics/timestamp-parser-refactor
Browse files (browse the repository at this point in the history)
Refactor the timestamp parser with more structured data types and tim…
  • Loading branch information
obsidianforensics authored Nov 20, 2024
2 parents e687b96 + 61f5a43 commit ea44658
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 36 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
python-version: ['3.10', '3.11', '3.12']
python-version: ['3.11', '3.12', '3.13']
os: [ubuntu-latest, windows-latest, macos-latest]

steps:
Expand Down
89 changes: 72 additions & 17 deletions unfurl/parsers/parse_timestamp.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2020 Google LLC
# Copyright 2024 Ryan Benson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -14,6 +14,7 @@

import datetime
import re

from unfurl import utils

timestamp_edge = {
Expand Down Expand Up @@ -50,7 +51,11 @@ def decode_epoch_seconds(seconds):
2030: 1900000000
"""
return datetime.datetime.utcfromtimestamp(float(seconds)), 'Epoch seconds'
return {
'data_type': 'timestamp.epoch-seconds',
'display_type': 'Epoch seconds',
'timestamp_value': str(datetime.datetime.fromtimestamp(float(seconds), datetime.UTC))
}


def decode_epoch_centiseconds(centiseconds):
Expand All @@ -68,9 +73,13 @@ def decode_epoch_centiseconds(centiseconds):
"""
# Trim off the 4 trailing 0s (don't add precision that wasn't in the timestamp)
converted_ts = trim_zero_fractional_seconds(
str(datetime.datetime.utcfromtimestamp(float(centiseconds) / 100)), 4)
return converted_ts, 'Epoch centiseconds'
str(datetime.datetime.fromtimestamp(float(centiseconds) / 100, datetime.UTC)), 4)

return {
'data_type': 'timestamp.epoch-centiseconds',
'display_type': 'Epoch centiseconds',
'timestamp_value': converted_ts
}

def decode_epoch_milliseconds(milliseconds):
"""Decode a numeric timestamp in Epoch milliseconds format to a human-readable timestamp.
Expand All @@ -87,7 +96,12 @@ def decode_epoch_milliseconds(milliseconds):
converted_dt = datetime.datetime(1970, 1, 1) + datetime.timedelta(milliseconds=float(milliseconds))
# Trim off the 3 trailing 0s (don't add precision that wasn't in the timestamp)
converted_ts = trim_zero_fractional_seconds(str(converted_dt), 3)
return converted_ts, 'Epoch milliseconds'

return {
'data_type': 'timestamp.epoch-milliseconds',
'display_type': 'Epoch milliseconds',
'timestamp_value': converted_ts
}


def decode_epoch_ten_microseconds(ten_microseconds):
Expand All @@ -105,9 +119,13 @@ def decode_epoch_ten_microseconds(ten_microseconds):
"""
# Trim off the trailing 0 (don't add precision that wasn't in the timestamp)
converted_ts = trim_zero_fractional_seconds(
str(datetime.datetime.utcfromtimestamp(float(ten_microseconds) / 100000)), 1)
return converted_ts, 'Epoch ten-microsecond increments'
str(datetime.datetime.fromtimestamp(float(ten_microseconds) / 100000, datetime.UTC)), 1)

return {
'data_type': 'timestamp.epoch-ten-microseconds',
'display_type': 'Epoch ten-microsecond increments',
'timestamp_value': converted_ts
}

def decode_epoch_microseconds(microseconds):
"""Decode a numeric timestamp in Epoch microseconds format to a human-readable timestamp.
Expand All @@ -121,8 +139,13 @@ def decode_epoch_microseconds(microseconds):
2030: 1900000000000000
"""
converted_ts = str(datetime.datetime.utcfromtimestamp(float(microseconds) / 1000000))
return converted_ts, 'Epoch microseconds'
converted_ts = datetime.datetime.fromtimestamp(float(microseconds) / 1000000, datetime.UTC)

return {
'data_type': 'timestamp.epoch-microseconds',
'display_type': 'Epoch microseconds',
'timestamp_value': str(converted_ts)
}


def decode_webkit(microseconds):
Expand All @@ -136,8 +159,13 @@ def decode_webkit(microseconds):
2025: 13380163200000000
"""
return datetime.datetime.utcfromtimestamp((float(microseconds) / 1000000) - 11644473600), 'Webkit'
converted_ts = datetime.datetime.fromtimestamp((float(microseconds) / 1000000) - 11644473600, datetime.UTC)

return {
'data_type': 'timestamp.webkit',
'display_type': 'Webkit',
'timestamp_value': str(converted_ts)
}

def decode_windows_filetime(intervals):
"""Decode a numeric timestamp in Windows FileTime format to a human-readable timestamp.
Expand All @@ -152,8 +180,13 @@ def decode_windows_filetime(intervals):
2065: 146424672000000000
"""
return datetime.datetime.utcfromtimestamp((float(intervals) / 10000000) - 11644473600), 'Windows FileTime'
converted_ts = datetime.datetime.fromtimestamp((float(intervals) / 10000000) - 11644473600, datetime.UTC)

return {
'data_type': 'timestamp.windows-filetime',
'display_type': 'Windows FileTime',
'timestamp_value': str(converted_ts)
}

def decode_datetime_ticks(ticks):
"""Decode a numeric timestamp in .Net/C# DateTime ticks format to a human-readable timestamp.
Expand All @@ -175,7 +208,13 @@ def decode_datetime_ticks(ticks):
"""
seconds = (ticks - 621355968000000000) / 10000000
return (datetime.datetime.fromtimestamp(seconds)), 'DateTime ticks'
converted_ts = datetime.datetime.fromtimestamp(seconds)

return {
'data_type': 'timestamp.datetime-ticks',
'display_type': 'DateTime ticks',
'timestamp_value': str(converted_ts)
}


def decode_mac_absolute_time(seconds):
Expand All @@ -194,7 +233,13 @@ def decode_mac_absolute_time(seconds):
2035: 1072915200
"""
return datetime.datetime.utcfromtimestamp(float(seconds)+978307200), 'Mac Absolute Time / Cocoa'
converted_ts = datetime.datetime.fromtimestamp(float(seconds) + 978307200, datetime.UTC)

return {
'data_type': 'timestamp.mac-absolute-time',
'display_type': 'Mac Absolute Time / Cocoa',
'timestamp_value': str(converted_ts)
}


def decode_epoch_hex(seconds):
Expand All @@ -209,7 +254,12 @@ def decode_epoch_hex(seconds):
"""
timestamp, _ = decode_epoch_seconds(int(seconds, 16))
return timestamp, 'Epoch seconds (hex)'

return {
'data_type': 'timestamp.epoch-seconds-hex',
'display_type': 'Epoch seconds (hex)',
'timestamp_value': str(timestamp)
}


def decode_windows_filetime_hex(intervals):
Expand All @@ -227,7 +277,12 @@ def decode_windows_filetime_hex(intervals):
"""
int_right = int(intervals, 16)
timestamp, _ = decode_windows_filetime(int_right)
return timestamp, 'Windows FileTime (hex)'

return {
'data_type': 'timestamp.windows-filetime-hex',
'display_type': 'Windows FileTime (hex)',
'timestamp_value': str(timestamp)
}


def run(unfurl, node):
Expand Down Expand Up @@ -333,6 +388,6 @@ def run(unfurl, node):

if new_timestamp != (None, 'unknown'):
unfurl.add_to_queue(
data_type=new_timestamp[1], key=None, value=new_timestamp[0],
hover=f'Converted as {new_timestamp[1]}', parent_id=node.node_id,
data_type=new_timestamp['data_type'], key=None, value=new_timestamp['timestamp_value'],
hover=f'Converted as {new_timestamp["display_type"]}', parent_id=node.node_id,
incoming_edge_config=timestamp_edge)
5 changes: 3 additions & 2 deletions unfurl/parsers/parse_url.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2019 Google LLC
# Copyright 2024 Ryan Benson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -43,7 +43,8 @@ def parse_delimited_string(unfurl_instance, node, delimiter, pairs=False) -> Non

def try_url_unquote(unfurl_instance, node) -> bool:
unquoted = urllib.parse.unquote_plus(node.value)
if unquoted != node.value:
# The regex skips values ending in a UTC offset such as +00:00 (timestamp strings); unquote_plus would turn the '+' into a space
if unquoted != node.value and not re.match(r'.*\+\d\d:\d\d$', node.value):
unfurl_instance.add_to_queue(
data_type='string', key=None, value=unquoted,
hover='Unquoted URL (replaced %xx escapes with their single-character equivalent)',
Expand Down
4 changes: 2 additions & 2 deletions unfurl/tests/unit/test_bluesky.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def test_bluesky_post(self):
self.assertEqual(1732040395098000, test.nodes[12].value)

# embedded timestamp parses correctly
self.assertEqual('2024-11-19 18:19:55.098000', test.nodes[13].value)
self.assertEqual('2024-11-19 18:19:55.098000+00:00', test.nodes[13].value)

def test_bluesky_bare_tid(self):
""" Test parsing a Bluesky/ATProto TID"""
Expand All @@ -46,7 +46,7 @@ def test_bluesky_bare_tid(self):
self.assertEqual(1731543333133695, test.nodes[2].value)

# embedded timestamp parses correctly
self.assertEqual('2024-11-14 00:15:33.133695', test.nodes[3].value)
self.assertEqual('2024-11-14 00:15:33.133695+00:00', test.nodes[3].value)

if __name__ == '__main__':
unittest.main()
15 changes: 1 addition & 14 deletions unfurl/tests/unit/test_jwt.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from unfurl.core import Unfurl
import datetime
import unittest


Expand Down Expand Up @@ -32,10 +31,6 @@ def test_jwt_simple(self):
# confirm that the explanation of the standard "typ" parameter was added
self.assertIn('declare the media type', test.nodes[12].label)

# make sure the queue finished empty
self.assertTrue(test.queue.empty())
self.assertEqual(len(test.edges), 0)

def test_jwt_iat_timestamp(self):
"""Parse a sole JWT with an iat field that is parsed as a timestamp.
Expand Down Expand Up @@ -64,11 +59,7 @@ def test_jwt_iat_timestamp(self):
self.assertEqual(1422779638, test.nodes[10].value)

# confirm that the "iat" claim was detected and parsed as a timestamp
self.assertEqual(datetime.datetime(2015, 2, 1, 8, 33, 58), test.nodes[14].value)

# make sure the queue finished empty
self.assertTrue(test.queue.empty())
self.assertEqual(len(test.edges), 0)
self.assertEqual('2015-02-01 08:33:58+00:00', test.nodes[14].value)

def test_jwt_as_url_segment(self):
"""Parse a JWT that is part of the URL.
Expand Down Expand Up @@ -101,10 +92,6 @@ def test_jwt_as_url_segment(self):
# confirm that the header was parsed as JSON
self.assertEqual('alg', test.nodes[19].key)

# make sure the queue finished empty
self.assertTrue(test.queue.empty())
self.assertEqual(len(test.edges), 0)


if __name__ == '__main__':
unittest.main()

0 comments on commit ea44658

Please sign in to comment.