Source code for mcgdb.capture

import inspect
    
import gdb
import mcgdb.interaction
from .toolbox import my_gdb
from .toolbox import paje as tracing
from .toolbox.target import my_archi

from . import representation, interaction

from mcgdb.toolbox.python_utils import *
import mcgdb

import logging;
log = logging.getLogger(__name__)
log_fbp = logging.getLogger("mcgdb.log.event.fct_bpt")

class FrameStripperDecorator:
    """
    Frame decorator that only exposes the function and the inferior frame
    of the frame it wraps. Every other accessor answers `None`.

    :param parent: the frame (or frame decorator) being wrapped.
    """

    def __init__(self, parent):
        self.parent = parent

    # --- accessors forwarded to the wrapped frame ---

    def function(self):
        """Return whatever the wrapped frame reports as its function."""
        return self.parent.function()

    def inferior_frame(self):
        """Return the inferior frame of the wrapped frame."""
        return self.parent.inferior_frame()

    # --- accessors deliberately stripped (always `None`) ---

    def frame_locals(self):
        """Report no local variables."""
        return None

    def frame_args(self):
        """Report no arguments."""
        return None

    def address(self):
        """Report no address."""
        return None

    def line(self):
        """Report no line number."""
        return None

    def filename(self):
        """Report no file name."""
        return None
class BreakpointDecoratorFrameFilter:
    """
    Frame filter that decorates the frames of functions on which a
    `FunctionBreakpoint` is registered, using the `decorator` attribute
    of that breakpoint.
    """

    def __init__(self):
        self.enabled = True
        # Lower priority is called last. Lower priority means higher control.
        self.priority = 100

    def filter(self, frames):
        """
        Yield each frame of `frames`, wrapped by the decorator of the first
        registered breakpoint whose location ends with the frame's function
        name. Frames without such a breakpoint, or whose breakpoint has a
        false `decorator`, are yielded unchanged.

        :param frames: iterable of frames (or frame decorators).
        """
        for frame in frames:
            fct_name = str(frame.inferior_frame().name())

            # registered locations might be prefixed by a filename
            candidates = [
                bpt
                for location, bpt
                in mcgdb.capture.FunctionBreakpoint.breakpointed.items()
                if location.endswith(fct_name)
            ]

            decorate = None
            if candidates:
                first = candidates[0]
                assert hasattr(first, "decorator")
                decorate = first.decorator

            yield decorate(frame) if decorate else frame
def frame_depth(frame):
    """
    Count how many frames are newer than `frame`.

    :param frame: object exposing a `newer()` method that returns the next
        newer frame, or `None` when there is none.
    :returns: 0 when `frame` has no newer frame, otherwise the number of
        `newer()` hops needed to reach the newest one.
    """
    depth = 0
    # `newer()` is queried exactly once per frame: the value used for the
    # loop test is reused to advance, instead of asking a second time.
    newer = frame.newer()
    while newer is not None:
        depth += 1
        newer = newer.newer()
    return depth
class NoPCFrame(FrameStripperDecorator):
    """
    Frame decorator that forwards everything to the wrapped frame except
    the address, which is always reported as `None`.
    """

    def address(self):
        """Hide the address of the wrapped frame."""
        return None

    # Everything below is delegated unchanged to the wrapped frame.

    def function(self):
        """Return the function reported by the wrapped frame."""
        return self.parent.function()

    def frame_locals(self):
        """Return the local variables reported by the wrapped frame."""
        return self.parent.frame_locals()

    def frame_args(self):
        """Return the arguments reported by the wrapped frame."""
        return self.parent.frame_args()

    def line(self):
        """Return the line reported by the wrapped frame."""
        return self.parent.line()

    def filename(self):
        """Return the file name reported by the wrapped frame."""
        return self.parent.filename()
class NoPCFrameFilter:
    """
    Frame filter that wraps every frame in a `NoPCFrame`, so that no
    address is reported for it.
    """

    def __init__(self):
        self.enabled = True
        self.priority = 10

    def filter(self, frames):
        """
        Yield a `NoPCFrame` around each frame of `frames`.

        :param frames: iterable of frames (or frame decorators).
        """
        # NOTE: an earlier revision apparently meant to wrap only frames
        # without line information (`if not frame.line()`); that test was
        # disabled and every frame is wrapped unconditionally.
        for frame in frames:
            yield NoPCFrame(frame)
################
class FunctionFinishBreakpoint(gdb.Breakpoint):
    """
    Internal, silent, thread-specific breakpoint set on the PC of the
    caller frame, used to run the "after" part of a `FunctionBreakpoint`.

    It is created while the selected frame is the one of the function being
    captured; it disables itself on its first hit and schedules its own
    deletion.

    :param parent: the `FunctionBreakpoint` that requested this breakpoint.
    :param fct_data: object handed over to `parent.prepare_after`.
    :param filter_params: object handed over to `parent.stop_after_filters`.
    :var hit_count: number of hits, all instances included.
    """
    hit_count = 0

    def __init__(self, parent, fct_data, filter_params):
        gdb.Breakpoint.__init__(
            self, "*%s" % gdb.selected_frame().older().pc(), internal=True)

        self.silent = True
        self.parent = parent
        self.fct_data = fct_data
        self.filter_params = filter_params

        # `Breakpoint.thread` expects the *global* thread id. `num` is the
        # per-inferior number, which only matches with a single inferior.
        # Fall back on `num` for GDB versions without `global_num`.
        thread = gdb.selected_thread()
        self.thread = int(getattr(thread, "global_num", thread.num))

    @profile(immediate=False)
    def stop(self):
        """
        Run `parent.prepare_after`, the after-filters, the pending stop
        requests and the tracing hook, then schedule the deletion of this
        breakpoint.

        :returns: True if the execution should stop, i.e. if
            `prepare_after`, the filters or a stop request asked for it,
            or if `prepare_after` was interrupted from the keyboard.
        """
        tid = gdb.selected_thread().num
        parent_name = self.parent.__class__.__name__

        log_fbp.info("thread {} finished {} ({})".format(
            tid, parent_name, self.parent.location))

        FunctionFinishBreakpoint.hit_count += 1
        self.parent.fhit_internal += 1
        self.enabled = False  # one-shot: never trigger a second time

        if self.parent.has_early:
            # forget the early run of this thread, if it had one
            self.parent.had_early_run.discard(tid)

        try:
            fct_stop = self.parent.prepare_after(self.fct_data)
        except KeyboardInterrupt:
            log.critical("{}.prepare_after received a Keyboard interruption"
                         .format(parent_name))
            log.critical("The rest of the execution might not be reliable.")
            # do not leave this (disabled) internal breakpoint behind
            gdb.post_event(self.delete)
            return True
        except Exception as e:
            log.critical("{}.prepare_after failed: {}".format(parent_name, e))
            log.error(e)
            fct_stop = False
            mcgdb.toolbox.callback_crash("FunctionBreakpoint.prepare_after", e)

        filter_stop = self.parent.stop_after_filters(self.filter_params)
        requested_stop = interaction.proceed_stop_requests()
        tracing.after(self.parent, self.fct_data)

        # deletion is deferred: it is posted as a GDB event instead of being
        # done from inside the stop callback
        gdb.post_event(self.delete)

        return fct_stop or filter_stop or requested_stop

    def out_of_scope(self):
        """Do nothing."""
        pass
class FunctionType:
    """
    Node of the tree of breakpoint families.

    The first instance ever created becomes the root of the tree
    (`FunctionType.root`). Later instances are attached to `parent`, or to
    the root when no parent is given.

    :param name: name of the family.
    :param parent: `FunctionType` to attach this one to (default: the root).
    :var children: families attached to this one.
    :var functions: breakpoints registered in this family.
    :var enabled: status last set through `to_enabled` (initially True).
    """
    root = None

    def __init__(self, name, parent=None):
        self.name = name
        self.children = []
        self.functions = []
        self.enabled = True

        if FunctionType.root is None:
            # very first family: it is the root and has no parent
            FunctionType.root = self
            return

        owner = FunctionType.root if parent is None else parent
        owner.children.append(self)

    def __str__(self):
        return "%s" % self.name

    def __repr__(self):
        return "<function type: %s>" % self.name

    def to_enabled(self, status):
        """
        Set the `enabled` flag of this family and of all the breakpoints
        registered in `functions`.

        :param status: True or False.
        """
        assert isinstance(status, bool)

        self.enabled = status
        for breakpoint in self.functions:
            breakpoint.enabled = status
class FunctionTypes:
    """
    Namespace holding the predefined `FunctionType` families.

    `general_func` is created first; `define_func` is attached to it and
    all the other families are attached to `define_func`.
    """

    def __init__(self):
        pass

    # top of the hierarchy
    general_func = FunctionType("General")
    define_func = FunctionType("Definition", general_func)

    # families attached to `define_func`
    conf_func = FunctionType("Configuration", define_func)
    comm_func = FunctionType("Communication", define_func)
    info_func = FunctionType("Information", define_func)
    exec_func = FunctionType("Execution", define_func)
class AlreadyInstalledException(Exception):
    """
    Raised when a breakpoint is requested on a location that already has
    a valid one.

    :param actual: the breakpoint already installed on the location.
    """

    def __init__(self, actual):
        # Initialise the base class so that `args` (and hence `repr()` and
        # pickling) carries the conflicting breakpoint.
        Exception.__init__(self, actual)
        self.actual = actual

    def __str__(self):
        return "Breakpoint {} already installed: {}".format(
            self.actual.location, self.actual)
@my_gdb.Listed
class FunctionBreakpoint(gdb.Breakpoint, representation.Filterable):
    """
    Breakpoint set at the entry and return point of function `spec`.

    Subclasses should extend methods `FunctionBreakpoint.prepare_before`
    and `FunctionBreakpoint.prepare_after` to capture relevant information
    (parameters) from the function calls.

    :param spec: location at which the breakpoint should be set.
    :param temporary: forwarded to `gdb.Breakpoint`.
    :var func_type: a member of `FunctionTypes` indicating the nature
        of the breakpoint. This type will be used to enable/disable some
        families of breakpoints.
    :var breakpointed: registry of the breakpoints, indexed by `spec`.
    :var decorator: frame decorator used for the frames of this function.
    :var hit_count: number of hits, all instances included.
    :var hit_internal: number of breakpoint hits.
    :var fhit_internal: number of hits of the finish breakpoint.
    :raises AlreadyInstalledException: if a valid, non-deleted breakpoint
        is already registered for `spec`.
    """
    func_type = FunctionTypes.general_func
    breakpointed = {}
    decorator = FrameStripperDecorator
    hit_count = 0

    @staticmethod  # connected on stops
    def check_breakpoint_validity(_=None):
        """Drop from the registry the breakpoints that are no longer valid."""
        # iterate over a copy: entries are removed while looping
        for spec, bpt in list(FunctionBreakpoint.breakpointed.items()):
            if not bpt.is_valid():
                del FunctionBreakpoint.breakpointed[spec]

    @staticmethod
    def _accepts_early(func):
        """Tell whether callable `func` has a parameter named `early`."""
        # `inspect.getargspec` was removed in Python 3.11; prefer its
        # replacement and only fall back on it where that is missing.
        get_spec = getattr(inspect, "getfullargspec", None)
        if get_spec is None:
            get_spec = inspect.getargspec
        spec = get_spec(func)

        # `prepare_before` is always called with keywords, so a
        # keyword-only `early` parameter qualifies as well.
        return ("early" in spec.args
                or "early" in (getattr(spec, "kwonlyargs", None) or ()))

    def __init__(self, spec, temporary=False):
        if spec in FunctionBreakpoint.breakpointed:
            bp = FunctionBreakpoint.breakpointed[spec]
            if bp.is_valid() and not bp.deleted:
                raise AlreadyInstalledException(bp)

        log_fbp.info("break on {} ({})".format(
            self.__class__.__name__, spec))

        FunctionBreakpoint.breakpointed[spec] = self

        gdb.Breakpoint.__init__(self, spec, temporary=temporary,
                                internal=True)
        representation.Filterable.__init__(self, spec)

        self.deleted = False
        self.silent = True
        self.hit_internal = 0
        self.fhit_internal = 0

        self.__class__.func_type.functions.append(self)
        self.enabled = self.__class__.func_type.enabled

        # "early" mode is opted in by declaring an `early` parameter
        # in `prepare_before`
        self.has_early = self._accepts_early(self.prepare_before)
        self.had_early_run = set()

    def run_early(self):
        """
        Run `prepare_before(early=True, first=True)` in every other thread
        whose newest frame sits at the same PC as the selected thread, and
        that had no early run yet. The initially selected thread is
        selected again before returning.
        """
        stopped_thread = gdb.selected_thread()
        if stopped_thread.is_running():
            return

        bp_pc = gdb.newest_frame().pc()

        try:
            for inf in gdb.inferiors():
                for thr in inf.threads():
                    if thr.num in self.had_early_run:
                        continue
                    if thr is stopped_thread:
                        continue

                    thr.switch()
                    if gdb.newest_frame().pc() != bp_pc:
                        continue

                    log_fbp.debug("Early run of {}.prepare_before".format(
                        self.__class__.__name__))
                    self.prepare_before(early=True, first=True)
                    self.had_early_run.add(thr.num)
        finally:
            # restore the thread selection even if `prepare_before` failed
            stopped_thread.switch()

    @profile(immediate=False)
    def stop(self):
        """
        Callback run when the breakpoint is hit.

        Runs `prepare_before` (preceded by the early runs when enabled),
        the tracing hook and the before-filters, installs a
        `FunctionFinishBreakpoint` when needed, then runs the late
        executions and the pending stop requests.

        :returns: True if the execution should stop; False otherwise, in
            particular when `prepare_before` returned `None` or failed.
        """
        log_fbp.info("thread #{} stopped in {} ({})".format(
            gdb.selected_thread().num,
            self.__class__.__name__,
            self.location))

        self.hit_internal += 1
        events.bp_stop.trigger(self)
        FunctionBreakpoint.hit_count += 1

        args = {}
        if self.has_early:
            try:
                self.run_early()
            except KeyboardInterrupt:
                log.critical("{}.run_early received a Keyboard interruption"
                             .format(self.__class__.__name__))
                log.critical("The rest of the execution might not be reliable.")
                return True

            tid = gdb.selected_thread().num
            first_run = tid not in self.had_early_run
            args = {"early": False, "first": first_run}
            self.had_early_run.add(tid)

        try:
            ret = self.prepare_before(**args)
        except KeyboardInterrupt:
            log.critical("{}.prepare_before received a Keyboard interruption"
                         .format(self.__class__.__name__))
            log.critical("The rest of the execution might not be reliable.")
            return True
        except Exception as e:
            log.critical("{}.prepare_before failed: {}".format(
                self.__class__.__name__, e))
            log.error(e)
            ret = None
            mcgdb.toolbox.callback_crash("FunctionBreakpoint.prepare_before", e)

        if ret is None:
            return False  # spurious stop

        (fct_stop, fct_finish, fct_data) = ret

        tracing.before(self, fct_data, fct_finish)
        filter_stop, filter_params = self.stop_before_filters()

        if self.has_early or fct_finish or self.has_after_filters():
            # the instance registers itself in GDB, no reference is needed
            FunctionFinishBreakpoint(self, fct_data, filter_params)

        for fct in my_gdb.late_function_breakpoint_execution():
            try:
                fct()
            except Exception as e:
                log.error("Late FunctionBreakpoint execution for {} failed: {}"
                          .format(fct, e))
                mcgdb.toolbox.callback_crash("Late FunctionBreakpoint", e)

        requested_stop = interaction.proceed_stop_requests()

        return fct_stop or filter_stop or requested_stop

    @my_gdb.virtual
    def prepare_before(self):
        """
        Function executed when the FunctionBreakpoint hits its target
        function.

        By convention, return #3 contains a dictionary with all the
        information captured at the breakpoint stop. The items of the
        dictionary will be transmitted to the data logger for offline
        trace generation.

        Implementations declaring an `early` parameter are called with the
        keyword arguments `early` and `first`.

        :returns: `None` if this was a spurious stop.
            The execution won't be stopped, nor the user notified.
        :returns: #1 True if the execution should be stopped now.
        :returns: #2 True if the finish breakpoint should be inserted.
        :returns: #3 Any object to transmit to
            `FunctionBreakpoint.prepare_after`.
        """
        return None

    @my_gdb.virtual
    def prepare_after(self, data):
        """
        Function executed when the function targeted by this
        FunctionBreakpoint finishes.

        :param data: the object returned in position #3
            by `FunctionBreakpoint.prepare_before`.
        :returns: True if GDB should stop the execution.
        """
        return False

    def __str__(self):
        return "FunctionBreakpoint at '%s'" % self.location
###################
def initialize():
    """
    Register the frame filters of this module in `gdb.frame_filters` and
    connect `FunctionBreakpoint.check_breakpoint_validity` to GDB's stop
    events.
    """
    frame_filters = (
        ("No PC on backtrace", NoPCFrameFilter),
        ("mcGDB breakpoint decorators", BreakpointDecoratorFrameFilter),
    )
    for title, make_filter in frame_filters:
        gdb.frame_filters[title] = make_filter()

    gdb.events.stop.connect(FunctionBreakpoint.check_breakpoint_validity)