Source code for spydrnet_physical.ir.definition

import logging
import typing
from itertools import combinations

import numpy as np
import spydrnet as sdn
from spydrnet.ir import Definition as DefinitionBase
from spydrnet.ir.innerpin import InnerPin
from spydrnet.ir.outerpin import OuterPin
from spydrnet.ir.port import Port
from spydrnet_physical.global_state import global_callback as sdnphy_global_callback
from spydrnet_physical.ir.shaping_utils import shaping_utils

logger = logging.getLogger("spydrnet_logs")
try:
    import networkx as nx
except ImportError:
    logger.debug("Networks module not loaded")


if typing.TYPE_CHECKING:
    from spydrnet.ir import Definition as DefinitionSDN
    from spydrnet_physical.ir.first_class_element import (
        FirstClassElement as FirstClassElementPhy,
    )

    DefinitionBase = type("DefinitionBase", (DefinitionSDN, FirstClassElementPhy), {})

PROP = "VERILOG.InlineConstraints"


[docs]class Definition(DefinitionBase): """ Extending the definitions representation """
[docs] def __init__(self, name=None, properties=None): super().__init__(name=name, properties=properties) properties = properties or dict() self.properties["WIDTH"] = properties.get("WIDTH", 50) self.properties["HEIGHT"] = properties.get("WIDTH", 50)
@property def utilization(self): """ Return utilization of this module """ if self.data[PROP].get("AREA_UM", 0): return self.data[PROP].get("AREA_UM", 0) / self.area else: return 0 @property def area(self): shape = self.data["VERILOG.InlineConstraints"].get("SHAPE", None) if shape == "cross": a, b, c, d, e, f = self.data["VERILOG.InlineConstraints"].get( "POINTS", [0, 0, 0, 0, 0, 0] ) return ((a + f + c) * d) + ((b + d + e) * a) - (d * a) elif shape == "custom": return shaping_utils.PolyArea2D( shaping_utils.get_custom_boundary( self.data["VERILOG.InlineConstraints"].get( "POINTS", [0, 0, 0, 0, 0, 0] ) ) ) else: W = self.data["VERILOG.InlineConstraints"].get("WIDTH", 0) H = self.data["VERILOG.InlineConstraints"].get("HEIGHT", 0) return W * H def _disconnect_port(self, port): """ This method disconnects the definition port from its cable This makes the port dangling, this method is used before removing the port """ assert port in self._ports, "Port does not belong to this definition" for pin in port.pins: if pin.wire: pin.wire.disconnect_pin(pin) del pin
[docs] def remove_cable(self, cable): """Remove a cable from the definition. The cable must be a member of the definition. parameters ---------- cable - (Cable) the cable to be removed from the definition """ assert cable.definition == self, "Cable is not included in definition" # Need to turn on this # for each_wire in cable.wires: # for pins in each_wire.pins: # each_wire.disconnect_pin(pins) self._remove_cable(cable) self._cables.remove(cable)
[docs] def remove_port(self, port): """ Remove port from the definition. (Overrides the base method) parameters ---------- port: (Port) the port to be removed, must be of this definition """ assert port.definition == self, "Port is not included in definition" self._remove_port(port) self._disconnect_port(port) for pin in port.pins: port.remove_pin(pin) self._ports.remove(port)
[docs] def create_ft_ports(self, *args, **kwargs): """Alias to create_feedthroughs_ports""" return self.create_feedthroughs_ports(*args, **kwargs)
[docs] def create_feedthroughs_ports( self, cable, suffix="ft", get_port_names=lambda x: None ): """ Given the cable object it creates a feedthrough ports on this definition - The new ports names as {cable_name}_{suffix}_in and {cable_name}_{suffix}_out - Direct assignment is created beetween newly added two ports args: cable (Port): The cable for which feedthrough needs to be created suffix (str): Sufffix used for the port naming get_port_names(callable): function to return custom names get_port_names(sdn.IN or sdn.out) Returns: tuple: Feedthrough port (inport and outport) """ inport_name = get_port_names(sdn.IN) or f"{cable.name}_{suffix}_in" outport_name = get_port_names(sdn.OUT) or f"{cable.name}_{suffix}_out" inport, outport = ( self.create_port( inport_name, pins=cable.size, is_scalar=cable.is_scalar, lower_index=cable.lower_index, direction=sdn.IN, ), self.create_port( outport_name, pins=cable.size, is_scalar=cable.is_scalar, lower_index=cable.lower_index, direction=sdn.OUT, ), ) # Input port cable and output port cable int_c = self.create_cable(inport_name, wires=cable.size) out_c = self.create_cable(outport_name, wires=cable.size) assign_lib = self._get_assignment_library() assign_def = self._get_assignment_definition(assign_lib, cable.size) inst_name = f"{inport_name}_{outport_name}_ft" i = 1 while next(self.get_instances(inst_name), None): inst_name = f"{inport_name}_{outport_name}_ft" + f"_{i}" i += 1 instance = self.create_child(inst_name, reference=assign_def) int_c.connect_port(inport) int_c.connect_instance_port(instance, next(assign_def.get_ports("i"))) out_c.connect_port(outport) out_c.connect_instance_port(instance, next(assign_def.get_ports("o"))) return (inport, outport)
# TODO: Creates problem when cable is output port cable
[docs] def create_feedthrough( self, instances_list, cable, get_port_names=lambda port_dir: None, get_cable_names=lambda indx, inst: None, ): """ Creates a feedthrough for a single cable passing through list of instances The driver cable name is unchanged and newly created feedthrough cable name {cable_name}_ft_{indx} args: instances_list (list[instance]): List of instances to create feedthrough from cable (Cable): cable fro which feedthrough needs to be creared get_port_names(callable): -- get_cable_names(callable): -- Returns: list(Cable): List of newly created cables in order """ if isinstance(instances_list, sdn.Instance): instances_list = (instances_list,) assert isinstance(cable, sdn.Cable), "Cable object required" assert ( cable.definition == self ), "Cable {cable.name} does not belog to this definition" assert all( inst in self._children for inst in instances_list ), "Found inst which does not belong to this definition" cable_list = [] for indx, instance in enumerate(instances_list): inport, outport = instance.reference.create_ft_ports( cable, get_port_names=get_port_names ) cable_name = get_cable_names(indx, instance) or f"{cable.name}_ft_in_{indx}" new_cable = self.create_cable(cable_name, wires=cable.size) new_cable.connect_instance_port(instance, outport) for indx, each_w in enumerate(cable.wires): for pin in set(each_w.pins): # These are loads and if ( isinstance(pin, OuterPin) and (pin.port.direction == sdn.IN) ) or ( isinstance(pin, InnerPin) and (pin.port.direction == sdn.OUT) ): each_w.disconnect_pin(pin) new_cable.wires[indx].connect_pin(pin) cable.connect_instance_port(instance, inport) cable_list.append(new_cable) return cable_list
def create_parallel_feedthrough(self, instances_list): # Same instances in each group assert len(set((len(inst) for _, inst in instances_list))) == 1 # Create ports ft_port_seq = [] for definition in set(inst.reference for inst in instances_list[0][1]): ft_port_seq.append( definition.create_feedthroughs_ports(instances_list[0][0], suffix="ft") ) # Create connection between instances for cable, inst_list in instances_list: # disconnect and store load pins store_load_pins = [] for wire in cable.wires: store_load_pins.append(tuple(wire.loads())) for pin in list(wire.loads()): wire.disconnect_pin(pin) cable.connect_instance_port(inst_list[0], ft_port_seq[0][0]) for indx, inst in enumerate(inst_list): new_cable = cable.clone() new_cable.name = f"{inst.name}_{cable.name}" self.add_cable(new_cable) new_cable.connect_instance_port(inst, ft_port_seq[indx][1]) for indx, wire in enumerate(new_cable.wires): for pin in store_load_pins[indx]: wire.connect_pin(pin) return new_cable
[docs] def create_ft_multiple(self, *args, **kwargs): """Alias to create_feedthrough_multiple""" return self.create_feedthrough_multiple(*args, **kwargs)
[docs] def create_feedthrough_multiple(self, instances_list): """ This creates feedthough from list of instances on multiple locations Expects the list of tuples in following format parameters ---------- instances_list: (Cable, (inst1, inst1, . . . .instn) """ cable, inst_tuple = instances_list # Check if first item is Cable assert isinstance(cable, sdn.Cable), "Cable object required" # Check if cable belognt to the same definition assert ( cable.definition == self ), "Cable {cable.name} does not belog to thos definition" # Compare instance list is from same reference for instance in inst_tuple: assert isinstance( instance, sdn.Instance ), "Found {type(instance) in the instances list}" for definition in set(inst.reference for inst in inst_tuple): definition.create_feedthroughs_ports(cable, suffix="ft") for indx, each_inst in enumerate(inst_tuple): cable_name = f"{cable.name}_{each_inst.name}_ft" if indx == 0: port = next(each_inst.get_ports(f"{cable.name}_ft_in")) cable.connect_instance_port(each_inst, port) else: new_cable = self.create_cable(cable_name, wires=cable.size) outport = next(inst_tuple[indx - 1].get_ports(f"{cable.name}_ft_out")) inport = next(each_inst.get_ports(f"{cable.name}_ft_in")) new_cable.connect_instance_port(inst_tuple[indx - 1], outport) new_cable.connect_instance_port(each_inst, inport) if indx == len(inst_tuple) - 1: cable_name = f"{cable.name}_out" new_cable = self.create_cable(cable_name, wires=cable.size) outport = next(each_inst.get_ports(f"{cable.name}_ft_out")) new_cable.connect_instance_port(each_inst, outport) for indx, wire in enumerate(cable.wires): for eachpin in wire.loads(): eachpin.wire.disconnect_pin(eachpin) new_cable.wires[indx].connect_pin(eachpin)
[docs] def merge_multiple_instance( self, instances_list_tuple, new_definition_name=None, pin_map=None ): """ This method can merge multiple group of instances having same order of reference definition. First pair of the instances_list_tuple is used to create new definition and that is reused while grouping remaining group of instances args: instances_list_tuple = [(inst_1, inst_2, ...., inst_n), <instance_name>] new_definition_name (str) = Name for the new definition pin_map (Callable) = Function of dictionary to rename pins """ main_def = None instance_list = [] for instances_list, instance_name in instances_list_tuple: new_def, new_inst, _ = self.merge_instance( instances_list, new_definition_name=f"{new_definition_name}_{instance_name}", new_instance_name=instance_name, pin_map=pin_map, ) instance_list.append(new_inst) if not main_def: # If this is first merge copy the newly created definition main_def = new_def main_def.name = new_definition_name else: new_inst.reference = main_def self.library.remove_definition(new_def) return main_def, instance_list
@staticmethod def _call_merged_instance(new_mod, new_instance, instances_list): # def_data = instances_list[0].data["VERILOG.InlineConstraints"] # if def_data: outline = [] new_mod.data[PROP]["AREA"] = 0 new_mod.data[PROP]["AREA_UM"] = 0 for each in instances_list: shape = each.reference.data[PROP].get("SHAPE", None) if shape == "rect": outline.extend(shaping_utils._convert_rect_to_pt(each)) if shape == "cross": outline.extend(shaping_utils._convert_cross_to_pt(each)) new_mod.data[PROP]["AREA"] += each.reference.data[PROP].get("AREA", 0) new_mod.data[PROP]["AREA_UM"] += each.reference.data[PROP].get("AREA_UM", 0) LOC_X = min([each.data[PROP].get("LOC_X", 0) for each in instances_list] or [0]) LOC_Y = min([each.data[PROP].get("LOC_Y", 0) for each in instances_list] or [0]) new_instance.data[PROP]["LOC_X"] = LOC_X new_instance.data[PROP]["LOC_Y"] = LOC_Y if outline: shape, points = shaping_utils.get_shapes_outline(outline) new_instance.reference.properties["SHAPE"] = shape if shape == "cross": new_instance.reference.properties["POINTS"] = points if shape == "custom": new_instance.reference.properties[ "POINTS" ] = shaping_utils.points_to_path(points) if shape == "rect": new_instance.reference.properties["WIDTH"] = points[0] new_instance.reference.properties["HEIGHT"] = points[1] shaping_utils._interpret_custom_to_shape(new_instance.reference) logger.debug( f"{new_instance.name} " + f"[{new_instance.reference.name:15}]" + f"[{new_instance.reference.properties['SHAPE']:15}]" + f"[{new_instance.reference.properties['WIDTH']:15}]" + f" {shape} {points}" ) # TODO: Try to break this method
[docs] def merge_instance( self, instances_list, new_definition_name="", new_instance_name="", pin_map=None ): """ Merges the list of instances to unique definition args: instances_list (List(Instance)): List of instances to be merged new_definition_name : Name of the new definition created new_instance_name : Name of the new instance created pin_map (Callable, Dict) : External function to map new pin name based in definition and instance name get_pin_name(<definition_name:<str>, <pin_name:<str>, <instance_name:<str>) returns: (Definition, Instance, Dict) """ rename_map = {} # Stores the final rename map # ====== Input Sanity checks for i, each_module in enumerate(instances_list): assert isinstance(each_module, sdn.Instance), ( "Modulelist contains none non-intance object " + "[%s] at location %d " % (type(each_module), i) ) if pin_map: if isinstance(pin_map, dict): pin_map_copy = pin_map def pin_map(x, y, _): return pin_map_copy.get(x, {}).get(y, {}) if not callable(pin_map): print( "pin_map argument should be dictionary or function, " + f"received {type(pin_map)}" ) # ====== Create a new definition if not new_definition_name: new_def_name = ( "_".join([each.reference.name for each in instances_list]) + "_merged" ) print(f"Inferred definition name {new_def_name} ") else: new_def_name = new_definition_name new_mod = self.library.create_definition(name=new_def_name) # ===== Create instance of the definition if not new_instance_name: new_instance_name = f"{new_def_name}_1" merged_module = self.create_child(name=new_instance_name, reference=new_mod) # ===== Interate over each module and create new module for index, eachM in enumerate(instances_list): rename_map[eachM.reference.name] = {} rename_map[eachM.reference.name][index] = {} currMap = rename_map[eachM.reference.name][index] IntInst = new_mod.create_child(name=eachM.name, reference=eachM.reference) # Iterate over each port of current instance for p in eachM.get_ports(): pClone = p.clone() # It copied all pins, wires and cables for eachSuffix in [""] + [f"_{i}" for i in range(1000)]: newName = pClone.name + eachSuffix if not len(list(new_mod.get_ports(newName))): break newCable = new_mod.create_cable( name=newName, is_downto=pClone.is_downto, is_scalar=pClone.is_scalar, lower_index=pClone.lower_index, ) # Create connection inside new definition for eachPClone, eachP in zip(pClone.pins, p.pins): w = newCable.create_wire() w.connect_pin(eachPClone) w.connect_pin(IntInst.pins[eachP]) pClone.change_name(newName) new_mod.add_port(pClone) currMap[p.name] = newName for eachPin in p.pins: inst_out_pin = eachM.pins[eachPin] conWire = inst_out_pin.wire instPin = merged_module.pins[pClone.pins[eachPin.index()]] if conWire: conWire.connect_pin(instPin) conWire.disconnect_pin(inst_out_pin) newCable.wires[eachPin.index()].connect_pin(inst_out_pin) self.remove_child(eachM) sdnphy_global_callback._call_merged_instance( new_mod, merged_module, instances_list ) self._call_merged_instance(new_mod, merged_module, instances_list) return new_mod, merged_module, rename_map
[docs] def OptPins(self, pins=lambda x: True, dry_run=False, merge=True, absorb=True, remove_unconn=False): """ This method optimizes the definitions pins bu inspecting all the instances of the definition parameters ---------- dry_run: Just performs the dryrun and list the pins which can be merged or absorbed pins: only consider specific pins, provide filter function absorb: if two pins are only connected to each other they will be absorbed and internal connection will be made merge: if two pins are connected to each other and few other instances, one of the pin will be absorbed and other will exist remove_unconn: Remove unconnected pins """ duplicatePins = [] # Set of all pins which can be merged or absorbed absorbPins = [] # Subset of duplicate pins unused_ports = [] # Subset of duplicate pins defPort = list([x for x in self.get_ports() if pins(x.name)]) # Iterate over all the ports pairs of the definition for fromPort, toPort in combinations(defPort, 2): if len(fromPort.pins) == len(toPort.pins): # Compare only when port has same width sameNet = True # Flag to detect boh ports are connected to same cable singleWire = True for eachPin1, eachPin2 in zip(fromPort.pins, toPort.pins): for eachInst in self.references: eachPin1 = eachInst.pins[eachPin1] eachPin2 = eachInst.pins[eachPin2] if (eachPin1.wire is None) or (eachPin2.wire is None): sameNet = False break elif not (eachPin1.wire == eachPin2.wire): sameNet = False break elif singleWire: if eachPin1.wire: singleWire = set(eachPin1.wire.pins) == set( (eachPin1, eachPin2) ) else: singleWire = False if sameNet: # Check if frompin exist in the previous pairs already_paired = next( ( dupliPins for dupliPins in duplicatePins if fromPort in dupliPins ), None, ) if already_paired: if not toPort in already_paired: already_paired.append(toPort) else: portPair = [fromPort, toPort] duplicatePins.append(portPair) if singleWire: absorbPins.append(portPair) if remove_unconn: for ports in defPort: is_wired = False for eachInst in self.references: if not all([(eachInst.pins[pin].wire is None) for pin in ports.pins]): is_wired = True break if not is_wired: unused_ports.append(ports) if not dry_run: for ports in duplicatePins[::-1]: ports.sort(key=lambda x: {sdn.IN: "1", sdn.OUT: "0"}[x.direction]) for eachP1Pin in ports[0].pins: ww = eachP1Pin.wire for eachPort in ports[1:]: # Remove all internal connection wwP2 = eachPort.pins[eachP1Pin.index()].wire for eachPin in wwP2.pins: if isinstance(eachPin, sdn.OuterPin): eachPin.wire.disconnect_pin(eachPin) # Selects pins connected to the instance ww.connect_pin(eachPin) for eachPort in ports[1:]: self.remove_cable(eachPort.pins[0].wire.cable) self.remove_port(eachPort) logger.debug(f"Merged Ports {ports[0].name}<-{eachPort.name}") if ports in absorbPins: self.remove_port(ports[0]) logger.debug(f"Absorbed port {ports[0].name}") if remove_unconn: for eachPort in unused_ports: self.remove_port(eachPort) return duplicatePins if merge else absorbPins if absorb else None
# TODO : Need to consider floating paraters
[docs] def OptWires(self, no_load=True, no_driver=False, floating=True): """ List the wires which can be optimised based on different constraints parameters ---------- no_load: (bool) (Default: True) Wires without load pin no_driver: (bool) (Default: False) Wires without drivers pin floating: (bool) (Default: True) Wires without any connection to pin """ for cable in self.get_cables(): for wire in cable.wires: if len(wire.pins) < 2: print(f"{wire} Wire an be trimmed ")
# def OptInstances(self, checkInputs=True, checkOutputs=True): # ''' # ''' # for c in self.children: # for p in c.pins.values(): # # if (p.port.direction == Port.Direction.IN) and not checkInputs: # # continue # # if (p.port.direction == Port.Direction.OUT) and not checkOutputs: # # continue # if p.wire: # break # else: # print(f"{p._instance} Instance an be trimmed ") def _get_assignment_library(self): """ Returns assignment library from the netlist of this definition. If the assignment library is missing it will create new and return """ assert self.library, "Library is not defined for the definition" assert self.library.netlist, "netlist is not defined for the library definition" netlist = self.library.netlist assign_library = next(netlist.get_libraries("SDN_VERILOG_ASSIGNMENT"), None) if assign_library is None: logger.info("Missing SDN_VERILOG_ASSIGNMENT libarary Creating new") assign_library = netlist.create_library(name="SDN_VERILOG_ASSIGNMENT") return assign_library def _get_assignment_definition(self, assign_library, size): """ Returns assignment definition for the give size parameters ---------- assign_library = ``SDN_VERILOG_ASSIGNMENT`` library size = (int) Size of the assignment block """ assign_def_name = f"SDN_VERILOG_ASSIGNMENT_{size}" definition = next(assign_library.get_definitions(assign_def_name), None) if definition is None: definition = assign_library.create_definition(name=assign_def_name) p_in = definition.create_port("i", pins=size) p_out = definition.create_port("o", pins=size) p_in.direction = p_in.Direction.IN p_out.direction = p_out.Direction.OUT cable = definition.create_cable("through", wires=size) cable.connect_port(p_in) cable.connect_port(p_out) return definition
[docs] def duplicate_port(self, port, port_name=None): """ Duplicates existing port in the definition Uses assign block beetween to short two cables If post in output port ``assign new_port = initial_port`` else ``assign initial_port = new_port`` parameters ---------- port: (Port) Port of this definition port_name: (str) Options New port name (default {port_name}_dup) """ assert isinstance(port, sdn.Port), f"Required Port but found {type(port)}" assert self.library, "Library is not defined for the definition" assert self.library.netlist, "netlist is not defined for the library definition" new_port_name = port_name if port_name else f"{port.name}_dup" new_port = self.create_port( new_port_name, direction=port.direction, is_scalar=port.is_scalar, lower_index=port.lower_index, is_downto=port.is_downto, ) new_port.create_pins(port.size) new_cable = self.create_cable( new_port_name, is_scalar=port.is_scalar, lower_index=port.lower_index, is_downto=port.is_downto, wires=port.size, ) port_cable = port.pins[0].wire.cable assign_library = self._get_assignment_library() definition = self._get_assignment_definition(assign_library, new_cable.size) instance = self.create_child( f"SDN_ASSIGNMENT_{port.name}_{new_port.name}", reference=definition ) if port.is_output: port_cable.connect_instance_port(instance, next(definition.get_ports("i"))) new_cable.connect_instance_port(instance, next(definition.get_ports("o"))) else: new_cable.connect_instance_port(instance, next(definition.get_ports("i"))) port_cable.connect_instance_port(instance, next(definition.get_ports("o"))) return new_port
[docs] def combine_cables(self, new_cable_name, cables, quiet=False): """ Combines multiple cables to new cable. Helpful in creating Bus/Vector from scalar wires parameters ---------- new_cable_name: (str) New cable name cables: (list[Cable]) List of cables quiet: (bool) Do not raise error if no cables are passed """ if len(cables) == 0: if quiet: return None else: assert False, "No cables provided" for c in cables[1:]: assert isinstance( c, sdn.Cable ), f"combine_cables can combine only cable found {type(c)}" assert ( self == c.definition ), f"all ports to combine should belong to same definition" assert cables.count(c) == 1, f"Cable defined multiple times {c.name}" newCable = self.create_cable(new_cable_name, is_scalar=False) for c in cables[::-1]: for wire in c.wires[::-1]: c.remove_wire(wire) newCable.add_wire(wire) self.remove_cable(c) return newCable
[docs] def combine_ports(self, port_name, ports, is_downto=False): """ This method can combine multiple input or output ports togther to create a bus structure. It does create a cable for internal wires, but does not change anything about the external wire connection ports[0] will be newport[0] default properties will be used for creating a new port args: port_name (str) : Name of the new port ports (list[Ports]) : List of ports to combine return: (new_port, new_cable) : return new port and internal cable """ direction = ports[0].direction for p in ports[1:]: assert isinstance( p, Port ), f"combine_ports can combine Ports found {type(p)}" assert ( direction == p.direction ), f"combine_ports combines only input or output ports, \ found {type(p.direction)}" assert ( self == p.definition ), f"all ports to combine should belong to same definition" new_port = self.create_port(port_name, direction=direction, is_downto=is_downto) new_cable = self.create_cable( port_name, is_scalar=new_port.is_scalar, is_downto=is_downto ) port_list = [] for p in ports: port_list.append(p.name) newPin = new_port.create_pin() pp = p.pins[0] ppWire = pp.wire # Switch Instances connection for instance in self.references: if instance.pins[pp].is_connected: instance.pins[pp].wire.connect_pin(instance.pins[newPin]) if ppWire: # Switch Internal Wire ppWire.connect_pin(newPin) self.remove_cable(ppWire.cable) ppWire.cable.remove_wire(ppWire) new_cable.add_wire(ppWire) logger.debug(f"Removing port {p.name}") self.remove_port(p) logger.debug( f"Combined with {new_port.name} " + f"created cable {new_cable.name}" ) logger.debug(f"{new_port.name} <- {port_list}") return new_port, new_cable
def create_unconn_wires(self): unconn_cable = self.create_cable("unconn") w = unconn_cable.create_wire() # dummy wire for instance in self.children: for pin in instance.get_port_pins(instance.reference.ports): if not pin.wire: w = unconn_cable.create_wire() w.connect_pin(pin) def merge_ports(self, new_port_name, port_sequence, reverse=False): new_port = self.create_port(new_port_name) new_cable = self.create_cable(new_port_name) for port in list(port_sequence): for pin in list(port.pins)[::-1]: wire = pin.wire port._pins.remove(pin) pin._port = None new_port.add_pin(pin) wire._cable = None new_cable.add_wire(wire) self.remove_cable(next(self.get_cables(port.name))) self.remove_port(port) new_port.direction = port.direction return new_port
[docs] def split_port(self, port): """ Split the given port Args: port (Port): Definition port to split into independent pins """ if isinstance(port, str): port = next(self.get_ports(port)) cable = next(self.get_cables(port.name)) for indx, pin in enumerate(port.pins[::-1]): new_port = self.create_port(f"{port.name}_{indx}", direction=port.direction) new_cable = self.create_cable(f"{port.name}_{indx}") cable.remove_wire(pin.wire) new_cable.add_wire(pin.wire) pin._port = None port._pins.remove(pin) new_port.add_pin(pin) self.remove_port(port) self.remove_cable(cable)
[docs] def flatten_instance(self, instance): """Flatterns single instance in the given definition""" assert isinstance(instance, sdn.Instance), "Argument not a Instance" assert instance in self.children, "Instance is not part of current definition" cable_map = {} # recreating internal cables on top level for cable in instance.reference.get_cables(): if not cable.is_port_cable: new_cable = cable.clone() new_cable.name = instance.name + "_" + new_cable.name self.add_cable(new_cable) cable_map[cable] = new_cable # recreating sub instance on the top level and create connections for sub_instance in instance.reference.children: new_instance = sub_instance.clone() new_instance.name = instance.name + "_" + new_instance.name self.add_child(new_instance) # create connection, iteratre over each port of sub-instance for port in sub_instance.reference.ports: for pin in sub_instance.get_port_pins(port.name): if not pin.wire: # skip if sub-instance pin is not connected continue if pin.wire.cable.is_port_cable: # if the pin wire is connected to instance port pin_top = next( filter(lambda x: isinstance(x, sdn.InnerPin), pin.wire.pins) ) pin_top = instance.pins[pin_top] if not pin_top.wire: # skip if instance pin is not connected continue wire = pin_top.wire else: # internal wire wire = cable_map[pin.wire.cable].wires[pin.wire.index()] wire.connect_pin(new_instance.pins[pin]) self.remove_child(instance)
[docs] def get_connectivity_network(self, get_weights=None, split_ports=False): """ This method converts current module into networkx graph each cell is represented as as node and nets are represented as edges - Netowrkx should be installed - Higher fanout nets are represented with independent edge from driver to each load """ assert "nx" not in dir(), "Netowrkx library not installed" get_weights = get_weights or (lambda x: 1) def get_node_name(pin): if isinstance(pin, sdn.OuterPin): return pin.instance.name else: if split_ports and (pin.port.size > 1): return f"{pin.port.name}_{pin.get_verilog_index}" else: return pin.port.name # Variables graph = nx.DiGraph() node_map = {} edges = [] elabel = [] # Create Port Nodes first node_indx = 0 logger.debug(f"Found {len(self.ports)} ports") for port in self.ports: for pin in port.pins: name = get_node_name(pin) graph.add_node( node_indx, label=name, weight=get_weights(pin), shape="rect", port=True, node_name=port.name, ) node_map[name] = node_indx node_indx += 1 if not split_ports: break # Create Instances Nodes logger.debug(f"Found {len(self.children)} instances") for instance in self.children: name = instance.name graph.add_node( node_indx, port=False, weight=get_weights(instance), node_name=instance.name, label=instance.name, ) node_map[instance.name] = node_indx node_indx += 1 # Create edges logger.debug(f"Found {len(list(self.get_cables()))} nets") for cable in list(self.get_cables()): for wire in cable.wires: # Skip adding edge if there is no driver if not wire.get_driver(): logger.debug(f"No driver found for {cable.name}") continue # Get driver [source node] # TODO: Consider multiple dirvers here driver_inst = get_node_name(wire.get_driver()[0]) # Make connection from drivers to each load for p in wire.pins: node = get_node_name(p) if node == driver_inst: continue edges.append((node_map[driver_inst], node_map[node])) elabel.append(f"{cable.name}_{wire.get_verilog_index}") logger.debug(f"Adding {len(set(edges))} edges") for edge in set(edges): weight = edges.count(edge) edge_name = elabel[edges.index(edge)] graph.add_edge( *edge, label=f"[{weight}]", edge_name=edge_name, weight=float(weight) ) return graph
def _remove_child(self, child): """ Internal function for dissociating a child instance from the definition. """ super()._remove_child(child=child) for pin in list(child.get_port_pins(child.get_ports())): if pin.wire: pin.wire.disconnect_pin(pin)
[docs] def make_instance_unique(self, instance, new_name, instance_list=()): """clone the definition and point the reference to the new definition""" assert instance in self.children, "Isntance is not part of this definition" reference = instance.reference lib = instance.reference.library index = lib.definitions.index(reference) new_def = instance.reference.clone() if instance.reference.name is not None: name = instance.reference.name new_def.name = new_name or (name + "_new") lib.add_definition(new_def, index + 1) instance.reference = new_def for each in instance_list: try: next(self.get_instances(each)).reference = new_def except StopIteration: logger.exception("%s instance not found during uniquifying", each) return new_def
[docs] def add_buffer(self, wire, buffer, instance_name, ports=("A", "Y"), new_cable_name=None): """ Adds buffer on the given net args: wire (sdn.Wire, sdn.Cable): Cable or a wire to be buffered buffer (str): instance_name (sdn.Instance): ports tuple(str, str): """ pre_buffer_w = new_cable_name or f"{instance_name}_pre_buffer" if isinstance(wire, sdn.Cable): for each_wire in wire.wires: self.add_buffer(each_wire, buffer, instance_name, ports) return # Instantiate buffer buffer = ( next(self.get_definitions(buffer)) if isinstance(buffer, str) else buffer ) buffer_inst = self.create_child(name=instance_name, reference=buffer) a_pin = next(buffer_inst.get_port_pins(ports[0])) y_pin = next(buffer_inst.get_port_pins(ports[1])) if wire.cable.is_port_cable: driver_pin = list( filter(lambda x: isinstance(x, sdn.InnerPin), wire.cable.wires[0].pins) )[0] if driver_pin.port.direction == sdn.IN: # if buffering input net load_pins = list(wire.get_pins( selection="OUTSIDE", filter=lambda x: x.inner_pin.port.direction == sdn.IN, )) post_buffer_w = new_cable_name or f"{instance_name}_post_buffer" buffer_input_wire = self.create_cable(post_buffer_w, wires=1).wires[0] for pin in load_pins: if pin.wire: pin.wire.disconnect_pin(pin) buffer_input_wire.connect_pin(pin) buffer_input_wire.connect_pin(y_pin) wire.connect_pin(a_pin) logger.debug( "Added buffer in %s with instance name %s", self.name, buffer_inst.name ) return # raise NotImplementedError("Buffer on input net is not supported") driver_pin = next( wire.get_pins( selection="OUTSIDE", filter=lambda x: x.inner_pin.port.direction == sdn.OUT, ) ) driver_pin.wire.disconnect_pin(driver_pin) buffer_input_wire = self.create_cable(pre_buffer_w, wires=1).wires[0] buffer_input_wire.connect_pin(driver_pin) buffer_input_wire.connect_pin(a_pin) wire.connect_pin(y_pin) logger.debug( "Added buffer in %s with instance name %s", self.name, buffer_inst.name )
# def sanity_check_cables(self): # allWires = list(self.get_wires()) # for eachCables in self.get_cables(): # ww = eachCables.wires # assert eachCables.is_scalar == (len(ww) == 1), \ # f"Wrong is_scalar attribute for {eachCables.name}" # for eachWire in ww: # assert eachWire.cable == eachCables, \ # f"Wrong cable attribute on wire {eachWire} " # allWires.remove(eachWire) # assert allWires == [], "{len(allWires)} Wires are not in cables" # def sanity_check_ports(self): # allPins = list(self.get_pins()) # for eachPort in self.get_ports(): # pp = eachPort.pins # assert eachPort.is_scalar == (len(pp) == 1), \ # f"Wrong is_scalar attribute for {eachPort.name}" # for eachPin in pp: # assert eachPin.port == eachPort, \ # f"Wrong cable attribute on wire {eachPin} " # allPins.remove(eachPin) # assert allPins == [], "{len(allPins)} Wires are not in cables"