Source code for mcgdb.model.task.environment.p2012.npm.representation.dma_link

import gdb

from mcgdb.toolbox import my_gdb
from mcgdb.model.task import representation


            
[docs]class P2012HostDMAEndpoint: #(p2012_mon.P2012HostEndpoint): def __init__(self, id_, type_): p2012_mon.P2012HostEndpoint.__init__(self, id_, type_) self.baseAddr = 0 self.increment = 0
[docs] def do_configure(self, baseAddr, increment): self.state = "configured" self.baseAddr = long(baseAddr, 16) self.increment = long(increment)
def __str__(self): details = "" if self.baseAddr is not 0: details = "[%s+%d]" % (hex(self.baseAddr)[:-1], self.increment) return "%s%s" % (p2012_mon.P2012HostEndpoint.__str__(self), details)
[docs]class P2012HostDMAPullEndpoint: #(P2012HostDMAEndpoint): def __init__(self, id_): P2012HostDMAEndpoint.__init__(self, id_, "DMAPull")
[docs] def consume_message(self, msg): self.comm_entity.consume_message(self, self.get_message(msg)) self.baseAddr += self.increment stop_next, stop_next_msgs = self.get_stop_next(msg) my_gdb.push_stop_requests(stop_next, stop_next_msgs)
[docs] def get_message(self, msg): return None
[docs]class P2012HostDMAPushEndpoint: #(P2012HostDMAEndpoint): def __init__(self, id_): P2012HostDMAEndpoint.__init__(self, id_, "DMAPush")
[docs] def produce_message(self): msg = self.do_produce_message() self.baseAddr += self.increment stop_next, stop_next_msgs = self.get_stop_next(msg) my_gdb.push_stop_requests(stop_next, stop_next_msgs) return msg
[docs] def do_produce_message(self): """Default implementation Returns the array read by the DMA as a int list string """ msg = str(gdb.parse_and_eval("*((int *)%d)@%d" % (self.baseAddr, self.increment / gdb.lookup_type("int").sizeof))) return msg
[docs]def connect_dma_buffer(is_push_bp, buffer_id, this_compo, itf, size, nb): if is_push_bp: dma_class = overlay.get_overlay(P2012HostDMAPullEndpoint) comp_interface = push_buffer.P2012ComponentPushBufferInterface else: dma_class = overlay.get_overlay(P2012HostDMAPushEndpoint) comp_interface = pull_buffer.P2012ComponentPullBufferInterface host_ep = dma_class(buffer_id) comp_ep = comp_interface(this_compo, itf) DMAControllerLink(host_ep, comp_ep, size, nb)
[docs]def configure_dma_buffer(dma_ep, base_addr, increment): assert dma_ep is not None assert isinstance(dma_ep, P2012HostDMAEndpoint) dma_ep.do_configure(base_addr, increment)