from __future__ import print_function
import logging; log = logging.getLogger(__name__)
from collections import OrderedDict
import gdb
import mcgdb
from mcgdb.toolbox import my_gdb
from mcgdb.toolbox import aspect
from mcgdb.toolbox import SimpleClass
from mcgdb.model.task import representation as task_representation
from mcgdb.interaction import push_stop_request, cancel_stop_request
from . import toolbox
def _worker_not_impl():
raise LookupError("Current worker lookup not implemented ...")
_get_current_worker = _worker_not_impl
_worker_is_alive = _worker_not_impl
_get_dependency_name = None
_attach = None
_array_matching_dependency = None
is_attached = False
try:
from enum import Enum
class Zone(Enum):
Parallel = "parallel"
Barrier = "barrier"
Single = "single"
Critical = "critical"
except ImportError:
Zone = None
pass # do not fail now
def activate():
try:
from enum import Enum # fail now if necessary
except ImportError as e:
# not in Python2 standard library
log.error("pip2 install enum34 https://pypi.python.org/pypi/enum34")
raise e
@my_gdb.Listed
@my_gdb.Numbered
@my_gdb.Dicted
@my_gdb.Switchable
class Worker(task_representation.CommComponent, aspect.Tracker):
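"""An OpenMP worker thread, keyed by its GDB thread; tracks the stack of jobs (parallel zones, tasks, barriers, ...) it is currently executing."""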
@staticmethod
def get_current_worker():
return _get_current_worker()
TASK_MANAGER = my_gdb.DefaultTaskManager
def __init__(self, thread_key):
task_representation.CommComponent.__init__(self)
aspect.Tracker.__init__(self)
Worker.init_dict(thread_key, self)
self.thread_key = thread_key
self.dead = False
self.job_stack = []
toolbox.say("New {}".format(self))
for worker in Worker.list_:
worker.say("Hello #{}".format(worker.numbers[Worker]))
toolbox.say()
def switch(self, force=False):
self.thread_key.switch()
def is_alive(self):
return _worker_is_alive(self)
def is_current(self):
return self is Worker.get_current_worker()
def say(self, what):
toolbox.say(what, self.numbers[Worker], len(self.list_))
def terminated(self):
self.say("/{}".format(self))
self.dead = True
self.thread_key = None
def create(self, where):
self.job_stack.append(where)
def work(self, where, start=False, stop=False):
assert start is not stop
if start:
if not self.job_stack or self.job_stack[-1] is not where:
self.job_stack.append(where)
else:
assert self.job_stack
if isinstance(where, TaskJob):
assert where in self.job_stack
self.job_stack.remove(where)
elif isinstance(where, ParallelJob):
assert where in self.job_stack
popped = None
while popped is not where:
popped = self.job_stack.pop()
else:
assert self.job_stack[-1] is where
if not (isinstance(where, ParallelJob) and self is where.parent_worker):
self.job_stack.pop()
def __str__(self):
return "Worker #{}".format(self.numbers[self.__class__])
def __repr__(self):
return "worker_{}".format(self.numbers[self.__class__])
def __lt__(self, other):
return str(self) < str(other)
@my_gdb.Listed
@my_gdb.Numbered
@my_gdb.Dicted
class Job(aspect.Tracker):
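"""Base class for the work items tracked by this model (parallel zones, sections, singles, criticals, barriers, tasks)."""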
def say(self, what):
toolbox.say(what)
def __str__(self):
return "{} #{}".format(self.__class__.__name__, self.numbers[self.__class__])
def __repr__(self):
try:
return "{}_{}".format(self.__class__.__name__, self.numbers[self.__class__])
except:
return object.__repr__(self)
@my_gdb.Listed
@my_gdb.Numbered
@my_gdb.Dicted
class ParallelJob(Job):
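"""A parallel zone: tracks the workers currently inside it and its section, single, critical and barrier sub-zones."""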
def __init__(self, num_workers, parent_worker, section_count=0, is_for=False):
Job.__init__(self)
self.parent_worker = parent_worker
self.workers = set()
self.num_workers = num_workers
self.has_sections = section_count != 0
self.section = None
self.barrier = None
self.single = []
self.critical = []
self.count = 0
self.is_for = is_for
if self.has_sections:
self.start_section_zone(parent_worker, section_count)
self.say(self)
def start_working(self, worker):
self.count += 1
assert worker not in self.workers
self.workers.add(worker)
worker.work(self, start=True)
worker.say("WORK on {}".format(self))
def stop_working(self, worker):
assert worker in self.workers
self.workers.remove(worker)
self.count -= 1
worker.say("DONE with {}".format(self))
worker.work(self, stop=True)
def completed(self):
if self.section:
self.stop_section_zone()
if self.count != 0:
log.warning("/!\\ strange zone counter detected /!\|")
log.info("(it's like {} workers are still inside ...".format(self.count))
self.say("/{}".format(self))
def start_section_zone(self, worker, count=0):
if self.section is None:
self.section = SectionJob(self, worker, count)
def stop_section_zone(self, is_parallel_job=False):
assert self.section
if self.has_sections:
self.section.completed()
def __str__(self):
return "Parallel{}Job #{}".format("For" if self.is_for else "", self.numbers[self.__class__])
@staticmethod
def work_completed(worker):
zone = ParallelJob.get_current_parallel_job(worker)
if not zone:
# zone already completed
return None
assert zone.count == 1
assert zone is worker.job_stack[-1]
zone.stop_working(worker)
zone.completed()
return zone
@staticmethod
def get_current_parallel_job(worker=None):
if worker is None:
return ParallelJob.list_[-1] if ParallelJob.list_ else None
for job in worker.job_stack[::-1]:
if isinstance(job, ParallelJob):
return job
@my_gdb.Listed
@my_gdb.Numbered
@my_gdb.Dicted
class SectionJob(Job):
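"""A sections zone of a parallel job; records which worker executes which section."""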
def __init__(self, parallel_job, worker, count=0):
Job.__init__(self)
self.parallel_job = parallel_job
self.section_count = count
self.sections = {}
self.ended = 0
self.has_completed = False
worker.create(self)
self.say("ZONE with {} SECTIONS".format(count))
def work_on_section(self, worker, section_id):
if section_id != 0:
assert section_id not in self.sections.values()
self.sections[worker] = section_id
worker.say("WORK on {} {}/{}".format(self, section_id, self.section_count))
if not isinstance(worker.job_stack[-1], SectionJob):
worker.work(self, start=True)
else:
worker.say("NO MORE WORK on {}'s {} sections".format(self, self.section_count))
if isinstance(worker.job_stack[-1], SectionJob):
worker.work(self, stop=True)
def completed(self):
self.has_completed = True
self.say("/{} with {} SECTIONS".format(self, self.section_count))
@my_gdb.Listed
@my_gdb.Numbered
@my_gdb.Dicted
class SingleJob(Job):
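"""A single zone: one worker (the visitor) executes it, the others are rejected and recorded in visitors."""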
def __init__(self, worker, parallel_job):
Job.__init__(self)
self.parallel_job = parallel_job
self.visitor = None
self.visitors = set()
self.finish = False # ugly fix ...
parallel_job.single.append(self)
parallel_job.barrier = Barrier(parallel_job, worker, single=self)
self.say(self)
def enter(self, inside, worker):
if inside:
assert self.visitor is None
worker.say("INSIDE {}".format(self))
self.visitor = worker
worker.work(self, start=True)
else:
worker.say("REJECTED from {}".format(self))
self.visitors.add(worker)
def finished(self):
self.visitor.say("OUTSIDE of {}".format(self))
self.finish = True
self.visitor.work(self, stop=True)
def completed(self):
self.parallel_job.single.remove(self)
self.say("/{}".format(self))
@staticmethod
def get_parallel_single_zone(worker):
top_job = worker.job_stack[-1]
if isinstance(top_job, SingleJob) and worker not in top_job.visitors:
return worker.job_stack[-1]
for job in reversed(worker.job_stack):
if not isinstance(job, ParallelJob): continue
for single_job in job.single:
assert single_job is not None
assert isinstance(single_job, SingleJob)
if worker not in single_job.visitors:
return single_job
else:
return SingleJob(worker, job)
else:
assert not "Could not find the single job or parent parallel zone..."
@my_gdb.Listed
@my_gdb.Numbered
@my_gdb.Dicted
class Barrier(Job, aspect.Tracker):
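"""A barrier of a parallel job: counts the workers blocked at it and remembers where each of them hit it."""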
# threads_blocked_at[thread] = <barrier>
threads_blocked_at = {}
def __init__(self, parallel_job, worker, single=None, sections=None):
Job.__init__(self)
self.parallel_job = parallel_job
self.single = single
self.sections = sections
self.count = False
self.open = False
self.internal = single is not None or sections is not None
self._stop_all_after = False
self.loc = {} # worker -> filename:lineno
def reach_barrier(self, worker, location):
if self.count is False:
self.count = 0
self.parallel_job.say(self)
self.loc[worker] = location
if len(set(self.loc.values())) != 1:
log.warning("Multiple barriers are being hit:")
list(map(log.warning, map(" - {}".format, self.loc.values())))
self.count += 1
Barrier.threads_blocked_at[worker] = self
if self.single and not self.single.finish \
and self.single.visitor is worker:
self.single.finished()
worker.work(self, start=True)
worker.say("BLOCKED at {} ({}/{})".format(self, self.count,
self.parallel_job.count))
def leave_barrier(self, worker):
if not self.open:
self.open = True
self.parallel_job.say("{} OPENED".format(self))
self.parallel_job.barrier = None
self.count -= 1
assert Barrier.threads_blocked_at[worker] is self
del Barrier.threads_blocked_at[worker]
worker.work(self, stop=True)
if self.count == 0:
self.completed()
def completed(self):
self.parallel_job.say("/ #{}".format(self))
if self.single:
self.single.completed()
if self.sections:
self.sections.completed()
@property
def stop_all_after(self):
return self._stop_all_after
@stop_all_after.setter
def stop_all_after(self, _stop_all_after):
self._stop_all_after = _stop_all_after
@staticmethod
def get_parallel_barrier(worker, reach):
if not reach:
return Barrier.threads_blocked_at[worker]
for job in worker.job_stack[::-1]:
if not hasattr(job, "barrier"):
continue
if job.barrier is None:
job.barrier = Barrier(job, worker)
return job.barrier
else:
log.error("Could not find a parallel job attached to this barrier...")
return None
@my_gdb.Listed
@my_gdb.Numbered
@my_gdb.Dicted
class CriticalJob(Job):
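"""A critical zone: workers queue to enter it and only one (the visitor) is inside at a time."""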
def __init__(self, worker, parallel_job):
Job.__init__(self)
self.parallel_job = parallel_job
self.visitor = None
self.visitors = set()
self.queueing = []
parallel_job.critical.append(self)
self.say(self)
worker.create(self)
def try_enter(self, worker):
assert worker not in self.queueing
worker.say("TRY TO ENTER {} ({}/{} queueing)".format(self, "empty" if self.visitor is None else "busy", len(self.queueing)))
self.queueing.append(worker)
def entered(self, worker):
assert worker in self.queueing
worker.say("INSIDE {}".format(self))
worker.work(self, start=True)
del self.queueing[self.queueing.index(worker)]
self.visitor = worker
self.visitors.add(worker)
def left(self, worker):
if is_attached:
if worker not in self.queueing:
self.try_enter(worker)
if self.visitor is not worker:
self.entered(worker)
assert self.visitor is worker
self.visitor = None
worker.work(self, stop=True)
worker.say("OUTSIDE {} ({} queuing, {}/{} passed)"\
.format(self, len(self.queueing),
len(self.visitors),
self.parallel_job.count))
if not self.queueing and len(self.visitors) == self.parallel_job.count:
self.completed()
def completed(self):
self.parallel_job.critical.remove(self)
self.say("/{}".format(self))
@staticmethod
def get_parallel_critical_zone(worker, pc=None, depth=-1, no_new=False):
try:
job = worker.job_stack[depth]
if isinstance(job, CriticalJob):
return job
for job in reversed(worker.job_stack):
if not isinstance(job, ParallelJob): continue
for critical_job in job.critical:
assert critical_job is not None
assert isinstance(critical_job, CriticalJob)
if worker not in critical_job.visitors or worker is critical_job.visitor:
return critical_job
else:
assert not no_new
return CriticalJob(worker, job)
else:
assert not "Could not find the critical job or parent parallel zone..."
except Exception as e:
if is_attached:
if len(CriticalJob.list_) == 1:
return CriticalJob.list_[0]
raise(e)
@my_gdb.Numbered
class Dependence(aspect.Tracker):
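"""A task data dependency, identified by a scalar address, an array or a taskwait; keeps the successive reader/writer slots (inouts)."""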
addr_to_dep = {} # address --> Dependence
def __init__(self, address, is_taskwait_inbound=None):
self.inouts = []
self.is_taskwait_inbound = is_taskwait_inbound
#self.properties = {}
if is_taskwait_inbound is not None:
# Taskwait
self.name = "Taskwait {}".format("inbound" if is_taskwait_inbound else "outbound")
self.address = -1
else:
try:
int(address, 16) #int() can't convert non-string with explicit base
# Scalar
self.address = address
self.name = None
self.symb = None
Dependence.addr_to_dep[address] = self
except TypeError:
# Array
self.array = array = address
if array.start:
self.address = hex(int(array.start))
else:
log.critical("No start adress for {}".format(array))
self.address = 0
array.__class__.resolvers.add(self)
self.name = str(array)
def add_reader(self, task, addr=None):
if not self.inouts:
self._add_inout()
inout = self.inouts[-1]
assert task is not inout.writer
if hasattr(addr, "fullname"):
# addr is an Array
name = addr.fullname
if task in inout.readers:
log.warn("{} already in Dep #{} reader list ({}). Maybe a code bug?".format(task, self.number, name))
inout.reader_names[task] += " || " + name
else:
inout.reader_names[task] = name
else:
# array is a simple address
assert task not in inout.readers
inout.readers.append(task)
return inout
def add_writer(self, task, addr=None):
if not self.inouts:
self._add_inout()
inout = self.inouts[-1]
if inout.writer or task in inout.readers:
inout = self._add_inout()
try:
inout.writer_name = addr.fullname
except AttributeError:
pass # that's not an array!
inout.writer = task
return inout
def get_task_inout(self, task, as_reader=False, as_writer=False):
for inout in reversed(self.inouts):
where = []
if as_reader: where += inout.readers
if as_writer: where += [inout.writer]
if task in where:
return inout
def set_task_inout_ready(self, task, as_reader=False, as_writer=False, val=None):
inout = self.get_task_inout(task, as_reader, as_writer)
if not inout:
return False
inout.ready = True
try:
self.symb.type.target() #ensure symb.type is primitive
except AttributeError: # symb is None
pass
except: # symb type has no target (good)
inout.val = str(gdb.parse_and_eval("*({}) {}".format(self.symb.type.pointer(), self.address)))
return inout
def set_name(self, symb, offset):
self.name = str(symb)
if offset:
self.name += "+{}".format(offset)
self.offset = offset
self.symb = symb
def _add_inout(self):
inout = SimpleClass(readers=[], writer=None, ready=False, val=None,
reader_names={}, writer_name=None)
self.inouts.append(inout)
return inout
def __str__(self):
return "Dep_{}_{}@{}".format(self.number,
self.name if self.name else "",
self.address)
@staticmethod
def get_dependency(address):
dep = None
try:
try:
int(address, 16) #TypeError: int() can't convert non-string with explicit base
dep = Dependence.addr_to_dep[address] #KeyError
except KeyError:
pass
except TypeError: #it's not an address
pass
if dep is None and _array_matching_dependency:
#address can belong to array
dep = _array_matching_dependency(address)
return dep if dep else Dependence(address)
@staticmethod
def get_dependency_inout(address, task):
dep = Dependence.get_dependency(address)
return dep.get_task_inout(task)
@my_gdb.Listed
@my_gdb.Numbered
@my_gdb.Dicted
class TaskJob(Job):
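"""An explicit task (or taskwait): records its data dependencies, its properties (definition, debug_state, ...) and its debugger-driven blocking state."""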
blockers = {}
preferred_blocked = None
last_taskwait = None
def __init__(self, worker, masterTask=False, taskwait=False):
Job.__init__(self)
TaskJob.init_dict(self.number, self)
self.completed = False
self.masterTask = masterTask
self.in_dependencies = []
self.inout_dependencies = []
self.out_dependencies = []
self.properties = OrderedDict()
self.properties_to_propagate = {}
self.is_ready = False
if taskwait:
TaskJob.last_taskwait = self
self.taskwait = True
self.set_property("definition", "taskwait")
self.outbound_dep = Dependence(None, is_taskwait_inbound=False)
else:
self.taskwait = None
self.set_property("debug_state", "created")
def set_task_source_lines(self, filename, start_line, stop_line):
self.set_property("sources", "{}:{}-{}".format(filename, start_line, stop_line))
definition = None
lookup_at_line = start_line
while True:
head = my_gdb.get_src(filename, lookup_at_line, lookup_at_line)[:-1]
try:
definition = head.split("#pragma omp task")[1].strip()
# handle multiline task definition
while head.endswith("\\"):
lookup_at_line += 1
head = my_gdb.get_src(filename, lookup_at_line, lookup_at_line)[:-1]
head = head.lstrip('0123456789 \t') # remove lineno and white spaces from beginning
definition = "{}{}".format(definition[:-1], head)
break
except IndexError:
# look a few lines before
LOOK_N_LINES_BEFORE = 5
lookup_at_line -= 1
if start_line - lookup_at_line > LOOK_N_LINES_BEFORE:
log.warn("Could not find task definition around {}:{}".format(filename, start_line))
break
if definition is None:
definition = "<could not parse task definition>"
self.set_property("definition", definition)
def start(self, worker):
#assert self.completed is False
worker.say("INSIDE {}".format(self))
worker.work(self, start=True)
self.completed = worker
for dep in self.in_dependencies+self.inout_dependencies:
inout = dep.get_task_inout(self, as_reader=True)
if inout.writer:
#assert inout and inout.ready
if not (inout and inout.ready):
log.warning("Dependency {}/{} seems not ready ...".format(dep, inout))
else:
#log.warning("We don't know the writer for {}/{} ...".format(dep, inout))
pass
if self.properties.get("debug_state", None) == "blocked by the debugger":
blocker = TaskJob.blockers[TaskJob.preferred_blocked]
if blocker.lazy:
def exec_later():
if blocker.must_stop:
if gdb.selected_thread().is_running():
push_stop_request("Please block, thread is running ...")
my_gdb.before_prompt(exec_later)
return
if blocker.block(self):
self.set_property("debug_state", "actively blocked by the debugger")
log.info("[{} actively blocked by the debugger.]".format(self))
if blocker.must_stop:
gdb.execute("cont")
else:
log.warning("Blocking of {} failed...".format(self))
push_stop_request()
my_gdb.late_function_breakpoint_execution(exec_later)
else:
self.set_property("debug_state", "running")
self.set_property("worker", worker)
self.set_property("executed_by", worker)
def restart(self):
self.set_property("debug_state", "running")
def is_not_schedulable(self):
# put here any reasons that can block tasks
return any([self.properties.get("__blocked", False),
"unblocked" not in str(self.properties.get("debug_state", "")) \
and "blocked" in str(self.properties.get("debug_state", "")),
"finished" in str(self.properties.get("debug_state", "")),
"finished" in str(self.properties.get("ayu_state", ""))])
@staticmethod
def detect_lock():
schedulable_tasks = [task for task in TaskJob.list_ \
if not task.is_not_schedulable()]
# if no task can be scheduled
if not schedulable_tasks:
# check if not all of them are finished
not_finished = [task for task in TaskJob.list_ \
if task.properties.get("debug_state", None) != "finished"]
else:
# there are schedulable tasks, so we're not finished
not_finished = True
return bool(not schedulable_tasks and not_finished)
def propagate_property(self, key, parent, children):
"""
set key->parent to self
and key->children to all dependent tasks
"""
self.set_property(key, parent)
if children is not None:
self.properties_to_propagate[key] = (parent, children)
else:
self.properties_to_propagate[key] = (None, None)
return
to_block = [self]
while to_block:
task = to_block.pop()
for dep in task.inout_dependencies+task.out_dependencies:
inout = dep.get_task_inout(task, as_writer=True)
if not inout: continue
assert inout.writer is task
for dep_task in inout.readers:
try:
# don't append to to_block if prop is already set
old_val = dep_task.properties[key] # may throw a KeyError
if old_val == children:
continue
dep_task.properties["__{}".format(key)] = old_val
except KeyError:
pass # ignore
dep_task.set_property(key, children)
to_block.append(dep_task)
@staticmethod
def repropagate_properties():
properties_to_reset = set()
tasks = []
# collect tasks with properties to propagate and their names
for task in TaskJob.list_:
if not task.properties_to_propagate:
continue
tasks.append(task)
properties_to_reset.update(task.properties_to_propagate.keys())
# reset properties to propagate
for task in TaskJob.list_:
for prop in properties_to_reset:
try:
old_val = task.properties["__{}".format(prop)] # may throw KeyError
task.set_property(prop, old_val + " (unblocked)")
del task.properties["__{}".format(prop)]
del task.properties_to_propagate[prop]
except KeyError:
pass
for task in tasks:
for key, (parent, children) in task.properties_to_propagate.items():
if not parent: continue
task.propagate_property(key, parent, children)
def on_property_blocked(self, name, value):
if not TaskJob.blockers:
log.warning("Cannot block tasks, not blocker registered.")
return False
blocker = TaskJob.blockers[TaskJob.preferred_blocked]
if value:
done = True
if self.properties.get("worker", None) is None and blocker.lazy:
log.info("Lazy blocker, task will be blocked at the beginning of its execution.")
else:
active = self.properties.get("worker", None)
done = blocker.block(self, active)
if done:
try:
old_state = self.properties["debug_state"] # may throw KeyError
self.properties["__debug_state"] = old_state
except KeyError:
pass # no old state, ignore
self.propagate_property("debug_state",
"blocked by the debugger",
"depends of blocked task")
else:
if self.properties.get("__blocked", False) == True:
try:
blocker.unblock(self)
try:
old_state = self.properties["__debug_state"] # may throw KeyError
del self.properties["__debug_state"]
old_state += " (unblocked)"
except KeyError:
old_state = None # no old state
self.propagate_property("debug_state", old_state, None)
TaskJob.repropagate_properties()
except Exception as e:
log.warning("Could not unblock {}: {}".format(self, e))
else:
log.warning("Cannot unblocked {}: task not blocked ({}).".format(
self, self.properties.get("__blocked", "<property is missing>")))
def get_inherit_properties(self):
# search parents only
for dep in self.in_dependencies+self.inout_dependencies:
inout = dep.get_task_inout(self, as_reader=True)
if not inout.writer or not inout.writer.properties_to_propagate:
continue
return inout.writer.properties_to_propagate
return {}
def set_property(self, name, value):
if value == "None": value = None
if value == "False": value = False
if value == "True": value = True
if name == "__blocked":
self.on_property_blocked(name, value)
self.properties[name] = value
def finish(self):
if self.completed is True:
log.error("{} seem to have completed twice ...".format(self))
return
self.completed.work(self, stop=True)
self.completed.say("OUTSIDE {}".format(self))
self.completed = True
self.set_property("debug_state", "finished")
self.set_property("worker", None)
for dep in self.inout_dependencies+self.out_dependencies:
done = dep.set_task_inout_ready(self, as_writer=True)
assert done
def set_dependencies(self, in_dependencies, inout_dependencies, out_dependencies):
deps = set()
def add_dep(from_dependencies, add_to, is_in=False, is_out=False):
assert is_in or is_out # or both
for addr in from_dependencies:
dep = Dependence.get_dependency(addr)
if is_in:
dep.add_reader(self, addr)
if is_out:
dep.add_writer(self, addr)
add_to.append(dep)
deps.add(dep)
add_dep(in_dependencies, self.in_dependencies, is_in=True)
add_dep(inout_dependencies, self.inout_dependencies, is_in=True, is_out=True)
add_dep(out_dependencies, self.out_dependencies, is_out=True)
if _get_dependency_name:
for dep in deps:
if dep.name is not None:
continue
name = _get_dependency_name(dep.address)
if not name:
continue
symb, offset, frame = name
dep.set_name(symb, offset)
for key, (parent_val, child_val) in self.get_inherit_properties().items():
log.critical("{}: inherit from {} {}/{}".format(self, key, parent_val, child_val))
try:
# save old
self.properties["__{}".format(key)] = self.properties[key]
log.error("saved {}={} on {}".format(key, self.properties["__{}".format(key)], self))
except KeyError as e:
log.error("failed {}".format(e))
self.set_property(key, child_val)
self.properties_to_propagate[key] = (parent_val, child_val)
def ready(self):
if self.is_ready:
return
self.is_ready = True
self._set_inbound_taskwait()
def _set_inbound_taskwait(self):
if TaskJob.last_taskwait is None:
return
for dep in self.in_dependencies+self.inout_dependencies:
inout = dep.get_task_inout(self, as_reader=True)
if (inout
and inout.writer
and inout.writer.taskwait is None):
# self depends on an unwaited task
# so nothing to do
break
else:
# self is not connected to an unwaited task
# so connect it to the last taskwait
dep = TaskJob.last_taskwait.outbound_dep
dep.add_writer(TaskJob.last_taskwait)
dep.add_reader(self)
dep.set_task_inout_ready(self, as_reader=True)
def do_taskwait(self):
assert self.taskwait is True
self.set_property("debug_state", "running")
tw_dep = Dependence(None, is_taskwait_inbound=True)
for task in [t for t in TaskJob.list_
if t.taskwait is None]:
task.taskwait = self
for dep in task.inout_dependencies+task.out_dependencies:
inout = dep.get_task_inout(task, as_writer=True)
if inout and inout.readers:
# `task` is connected
break
else:
if task.masterTask:
continue
# `task` is not connected
tw_dep.add_writer(task)
tw_dep.add_reader(self)
if task.properties.get('debug_state', None) == 'finished':
tw_dep.set_task_inout_ready(self, as_reader=True)
def done_taskwait(self):
self.set_property("debug_state", "finished")
@staticmethod
def master_zone():
worker = _get_current_worker()
zone = TaskJob(worker, masterTask=True)
zone.start(worker)
zone.finish()
@staticmethod
def task_wait(worker):
taskwait = TaskJob(worker, taskwait=True)
taskwait.do_taskwait()
return taskwait
@staticmethod
def task_wait_done(taskwait):
taskwait.done_taskwait()
class Init:
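"""Initialization settings: whether to break after the runtime initialization and an optional callback to run at that point."""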
break_after_init = False
after_init_cb = None
class DebuggerBarrier:
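"""A debugger-controlled barrier: holds the workers arriving at a zone and releases them once all the expected workers have arrived."""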
is_available = False
capture__set_barrier = None
capture__release_barrier = None
def __init__(self, zone, all_workers, workers=()):
self.blocked_workers = set()
self.workers = {w for w in workers}
self.all = all_workers
self.zone = zone
DebuggerBarrier.capture__set_barrier(self, zone)
def arrived(self, worker):
if not self.all:
assert worker in self.workers
self.workers.remove(worker)
self.blocked_workers.add(worker)
def completed(self):
return len(self.blocked_workers) == rt_ctrl.max_threads if self.all \
else len(self.workers) == 0
def release(self, cb):
DebuggerBarrier.capture__release_barrier(self, self.zone, self.blocked_workers, cb)
class RuntimeController:
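"""Debugger-side control of the OpenMP runtime: maximum number of threads and debugger barriers."""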
OMP_MAX_THREADS = None
def __init__(self):
self.current_barrier = None
@staticmethod
def is_available():
return DebuggerBarrier.is_available
@property
def max_threads(self):
return RuntimeController.OMP_MAX_THREADS
def set_max_threads(self, max_threads):
RuntimeController.OMP_MAX_THREADS = max_threads
log.info("mcGDB OMP configured with OMP_MAX_THREADS={}".format(max_threads))
def worker_at_barrier(self, worker):
if self.current_barrier is None:
return False
self.current_barrier[0].arrived(worker)
if not self.current_barrier[0].completed():
return False
self.current_barrier[0].release(self.current_barrier[1])
self.current_barrier = None
return True
def set_barrier(self, zone, cb):
assert self.current_barrier is None
if not RuntimeController.is_available():
raise RuntimeError("OpenMP runtime controller not available ...")
self.current_barrier = (DebuggerBarrier(zone, all_workers=True), cb)
rt_ctrl = RuntimeController()