"""
Module with useful extensions to GDB's default Python functionalities.
"""
from __future__ import print_function
import re, sys
try:
import pysigset # https://pypi.python.org/pypi/pysigset/
except ImportError as e:
pysigset = None
import gdb
from .python_utils import *
from .target import my_access
PY3 = sys.version[0] == '3'
# keep it after python_utils import
import logging; log = logging.getLogger(__name__)
log_user = logging.getLogger("mcgdb.log.user")
##########################################
## GDB command wrappers and extensions ##
##########################################
def set_prompt_hook(prompt):
    """
    Runs `prompt` once, right before GDB computes its next prompt.

    Installs a one-shot `gdb.prompt_hook` which calls `prompt()`, puts the
    previously installed hook back, and then delegates to that hook.

    :param prompt: callable taking no argument.
    """
    old_prompt = gdb.prompt_hook

    def a_prompt(current):
        prompt()
        # one-shot: restore the previous hook before delegating to it
        gdb.prompt_hook = old_prompt
        if old_prompt is None:
            # no hook was installed before us; returning None lets GDB
            # keep its current prompt
            return None
        return old_prompt(current)

    gdb.prompt_hook = a_prompt
def up_before_prompt():
    """
    Schedules GDB's `up` command to be executed by `before_prompt`.
    """
    def up():
        # to_string=True: the output of `up` is captured, not displayed
        gdb.execute("up", to_string=True)

    before_prompt(up)
def before_prompt(fct):
    """
    Runs `fct` once, right before GDB computes its next prompt.

    Installs a one-shot `gdb.prompt_hook`: it calls `fct()`, logs (instead
    of propagating) any exception it raises, restores the hook that was
    installed before, and delegates to it.

    :param fct: callable taking no argument.
    """
    old_prompt = gdb.prompt_hook

    def prompt(prompt):
        try:
            fct()
        except Exception as e:
            # a failing callback must not break the prompt
            log.warning("Before prompt function {} failed: {}".format(fct, e))
            log.info(e)

        gdb.prompt_hook = old_prompt
        if old_prompt is None:
            # no hook was installed before us; returning None lets GDB
            # keep its current prompt
            return None
        return old_prompt(prompt)

    gdb.prompt_hook = prompt
def blocks_equal(a, b):
    """
    Tells whether `a` and `b` describe the same block.

    :returns: True if `a` and `b` are the same object, or if both their
      `start` and `end` attributes are equal.
    """
    if a is b:
        return True
    return a.start == b.start and a.end == b.end
def addr2num(addr):
    """
    Converts `addr` into a Python integer.

    :param addr: anything `int()` accepts (or `long()` on Python 2).
    :returns: the numeric value of `addr`.
    :raises: whatever the conversion raised, when `addr` cannot be converted.
    """
    try:
        return int(addr)
    except Exception:
        if sys.version_info[0] >= 3:
            # `long` does not exist on Python 3: falling back to it would
            # replace the real error (e.g. gdb.error) with a NameError
            raise
        return long(addr)  # Python 2 fallback
def oldest_frame():
    """
    Walks up the stack from the selected frame.

    :returns: the last frame reached by following `older()` from the
      currently selected frame.
    """
    current = gdb.selected_frame()
    while True:
        parent = current.older()
        if not parent:
            return current
        current = parent
def address_to_variable(addr, frame):
    """
    Looks for the symbol whose storage contains address `addr`.

    The block of `frame` is scanned first; the global, static and enclosing
    blocks of every scanned block are queued afterwards. For a pointer
    symbol whose own address is not `addr`, the range searched is the
    `LOOK_N_BITS_AFTER_POINTER` addresses starting at the pointed location.

    :param addr: the address to look for, as a number.
    :param frame: the frame in which the symbol values are read.
    :returns: `(symbol, 0)` as soon as a symbol starts exactly at `addr`.
    :returns: otherwise `(symbol, offset)` for the match with the smallest
      offset, counted in elements of the symbol's primitive type.
    :returns: None if nothing matched, or if `frame.block()` is not accessible.
    """
    LOOK_N_BITS_AFTER_POINTER = 256

    blocks_to_check = []
    blocks_checked = []
    try:
        blocks_to_check.append(frame.block())
    except RuntimeError as e:
        log.info("address_to_variable: {}".format(e))
        return None  # cannot access frame.block()

    closest = None, 9999  # symb, offset

    while blocks_to_check:
        to_check = blocks_to_check.pop(0)
        if not to_check:
            continue

        # skip blocks that were already scanned
        if any(blocks_equal(b, to_check) for b in blocks_checked):
            continue
        blocks_checked.append(to_check)

        for symb in to_check:
            try:
                val = symb.value(frame)
                if val.is_optimized_out:
                    continue
            except (TypeError, gdb.error):
                continue

            if val.address is None:
                continue

            typ = symb.type
            try:
                start_addr = addr2num(val.address)
            except gdb.error:
                continue

            if typ.code == gdb.TYPE_CODE_PTR and addr != start_addr:
                # is `addr` in the area pointed to by this pointer?
                start_addr = addr2num(val)
                stop_addr = start_addr + LOOK_N_BITS_AFTER_POINTER
            else:  # primitive, struct or array
                stop_addr = start_addr + int(typ.sizeof)

            if not (start_addr <= addr < stop_addr):
                continue

            if addr == start_addr:
                # exact match, no need to look any further
                return symb, 0

            try:
                # express the distance in elements of the primitive type
                offset = (addr - start_addr) // primitive_type(typ).sizeof
                if offset < closest[1]:
                    closest = symb, offset
            except RuntimeError as e:  # Type does not have a target.
                log.info(e)

        blocks_to_check.append(to_check.global_block)
        blocks_to_check.append(to_check.static_block)
        blocks_to_check.append(to_check.superblock)

    symb, offset = closest
    if symb is None:
        return None

    if offset > LOOK_N_BITS_AFTER_POINTER / 2:
        log.warning("Found {} with offset={}".format(symb, offset))
    return closest
# True until `reprint_prompt` has displayed its one-time hint
first_reprint = True


def reprint_prompt():
    """
    Prints GDB's prompt on the standard output, without a trailing newline.

    The text is computed by `gdb.prompt_hook` if one is installed, from
    GDB's `prompt` parameter otherwise. The first call also logs a hint
    telling the user how to get the prompt displayed.
    """
    global first_reprint

    def gdb_prompt(ignored=None):
        return gdb.parameter("prompt")

    prompt = gdb.prompt_hook
    if prompt is None:
        prompt = gdb_prompt

    if first_reprint:
        log.warning("[Press arrow/any key to show the prompt.]")
        first_reprint = False

    current = gdb_prompt()
    text = prompt(current)
    if text is None:
        # a prompt hook returning None means "keep the current prompt"
        text = current
    print(text, end="")
def get_current_executable():
    """
    :returns: the name of the current executable, if any.
    :rtype: str
    """
    inferior_num = gdb.selected_inferior().num
    listing = gdb.execute("info inferior %d" % inferior_num, to_string=True)

    # the second line of the output is the one that gets parsed
    row = listing.split("\n")[1]
    # NOTE(review): assumes the executable name starts at the 4th
    # whitespace-separated field of that row -- confirm against the
    # `info inferiors` layout of the targeted GDB versions
    exec_name = " ".join(row.split()[3:])

    return exec_name or None
def try_to_stop():
    """
    Tries to stop the thread currently selected.

    Trick: creates another inferior to trick GDB, and gets rid of it.
    """
    assert gdb.selected_thread().is_running()

    initial_infthr = current_infthr()

    # throw-away process, only there to be attached to and killed
    sleeper = my_access.Popen(['sleep', '1000'])
    sleeper_inf = attach(sleeper.pid, "internal", silent=True)
    if sleeper_inf is None:
        error("couldn't attach to internal 'sleeper' process")

    kill_current_inferior()

    # come back where we were, and drop the inferior of the sleeper
    switch_to_infthr(initial_infthr, silent=True)
    gdb.execute("remove-inferiors %d" % sleeper_inf.num, to_string=True)

    assert not gdb.selected_thread().is_running()
def safe_finish(to_string=True):
    """
    Tries to stop the selected thread first of all, then executes GDB's
    `finish` command.

    :param to_string: passed to `gdb.execute`. Default: True
    :returns: the output of the `finish` command.
    """
    thread = gdb.selected_thread()
    if thread.is_running():
        try_to_stop()

    return gdb.execute("finish", to_string=to_string)
def catch(what):
    """
    Wrapper around GDB's `catch` command.

    :param what: the event to catch (e.g. `exec`). It is also inserted
      as-is in the regular expression used to parse GDB's answer.
    :returns: the number of the new catchpoint; if GDB's answer cannot be
      parsed, `error` is called with a description of the problem.
    :rtype: int
    """
    ret = gdb.execute("catch %s" % what, to_string=True)

    match = re.match(r"Catchpoint (\d+) \(%s\)" % what, ret)
    if match is None:
        error("couldn't create a new catchpoint for '%s': %s" %
              (what, ret.strip()))
        return None

    return int(match.group(1))
@deprecated
def delete_bp(num):
    """
    Wrapper around GDB's `delete` command.

    Use `Breakpoint.delete` instead.

    :param num: what to append to the `delete` command.
    """
    command = "delete %s" % num
    gdb.execute(command)
def parse_newprocess(where):
    """
    Parses the PID of a new process in `where`.

    :param where: GDB process creation string, containing
      `[New process <PID>]`.
    :returns: the digits of the PID found, or None if `where` does not
      contain the pattern. If the parsing itself fails, `error` is called.
    :rtype: str
    """
    try:
        found = re.match(r".*\[New process (\d+)\].*", where)
        if not found:
            return None
        return found.group(1)
    except Exception as e:
        error("couldn't parse new process PID '%s): %s" % (where, e))
def num_to_inf_obj(num):
    """
    :returns: the inferior whose identifier is `num`, None otherwise.
    :rtype: gdb.Inferior
    """
    matching = (inf for inf in gdb.inferiors() if int(num) == inf.num)
    return next(matching, None)
def pid_to_inf_obj(pid):
    """
    :returns: the inferior whose PID is `pid`, None otherwise.
    :rtype: gdb.Inferior
    """
    matching = (inf for inf in gdb.inferiors() if int(pid) == inf.pid)
    return next(matching, None)
# callables registered through `late_function_breakpoint_execution`
late_exec = []


def late_function_breakpoint_execution(fct=None):
    """
    Registers `fct` for a late execution, or hands over what was registered.

    :param fct: the callable to register, if any.
    :returns: None when `fct` was registered.
    :returns: without `fct`, the list of the callables registered so far;
      the registry is emptied.
    """
    if not fct:
        pending = list(late_exec)
        del late_exec[:]
        return pending

    late_exec.append(fct)
def print_and_execute(command, to_string=False):
    """
    Logs the `command` and passes it to GDB for execution.

    :param command: the GDB command to execute.
    :param to_string: passed to `gdb.execute`. Default: False
    :returns: whatever `gdb.execute` returned for `command`.
    """
    log.info(command)
    output = gdb.execute(command, to_string=to_string)
    return output
# pattern of the strings parsed by `value_to_methodNameAddr`, and the
# indexes of its groups
FCT_VALUE = r"(\S+) <(\S+)>"
FCT_VALUE_ADDR = 1
FCT_VALUE_NAME = 2


def value_to_methodNameAddr(meth_value):
    """
    Parses the Name and Address from GDB's print string.

    :param meth_value: the value to parse; its string form must start
      with the pattern `FCT_VALUE`, i.e. '<address> <<name>>'.
    :returns: #1 the function name if the parsing succeeded.
    :returns: #2 the function address (as a string) if the parsing succeeded.
    :returns: (None, None) otherwise.
    """
    exp = re.match(FCT_VALUE, str(meth_value))
    if exp is None:
        return (None, None)

    return exp.group(FCT_VALUE_NAME), exp.group(FCT_VALUE_ADDR)
def get_confirm():
    """
    Wrapper around `gdb.parameter("confirm")`.

    :returns: the current value of GDB's `confirm` parameter.
    """
    confirm = gdb.parameter("confirm")
    return confirm
def set_confirm(val):
    """
    Sets GDB `confirm` parameter to `val`.

    :param val: `confirm` is set to `on` if `val` is true, `off` otherwise.
    """
    state = "on" if val else "off"
    gdb.execute("set confirm %s" % state)
def get_env(varname):
    """
    Reads variable `varname` through GDB's `show environment` command.

    :returns: the text following the first " = " of GDB's answer.
    :returns: None if GDB answers that the variable is not defined.
    """
    answer = gdb.execute("show environment {}".format(varname),
                         to_string=True)
    env = answer[:-1]  # without the last character of the answer

    # e.g.: Environment variable "TOTO" not defined.
    if env.endswith(" not defined."):
        return None

    # e.g.: LINES = 67
    _, _, value = env.partition(" = ")
    return value
def set_env(varname, value):
    """
    Wrapper around GDB's `set environment` command.

    :param varname: the name of the variable to set.
    :param value: the value to give to `varname`.
    """
    command = "set environment {} {}".format(varname, value)
    gdb.execute(command)
def extend_env(varname, value, join=":"):
    """
    Appends `value` to the `join`-separated variable `varname`, unless it
    is already one of its entries.

    :param varname: the name of the environment variable.
    :param value: the entry to append.
    :param join: the separator between the entries. Default: ":"
    :returns: the value of the variable after the call.
    """
    current_val = get_env(varname)

    # surround both sides with the separator so that only whole entries
    # match ("/usr/lib" must not be found inside "/opt/usr/lib")
    if current_val and (join + value + join) in (join + current_val + join):
        return current_val

    new_val = (current_val + join + value) if current_val else value
    set_env(varname, new_val)

    return new_val
def start_or_continue(to_string=False):
    """
    Executes `continue` if the first inferior has a process (its PID is
    not 0), `run` otherwise.

    :param to_string: passed to `gdb.execute`. Default: False
    :returns: whatever `gdb.execute` returned.
    """
    inferiors = gdb.inferiors()
    cmd = "continue" if inferiors and inferiors[0].pid != 0 else "run"

    # must be a keyword: the 2nd positional parameter of gdb.execute is
    # `from_tty`, not `to_string`
    return gdb.execute(cmd, to_string=to_string)
def exec_on_sharedlibrary(to_call, soname):
    """
    Calls `to_call` once the shared library `soname` has been read by GDB.

    If `info sharedlibrary` already lists a library whose name contains
    `soname` with "Yes" in its third column, `to_call` is called right
    away. Otherwise a `gdb.events.new_objfile` handler is connected; it
    calls `to_call` and disconnects itself on the first new objfile whose
    filename contains `soname` (or on any new objfile if `soname` is "*").

    :param to_call: callable taking no argument.
    :param soname: (part of) the name of the library, or "*".
    :returns: True if `to_call` was called immediately, False if its
      execution was deferred.
    """
    libs = gdb.execute("info sharedlibrary", to_string=True)

    # check if library is already loaded *and read*.
    for line in libs.split("\n"):
        fields = line.split()
        if len(fields) < 3:
            # blank line, or row without enough columns to be inspected
            continue

        if soname in fields[-1] and "Yes" in fields[2]:
            to_call()
            return True

    def exec_if_soname(event):
        if soname == "*" or (event.new_objfile
                             and soname in event.new_objfile.filename):
            to_call()
            gdb.events.new_objfile.disconnect(exec_if_soname)

    gdb.events.new_objfile.connect(exec_if_soname)
    return False
def get_function_fname_and_lines(fct_symb):
    """
    Computes the source lines covered by function `fct_symb`, based on
    the line information of each instruction of its disassembly.

    :param fct_symb: the symbol of the function.
    :returns: #1 the filename of the symbol's symtab.
    :returns: #2 the first line: the line of the symbol, or the lowest line
      found in the disassembly if the symbol does not specify its line.
    :returns: #3 the highest line found in the disassembly (0 if none).
    """
    fct_addr = addr2num(fct_symb.value().address)
    disa = gdb.execute("disassemble {}".format(fct_addr), to_string=True)

    filename = fct_symb.symtab.filename
    from_line = fct_symb.line
    to_line = 0

    for disa_line in disa.split("\n"):
        if "Dump of assembler code" in disa_line:
            continue  # skip first line
        if "End of assembler dump." in disa_line:
            break  # we're at the end

        try:
            # parse the PC value
            # => 0x00000000004009c1 <+32>: jmpq 0x401464 <main._omp_fn.0+2755>
            pc = int(disa_line.replace("=>", "").split()[0], 16)
        except (ValueError, IndexError):
            # not an hexadecimal number / nothing on the line
            log.warning("Could not parse disassembly line ...")
            log.warning(disa_line)
            continue

        sal = gdb.find_pc_line(pc)
        if not sal or sal.symtab is None:
            continue  # hum, nothing known at that PC

        # check for consistency that PC is in the right file
        if sal.symtab.filename != filename:
            log.info("not the right file, inlined ?")
            continue

        # if function symbol doesn't specify its line
        if fct_symb.line == 0:
            if from_line == 0 or sal.line < from_line:
                from_line = sal.line

        # PCs may not be in order
        if sal.line > to_line:
            to_line = sal.line

    return filename, from_line, to_line
def get_src(filename, lstart, lstop):
    """
    Wrapper around GDB's `list` command.

    :returns: the output of `list` for lines `lstart` to `lstop` of
      `filename`.
    """
    command = "list {}:{}, {}:{}".format(filename, lstart, filename, lstop)
    return gdb.execute(command, to_string=True)
###########################
## Breakpoint extensions ##
###########################
@internal
class StrongNextBreakpoint(gdb.Breakpoint):
    """
    Breakpoint extension used to implement `strong_next` command.

    :param spec: the location of the breakpoint.
    :param keyword: optional description (mnemonic) of the strong_next startpoint.
    :type keyword: str
    """

    def __init__(self, spec, keyword=None):
        gdb.Breakpoint.__init__(self, spec, internal=1)
        self.keyword = keyword
        self.silent = True

    def stop(self):
        """
        Logs the end of the strong next (if a keyword was given),
        schedules the deletion of the breakpoint and stops the execution.

        :returns: True
        """
        if self.keyword is not None:
            log.info("[Strong next finished{}]".format(
                " ({})".format(self.keyword) if self.keyword else ""))

        # the deletion is posted to GDB's event queue rather than done
        # from within `stop`
        gdb.post_event(self.delete)
        return True
@internal
def strong_next(keyword=None):
    """
    Performs a "strong next" on the currently selected inferior:
    steps forward, and if the newest frame is not the same as before the
    step, sets a `StrongNextBreakpoint` on the PC of the initial frame and
    continues the execution.

    This function is useful when the `next` command goes over a runtime
    function call that switches the current user-level thread. As GDB
    is not aware of that, it gets lost in its `next` and the execution
    never stops again.

    Set it with command `sn` (`communication.cli.my_gdb.cmd_sn`).

    :param keyword: A message to be displayed when the strong next finishes.
    :returns: True if a simple `step` was not enough to complete the `next` command.
    """
    start_frame = gdb.newest_frame()
    gdb.execute("step")

    if start_frame == gdb.newest_frame():
        # still in the same frame: the step did the job
        return False

    location = "*%d" % start_frame.pc()
    StrongNextBreakpoint(location, keyword)
    gdb.execute("continue")

    return True
class SiblingBreakpoint(gdb.Breakpoint):
    """
    Copy of breakpoint `referent` in another program space.
    Registers itself in `referent.siblings` dictionary.

    :param referent: the original breakpoint.
    :type referent: AllInfBreakpoint

    :var inf_id: the inferior in which this breakpoint was set.
    """

    def __init__(self, referent):
        assert referent is not None
        gdb.Breakpoint.__init__(self, referent.location, internal=True)
        self.silent = True
        self.referent = referent
        self.inf_id = gdb.selected_inferior().num

        self.referent.siblings[self.inf_id] = self

    def stop(self):
        """
        Overrides gdb.Breakpoint.stop.

        :returns: False if this breakpoint is not the sibling registered
          for the currently selected inferior.
        :returns: Delegates to `referent.stop()` otherwise (True if that
          raises an AttributeError).
        """
        current_inf_id = gdb.selected_inferior().num
        # .get(): an inferior without any registered sibling is a "wrong
        # inferior" as well, not a KeyError
        if self.referent.siblings.get(current_inf_id) is not self:
            log.warning("Sibling breakpoint hit in the wrong inferior")
            return False

        try:
            return self.referent.stop()
        except AttributeError:
            return True

    def __del__(self):
        """
        Removes itself from `referent.siblings` dictionary.
        """
        # the entry may already be gone; a finalizer should not raise
        self.referent.siblings.pop(self.inf_id, None)
class AllInfBreakpoint(gdb.Breakpoint):
    """
    Subclass of `gdb.Breakpoint` that spreads itself to all the existing and future inferiors.

    Set them with command `break_spread` (`communication.cli.my_gdb.cmd_AllInfBreakpoint`).

    :var breakpoints: class-level list of the breakpoints created so far.
    :var siblings: dictionary inferior number -> breakpoint set in that
      inferior (this breakpoint itself for the inferior it was created in).
    """
    breakpoints = []

    def __init__(self, spec, internal=True):
        gdb.Breakpoint.__init__(self, spec, internal=internal)
        self.silent = True

        # this breakpoint is its own sibling in the inferior it was set in
        self.siblings = {gdb.selected_inferior().num: self}

        self.__class__.breakpoints.append(self)

    def __del__(self):
        """
        Deletes all the siblings of this breakpoint.
        """
        for other in self.siblings.values():
            if other != self:
                other.delete()

    @staticmethod
    def handler_new_objfile(event):
        """
        To connect to `gdb.events.new_objfile`.
        Spreads AllInfBreakpoints to new inferiors.

        :param event: not used.
        """
        AllInfBreakpoint.spread_to_other_inferiors()

    @staticmethod
    def spread_to_other_inferiors():
        """
        Tries to spread all the AllInfBreakpoint to the currently selected
        inferior, and removes the invalid ones from the list.
        """
        invalid = []

        for bp in AllInfBreakpoint.breakpoints:
            if not bp.is_valid():
                invalid.append(bp)
                continue

            inf_id = gdb.selected_inferior().num
            if bp.siblings.get(inf_id) is not None:
                # already has a sibling in this inferior
                continue

            log.info("spread to %d" % inf_id)
            log.info("Create '%s' sibling on inf. %d", bp.location, inf_id)
            # registers itself in bp.siblings
            SiblingBreakpoint(bp)

        for bp in invalid:
            AllInfBreakpoint.breakpoints.remove(bp)
######################
## Prompt extension ##
######################
@internal
def defaultPrompt(old):
    """
    Returns GDB's default prompt: (gdb)

    :param old: unused
    :rtype: str
    """
    default = "(gdb) "
    return default
@internal
class Prompt:
    """
    Extended prompt.

    :var hooks: set of callable, triggered before returning the local prompt.
    :var prompts: list of callable, applied in sequence (`next_prompt(previous)`) to compute the prompt displayed by GDB.

    :param old_prompt: the inner most prompt, (eg `(gdb)` or None)
    """

    def __init__(self, old_prompt):
        self.hooks = set()
        self.prompts = []

        innermost = old_prompt if old_prompt is not None else defaultPrompt
        self.pushPrompt(innermost)

    def __call__(self, old_prompt):
        """
        Triggers the hooks, then chains the registered prompts.

        :param old_prompt: the value given to the first prompt callable.
        :returns: the result of the chain; a prompt callable returning None
          leaves the value computed so far untouched.
        """
        for callback in self.hooks:
            callback()

        text = old_prompt
        for compute in self.prompts:
            candidate = compute(text)
            text = text if candidate is None else candidate

        return text

    def registerHook(self, hook):
        """Adds `hook` to the set of hooks."""
        self.hooks.add(hook)

    def unregisterHook(self, hook):
        """Removes `hook` from the set of hooks."""
        self.hooks.remove(hook)

    def pushPrompt(self, prompt):
        """Appends `prompt` at the end of the prompt chain."""
        self.prompts.append(prompt)

    def popPrompt(self, prompt):
        """Removes `prompt` from the prompt chain."""
        self.prompts.remove(prompt)
#################################
## Task Manager implementation ##
#################################
def current_infthr():
    """
    Returns GDB's currently selected inferior+thread.

    List them with command `info infthreads` (`communication.cli.my_gdb.cmd_info_infthread`).

    :returns: #1 GDB's selected inferior
    :returns: #2 GDB's selected thread
    """
    inferior = gdb.selected_inferior()
    thread = gdb.selected_thread()
    return (inferior, thread)
def switch_to_infthr(infthr, silent=True):
    """
    Switches to another inferior+thread.

    :param infthr: the inferior+thread to switch to.
    :param silent: print information on screen or not. Default: True
    :returns: True
    """
    inferior, thread = infthr[0], infthr[1]

    if thread is not None and thread.is_valid():
        thread.switch()

    answer = gdb.execute("inferior %s" % inferior.num, to_string=True)
    first_line = answer.partition("\n")[0]

    # NOTE(review): the original also had a "Switched to thread" message,
    # guarded by a variable that was never set: it could not be displayed.
    if not silent:
        log_user.info("Switched to inferior {}".format(first_line))

    return True
def switch_to_inferior(inferior, silent=True):
    """
    Switches to another inferior.

    :param inferior: the inferior to switch to.
    :param silent: print information on screen or not. Default: True
    :returns: True if we did the switch, False if `inferior` is None.
    """
    if inferior is not None:
        gdb.execute("inferior %s" % inferior.num, to_string=silent)
        return True

    return False
def new_inferior(silent=False):
    """
    Wrapper around GDB's `add-inferior` command.

    :param silent: print information on screen or not. Default: False
    :returns: the newly created inferior object, or None if it failed.
    :rtype: gdb.Inferior
    """
    try:
        # always capture the output: the identifier of the new inferior
        # is parsed out of it
        out = gdb.execute("add-inferior", to_string=True)
    except gdb.error as exc:
        log.warning("couldn't create a new inferior ...: %s", exc)
        return None

    if not silent:
        # the output was captured, so display it ourselves
        gdb.write(out)

    match = re.search(r"Added inferior (\d+)", out)
    if match is None:
        log.warning("couldn't create a new inferior ... ('%s')", out)
        return None

    return num_to_inf_obj(match.group(1))
def remove_inferior(inf, silent=True):
    """
    Wrapper around GDB's `remove-inferiors` command.

    :param inf: the inferior to remove.
    :param silent: print information on screen or not. Default: True
    :returns: the GDB's remove string if silent was True.
    """
    command = "remove-inferiors %s" % inf.num
    return gdb.execute(command, to_string=silent)
def kill_current_inferior(silent=True):
    """
    Wrapper around GDB's `kill` command.

    :param silent: print information on screen or not. Default: True
    :returns: the GDB's kill string if silent was True.
    """
    output = gdb.execute("kill", to_string=silent)
    return output
def attach(pid, name=None, comeback=False, silent=False):
    """
    Tries to attach GDB to `pid` process.

    If an inferior already has that PID, it is returned (and selected,
    unless `comeback` is set). Otherwise the process is attached in the
    current inferior if it has no process and no executable, in a newly
    created inferior if not.

    :param pid: the PID of the process to attach to.
    :param name: optional name of the process, only used in the failure message.
    :param comeback: if true, switch back to the initial inferior. Otherwise, stay on the newly attached inferior.
    :param silent: print information on screen or not. Default: False.
    :returns: the newly attached inferior object, or None if it failed.
    :rtype: gdb.Inferior
    """
    def warn_failure(reason):
        if not silent:
            log.warning("[Couldn't attach to %s%s: %s]",
                        pid, "/" + name if name is not None else "", reason)

    inf = pid_to_inf_obj(pid)
    if inf is not None:
        # already attached
        if not comeback:
            switch_to_inferior(inf)
        return inf

    cur_inf_thr = current_infthr()

    # use current inf. only if not used
    # AND no execfile has been set
    reuse_current = (gdb.selected_inferior().pid == 0
                     and get_current_executable() is None)
    if reuse_current:
        inf = gdb.selected_inferior()
        comeback = False  # we are already where we have to come back
    else:
        # create and connect the new inferior
        inf = new_inferior(silent=True)
        if inf is None:
            # do not attach from within an inferior that is in use
            warn_failure("couldn't create a new inferior")
            return None
        switch_to_inferior(inf)

    try:
        my_access.do_attach(pid)
    except gdb.error as exc:
        switch_to_infthr(cur_inf_thr, silent=True)
        if not reuse_current:
            # only get rid of the inferior created for this attachment,
            # never of the one the user was working in
            remove_inferior(inf)
        warn_failure(exc)
        return None

    if not silent:
        log.info("[New Process %s]", (pid))

    if comeback:
        switch_to_infthr(cur_inf_thr, silent=True)

    return pid_to_inf_obj(pid)
def set_parameter(name, value):
    """
    Builds a context manager giving value `value` to GDB parameter `name`
    on entry, and setting it back to its previous value on exit.

    :param name: the name of the GDB parameter.
    :param value: the value of the parameter inside the `with` block.
    :returns: the context manager.
    """
    class temp:
        def __init__(self):
            self.old = None

        def __enter__(self):
            previous = gdb.parameter(name)
            # booleans are turned into the on/off spelling used when the
            # value is set back; other values are left untouched
            if previous is True:
                previous = "on"
            elif previous is False:
                previous = "off"
            self.old = previous

            gdb.execute("set {} {}".format(name, value), to_string=True)

        def __exit__(self, exc_type, exc_value, exc_tb):
            gdb.execute("set {} {}".format(name, self.old), to_string=True)

    return temp()
def active_thread(thread):
    """
    Builds a context manager switching to `thread` on entry, and back to
    the thread that was selected when the manager was built on exit.

    :param thread: the thread to switch to inside the `with` block.
    :returns: the context manager.
    """
    class temp:
        def __init__(self):
            # remembered at construction time, not at `with` time
            self.selected = gdb.selected_thread()

        def __enter__(self):
            thread.switch()

        def __exit__(self, exc_type, exc_value, exc_tb):
            self.selected.switch()

    return temp()
def main_thread(inferior=None):
    """
    :param inferior: the inferior to look into. Default: the selected one.
    :returns: the thread number 1 of `inferior`.
    :raises IndexError: if `inferior` has no such thread.
    """
    target = gdb.selected_inferior() if inferior is None else inferior
    first_threads = [th for th in target.threads() if th.num == 1]
    return first_threads[0]
def attach_all(pids):
    """
    Tries to attach GDB to the processes described by [pids].

    :param pids: list of PIDs
    :type pids: list
    :returns: the result of `attach` for each PID, in the same order
      ([] if `pids` is empty).
    :rtype: list(gdb.Inferior)
    """
    if not pids:
        log.info("Pid list is empty")

    attached = []
    for pid in pids:
        attached.append(attach(pid))
    return attached
@internal
class DefaultTaskManager:
    """
    Default task manager implementation, simply relying on GDB's inferior+threads.
    """

    def get_selected_task(self):
        """
        :returns: the current infthr.
        """
        selected = current_infthr()
        return selected

    def get_mark(self, task):
        """
        :returns: "*" if infthr `task` is currently selected or " "
        """
        if self.is_selected_task(task):
            return "*"
        return " "

    def is_selected_task(self, task):
        """
        :returns: True if infthr `task` is valid and currently selected.
        """
        if task is None:
            return False

        thread = task[1]
        return (thread is not None
                and thread.is_valid()
                and thread == gdb.selected_thread())

    def switch_to(self, task):
        """
        Tries to switch to infthr `task`.

        :returns: what `switch_to_infthr` returned.
        """
        switched = switch_to_infthr(task)
        return switched
################################
## Signals and thread safety ###
################################
def __empty_zone():
    """
    Builds a context manager that does nothing, neither on entry nor on
    exit (exceptions raised inside the `with` block are propagated).

    :returns: the context manager.
    """
    class EmptyZone(object):
        def __enter__(self):
            pass

        def __exit__(self, *args, **kwargs):
            pass

    return EmptyZone()
# True once the absence of PySigSet has been reported
notified = False


def may_start_threads():
    """
    Builds the context manager to use around code that may start threads.

    :returns: `pysigset.suspended_signals(signal.SIGCHLD)` if PySigSet
      could be imported, the result of `__empty_zone()` otherwise (an
      error is logged the first time).
    """
    global notified

    if pysigset:
        return pysigset.suspended_signals(signal.SIGCHLD)

    if pysigset is None and not notified:
        log.error("Could not load PySigSet")
        log.error("Expect to encounter a GDB deadlock sooner or later ...")
        notified = True

    return __empty_zone()
import signal
import sys
def signal_handler(signal, frame):
    """
    Handler for ^C process interruption.
    Pushes a stop request in mcGDB.

    Connect it with `signal.signal(signal.SIGINT, signal_handler)`.

    :param signal: not used
    :param frame: not used
    """
    # no placeholder in the message: logging does not format a message
    # that comes without arguments, a '%s' would be displayed as is
    log_user.info('### You pressed Ctrl+C! ###')
    push_stop_request('You pressed Ctrl+C!')
######################
def primitive_type(typ):
    """
    Follows the `target()` of `typ` for as long as there is one.

    :param typ: the type to start from.
    :returns: the first type reached whose `target()` raises a RuntimeError.
    """
    current = typ
    while True:
        try:
            current = current.target()
        except RuntimeError:  # Type does not have a target.
            return current
@internal
def find_type(orig, name):
    """
    Looks up the type `name` nested in type `orig`, or in its base classes.

    :param orig: the type in which the lookup starts.
    :type orig: gdb.Type
    :param name: the name of the nested type.
    :returns: the type found.
    :rtype: gdb.Type
    :raises ValueError: if the type was not found.
    """
    typ = orig.strip_typedefs()
    while True:
        search = str(typ) + '::' + name
        try:
            return gdb.lookup_type(search)
        except RuntimeError:
            pass

        # The type was not found, so try the superclass. We only need
        # to check the first superclass, so we don't bother with
        # anything fancier here.
        fields = typ.fields()
        if not fields or not fields[0].is_base_class:
            # no field at all, or the first one is not a base class
            raise ValueError("Cannot find type %s::%s" % (str(orig), name))
        typ = fields[0].type
###########################################################################
@internal
def initialize():
    """
    Initialization of the module. For now, only warns that the SIGINT
    handler is not registered (the registrations below are disabled).
    """
    #gdb.events.new_objfile.connect(AllInfBreakpoint.handler_new_objfile)

    log.warning("Signal handler NOT registered (commented out)")
    #signal.signal(signal.SIGINT, signal_handler)