Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 116 additions & 2 deletions edg/abstract_parts/IoController.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from itertools import chain
from typing import List, Dict, Tuple, Type, Optional, Any, Union
from typing import List, Dict, Tuple, Type, Optional, Any, Union, Callable, cast
from typing_extensions import override
from deprecated import deprecated

Expand Down Expand Up @@ -77,7 +77,7 @@ def _type_of_io(self, io_port: BasePort) -> Type[Port]:
else:
raise NotImplementedError(f"unknown port type {io_port}")

@deprecated("use BaseIoControllerExportable")
@deprecated("use _export_ios_inner")
def _export_ios_from(self, inner: "BaseIoController", excludes: List[BasePort] = []) -> None:
"""Exports all the IO ports from an inner BaseIoController to this block's IO ports.
Optional exclude list, for example if a more complex connection is needed."""
Expand All @@ -92,6 +92,120 @@ def _export_ios_from(self, inner: "BaseIoController", excludes: List[BasePort] =
self.connect(self_ios_by_type[inner_io_type], inner_io)
self.assign(self.io_current_draw, inner.io_current_draw)

ExportPortTransform = Callable[[BasePort, Optional[str]], Optional[BasePort]]

def _export_ios_inner(
self, inner: "BaseIoController", transforms: Dict[Type[BasePort], ExportPortTransform] = {}
) -> ArrayStringExpr:
"""Exports all IOs from some inner BaseIoController.
This must be called in contents() or generate(), after IOs have been defined.

Optionally specify a transform function, by port type, on IOs.
This function takes the outer IO port and assignment name (if specified) and returns the transformed IO port.
For example, this can be used add resistors inline to USB ports.
If returned (transformed) port is None, the connection is discarded (though connections can be made
within the transform function as a side effect).
Otherwise, the returned port is connected to the inner port.
If this function is used, this must be a GeneratorBlock and the transformed port requested()s and
device pin_assigns must be generator params.

Returns the filtered pin assigns, to pass into the inner block's pin assign.

In most cases, use _wrap_inner which provides all the wrapping functionality, though
this may be useful where other logic needs to happen with parameters.
"""
from ..core.Blocks import BlockElaborationState

assert self._elaboration_state in (
BlockElaborationState.contents,
BlockElaborationState.generate,
), "can only run in contents() or generate()"

if transforms:
assert isinstance(self, GeneratorBlock), "transforms require a GeneratorBlock to work"
assert self._elaboration_state in (BlockElaborationState.generate,), "transforms can only run in generate()"

assigns_raw = self.get(self.pin_assigns)
# mutated in-place during _make_export_*
assigns = cast(List[Optional[str]], assigns_raw.copy())
assign_index_by_name = {assign.split("=")[0]: i for i, assign in enumerate(assigns_raw)}
else:
assigns = None
assign_index_by_name = {}

def connect_port_transformed(self_io: BasePort, inner_io: BasePort, name: str) -> None:
assert transform_fn is not None and assigns is not None
assign_index = assign_index_by_name.get(name)
assign = assigns[assign_index] if assign_index is not None else None
transform_result = transform_fn(self_io, assign)
if transform_result is not None:
self.connect(transform_result, inner_io)
else:
if assign_index is not None:
assigns[assign_index] = None

inner_ios_by_type = {self._type_of_io(io_port): io_port for io_port in inner._io_ports}
for self_io in self._io_ports:
self_io_type = self._type_of_io(self_io)
assert self_io_type in inner_ios_by_type, f"inner missing IO of type {self_io_type}"
inner_io = inner_ios_by_type[self_io_type]

transform_fn = transforms.get(self_io_type, None)

if isinstance(self_io, Vector):
assert isinstance(inner_io, Vector)
if transform_fn is None:
# TODO: this eats up the entire inner_io, requiring a hack to add additional items to inner_io
# using inner_io.request_vector is not yet supported
self.connect(self_io, inner_io)
else:
self_io.defined()
for io_requested in self.get(self_io.requested()):
self_io_elt = self_io.append_elt(self_io.elt_type().empty(), io_requested)
connect_port_transformed(self_io_elt, inner_io.request(io_requested), io_requested)
elif isinstance(inner_io, Port):
if transform_fn is None:
self.connect(self_io, inner_io)
else:
connect_port_transformed(self_io, inner_io, self_io._name_from(self))
else:
raise NotImplementedError(f"unknown port type {self_io}")

if assigns is not None:
filtered_assigns = [assign for assign in assigns if assign is not None]
return ArrayStringExpr._to_expr_type(filtered_assigns)
else:
return self.pin_assigns

def _wrap_inner(
self, inner: "BaseIoController", transforms: Dict[Type[BasePort], ExportPortTransform] = {}
) -> None:
"""Wraps an inner BaseIoController, a wrapper around _export_ios_inner as well as any parameters
that needs to be assigned inward or outward."""
inner_pin_assigns = self._export_ios_inner(inner, transforms)
self.assign(inner.pin_assigns, inner_pin_assigns)
self.assign(self.actual_pin_assigns, inner.actual_pin_assigns)
self.assign(self.io_current_draw, inner.io_current_draw)

def _export_tap_ios_inner(self, inner: "BaseIoController") -> None:
"""Export-taps all IO ports from some inner BaseIoController.
This must be a SubboardBlock to support the export_tap connection.
This must be called in contents() or generate(), after IOs have been defined."""
from ..core.Blocks import BlockElaborationState

assert isinstance(self, WrapperSubboardBlock)
assert self._elaboration_state in (
BlockElaborationState.contents,
BlockElaborationState.generate,
), "can only run in contents() or generate()"

inner_ios_by_type = {self._type_of_io(io_port): io_port for io_port in inner._io_ports}
for self_io in self._io_ports:
self_io_type = self._type_of_io(self_io)
assert self_io_type in inner_ios_by_type, f"inner missing IO of type {self_io_type}"
inner_io = inner_ios_by_type[self_io_type]
self.export_tap(self_io, inner_io)

@staticmethod
def _instantiate_from(
ios: List[BasePort], allocations: List[AllocatedResource]
Expand Down
5 changes: 3 additions & 2 deletions edg/abstract_parts/IoControllerMixins.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import Any
from typing_extensions import override

from .IoController import IoController
Expand All @@ -11,8 +12,8 @@ class WithCrystalGenerator(IoController, GeneratorBlock):

DEFAULT_CRYSTAL_FREQUENCY: RangeLike

def __init__(self) -> None:
super().__init__()
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.xtal_node = self.connect() # connect this internal node to the microcontroller; this may be empty

def _crystal_required(self) -> bool:
Expand Down
53 changes: 23 additions & 30 deletions edg/abstract_parts/IoControllerProgramming.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from typing import List

from typing import List, Any
from typing_extensions import override

from ..electronics_model import *
Expand All @@ -10,52 +9,46 @@
SwdCortexTargetConnectorTdi,
)
from .IoController import IoController
from .IoControllerExportable import BaseIoControllerExportable


@non_library
class IoControllerWithSwdTargetConnector(IoController, BaseIoControllerExportable):
"""An IoController with a SWD programming header and optional SWO and TDI pins that
can be assigned to any microcontroller pin.
class IoControllerWithSwdTargetConnector(IoController, GeneratorBlock):
"""An IoController with a SWD programming header, with connected power and ground,
and optional SWO and TDI pins that are connected to the microcontroller's GPIO Vector.
These are defined with names swd_swo and swd_tdi and can be pinned with the normal pin_assigns.

By default, SWD and SWO pins are not connected, but the reset pin is connected.

This defines the interface for the SWO and TDI pin spec (passed to the pin assignment),
and instantiates a SWD target with connected power and ground. SWD must be connected by
the subclass."""
Subclasses must connect the swd_mode and reset_node to the microcontroller.
Subclasses must also ensure the internal GPIO vector is left open (not direct vector-connected).
"""

def __init__(
self, swd_swo_pin: StringLike = "NC", swd_tdi_pin: StringLike = "NC", swd_connect_reset: BoolLike = True
self,
swd_connect_swo: BoolLike = False,
swd_connect_tdi: BoolLike = False,
swd_connect_reset: BoolLike = True,
**kwargs: Any,
):
super().__init__()
self.swd_swo_pin = self.ArgParameter(swd_swo_pin)
self.swd_tdi_pin = self.ArgParameter(swd_tdi_pin)
super().__init__(**kwargs)
self.swd_connect_swo = self.ArgParameter(swd_connect_swo)
self.swd_connect_tdi = self.ArgParameter(swd_connect_tdi)
self.swd_connect_reset = self.ArgParameter(swd_connect_reset)
self.generator_param(self.swd_swo_pin, self.swd_tdi_pin, self.swd_connect_reset)
self.generator_param(self.swd_connect_swo, self.swd_connect_tdi, self.swd_connect_reset)
self.swd_node = self.connect() # connect this internal node to the microcontroller
self.reset_node = self.connect() # connect this internal node to the microcontroller

@override
def contents(self) -> None:
super().contents()
def generate(self) -> None:
super().generate()
self.swd = self.Block(SwdCortexTargetConnector())
self.connect(self.swd.gnd, self.gnd)
self.connect(self.swd.pwr, self.pwr)
self.connect(self.swd_node, self.swd.swd)

@override
def _inner_pin_assigns(self, assigns: List[str]) -> List[str]:
assigns = super()._inner_pin_assigns(assigns)
if self.get(self.swd_swo_pin) != "NC":
assigns.append(f"swd_swo={self.get(self.swd_swo_pin)}")
if self.get(self.swd_tdi_pin) != "NC":
assigns.append(f"swd_tdi={self.get(self.swd_tdi_pin)}")
return assigns

@override
def generate(self) -> None:
super().generate()
if self.get(self.swd_swo_pin) != "NC":
if self.get(self.swd_connect_swo):
self.connect(self.ic.gpio.request("swd_swo"), self.swd.with_mixin(SwdCortexTargetConnectorSwo()).swo)
if self.get(self.swd_tdi_pin) != "NC":
if self.get(self.swd_connect_tdi):
self.connect(self.ic.gpio.request("swd_tdi"), self.swd.with_mixin(SwdCortexTargetConnectorTdi()).tdi)
if self.get(self.swd_connect_reset): # reset commonly connected but not required by SWD
self.connect(self.reset_node, self.swd.with_mixin(SwdCortexTargetConnectorReset()).reset)
64 changes: 64 additions & 0 deletions edg/abstract_parts/IoControllerWrapped.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from typing import *

from ..electronics_interfaces import *
from .IoController import BaseIoController


class IoControllerWrapped(BaseIoController):
"""Base class for IoController wrapped blocks, particularly footprints that are used
with an outer WrapperSubboardBlock to implement e.g. a dev board or module around a modeling subcircuit.

Provides some utility functions to remap pin assignments from the model to the footprint.
"""

def _remap_pinning_assigns(
self, model_pin_assigns: List[str], remapping: Dict[str, str]
) -> Tuple[Dict[str, HasPassivePort], Dict[str, str]]:
"""Given the actual pin assignments and a remapping dict, returns the pinning dict for the footprint
and the updated actual pin assignments.
Generates concrete ports elements for IO Vectors"""
pinning: Dict[str, HasPassivePort] = {}
actual_pin_assigns: Dict[str, str] = {}
seen_names: Set[str] = set()

model_pin_assigns_dict: Dict[str, str] = {}
for assign in model_pin_assigns:
name, pindef = assign.split("=")
pins = pindef.split(",")
model_pin_assigns_dict[name.strip()] = pins[0].strip() # use the GPIO name

def remap_port_recursive(port: Port, prefix: str = "") -> None:
"""Remaps a port, recursively for bundles"""
if isinstance(port, HasPassivePort):
if prefix not in model_pin_assigns_dict:
raise ValueError(f"pin {prefix} not assigned")
pin = model_pin_assigns_dict[prefix]
if pin not in remapping:
raise ValueError(f"pin {pin} not in remapping")
remapped_pin = remapping[pin]
pinning[remapped_pin] = port
actual_pin_assigns[prefix] = f"{pin}, {remapped_pin}"

for subport_name, subport in port._ports.items():
remap_port_recursive(subport, f"{prefix}.{subport_name}")

for io_port in self._io_ports:
if isinstance(io_port, Vector):
io_port.defined()
for subport_name in self.get(io_port.requested()):
assert subport_name not in seen_names, f"duplicate pin name {subport_name}"
subport = io_port.append_elt(io_port._tpe.empty(), subport_name)
remap_port_recursive(subport, subport_name)
seen_names.add(subport_name)
elif isinstance(io_port, Port):
if self.get(io_port.is_connected()):
raise NotImplementedError("TODO implement me")
else:
raise NotImplementedError(f"unknown port type {io_port}")

return pinning, actual_pin_assigns

def _remap_assigns_to_value(self, assigns: Dict[str, str]) -> List[str]:
"""Given a dict of pin assigns from _remap_pinning_assigns, returns a list of assign strings
for use in self.actual_pin_assigns"""
return [f"{name}={assign}" for name, assign in assigns.items()]
1 change: 1 addition & 0 deletions edg/abstract_parts/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@

from .IoController import BaseIoController, IoController, IoControllerPowerRequired, BaseIoControllerPinmapGenerator
from .IoControllerExportable import BaseIoControllerExportable
from .IoControllerWrapped import IoControllerWrapped
from .IoControllerInterfaceMixins import (
IoControllerSpiPeripheral,
IoControllerI2cTarget,
Expand Down
2 changes: 0 additions & 2 deletions edg/core/Blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,8 +432,6 @@ def _populate_def_proto_block_contents(self, pb: edgir.BlockLikeTypes, ref_map:
or self._elaboration_state == BlockElaborationState.post_generate
)

self._constraints.finalize()

for name, constraint in self._constraints.items():
constraint._populate_expr_proto(edgir.add_pair(pb.constraints, name), ref_map)

Expand Down
3 changes: 1 addition & 2 deletions edg/core/Core.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ def add_element(self, name: str, item: Any) -> None:

# TODO should this be automatically called?
def finalize(self) -> None:
if self.closed:
return
if self.anon_prefix is None:
assert not self.anons, f"can't have unnamed objects: {self.anons}"
else:
Expand All @@ -51,6 +49,7 @@ def finalize(self) -> None:
self.names[elt] = name
self.keys_list.append(name)
self.anons = IdentitySet[ElementType]() # TODO needs clear operation
self.closed = True

def all_values_temp(self) -> Iterable[ElementType]: # TODO needs better API name, reconcile w/ values?
return list(self.container.values()) + list(self.anons)
Expand Down
2 changes: 2 additions & 0 deletions edg/core/TransformUtil.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ def _traverse_portlike(self, context: TransformContext, elt: edgir.PortLike) ->
elif elt.HasField("array") and elt.array.HasField("ports"):
for port_pair in elt.array.ports.ports:
self._traverse_portlike(context.append_port(port_pair.name), port_pair.value)
elif elt.HasField("array") and not elt.array.HasField("ports"):
pass # array with no defined ports, effectively a no-op
else:
raise ValueError(f"_traverse_portlike encountered unknown type {elt} at {context}")

Expand Down
6 changes: 5 additions & 1 deletion edg/electronics_model/SubboardBlock.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,13 @@ def export_tap(self, exterior_port: BasePort, internal_port: BasePort) -> None:
self._export_taps.append((exterior_port, internal_port))

@override
def _populate_def_proto_hierarchy(self, pb: edgir.HierarchyBlock, ref_map: Refable.RefMapType) -> None:
def _def_to_proto(self) -> edgir.HierarchyBlock:
# create this assign after the block definition has run
self.assign(self.fp_external_blocks, [self._blocks.name_of(block) for block in self._external_blocks])
return super()._def_to_proto()

@override
def _populate_def_proto_hierarchy(self, pb: edgir.HierarchyBlock, ref_map: Refable.RefMapType) -> None:
super()._populate_def_proto_hierarchy(pb, ref_map)

for exterior_port, internal_port in self._export_taps:
Expand Down
Loading
Loading