#!/usr/bin/python
########################################################################
# Copyright (c) 2017
# Daniel Plohmann <daniel.plohmann<at>mailbox<dot>org>
# All rights reserved.
########################################################################
#
# This file is part of apiscout
#
# apiscout is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see
# <http://www.gnu.org/licenses/>.
#
########################################################################
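# Example invocations (illustrative only; the dump and DB file names below are
# made up and not shipped with apiscout):
#   python scout.py memory_dump.bin_0x00400000 dbs/my_api_db.json
#   python scout.py -t unmapped_sample.exe -o results.json
#   python scout.py memory_dump.bin          # no DB given: use all files in ./dbs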
import os
import re
import sys
import json
import logging
import argparse

from apiscout.ApiScout import ApiScout

LOG = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)-15s %(message)s")


def get_this_dir():
    return os.path.abspath(os.path.join(os.path.dirname(__file__)))


def get_all_db_files():
    db_dir = get_this_dir() + os.sep + "dbs" + os.sep
    return [db_dir + fn for fn in os.listdir(db_dir) if fn.endswith(".json")]


def get_base_addr(args):
    if args.base_addr:
        return int(args.base_addr, 16) if args.base_addr.startswith("0x") else int(args.base_addr)
    # try to infer from filename:
    baddr_match = re.search(re.compile("0x(?P<base_addr>[0-9a-fA-F]{8,16})$"), args.binary_path)
    if baddr_match:
        return int(baddr_match.group("base_addr"), 16)
    return 0
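# Note on the inference above (illustrative, not part of the original comments):
# without -b/--base_addr, a dump path ending in a hex suffix such as
# "sample.dmp_0x00400000" (hypothetical name) yields a base address of
# 0x00400000; if no such suffix is present either, 0 is used.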


def write_results_to_json(output_file, filtered_results):
    json_results = []
    for api_map_name in filtered_results:
        for api in filtered_results[api_map_name]:
            # each result entry is a tuple; judging by the keys used below:
            # offset, API address, DLL name, API name, bitness, ..., references (index 7)
            json_results.append({
                "offset": api[0],
                "apiAddress": api[1],
                "dll": api[2].split('_')[0] + f"({api[4]} bit)",
                "api": api[3],
                "references": sorted(api[7])
            })
    with open(output_file, 'w') as f:
        json.dump(json_results, f)
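# Sketch of a resulting JSON record (field names taken from the code above,
# values are made up):
#   {"offset": 4096, "apiAddress": 2089791488, "dll": "kernel32.dll(32 bit)",
#    "api": "CreateFileA", "references": [4242, 5150]}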


def main():
    parser = argparse.ArgumentParser(description='Demo: Use apiscout with a prepared API database (created using DatabaseBuilder.py) to crawl a dump for imports and render the results.')
    parser.add_argument('-f', '--filter', type=int, default=0, help='Filter out APIs that do not have a neighbour within N bytes (e.g. 32 or 2048).')
    parser.add_argument('-i', '--ignore_aslr', action='store_true', help='Do not apply the per-module ASLR offset potentially contained in an API DB file.')
    parser.add_argument('-c', '--collection_file', type=str, default='', help='Optionally match the output against a WinApi1024 vector collection file.')
    parser.add_argument('-b', '--base_addr', type=str, default='', help='Set base address to given value (int or 0x-hex format).')
    parser.add_argument('-t', '--import_table_only', action='store_true', help='Do not crawl for API references but only parse the import table instead; this assumes an unmapped PE file as input.')
    parser.add_argument('-o', '--json_file', type=str, default='', help='Write mapping from addresses to APIs found to a JSON file.')
    parser.add_argument('-s', '--silent', action='store_true', default=False, help='Suppress all output.')
    parser.add_argument('binary_path', type=str, default='', help='Path to the memory dump to crawl.')
    parser.add_argument('db_path', type=str, nargs='*', help='Path to the DB(s). If no argument is given, use all files found in "./dbs".')
    args = parser.parse_args()

    if args.binary_path:
        binary = ""
        if os.path.isfile(args.binary_path):
            with open(args.binary_path, "rb") as f_binary:
                binary = f_binary.read()
        scout = ApiScout()
        base_addr = get_base_addr(args)
        if not args.silent:
            print("Using base address 0x{:x} to infer reference counts.".format(base_addr))
        scout.setBaseAddress(base_addr)
        # override potential ASLR offsets that are stored in the API DB files.
        scout.ignoreAslrOffsets(args.ignore_aslr)
        # load DB file(s)
        db_paths = []
        if args.db_path:
            db_paths = args.db_path
        elif not args.import_table_only:
            db_paths = get_all_db_files()
        for db_path in db_paths:
            scout.loadDbFile(db_path)
        # scout the binary
        results = {}
        if args.import_table_only:
            if not args.silent:
                print("Parsing Import Table for\n {}.".format(args.binary_path))
            results = scout.evaluateImportTable(binary, is_unmapped=True)
        else:
            num_apis_loaded = scout.getNumApisLoaded()
            filter_info = " - neighbour filter: 0x%x" % args.filter if args.filter else ""
            if not args.silent:
                print("Using \n {}\nto analyze\n {}.".format("\n ".join(db_paths), args.binary_path))
                print("Buffer size is {} bytes, {} APIs loaded{}.\n".format(len(binary), num_apis_loaded, filter_info))
            results = scout.crawl(binary)
        filtered_results = scout.filter(results, 0, 0, args.filter)
        if not args.silent:
            print(scout.render(filtered_results))
            print(scout.renderVectorResults(filtered_results))
        if args.json_file:
            write_results_to_json(args.json_file, filtered_results)
            if not args.silent:
                print("Wrote ApiScout results to {}.".format(args.json_file))
        if args.collection_file and not args.silent:
            print(scout.renderResultsVsCollection(filtered_results, args.collection_file))
    else:
        parser.print_help()


if __name__ == "__main__":
    sys.exit(main())
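

# Minimal programmatic sketch (illustrative; it mirrors the ApiScout calls used
# in main() above, and "my_dump.bin" / "dbs/my_api_db.json" are made-up paths):
#
#   from apiscout.ApiScout import ApiScout
#
#   scout = ApiScout()
#   scout.setBaseAddress(0x400000)
#   scout.loadDbFile("dbs/my_api_db.json")
#   with open("my_dump.bin", "rb") as f_dump:
#       results = scout.crawl(f_dump.read())
#   print(scout.render(scout.filter(results, 0, 0, 0)))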