Skip to content

Commit d7cccb2

Browse files
committed
Fix Go timeout scan and receive errors
1 parent aaec3d5 commit d7cccb2

7 files changed

Lines changed: 111 additions & 34 deletions

File tree

.agents/sow/done/SOW-0016-20260610-client-call-timeout-and-abort.md

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
Status: completed
66

7-
Sub-state: implementation and validation completed on 2026-06-11; `SOW-0015` remains paused and can be resumed after this SOW is committed.
7+
Sub-state: regression repair completed on 2026-06-11 after CI failures on commit `aaec3d5`; `SOW-0015` remains paused and can be resumed after this repair is committed.
88

99
## Requirements
1010

@@ -375,4 +375,53 @@ None.
375375

376376
## Regression Log
377377

378-
None yet.
378+
### 2026-06-11
379+
380+
CI failures after commit `aaec3d5` reopened this SOW:
381+
382+
- Static Analysis run `27336192003`, job `Go Static Analysis (src/go)`, failed because `gosec` exited with status 1 after uploading SARIF. Local reproduction with `gosec -fmt=json -exclude=G404 ./...` under `src/go` reports `G115` at `src/go/pkg/netipc/service/raw/client_unix.go:198`: integer overflow conversion `int64 -> uint32` in the SHM receive polling timeout calculation.
383+
- CodeQL run `27336192249`, job `Analyze Go Windows`, failed in the MSYS2 build step while running Go tests. The failing test was `TestSessionReceiveRejectsMalformedChunks/continuation_recv_disconnect` in `src/go/pkg/netipc/transport/windows/pipe_edge_test.go`; CI observed `peer disconnected` while the existing test expected the old wrapped continuation receive error.
384+
385+
Why previous validation missed it:
386+
387+
- The local Go validation did not run `gosec`; the static-analysis workflow did.
388+
- The final `win11` validation did run Windows Go tests, but the CI failure is a Windows named-pipe close timing variation. The changed framing code returned raw continuation receive errors, so one platform/timing path exposed a direct `ErrDisconnected` instead of the pre-existing wrapped `ErrRecv` contract.
389+
390+
Repair plan:
391+
392+
- Keep timeout and abort errors distinct through chunk continuation receives.
393+
- Restore pre-SOW behavior for ordinary continuation receive failures by wrapping them as `ErrRecv("continuation recv: ...")`.
394+
- Replace duplicated timeout polling conversions with bounded helpers that prove the narrowed value is below the poll cap before converting to `uint32`.
395+
396+
Validation plan:
397+
398+
- Re-run `gosec -fmt=json -exclude=G404 ./...` under `src/go`.
399+
- Re-run `cd src/go && go test ./pkg/netipc/...` locally.
400+
- Re-run the failing Windows package/test on `win11` with `MSYSTEM=MSYS`.
401+
- Re-run `git diff --check`.
402+
403+
Repair results:
404+
405+
- Updated `src/go/pkg/netipc/transport/internal/framing/receive.go` so chunk continuation receive errors are wrapped as `ErrRecv("continuation recv: ...")` unless the transport explicitly marks them as timeout/abort control errors.
406+
- Updated POSIX UDS and Windows Named Pipe receive config to propagate only timeout and abort receive errors raw.
407+
- Replaced duplicated Go SHM timeout narrowing logic in `src/go/pkg/netipc/service/raw/client_unix.go` and `src/go/pkg/netipc/service/raw/client_windows.go` with `boundedClientWaitMs()`, which caps the value before the justified `G115`-suppressed conversion.
408+
- Updated Windows named-pipe receive polling to use the same bounded conversion pattern.
409+
410+
Validation results:
411+
412+
- `cd src/go && go test ./pkg/netipc/...`: passed.
413+
- `cd src/go && go vet ./...`: passed.
414+
- `cd src/go && staticcheck ./...`: passed.
415+
- Exact CI-style `gosec -quiet -fmt sarif -out gosec.sarif -exclude=G404 ./...` passed for `src/go`, `tests/fixtures/go`, and `bench/drivers/go`.
416+
- Verbose local `gosec -fmt=json -exclude=G404 ./...` under `src/go` now reports zero issues; it still reports the pre-existing assembly-stub SSA warning for `pkg/netipc/transport/posix/shm_pause_amd64.go`, but the CI-style SARIF command exits successfully with zero findings.
417+
- `win11` MSYS CodeQL-style loop passed: `CGO_ENABLED=0 go test ./...` in `src/go`, `tests/fixtures/go`, and `bench/drivers/go`.
418+
- `win11` focused failure reproduction passed 50 times: `CGO_ENABLED=0 go test ./pkg/netipc/transport/windows -run 'TestSessionReceiveRejectsMalformedChunks/continuation_recv_disconnect' -count=50`.
419+
- `git diff --check`: passed.
420+
421+
Regression artifact maintenance:
422+
423+
- `AGENTS.md`: no update needed; workflow and project guardrails did not change.
424+
- Runtime project skills: no update needed; this was a narrow code regression repair, not a reusable workflow change.
425+
- Specs and end-user/operator docs: no update needed; public timeout/abort behavior remains as documented.
426+
- End-user/operator skills: no update needed; integrator guidance remains correct.
427+
- SOW lifecycle: SOW was reopened from `done/`, repaired, validated, marked completed, and moved back to `done/` with the repair commit.

src/go/pkg/netipc/service/raw/client_unix.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -194,12 +194,7 @@ func (c *Client) transportReceiveWithControl(timeoutMs uint32, abortCh <-chan st
194194
if remaining <= 0 {
195195
return protocol.Header{}, nil, protocol.ErrTimeout
196196
}
197-
if remaining < time.Duration(waitMs)*time.Millisecond {
198-
waitMs = uint32((remaining + time.Millisecond - 1) / time.Millisecond)
199-
if waitMs == 0 {
200-
waitMs = 1
201-
}
202-
}
197+
waitMs = boundedClientWaitMs(remaining, waitMs)
203198
}
204199

205200
mlen, err := c.shm.ShmReceive(scratch, waitMs)

src/go/pkg/netipc/service/raw/client_windows.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -182,12 +182,7 @@ func (c *Client) transportReceiveWithControl(timeoutMs uint32, abortCh <-chan st
182182
if remaining <= 0 {
183183
return protocol.Header{}, nil, protocol.ErrTimeout
184184
}
185-
if remaining < time.Duration(waitMs)*time.Millisecond {
186-
waitMs = uint32((remaining + time.Millisecond - 1) / time.Millisecond)
187-
if waitMs == 0 {
188-
waitMs = 1
189-
}
190-
}
185+
waitMs = boundedClientWaitMs(remaining, waitMs)
191186
}
192187

193188
mlen, err := c.shm.WinShmReceive(scratch, waitMs)

src/go/pkg/netipc/service/raw/types.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
// Pure Go — no cgo. Works with CGO_ENABLED=0.
99
package raw
1010

11+
import "time"
12+
1113
// Poll/receive timeout for server loops (ms). Controls shutdown detection latency.
1214
const serverPollTimeoutMs = 100
1315

@@ -16,6 +18,20 @@ const ClientCallTimeoutDefaultMs uint32 = 30000
1618

1719
const clientAbortPollMs uint32 = 100
1820

21+
func boundedClientWaitMs(remaining time.Duration, pollCapMs uint32) uint32 {
22+
if remaining <= 0 {
23+
return 0
24+
}
25+
if remaining >= time.Duration(pollCapMs)*time.Millisecond {
26+
return pollCapMs
27+
}
28+
waitMs := uint32((remaining + time.Millisecond - 1) / time.Millisecond) // #nosec G115 -- remaining is positive and below pollCapMs here.
29+
if waitMs == 0 {
30+
return 1
31+
}
32+
return waitMs
33+
}
34+
1935
// ---------------------------------------------------------------------------
2036
// Client state (shared across platforms)
2137
// ---------------------------------------------------------------------------

src/go/pkg/netipc/transport/internal/framing/receive.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type Receiver struct {
2323
Recv func([]byte) (int, error)
2424
EnsurePacketScratch func(*[]byte, int) []byte
2525
OnRecvError func(error)
26+
PropagateRecvError func(error) bool
2627

2728
ErrLimitExceeded func(string) error
2829
ErrProtocol func(string) error
@@ -47,6 +48,7 @@ type SessionReceiveConfig struct {
4748
Recv func([]byte) (int, error)
4849
EnsurePacketScratch func(*[]byte, int) []byte
4950
IsRecvDisconnect func(error) bool
51+
PropagateRecvError func(error) bool
5052
FailAllInflight func()
5153

5254
ErrLimitExceeded func(string) error
@@ -74,6 +76,7 @@ func SessionReceive(config SessionReceiveConfig, buf []byte) (protocol.Header, [
7476
PacketBuf: config.PacketBuf,
7577
Recv: config.Recv,
7678
EnsurePacketScratch: config.EnsurePacketScratch,
79+
PropagateRecvError: config.PropagateRecvError,
7780
OnRecvError: func(err error) {
7881
if config.IsRecvDisconnect(err) {
7982
config.FailAllInflight()
@@ -256,7 +259,10 @@ func (r Receiver) receiveOneChunk(
256259
cn, err := r.Recv(pktBuf)
257260
if err != nil {
258261
r.noteRecvError(err)
259-
return err
262+
if r.PropagateRecvError != nil && r.PropagateRecvError(err) {
263+
return err
264+
}
265+
return r.ErrRecv("continuation recv: " + err.Error())
260266
}
261267
if cn < protocol.HeaderSize {
262268
return r.ErrChunk("continuation too short")

src/go/pkg/netipc/transport/posix/uds_receive.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -151,11 +151,14 @@ func (s *Session) ReceiveTimeout(buf []byte, timeoutMs uint32, abortCh <-chan st
151151
Recv: recv,
152152
EnsurePacketScratch: ensureScratchBuf,
153153
IsRecvDisconnect: func(err error) bool { return errors.Is(err, ErrRecv) },
154-
FailAllInflight: s.failAllInflight,
155-
ErrLimitExceeded: func(msg string) error { return wrapErr(ErrLimitExceeded, msg) },
156-
ErrProtocol: func(msg string) error { return wrapErr(ErrProtocol, msg) },
157-
ErrChunk: func(msg string) error { return wrapErr(ErrChunk, msg) },
158-
ErrUnknownMsgID: func(msg string) error { return wrapErr(ErrUnknownMsgID, msg) },
159-
ErrRecv: func(msg string) error { return wrapErr(ErrRecv, msg) },
154+
PropagateRecvError: func(err error) bool {
155+
return errors.Is(err, ErrTimeout) || errors.Is(err, ErrAborted)
156+
},
157+
FailAllInflight: s.failAllInflight,
158+
ErrLimitExceeded: func(msg string) error { return wrapErr(ErrLimitExceeded, msg) },
159+
ErrProtocol: func(msg string) error { return wrapErr(ErrProtocol, msg) },
160+
ErrChunk: func(msg string) error { return wrapErr(ErrChunk, msg) },
161+
ErrUnknownMsgID: func(msg string) error { return wrapErr(ErrUnknownMsgID, msg) },
162+
ErrRecv: func(msg string) error { return wrapErr(ErrRecv, msg) },
160163
}, buf)
161164
}

src/go/pkg/netipc/transport/windows/pipe_receive.go

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
"github.com/netdata/plugin-ipc/go/pkg/netipc/transport/internal/framing"
1212
)
1313

14+
const receiveAbortPollMs uint32 = 100
15+
1416
// Receive reads one logical message. buf is a scratch buffer.
1517
func (s *Session) Receive(buf []byte) (protocol.Header, []byte, error) {
1618
return s.ReceiveTimeout(buf, 0, nil)
@@ -40,12 +42,15 @@ func (s *Session) ReceiveTimeout(buf []byte, timeoutMs uint32, abortCh <-chan st
4042
Recv: recv,
4143
EnsurePacketScratch: ensurePipeScratchBuf,
4244
IsRecvDisconnect: func(err error) bool { return errors.Is(err, ErrDisconnected) },
43-
FailAllInflight: s.failAllInflight,
44-
ErrLimitExceeded: func(msg string) error { return wrapErr(ErrLimitExceeded, msg) },
45-
ErrProtocol: func(msg string) error { return wrapErr(ErrProtocol, msg) },
46-
ErrChunk: func(msg string) error { return wrapErr(ErrChunk, msg) },
47-
ErrUnknownMsgID: func(msg string) error { return wrapErr(ErrUnknownMsgID, msg) },
48-
ErrRecv: func(msg string) error { return wrapErr(ErrRecv, msg) },
45+
PropagateRecvError: func(err error) bool {
46+
return errors.Is(err, ErrTimeout) || errors.Is(err, ErrAborted)
47+
},
48+
FailAllInflight: s.failAllInflight,
49+
ErrLimitExceeded: func(msg string) error { return wrapErr(ErrLimitExceeded, msg) },
50+
ErrProtocol: func(msg string) error { return wrapErr(ErrProtocol, msg) },
51+
ErrChunk: func(msg string) error { return wrapErr(ErrChunk, msg) },
52+
ErrUnknownMsgID: func(msg string) error { return wrapErr(ErrUnknownMsgID, msg) },
53+
ErrRecv: func(msg string) error { return wrapErr(ErrRecv, msg) },
4954
}, buf)
5055
}
5156

@@ -75,7 +80,7 @@ func (w receiveWait) waitMs() (uint32, error) {
7580
}
7681
}
7782

78-
waitMs := uint32(100)
83+
waitMs := receiveAbortPollMs
7984
if w.infinite {
8085
return waitMs, nil
8186
}
@@ -84,13 +89,21 @@ func (w receiveWait) waitMs() (uint32, error) {
8489
if remaining <= 0 {
8590
return 0, ErrTimeout
8691
}
87-
if remaining < time.Duration(waitMs)*time.Millisecond {
88-
waitMs = uint32((remaining + time.Millisecond - 1) / time.Millisecond)
89-
if waitMs == 0 {
90-
waitMs = 1
91-
}
92+
return boundedReceiveWaitMs(remaining, waitMs), nil
93+
}
94+
95+
func boundedReceiveWaitMs(remaining time.Duration, pollCapMs uint32) uint32 {
96+
if remaining <= 0 {
97+
return 0
98+
}
99+
if remaining >= time.Duration(pollCapMs)*time.Millisecond {
100+
return pollCapMs
101+
}
102+
waitMs := uint32((remaining + time.Millisecond - 1) / time.Millisecond) // #nosec G115 -- remaining is positive and below pollCapMs here.
103+
if waitMs == 0 {
104+
return 1
92105
}
93-
return waitMs, nil
106+
return waitMs
94107
}
95108

96109
func (s *Session) rawRecvWithTimeout(buf []byte, wait receiveWait) (int, error) {

0 commit comments

Comments
 (0)