Skip to content

Commit fa05269

Browse files
gijzelaerrclaude
andcommitted
Add symbolic (LID-based) access for S7-1200/1500 optimized DBs
S7-1200/1500 DBs with "Optimized block access" enabled (the TIA Portal V13+ default) do not use fixed byte offsets — the PLC relocates variables internally between downloads. Symbolic access navigates the PLC's symbol tree using LIDs (Local IDs) instead. Tag extensions: - access_sequence: list[int] — LID path through the symbol tree - symbol_crc: int — layout version validation (0 = skip) - is_symbolic property — True when access_sequence is set - Tag.from_access_string("8A0E0001.A", "REAL") classmethod using the S7CommPlusDriver dot-separated hex format (AccessArea.LID.LID...) S7CommPlus client: - read_symbolic(access_area, lids, symbol_crc) — GetMultiVariables with LID-based ItemAddress - write_symbolic(...) — SetMultiVariables equivalent - Module-level _build_symbolic_read/write_payload helpers s7.Client routing: - read_tag/write_tag detect Tag.is_symbolic and route to S7CommPlus symbolic access; classic byte-offset tags continue to use legacy - read_tags falls back to sequential when any tag is symbolic (batching symbolic reads via optimizer is future work) - snap7.Client.read_tag raises NotImplementedError for symbolic tags (legacy S7 has no symbolic support) Experimental — the wire implementation follows S7CommPlusDriver but has not been validated against a real optimized-DB PLC yet. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent c73d3cd commit fa05269

7 files changed

Lines changed: 386 additions & 3 deletions

File tree

CHANGES.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ Major release: new `s7` package with S7CommPlus protocol support.
2626
* **Unified Tag API**: `client.read_tag("DB1.DBD0:REAL")` with PLC4X /
2727
Siemens STEP7 syntax, replacing the homegrown SymbolTable class.
2828
Loaders: `load_csv`, `load_json`, `load_tia_xml` return `dict[str, Tag]`
29+
* **Symbolic (LID-based) access for optimized DBs** (experimental):
30+
`Tag.from_access_string("8A0E0001.A", "REAL")` creates a symbolic Tag;
31+
`client.read_tag(tag)` routes to S7CommPlus LID-based access via the
32+
PLC's symbol tree. Required for S7-1200/1500 DBs with
33+
"Optimized block access" enabled (the TIA Portal V13+ default).
2934
* Optimizer excludes counter/timer areas from byte-range merging
3035
* Fixed `get_cpu_info` field offsets for real S7-300/1500 (thanks @qzertywsx)
3136
* Fixed `S7SZL.__str__` attribute name typo (thanks @qzertywsx)

doc/API/tags.rst

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,42 @@ Supported types
5656

5757
Arrays are supported for any fixed-size type via ``[count]`` suffix.
5858

59+
Optimized block access (S7CommPlus)
60+
------------------------------------
61+
62+
.. warning::
63+
64+
Symbolic (LID-based) access is **experimental** and requires real PLC
65+
testing. The wire-level implementation follows the S7CommPlusDriver
66+
reference but has not yet been validated against hardware.
67+
68+
S7-1200/1500 DBs with "Optimized block access" enabled (the default in
69+
TIA Portal V13+) do not use fixed byte offsets. The PLC internally
70+
relocates variables between downloads, so addresses like ``DB1.DBX0.0``
71+
are unreliable.
72+
73+
For optimized blocks, use :meth:`~snap7.tags.Tag.from_access_string`
74+
with LIDs discovered via :meth:`~s7.client.Client.browse`:
75+
76+
.. code-block:: python
77+
78+
from s7 import Client, Tag
79+
80+
client = Client()
81+
client.connect("192.168.1.10", 0, 1)
82+
83+
# Create a symbolic tag (LIDs come from browse)
84+
tag = Tag.from_access_string(
85+
"8A0E0001.A", # DB1, LID 0xA
86+
datatype="REAL",
87+
name="Motor.Speed",
88+
symbol_crc=0x12345678, # optional layout version check
89+
)
90+
91+
# Read/write via S7CommPlus symbolic access
92+
speed = client.read_tag(tag)
93+
client.write_tag(tag, 1500.0)
94+
5995
API reference
6096
-------------
6197

s7/_s7commplus_client.py

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,55 @@ def write_area(self, area_rid: int, start: int, data: bytes) -> None:
200200
response = self._connection.send_request(FunctionCode.SET_MULTI_VARIABLES, payload)
201201
_parse_write_response(response)
202202

203+
def read_symbolic(self, access_area: int, lids: list[int], symbol_crc: int = 0) -> bytes:
204+
"""Read a variable using S7CommPlus symbolic (LID-based) access.
205+
206+
.. warning:: This method is **experimental** and may change.
207+
208+
For S7-1200/1500 DBs with "Optimized block access" enabled, byte
209+
offsets are unreliable — the PLC internally relocates variables
210+
between downloads. Symbolic access navigates the PLC's symbol tree
211+
using LIDs (Local IDs) discovered via :meth:`browse`.
212+
213+
Args:
214+
access_area: Access area ID. For DBs this is
215+
``0x8A0E0000 + db_number``.
216+
lids: LID path through the symbol tree.
217+
symbol_crc: Symbol CRC for layout validation (0 = skip check).
218+
219+
Returns:
220+
Raw bytes of the variable value.
221+
"""
222+
if self._connection is None:
223+
raise RuntimeError("Not connected")
224+
225+
payload = _build_symbolic_read_payload(access_area, lids, symbol_crc)
226+
response = self._connection.send_request(FunctionCode.GET_MULTI_VARIABLES, payload)
227+
results = _parse_read_response(response)
228+
if not results or results[0] is None:
229+
raise RuntimeError("Symbolic read failed")
230+
return results[0]
231+
232+
def write_symbolic(self, access_area: int, lids: list[int], data: bytes, symbol_crc: int = 0) -> None:
233+
"""Write a variable using S7CommPlus symbolic (LID-based) access.
234+
235+
.. warning:: This method is **experimental** and may change.
236+
237+
See :meth:`read_symbolic` for context on when to use symbolic access.
238+
239+
Args:
240+
access_area: Access area ID.
241+
lids: LID path through the symbol tree.
242+
data: Raw bytes to write.
243+
symbol_crc: Symbol CRC for layout validation (0 = skip check).
244+
"""
245+
if self._connection is None:
246+
raise RuntimeError("Not connected")
247+
248+
payload = _build_symbolic_write_payload(access_area, lids, data, symbol_crc)
249+
response = self._connection.send_request(FunctionCode.SET_MULTI_VARIABLES, payload)
250+
_parse_write_response(response)
251+
203252
def explore(self, explore_id: int = 0) -> bytes:
204253
"""Browse the PLC object tree.
205254
@@ -598,6 +647,65 @@ def _build_area_write_payload(area_rid: int, start: int, data: bytes) -> bytes:
598647
return bytes(payload)
599648

600649

650+
def _build_symbolic_read_payload(access_area: int, lids: list[int], symbol_crc: int = 0) -> bytes:
651+
"""Build a GetMultiVariables payload for symbolic (LID-based) access.
652+
653+
Used for optimized block access on S7-1200/1500 where byte offsets
654+
are unreliable. The PLC navigates its symbol tree using the LIDs.
655+
656+
For DBs, ``access_sub_area`` is ``DB_VALUE_ACTUAL``. For controller
657+
areas (M/I/Q), it's ``CONTROLLER_AREA_VALUE_ACTUAL``.
658+
"""
659+
# Determine sub-area based on access_area
660+
if access_area >= 0x8A0E0000:
661+
access_sub_area = Ids.DB_VALUE_ACTUAL
662+
else:
663+
access_sub_area = Ids.CONTROLLER_AREA_VALUE_ACTUAL
664+
665+
addr_bytes, field_count = encode_item_address(
666+
access_area=access_area,
667+
access_sub_area=access_sub_area,
668+
lids=lids,
669+
symbol_crc=symbol_crc,
670+
)
671+
672+
payload = bytearray()
673+
payload += struct.pack(">I", 0)
674+
payload += encode_uint32_vlq(1) # one item
675+
payload += encode_uint32_vlq(field_count)
676+
payload += addr_bytes
677+
payload += encode_object_qualifier()
678+
payload += struct.pack(">I", 0)
679+
return bytes(payload)
680+
681+
682+
def _build_symbolic_write_payload(access_area: int, lids: list[int], data: bytes, symbol_crc: int = 0) -> bytes:
683+
"""Build a SetMultiVariables payload for symbolic (LID-based) access."""
684+
if access_area >= 0x8A0E0000:
685+
access_sub_area = Ids.DB_VALUE_ACTUAL
686+
else:
687+
access_sub_area = Ids.CONTROLLER_AREA_VALUE_ACTUAL
688+
689+
addr_bytes, field_count = encode_item_address(
690+
access_area=access_area,
691+
access_sub_area=access_sub_area,
692+
lids=lids,
693+
symbol_crc=symbol_crc,
694+
)
695+
696+
payload = bytearray()
697+
payload += struct.pack(">I", 0)
698+
payload += encode_uint32_vlq(1)
699+
payload += encode_uint32_vlq(field_count)
700+
payload += addr_bytes
701+
payload += encode_uint32_vlq(1) # item number 1
702+
payload += encode_pvalue_blob(data)
703+
payload += bytes([0x00])
704+
payload += encode_object_qualifier()
705+
payload += struct.pack(">I", 0)
706+
return bytes(payload)
707+
708+
601709
def _build_explore_payload(explore_id: int = 0) -> bytes:
602710
"""Build an EXPLORE request payload.
603711

s7/client.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
from snap7.client import Client as LegacyClient
1919

20+
from snap7.type import Area
21+
2022
from ._protocol import Protocol
2123
from ._s7commplus_client import S7CommPlusClient
2224

@@ -300,6 +302,91 @@ def browse(self) -> list[dict[str, Any]]:
300302
raise RuntimeError("browse() requires S7CommPlus connection")
301303
return self._plus.browse()
302304

305+
def read_tag(self, tag: Any) -> Any:
306+
"""Read a typed value by Tag or address string.
307+
308+
For symbolic tags (with ``access_sequence`` set), routes to
309+
S7CommPlus LID-based access. For classic tags (byte-offset),
310+
delegates to the legacy client.
311+
312+
Args:
313+
tag: A :class:`~snap7.tags.Tag` instance or address string.
314+
315+
Returns:
316+
The typed value.
317+
"""
318+
from snap7.tags import Tag
319+
from snap7.client import _decode_tag
320+
321+
resolved = Tag.from_string(tag) if isinstance(tag, str) else tag
322+
323+
if resolved.is_symbolic:
324+
if self._plus is None:
325+
raise RuntimeError("Symbolic tag access requires S7CommPlus connection")
326+
# Build access_area from Tag
327+
if resolved.area == Area.DB:
328+
access_area = 0x8A0E0000 + resolved.db_number
329+
elif resolved.area == Area.MK:
330+
access_area = 82
331+
elif resolved.area == Area.PE:
332+
access_area = 80
333+
elif resolved.area == Area.PA:
334+
access_area = 81
335+
else:
336+
access_area = 0x8A0E0000 + resolved.db_number
337+
data = self._plus.read_symbolic(access_area, resolved.access_sequence, resolved.symbol_crc)
338+
return _decode_tag(resolved, bytearray(data))
339+
340+
# Classic byte-offset access — delegate to legacy
341+
if self._legacy is None:
342+
raise RuntimeError("Not connected")
343+
return self._legacy.read_tag(resolved)
344+
345+
def write_tag(self, tag: Any, value: Any) -> int:
346+
"""Write a typed value by Tag or address string."""
347+
from snap7.tags import Tag
348+
from snap7.client import _encode_tag
349+
350+
resolved = Tag.from_string(tag) if isinstance(tag, str) else tag
351+
352+
if resolved.is_symbolic:
353+
if self._plus is None:
354+
raise RuntimeError("Symbolic tag access requires S7CommPlus connection")
355+
if resolved.area == Area.DB:
356+
access_area = 0x8A0E0000 + resolved.db_number
357+
elif resolved.area == Area.MK:
358+
access_area = 82
359+
elif resolved.area == Area.PE:
360+
access_area = 80
361+
elif resolved.area == Area.PA:
362+
access_area = 81
363+
else:
364+
access_area = 0x8A0E0000 + resolved.db_number
365+
buf = bytearray(resolved.size)
366+
_encode_tag(resolved, buf, value)
367+
self._plus.write_symbolic(access_area, resolved.access_sequence, bytes(buf), resolved.symbol_crc)
368+
return 0
369+
370+
# Classic — delegate to legacy
371+
if self._legacy is None:
372+
raise RuntimeError("Not connected")
373+
return self._legacy.write_tag(resolved, value)
374+
375+
def read_tags(self, tags: list[Any]) -> list[Any]:
376+
"""Read multiple tags, routing each to the appropriate protocol."""
377+
from snap7.tags import Tag
378+
379+
resolved = [Tag.from_string(t) if isinstance(t, str) else t for t in tags]
380+
# If any are symbolic, read each individually (batching symbolic
381+
# reads via the optimizer is a future enhancement)
382+
if any(t.is_symbolic for t in resolved):
383+
return [self.read_tag(t) for t in resolved]
384+
385+
# All classic — delegate to legacy for batched optimizer read
386+
if self._legacy is None:
387+
raise RuntimeError("Not connected")
388+
return self._legacy.read_tags(resolved)
389+
303390
def read_diagnostic_buffer(self) -> list[dict[str, Any]]:
304391
"""Read the PLC diagnostic buffer.
305392

snap7/client.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -801,6 +801,10 @@ def read_tag(self, tag: "Union[Tag, str]") -> Any:
801801
client.read_tag(Tag(Area.DB, 1, 0, "REAL")) # from Tag instance
802802
"""
803803
resolved = Tag.from_string(tag) if isinstance(tag, str) else tag
804+
if resolved.is_symbolic:
805+
raise NotImplementedError(
806+
"Symbolic (LID-based) tag access requires S7CommPlus. Use s7.Client instead of snap7.Client."
807+
)
804808
data = self.read_area(Area(resolved.area), resolved.db_number, resolved.byte_offset, resolved.size)
805809
return _decode_tag(resolved, bytearray(data))
806810

@@ -815,6 +819,10 @@ def write_tag(self, tag: "Union[Tag, str]", value: Any) -> int:
815819
0 on success.
816820
"""
817821
resolved = Tag.from_string(tag) if isinstance(tag, str) else tag
822+
if resolved.is_symbolic:
823+
raise NotImplementedError(
824+
"Symbolic (LID-based) tag access requires S7CommPlus. Use s7.Client instead of snap7.Client."
825+
)
818826
size = resolved.size
819827
buf = bytearray(size)
820828
# For BOOL writes, we need the current byte to preserve other bits

0 commit comments

Comments
 (0)