-
Notifications
You must be signed in to change notification settings - Fork 8
/
proxy.py
executable file
·50 lines (42 loc) · 1.51 KB
/
proxy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#!/usr/bin/env python3
import asyncio
import logging
import os
import random
from dns.resolver import Resolver
# Emit INFO-level records from the root logger
logging.getLogger().setLevel(logging.INFO)

# Proxy settings, all taken from the process environment
mode = os.environ["MODE"]
ports = os.environ["PORT"].split()
max_connections = os.environ.get("MAX_CONNECTIONS", 100)
target = os.environ["TARGET"]
# Unless pre-resolution is requested below, forward to the target as given
ip = target
udp_answers = os.environ.get("UDP_ANSWERS", "1")

# Optionally resolve the target once, up front, using the configured
# nameservers, and pick one of the returned addresses at random
if os.environ["PRE_RESOLVE"] == "1":
    resolver = Resolver()
    resolver.nameservers = os.environ["NAMESERVERS"].split()
    ip = random.choice(
        [record.address for record in resolver.resolve(target)]
    )
    logging.info("Resolved %s to %s", target, ip)
async def netcat(port):
    """Run a socat forwarder for *port* and wait until it exits.

    The socat address pair depends on the module-level settings:

    * ``mode == "udp"`` with ``udp_answers == "0"``: a ``udp-recv`` /
      ``udp-sendto`` pair towards ``ip`` on the same port.
    * anything else: a ``{mode}-listen`` address (with the ``fork``,
      ``reuseaddr`` and ``max-children`` options) paired with a
      ``{mode}-connect`` address towards ``ip`` on the same port.

    ``-v`` is passed to socat when the ``VERBOSE`` environment variable is
    ``"1"``. The coroutine returns once the socat process has exited.
    """
    # Pick the socat address pair for this port
    one_way_udp = mode == "udp" and udp_answers == "0"
    if one_way_udp:
        addresses = [f"udp-recv:{port},reuseaddr", f"udp-sendto:{ip}:{port}"]
    else:
        listen_options = f"fork,reuseaddr,max-children={max_connections}"
        addresses = [
            f"{mode}-listen:{port},{listen_options}",
            f"{mode}-connect:{ip}:{port}",
        ]

    # Verbose mode is opt-in through the environment
    verbose_flags = ["-v"] if os.environ["VERBOSE"] == "1" else []
    command = ["socat", *verbose_flags, *addresses]

    # Spawn socat and block this coroutine until the process ends
    logging.info("Executing: %s", " ".join(command))
    process = await asyncio.create_subprocess_exec(*command)
    await process.wait()
async def _run_proxies():
    """Run one netcat() proxy per configured port until all of them exit."""
    # gather() is called from inside the running loop, so it never has to
    # look up (or implicitly create) an event loop on its own.
    await asyncio.gather(*(netcat(port) for port in ports))


# Wait until all proxies exited, if they ever do.
# The loop is created explicitly and *before* the try block: implicit loop
# creation through asyncio.get_event_loop() is deprecated on newer Python
# versions, and if obtaining the loop failed inside the try, the finally
# clause would reference an unbound `loop` and mask the real error.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    loop.run_until_complete(_run_proxies())
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()