Note
Click here to download the full example code
6.3. Physical/Techmapped Partition Conn Box 02ΒΆ
This example demonstrates partitioning a tech-mapped connection box based on the mapped gates.
We first convert a tech-mapped connection box netlist to a NetworkX graph. The graph is converted to a two-dimensional array and passed to the metis library which performs the partitioning based on assigned weights.
The partitioned graph is rendered in the following SVG.
before converting netlist to a graph (including connections between shift registers chain).
Horizontal Connection Box
import glob
import logging
import os
import tempfile
from itertools import chain
from pprint import pprint
import networkx as nx
from fnmatch import fnmatch
import numpy as np
import pymetis
import pydot
import spydrnet as sdn
from networkx.drawing.nx_agraph import write_dot
from networkx.drawing.nx_pydot import to_pydot
from spydrnet_physical.util import OpenFPGA, config_chain_simple, get_names
from spydrnet_physical.util import FloorPlanViz
from spydrnet_physical.util.shell import launch_shell
from spydrnet_physical.util import write_metis_graph, run_metis
from spydrnet_physical.util import RoutingRender
logger = logging.getLogger("spydrnet_logs")
sdn.enable_file_logging(LOG_LEVEL="INFO")
def main():
# = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
# Read FPGA Netlist
# = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
proj = "../homogeneous_fabric/*_Verilog"
task = "../homogeneous_fabric/*_Task"
source_files = glob.glob(f"{proj}/lb/*.v")
source_files += glob.glob(f"{proj}/routing/*.v")
source_files += glob.glob(f"{proj}/sub_module/*.v")
source_files += glob.glob(f"{task}/CustomModules/standard_cell_primitives.v")
source_files += glob.glob(f"{proj}/fpga_top.v")
# Create OpenFPGA object
fpga = OpenFPGA(grid=(4, 4), verilog_files=source_files)
# Get cbx_1__1_ module
cb_module = next(fpga.netlist.get_definitions("cbx_1__1_"))
fpga.netlist.top_instance = cb_module
# Flattern all the mux instances
for instance in list(cb_module.get_instances("*ipin*")):
logger.debug(f"Flattening {instance.name}")
cb_module.flatten_instance(instance)
# Remove global port and signals
for signal in [
"ccff_tail",
"ccff_head",
"prog_reset",
"cfg_done",
"prog_clk",
"chanx_left_out",
"chanx_right_out",
]:
cb_module.remove_port(next(cb_module.get_ports(signal)))
cb_module.remove_cable(next(cb_module.get_cables(signal)))
# Get unwanted instances
for instance in [
"*ASSIG*",
]:
cb_module.remove_children_from(cb_module.get_instances(instance))
# Disconnect scan_chain
for dff in cb_module.get_instances("*_CCDFF_*"):
port = next(dff.get_ports("*Q*"))
if port.pins[0].wire:
port.pins[0].wire.disconnect_pin(port.pins[0])
for instance in [
"*CCDFF*",
]:
cb_module.remove_children_from(cb_module.get_instances(instance))
# Split Chanx_ports
next(cb_module.get_ports("chanx_left_in")).split()
next(cb_module.get_ports("chanx_right_in")).split()
cb_module.combine_ports(
"top_pin_I", list(cb_module.get_ports("*top_grid_*_pin_I*"))
)
cb_module.combine_ports(
"bottom_pin_I", list(cb_module.get_ports("*bottom_grid_*_pin_I*"))
)
# Get connectivity graph
G = cb_module.get_connectivity_network()
nodes = list(nx.get_node_attributes(G, "label").values())
target = nodes.index("top_pin_I")
source = nodes.index("bottom_pin_I")
G.nodes[target]["weight"] = 100
G.nodes[source]["weight"] = 100
for p1, p2 in nx.edges(G, [target, source]):
print(G[p1][p2])
G[p1][p2]["weight"] = 10
G[p1][p2]["label"] = "[10]"
for indx, node_name in enumerate(nodes):
G.nodes[indx]["label"] = f"{node_name}_[{G.nodes[indx].get('weight', 0)}]"
nodes = list(nx.get_node_attributes(G, "label").values())
vweights = nx.get_node_attributes(G, "weight")
# n_cuts, membership = pymetis.part_graph(2, adjacency=nx.to_numpy_array(G),
# vweights=vweights)
# print(f"n_cuts {n_cuts}")
# Run using external metis
write_metis_graph(
nx.to_numpy_array(G),
eweights=True,
vweights=vweights,
filename="_partition_experiments_02.csr",
)
membership = run_metis(
filename="_partition_experiments_02.csr",
cuts=2,
options="-objtype cut -minconn -niter 100 -ncuts 3 ",
)
subgraph = pydot.Cluster("part1", label="", bgcolor="#c6ecba"), pydot.Cluster(
"part2", label="", bgcolor="#f3cfcf"
)
partitions = [[], []]
graph = to_pydot(G)
for each in subgraph:
graph.add_subgraph(each)
for index, color in enumerate(membership):
node = graph.get_node(str(index))[0]
node.set_color("red" if color else "green")
# node.set_shape("rect" if node.get_label(
# ).startswith("port_") else "circle")
subgraph[color].add_node(node)
partitions[color].append(node.get_label())
for index, color in enumerate(membership):
if color:
logger.debug(graph.get_node(str(index))[0])
logger.debug("")
for index, color in enumerate(membership):
if not color:
logger.debug(graph.get_node(str(index))[0])
print("============ Partition stats =============")
f_str = "{:<15s} {:<15} {:<15}"
print(f_str.format("", "P1", "P2"))
print("==========================================")
print(
f_str.format(
"chanx_left",
len([p for p in partitions[0] if "chanx_left" in p]),
len([p for p in partitions[1] if "chanx_left" in p]),
)
)
print(
f_str.format(
"chanx_right",
len([p for p in partitions[0] if "chanx_right" in p]),
len([p for p in partitions[1] if "chanx_right" in p]),
)
)
print(
f_str.format(
"pin_I",
len([p for p in partitions[0] if "pin_I" in p]),
len([p for p in partitions[1] if "pin_I" in p]),
)
)
print(
f_str.format(
"CCDFF",
len([p for p in partitions[0] if "CCDFF" in p]),
len([p for p in partitions[1] if "CCDFF" in p]),
)
)
print(
f_str.format(
"top_grid",
len([p for p in partitions[0] if "top_grid" in p]),
len([p for p in partitions[1] if "top_grid" in p]),
)
)
print(
f_str.format(
"bottom_grid",
len([p for p in partitions[0] if "bottom_grid" in p]),
len([p for p in partitions[1] if "bottom_grid" in p]),
)
)
for side in ["top", "bottom"]:
for pin in range(10):
p1, p2 = [], []
tag = f"{side}_ipin_{pin}"
for i in range(1, 5):
p1.append(
str(len([p for p in partitions[0] if fnmatch(p, f"*{tag}*l{i}*")]))
)
p2.append(
str(len([p for p in partitions[1] if fnmatch(p, f"*{tag}*l{i}*")]))
)
print(
f_str.format(
tag,
"-".join(p1) + f" [{sum(map(int,p1))}]",
"-".join(p2) + f" [{sum(map(int,p2))}]",
)
)
with open("_graph.part.0", "w", encoding="UTF-8") as fp:
fp.write("\n".join(partitions[0]))
with open("_graph.part.1", "w", encoding="UTF-8") as fp:
fp.write("\n".join(partitions[1]))
sb11_gsb = glob.glob(f"{proj}/../FPGA44_gsb/sb_1__1_.xml")[0]
sb_render = RoutingRender("sb_1__1_", sb11_gsb)
sw_top = sb_render.report_ipins("top", show=False)
sw_top[sw_top == "x"] = "t"
sw_bottom = sb_render.report_ipins("bottom", show=False)
sw_bottom[sw_bottom == "x"] = "b"
sw = np.vstack([sw_top, sw_bottom])
sb_render.render_ipin(sw)
def pin_name_to_number(e):
return (
int(e.split("_")[-2]) * 2
if "left" in e
else (1 + int(e.split("_")[-2]) * 2)
)
top = sorted([pin_name_to_number(e) for e in partitions[0] if "chanx" in e])
bottom = sorted([pin_name_to_number(e) for e in partitions[1] if "chanx" in e])
sb_render.render_ipin(sw[:, top])
sb_render.render_ipin(sw[:, bottom])
graph.set_rankdir("LR")
graph.write_dot("_graph.dot")
graph.write_png("_graph.png")
graph.write_svg("_graph.svg")
sdn.compose(
fpga.netlist,
"_cbx_1__1_extended.v",
definition_list=["cbx_1__1_"],
skip_constraints=True,
)
sb_render.render_switch_pattern()
sb_render.render_connection_box("left", filename="_cbx_1__1_2.svg")
sb_render.render_connection_box("top", filename="_cbx_1__2_2.svg")
def left_pinmap(x):
return (top + [None, None, None, None] + bottom).index(x)
sb_render.render_connection_box(
"left", pinmap=left_pinmap, filename="_cbx_1__1_3.svg"
)
if __name__ == "__main__":
main()
Total running time of the script: ( 0 minutes 0.000 seconds)