TLS related enhancements (#1536)

* Refactor TLS module

- Replace unreliable `__errno_location()` trick with `pthread_self()` to acquire TLS address
- Consolidate heap heuristics checks about TLS within the `pwndbg.gdblib.tls` module for better organization

* Bug fix for the `errno` command

Calling `__errno_location()` without locking the scheduler can cause another thread to inadvertently continue execution

* Refactor code about heap heuristics of thread-local variables

- Replace some checks with some functions in `pwndbg.gdblib.tls`
- Try to find tcache with `mp_.sbrk_base + 0x10` if the target is single-threaded

* Add tests for heap heuristics with multi-threaded targets

* Refactor scheduler-locking related functions

- Move these functions into `pwndbg.gdblib.scheduler`
- Fetch the parameter value once (https://github.com/pwndbg/pwndbg/pull/1536#discussion_r1082549746)

* Avoid bug caused by GLIBC_TUNABLES

See https://github.com/pwndbg/pwndbg/pull/1536#discussion_r1083202815

* Add note about `set scheduler-locking on`

* Add comment for `lock_scheduler`

Co-authored-by: Disconnect3d <dominik.b.czarnota@gmail.com>

* Update DEVELOPING.md

Co-authored-by: Disconnect3d <dominik.b.czarnota@gmail.com>
This commit is contained in:
Alan Li 2023-01-25 13:19:39 +08:00 committed by GitHub
parent 52a479211c
commit 2a5f563444
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 269 additions and 293 deletions

View File

@ -108,3 +108,4 @@ Feel free to update the list below!
* We would like to add proper tests for pwndbg - see tests framework PR if you want to help on that.
* If you want to use `gdb.parse_and_eval("a_function_name()")` or something similar that call a function, please remember this might cause another thread to continue execution without `set scheduler-locking on`. If you didn't expect that, you should use `parse_and_eval_with_scheduler_lock` from `pwndbg.gdblib.scheduler` instead.

View File

@ -10,6 +10,7 @@ import pwndbg.commands
import pwndbg.gdblib.regs
import pwndbg.gdblib.symbol
from pwndbg.commands import CommandCategory
from pwndbg.gdblib.scheduler import parse_and_eval_with_scheduler_lock
errno.errorcode[0] = "OK" # type: ignore # manually add error code 0 for "OK"
@ -44,7 +45,11 @@ def errno_(err) -> None:
if errno_loc_gotplt is None or pwndbg.gdblib.vmmap.find(
pwndbg.gdblib.memory.pvoid(errno_loc_gotplt)
):
err = int(gdb.parse_and_eval("*((int *(*) (void)) __errno_location) ()"))
err = int(
parse_and_eval_with_scheduler_lock(
"*((int *(*) (void)) __errno_location) ()"
)
)
else:
print(
"Could not determine error code automatically: the __errno_location@got.plt has no valid address yet (perhaps libc.so hasn't been loaded yet?)"

View File

@ -45,7 +45,6 @@ def on_start() -> None:
@pwndbg.gdblib.events.exit
def on_exit() -> None:
pwndbg.gdblib.tls.reset()
pwndbg.gdblib.file.reset_remote_files()
pwndbg.gdblib.next.clear_temp_breaks()

View File

@ -0,0 +1,30 @@
from contextlib import contextmanager
import gdb
@contextmanager
def lock_scheduler():
    """
    Run GDB commands with thread scheduling locked, so that other threads
    stay stopped during execution.

    This is useful to prevent bugs where e.g.: gdb.parse_and_eval("(int)foo()")
    would execute foo() on the current debuggee thread but would also unlock other
    threads for being executed and those other threads may for example hit a
    breakpoint we set previously which would be confusing for the user.

    See also: https://sourceware.org/gdb/onlinedocs/gdb/All_002dStop-Mode.html
    """
    old_config = gdb.parameter("scheduler-locking")
    if old_config != "on":
        gdb.execute("set scheduler-locking on")
        # try/finally so the user's previous setting is restored even if the
        # code run inside this context manager raises an exception.
        try:
            yield
        finally:
            gdb.execute("set scheduler-locking %s" % old_config)
    else:
        # Scheduler locking is already on; nothing to change or restore.
        yield
def parse_and_eval_with_scheduler_lock(expr: str) -> gdb.Value:
    """Evaluate *expr* with gdb.parse_and_eval while the scheduler is locked.

    Locking prevents other debuggee threads from resuming while the
    expression (which may call functions in the inferior) is evaluated.
    """
    with lock_scheduler():
        result = gdb.parse_and_eval(expr)
    return result

View File

@ -12,76 +12,42 @@ import pwndbg.gdblib.memory
import pwndbg.gdblib.regs
import pwndbg.gdblib.symbol
import pwndbg.gdblib.vmmap
from pwndbg.gdblib.scheduler import parse_and_eval_with_scheduler_lock
class module(ModuleType):
"""Getting Thread Local Storage (TLS) information."""
_errno_offset = None
def is_thread_local_variable_offset(self, offset: int) -> bool:
    """Check if the offset to TLS is a valid offset for the heap heuristics."""
    # On x86-64/i386 thread-local variables sit below the TLS base
    # (negative offset); on aarch64/arm they sit above it (positive offset).
    if pwndbg.gdblib.arch.current in ("x86-64", "i386"):
        distance = -offset
    else:  # aarch64 / arm
        distance = offset
    in_range = 0 < distance < 0x250
    # The offset must also be pointer-aligned.
    aligned = offset % pwndbg.gdblib.arch.ptrsize == 0
    return in_range and aligned
def get_tls_base_via_errno_location(self) -> int:
"""Heuristically determine the base address of the TLS."""
if pwndbg.gdblib.symbol.address(
"__errno_location"
) is None or pwndbg.gdblib.arch.current not in (
"x86-64",
"i386",
"arm",
):
# Note: We doesn't implement this for aarch64 because its TPIDR_EL0 register seems always work
# If oneday we can't get TLS base via TPIDR_EL0, we should implement this for aarch64
def is_thread_local_variable(self, addr: int) -> bool:
    """Check if the address is a valid thread local variable's address for the heap heuristics."""
    tls_base = self.address
    if not tls_base:
        # Since we can not get the TLS base address, we trust that the address is valid.
        return True
    # The address must be a plausible offset from the TLS base and must live
    # in the same memory mapping as the TLS base itself.
    offset_is_valid = self.is_thread_local_variable_offset(addr - tls_base)
    return offset_is_valid and addr in pwndbg.gdblib.vmmap.find(tls_base)
def call_pthread_self(self) -> int:
    """Get the address of TLS by calling pthread_self()."""
    if pwndbg.gdblib.symbol.address("pthread_self") is None:
        # Symbol not available (e.g. libc not loaded yet) -> can't call it.
        return 0
    try:
        result = parse_and_eval_with_scheduler_lock("(void *)pthread_self()")
    except gdb.error:
        # The inferior call failed; report failure with 0 rather than raising.
        return 0
    return int(result)
already_lock = gdb.parameter("scheduler-locking") == "on"
old_config = gdb.parameter("scheduler-locking")
if not already_lock:
gdb.execute("set scheduler-locking on")
errno_addr = int(gdb.parse_and_eval("(int *)__errno_location()"))
if not already_lock:
gdb.execute("set scheduler-locking %s" % old_config)
if not self._errno_offset:
__errno_location_instr = pwndbg.disasm.near(
pwndbg.gdblib.symbol.address("__errno_location"), 5, show_prev_insns=False
)
if pwndbg.gdblib.arch.current == "x86-64":
for instr in __errno_location_instr:
# Find something like: mov rax, qword ptr [rip + disp]
if instr.mnemonic == "mov":
self._errno_offset = pwndbg.gdblib.memory.s64(instr.next + instr.disp)
break
elif pwndbg.gdblib.arch.current == "i386":
for instr in __errno_location_instr:
# Find something like: mov eax, dword ptr [eax + disp]
# (disp is a negative value)
if instr.mnemonic == "mov":
# base offset is from the first `add eax` after `call __x86.get_pc_thunk.bx`
base_offset_instr = next(
instr for instr in __errno_location_instr if instr.mnemonic == "add"
)
base_offset = base_offset_instr.address + base_offset_instr.operands[1].int
self._errno_offset = pwndbg.gdblib.memory.s32(base_offset + instr.disp)
break
elif pwndbg.gdblib.arch.current == "arm":
ldr_instr = None
for instr in __errno_location_instr:
if not ldr_instr and instr.mnemonic == "ldr":
ldr_instr = instr
elif ldr_instr and instr.mnemonic == "add":
offset = ldr_instr.operands[1].mem.disp
offset = pwndbg.gdblib.memory.s32((ldr_instr.address + 4 & -4) + offset)
self._errno_offset = pwndbg.gdblib.memory.s32(instr.address + 4 + offset)
break
if not self._errno_offset:
raise OSError("Can not find tls base")
return errno_addr - self._errno_offset
@property
def address(self) -> int:
"""Get the base address of TLS."""
if pwndbg.gdblib.arch.current not in ("x86-64", "i386", "aarch64", "arm"):
# Not supported yet
return 0
tls_base = 0
if pwndbg.gdblib.arch.current == "x86-64":
@ -91,20 +57,12 @@ class module(ModuleType):
elif pwndbg.gdblib.arch.current == "aarch64":
tls_base = int(pwndbg.gdblib.regs.TPIDR_EL0)
# Sometimes, we need to get TLS base via errno location for the following reason:
# Sometimes, we need to get TLS base via pthread_self() for the following reason:
# For x86-64, fsbase might be 0 if we are remotely debugging and the GDB version <= 8.X
# For i386, gsbase might be 0 if we are remotely debugging
# For arm (32-bit), we don't have any other choice
# For other archs, we can't get the TLS base address via register
# Note: aarch64 doesn't seem to have this issue
is_valid_tls_base = (
pwndbg.gdblib.vmmap.find(tls_base) is not None
and tls_base % pwndbg.gdblib.arch.ptrsize == 0
)
return tls_base if is_valid_tls_base else self.get_tls_base_via_errno_location()
def reset(self) -> None:
# We should reset the offset when we attach to a new process
self._errno_offset = None
return tls_base if tls_base else self.call_pthread_self()
# To prevent garbage collection

View File

@ -1136,7 +1136,7 @@ class DebugSymsHeap(GlibcMemoryAllocator):
thread's tcache.
"""
if self.has_tcache():
tcache = self.mp["sbrk_base"] + 0x10
tcache = self.get_sbrk_heap_region().vaddr + 0x10
if self.multithreaded:
tcache_addr = pwndbg.gdblib.memory.pvoid(
pwndbg.gdblib.symbol.static_linkage_symbol_address("tcache")
@ -1440,27 +1440,12 @@ class HeuristicHeap(GlibcMemoryAllocator):
if thread_arena_via_config > 0:
return Arena(thread_arena_via_config)
elif thread_arena_via_symbol:
if pwndbg.gdblib.symbol.static_linkage_symbol_address("thread_arena"):
# If the symbol is static-linkage symbol, we trust it.
return Arena(pwndbg.gdblib.memory.u(thread_arena_via_symbol))
# Check &thread_arena is nearby TLS base or not to avoid false positive.
tls_base = pwndbg.gdblib.tls.address
if tls_base:
if pwndbg.gdblib.arch.current in ("x86-64", "i386"):
is_valid_address = 0 < tls_base - thread_arena_via_symbol < 0x250
else: # elif pwndbg.gdblib.arch.current in ("aarch64", "arm"):
is_valid_address = 0 < thread_arena_via_symbol - tls_base < 0x250
is_valid_address = (
is_valid_address
and thread_arena_via_symbol in pwndbg.gdblib.vmmap.find(tls_base)
)
if is_valid_address:
thread_arena_struct_addr = pwndbg.gdblib.memory.u(thread_arena_via_symbol)
# Check &thread_arena is a valid address or not to avoid false positive.
if pwndbg.gdblib.vmmap.find(thread_arena_struct_addr):
return Arena(thread_arena_struct_addr)
if pwndbg.gdblib.tls.is_thread_local_variable(thread_arena_via_symbol):
thread_arena_struct_addr = pwndbg.gdblib.memory.u(thread_arena_via_symbol)
# Check &thread_arena is a valid address or not to avoid false positive.
if pwndbg.gdblib.vmmap.find(thread_arena_struct_addr):
return Arena(thread_arena_struct_addr)
if not self._thread_arena_offset and pwndbg.gdblib.symbol.address("__libc_calloc"):
# TODO/FIXME: This method should be updated if we find a better way to find the target assembly code
@ -1554,7 +1539,6 @@ class HeuristicHeap(GlibcMemoryAllocator):
base_offset + offset
)
break
elif pwndbg.gdblib.arch.current == "arm":
# We need to find something near the first `mrc 15, ......`
# The flow of assembly code will like:
@ -1584,7 +1568,9 @@ class HeuristicHeap(GlibcMemoryAllocator):
)
break
if self._thread_arena_offset:
if self._thread_arena_offset and pwndbg.gdblib.tls.is_thread_local_variable_offset(
self._thread_arena_offset
):
tls_base = pwndbg.gdblib.tls.address
if tls_base:
thread_arena_struct_addr = tls_base + self._thread_arena_offset
@ -1600,6 +1586,9 @@ class HeuristicHeap(GlibcMemoryAllocator):
"""Locate a thread's tcache struct. We try to find its address in Thread Local Storage (TLS) first,
and if that fails, we guess it's at the first chunk of the heap.
"""
if not self.has_tcache():
print(message.warn("This version of GLIBC was not compiled with tcache support."))
return None
thread_cache_via_config = int(str(pwndbg.gdblib.config.tcache), 0)
thread_cache_via_symbol = pwndbg.gdblib.symbol.static_linkage_symbol_address(
"tcache"
@ -1608,218 +1597,189 @@ class HeuristicHeap(GlibcMemoryAllocator):
self._thread_cache = self.tcache_perthread_struct(thread_cache_via_config)
return self._thread_cache
elif thread_cache_via_symbol:
if pwndbg.gdblib.symbol.static_linkage_symbol_address("tcache"):
# If the symbol is static-linkage symbol, we trust it.
thread_cache_struct_addr = pwndbg.gdblib.memory.u(thread_cache_via_symbol)
self._thread_cache = self.tcache_perthread_struct(thread_cache_struct_addr)
return self._thread_cache
# Check &tcache is nearby TLS base or not to avoid false positive.
tls_base = pwndbg.gdblib.tls.address
if tls_base:
if pwndbg.gdblib.arch.current in ("x86-64", "i386"):
is_valid_address = 0 < tls_base - thread_cache_via_symbol < 0x250
else: # elif pwndbg.gdblib.arch.current in ("aarch64", "arm"):
is_valid_address = 0 < thread_cache_via_symbol - tls_base < 0x250
is_valid_address = (
is_valid_address
and thread_cache_via_symbol in pwndbg.gdblib.vmmap.find(tls_base)
)
if is_valid_address:
thread_cache_struct_addr = pwndbg.gdblib.memory.u(thread_cache_via_symbol)
if pwndbg.gdblib.tls.is_thread_local_variable(thread_cache_via_symbol):
thread_cache_struct_addr = pwndbg.gdblib.memory.u(thread_cache_via_symbol)
if pwndbg.gdblib.vmmap.find(thread_cache_struct_addr):
self._thread_cache = self.tcache_perthread_struct(thread_cache_struct_addr)
return self._thread_cache
if self.has_tcache():
# Each thread has a tcache struct, and the address of the tcache struct is stored in the TLS.
# If target is single-threaded, then the tcache struct is at the first chunk of the heap.
# We try to find the address by using mp_.sbrk_base + 0x10 first since it's more reliable than other methods.
if not self.multithreaded:
try:
thread_cache_struct_addr = self.get_sbrk_heap_region().vaddr + 0x10
if pwndbg.gdblib.vmmap.find(thread_cache_struct_addr):
self._thread_cache = self.tcache_perthread_struct(thread_cache_struct_addr)
return self._thread_cache
except SymbolUnresolvableError:
# mp_ is not available
pass
# Try to find tcache in TLS, so first we need to find the offset of tcache to TLS base
if not self._thread_cache_offset and pwndbg.gdblib.symbol.address("__libc_malloc"):
# TODO/FIXME: This method should be updated if we find a better way to find the target assembly code
__libc_malloc_instruction = pwndbg.disasm.near(
pwndbg.gdblib.symbol.address("__libc_malloc"), 100, show_prev_insns=False
)[10:]
# Try to find the reference to tcache in __libc_malloc, the target C code is like this:
# `if (tc_idx < mp_.tcache_bins && tcache && ......`
if pwndbg.gdblib.arch.current == "x86-64":
# Find the last `mov reg1, qword ptr [rip + disp]` before the first `mov reg2, fs:[reg1]`
# In other words, find the first __thread variable
# Each thread has a tcache struct, and the address of the tcache struct is stored in the TLS.
# Try to find tcache in TLS, so first we need to find the offset of tcache to TLS base
if not self._thread_cache_offset and pwndbg.gdblib.symbol.address("__libc_malloc"):
# TODO/FIXME: This method should be updated if we find a better way to find the target assembly code
__libc_malloc_instruction = pwndbg.disasm.near(
pwndbg.gdblib.symbol.address("__libc_malloc"), 100, show_prev_insns=False
)[10:]
# Try to find the reference to tcache in __libc_malloc, the target C code is like this:
# `if (tc_idx < mp_.tcache_bins && tcache && ......`
if pwndbg.gdblib.arch.current == "x86-64":
# Find the last `mov reg1, qword ptr [rip + disp]` before the first `mov reg2, fs:[reg1]`
# In other words, find the first __thread variable
get_offset_instruction = None
get_offset_instruction = None
for instr in __libc_malloc_instruction:
if ", qword ptr [rip +" in instr.op_str:
get_offset_instruction = instr
if ", qword ptr fs:[r" in instr.op_str:
break
for instr in __libc_malloc_instruction:
if ", qword ptr [rip +" in instr.op_str:
get_offset_instruction = instr
if ", qword ptr fs:[r" in instr.op_str:
break
if get_offset_instruction:
# rip + disp
self._thread_cache_offset = pwndbg.gdblib.memory.s64(
get_offset_instruction.next + get_offset_instruction.disp
)
elif pwndbg.gdblib.arch.current == "i386" and self.possible_page_of_symbols:
# We still need to find the first __thread variable like we did for x86-64 But the assembly code
# of i386 is a little bit unstable sometimes(idk why), there are two versions of the code:
# 1. Find the last `mov reg1, dword ptr [reg0 + disp]` before the first `mov reg2, gs:[reg1]`(disp
# is a negative value)
# 2. Find the first `mov reg1, dword ptr [reg0 + disp]` after `mov reg3,
# [reg1 + reg2]` (disp is a negative value), and reg2 is from `mov reg2, gs:[0]`
get_offset_instruction = None
find_after = False
for instr in __libc_malloc_instruction:
if (
instr.disp < 0
and instr.mnemonic == "mov"
and ", dword ptr [e" in instr.op_str
):
get_offset_instruction = instr
if find_after:
break
if ", dword ptr gs:[e" in instr.op_str:
break
elif instr.op_str.endswith("gs:[0]") and instr.mnemonic == "mov":
find_after = True
if get_offset_instruction:
# reg + disp (value of reg is the page start of the last libc page)
base_offset = self.possible_page_of_symbols.vaddr
self._thread_cache_offset = pwndbg.gdblib.memory.s32(
base_offset + get_offset_instruction.disp
)
elif pwndbg.gdblib.arch.current == "aarch64":
# The logic is the same as the previous one..
# The assembly code to access tcache is sth like:
# `mrs reg1, tpidr_el0;
# adrp reg2, #base_offset;
# ldr reg2, [reg2, #offset]
# ...
# add reg3, reg1, reg2;
# ldr reg3, [reg3, #8]`
# Or:
# `adrp reg2, #base_offset;
# mrs reg1, tpidr_el0;
# ldr reg2, [reg2, #offset]
# ...
# add reg3, reg1, reg2;
# ldr reg3, [reg3, #8]`
# , then reg3 will be &tcache
mrs_instr = next(
instr for instr in __libc_malloc_instruction if instr.mnemonic == "mrs"
if get_offset_instruction:
# rip + disp
self._thread_cache_offset = pwndbg.gdblib.memory.s64(
get_offset_instruction.next + get_offset_instruction.disp
)
min_adrp_distance = 0x1000 # just a big enough number
nearest_adrp = None
nearest_adrp_idx = 0
for i, instr in enumerate(__libc_malloc_instruction):
if (
instr.mnemonic == "adrp"
and abs(mrs_instr.address - instr.address) < min_adrp_distance
):
reg = instr.operands[0].str
nearest_adrp = instr
nearest_adrp_idx = i
min_adrp_distance = abs(mrs_instr.address - instr.address)
if instr.address - mrs_instr.address > min_adrp_distance:
elif pwndbg.gdblib.arch.current == "i386" and self.possible_page_of_symbols:
# We still need to find the first __thread variable like we did for x86-64 But the assembly code
# of i386 is a little bit unstable sometimes(idk why), there are two versions of the code:
# 1. Find the last `mov reg1, dword ptr [reg0 + disp]` before the first `mov reg2, gs:[reg1]`(disp
# is a negative value)
# 2. Find the first `mov reg1, dword ptr [reg0 + disp]` after `mov reg3,
# [reg1 + reg2]` (disp is a negative value), and reg2 is from `mov reg2, gs:[0]`
get_offset_instruction = None
find_after = False
for instr in __libc_malloc_instruction:
if (
instr.disp < 0
and instr.mnemonic == "mov"
and ", dword ptr [e" in instr.op_str
):
get_offset_instruction = instr
if find_after:
break
for instr in __libc_malloc_instruction[nearest_adrp_idx + 1 :]:
if instr.mnemonic == "ldr":
base_offset = nearest_adrp.operands[1].int
offset = instr.operands[1].mem.disp
if ", dword ptr gs:[e" in instr.op_str:
break
elif instr.op_str.endswith("gs:[0]") and instr.mnemonic == "mov":
find_after = True
if get_offset_instruction:
# reg + disp (value of reg is the page start of the last libc page)
base_offset = self.possible_page_of_symbols.vaddr
self._thread_cache_offset = pwndbg.gdblib.memory.s32(
base_offset + get_offset_instruction.disp
)
elif pwndbg.gdblib.arch.current == "aarch64":
# The logic is the same as the previous one..
# The assembly code to access tcache is sth like:
# `mrs reg1, tpidr_el0;
# adrp reg2, #base_offset;
# ldr reg2, [reg2, #offset]
# ...
# add reg3, reg1, reg2;
# ldr reg3, [reg3, #8]`
# Or:
# `adrp reg2, #base_offset;
# mrs reg1, tpidr_el0;
# ldr reg2, [reg2, #offset]
# ...
# add reg3, reg1, reg2;
# ldr reg3, [reg3, #8]`
# , then reg3 will be &tcache
mrs_instr = next(
instr for instr in __libc_malloc_instruction if instr.mnemonic == "mrs"
)
min_adrp_distance = 0x1000 # just a big enough number
nearest_adrp = None
nearest_adrp_idx = 0
for i, instr in enumerate(__libc_malloc_instruction):
if (
instr.mnemonic == "adrp"
and abs(mrs_instr.address - instr.address) < min_adrp_distance
):
reg = instr.operands[0].str
nearest_adrp = instr
nearest_adrp_idx = i
min_adrp_distance = abs(mrs_instr.address - instr.address)
if instr.address - mrs_instr.address > min_adrp_distance:
break
for instr in __libc_malloc_instruction[nearest_adrp_idx + 1 :]:
if instr.mnemonic == "ldr":
base_offset = nearest_adrp.operands[1].int
offset = instr.operands[1].mem.disp
self._thread_cache_offset = (
pwndbg.gdblib.memory.s64(base_offset + offset) + 8
)
break
elif pwndbg.gdblib.arch.current == "arm":
# We need to find something near the first `mrc 15, ......`
# The flow of assembly code will like:
# `ldr reg1, [pc, #offset];
# ...
# mrc 15, 0, reg2, cr13, cr0, {3};
# ...
# add reg1, pc;
# ldr reg1, [reg1];
# ...
# add reg1, reg2
# ...
# ldr reg3, [reg1, #4]`
# , then reg3 will be tcache address
found_mrc = False
ldr_instr = None
for instr in __libc_malloc_instruction:
if not found_mrc:
if instr.mnemonic == "mrc":
found_mrc = True
elif instr.mnemonic == "ldr":
ldr_instr = instr
else:
reg = ldr_instr.operands[0].str
if instr.mnemonic == "add" and instr.op_str == reg + ", pc":
offset = ldr_instr.operands[1].mem.disp
offset = pwndbg.gdblib.memory.s32((ldr_instr.address + 4 & -4) + offset)
self._thread_cache_offset = (
pwndbg.gdblib.memory.s64(base_offset + offset) + 8
pwndbg.gdblib.memory.s32(instr.address + 4 + offset) + 4
)
break
elif pwndbg.gdblib.arch.current == "arm":
# We need to find something near the first `mrc 15, ......`
# The flow of assembly code will like:
# `ldr reg1, [pc, #offset];
# ...
# mrc 15, 0, reg2, cr13, cr0, {3};
# ...
# add reg1, pc;
# ldr reg1, [reg1];
# ...
# add reg1, reg2
# ...
# ldr reg3, [reg1, #4]`
# , then reg3 will be tcache address
found_mrc = False
ldr_instr = None
for instr in __libc_malloc_instruction:
if not found_mrc:
if instr.mnemonic == "mrc":
found_mrc = True
elif instr.mnemonic == "ldr":
ldr_instr = instr
else:
reg = ldr_instr.operands[0].str
if instr.mnemonic == "add" and instr.op_str == reg + ", pc":
offset = ldr_instr.operands[1].mem.disp
offset = pwndbg.gdblib.memory.s32(
(ldr_instr.address + 4 & -4) + offset
)
self._thread_cache_offset = (
pwndbg.gdblib.memory.s32(instr.address + 4 + offset) + 4
)
break
# Validate the offset we found
is_offset_valid = False
if pwndbg.gdblib.arch.current in ("x86-64", "i386"):
# The offset to tls should be a negative integer for x86/x64, but it can't be too small
# If it is too small, we find a wrong value
is_offset_valid = (
self._thread_cache_offset and -0x250 < self._thread_cache_offset < 0
)
elif pwndbg.gdblib.arch.current in ("aarch64", "arm"):
# The offset to tls should be a positive integer for aarch64, but it can't be too big
# If it is too big, we find a wrong value
is_offset_valid = (
self._thread_cache_offset and 0 < self._thread_cache_offset < 0x250
# If the offset is valid, we add the offset to TLS base to locate the tcache struct
# Note: We do a lot of checks here to make sure the offset and address we found is valid,
# so we can use our fallback if they're invalid
if self._thread_cache_offset and pwndbg.gdblib.tls.is_thread_local_variable_offset(
self._thread_cache_offset
):
tls_base = pwndbg.gdblib.tls.address
if tls_base:
thread_cache_struct_addr = pwndbg.gdblib.memory.pvoid(
tls_base + self._thread_cache_offset
)
if pwndbg.gdblib.vmmap.find(thread_cache_struct_addr):
self._thread_cache = self.tcache_perthread_struct(thread_cache_struct_addr)
return self._thread_cache
is_offset_valid = (
is_offset_valid and self._thread_cache_offset % pwndbg.gdblib.arch.ptrsize == 0
)
# If we still can't find the tcache, we guess tcache is in the first chunk of the heap
# Note: The result might be wrong if the arena is being shared by multiple threads
# And that's why we need to find the tcache address in TLS first
arena = self.thread_arena
ptr_size = pwndbg.gdblib.arch.ptrsize
# If the offset is valid, we add the offset to TLS base to locate the tcache struct
# Note: We do a lot of checks here to make sure the offset and address we found is valid,
# so we can use our fallback if they're invalid
if is_offset_valid:
tls_base = pwndbg.gdblib.tls.address
if tls_base:
thread_cache_struct_addr = pwndbg.gdblib.memory.pvoid(
tls_base + self._thread_cache_offset
)
if pwndbg.gdblib.vmmap.find(thread_cache_struct_addr):
self._thread_cache = self.tcache_perthread_struct(thread_cache_struct_addr)
return self._thread_cache
cursor = arena.active_heap.start
# If we still can't find the tcache, we guess tcache is in the first chunk of the heap
# Note: The result might be wrong if the arena is being shared by multiple threads
# And that's why we need to find the tcache address in TLS first
arena = self.thread_arena
ptr_size = pwndbg.gdblib.arch.ptrsize
# i686 alignment heuristic
first_chunk_size = pwndbg.gdblib.arch.unpack(
pwndbg.gdblib.memory.read(cursor + ptr_size, ptr_size)
)
if first_chunk_size == 0:
cursor += ptr_size * 2
cursor = arena.active_heap.start
self._thread_cache = self.tcache_perthread_struct(cursor + ptr_size * 2)
# i686 alignment heuristic
first_chunk_size = pwndbg.gdblib.arch.unpack(
pwndbg.gdblib.memory.read(cursor + ptr_size, ptr_size)
)
if first_chunk_size == 0:
cursor += ptr_size * 2
self._thread_cache = self.tcache_perthread_struct(cursor + ptr_size * 2)
return self._thread_cache
print(message.warn("This version of GLIBC was not compiled with tcache support."))
return None
return self._thread_cache
@property
def mp(self):

View File

@ -1,4 +1,5 @@
import gdb
import pytest
import pwndbg
import pwndbg.gdblib.arch
@ -260,9 +261,13 @@ def test_main_arena_heuristic(start_binary):
pwndbg.heap.current = type(pwndbg.heap.current)() # Reset the heap object of pwndbg
# Level 3: We check we can get the address of `main_arena` by parsing the memory
with mock_for_heuristic(mock_all=True):
# Check the address of `main_arena` is correct
assert pwndbg.heap.current.main_arena.address == main_arena_addr_via_debug_symbol
for _ in range(2):
with mock_for_heuristic(mock_all=True):
# Check the address of `main_arena` is correct
assert pwndbg.heap.current.main_arena.address == main_arena_addr_via_debug_symbol
# Check if it works when there's more than one arena
gdb.execute("continue")
assert gdb.selected_thread().num == 2
def test_mp_heuristic(start_binary):
@ -328,12 +333,18 @@ def test_global_max_fast_heuristic(start_binary):
assert pwndbg.heap.current._global_max_fast_addr == global_max_fast_addr_via_debug_symbol
def test_thread_cache_heuristic(start_binary):
@pytest.mark.parametrize(
"is_multi_threaded", [False, True], ids=["single-threaded", "multi-threaded"]
)
def test_thread_cache_heuristic(start_binary, is_multi_threaded):
# TODO: Support other architectures or different libc versions
start_binary(HEAP_MALLOC_CHUNK)
gdb.execute("set resolve-heap-via-heuristic force")
gdb.execute("break break_here")
gdb.execute("continue")
if is_multi_threaded:
gdb.execute("continue")
assert gdb.selected_thread().num == 2
# Use the debug symbol to find the address of `thread_cache`
tcache_addr_via_debug_symbol = pwndbg.gdblib.symbol.static_linkage_symbol_address(
@ -363,12 +374,18 @@ def test_thread_cache_heuristic(start_binary):
assert pwndbg.heap.current.thread_cache.address == thread_cache_addr_via_debug_symbol
def test_thread_arena_heuristic(start_binary):
@pytest.mark.parametrize(
"is_multi_threaded", [False, True], ids=["single-threaded", "multi-threaded"]
)
def test_thread_arena_heuristic(start_binary, is_multi_threaded):
# TODO: Support other architectures or different libc versions
start_binary(HEAP_MALLOC_CHUNK)
gdb.execute("set resolve-heap-via-heuristic force")
gdb.execute("break break_here")
gdb.execute("continue")
if is_multi_threaded:
gdb.execute("continue")
assert gdb.selected_thread().num == 2
# Use the debug symbol to find the value of `thread_arena`
thread_arena_via_debug_symbol = pwndbg.gdblib.symbol.static_linkage_symbol_address(
@ -392,12 +409,18 @@ def test_thread_arena_heuristic(start_binary):
assert pwndbg.heap.current.thread_arena.address == thread_arena_via_debug_symbol
def test_heuristic_fail_gracefully(start_binary):
@pytest.mark.parametrize(
"is_multi_threaded", [False, True], ids=["single-threaded", "multi-threaded"]
)
def test_heuristic_fail_gracefully(start_binary, is_multi_threaded):
# TODO: Support other architectures or different libc versions
start_binary(HEAP_MALLOC_CHUNK)
gdb.execute("set resolve-heap-via-heuristic force")
gdb.execute("break break_here")
gdb.execute("continue")
if is_multi_threaded:
gdb.execute("continue")
assert gdb.selected_thread().num == 2
def _test_heuristic_fail_gracefully(name):
try: