Source code for mcgdb.toolbox.target.system_utils

"""
Module providing system (Linux) utility functions.
"""

import re

from .. import my_gdb
from . import my_access

# Pattern matching one data line of the `ps` output requested by PS_ARGS:
# optional leading whitespace, then two whitespace-separated integers.
# Written as a raw string so that `\s` and `\d` reach the regex engine
# untouched instead of being treated as (invalid) string escapes.
PS_LINE = r"\s*(\d+)\s+(\d+)"
# Regex group holding the PID column.
PS_PID = 1
# Regex group holding the PPID column.
PS_PPID = 2
# Command line listing the PID and PPID columns of the process table.
PS_ARGS = ["ps", "-o", "pid,ppid", "ax"]

def grand_children(pid):
    """
    Returns the grand children of a process.

    Mainly used for MPI's mpiruned processes.

    The process table is obtained by running ``PS_ARGS`` through
    ``my_access.Popen``; every output line matching ``PS_LINE`` yields a
    (PID, PPID) pair. Processes whose PPID is `pid` are the direct
    children; a process whose PPID is one of those children is reported
    as a grand child.

    NOTE(review): a direct child is dropped from the pending list as soon
    as one process naming it as parent has been recorded, so at most one
    grand child is reported per listed direct child. Confirm this is the
    intended contract.

    :param pid: PID of the mpirun process.
    :returns: PIDs of the grand children we found.
    :rtype: list(int)
    """
    out, _ = my_access.Popen(PS_ARGS).communicate()

    # Decode the listing once; both passes below walk the same pairs.
    table = []
    for row in out.split("\n"):
        fields = re.match(PS_LINE, row)
        if fields is None:
            # header or blank line
            continue
        table.append((int(fields.group(PS_PID)), int(fields.group(PS_PPID))))

    pending = []   # direct children still waiting for a child of their own
    result = []

    # First pass: collect direct children and any grand child listed
    # after its parent.
    for proc_pid, parent_pid in table:
        if parent_pid == pid:
            pending.append(proc_pid)
        elif parent_pid in pending:
            pending.remove(parent_pid)
            result.append(proc_pid)

    # Second pass: catch grand children that were listed before their
    # parent (processes with PID < PPID).
    if pending:
        for proc_pid, parent_pid in table:
            if parent_pid in pending:
                pending.remove(parent_pid)
                result.append(proc_pid)

    return result
def pidsOf(name):
    """
    Returns the PIDs of processes named `name`.

    Runs ``pidof name`` through ``my_access.Popen`` and splits its output
    on single spaces after dropping newlines.

    :param name: process name handed to ``pidof``.
    :returns: list of PIDs found, as the strings produced by splitting
              the command output; an empty list when the output is empty.
    """
    listing, _ = my_access.Popen(["pidof", name]).communicate()
    if listing != '':
        single_line = listing.replace("\n", "")
        return single_line.split(" ")
    return []
def int_to_dotted_ip(intip):
    """
    Converts an integer to an IP address

    The lowest-order byte of `intip` becomes the first octet of the
    result, e.g. ``0x0100007F`` gives ``"127.0.0.1"``.

    :param intip: integer (or integer-like object) holding the address.
    :returns: the dotted-quad representation.
    :rtype: str
    """
    # Work on a plain int so the integer division below is well defined
    # for any integer-like argument.
    remaining = int(intip)
    octets = []
    for exp in range(3, -1, -1):
        # Integer division: a plain `/` is true division on Python 3 and
        # would render each octet as a float ("127.0").
        quotient, remaining = divmod(remaining, 256 ** exp)
        octets.append(str(quotient))
    # Octets were extracted from the highest byte down; the address is
    # written starting from the lowest byte.
    return ".".join(reversed(octets))
def hex_to_dec(s):
    """
    Return the integer value of a hexadecimal string s

    :param s: string of hexadecimal digits.
    :returns: the parsed value.
    :rtype: int
    """
    value = int(s, base=16)
    return value
@my_gdb.internal
class PrivateAttribute(object):
    """
    Base class whose instances can be frozen.

    After `_freeze` has been called on an instance, assigning to an
    attribute the instance does not already have raises ``TypeError``;
    attributes that already exist can still be reassigned.
    """
    # Class-level default: instances start out unfrozen.
    __is_frozen = False

    def _freeze(self):
        """Forbid the creation of new attributes on this instance."""
        self.__is_frozen = True

    def __setattr__(self, key, value):
        """
        Set attribute `key` to `value`.

        :raises TypeError: if the instance is frozen and has no
                           attribute named `key` yet.
        """
        # Allowed while unfrozen, or when the attribute already exists.
        if not self.__is_frozen or hasattr(self, key):
            object.__setattr__(self, key, value)
            return
        raise TypeError("%r is a frozen class" % self)
@my_gdb.internal
def initialize():
    """Log, at info level, that the system functions are enabled."""
    logger = my_gdb.log
    logger.info("[System functions enabled]")