Skip to content

Commit 1830b13

Browse files
fix: better stream reconstruction logic to handle backpressure and ConnectionResetErrors
1 parent 38476d4 commit 1830b13

File tree

2 files changed

+109
-99
lines changed

2 files changed

+109
-99
lines changed

examples/example.py

+7-14
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import asyncio
22

3-
from umodbus.functions import WriteSingleCoil
4-
53
from tcp_modbus_aio.client import TCPModbusClient
6-
from tcp_modbus_aio.exceptions import ModbusCommunicationTimeoutError
4+
from tcp_modbus_aio.exceptions import (
5+
ModbusCommunicationFailureError,
6+
ModbusCommunicationTimeoutError,
7+
)
78
from tcp_modbus_aio.typed_functions import ReadCoils
89

910
DIGITAL_IN_COILS = list(range(8))
@@ -12,7 +13,7 @@
1213

1314
async def example() -> None:
1415

15-
async with TCPModbusClient("192.168.250.207") as conn:
16+
async with TCPModbusClient("192.168.250.207", enforce_pingable=False) as conn:
1617
for _ in range(1000):
1718
for digital_in_coil in DIGITAL_IN_COILS:
1819
example_message = ReadCoils()
@@ -21,21 +22,13 @@ async def example() -> None:
2122

2223
try:
2324
response = await conn.send_modbus_message(
24-
example_message, retries=0
25+
example_message, timeout=0.02
2526
)
2627
assert response is not None, "we expect a response from ReadCoils"
2728
print(response.data) # noqa: T201
2829
except ModbusCommunicationTimeoutError as e:
2930
print(f"{type(e).__name__}({e})")
30-
31-
for digital_out_coil in DIGITAL_OUT_COILS:
32-
example_message = WriteSingleCoil()
33-
example_message.address = digital_out_coil
34-
example_message.value = False
35-
36-
try:
37-
await conn.send_modbus_message(example_message, retries=0)
38-
except ModbusCommunicationTimeoutError as e:
31+
except ModbusCommunicationFailureError as e:
3932
print(f"{type(e).__name__}({e})")
4033

4134

tcp_modbus_aio/client.py

+102-85
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ class TCPModbusClient:
5757
KEEPALIVE_MAX_FAILS: ClassVar = 5
5858

5959
PING_LOOP_PERIOD: ClassVar = 1
60+
CONSECUTIVE_TIMEOUTS_TO_RECONNECT: ClassVar = 5
6061

6162
def __init__(
6263
self,
@@ -66,11 +67,13 @@ def __init__(
6667
*,
6768
logger: logging.Logger | None = None,
6869
enforce_pingable: bool = True,
70+
ping_timeout: float = 0.5,
6971
) -> None:
7072
self.host = host
7173
self.port = port
7274
self.slave_id = slave_id
7375
self.logger = logger
76+
self.ping_timeout = ping_timeout
7477

7578
# If True, will throw an exception if attempting to send a request and the device is not pingable
7679
self.enforce_pingable = enforce_pingable
@@ -82,6 +85,9 @@ def __init__(
8285
self._reader: asyncio.StreamReader | None = None
8386
self._writer: asyncio.StreamWriter | None = None
8487

88+
# Number of current consecutive modbus calls that resulted in a timeout
89+
self._consecutive_timeouts = 0
90+
8591
# Last ping time in seconds from ping loop, or None if the last ping failed
8692
self._last_ping: float | None = None
8793

@@ -132,7 +138,7 @@ def __repr__(self) -> str:
132138

133139
async def _ping_loop_task(self) -> None:
134140
while True:
135-
self._last_ping = await ping_ip(self.host)
141+
self._last_ping = await ping_ip(self.host, timeout=self.ping_timeout)
136142

137143
if self.logger is not None:
138144
self.logger.debug(f"[{self}][_ping_loop_task] ping ping ping")
@@ -143,67 +149,74 @@ async def _ping_loop_task(self) -> None:
143149
async def _get_tcp_connection(
144150
self, timeout: float | None = DEFAULT_MODBUS_TIMEOUT_SEC
145151
) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
146-
if self._reader is None or self._writer is None:
147-
self._lifetime_tcp_connection_num += 1
152+
if self._reader is not None and self._writer is not None:
153+
return self._reader, self._writer
148154

149-
if self.logger is not None:
150-
self.logger.info(
151-
f"[{self}][_get_tcp_connection] creating new TCP connection (#{self._lifetime_tcp_connection_num})"
152-
)
155+
self._lifetime_tcp_connection_num += 1
153156

154-
try:
155-
reader, writer = await asyncio.wait_for(
156-
asyncio.open_connection(host=self.host, port=self.port), timeout
157-
)
157+
if self.logger is not None:
158+
self.logger.info(
159+
f"[{self}][_get_tcp_connection] creating new TCP connection (#{self._lifetime_tcp_connection_num})"
160+
)
158161

159-
sock: socket.socket = writer.get_extra_info("socket")
160-
161-
# Receive and send buffers set to 900 bytes (recommended by MODBUS implementation guide: this is
162-
# because the max request size is 256 bytes + the header size of 7 bytes = 263 bytes, and the
163-
# max response size is 256 bytes + the header size of 7 bytes = 263 bytes, so a 900 byte buffer
164-
# can store 3 frames of buffering, which is apparently the suggestion).
165-
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 900)
166-
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 900)
167-
168-
# Reuse address (perf optimization, recommended by MODBUS implementation guide)
169-
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
170-
171-
# Enable TCP_NODELAY (prevent small packet buffering, recommended by MODBUS implementation guide)
172-
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
173-
174-
# Enable TCP keepalive (otherwise the Adam connection will terminate after 720 (1000?) seconds
175-
# with an open idle connection: this is also recommended by the MODBUS implementation guide)
176-
#
177-
# In most cases this is not necessary because Adam commands are short lived and we
178-
# close the connection after each command. However, if we want to keep a connection
179-
# open for a long time we would need to enable keepalive.
180-
181-
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
182-
if hasattr(socket, "TCP_KEEPIDLE"):
183-
# Only available on Linux so this makes typing work cross platform
184-
sock.setsockopt(
185-
socket.IPPROTO_TCP,
186-
socket.TCP_KEEPIDLE,
187-
self.KEEPALIVE_AFTER_IDLE_SEC,
188-
)
162+
try:
163+
reader, writer = await asyncio.wait_for(
164+
asyncio.open_connection(host=self.host, port=self.port), timeout
165+
)
166+
except asyncio.TimeoutError:
167+
msg = (
168+
f"Timed out connecting to TCP modbus device at {self.host}:{self.port}"
169+
)
170+
if self.logger is not None:
171+
self.logger.warning(f"[{self}][_get_tcp_connection] {msg}")
172+
raise ModbusCommunicationTimeoutError(msg)
173+
except OSError:
174+
msg = f"Cannot connect to TCP modbus device at {self.host}:{self.port}"
175+
if self.logger is not None:
176+
self.logger.warning(f"[{self}][_get_tcp_connection] {msg}")
177+
raise ModbusNotConnectedError(msg)
178+
179+
sock: socket.socket = writer.get_extra_info("socket")
180+
181+
# Receive and send buffers set to 900 bytes (recommended by MODBUS implementation guide: this is
182+
# because the max request size is 256 bytes + the header size of 7 bytes = 263 bytes, and the
183+
# max response size is 256 bytes + the header size of 7 bytes = 263 bytes, so a 900 byte buffer
184+
# can store 3 frames of buffering, which is apparently the suggestion).
185+
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 900)
186+
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 900)
187+
188+
# Reuse address (perf optimization, recommended by MODBUS implementation guide)
189+
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
190+
191+
# Enable TCP_NODELAY (prevent small packet buffering, recommended by MODBUS implementation guide)
192+
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
193+
194+
# Enable TCP keepalive (otherwise the Adam connection will terminate after 720 (1000?) seconds
195+
# with an open idle connection: this is also recommended by the MODBUS implementation guide)
196+
#
197+
# In most cases this is not necessary because Adam commands are short lived and we
198+
# close the connection after each command. However, if we want to keep a connection
199+
# open for a long time we would need to enable keepalive.
200+
201+
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
202+
if hasattr(socket, "TCP_KEEPIDLE"):
203+
# Only available on Linux so this makes typing work cross platform
204+
sock.setsockopt(
205+
socket.IPPROTO_TCP,
206+
socket.TCP_KEEPIDLE,
207+
self.KEEPALIVE_AFTER_IDLE_SEC,
208+
)
189209

190-
sock.setsockopt(
191-
socket.IPPROTO_TCP,
192-
socket.TCP_KEEPINTVL,
193-
self.KEEPALIVE_INTERVAL_SEC,
194-
)
195-
sock.setsockopt(
196-
socket.IPPROTO_TCP, socket.TCP_KEEPCNT, self.KEEPALIVE_MAX_FAILS
197-
)
210+
sock.setsockopt(
211+
socket.IPPROTO_TCP,
212+
socket.TCP_KEEPINTVL,
213+
self.KEEPALIVE_INTERVAL_SEC,
214+
)
215+
sock.setsockopt(
216+
socket.IPPROTO_TCP, socket.TCP_KEEPCNT, self.KEEPALIVE_MAX_FAILS
217+
)
198218

199-
self._reader, self._writer = reader, writer
200-
except (asyncio.TimeoutError, OSError):
201-
msg = f"Cannot connect to TCP modbus device at {self.host}:{self.port}"
202-
if self.logger is not None:
203-
self.logger.warning(f"[{self}][_get_tcp_connection] {msg}")
204-
raise ModbusNotConnectedError(msg)
205-
else:
206-
reader, writer = self._reader, self._writer
219+
self._reader, self._writer = reader, writer
207220

208221
return reader, writer
209222

@@ -226,7 +239,7 @@ async def close(self) -> None:
226239
if self._ping_loop is None:
227240
return
228241

229-
await self.clear_tcp_connection()
242+
self.clear_tcp_connection()
230243

231244
if self._ping_loop is not None:
232245
if self.logger is not None:
@@ -302,12 +315,14 @@ async def _watch_loop() -> None:
302315
_watch_loop(), name=f"TCPModbusClient{log_prefix}"
303316
)
304317

305-
async def clear_tcp_connection(self) -> None:
318+
def clear_tcp_connection(self) -> None:
306319
"""
307320
Closes the current TCP connection and clears the reader and writer objects.
308321
On the next send_modbus_message call, a new connection will be created.
309322
"""
310323

324+
self._consecutive_timeouts = 0
325+
311326
if self._ping_loop is None:
312327
raise RuntimeError("Cannot clear TCP connection on closed TCPModbusClient")
313328

@@ -319,17 +334,6 @@ async def clear_tcp_connection(self) -> None:
319334

320335
self._writer.close()
321336

322-
try:
323-
await self._writer.wait_closed()
324-
except (TimeoutError, ConnectionResetError, OSError) as e:
325-
if self.logger is not None:
326-
self.logger.warning(
327-
f"[{self}][clear_tcp_connection] {type(e).__name__}({e}) error on connection close, "
328-
"continuing anyway"
329-
)
330-
331-
pass
332-
333337
self._reader = None
334338
self._writer = None
335339

@@ -434,6 +438,7 @@ async def send_modbus_message(
434438
reader, writer = await self._get_tcp_connection(
435439
timeout=time_budget_remaining
436440
)
441+
437442
time_budget_remaining -= conn_t()
438443

439444
# STEP THREE: WRITE OUR REQUEST
@@ -447,9 +452,9 @@ async def send_modbus_message(
447452
if self.logger is not None:
448453
self.logger.debug(f"[{self}][send_modbus_message] wrote {msg_str}")
449454

450-
except (asyncio.TimeoutError, OSError, ConnectionResetError):
455+
except OSError: # this includes timeout errors
451456
# Clear connection no matter what if we fail on the write
452-
# TODO: consider revisiting this to only do it on OSError and ConnectionResetError
457+
# TODO: consider revisiting this to not do it on a timeouterror
453458
# (but Gru is scared about partial writes)
454459

455460
if self.logger is not None:
@@ -458,7 +463,7 @@ async def send_modbus_message(
458463
f"request {msg_str}, clearing connection"
459464
)
460465

461-
await self.clear_tcp_connection()
466+
self.clear_tcp_connection()
462467

463468
if retries > 0:
464469
if self.logger is not None:
@@ -516,25 +521,37 @@ async def send_modbus_message(
516521
return None
517522

518523
raise
524+
except asyncio.TimeoutError as e:
525+
self._consecutive_timeouts += 1
526+
if self._consecutive_timeouts >= self.CONSECUTIVE_TIMEOUTS_TO_RECONNECT:
527+
if self.logger is not None:
528+
self.logger.warning(
529+
f"[{self}][send_modbus_message] {self._consecutive_timeouts} consecutive timeouts, "
530+
"clearing connection"
531+
)
532+
self.clear_tcp_connection()
519533

520-
except (asyncio.TimeoutError, OSError, ConnectionResetError) as e:
521-
# We clear the connection if the connection was reset by peer or was an OS error
522-
if isinstance(e, (OSError, ConnectionResetError)):
523-
print("CLEARING TCP ON GENERAL FAIL")
524-
await self.clear_tcp_connection()
525-
526-
raise (
527-
ModbusCommunicationTimeoutError
528-
if isinstance(e, asyncio.TimeoutError)
529-
else ModbusCommunicationFailureError
530-
)(
531-
f"Request {msg_str} failed to {self.host}:{self.port} ({type(e).__name__}({e}))"
534+
raise ModbusCommunicationTimeoutError(
535+
f"Request {msg_str} timed out to {self.host}:{self.port}"
532536
) from e
537+
except OSError as e:
538+
if self.logger is not None:
539+
self.logger.warning(
540+
f"[{self}][send_modbus_message] OSError{type(e).__name__}({e}) while sending request {msg_str}, "
541+
"clearing connection"
542+
)
533543

544+
self.clear_tcp_connection()
545+
546+
raise ModbusCommunicationFailureError(
547+
f"Request {msg_str} failed to {self.host}:{self.port} ({type(e).__name__}({e}))"
548+
) from e
534549
finally:
535550
if self._comms_lock.locked():
536551
self._comms_lock.release()
537552

553+
self._consecutive_timeouts = 0
554+
538555
if self.logger is not None:
539556
self.logger.debug(
540557
f"[{self}][send_modbus_message] executed request/response with timing "

0 commit comments

Comments
 (0)