import inspect
import gdb
import mcgdb.interaction
from .toolbox import my_gdb
from .toolbox import paje as tracing
from .toolbox.target import my_archi
from . import representation, interaction
from mcgdb.toolbox.python_utils import *
import mcgdb
import logging;
log = logging.getLogger(__name__)
log_fbp = logging.getLogger("mcgdb.log.event.fct_bpt")
[docs]class FrameStripperDecorator:
def __init__(self, parent):
self.parent = parent
[docs] def function(self):
return self.parent.function()
[docs] def frame_locals(self): return None
[docs] def frame_args(self): return None
[docs] def address(self): return None
[docs] def line(self): return None
[docs] def filename(self): return None
[docs] def inferior_frame(self):
return self.parent.inferior_frame()
[docs]class BreakpointDecoratorFrameFilter:
def __init__(self):
self.enabled = True
self.priority = 100
[docs] def filter(self, frames):
for frame in frames:
function = str(frame.inferior_frame().name())
# function might be prefixed by filename
bpts = [val for key, val in mcgdb.capture.FunctionBreakpoint.breakpointed.items() if key.endswith(function)]
if not bpts:
yield frame
continue
bpt = bpts[0]
assert hasattr(bpt, "decorator")
if not bpt.decorator:
yield frame
continue
yield bpt.decorator(frame)
"""
Lower priority is call last.
Lower priority means higher control.
"""
[docs]def frame_depth(frame):
current = frame
cpt = 0
while current.newer() is not None:
cpt += 1
current = current.newer()
return cpt
[docs]class NoPCFrame(FrameStripperDecorator):
[docs] def function(self): return self.parent.function()
[docs] def address(self): return None
[docs] def frame_locals(self): return self.parent.frame_locals()
[docs] def frame_args(self): return self.parent.frame_args()
[docs] def line(self): return self.parent.line()
[docs] def filename(self): return self.parent.filename()
[docs]class NoPCFrameFilter:
def __init__(self):
self.enabled = True
self.priority = 10
[docs] def filter(self, frames):
for frame in frames:
#if not frame.line():
if True:
yield NoPCFrame(frame)
else:
yield frame
################
[docs]class FunctionFinishBreakpoint (gdb.Breakpoint):
hit_count = 0
def __init__ (self, parent, fct_data, filter_params):
gdb.Breakpoint.__init__(self, "*%s" % gdb.selected_frame().older().pc(),
internal=True)
#gdb.FinishBreakpoint.__init__(self, "*%s" % gdb.newest_frame(),
# internal=True)
self.silent = True
self.parent = parent
self.fct_data = fct_data
self.filter_params = filter_params
self.thread = int(gdb.selected_thread().num)
@profile(immediate=False)
[docs] def stop(self):
log_fbp.info("thread {} finished {} ({})".format(
gdb.selected_thread().num,
self.parent.__class__.__name__,
self.parent.location))
FunctionFinishBreakpoint.hit_count += 1
self.parent.fhit_internal += 1
self.enabled = False
num = gdb.selected_thread().num
if self.parent.has_early:
tid = gdb.selected_thread().num
try:
self.parent.had_early_run.remove(tid) # may raise KeyError
except KeyError:
pass # `tid` had no early run
try:
fct_stop = self.parent.prepare_after(self.fct_data)
except KeyboardInterrupt as e:
log.critical("{}.prepare_after received a Keyboard interruption".format(self.__class__.__name__))
log.critical("The rest of the execution might not be reliable.")
return True
except Exception as e:
log.critical("{}.prepare_after failed: {}".format(
self.parent.__class__.__name__, e))
log.error(e)
fct_stop = False
mcgdb.toolbox.callback_crash("FunctionBreakpoint.prepare_after", e)
filter_stop = self.parent.stop_after_filters(self.filter_params)
requested_stop = interaction.proceed_stop_requests()
tracing.after(self.parent, self.fct_data)
self.enabled = False
gdb.post_event(self.delete)
return fct_stop or filter_stop or requested_stop
[docs] def out_of_scope(self):
pass
[docs]class FunctionType:
root = None
def __init__ (self, name, parent=None):
self.name = name
self.children = []
self.functions = []
self.enabled = True
if FunctionType.root is None:
FunctionType.root = self
else:
if parent is None:
parent = FunctionType.root
parent.children.append(self)
def __str__(self):
return "%s" % self.name
def __repr__(self):
return "<function type: %s>" % self.name
[docs] def to_enabled(self, status):
assert status is True or status is False
self.enabled = status
for bpt in self.functions:
bpt.enabled = status
[docs]class FunctionTypes:
def __init__(self):
pass
general_func = FunctionType("General")
define_func = FunctionType("Definition", general_func)
conf_func = FunctionType("Configuration", define_func)
comm_func = FunctionType("Communication", define_func)
info_func = FunctionType("Information", define_func)
exec_func = FunctionType("Execution", define_func)
[docs]class AlreadyInstalledException(Exception):
def __init__(self, actual):
self.actual = actual
def __str__(self):
return "Breakpoint {} already installed: {}".format(self.actual.location, self.actual)
@my_gdb.Listed
[docs]class FunctionBreakpoint(gdb.Breakpoint, representation.Filterable):
"""
Breakpoint set at the entry and return point of function `spec`.
Subclasses should extend methods `FunctionBreakpoint.prepare_before`
and `FunctionBreakpoint.prepare_after` to capture relevant information
(parameters) from the function calls.
:param spec: location at wich the breakpoint should be set.
:var func_type: a member of `FunctionTypes` indicating the nature \
of the breakpoint. This type will be used to enable/disable some \
families of breakpoints.
:var hit_internal: number of breakpoint hits.
:var fhit_internal: number of hits of the finish breakpoint.
"""
func_type = FunctionTypes.general_func
breakpointed = {}
decorator = FrameStripperDecorator
hit_count = 0
@staticmethod
# connected on stops
[docs] def check_breakpoint_validity(_=None):
for spec, bpt in list(FunctionBreakpoint.breakpointed.items()):
if not bpt.is_valid():
del FunctionBreakpoint.breakpointed[spec]
def __init__ (self, spec, temporary=False):
if spec in FunctionBreakpoint.breakpointed:
bp = FunctionBreakpoint.breakpointed[spec]
if bp.is_valid() and not bp.deleted:
raise AlreadyInstalledException(bp)
log_fbp.info("break on {} ({})".format(
self.__class__.__name__, spec))
FunctionBreakpoint.breakpointed[spec] = self
#my_gdb.AllInfBreakpoint.__init__ (self, spec, internal=True)
gdb.Breakpoint.__init__ (self, spec,
temporary=temporary, internal=True)
representation.Filterable.__init__(self, spec)
self.deleted = False
self.silent = True
self.hit_internal = 0
self.fhit_internal = 0
self.__class__.func_type.functions.append(self)
self.enabled = self.__class__.func_type.enabled
self.has_early = "early" in inspect.getargspec(self.prepare_before).args
self.had_early_run = set()
[docs] def run_early(self):
stopped_thread = gdb.selected_thread()
if stopped_thread.is_running():
return
bp_pc = gdb.newest_frame().pc()
for inf in gdb.inferiors():
for thr in inf.threads():
if thr.num in self.had_early_run: continue
if thr is stopped_thread: continue
thr.switch()
if gdb.newest_frame().pc() != bp_pc: continue
log_fbp.debug("Early run of {}.prepare_before".format(self.__class__.__name__))
self.prepare_before(early=True, first=True)
self.had_early_run.add(thr.num)
stopped_thread.switch()
@profile(immediate=False)
[docs] def stop (self):
log_fbp.info("thread #{} stopped in {} ({})".format(
gdb.selected_thread().num,
self.__class__.__name__,
self.location))
num = gdb.selected_thread().num
self.hit_internal += 1
events.bp_stop.trigger(self)
FunctionBreakpoint.hit_count += 1
args = {}
if self.has_early:
try:
self.run_early()
except KeyboardInterrupt as e:
log.critical("{}.run_early received a Keyboard interruption")
log.critical("The rest of the execution might not be reliable.")
return True
tid = gdb.selected_thread().num
first_run = tid not in self.had_early_run
args = {"early": False, "first": first_run}
self.had_early_run.add(tid)
try:
ret = self.prepare_before(**args)
except KeyboardInterrupt as e:
log.critical("{}.prepare_before received a Keyboard interruption")
log.critical("The rest of the execution might not be reliable.")
return True
except Exception as e:
log.critical("{}.prepare_before failed: {}".format(self.__class__.__name__, e))
log.error(e)
ret = None
mcgdb.toolbox.callback_crash("FunctionBreakpoint.prepare_before", e)
if ret is None: return False #spurious stop
(fct_stop, fct_finish, fct_data) = ret
tracing.before(self, fct_data, fct_finish)
filter_stop, filter_params = self.stop_before_filters()
if self.has_early or fct_finish or self.has_after_filters():
FunctionFinishBreakpoint(self, fct_data, filter_params)
for fct in my_gdb.late_function_breakpoint_execution():
try:
fct()
except Exception as e:
log.error("Late FunctionBreakpoint execution for {} failed: {}".format(fct, e))
mcgdb.toolbox.callback_crash("Late FunctionBreakpoint", e)
requested_stop = interaction.proceed_stop_requests()
return fct_stop or filter_stop or requested_stop
@my_gdb.virtual
[docs] def prepare_before(self):
"""
Function executed with the FunctionBreakpoint hits its target function.
By convention, return #3 contains a dictionnay will all the information
captured at the breakpoint stop.
The items of the dictionnary will be transmitted to the data logger for
offline trace generation.
:returns: `None` if this was a spurious stop. \
The execution won't be stop, nor the user notified.
:returns: #1 True if the execution should be stopped now.
:returns: #2 True if the finish breakpoint should be inserted.
:returns: #3 Any object to transmit to `FunctionBreakpoint.prepare_after`.
"""
return None
@my_gdb.virtual
[docs] def prepare_after(self, data):
"""
Function executed when the function targeted by this
FunctionBreakpoint finishes.
:param data: the object returned in position #3a \
by FunctionBreakpoint.prepare_before.
:returns: True if GDB should stop the execution.
"""
return False
def __str__(self):
return "FunctionBreakpoint at '%s'" % self.location
###################
[docs]def initialize():
gdb.frame_filters["No PC on backtrace"] = NoPCFrameFilter()
gdb.frame_filters["mcGDB breakpoint decorators"] = BreakpointDecoratorFrameFilter()
gdb.events.stop.connect(FunctionBreakpoint.check_breakpoint_validity)