Source code for mcgdb.toolbox.paje

# -*- coding: utf-8 -*-

# BASED ON:
# # https://github.com/dosimont/gst2paje/blob/master/gst2paje.py
# #
# # gst2paje : this program allows to convert a gstreamer debug trace to paje trace format
# #
# # Copyright (C) 2012 Damien Dosimont <damien.dosimont@gmail.com>
# #
# # This program is free software; you can redistribute it and/or modify it
# # under the terms of the GNU General Public License as published by the Free
# # Software Foundation; either version 3 of the License, or (at your option)
# # any later version.
# #
# # This program is distributed in the hope that it will be useful, but WITHOUT
# # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# # more details.
# #
# # You should have received a copy of the GNU General Public License along with
# # this program. If not, see <http://www.gnu.org/licenses/>.


"""
Module proving Paje trace generation capabilities.
"""

import sys
import os
import logging; log = logging.getLogger(__name__)
log_user = logging.getLogger("mcgdb.log.user")
log_paje = logging.getLogger("mcgdb.log.paje")

# imported later
tempfile = None
ctypes = None
fcntl = None

initialized = False

import gdb
from . import my_gdb

""" Path to POTI shared library.  """
LIBPOTI='/home/kevin/travail/tools/akypuera/libpoti/libpoti.so'

poti_lib = None
libc = None
trace_queue_read = None
trace_queue_write = None

@my_gdb.internal
[docs]class Event: __all = [] __types = set() __timestamp = 0.0 __tids = set() def __init__(self, bp, params, do_finish=False, before=False): self.bp = bp # to be improved with model specific ids... self.tid = gdb.selected_thread().num self.params = {} if params is not None: try: for k, v in params.items(): if not len(str(v).strip()): continue self.params[str(k)] = str(v).replace(" ", "_")\ .replace(",", "+")\ .replace("#", "@") except Exception: self.params["params"] = str(params) else: self.params["None"] = "rien" self.time = Event.get_ts() if before and do_finish: self.params[""] = "<in>" elif not before: self.params[""] = "<out>" Event.__types.add(bp.__class__) Event.__all.append(self) Event.__tids.add(self.tid) def __str__(self): return "{time}@{tid} {type} {params}".format(time=int(self.time.value), params=", ".join(["{} ➢ {}".format(k, v) if k.strip() else v for k, v in self.params.items()]), tid=self.tid, type=self.bp.__class__.__name__.replace("_Breakpoint", "")) @staticmethod
[docs] def get_ts(init=False): if (init): ts = 0 else: Event.__timestamp += 1 ts = Event.__timestamp return ctypes.c_double(ts)
@staticmethod
[docs] def feed_paje(no_header=False, tids=[]): if not no_header: poti_lib.poti_header(0, 0) poti_lib.poti_DefineContainerType("Root".encode("ascii"), "0".encode("ascii"), "Root".encode("ascii")) for tid in Event.__tids: poti_lib.poti_CreateContainer(Event.get_ts(init=True), "Thread_{}".format(tid).encode("ascii"), "Root".encode("ascii"), str(tid).encode("ascii"), "Thread #{}".format(tid).encode("ascii")) for bp_class in Event.__types: poti_lib.poti_DefineEventType(bp_class.__name__.encode("ascii"), "Root".encode("ascii"), bp_class.__name__.encode("ascii")) for evt in Event.__all: if tids and not evt.tid in tids: continue params = "|".join(["{}=>{}".format(k,v) \ for k, v in evt.params.items()]).encode("ascii") ts = Event.get_ts() th = "Th_{}".format(evt.tid).encode("ascii") ev = evt.bp.__class__.__name__.encode("ascii") poti_lib.poti_NewEvent(ts, th, ev, params) if not no_header: for tid in Event.__tids: poti_lib.poti_DestroyContainer(Event.get_ts(), "Root".encode("ascii"), "Th_{}".format(tid).encode("ascii"))
@staticmethod
[docs] def feed_raw(to_file, tids=[]): for evt in Event.__all: if tids and not evt.tid in tids: continue log_paje.info(evt)
[docs]def before(bp, params, do_finish): """ Creates an function call event `bp` with parameters `params`. :param bp: Breakpoint that generated the event. :param params: dictionnary of the parameters of the event. :type params: dict :param do_finish: True we can expect an `after` event. """ if not initialized: return Event(bp, params, do_finish, before=True)
[docs]def after(bp, params): """ Creates finish function call event `bp` with parameters `params`. :param bp: Breakpoint that generated the event. :param params: dictionnary of the parameters of the event. :type params: dict """ if not initialized: return Event(bp, params, before=False)
############## ## Paje CLI ## ##############
[docs]class cmd_printPaje(gdb.Command): def __init__(self, limited=False): gdb.Command.__init__ (self, "paje_print", gdb.COMMAND_NONE) self.limited = limited
[docs] def invoke (self, args, from_tty): tids = set() to_file = None for arg in gdb.string_to_argv(args): if arg.isdigit(): tids.add(int(arg)) else: to_file = arg to_screen = to_file is None if to_screen: to_file = tempfile.mkstemp()[1] (self.invoke_limited if self.limited else self.invoke_paje)(to_screen, to_file, tids) if to_screen: with open(to_file) as output: log_paje.info("".join(output.readlines())) os.unlink(to_file)
[docs] def invoke_paje(self, to_screen, to_file, tids): poti_lib.poti_open(to_file.encode("ascii")) Event.feed_paje(no_header=to_screen, tids=tids) poti_lib.poti_close()
[docs] def invoke_limited(self, to_screen, to_file, tids): Event.feed_raw(to_file, tids)
@my_gdb.internal
[docs]def initialize(): global tempfile, ctypes, ctypes, fcntl, initialized import tempfile import ctypes import fcntl global poti_lib try: libc = ctypes.CDLL("libc.so.6") poti_lib = ctypes.CDLL(LIBPOTI) cmd_printPaje() log_user.debug("Paje trace output enabled") initialized = True except Exception as e: log.warn("Paje: Could *not* initialize Paje/libpoti tracing") log.warn("Paje: Falling back on raw tracing.") log.info(e) cmd_printPaje(limited=True) initialized = None