# -*- coding: utf-8 -*-
# BASED ON:
# # https://github.com/dosimont/gst2paje/blob/master/gst2paje.py
# #
# # gst2paje : this program allows to convert a gstreamer debug trace to paje trace format
# #
# # Copyright (C) 2012 Damien Dosimont <damien.dosimont@gmail.com>
# #
# # This program is free software; you can redistribute it and/or modify it
# # under the terms of the GNU General Public License as published by the Free
# # Software Foundation; either version 3 of the License, or (at your option)
# # any later version.
# #
# # This program is distributed in the hope that it will be useful, but WITHOUT
# # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# # more details.
# #
# # You should have received a copy of the GNU General Public License along with
# # this program. If not, see <http://www.gnu.org/licenses/>.
"""
Module providing Paje trace generation capabilities.
"""
import sys
import os
# Module logger plus two fixed-name loggers; the trace output itself is
# emitted through `log_paje`.
import logging; log = logging.getLogger(__name__)
log_user = logging.getLogger("mcgdb.log.user")
log_paje = logging.getLogger("mcgdb.log.paje")
# Placeholders for modules imported later: initialize() rebinds these names
# to the real modules (`global` declaration followed by `import`).
tempfile = None
ctypes = None
fcntl = None
# Tracing state: False until initialize() has run, True once libpoti has been
# loaded, None when loading failed and only the raw command was registered.
# before()/after() record events only while this value is truthy.
initialized = False
import gdb
from . import my_gdb
# NOTE(review): LIBPOTI is an absolute path into a home directory --
# presumably it has to be adapted for each installation; confirm.
""" Path to POTI shared library. """
LIBPOTI='/home/kevin/travail/tools/akypuera/libpoti/libpoti.so'
# ctypes handle on the POTI library, assigned by initialize().
poti_lib = None
# initialize() loads "libc.so.6" through ctypes under the name `libc`.
libc = None
# Not referenced anywhere in the visible part of this module.
trace_queue_read = None
trace_queue_write = None
@my_gdb.internal
class Event:
    """
    One recorded breakpoint hit, plus the class-wide registry of all hits.

    Creating an instance is enough to record it: the constructor appends the
    event to a class-level list and remembers the breakpoint class and the
    gdb thread number it was seen on. The static methods replay that registry
    either through libpoti (`feed_paje`) or through the `log_paje` logger
    (`feed_raw`).
    """

    __all = []          # every event, in creation order
    __types = set()     # breakpoint classes that produced at least one event
    __timestamp = 0.0   # logical clock, advanced by get_ts()
    __tids = set()      # gdb thread numbers that produced at least one event

    def __init__(self, bp, params, do_finish=False, before=False):
        """
        Record an event for breakpoint `bp` on the currently selected thread.

        :param bp: Breakpoint that generated the event.
        :param params: mapping of parameter names to values, or None.
        :param do_finish: True if a matching "after" event is expected.
        :param before: True for a function-entry event, False for an exit.
        """
        self.bp = bp
        # to be improved with model specific ids...
        self.tid = gdb.selected_thread().num

        self.params = {}
        if params is not None:
            try:
                for k, v in params.items():
                    value = str(v)
                    if not value.strip():
                        continue  # skip empty / whitespace-only values
                    # ' ', ',' and '#' are substituted so that they cannot
                    # appear in the stored value.
                    self.params[str(k)] = value.replace(" ", "_")\
                                               .replace(",", "+")\
                                               .replace("#", "@")
            except Exception:
                # `params` is not a usable mapping: keep its string form.
                self.params["params"] = str(params)
        else:
            self.params["None"] = "rien"

        self.time = Event.get_ts()

        # The empty key carries the direction marker. An entry event that
        # expects no "after" counterpart gets no marker at all.
        if before and do_finish:
            self.params[""] = "<in>"
        elif not before:
            self.params[""] = "<out>"

        Event.__types.add(bp.__class__)
        Event.__all.append(self)
        Event.__tids.add(self.tid)

    def __str__(self):
        """Return the one-line form ``<time>@<tid> <type> <params>``."""
        params = ", ".join("{} ➢ {}".format(k, v) if k.strip() else v
                           for k, v in self.params.items())
        return "{time}@{tid} {type} {params}".format(
            time=int(self.time.value),
            params=params,
            tid=self.tid,
            type=self.bp.__class__.__name__.replace("_Breakpoint", ""))

    @staticmethod
    def get_ts(init=False):
        """
        Return a timestamp as a `ctypes.c_double`.

        :param init: if True return 0 and leave the clock untouched;
                     otherwise advance the shared clock by one and return it.
        """
        if init:
            ts = 0
        else:
            Event.__timestamp += 1
            ts = Event.__timestamp
        return ctypes.c_double(ts)

    @staticmethod
    def _container_alias(tid):
        """Return the alias (ASCII bytes) naming the container of thread `tid`.

        Single source for the alias, so that container creation, events and
        container destruction all refer to the same name.
        """
        return "Th_{}".format(tid).encode("ascii")

    @staticmethod
    def feed_paje(no_header=False, tids=None):
        """
        Replay the recorded events through libpoti.

        :param no_header: if True, emit only the event records (no header,
                          type definitions, container creation/destruction).
        :param tids: thread numbers to keep; empty or None keeps every event.

        The trace must already have been opened with ``poti_open``.
        Exporting does not advance the event clock: each event is written
        with the timestamp recorded when it was created.
        """
        # NOTE(review): argument order of the poti_* calls follows the
        # original code; confirm against the installed poti.h.
        root = "Root".encode("ascii")

        if not no_header:
            poti_lib.poti_header(0, 0)

            poti_lib.poti_DefineContainerType(root, "0".encode("ascii"), root)
            for tid in Event.__tids:
                poti_lib.poti_CreateContainer(
                    Event.get_ts(init=True),
                    Event._container_alias(tid),
                    root,
                    str(tid).encode("ascii"),
                    "Thread #{}".format(tid).encode("ascii"))

            for bp_class in Event.__types:
                type_name = bp_class.__name__.encode("ascii")
                poti_lib.poti_DefineEventType(type_name, root, type_name)

        for evt in Event.__all:
            if tids and evt.tid not in tids:
                continue
            # Parameter values come from the debugged program: replace
            # anything non-ASCII instead of aborting the whole export.
            params = "|".join("{}=>{}".format(k, v)
                              for k, v in evt.params.items()
                              ).encode("ascii", "replace")
            poti_lib.poti_NewEvent(evt.time,
                                   Event._container_alias(evt.tid),
                                   evt.bp.__class__.__name__.encode("ascii"),
                                   params)

        if not no_header:
            # Close every container one tick after the newest event, without
            # touching the shared clock.
            end_ts = ctypes.c_double(Event.__timestamp + 1)
            for tid in Event.__tids:
                poti_lib.poti_DestroyContainer(end_ts,
                                               root,
                                               Event._container_alias(tid))

    @staticmethod
    def feed_raw(to_file, tids=None):
        """
        Log the recorded events, one per record, through `log_paje`.

        :param to_file: accepted for symmetry with the Paje export; unused.
        :param tids: thread numbers to keep; empty or None keeps every event.
        """
        for evt in Event.__all:
            if tids and evt.tid not in tids:
                continue
            log_paje.info(evt)
def before(bp, params, do_finish):
    """
    Record the entry of a function call caught by breakpoint `bp`.

    Nothing is recorded while the module-level `initialized` flag is falsy.

    :param bp: Breakpoint that generated the event.
    :param params: dictionary of the parameters of the event.
    :type params: dict
    :param do_finish: True if a matching `after` event can be expected.
    """
    if initialized:
        Event(bp, params, do_finish, before=True)
def after(bp, params):
    """
    Record the end of a function call caught by breakpoint `bp`.

    Nothing is recorded while the module-level `initialized` flag is falsy.

    :param bp: Breakpoint that generated the event.
    :param params: dictionary of the parameters of the event.
    :type params: dict
    """
    if initialized:
        Event(bp, params, before=False)
##############
## Paje CLI ##
##############
class cmd_printPaje(gdb.Command):
    """
    gdb command ``paje_print [TID ...] [FILE]``: dump the recorded events.

    Purely numeric arguments select thread numbers; any other argument is
    taken as the output file (the last one wins). Without a file the trace
    goes to a temporary file whose content is then logged through `log_paje`.
    """

    def __init__(self, limited=False):
        """
        Register the command with gdb.

        :param limited: if True, use the raw export instead of libpoti.
        """
        gdb.Command.__init__(self, "paje_print", gdb.COMMAND_NONE)
        self.limited = limited

    def invoke(self, args, from_tty):
        """Parse `args`, run the selected export and show or keep the result."""
        tids = set()
        to_file = None
        for arg in gdb.string_to_argv(args):
            if arg.isdigit():
                tids.add(int(arg))
            else:
                to_file = arg

        to_screen = to_file is None
        if to_screen:
            # mkstemp() hands back an *open* descriptor; only the path is
            # needed here, so close it right away instead of leaking it.
            fd, to_file = tempfile.mkstemp()
            os.close(fd)

        export = self.invoke_limited if self.limited else self.invoke_paje
        try:
            export(to_screen, to_file, tids)
            if to_screen:
                with open(to_file) as output:
                    log_paje.info(output.read())
        finally:
            # Only the temporary file is ours to delete, and it must go away
            # even when the export failed.
            if to_screen:
                os.unlink(to_file)

    def invoke_paje(self, to_screen, to_file, tids):
        """Write the events to `to_file` through libpoti.

        The header is omitted when the result is meant for the screen.
        """
        poti_lib.poti_open(to_file.encode("ascii"))
        try:
            Event.feed_paje(no_header=to_screen, tids=tids)
        finally:
            # Always close the trace so a failed export cannot leave it open.
            poti_lib.poti_close()

    def invoke_limited(self, to_screen, to_file, tids):
        """Raw export: events are logged directly, `to_file` is not written."""
        Event.feed_raw(to_file, tids)
@my_gdb.internal
def initialize():
    """
    Import the lazily-loaded modules, load libpoti and register ``paje_print``.

    On success the module-level `initialized` flag becomes True. If anything
    in the loading sequence raises, the failure is logged, ``paje_print`` is
    registered in its limited (raw) form and `initialized` is set to None.
    """
    # `libc` has to be listed here: without it the handle loaded below would
    # be bound to a local name and the module-level `libc` would stay None.
    global tempfile, ctypes, fcntl, initialized
    global poti_lib, libc

    import tempfile
    import ctypes
    import fcntl

    try:
        libc = ctypes.CDLL("libc.so.6")
        poti_lib = ctypes.CDLL(LIBPOTI)

        cmd_printPaje()
        log_user.debug("Paje trace output enabled")
        initialized = True
    except Exception as e:
        # Logger.warn is a deprecated alias of Logger.warning.
        log.warning("Paje: Could *not* initialize Paje/libpoti tracing")
        log.warning("Paje: Falling back on raw tracing.")
        log.info(e)

        cmd_printPaje(limited=True)
        # NOTE(review): None is falsy, so before()/after() record nothing in
        # this mode and the raw paje_print has no events to show. Confirm
        # whether the fallback is meant to keep recording.
        initialized = None