from __future__ import print_function
import sys
import gdb
import mcgdb_lite as mcgdb
# logging
PRINT_DEBUG_CAPTURE = False
PRINT_INFO_REPRESENTATION = True
[docs]def debug_capture(*objects, **kwargs):
if not PRINT_DEBUG_CAPTURE: return
print("<capture>", end=" ")
print(*objects, **kwargs)
[docs]def info_representation(*objects, **kwargs):
if not PRINT_INFO_REPRESENTATION: return
print("[", end="")
kwargs["end"] = "]\n"
print(*objects, **kwargs)
pass
#############
## Capture ##
#############
RETURN_VALUE_REGISTER = "$eax"
## <Documentation capture example># Capture `task_new` function calls
[docs]class TaskNewBreakpoint(mcgdb.capture.FunctionBreakpoint):
def __init__(self):
mcgdb.capture.FunctionBreakpoint.__init__(self, "task_new")
[docs] def prepare_before (self):
data = {}
data["body"] = gdb.parse_and_eval("body")
TaskBodyExecutionBreakpoint(str(data["body"]).split(" ")[0])
return False, True, data
[docs] def prepare_after (self, data):
data["task_handle"] = gdb.parse_and_eval("(struct task_s *) {}".format(RETURN_VALUE_REGISTER))
debug_capture("New task created: {} (id: {})".format(data["body"], data["task_handle"]))
Task(data["task_handle"], data["body"])
## </Documentation capture example> ##
[docs]class TaskBodyExecutionBreakpoint(mcgdb.capture.FunctionBreakpoint):
def __init__(self, addr):
mcgdb.capture.FunctionBreakpoint.__init__(self, "*{}".format(addr))
[docs] def prepare_before (self):
data = {}
data["task_handle"] = gdb.newest_frame().older().read_var("task")
task = data["task"] = Task.tasks[str(data["task_handle"])]
task.start_execution()
return False, True, data
[docs] def prepare_after (self, data):
data["task"].finish_execution()
[docs]class TaskDestroyedBreakpoint(mcgdb.capture.FunctionBreakpoint):
def __init__(self):
mcgdb.capture.FunctionBreakpoint.__init__(self, "task_destroy")
[docs] def prepare_before (self):
data = {}
data["task_handle"] = gdb.parse_and_eval("task")
debug_capture("Task destroyed: {}".format(data["task_handle"]))
task = Task.tasks[str(data["task_handle"])]
task.destroy()
return False, False, None
[docs]class TaskRunBreakpoint(mcgdb.capture.FunctionBreakpoint):
def __init__(self):
mcgdb.capture.FunctionBreakpoint.__init__(self, "task_run")
[docs] def prepare_before (self):
data = {}
data["task_handle"] = gdb.parse_and_eval("task")
debug_capture("Task running: {}".format(data["task_handle"]))
task = Task.tasks[str(data["task_handle"])]
task.run()
return False, False, None
[docs]class TaskSetDependencyBreakpoint(mcgdb.capture.FunctionBreakpoint):
def __init__(self):
mcgdb.capture.FunctionBreakpoint.__init__(self, "task_set_dependency")
[docs] def prepare_before (self):
data = {}
data["src"] = gdb.parse_and_eval("src")
data["dst"] = gdb.parse_and_eval("dst")
debug_capture("Task dependency: {} --> {}".format(data["src"], data["dst"]))
dst_task = Task.tasks[str(data["dst"])]
if str(data["src"]) != "0x0":
src_task = Task.tasks[str(data["src"])]
else:
src_task = None
dst_task.depends_of(src_task)
return False, False, None
[docs]class TaskGetResultBreakpoint(mcgdb.capture.FunctionBreakpoint):
def __init__(self):
mcgdb.capture.FunctionBreakpoint.__init__(self, "task_get_result")
[docs] def prepare_before (self):
data = {}
data["task_handle"] = gdb.parse_and_eval("task")
return False, True, data
[docs] def prepare_after (self, data):
data["return_value"] = gdb.parse_and_eval("(int) {}".format(RETURN_VALUE_REGISTER))
debug_capture("Task result: {} {}".format(data["task_handle"], int(data["return_value"])))
task = Task.tasks[str(data["task_handle"])]
task.got_result(int(data["return_value"]))
[docs]class TaskGetFutureBreakpoint(mcgdb.capture.FunctionBreakpoint):
def __init__(self):
mcgdb.capture.FunctionBreakpoint.__init__(self, "task_get_future")
[docs] def prepare_before (self):
data = {}
data["future"] = gdb.parse_and_eval("future")
# Not very clean, assumes that `task_get_future`s parent (=`body`)'s
# parent is `task_get_result`.
# That won't work if task_get_future is called from a subfunction.
# We could look for `body` in the stack, and then we know for sure
# that its parent frame is `task_get_result`.
data["task_handle"] = gdb.newest_frame().older().older().read_var("task")
debug_capture("Task {} get future result from : {} ".format(
data["task_handle"],
data["future"]["task"]))
task = Task.tasks[str(data["task_handle"])]
remote_task = data["remote_task"] = Task.tasks[str(data["future"]["task"])]
task.ask_result_from(remote_task)
return False, True, data
[docs] def prepare_after (self, data):
data["return_value"] = gdb.parse_and_eval("(int) {}".format(RETURN_VALUE_REGISTER))
debug_capture("Task {} got future result : {} ".format(
data["task_handle"],
data["return_value"]))
task = Task.tasks[str(data["task_handle"])]
remote_task = data["remote_task"]
task.got_result_from(remote_task, int(data["return_value"]))
## <Documentation capture 2nd example># Instantiate capture breakpoints
[docs]def init_capture():
TaskNewBreakpoint()
TaskSetDependencyBreakpoint()
TaskRunBreakpoint()
TaskGetResultBreakpoint()
TaskGetFutureBreakpoint()
TaskDestroyedBreakpoint()
## </Documentation capture 2nd example>#
####################
## Representation ##
####################
## <Documentation representation example># Debugger representation for tasks
[docs]class Task:
tasks = {}
uids = 0
def __init__(self, handle, body):
self.body = body
self.alive = True
self.result = None
self.depends_of_task = None
self.executing = False
self.running = False
self.uid = Task.uids
Task.uids += 1
handle = str(handle)
self.handle = handle
Task.tasks[handle] = self
info_representation("New task {} #{}".format(handle, self.uid))
[docs] def run(self):
self.running = True
## <Documentation representation start_exec example> # called when the task starts its execution
[docs] def start_execution(self):
if mcgdb.toolbox.catchable.is_set("catch_exec", self.uid):
mcgdb.capture.FunctionBreakpoint.push_stop_request(
"Execution catchpoint triggered by Task #{}".format(self.uid))
self.executing = True
## </Documentation representation start_exec example>#
[docs] def finish_execution(self):
self.executing = False
[docs] def depends_of(self, src):
self.depends_of_task = src
[docs] def got_result(self, res):
self.running = False
self.result = res
[docs] def ask_result_from(self, rmt):
self.running = False
pass
[docs] def got_result_from(self, rmt, res):
self.running = True
[docs] def destroy(self):
self.alive = False
info_representation("Task {} #{} destroyed".format(self.handle, self.uid))
## </Documentation representation example>#
#################
## Interaction ##
#################
## Tasks listing
[docs]class cmd_task(gdb.Command):
def __init__(self):
gdb.Command.__init__(self, "task", gdb.COMMAND_OBSCURE, prefix=True)
## <Documentation interaction example># List the tasks of an application
[docs]class cmd_task_info(gdb.Command):
def __init__(self):
gdb.Command.__init__(self, "task info", gdb.COMMAND_OBSCURE, prefix=True)
[docs] def invoke(self, arg, from_tty):
for t in reversed(Task.tasks.values()):
print("#{} Task {} {}".format(t.uid, t.handle, str(t.body).split(" ")[1]))
if t.depends_of_task is not None:
print("\tDepends of: #{} Task {}".format(t.depends_of_task.uid, t.depends_of_task.handle))
print("\t{},\t{}, \tResult: {}".format(
"Alive" if t.alive else "Dead",
"Running" if t.running else "Not running",
t.result))
## </Documentation interaction example>#
[docs]def init_interaction_struct():
cmd_task()
cmd_task_info()
## Execution control
## <Documentation interaction 2nd example># Offer catchpoint on task body execution
[docs]class cmd_task_catch_exec(gdb.Command):
"""task catch_exec on|off [task id]*
Sets (on) or remove (off) a catchpoint on one (task id), several or all (empty) the tasks.
`task info` not implemented ...
"""
def __init__ (self):
gdb.Command.__init__(self, "task catch_exec", gdb.COMMAND_OBSCURE)
mcgdb.toolbox.catchable.register("catch_exec")
[docs] def invoke(self, arg, from_tty):
args = gdb.string_to_argv(arg)
if not args:
print("Invalid parameters ...")
print(cmd_task_catch_exec.__doc__)
return
do_activate = args.pop(0) == "on"
tids = [mcgdb.toolbox.catchable.ALL_ENTITIES] if not args else args
for tid in args:
try:
mcgdb.toolbox.catchable.activateRemove("catch_exec",
int(tid),
do_activate)
print("{} execution catchpoint on task #{}.".format("Set" if do_activate else "Unset",
tid))
except Exception as e:
print("Didn't work for task {}: {}".format(tid, e))
## </Documentation interaction 2nd example>#
[docs]def init_interaction_exec():
cmd_task_catch_exec()
####################
[docs]def initialize():
init_capture()
init_interaction_struct()
init_interaction_exec()
print("[Model-Centric Debugging for 'task model' enabled]")