Source code for mcgdb.toolbox.my_gdb

"""
Module with usefil extensions to GDB's default Python functionalities.
"""
from __future__ import print_function
import re, sys

try:
    import pysigset # https://pypi.python.org/pypi/pysigset/
except ImportError as e:
    pysigset = None

import gdb
from .python_utils import *
from .target import my_access

PY3 = sys.version[0] == '3'

# keep it after python_utils import
import logging; log = logging.getLogger(__name__)
log_user = logging.getLogger("mcgdb.log.user")

##########################################
## GDB command wrappers and extensions ##
##########################################

[docs]def set_prompt_hook(prompt): old_prompt = gdb.prompt_hook def a_prompt(current): prompt() gdb.prompt_hook = old_prompt return old_prompt(current) gdb.prompt_hook = a_prompt
[docs]def up_before_prompt(): def up(): gdb.execute("up", to_string=True) before_prompt(up)
[docs]def before_prompt(fct): old_prompt = gdb.prompt_hook def prompt(prompt): try: fct() except Exception as e: log.warn("Before prompt function {} failed: {}".format(fct, e)) log.info(e) gdb.prompt_hook = old_prompt return old_prompt(prompt) gdb.prompt_hook = prompt
[docs]def blocks_equal(a, b): return a is b or a.start == b.start and a.end == b.end
[docs]def addr2num(addr): try: return int(addr) # Python 3 except: return long(addr) # Python 2
[docs]def oldest_frame(): frame = gdb.selected_frame() while frame.older(): frame = frame.older() return frame
[docs]def address_to_variable(addr, frame): LOOK_N_BITS_AFTER_POINTER = 256 blocks_to_check = [] blocks_checked = [] try: blocks_to_check.append(frame.block()) except RuntimeError as e: log.info("address_to_variable: {}".format(e)) return # cannot access frame.block() closest = None, 9999 # symb, offset while blocks_to_check: to_check = blocks_to_check.pop(0) if not to_check: continue # if already checked if [b for b in blocks_checked if blocks_equal(b, to_check)]: continue blocks_checked.append(to_check) for symb in to_check: try: if symb.addr_class == gdb.SYMBOL_LOC_COMPUTED: if "jj" in str(symb) or "ii" in str(symb): #log.warn("skip symbol {} because it is computed".format(symb)) #frame.select() #continue pass pass val = symb.value(frame) if val.is_optimized_out: continue except TypeError: continue except gdb.error: continue if val.address is None: continue typ = symb.type try: start_addr = addr2num(val.address) except gdb.error: continue if typ.code == gdb.TYPE_CODE_PTR and addr != start_addr: # is it pointed by type ? start_addr = addr2num(val) stop_addr = start_addr + LOOK_N_BITS_AFTER_POINTER else: # primitive, struct or array stop_addr = start_addr + int(typ.sizeof) if start_addr <= addr < stop_addr: if addr-start_addr != 0: offset = addr-start_addr try: offset = int(offset/primitive_type(typ).sizeof) if offset < closest[1]: closest = symb, offset except RuntimeError as e: # Type does not have a target. offset *= -1 print(e) else: offset = 0 return symb, offset blocks_to_check.append(to_check.global_block) blocks_to_check.append(to_check.static_block) blocks_to_check.append(to_check.superblock) symb, offset = closest if symb is not None: if offset > LOOK_N_BITS_AFTER_POINTER/2: log.warn("Found {} with offset={}".format(symb, offset)) return closest return None
first_reprint = True
[docs]def reprint_prompt(): prompt = gdb.prompt_hook def gdb_prompt(ignored=None): return gdb.parameter("prompt") if prompt is None: prompt = gdb_prompt global first_reprint if first_reprint: log.warning("[Press arrow/any key to show the prompt.]") first_reprint = False print(prompt(gdb_prompt()), end="")
[docs]def get_current_executable(): """ :returns: the name of the current executable, if any. :rtype: str """ exec_name = " ".join(gdb.execute("info inferior %d" % gdb.selected_inferior().num, to_string=True).split("\n")[1].split()[3:]) return exec_name if exec_name else None
[docs]def try_to_stop(): """ Tries to stop the thread currently selected. Trick: creates another inferior to trick GDB, and gets rid of it. """ assert gdb.selected_thread().is_running() cur_inf_thr = current_infthr() sleep = my_access.Popen(['sleep', '1000']) sleeper_inf = attach(sleep.pid, "internal", silent=True) if sleeper_inf is None: error("couldn't attach to internal 'sleeper' process") kill_current_inferior() switch_to_infthr(cur_inf_thr, silent=True) gdb.execute("remove-inferiors %d" % sleeper_inf.num, to_string=True) assert not gdb.selected_thread().is_running()
[docs]def safe_finish(to_string=True): """ Tries to stop the selected thread first of al, then execute GDB's finish command. :returns: the output of the `finish` command. """ if gdb.selected_thread().is_running(): try_to_stop() return gdb.execute("finish", to_string=to_string)
[docs]def catch(what): ret = gdb.execute("catch exec", to_string=True) try: catchline = re.match ("Catchpoint (\d+) \(%s\)" % what, ret).group(1) return int(catchline) except ValueError as e: error("couldn't create a new catchpoint for '%s): %s" % (what, catchline))
@deprecated
[docs]def delete_bp(num): """ Wrapper around GDB's `delete` command. Use `Breakpoint.delete` instead. """ gdb.execute("delete %s" % num)
[docs]def parse_newprocess(where): """ Parses the PID of a new process in `where`. :param where: GDB process creation string :raises gdb.Error: if the `where` doesn't contain the GDB's process creation pattern. :returns: the PID found :rtype: int """ try: match = re.match (".*\[New process (\d+)\].*", where) return match.group(1) if match else None except Exception as e: error("couldn't parse new process PID '%s): %s" % (where, e))
[docs]def num_to_inf_obj(num): """ :returns: the inferior whose identifier is `num`, None otherwise. :rtype: gdb.Inferior """ for inf in gdb.inferiors(): if int(num) == inf.num: return inf return None
[docs]def pid_to_inf_obj(pid): """ :returns: the inferior whose PID is `pid`, None otherwise. :rtype: gdb.Inferior """ for inf in gdb.inferiors(): if int(pid) == inf.pid: return inf return None
late_exec = []
[docs]def late_function_breakpoint_execution(fct=None): if fct: late_exec.append(fct) else: ret = [] ret[:] = late_exec late_exec[:] = [] return ret
FCT_VALUE = "(\S+) <(\S+)>"; FCT_VALUE_ADDR = 1; FCT_VALUE_NAME = 2
[docs]def value_to_methodNameAddr(meth_value): """ Parses the Name and Address from GDB's print string. :param meth_value: the string to parse, under the pattern '{}' :returns: #1 the function name if the parsing succeeded. :returns: #2 the function address if the parsing succeeded. :returns: (None, None) otherwise. """.format(FCT_VALUE) exp = re.match(FCT_VALUE, str(meth_value)) if exp is None: return (None, None) return exp.group(FCT_VALUE_NAME), exp.group(FCT_VALUE_ADDR)
[docs]def get_confirm(): """ Wrapper around `gdb.parameter("confirm")` """ return gdb.parameter("confirm")
[docs]def set_confirm(val): """ Sets GDB `confirm` parameter to `val`. """ gdb.execute("set confirm %s" % (val and "on" or "off"))
[docs]def get_env(varname): env = gdb.execute("show environment {}".format(varname), to_string=True)[:-1] """Environment variable "TOTO" not defined.""" if env.endswith(" not defined."): return None """LINES = 67""" return env.partition(" = ")[-1]
[docs]def set_env(varname, value): gdb.execute("set environment {} {}".format(varname, value))
[docs]def extend_env(varname, value, join=":"): current_val = get_env(varname) if current_val and \ ((value+join) in current_val or current_val.endswith(value)): return current_val new_val = ("{}{}".format(current_val, join) if current_val else "") \ + value set_env(varname, new_val) return new_val
[docs]def start_or_continue(to_string=False): cmd = "continue" if gdb.inferiors() and gdb.inferiors()[0].pid != 0 \ else "run" return gdb.execute(cmd, to_string)
[docs]def exec_on_sharedlibrary(to_call, soname): libs = gdb.execute("info sharedlibrary", to_string=True) # check if library is already loaded *and read*. for line in libs.split("\n"): if not line: continue try: line = line.split() except Exception as e: log.warning("Cannot parse shared library line: {} ({})".format(line, e)) continue if soname in line[-1] and "Yes" in line[2]: to_call() return True def exec_if_soname(event): if soname == "*" or (event.new_objfile and soname in event.new_objfile.filename): to_call() gdb.events.new_objfile.disconnect(exec_if_soname) return gdb.events.new_objfile.connect(exec_if_soname) return False
[docs]def get_function_fname_and_lines(fct_symb): fct_addr = addr2num(fct_symb.value().address) disa = gdb.execute("disassemble {}".format(fct_addr), to_string=True) filename = fct_symb.symtab.filename from_line = fct_symb.line to_line = 0 for disa_line in disa.split("\n"): if "Dump of assembler code" in disa_line: continue # skip first line if "End of assembler dump." in disa_line: break # we're at the end try: # parse the PC value # => 0x00000000004009c1 <+32>: jmpq 0x401464 <main._omp_fn.0+2755> pc = int(disa_line.replace("=>", "").split()[0], 16) except: log.warning("Could not parse disassembly line ...") log.warning(disa_line) continue sal = gdb.find_pc_line(pc) if not sal: continue # hum, nothing known that that PC # check for consistency that PC is in the right file if not sal.symtab.filename == fct_symb.symtab.filename: log.info("not the right file, inlined ?") continue # if function symbol doesn't specify its line if fct_symb.line == 0: if from_line == 0 or sal.line < from_line: from_line = sal.line # PCs may not be in order if sal.line > to_line: to_line = sal.line return filename, from_line, to_line
[docs]def get_src(filename, lstart, lstop): return gdb.execute("list {}:{}, {}:{}".format(filename, lstart, filename, lstop), to_string=True)
########################### ## Breakpoint extensions ## ########################### @internal
[docs]class StrongNextBreakpoint(gdb.Breakpoint): """ Breakpoint extension used to implent `strong_next` command. :param keyword: optional description (mnemonic) of the strong_next startpoint. :type keyword: str """ def __init__(self, spec, keyword=None): gdb.Breakpoint.__init__(self, spec, internal = 1) self.keyword = keyword self.silent = True
[docs] def stop(self): if keyword is not None: log.info("[Strong next finished{}]".format(" ({})".format(self.keyword) if self.keyword else "")) gdb.post_event(self.delete) return True
@internal
[docs]def strong_next(keyword=None): """ Performes a "strong next" on the currently selected inferior: steps forward, and if we're one step deeper in the callstack, sets a `StrongNextBreakpoint` on the return point of the frame. This function is usefull the `next` commands goes over a runtime function call that switches the current user-level thread. As GDB is not aware of that, it gets lost in its `next` and the execution never stops again. Set it with command `sn` (`communication.cli.my_gdb.cmd_sn`). :param keyword: A message to be displayed when the strop next finishes. :returns: True if a simple `step` was not enough to complete the `next` command. """ start_frame = gdb.newest_frame() gdb.execute("step") new_frame = gdb.newest_frame() if start_frame == new_frame: #log.warning("Nothing special to do over here ...") return False StrongNextBreakpoint("*%d" % start_frame.pc(), keyword) #log.info("please continue manually") gdb.execute("continue") return True
[docs]class SiblingBreakpoint(gdb.Breakpoint): """ Copy of breakpoint `referent` in another program space. Registers itself in `referent.siblings` list. :param referent: the original breakpoint. :type referent: AllInfBreakpoint :var inf_id: the inferior in which this breakpoint was set. """ def __init__(self, referent): assert referent is not None gdb.Breakpoint.__init__(self, referent.location, internal=True) self.silent = True self.referent = referent self.inf_id = gdb.selected_inferior().num self.referent.siblings[self.inf_id] = self
[docs] def stop(self): """ Overides gdb.Breakpoint.stop. :returns: False is the current inferior is not `self.inf_id` :returns: Delegates to `referent.stop()` """ if self.referent.siblings[gdb.selected_inferior().num] != self: log.warning("Sibling breakpoint hit in the wrong inferior") return False try: return self.referent.stop() except AttributeError: return True
def __del__(self): """ Removes itself from `referent.siblings` list. """ del self.referent.siblings[self.inf_id]
[docs]class AllInfBreakpoint(gdb.Breakpoint): """ Subclass of `gdb.Breakpoint` that spreads itself to all the existing and future inferiors. Set them with command `break_spread` (`communication.cli.my_gdb.cmd_AllInfBreakpoint`). """ breakpoints = [] def __init__(self, spec, internal=True): gdb.Breakpoint.__init__(self, spec, internal=internal) self.silent = True inf_id = gdb.selected_inferior().num self.siblings = {inf_id: self} self.__class__.breakpoints.append(self) def __del__(self): for sibling in self.siblings.values(): if sibling != self: sibling.delete() @staticmethod
[docs] def handler_new_objfile (event): """ To connect to `gdb.events.new_objfile`. Spreads AllInfBreakpoints to new interiors. """ AllInfBreakpoint.spread_to_other_inferiors()
@staticmethod
[docs] def spread_to_other_inferiors (): """ Tries to spread all the AllInfBreakpoint to new inferiors, and remove the dead ones from the lists. """ tp_remove = [] for bp in AllInfBreakpoint.breakpoints: if not bp.is_valid(): tp_remove.append(bp) continue inf_id = gdb.selected_inferior().num if inf_id in bp.siblings and bp.siblings[inf_id] is not None: continue log.info("spread to %d" % inf_id) # reached if there is no sibling for this inferior log.info("Create '%s' sibling on inf. %d", bp.location, inf_id) SiblingBreakpoint(bp) for bp in tp_remove: AllInfBreakpoint.breakpoints.remove(bp)
###################### ## Prompt extension ## ###################### @internal
[docs]def defaultPrompt(old): """ Returns GDB's default prompt: (gdb) :param old: unused """ return "(gdb) "
@internal
[docs]class Prompt: """ Extended prompt. :var hooks: set of callable, triggered before returning the local prompt. :var prompts: set of callable, applied recursively (`next_prompt(previous)`) to compte the prompt displayed by GDB. :param old_prompt: the inner most prompt, (eg `(gdb)` or None) """ def __init__(self, old_prompt): self.hooks = set() self.prompts = [] self.pushPrompt(defaultPrompt if old_prompt is None else old_prompt) def __call__(self, old_prompt): for hook in self.hooks: hook() previous = old_prompt for prompt in self.prompts: ret = prompt(previous) if ret is not None: previous = ret return previous
[docs] def registerHook(self, hook): self.hooks.add(hook)
[docs] def unregisterHook(self, hook): self.hooks.remove(hook)
[docs] def pushPrompt(self, prompt): self.prompts.append(prompt)
[docs] def popPrompt(self, prompt): self.prompts.remove(prompt)
################################# ## Task Manager implementation ## #################################
[docs]def current_infthr(): """ Returns GDB's currently selected inferior+thread. List them with command `info infthreads` (`communication.cli.my_gdb.cmd_info_infthread`). :returns: #1 GDB's selected inferior :returns: #2 GDB's selected thread """ return (gdb.selected_inferior(), gdb.selected_thread())
[docs]def switch_to_infthr(infthr, silent=True): """ Switches to another inferior+thread. :param infthr: the inferior+thread to switch to. :param silent: print information on screen or not. Default: True :returns: True if we did the switch. """ thr = None if infthr[1] is not None and infthr[1].is_valid(): infthr[1].switch() inf = gdb.execute ("inferior %s" % infthr[0].num, to_string=True) inf = inf.partition("\n")[0] if not silent: log_user.info("Switched to inferior {}".format(inf)) if thr is not None: log_user.info("Switched to thread {}".format(thr)) return True
[docs]def switch_to_inferior(inferior, silent=True): """ Switches to another inferior. :param inferior: the inferior to switch to. :param silent: print information on screen or not. Default: True :returns: True if we did the switch. """ if inferior is None: return False gdb.execute ("inferior %s" % inferior.num, to_string=silent) return True
[docs]def new_inferior(silent=False): """ Wrapper around GDB's `add-inferior` command. :param silent: print information on screen or not. Default: True :returns: the newly created inferior object, or None if it failed. :rtype: gdb.Inferior """ try: inf = gdb.execute ("add-inferior", to_string=silent) inf_id = re.match ("Added inferior (\d+)", inf).group(1) return num_to_inf_obj(inf_id) except gdb.error as exc: log.warning("couldn't create a new inferior ... ('%s'): %s", inf, exc) return None
[docs]def remove_inferior(inf, silent=True): """ Wrapper around GDB's `add-inferior` command. :param silent: print information on screen or not. Default: True :returns: the GDB's remove string if silent was True. """ return gdb.execute ("remove-inferiors %s" % inf.num, to_string=silent)
[docs]def kill_current_inferior(silent=True): """ Wrapper around GDB's `kill` command. :param silent: print information on screen or not. Default: True :returns: the GDB's kill string if silent was True. """ return gdb.execute("kill", to_string=silent)
[docs]def attach(pid, name=None, comeback=False, silent=False): """ Tries to attach GDB to `pid` process. :param comeback: if true, switch back to the initial inferior. Otherwise, stay on the newly attached inferior. :param silent: print information on screen or not. Default: False. :returns: the newly attached inferior object, or None if it failed. :rtype: gdb.Inferior """ inf = pid_to_inf_obj(pid) if inf is not None: if not comeback: switch_to_inferior(inf) return inf cur_inf_thr = current_infthr() # use current inf. only if not used # AND no execfile has been set if (gdb.selected_inferior().pid == 0 and get_current_executable() == None): inf = gdb.selected_inferior() comeback = False else: #create and connect the new inferior inf = new_inferior(silent=True) switch_to_inferior(inf) try: my_access.do_attach(pid) except gdb.error as exc: switch_to_infthr(cur_inf_thr, silent=True) remove_inferior(inf) if not silent: log.warning("[Couldn't attach to %s%s: %s]", pid, name is not None and "/" + name or "", exc) return None if not silent: log.info("[New Process %s]", (pid)) if comeback: switch_to_infthr(cur_inf_thr, silent=True) return pid_to_inf_obj(pid)
[docs]def set_parameter(name, value): class temp: def __init__(self): self.old = None def __enter__(self): self.old = gdb.parameter(name) if self.old is True: self.old = "on" if self.old is False: self.old = "off" # otherwise don't change gdb.execute("set {} {}".format(name, value), to_string=True) def __exit__(self, type, value, traceback): gdb.execute("set {} {}".format(name, self.old), to_string=True) return temp()
[docs]def active_thread(thread): class temp: def __init__(self): self.selected = gdb.selected_thread() def __enter__(self): thread.switch() def __exit__(self, type, value, traceback): self.selected.switch() return temp()
[docs]def main_thread(inferior=None): if inferior is None: inferior = gdb.selected_inferior() return [th for th in inferior.threads() if th.num == 1][0]
[docs]def attach_all(pids): """ Tries to attach GDB to the processes described by [pids]. :param pids: list of PIDs :type pids: list :returns: the newly attached inferior objects, or [] if all failed. :rtype: list(gdb.Inferior) """ if not pids: log.info("Pid list is empty") return [attach(pid) for pid in pids]
@internal
[docs]class DefaultTaskManager: """ Default task manager implementation, simply relying on GDB's inferior+threads. """
[docs] def get_selected_task(self): """ :returns: the current infthr. """ return current_infthr()
[docs] def get_mark(self, task): """ :returns: "*" if infthr `task` is currently selected or " " """ return "*" if self.is_selected_task(task) else " "
[docs] def is_selected_task(self, task): """ :returns: True if infthr `task` is valid and currently selected. """ return (task is not None and task[1] is not None and task[1].is_valid() and task[1] == gdb.selected_thread())
[docs] def switch_to(self, task): """ Tries to switch to infthr `task`. :returns: True if it succeeded, False otherwise. """ return switch_to_infthr(task)
################################ ## Signals and thread safety ### ################################ def __empty_zone(): def __enter__(): pass def __exit__(*args, **kwargs): pass notified = False
[docs]def may_start_threads(): global notified if pysigset is None and not notified: log.error("Could not load PySigSet") log.error("Expect to encounter a GDB deadlock sooner or later ...") notified = True if pysigset: return pysigset.suspended_signals(signal.SIGCHLD) else: return __empty_zone()
import signal import sys
[docs]def signal_handler(signal, frame): """ Handler for C^c process interuption. Pushes a stop request in mcGDB. Connect it with `signal.signal(signal.SIGINT, signal_handler)`. :param signal: not used :param frame: not used """ log_user.info ('### You pressed Ctrl+C! ### %s') push_stop_request('You pressed Ctrl+C!')
######################
[docs]def primitive_type(typ): try: while True: typ = typ.target() except RuntimeError: #Type does not have a target. return typ
@internal
[docs]def find_type(orig, name): typ = orig.strip_typedefs() while True: search = str(typ) + '::' + name try: return gdb.lookup_type(search) except RuntimeError: pass # The type was not found, so try the superclass. We only need # to check the first superclass, so we don't bother with # anything fancier here. field = typ.fields()[0] if not field.is_base_class: raise ValueError("Cannot find type %s::%s" % (str(orig), name)) typ = field.type
########################################################################### @internal
[docs]def initialize(): #gdb.events.new_objfile.connect(AllInfBreakpoint.handler_new_objfile) log.warn("Signal handler NOT registered (commented out)")
#signal.signal(signal.SIGINT, signal_handler)