#!/usr/bin/env python
from __future__ import annotations
import datetime
import threading
import xmlrpc.client as xmlclient
from xml.sax.saxutils import escape
from xmlrpc.server import SimpleXMLRPCServer
import idaapi
import idc
# Wait for any processing to get done
idaapi.auto_wait()
# On Windows with NTFS filesystem a filepath with ':'
# is treated as NTFS ADS (Alternative Data Stream)
# and so saving file with such name fails
dt = datetime.datetime.now().isoformat().replace(":", "-")
# Save the database so nothing gets lost.
idc.save_database(idc.get_idb_path() + "." + dt)
DEBUG_MARSHALLING = False
def create_marshaller(use_format=None, just_to_str=False):
assert (
use_format or just_to_str
), "Either pass format to use or make it converting the value to str."
def wrapper(_marshaller, value, appender):
if use_format:
marshalled = use_format % value
elif just_to_str:
marshalled = f"{escape(str(value))}"
if DEBUG_MARSHALLING:
print(f"Marshalled: '{marshalled}'")
appender(marshalled)
return wrapper
xmlclient.Marshaller.dispatch[type(1 << 63)] = create_marshaller("%d")
xmlclient.Marshaller.dispatch[int] = create_marshaller("%d")
xmlclient.Marshaller.dispatch[idaapi.cfuncptr_t] = create_marshaller(just_to_str=True)
host = "127.0.0.1"
port = 31337
mutex = threading.Condition()
def wrap(f):
def wrapper(*a, **kw):
rv = []
error = []
def work():
try:
result = f(*a, **kw)
rv.append(result)
except Exception as e:
error.append(e)
with mutex:
flags = idaapi.MFF_WRITE
if f == idc.set_color:
flags |= idaapi.MFF_NOWAIT
rv.append(None)
idaapi.execute_sync(work, flags)
if error:
msg = f"Failed on calling {f.__module__}.{f.__name__} with args: {a}, kwargs: {kw}\nException: {str(error[0])}"
print("[!!!] ERROR:", msg)
raise error[0]
return rv[0]
return wrapper
def register_module(module):
for name, function in module.__dict__.items():
if hasattr(function, "__call__"):
server.register_function(wrap(function), name)
def decompile(addr):
"""
Function that overwrites `idaapi.decompile` for xmlrpc so that instead
of throwing an exception on `idaapi.DecompilationFailure` it just returns `None`.
(so that we don't have to parse xmlrpc Fault's exception string on pwndbg side
as it differs between IDA versions).
"""
try:
return idaapi.decompile(addr)
except idaapi.DecompilationFailure:
return None
def get_decompile_coord_by_ea(cfunc, addr):
if idaapi.IDA_SDK_VERSION >= 720:
item = cfunc.body.find_closest_addr(addr)
y_holder = idaapi.int_pointer()
if not cfunc.find_item_coords(item, None, y_holder):
return None
y = y_holder.value()
else:
lnmap = {}
for i, line in enumerate(cfunc.pseudocode):
phead = idaapi.ctree_item_t()
pitem = idaapi.ctree_item_t()
ptail = idaapi.ctree_item_t()
ret = cfunc.get_line_item(line.line, 0, True, phead, pitem, ptail)
if ret and pitem.it:
lnmap[pitem.it.ea] = i
y = None
closest_ea = idaapi.BADADDR
for ea, line in lnmap.items():
if closest_ea == idaapi.BADADDR or abs(closest_ea - addr) > abs(ea - addr):
closest_ea = ea
y = lnmap[ea]
return y
def decompile_context(addr, context_lines):
cfunc = decompile(addr)
if cfunc is None:
return None
y = get_decompile_coord_by_ea(cfunc, addr)
if y is None:
return cfunc
lines = cfunc.get_pseudocode()
retlines = []
for lnnum in range(max(0, y - context_lines), min(len(lines), y + context_lines)):
retlines.append(idaapi.tag_remove(lines[lnnum].line))
if lnnum == y:
retlines[-1] = ">" + retlines[-1][1:]
return "\n".join(retlines)
def versions():
"""Returns IDA & Python versions"""
import sys
return {
"python": sys.version,
"ida": idaapi.get_kernel_version(),
"hexrays": idaapi.get_hexrays_version() if idaapi.init_hexrays_plugin() else None,
}
server = SimpleXMLRPCServer((host, port), logRequests=False, allow_none=True)
register_module(idaapi)
register_module(
idc
) # prioritize idc functions over above (e.g. idc.get_next_seg/ida_segment.get_next_seg)
server.register_function(lambda a: eval(a, globals(), locals()), "eval")
server.register_function(wrap(decompile)) # overwrites idaapi/ida_hexrays.decompile
server.register_function(wrap(decompile_context), "decompile_context") # support context decompile
server.register_function(wrap(versions))
server.register_introspection_functions()
print(f"IDA Pro xmlrpc hosted on http://{host}:{port}")
print("Call `shutdown()` to shutdown the IDA Pro xmlrpc server.")
thread = threading.Thread(target=server.serve_forever)
thread.daemon = True
thread.start()
def shutdown():
global server
global thread
server.shutdown()
server.server_close()
del server
del thread