Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 19 additions & 29 deletions edg/abstract_parts/IoController.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from itertools import chain
from typing import List, Dict, Tuple, Type, Optional, Any, Union, Callable, cast
from typing import List, Dict, Tuple, Type, Optional, Any, Union, Callable
from typing_extensions import override
from deprecated import deprecated

Expand Down Expand Up @@ -125,24 +125,23 @@ def _export_ios_inner(
assert isinstance(self, GeneratorBlock), "transforms require a GeneratorBlock to work"
assert self._elaboration_state in (BlockElaborationState.generate,), "transforms can only run in generate()"

assigns_raw = self.get(self.pin_assigns)
# mutated in-place during _make_export_*
assigns = cast(List[Optional[str]], assigns_raw.copy())
assign_index_by_name = {assign.split("=")[0]: i for i, assign in enumerate(assigns_raw)}
assigns_dict: Optional[Dict[str, str]] = {}
assert assigns_dict is not None
for assign in self.get(self.pin_assigns):
key, value = assign.split("=")
assigns_dict[key] = value
else:
assigns = None
assign_index_by_name = {}
assigns_dict = None

def connect_port_transformed(self_io: BasePort, inner_io: BasePort, name: str) -> None:
assert transform_fn is not None and assigns is not None
assign_index = assign_index_by_name.get(name)
assign = assigns[assign_index] if assign_index is not None else None
def connect_port_transformed(self_io: BasePort, inner_io: Callable[[], BasePort], name: str) -> None:
assert transform_fn is not None and assigns_dict is not None
assign = assigns_dict.get(name, None)
transform_result = transform_fn(self_io, assign)
if transform_result is not None:
self.connect(transform_result, inner_io)
self.connect(transform_result, inner_io())
else:
if assign_index is not None:
assigns[assign_index] = None
if assign is not None:
del assigns_dict[name]

inner_ios_by_type = {self._type_of_io(io_port): io_port for io_port in inner._io_ports}
for self_io in self._io_ports:
Expand All @@ -162,17 +161,17 @@ def connect_port_transformed(self_io: BasePort, inner_io: BasePort, name: str) -
self_io.defined()
for io_requested in self.get(self_io.requested()):
self_io_elt = self_io.append_elt(self_io.elt_type().empty(), io_requested)
connect_port_transformed(self_io_elt, inner_io.request(io_requested), io_requested)
connect_port_transformed(self_io_elt, lambda: inner_io.request(io_requested), io_requested)
elif isinstance(inner_io, Port):
if transform_fn is None:
self.connect(self_io, inner_io)
else:
connect_port_transformed(self_io, inner_io, self_io._name_from(self))
connect_port_transformed(self_io, lambda: inner_io, self_io._name_from(self))
else:
raise NotImplementedError(f"unknown port type {self_io}")

if assigns is not None:
filtered_assigns = [assign for assign in assigns if assign is not None]
if assigns_dict is not None:
filtered_assigns = [f"{key}={value}" for key, value in assigns_dict.items()]
return ArrayStringExpr._to_expr_type(filtered_assigns)
else:
return self.pin_assigns
Expand Down Expand Up @@ -267,20 +266,11 @@ def _instantiate_from(
class BaseIoControllerPinmapGenerator(BaseIoController, GeneratorBlock):
"""BaseIoController with generator code to set pin mappings"""

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.generator_param(self.pin_assigns)

@override
def contents(self) -> None:
super().contents()
for io_port in self._io_ports: # defined in contents() so subclass __init__ can define additional _io_ports
if isinstance(io_port, Vector):
self.generator_param(io_port.requested())
elif isinstance(io_port, Port):
self.generator_param(io_port.is_connected())
else:
raise NotImplementedError(f"unknown port type {io_port}")
self.generator_param(self.pin_assigns)
self._generator_param_all_ios() # defined in contents() so subclass __init__ can define additional _io_ports

def _system_pinmap(self) -> Dict[str, Union[Passive, HasPassivePort]]:
"""Implement me. Defines the fixed pin mappings from pin number to port."""
Expand Down
2 changes: 2 additions & 0 deletions edg/abstract_parts/IoControllerExportable.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from typing import List, Optional, TypeVar, cast, Any

from deprecated import deprecated
from typing_extensions import override

from ..electronics_model import *
from .IoController import BaseIoController


@non_library
@deprecated("use explicit BaseIoController._export_ios_inner")
class BaseIoControllerExportable(BaseIoController, GeneratorBlock):
"""BaseIoController wrapper (this is a BaseIoController, which wraps another BaseIoController)
which automatically exports my IOs from the internal IOs in an extensible way (additional connects
Expand Down
34 changes: 33 additions & 1 deletion edg/abstract_parts/PinMappable.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,36 @@ def resource_pin(resource: BasePinMapResource) -> List[str]:

return PinMapUtil(remapped_resources, self.transforms)

def filter_pins(self, allowed_pins: List[str]) -> "PinMapUtil":
"""Returns a new PinMapUtil with only the specified pins kept.
If the allowed_pins list is empty, returns the input (all pins kept).
allowed_pins may be specified as a pin name or pin number."""

if not allowed_pins:
return self

def filter_resource(resource: BasePinMapResource) -> Optional[BasePinMapResource]:
if isinstance(resource, PinResource):
if resource.pin in allowed_pins or resource.pinname in allowed_pins:
return resource
else:
return None
elif isinstance(resource, PeripheralFixedPin):
# only keep if the entire peripheral can be pinned
for key, pin in resource.inner_allowed_pins.items():
if pin not in allowed_pins and resource.inner_pinnames[key] not in allowed_pins:
return None
return resource
elif isinstance(resource, BaseDelegatingPinMapResource):
return resource
else:
raise NotImplementedError(f"unknown resource {resource}")

filtered_resources_raw = [filter_resource(resource) for resource in self.resources]
filtered_resources = [resource for resource in filtered_resources_raw if resource is not None]

return PinMapUtil(filtered_resources, self.transforms)

@staticmethod
def _resource_port_types(resource: BasePinMapResource) -> List[Type[Port]]:
if isinstance(resource, PinResource):
Expand All @@ -340,7 +370,9 @@ def _resource_port_types(resource: BasePinMapResource) -> List[Type[Port]]:
@staticmethod
def _resource_names(resource: BasePinMapResource) -> List[str]:
if isinstance(resource, PinResource):
return [resource.pin] + [resource_name for resource_name, model in resource.name_models.items()]
return [resource.pin, resource.pinname] + [
resource_name for resource_name, model in resource.name_models.items()
]
elif isinstance(resource, (PeripheralFixedPin, PeripheralAnyResource, PeripheralFixedResource)):
return [resource.name]
else:
Expand Down
50 changes: 50 additions & 0 deletions edg/abstract_parts/test_pinmappable.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,56 @@ def test_assign_remapped(self) -> None: # fully user-specified
self.assertIn(AllocatedResource(ain_model, "AIO4", "P4", "4"), allocated)
self.assertIn(AllocatedResource(ain_model, "AIO5", "P5", "5"), allocated)

def test_assign_filtered(self) -> None: # fully user-specified
dio_model = DigitalBidir()
allocated = (
PinMapUtil(
[
PinResource("P1", {"PIO1": dio_model}),
PinResource("P2", {"PIO2": dio_model}),
PinResource("P3", {"PIO3": dio_model}),
]
)
.remap_pins(
{
"P1": "1",
"P2": "2",
"P3": "3",
}
)
.filter_pins(["P1", "P3"]) # test both pin name and pin number
.allocate([(DigitalBidir, ["DIO1", "DIO3"])])
)
self.assertIn(AllocatedResource(dio_model, "DIO1", "P1", "1"), allocated)
self.assertIn(AllocatedResource(dio_model, "DIO3", "P3", "3"), allocated)

def test_assign_filtered_empty(self) -> None:
dio_model = DigitalBidir()
allocated = (
PinMapUtil(
[
PinResource("P1", {"PIO1": dio_model}),
PinResource("P2", {"PIO2": dio_model}),
]
)
.filter_pins([])
.allocate([(DigitalBidir, ["DIO1", "DIO2"])])
)
self.assertIn(AllocatedResource(dio_model, "DIO1", "P1", "P1"), allocated)
self.assertIn(AllocatedResource(dio_model, "DIO2", "P2", "P2"), allocated)

def test_assign_filtered_overflow(self) -> None:
dio_model = DigitalBidir()
with self.assertRaises(AutomaticAllocationError):
PinMapUtil(
[
PinResource("1", {"PIO1": dio_model}),
PinResource("3", {"PIO3": dio_model}),
]
).filter_pins(
["1"]
).allocate([(DigitalBidir, ["DIO1", "DIO2"])])

def test_assign_mixed(self) -> None: # mix of user-specified and automatic assignments, assuming greedy algo
dio_model = DigitalBidir()
ain_model = AnalogSink()
Expand Down
Loading
Loading