import gdb
from mcgdb.toolbox import my_gdb
from mcgdb.model.task import representation
class DMAControllerLink:  # (p2012_mon.P2012Link):
    """Link a host-side DMA endpoint with a component-side endpoint.

    Pull direction: ``finish_pull`` asks the host endpoint for a message and
    hands it to the component endpoint.
    Push direction: ``do_push`` queues the message produced by the component
    endpoint and ``finish_waitTransfers`` later delivers every queued message
    to the host endpoint, oldest first.

    NOTE(review): the base class is commented out in the header, yet
    ``__init__`` still calls ``representation.P2012Link.__init__`` and
    ``self.add_endpoint`` (not defined in this class) -- presumably both are
    meant to come from that base; confirm before relying on this class.
    """

    def __init__(self, host_ep, comp_ep, size, nb):
        """Register both endpoints and start with an empty message queue.

        :param host_ep: host-side endpoint (stored as ``self.host``).
        :param comp_ep: component-side endpoint (stored as ``self.comp``).
        :param size: stored as ``self.size``; not used inside this class.
        :param nb: stored as ``self.nb``; not used inside this class.
        """
        representation.P2012Link.__init__(self)
        self.add_endpoint(host_ep)
        self.add_endpoint(comp_ep)
        self.host = host_ep
        self.comp = comp_ep
        self.size = size
        self.nb = nb
        # Pending pushed messages: newest at index 0, oldest at the end.
        self.messages = []

    def details(self):
        """Return the placeholder description string of this link."""
        return "..."

    def get_messages(self):
        """Return the live list of messages queued by ``do_push``."""
        return self.messages

    #################

    def do_pull(self):
        """Do nothing; the pull is handled in ``finish_pull``."""
        pass

    def finish_pull(self):
        """Move one message from the host endpoint to the component endpoint."""
        msg = self.host.produce_message()
        # NOTE(review): only the breakpoint check is guarded by the None
        # test; the message (possibly None) is always forwarded -- confirm.
        if msg is not None:
            msg.check_breakpoint(
                "%s/%s" % (self.host.comm_entity, self.host.name),
                "%s/%s" % (self.comp.comm_entity, self.comp.name))
        self.comp.consume_message(msg)
        stop_next, stop_next_msgs = self.comp.get_stop_next(msg)
        my_gdb.push_stop_requests(stop_next, stop_next_msgs)

    #################

    def do_push(self):
        """Queue the message produced by the component endpoint."""
        msg = self.comp.produce_message()
        # NOTE(review): only the breakpoint check is guarded by the None
        # test; the message (possibly None) is always queued -- confirm.
        if msg is not None:
            msg.check_breakpoint(
                "%s/%s" % (self.comp.comm_entity, self.comp.name), "DMA")
        # Insert at the front so that pop() in finish_waitTransfers yields
        # the oldest message first.
        self.messages.insert(0, msg)
        stop_next, stop_next_msgs = self.comp.get_stop_next(msg)
        my_gdb.push_stop_requests(stop_next, stop_next_msgs)

    def do_waitTransfers(self):
        """Do nothing; the delivery happens in ``finish_waitTransfers``."""
        pass

    def finish_waitTransfers(self):
        """Deliver every queued message to the host endpoint, oldest first."""
        while self.messages:
            msg = self.messages.pop()
            if msg is not None and msg.breakpoint:
                msg.check_breakpoint(
                    "DMA",
                    "%s/%s" % (self.host.comm_entity, self.host.name))
            self.host.consume_message(msg)
class P2012HostDMAEndpoint:  # (p2012_mon.P2012HostEndpoint):
    """Host endpoint carrying a DMA base address and an address increment.

    NOTE(review): the base class is commented out in the header while the
    methods still delegate to ``p2012_mon.P2012HostEndpoint`` -- confirm the
    intended inheritance.
    """

    def __init__(self, id_, type_):
        """Initialise the host endpoint part, then zero the DMA fields."""
        p2012_mon.P2012HostEndpoint.__init__(self, id_, type_)
        self.baseAddr = 0
        self.increment = 0

    def _dma_details(self):
        """Return ``"[<hex base>+<increment>]"``, or ``""`` if the base is 0."""
        # Value comparison: an identity test against the literal 0 is not a
        # reliable way to detect a zero address.
        if self.baseAddr == 0:
            return ""
        # Strip only a Python 2 long suffix ("0x...L"); slicing off the last
        # character would drop a real hex digit when there is no suffix.
        return "[%s+%d]" % (hex(self.baseAddr).rstrip("L"), self.increment)

    def __str__(self):
        """Return the host endpoint description followed by the DMA details."""
        return "%s%s" % (p2012_mon.P2012HostEndpoint.__str__(self),
                         self._dma_details())
class P2012HostDMAPullEndpoint:  # (P2012HostDMAEndpoint):
    """Host DMA endpoint of type ``"DMAPull"`` that consumes messages.

    NOTE(review): the base class is commented out in the header;
    ``comm_entity``, ``get_stop_next``, ``baseAddr`` and ``increment`` are
    not defined here and are presumably inherited -- confirm.
    """

    def __init__(self, id_):
        """Initialise the endpoint with the fixed type ``"DMAPull"``."""
        P2012HostDMAEndpoint.__init__(self, id_, "DMAPull")

    def consume_message(self, msg):
        """Forward a message to the owning entity and advance the address.

        The payload handed to ``self.comm_entity`` is whatever
        ``get_message(msg)`` returns; the stop requests are computed from
        the original ``msg``.
        """
        self.comm_entity.consume_message(self, self.get_message(msg))
        self.baseAddr += self.increment
        stop_next, stop_next_msgs = self.get_stop_next(msg)
        my_gdb.push_stop_requests(stop_next, stop_next_msgs)

    def get_message(self, msg):
        """Return the payload to forward for ``msg``; always ``None`` here."""
        return None
class P2012HostDMAPushEndpoint:  # (P2012HostDMAEndpoint):
    """Host DMA endpoint of type ``"DMAPush"`` that produces messages.

    NOTE(review): the base class is commented out in the header;
    ``get_stop_next``, ``baseAddr`` and ``increment`` are not defined here
    and are presumably inherited -- confirm.
    """

    def __init__(self, id_):
        """Initialise the endpoint with the fixed type ``"DMAPush"``."""
        P2012HostDMAEndpoint.__init__(self, id_, "DMAPush")

    def produce_message(self):
        """Build the next message, advance the address and return the message.

        The message comes from ``do_produce_message``; stop requests derived
        from it are pushed before returning.
        """
        msg = self.do_produce_message()
        self.baseAddr += self.increment
        stop_next, stop_next_msgs = self.get_stop_next(msg)
        my_gdb.push_stop_requests(stop_next, stop_next_msgs)
        return msg

    def do_produce_message(self):
        """Default implementation.

        Evaluate ``increment`` bytes at ``baseAddr`` as an array of ``int``
        through GDB and return the result converted to a string.
        """
        # Plain "/" is kept on purpose: "%d" truncates a float quotient, and
        # NOTE(review): increment may be a gdb.Value, for which floor
        # division may not be available -- confirm before switching to "//".
        msg = str(gdb.parse_and_eval("*((int *)%d)@%d" %
                                     (self.baseAddr,
                                      self.increment /
                                      gdb.lookup_type("int").sizeof)))
        return msg
def connect_dma_buffer(is_push_bp, buffer_id, this_compo, itf, size, nb):
    """Create the host and component endpoints of a DMA buffer and link them.

    :param is_push_bp: when true, pair a host pull endpoint with a component
        push-buffer interface; otherwise pair a host push endpoint with a
        component pull-buffer interface.
    :param buffer_id: identifier given to the host endpoint.
    :param this_compo: component passed to the component interface.
    :param itf: interface passed to the component interface.
    :param size: forwarded to ``DMAControllerLink``.
    :param nb: forwarded to ``DMAControllerLink``.

    NOTE(review): ``overlay``, ``push_buffer`` and ``pull_buffer`` are not
    imported in the visible part of this module -- confirm where they are
    expected to come from.
    """
    if is_push_bp:
        dma_class = overlay.get_overlay(P2012HostDMAPullEndpoint)
        comp_interface = push_buffer.P2012ComponentPushBufferInterface
    else:
        dma_class = overlay.get_overlay(P2012HostDMAPushEndpoint)
        comp_interface = pull_buffer.P2012ComponentPullBufferInterface

    host_ep = dma_class(buffer_id)
    comp_ep = comp_interface(this_compo, itf)
    # The link object is not kept here; NOTE(review): presumably it stays
    # reachable through the endpoints registered in its constructor.
    DMAControllerLink(host_ep, comp_ep, size, nb)