Skip to content

Commit e2943d3

Browse files
gijzelaerrclaude
andcommitted
Fix CI/CD issues: mypy errors, partner port, sphinx docs
- Change partner default port from 102 to 1102 (non-privileged) - Add missing type annotations across all snap7 modules - Fix client.py read_multi_vars and write_multi_vars type handling - Use cast() for proper type narrowing in union types - Change encode_s7_data parameter type from List to Sequence - Add missing return type annotations to test methods - Fix callback type annotations (use SrvEvent instead of str) - Update example files to use correct API signatures - Update server.rst documentation for pure Python implementation 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent d4092ca commit e2943d3

21 files changed

Lines changed: 264 additions & 240 deletions

doc/API/server.rst

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,22 @@
11
Server
22
======
33

4-
If you just need a quick server with some default values initalised, this package provides a default implementation.
5-
To use it you first need to install some aditional dependencies, using:
4+
The pure Python server implementation provides a simulated S7 server for testing.
65

7-
.. code:: bash
6+
To start a server programmatically:
87

9-
pip install python-snap7[cli]
8+
.. code:: python
109
11-
Now you can start it using one of the following commands:
10+
from snap7.server import Server, mainloop
1211
13-
.. code:: bash
12+
# Quick start with mainloop helper
13+
mainloop(tcp_port=1102)
1414
15-
python -m snap7.server
16-
# or, if your Python `Scripts/` folder is on PATH:
17-
snap7-server
18-
19-
You can optionally provide the port to be used as an argument, like this:
20-
21-
.. code:: bash
22-
23-
python -m snap7.server --port 102
15+
# Or create and configure manually
16+
server = Server()
17+
server.start(port=1102)
2418
2519
----
2620

2721
.. automodule:: snap7.server
2822
:members:
29-
30-
----
31-
32-
.. automodule:: snap7.server.__main__
33-
34-
.. autofunction:: main(port, dll)

example/boolean.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
reading = plc.db_read(31, 120, 1) # read 1 byte from db 31 staring from byte 120
2929
set_bool(reading, 0, 5, True) # set a value of fifth bit
30-
plc.db_write(reading, 31, 120, 1) # write back the bytearray and now the boolean value is changed in the PLC.
30+
plc.db_write(31, 120, reading) # write back the bytearray and now the boolean value is changed in the PLC.
3131

3232
# NOTE you could also use the read_area and write_area functions.
3333
# then you can specify an area to read from:
@@ -41,6 +41,6 @@
4141

4242
data = bytearray()
4343
set_int(data, 0, 127)
44-
plc.write_area(area=Area.MK, dbnumber=0, start=20, data=data)
44+
plc.write_area(area=Area.MK, db_number=0, start=20, data=data)
4545
# read the client source code!
4646
# and official snap7 documentation

example/example.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from db_layouts import tank_rc_if_db_layout
1010

1111
from snap7 import Client, Row, DB
12+
from snap7.type import Area
1213
from util.db import print_row
1314

1415
client = Client()
@@ -61,7 +62,8 @@ def set_row(x: int, row: Row) -> None:
6162
byte array representation of row in the PLC
6263
"""
6364
row_size = 126
64-
client.db_write(1, 4 + x * row_size, row_size, row._bytearray)
65+
assert isinstance(row._bytearray, bytearray)
66+
client.db_write(1, 4 + x * row_size, row._bytearray)
6567

6668

6769
def open_row(row: Row) -> None:
@@ -107,7 +109,7 @@ def open_and_close() -> None:
107109

108110
def set_part_db(start: int, size: int, _bytearray: bytearray) -> None:
109111
data = _bytearray[start : start + size]
110-
client.db_write(1, start, size, data)
112+
client.db_write(1, start, data)
111113

112114

113115
# def write_data_db(dbnumber, all_data, size):
@@ -126,7 +128,7 @@ def open_and_close_db1() -> None:
126128
# set_part_db(4+x*126, 126, all_data)
127129

128130
t = time.time()
129-
client.write_area(1, all_data, 4 + 126 * 450)
131+
client.write_area(Area.DB, 1, 4, all_data)
130132
print(f"opening all valves took: {time.time() - t}")
131133

132134
print("sleep...")
@@ -138,7 +140,7 @@ def open_and_close_db1() -> None:
138140
print(time.time() - t)
139141

140142
t = time.time()
141-
client.write_area(1, all_data, 4 + 126 * 450)
143+
client.write_area(Area.DB, 1, 4, all_data)
142144
print(f"closing all valves took: {time.time() - t}")
143145

144146

snap7/client.py

Lines changed: 66 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import logging
88
import struct
99
import time
10-
from typing import List, Any, Optional, Tuple, Union, Callable
10+
from typing import List, Any, Optional, Tuple, Union, Callable, cast
1111
from datetime import datetime
1212
from ctypes import (
1313
c_int,
@@ -91,6 +91,10 @@ def __init__(self, lib_location: Optional[str] = None, **kwargs: Any):
9191
# Session password
9292
self.session_password: Optional[str] = None
9393

94+
# Execution time tracking
95+
self._exec_time = 0
96+
self.last_error = 0
97+
9498
# Parameter storage
9599
self._params = {
96100
Parameter.LocalPort: 0,
@@ -116,6 +120,12 @@ def __init__(self, lib_location: Optional[str] = None, **kwargs: Any):
116120

117121
logger.info("S7Client initialized (pure Python implementation)")
118122

123+
def _get_connection(self) -> ISOTCPConnection:
124+
"""Get connection, raising if not connected."""
125+
if self.connection is None:
126+
raise S7ConnectionError("Not connected to PLC")
127+
return self.connection
128+
119129
def connect(self, address: str, rack: int, slot: int, tcp_port: int = 102) -> "Client":
120130
"""
121131
Connect to S7 PLC.
@@ -267,8 +277,7 @@ def read_area(self, area: Area, db_number: int, start: int, size: int) -> bytear
267277
Returns:
268278
Data read from area
269279
"""
270-
if not self.get_connected():
271-
raise S7ConnectionError("Not connected to PLC")
280+
conn = self._get_connection()
272281

273282
start_time = time.time()
274283

@@ -286,10 +295,10 @@ def read_area(self, area: Area, db_number: int, start: int, size: int) -> bytear
286295
# Build and send read request
287296
request = self.protocol.build_read_request(area=s7_area, db_number=db_number, start=start, word_len=word_len, count=size)
288297

289-
self.connection.send_data(request)
298+
conn.send_data(request)
290299

291300
# Receive and parse response
292-
response_data = self.connection.receive_data()
301+
response_data = conn.receive_data()
293302
response = self.protocol.parse_response(response_data)
294303

295304
# Extract data from response - pass item count, not byte count
@@ -311,8 +320,7 @@ def write_area(self, area: Area, db_number: int, start: int, data: bytearray) ->
311320
Returns:
312321
0 on success
313322
"""
314-
if not self.get_connected():
315-
raise S7ConnectionError("Not connected to PLC")
323+
conn = self._get_connection()
316324

317325
start_time = time.time()
318326

@@ -332,18 +340,18 @@ def write_area(self, area: Area, db_number: int, start: int, data: bytearray) ->
332340
area=s7_area, db_number=db_number, start=start, word_len=word_len, data=bytes(data)
333341
)
334342

335-
self.connection.send_data(request)
343+
conn.send_data(request)
336344

337345
# Receive and parse response
338-
response_data = self.connection.receive_data()
346+
response_data = conn.receive_data()
339347
response = self.protocol.parse_response(response_data)
340348

341349
# Check for write errors
342350
self.protocol.check_write_response(response)
343351
self._exec_time = int((time.time() - start_time) * 1000)
344352
return 0
345353

346-
def read_multi_vars(self, items: Union[List[dict], "Array[S7DataItem]"]) -> Tuple[int, Any]:
354+
def read_multi_vars(self, items: Union[List[dict[str, Any]], "Array[S7DataItem]"]) -> Tuple[int, Any]:
347355
"""
348356
Read multiple variables in a single request.
349357
@@ -358,34 +366,36 @@ def read_multi_vars(self, items: Union[List[dict], "Array[S7DataItem]"]) -> Tupl
358366

359367
# Handle S7DataItem array (ctypes)
360368
if hasattr(items, "_type_") and hasattr(items[0], "Area"):
361-
# This is a ctypes array of S7DataItem
362-
for item in items:
363-
area = Area(item.Area)
364-
db_number = item.DBNumber
365-
start = item.Start
366-
size = item.Amount
369+
# This is a ctypes array of S7DataItem - use cast for type safety
370+
s7_items = cast("Array[S7DataItem]", items)
371+
for s7_item in s7_items:
372+
area = Area(s7_item.Area)
373+
db_number = s7_item.DBNumber
374+
start = s7_item.Start
375+
size = s7_item.Amount
367376
data = self.read_area(area, db_number, start, size)
368377

369378
# Copy data to pData buffer
370-
if item.pData:
379+
if s7_item.pData:
371380
for i, b in enumerate(data):
372-
item.pData[i] = b
381+
s7_item.pData[i] = b
373382

374383
return (0, items)
375384

376385
# Handle dict list
386+
dict_items = cast(List[dict[str, Any]], items)
377387
results = []
378-
for item in items:
379-
area = item["area"]
380-
db_number = item.get("db_number", 0)
381-
start = item["start"]
382-
size = item["size"]
388+
for dict_item in dict_items:
389+
area = dict_item["area"]
390+
db_number = dict_item.get("db_number", 0)
391+
start = dict_item["start"]
392+
size = dict_item["size"]
383393
data = self.read_area(area, db_number, start, size)
384394
results.append(data)
385395

386396
return (0, results)
387397

388-
def write_multi_vars(self, items: Union[List[dict], List[S7DataItem]]) -> int:
398+
def write_multi_vars(self, items: Union[List[dict[str, Any]], List[S7DataItem]]) -> int:
389399
"""
390400
Write multiple variables in a single request.
391401
@@ -400,27 +410,29 @@ def write_multi_vars(self, items: Union[List[dict], List[S7DataItem]]) -> int:
400410

401411
# Handle S7DataItem list (ctypes)
402412
if hasattr(items[0], "Area"):
403-
for item in items:
404-
area = Area(item.Area)
405-
db_number = item.DBNumber
406-
start = item.Start
407-
size = item.Amount
413+
s7_items = cast(List[S7DataItem], items)
414+
for s7_item in s7_items:
415+
area = Area(s7_item.Area)
416+
db_number = s7_item.DBNumber
417+
start = s7_item.Start
418+
size = s7_item.Amount
408419

409420
# Extract data from pData
410421
data = bytearray(size)
411-
if item.pData:
422+
if s7_item.pData:
412423
for i in range(size):
413-
data[i] = item.pData[i]
424+
data[i] = s7_item.pData[i]
414425

415426
self.write_area(area, db_number, start, data)
416427
return 0
417428

418429
# Handle dict list
419-
for item in items:
420-
area = item["area"]
421-
db_number = item.get("db_number", 0)
422-
start = item["start"]
423-
data = item["data"]
430+
dict_items = cast(List[dict[str, Any]], items)
431+
for dict_item in dict_items:
432+
area = dict_item["area"]
433+
db_number = dict_item.get("db_number", 0)
434+
start = dict_item["start"]
435+
data = dict_item["data"]
424436
self.write_area(area, db_number, start, data)
425437

426438
return 0
@@ -491,13 +503,12 @@ def get_cpu_state(self) -> str:
491503
Returns:
492504
CPU state string
493505
"""
494-
if not self.get_connected():
495-
raise S7ConnectionError("Not connected to PLC")
506+
conn = self._get_connection()
496507

497508
request = self.protocol.build_cpu_state_request()
498-
self.connection.send_data(request)
509+
conn.send_data(request)
499510

500-
response_data = self.connection.receive_data()
511+
response_data = conn.receive_data()
501512
response = self.protocol.parse_response(response_data)
502513

503514
return self.protocol.extract_cpu_state(response)
@@ -674,13 +685,12 @@ def plc_stop(self) -> int:
674685
Returns:
675686
0 on success
676687
"""
677-
if not self.get_connected():
678-
raise S7ConnectionError("Not connected to PLC")
688+
conn = self._get_connection()
679689

680690
request = self.protocol.build_plc_control_request("stop")
681-
self.connection.send_data(request)
691+
conn.send_data(request)
682692

683-
response_data = self.connection.receive_data()
693+
response_data = conn.receive_data()
684694
response = self.protocol.parse_response(response_data)
685695

686696
self.protocol.check_control_response(response)
@@ -692,13 +702,12 @@ def plc_hot_start(self) -> int:
692702
Returns:
693703
0 on success
694704
"""
695-
if not self.get_connected():
696-
raise S7ConnectionError("Not connected to PLC")
705+
conn = self._get_connection()
697706

698707
request = self.protocol.build_plc_control_request("hot_start")
699-
self.connection.send_data(request)
708+
conn.send_data(request)
700709

701-
response_data = self.connection.receive_data()
710+
response_data = conn.receive_data()
702711
response = self.protocol.parse_response(response_data)
703712

704713
self.protocol.check_control_response(response)
@@ -710,13 +719,12 @@ def plc_cold_start(self) -> int:
710719
Returns:
711720
0 on success
712721
"""
713-
if not self.get_connected():
714-
raise S7ConnectionError("Not connected to PLC")
722+
conn = self._get_connection()
715723

716724
request = self.protocol.build_plc_control_request("cold_start")
717-
self.connection.send_data(request)
725+
conn.send_data(request)
718726

719-
response_data = self.connection.receive_data()
727+
response_data = conn.receive_data()
720728
response = self.protocol.parse_response(response_data)
721729

722730
self.protocol.check_control_response(response)
@@ -944,11 +952,10 @@ def iso_exchange_buffer(self, data: bytearray) -> bytearray:
944952
Returns:
945953
Response PDU data
946954
"""
947-
if not self.get_connected():
948-
raise S7ConnectionError("Not connected to PLC")
955+
conn = self._get_connection()
949956

950-
self.connection.send_data(bytes(data))
951-
response = self.connection.receive_data()
957+
conn.send_data(bytes(data))
958+
response = conn.receive_data()
952959
return bytearray(response)
953960

954961
# Convenience methods for specific memory areas
@@ -1409,11 +1416,12 @@ def set_param(self, param: Parameter, value: int) -> int:
14091416

14101417
def _setup_communication(self) -> None:
14111418
"""Setup communication and negotiate PDU length."""
1419+
conn = self._get_connection()
14121420
request = self.protocol.build_setup_communication_request(max_amq_caller=1, max_amq_callee=1, pdu_length=self.pdu_length)
14131421

1414-
self.connection.send_data(request)
1422+
conn.send_data(request)
14151423

1416-
response_data = self.connection.receive_data()
1424+
response_data = conn.receive_data()
14171425
response = self.protocol.parse_response(response_data)
14181426

14191427
if response.get("parameters"):

0 commit comments

Comments
 (0)