# Source code for mcgdb.model.task.environment.openmp.representation

from __future__ import print_function

import logging; log = logging.getLogger(__name__)
from collections import OrderedDict

import gdb

import mcgdb
from mcgdb.toolbox import my_gdb
from mcgdb.toolbox import aspect
from mcgdb.toolbox import SimpleClass
from mcgdb.model.task import representation as task_representation
from mcgdb.interaction import push_stop_request, cancel_stop_request

from . import toolbox

def _worker_not_impl():
    raise LookupError("Current worker lookup not implemented ...")

# Module-level hooks. The two worker hooks start as the "not implemented"
# placeholder, the others start unset (None).
# NOTE(review): presumably rebound by runtime-specific code -- confirm.
_get_current_worker = _worker_not_impl  # called without argument by Worker.get_current_worker()
_worker_is_alive = _worker_not_impl  # called with a Worker by Worker.is_alive()
_get_dependency_name = None  # when set: address -> (symbol, offset, frame), or a falsy value
_attach = None  # not referenced in the code visible here
_array_matching_dependency = None  # when set: address -> Dependence of the array holding it (or falsy)

# Read by CriticalJob to tolerate events that were not observed.
# NOTE(review): presumably set when attaching to a running process -- confirm.
is_attached = False

try:
    from enum import Enum
except ImportError:
    # Do not fail at import time; the name simply stays unavailable.
    Zone = None
else:
    class Zone(Enum):
        """Zone kinds: parallel, barrier, single and critical."""

        Parallel = "parallel"
        Barrier = "barrier"
        Single = "single"
        Critical = "critical"
def activate():
    """Check that the ``enum`` module can be imported.

    When it cannot, an installation hint is logged and the ImportError is
    raised again.
    """
    try:
        from enum import Enum  # fail now if necessary
    except ImportError as import_error:  # not in Python2 standard library
        log.error("pip2 install enum34 https://pypi.python.org/pypi/enum34")
        raise import_error
@my_gdb.Listed
@my_gdb.Numbered
@my_gdb.Dicted
@my_gdb.Switchable
class Worker(task_representation.CommComponent, aspect.Tracker):
    """A worker, registered under its thread key.

    ``job_stack`` holds the jobs the worker is currently involved in, the
    most recent one last.
    """

    TASK_MANAGER = my_gdb.DefaultTaskManager

    @staticmethod
    def get_current_worker():
        """Return whatever the module-level current-worker hook returns."""
        return _get_current_worker()

    def __init__(self, thread_key):
        task_representation.CommComponent.__init__(self)
        aspect.Tracker.__init__(self)
        Worker.init_dict(thread_key, self)

        self.thread_key = thread_key
        self.dead = False
        self.job_stack = []

        toolbox.say("New {}".format(self))
        # Every registered worker announces itself.
        for known_worker in Worker.list_:
            known_worker.say("Hello #{}".format(known_worker.numbers[Worker]))
        toolbox.say()

    def switch(self, force=False):
        """Delegate the switch to the thread key (``force`` is not used)."""
        self.thread_key.switch()

    def is_alive(self):
        """Return the answer of the module-level liveness hook."""
        return _worker_is_alive(self)

    def is_current(self):
        """Tell whether this object is the current worker."""
        return Worker.get_current_worker() is self

    def say(self, what):
        """Print ``what`` along with this worker's number and the worker count."""
        toolbox.say(what, self.numbers[Worker], len(self.list_))

    def terminated(self):
        """Mark the worker as dead and drop its thread key."""
        self.say("/{}".format(self))
        self.dead = True
        self.thread_key = None

    def create(self, where):
        """Push ``where`` on the job stack."""
        self.job_stack.append(where)

    def work(self, where, start=False, stop=False):
        """Record that the worker starts or stops working on ``where``.

        Exactly one of ``start`` and ``stop`` must be set.
        """
        assert start is not stop

        stack = self.job_stack
        if start:
            # Do not push the same job twice in a row.
            already_on_top = bool(stack) and stack[-1] is where
            if not already_on_top:
                stack.append(where)
            return

        assert stack
        if isinstance(where, TaskJob):
            # Tasks may be removed from anywhere in the stack.
            assert where in stack
            stack.remove(where)
        elif isinstance(where, ParallelJob):
            # Unwind everything down to (and including) the parallel job.
            assert where in stack
            while stack.pop() is not where:
                pass
        else:
            assert stack[-1] is where
            if not (isinstance(where, ParallelJob) and self is where.parent_worker):
                stack.pop()

    def __str__(self):
        return "Worker #{}".format(self.numbers[self.__class__])

    def __repr__(self):
        return "worker_{}".format(self.numbers[self.__class__])

    def __lt__(self, other):
        # Ordering is based on the string representations.
        return str(self) < str(other)
@my_gdb.Listed
@my_gdb.Numbered
@my_gdb.Dicted
class Job(aspect.Tracker):
    """Base class of the job kinds defined in this module."""

    def say(self, what):
        """Print ``what`` through the toolbox."""
        toolbox.say(what)

    def __str__(self):
        return "{} #{}".format(self.__class__.__name__, self.numbers[self.__class__])

    def __repr__(self):
        """Return ``<ClassName>_<number>``, or the default repr on failure."""
        try:
            return "{}_{}".format(self.__class__.__name__, self.numbers[self.__class__])
        except Exception:
            # Only ordinary errors are absorbed: KeyboardInterrupt and
            # SystemExit must not be swallowed by a repr().
            return object.__repr__(self)
@my_gdb.Listed
@my_gdb.Numbered
@my_gdb.Dicted
class ParallelJob(Job):
    """A parallel job and the workers currently working on it.

    Attributes set by the constructor:
        parent_worker: worker passed by the creator of the job.
        workers: set of workers currently working on the job.
        count: number of workers currently inside the job.
        section, barrier: current SectionJob / Barrier, or None.
        single, critical: lists of the SingleJob / CriticalJob attached
            to this job.
    """

    def __init__(self, num_workers, parent_worker, section_count=0, is_for=False):
        Job.__init__(self)
        self.parent_worker = parent_worker
        self.workers = set()
        self.num_workers = num_workers
        self.has_sections = section_count != 0
        self.section = None
        self.barrier = None
        self.single = []
        self.critical = []
        self.count = 0
        self.is_for = is_for

        if self.has_sections:
            # NOTE(review): no `section_zone` method is defined in this class;
            # unless a base class or decorator provides it, this call fails.
            # `start_section_zone` may have been intended -- confirm.
            self.section_zone(section_count)
        self.say(self)

    def start_working(self, worker):
        """Register ``worker`` as working on this job."""
        self.count += 1
        assert worker not in self.workers
        self.workers.add(worker)
        worker.work(self, start=True)
        worker.say("WORK on {}".format(self))

    def stop_working(self, worker):
        """Unregister ``worker`` from this job."""
        assert worker in self.workers
        self.workers.remove(worker)
        self.count -= 1
        worker.say("DONE with {}".format(self))
        worker.work(self, stop=True)

    def completed(self):
        """Close the job, warning if some workers still appear to be inside."""
        if self.section:
            self.stop_section_zone()

        if self.count != 0:
            # The trailing backslash is escaped explicitly; the emitted text
            # is unchanged.
            log.warning("/!\\ strange zone counter detected /!\\|")
            log.info("(it's like {} workers are still inside ...".format(self.count))
        self.say("/{}".format(self))

    def start_section_zone(self, worker, count=0):
        """Create the SectionJob of this job, unless one already exists."""
        if self.section is None:
            self.section = SectionJob(self, worker, count)

    def stop_section_zone(self, is_parallel_job=False):
        """Complete the section job (``is_parallel_job`` is not used)."""
        assert self.section
        if self.has_sections:
            self.section.completed()

    def __str__(self):
        return "Parallel{}Job #{}".format("For" if self.is_for else "",
                                          self.numbers[self.__class__])

    @staticmethod
    def work_completed(worker):
        """Finish the parallel job ``worker`` is currently in.

        Returns:
            The completed job, or None when the worker is in no parallel job.
        """
        zone = ParallelJob.get_current_parallel_job(worker)
        if not zone:  # zone already completed
            return None

        # This is a static method: the original assertion referenced an
        # undefined `self` and could only raise NameError.
        assert zone.count == 1
        assert zone is worker.job_stack[-1]

        zone.stop_working(worker)
        zone.completed()
        return zone

    @staticmethod
    def get_current_parallel_job(worker=None):
        """Return the innermost parallel job of ``worker``.

        Without a worker, the last listed parallel job is returned. None is
        returned when no parallel job is found.
        """
        if worker is None:
            return ParallelJob.list_[-1] if ParallelJob.list_ else None

        for job in reversed(worker.job_stack):
            if isinstance(job, ParallelJob):
                return job
        return None
@my_gdb.Listed
@my_gdb.Numbered
@my_gdb.Dicted
class SectionJob(Job):
    """The sections zone of a parallel job.

    ``sections`` maps each worker to the identifier of the section it was
    last given; ``has_completed`` tells whether ``completed()`` was called.
    """

    def __init__(self, parallel_job, worker, count=0):
        Job.__init__(self)
        self.parallel_job = parallel_job
        self.section_count = count
        self.sections = {}
        self.ended = 0
        self.has_completed = False

        worker.create(self)
        self.say("ZONE with {} SECTIONS".format(count))

    def work_on_section(self, worker, section_id):
        """Record that ``worker`` received section ``section_id``.

        A ``section_id`` of 0 means that no section is left for the worker.
        """
        if section_id != 0:
            # NOTE(review): `sections` is keyed by worker, so this membership
            # test compares a section id with workers; checking the values
            # may have been intended -- confirm.
            assert section_id not in self.sections
            self.sections[worker] = section_id
            worker.say("WORK on {} {}/{}".format(self, section_id, self.section_count))
            if not isinstance(worker.job_stack[-1], SectionJob):
                worker.work(self, start=True)
        else:
            worker.say("NO MORE WORK on {}'s {} sections".format(self, self.section_count))
            if isinstance(worker.job_stack[-1], SectionJob):
                worker.work(self, stop=True)

    def completed(self):
        """Mark the sections zone as completed."""
        # The flag starts as False in __init__; it has to become True here,
        # otherwise it could never change.
        self.has_completed = True
        self.say("/{} with {} SECTIONS".format(self, self.section_count))
@my_gdb.Listed
@my_gdb.Numbered
@my_gdb.Dicted
class SingleJob(Job):
    """A single zone inside a parallel job.

    ``visitor`` is the worker let inside the zone; ``visitors`` collects the
    workers that went through ``enter()`` without being let inside.
    """

    def __init__(self, worker, parallel_job):
        Job.__init__(self)
        self.parallel_job = parallel_job
        self.visitor = None
        self.visitors = set()
        self.finish = False

        # Register on the parallel job and attach a barrier bound to this
        # single zone.
        parallel_job.single.append(self)
        parallel_job.barrier = Barrier(parallel_job, worker, single=self)
        self.say(self)

    def enter(self, inside, worker):
        """Record whether ``worker`` was let inside the zone or rejected."""
        if not inside:
            worker.say("REJECTED from {}".format(self))
            self.visitors.add(worker)
            return

        assert self.visitor is None
        worker.say("INSIDE {}".format(self))
        self.visitor = worker
        worker.work(self, start=True)

    def finished(self):
        """Record that the visitor left the zone."""
        visitor = self.visitor
        visitor.say("OUTSIDE of {}".format(self))
        self.finish = True
        visitor.work(self, stop=True)

    def completed(self):
        """Detach the zone from its parallel job."""
        self.parallel_job.single.remove(self)
        self.say("/{}".format(self))

    @staticmethod
    def get_parallel_single_zone(worker):
        """Return the single zone ``worker`` is about to deal with.

        The job on top of the worker's stack is returned when it is a single
        zone that does not list the worker in ``visitors``. Otherwise the
        innermost parallel job is searched for such a zone, and a new one is
        created when none is found.
        """
        top_job = worker.job_stack[-1]
        if isinstance(top_job, SingleJob) and worker not in top_job.visitors:
            return top_job

        for job in reversed(worker.job_stack):
            if not isinstance(job, ParallelJob):
                continue

            for single_job in job.single:
                assert single_job is not None
                assert isinstance(single_job, SingleJob)
                if worker not in single_job.visitors:
                    return single_job
            # Every known single zone already lists the worker.
            return SingleJob(worker, job)

        assert not "Could not find the single job or parent parallel zone..."
@my_gdb.Listed
@my_gdb.Numbered
@my_gdb.Dicted
class Barrier(Job, aspect.Tracker):
    """A barrier of a parallel job.

    ``count`` is False until a first worker reaches the barrier, then it is
    the number of workers blocked at it.
    """

    # threads_blocked_at[thread] = <barrier>
    threads_blocked_at = {}

    def __init__(self, parallel_job, worker, single=None, sections=None):
        Job.__init__(self)
        self.parallel_job = parallel_job
        self.single = single
        self.sections = sections
        self.count = False
        self.open = False
        self.internal = not (single is None and sections is None)
        self._stop_all_after = False
        self.loc = {}  # worker -> filename:lineno

    def reach_barrier(self, worker, location):
        """Record that ``worker`` is blocked at this barrier at ``location``."""
        if self.count is False:
            # First arrival: start counting and announce the barrier.
            self.count = 0
            self.parallel_job.say(self)

        self.loc[worker] = location
        if len(set(self.loc.values())) != 1:
            log.warning("Multiple barriers are being hit:")
            for hit_location in self.loc.values():
                log.warning(" - {}".format(hit_location))

        self.count += 1
        Barrier.threads_blocked_at[worker] = self

        single = self.single
        if single and not single.finish and single.visitor is worker:
            # The visitor of the attached single zone reached its barrier.
            single.finished()

        worker.work(self, start=True)
        worker.say("BLOCKED at {} ({}/{})".format(self, self.count,
                                                  self.parallel_job.count))

    def leave_barrier(self, worker):
        """Record that ``worker`` left the barrier; complete it when empty."""
        if not self.open:
            self.open = True
            self.parallel_job.say("{} OPENED".format(self))
            self.parallel_job.barrier = None

        self.count -= 1
        assert Barrier.threads_blocked_at[worker] is self
        del Barrier.threads_blocked_at[worker]

        worker.work(self, stop=True)
        if self.count == 0:
            self.completed()

    def completed(self):
        """Close the barrier along with its single/sections zones, if any."""
        self.parallel_job.say("/ #{}".format(self))
        for attached_zone in (self.single, self.sections):
            if attached_zone:
                attached_zone.completed()

    @property
    def stop_all_after(self):
        return self._stop_all_after

    @stop_all_after.setter
    def stop_all_after(self, _stop_all_after):
        self._stop_all_after = _stop_all_after

    @staticmethod
    def get_parallel_barrier(worker, reach):
        """Return the barrier ``worker`` reaches (``reach``) or is blocked at.

        When reaching, the barrier of the innermost job of the worker's stack
        having a ``barrier`` attribute is returned, creating it when unset.
        None is returned when no such job exists.
        """
        if not reach:
            return Barrier.threads_blocked_at[worker]

        for job in reversed(worker.job_stack):
            if not hasattr(job, "barrier"):
                continue
            if job.barrier is None:
                job.barrier = Barrier(job, worker)
            return job.barrier

        log.error("Could not find a parallel job attached to this barrier...")
        return None
@my_gdb.Listed
@my_gdb.Numbered
@my_gdb.Dicted
class CriticalJob(Job):
    """A critical zone inside a parallel job.

    ``queueing`` lists the workers trying to enter, ``visitor`` is the worker
    currently inside and ``visitors`` the workers that entered so far.
    """

    def __init__(self, worker, parallel_job):
        Job.__init__(self)
        self.parallel_job = parallel_job
        self.visitor = None
        self.visitors = set()
        self.queueing = []

        parallel_job.critical.append(self)
        self.say(self)
        worker.create(self)

    def try_enter(self, worker):
        """Queue ``worker`` at the entry of the zone."""
        assert worker not in self.queueing
        occupancy = "empty" if self.visitor is None else "busy"
        worker.say("TRY TO ENTER {} ({}/{} queueing)".format(self, occupancy,
                                                           len(self.queueing)))
        self.queueing.append(worker)

    def entered(self, worker):
        """Move ``worker`` from the queue to the inside of the zone."""
        assert worker in self.queueing
        worker.say("INSIDE {}".format(self))
        worker.work(self, start=True)
        self.queueing.remove(worker)
        self.visitor = worker
        self.visitors.add(worker)

    def left(self, worker):
        """Record that ``worker`` left the zone; complete it when all passed."""
        if is_attached:
            # Replay the entry steps that were not recorded for this worker.
            if worker not in self.queueing:
                self.try_enter(worker)
            if self.visitor is not worker:
                self.entered(worker)

        assert self.visitor is worker
        self.visitor = None
        worker.work(self, stop=True)

        expected = self.parallel_job.count
        worker.say("OUTSIDE {} ({} queuing, {}/{} passed)".format(
            self, len(self.queueing), len(self.visitors), expected))

        if not self.queueing and len(self.visitors) == self.parallel_job.count:
            self.completed()

    def completed(self):
        """Detach the zone from its parallel job."""
        self.parallel_job.critical.remove(self)
        self.say("/{}".format(self))

    @staticmethod
    def get_parallel_critical_zone(worker, pc=None, depth=-1, no_new=False):
        """Return the critical zone ``worker`` is dealing with.

        The job at ``depth`` in the worker's stack is returned when it is a
        critical zone. Otherwise the innermost parallel job is searched for a
        zone the worker did not pass yet or is currently inside, and a new
        zone is created when none matches (``no_new`` must then be false).
        ``pc`` is not used.
        """
        try:
            job_at_depth = worker.job_stack[depth]
            if isinstance(job_at_depth, CriticalJob):
                return job_at_depth

            for job in reversed(worker.job_stack):
                if not isinstance(job, ParallelJob):
                    continue

                for critical_job in job.critical:
                    assert critical_job is not None
                    assert isinstance(critical_job, CriticalJob)
                    not_passed_yet = worker not in critical_job.visitors
                    if not_passed_yet or worker is critical_job.visitor:
                        return critical_job

                assert not no_new
                return CriticalJob(worker, job)

            assert not "Could not find the critical job or parent parallel zone..."
        except Exception as e:
            # When attached, fall back on the only known critical zone.
            if is_attached and len(CriticalJob.list_) == 1:
                return CriticalJob.list_[0]
            raise e
@my_gdb.Numbered
class Dependence(aspect.Tracker):
    """A data dependence between tasks: a scalar address, an array or a taskwait.

    ``inouts`` is a history of records, each with a ``writer`` task, the
    ``readers`` tasks depending on that write, a ``ready`` flag, the ``val``
    read when the write completed, and optional array names.
    """

    addr_to_dep = {}  # address --> Dependence

    def __init__(self, address, is_taskwait_inbound=None):
        """Create a dependence.

        Args:
            address: hexadecimal string (scalar) or an array object providing
                ``start`` and a class-level ``resolvers`` set. Ignored for
                taskwait dependences.
            is_taskwait_inbound: None for data dependences, otherwise the
                direction of a taskwait dependence.
        """
        self.inouts = []
        self.is_taskwait_inbound = is_taskwait_inbound

        if is_taskwait_inbound is not None:  # Taskwait
            self.name = "Taskwait {}".format("inbound" if is_taskwait_inbound else "outbound")
            self.address = -1
        else:
            try:
                int(address, 16)  # int() can't convert non-string with explicit base
                # Scalar
                self.address = address
                self.name = None
                self.symb = None
                Dependence.addr_to_dep[address] = self
            except TypeError:
                # Array
                self.array = array = address
                if array.start:
                    self.address = hex(int(array.start))
                else:
                    log.critical("No start adress for {}".format(array))
                    self.address = 0
                array.__class__.resolvers.add(self)
                self.name = str(array)

    def add_reader(self, task, addr=None):
        """Add ``task`` to the readers of the latest record and return it."""
        if not self.inouts:
            self._add_inout()
        inout = self.inouts[-1]
        assert task is not inout.writer

        if hasattr(addr, "fullname"):  # addr is an Array
            name = addr.fullname
            if task in inout.readers:
                log.warning("{} already in Dep #{} reader list ({}). Maybe a code bug?".format(
                    task, self.number, name))
                inout.reader_names[task] += " || " + name
            else:
                inout.reader_names[task] = name
        else:  # addr is a simple address
            assert task not in inout.readers

        inout.readers.append(task)
        return inout

    def add_writer(self, task, addr=None):
        """Set ``task`` as writer, opening a new record when needed.

        A new record is opened when the latest one already has a writer or
        already lists ``task`` as a reader.
        """
        if not self.inouts:
            self._add_inout()
        inout = self.inouts[-1]
        if inout.writer or task in inout.readers:
            inout = self._add_inout()

        try:
            inout.writer_name = addr.fullname
        except AttributeError:
            pass  # that's not an array!
        inout.writer = task
        return inout

    def get_task_inout(self, task, as_reader=False, as_writer=False):
        """Return the most recent record involving ``task``, or None.

        The task is looked up among the readers when ``as_reader`` is set and
        as the writer when ``as_writer`` is set; with neither flag set,
        nothing can match.
        """
        for inout in reversed(self.inouts):
            where = []
            if as_reader:
                where += inout.readers
            if as_writer:
                where += [inout.writer]
            if task in where:
                return inout
        return None

    def set_task_inout_ready(self, task, as_reader=False, as_writer=False, val=None):
        """Mark the record of ``task`` as ready (``val`` is not used).

        Returns:
            The record, or False when ``task`` has no matching record.
        """
        inout = self.get_task_inout(task, as_reader, as_writer)
        if not inout:
            return False
        inout.ready = True

        try:
            self.symb.type.target()  # ensure symb.type is primitive
        except AttributeError:  # no symbol known for this dependence
            pass
        except Exception:
            # symb type has no target (good): read the value at the address.
            # KeyboardInterrupt/SystemExit are deliberately not caught.
            inout.val = str(gdb.parse_and_eval(
                "*({}) {}".format(self.symb.type.pointer(), self.address)))
        return inout

    def set_name(self, symb, offset):
        """Name the dependence after ``symb``, plus ``offset`` when non-zero."""
        self.name = str(symb)
        if offset:
            self.name += "+{}".format(offset)
        self.offset = offset
        self.symb = symb

    def _add_inout(self):
        """Append a fresh, empty record to the history and return it."""
        inout = SimpleClass(readers=[], writer=None, ready=False, val=None,
                            reader_names={}, writer_name=None)
        self.inouts.append(inout)
        return inout

    def __str__(self):
        return "Dep_{}_{}@{}".format(self.number, self.name if self.name else "",
                                     self.address)

    @staticmethod
    def get_dependency(address):
        """Return the dependence registered for ``address``, creating it if needed."""
        dep = None
        try:
            int(address, 16)  # TypeError: not an address string
            dep = Dependence.addr_to_dep[address]  # KeyError: not registered yet
        except (KeyError, TypeError):
            pass

        if dep is None and _array_matching_dependency:
            # address can belong to array
            dep = _array_matching_dependency(address)

        return dep if dep else Dependence(address)

    @staticmethod
    def get_dependency_inout(address, task):
        """Return the latest record of ``address`` involving ``task``, or None."""
        dep = Dependence.get_dependency(address)
        # Both roles must be requested: with neither flag set the lookup
        # cannot match anything and would always return None.
        return dep.get_task_inout(task, as_reader=True, as_writer=True)
@my_gdb.Listed
@my_gdb.Numbered
@my_gdb.Dicted
class TaskJob(Job):
    """A task (or a taskwait), with its dependences and properties.

    ``completed`` is False until the task starts, then the worker executing
    it, and finally True once the task finished. ``taskwait`` is True for a
    taskwait, otherwise None until a taskwait waits for the task, then that
    taskwait.
    """

    blockers = {}
    preferred_blocked = None
    last_taskwait = None

    def __init__(self, worker, masterTask=False, taskwait=False):
        Job.__init__(self)
        TaskJob.init_dict(self.number, self)
        self.completed = False
        self.masterTask = masterTask
        self.in_dependencies = []
        self.inout_dependencies = []
        self.out_dependencies = []
        self.properties = OrderedDict()
        self.properties_to_propagate = {}
        self.is_ready = False

        if taskwait:
            TaskJob.last_taskwait = self
            self.taskwait = True
            self.set_property("definition", "taskwait")
            self.outbound_dep = Dependence(None, is_taskwait_inbound=False)
        else:
            self.taskwait = None
        self.set_property("debug_state", "created")

    def set_task_source_lines(self, filename, start_line, stop_line):
        """Record the source range of the task and parse its pragma.

        The ``#pragma omp task`` line is searched at ``start_line`` and up to
        a few lines before; backslash-continued pragmas are joined.
        """
        self.set_property("sources", "{}:{}-{}".format(filename, start_line, stop_line))

        LOOK_N_LINES_BEFORE = 5
        definition = None
        lookup_at_line = start_line
        while True:
            head = my_gdb.get_src(filename, lookup_at_line, lookup_at_line)[:-1]
            try:
                definition = head.split("#pragma omp task")[1].strip()
                # handle multiline task definition
                while head.endswith("\\"):
                    lookup_at_line += 1
                    head = my_gdb.get_src(filename, lookup_at_line, lookup_at_line)[:-1]
                    # remove lineno and white spaces from beginning
                    head = head.lstrip('0123456789 \t')
                    definition = "{}{}".format(definition[:-1], head)
                break
            except IndexError:
                # pragma not on this line: look a few lines before
                lookup_at_line -= 1
                if start_line - lookup_at_line > LOOK_N_LINES_BEFORE:
                    log.warning("Could not find task definition around {}:{}".format(
                        filename, start_line))
                    break

        if definition is None:
            definition = "<could not parse task definition>"
        self.set_property("definition", definition)

    def start(self, worker):
        """Record that ``worker`` starts executing the task."""
        worker.say("INSIDE {}".format(self))
        worker.work(self, start=True)
        self.completed = worker  # remembered until finish()

        for dep in self.in_dependencies + self.inout_dependencies:
            inout = dep.get_task_inout(self, as_reader=True)
            # Warn when the data has a known writer that did not complete.
            # The original test (`not inout and inout.ready`) could never be
            # true, so the warning was never emitted.
            if inout.writer and not inout.ready:
                log.warning("Dependency {}/{} seems not ready ...".format(dep, inout))

        if self.properties.get("debug_state", None) == "blocked by the debugger":
            blocker = TaskJob.blockers[TaskJob.preferred_blocked]
            if blocker.lazy:
                def exec_later():
                    if blocker.must_stop:
                        if gdb.selected_thread().is_running():
                            # Retry before the next prompt, once stopped.
                            push_stop_request("Please block, thread is running ...")
                            my_gdb.before_prompt(exec_later)
                            return

                    if blocker.block(self):
                        self.set_property("debug_state", "actively blocked by the debugger")
                        log.info("[{} actively blocked by the debugger.]".format(self))
                        if blocker.must_stop:
                            gdb.execute("cont")
                    else:
                        log.warning("Blocking of {} failed...".format(self))
                        push_stop_request()

                my_gdb.late_function_breakpoint_execution(exec_later)
        else:
            self.set_property("debug_state", "running")

        self.set_property("worker", worker)
        self.set_property("executed_by", worker)

    def restart(self):
        """Set the debug state back to running."""
        self.set_property("debug_state", "running")

    def is_not_schedulable(self):
        """Tell whether something prevents the task from being scheduled."""
        # put here any reasons that can block tasks
        debug_state = str(self.properties.get("debug_state", ""))
        ayu_state = str(self.properties.get("ayu_state", ""))
        return any([self.properties.get("__blocked", False),
                    "unblocked" not in debug_state and "blocked" in debug_state,
                    "finished" in debug_state,
                    "finished" in ayu_state])

    @staticmethod
    def detect_lock():
        """Return True when no task is schedulable although some are unfinished."""
        if any(not task.is_not_schedulable() for task in TaskJob.list_):
            # there are schedulable tasks, so we're not locked
            return False
        return any(task.properties.get("debug_state", None) != "finished"
                   for task in TaskJob.list_)

    def propagate_property(self, key, parent, children):
        """Set ``key`` to ``parent`` on this task and to ``children`` on dependents.

        Dependents are the tasks reading, directly or transitively, what this
        task writes. Their previous value is saved under ``__<key>``. With
        ``children`` set to None nothing is propagated.
        """
        self.set_property(key, parent)
        if children is None:
            self.properties_to_propagate[key] = (None, None)
            return
        self.properties_to_propagate[key] = (parent, children)

        to_block = [self]
        while to_block:
            task = to_block.pop()
            for dep in task.inout_dependencies + task.out_dependencies:
                inout = dep.get_task_inout(task, as_writer=True)
                if not inout:
                    continue
                assert inout.writer is task
                for dep_task in inout.readers:
                    try:
                        old_val = dep_task.properties[key]  # may throw a KeyError
                        if old_val == children:
                            # already set: do not visit this task again
                            continue
                        dep_task.properties["__{}".format(key)] = old_val
                    except KeyError:
                        pass  # no previous value to save

                    dep_task.set_property(key, children)
                    to_block.append(dep_task)

    @staticmethod
    def repropagate_properties():
        """Restore saved property values, then propagate again what remains."""
        properties_to_reset = set()
        tasks = []
        # collect tasks with properties to propagate and their name
        for task in TaskJob.list_:
            if not task.properties_to_propagate:
                continue
            tasks.append(task)
            properties_to_reset.update(task.properties_to_propagate.keys())

        # reset properties to propagate
        for task in TaskJob.list_:
            for prop in properties_to_reset:
                saved_key = "__{}".format(prop)
                try:
                    old_val = task.properties[saved_key]  # may throw KeyError
                    task.set_property(prop, old_val + " (unblocked)")
                    del task.properties[saved_key]
                    del task.properties_to_propagate[prop]
                except KeyError:
                    pass

        for task in tasks:
            for key, (parent, children) in task.properties_to_propagate.items():
                if not parent:
                    continue
                task.propagate_property(key, parent, children)

    def on_property_blocked(self, name, value):
        """React to the ``__blocked`` property being set to ``value``.

        Called by ``set_property`` before the new value is stored. Returns
        False when no blocker is registered, None otherwise.
        """
        if not TaskJob.blockers:
            log.warning("Cannot block tasks, not blocker registered.")
            return False

        blocker = TaskJob.blockers[TaskJob.preferred_blocked]

        if value:
            done = True
            if self.properties.get("worker", None) is None and blocker.lazy:
                log.info("Lazy blocker, task will be blocked at the beginning of its execution.")
            else:
                active = self.properties.get("worker", None)
                done = blocker.block(self, active)

            if done:
                try:
                    old_state = self.properties["debug_state"]  # may throw KeyError
                    self.properties["__debug_state"] = old_state
                except KeyError:
                    pass  # no old state, ignore

                self.propagate_property("debug_state", "blocked by the debugger",
                                        "depends of blocked task")
        else:
            if self.properties.get("__blocked", False) == True:
                try:
                    blocker.unblock(self)
                    try:
                        old_state = self.properties["__debug_state"]  # may throw KeyError
                        del self.properties["__debug_state"]
                        old_state += " (unblocked)"
                    except KeyError:
                        old_state = None  # no old state

                    self.propagate_property("debug_state", old_state, None)
                    TaskJob.repropagate_properties()
                except Exception as e:
                    log.warning("Could not unblock {}: {}".format(self, e))
            else:
                log.warning("Cannot unblocked {}: task not blocked ({}).".format(
                    self, self.properties.get("__blocked", "<property is missing>")))

    def get_inherit_properties(self):
        """Return the properties propagated by the first writer having some."""
        # search parents only
        for dep in self.in_dependencies + self.inout_dependencies:
            inout = dep.get_task_inout(self, as_reader=True)
            if not inout.writer or not inout.writer.properties_to_propagate:
                continue
            return inout.writer.properties_to_propagate
        return {}

    def set_property(self, name, value):
        """Store a property, converting "None"/"False"/"True" strings first."""
        if value == "None":
            value = None
        if value == "False":
            value = False
        if value == "True":
            value = True

        if name == "__blocked":
            # Must run before the assignment: it reads the previous value.
            self.on_property_blocked(name, value)
        self.properties[name] = value

    def finish(self):
        """Record the end of the task and mark what it wrote as ready."""
        if self.completed is True:
            log.error("{} seem to have completed twice ...".format(self))
            return

        # Until now `completed` holds the worker that started the task.
        self.completed.work(self, stop=True)
        self.completed.say("OUTSIDE {}".format(self))
        self.completed = True
        self.set_property("debug_state", "finished")
        self.set_property("worker", None)

        for dep in self.inout_dependencies + self.out_dependencies:
            done = dep.set_task_inout_ready(self, as_writer=True)
            assert done

    def set_dependencies(self, in_dependencies, inout_dependencies, out_dependencies):
        """Register the task as reader/writer of the given addresses.

        Unnamed dependences are named through the ``_get_dependency_name``
        hook when it is set, and the properties propagated by the writers
        this task depends on are inherited.
        """
        deps = set()

        def add_dep(from_dependencies, add_to, is_in=False, is_out=False):
            assert is_in or is_out  # or both
            for addr in from_dependencies:
                dep = Dependence.get_dependency(addr)
                if is_in:
                    dep.add_reader(self, addr)
                if is_out:
                    dep.add_writer(self, addr)
                add_to.append(dep)
                deps.add(dep)

        add_dep(in_dependencies, self.in_dependencies, is_in=True)
        add_dep(inout_dependencies, self.inout_dependencies, is_in=True, is_out=True)
        add_dep(out_dependencies, self.out_dependencies, is_out=True)

        if _get_dependency_name:
            for dep in deps:
                if dep.name is not None:
                    continue
                name = _get_dependency_name(dep.address)
                if not name:
                    continue
                symb, offset, frame = name
                dep.set_name(symb, offset)

        for key, (parent_val, child_val) in self.get_inherit_properties().items():
            log.critical("{}: inherit from {} {}/{}".format(self, key, parent_val, child_val))
            saved_key = "__{}".format(key)
            try:  # save old
                self.properties[saved_key] = self.properties[key]
                log.error("saved {}={} on {}".format(key, self.properties[saved_key], self))
            except KeyError as e:
                # The exception has to be bound: the original handler
                # formatted an undefined `e` and raised NameError.
                log.error("failed {}".format(e))
            self.set_property(key, child_val)
            self.properties_to_propagate[key] = (parent_val, child_val)

    def ready(self):
        """Mark the task as ready (only the first call has an effect)."""
        if self.is_ready:
            return
        self.is_ready = True
        self._set_inbound_taskwait()

    def _set_inbound_taskwait(self):
        """Connect the task to the last taskwait unless it reads an unwaited task."""
        if TaskJob.last_taskwait is None:
            return

        for dep in self.in_dependencies + self.inout_dependencies:
            inout = dep.get_task_inout(self, as_reader=True)
            if inout and inout.writer and inout.writer.taskwait is None:
                # self depends of an unwaited task
                # so nothing to do
                break
        else:
            # self is not connected to an unwaited task
            # so connect it to the last taskwait
            dep = TaskJob.last_taskwait.outbound_dep
            dep.add_writer(TaskJob.last_taskwait)
            dep.add_reader(self)
            dep.set_task_inout_ready(self, as_reader=True)

    def do_taskwait(self):
        """Start the taskwait: claim every task no taskwait waited for yet.

        Claimed tasks whose output has no reader (master tasks excepted) are
        connected to this taskwait through an inbound dependence.
        """
        assert self.taskwait is True
        self.set_property("debug_state", "running")
        tw_dep = Dependence(None, is_taskwait_inbound=True)

        for task in [t for t in TaskJob.list_ if t.taskwait is None]:
            task.taskwait = self
            for dep in task.inout_dependencies + task.out_dependencies:
                inout = dep.get_task_inout(task, as_writer=True)
                if inout and inout.readers:
                    # `task` is connected
                    break
            else:
                if task.masterTask:
                    continue
                # `task` is not connected
                tw_dep.add_writer(task)
                tw_dep.add_reader(self)
                if task.properties.get('debug_state', None) == 'finished':
                    tw_dep.set_task_inout_ready(self, as_reader=True)

    def done_taskwait(self):
        """Mark the taskwait as finished."""
        self.set_property("debug_state", "finished")

    @staticmethod
    def master_zone():
        """Create a master task on the current worker; start and finish it at once."""
        worker = _get_current_worker()
        zone = TaskJob(worker, masterTask=True)
        zone.start(worker)
        zone.finish()

    @staticmethod
    def task_wait(worker):
        """Create and start a taskwait, and return it."""
        taskwait = TaskJob(worker, taskwait=True)
        taskwait.do_taskwait()
        return taskwait

    @staticmethod
    def task_wait_done(taskwait):
        """Mark ``taskwait`` as finished."""
        taskwait.done_taskwait()
class Init:
    """Holder of two initialisation settings.

    NOTE(review): neither attribute is read in the code visible here;
    presumably they are consumed elsewhere -- confirm.
    """

    break_after_init = False
    after_init_cb = None
class DebuggerBarrier:
    """A barrier set by the debugger on a zone, for all or for some workers.

    ``capture__set_barrier`` and ``capture__release_barrier`` are hooks that
    must be installed on the class before a barrier is created or released.
    """

    is_available = False
    capture__set_barrier = None
    capture__release_barrier = None

    def __init__(self, zone, all_workers, workers=()):
        """Create the barrier and hand it to the ``capture__set_barrier`` hook.

        Args:
            zone: zone the barrier is set on.
            all_workers: when true, every worker is expected at the barrier.
            workers: workers expected at the barrier otherwise. The default
                is an immutable empty tuple rather than a shared list.
        """
        self.blocked_workers = set()
        self.workers = set(workers)
        self.all = all_workers
        self.zone = zone
        DebuggerBarrier.capture__set_barrier(self, zone)

    def arrived(self, worker):
        """Record that ``worker`` reached the barrier."""
        if not self.all:
            assert worker in self.workers
            self.workers.remove(worker)
        self.blocked_workers.add(worker)

    def completed(self):
        """Tell whether every expected worker reached the barrier."""
        if self.all:
            return len(self.blocked_workers) == rt_ctrl.max_threads
        return len(self.workers) == 0

    def release(self, cb):
        """Hand the blocked workers and ``cb`` to the release hook."""
        DebuggerBarrier.capture__release_barrier(self, self.zone,
                                                 self.blocked_workers, cb)
class RuntimeController:
    """Holds the thread limit and the debugger barrier currently set, if any.

    ``current_barrier`` is None or a ``(barrier, callback)`` pair.
    """

    OMP_MAX_THREADS = None

    def __init__(self):
        self.current_barrier = None

    @staticmethod
    def is_available():
        """Tell whether debugger barriers are available."""
        return DebuggerBarrier.is_available

    @property
    def max_threads(self):
        return RuntimeController.OMP_MAX_THREADS

    def set_max_threads(self, max_threads):
        """Store the thread limit at class level and log it."""
        RuntimeController.OMP_MAX_THREADS = max_threads
        log.info("mcGDB OMP configured with OMP_MAX_THREADS={}".format(max_threads))

    def worker_at_barrier(self, worker):
        """Notify the current barrier that ``worker`` arrived.

        Returns:
            True when this arrival completed the barrier, which is then
            released and cleared; False otherwise, or when no barrier is set.
        """
        if self.current_barrier is None:
            return False

        barrier, callback = self.current_barrier
        barrier.arrived(worker)
        if not barrier.completed():
            return False

        barrier.release(callback)
        self.current_barrier = None
        return True

    def set_barrier(self, zone, cb):
        """Set a barrier for all workers on ``zone``; ``cb`` is passed on release.

        Raises:
            RuntimeError: when debugger barriers are not available.
        """
        assert self.current_barrier is None
        if not RuntimeController.is_available():
            raise RuntimeError("OpenMP runtime controller not available ...")

        new_barrier = DebuggerBarrier(zone, all_workers=True)
        self.current_barrier = (new_barrier, cb)
# Module-level controller instance; DebuggerBarrier.completed() reads its max_threads.
rt_ctrl = RuntimeController()