Skip to content

Commit 1c60935

Browse files
committed
Reduce NetIPC platform duplication
1 parent 83c9a56 commit 1c60935

24 files changed

Lines changed: 1244 additions & 1411 deletions

.agents/sow/current/SOW-0015-20260605-codacy-scope-and-maintainability.md

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -669,6 +669,41 @@ SonarCloud duplication composition for Netdata PR #22649:
669669

670670
The duplication is a real POSIX/Windows paired-implementation signal, not unresolved Sonar line findings. It will be handled after this review-finding fix is committed and re-vendored.
671671

672+
### 2026-06-07 - SonarCloud Duplication Reduction
673+
674+
Live Netdata PR #22649 recheck before this increment:
675+
676+
- GitHub review threads: eight total, eight resolved, zero open.
677+
- SonarCloud issue API: zero unresolved issues on the PR.
678+
- SonarCloud hotspot API: zero unresolved hotspots on the PR.
679+
- SonarCloud quality gate status: failed only because new-code duplication is 10.5%, above the configured 3% threshold.
680+
- Latest SonarCloud duplication component tree still reports 1,826 duplicated new lines across 29 files.
681+
682+
Selected low-risk production-source refactors:
683+
684+
- Extract Go POSIX/Windows transport receive framing from `uds_receive.go` and `pipe_receive.go` into a shared internal framing receiver. This targets the top two SonarCloud contributors, 189 duplicated lines each.
685+
- Extract the Go raw-service per-session request/dispatch/response loop from `server_unix.go` and `server_windows.go` into shared raw-service helpers while leaving OS-specific accept, readiness, receive, send, and SHM cleanup code local. This targets the next two contributors, 160 and 159 duplicated lines.
686+
- Extract Go POSIX/Windows transport send chunking and HELLO/HELLO_ACK protocol helpers into shared internal framing helpers while keeping socket/pipe I/O and session construction platform-local. This targets the next Go transport contributors: handshake and send pairs.
687+
- Extract C client refresh, raw-call envelope validation, and retry/reconnect policy into common service helpers with platform callbacks. This targets the POSIX/Windows C client-call and client lifecycle duplication without changing transport send/receive functions.
688+
689+
Risk controls:
690+
691+
- Do not merge POSIX and Windows accept loops; they differ in readiness, listener shutdown, and SHM setup behavior.
692+
- Keep transport-specific receive/send as callbacks so OS-specific error and timeout semantics remain local.
693+
- Validate POSIX Go packages locally and Windows Go packages on Win11 after the refactor.
694+
- Validate C common helper changes on POSIX and Win11 because both platform service client paths now call common retry/raw-call helpers.
695+
696+
Validation completed for this duplication-reduction increment:
697+
698+
- `git diff --check`: passed.
699+
- `cd src/go && go test -count=1 ./pkg/netipc/transport/internal/framing ./pkg/netipc/transport/posix ./pkg/netipc/service/raw`: passed.
700+
- `cd src/go && go test -count=1 ./pkg/netipc/...`: passed.
701+
- `cmake --build build`: passed.
702+
- `/usr/bin/ctest --test-dir build --output-on-failure`: 46/46 tests passed.
703+
- Win11 temp-copy Go validation: `cd src/go && go test -count=1 ./pkg/netipc/transport/windows ./pkg/netipc/service/raw`: passed.
704+
- Win11 temp-copy C validation: MSYS CMake build of `test_win_service`, `test_win_service_extra`, and `test_win_service_payload_limits` passed; CTest for those three tests passed.
705+
- `bash .agents/sow/audit.sh`: passed.
706+
672707
## Validation
673708

674709
Acceptance criteria evidence:

src/go/pkg/netipc/service/raw/server.go

Lines changed: 189 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
package raw
22

3-
import "sync/atomic"
3+
import (
4+
"errors"
5+
"sync/atomic"
6+
7+
"github.com/netdata/plugin-ipc/go/pkg/netipc/protocol"
8+
)
49

510
func (s *Server) dispatchSingle(methodCode uint16, request []byte, responseBuf []byte) (int, error) {
611
if methodCode != s.expectedMethodCode || s.handler == nil {
@@ -26,3 +31,186 @@ func serverNotePayloadCapacity(target *atomic.Uint32, payloadLen uint32) {
2631
}
2732
}
2833
}
34+
35+
type serverReceiveAction uint8
36+
37+
const (
38+
serverReceiveOK serverReceiveAction = iota
39+
serverReceiveContinue
40+
serverReceiveStop
41+
)
42+
43+
type serverSessionOps struct {
44+
maxRequestPayloadBytes uint32
45+
maxResponsePayloadBytes uint32
46+
receive func([]byte) (protocol.Header, []byte, serverReceiveAction)
47+
send func(*protocol.Header, []byte, *[]byte) error
48+
close func()
49+
}
50+
51+
func (s *Server) handleServerSession(ops serverSessionOps) {
52+
recvBuf := make([]byte, protocol.HeaderSize+int(ops.maxRequestPayloadBytes))
53+
respBuf := make([]byte, int(ops.maxResponsePayloadBytes))
54+
itemRespBuf := make([]byte, int(ops.maxResponsePayloadBytes))
55+
msgBuf := make([]byte, int(ops.maxResponsePayloadBytes)+protocol.HeaderSize)
56+
57+
defer ops.close()
58+
59+
for s.running.Load() {
60+
hdr, payload, action := ops.receive(recvBuf)
61+
switch action {
62+
case serverReceiveContinue:
63+
continue
64+
case serverReceiveStop:
65+
return
66+
}
67+
68+
if hdr.Kind != protocol.KindRequest {
69+
return
70+
}
71+
72+
respHdr, responseLen, closeAfterSend := s.handleServerRequest(
73+
hdr, payload, respBuf, itemRespBuf, ops.maxResponsePayloadBytes)
74+
if err := ops.send(&respHdr, respBuf[:responseLen], &msgBuf); err != nil {
75+
return
76+
}
77+
if closeAfterSend {
78+
return
79+
}
80+
}
81+
}
82+
83+
func (s *Server) handleServerRequest(
84+
hdr protocol.Header,
85+
payload []byte,
86+
respBuf []byte,
87+
itemRespBuf []byte,
88+
maxResponsePayloadBytes uint32,
89+
) (protocol.Header, int, bool) {
90+
if payloadLen, err := checkedLookupU32(len(payload)); err == nil {
91+
serverNotePayloadCapacity(&s.learnedRequestPayloadBytes, payloadLen)
92+
}
93+
94+
if !s.methodSupported(hdr.Code) {
95+
return serverUnsupportedResponseHeader(hdr), 0, false
96+
}
97+
98+
responseLen, isBatch, dispatchErr := s.dispatchServerResponse(hdr, payload, respBuf, itemRespBuf)
99+
respHdr := serverResponseHeader(hdr)
100+
101+
if dispatchErr == nil {
102+
if responseLen32, err := checkedLookupU32(responseLen); err == nil {
103+
serverNotePayloadCapacity(&s.learnedResponsePayloadBytes, responseLen32)
104+
}
105+
respHdr.TransportStatus = protocol.StatusOK
106+
if isBatch {
107+
respHdr.Flags = protocol.FlagBatch
108+
respHdr.ItemCount = hdr.ItemCount
109+
} else {
110+
respHdr.ItemCount = 1
111+
}
112+
return respHdr, responseLen, false
113+
}
114+
115+
respHdr.ItemCount = 1
116+
responseLen = 0
117+
switch {
118+
case errors.Is(dispatchErr, protocol.ErrOverflow):
119+
if maxResponsePayloadBytes >= ^uint32(0)/2 {
120+
serverNotePayloadCapacity(&s.learnedResponsePayloadBytes, ^uint32(0))
121+
} else {
122+
serverNotePayloadCapacity(&s.learnedResponsePayloadBytes, maxResponsePayloadBytes*2)
123+
}
124+
respHdr.TransportStatus = protocol.StatusLimitExceeded
125+
return respHdr, responseLen, true
126+
case errors.Is(dispatchErr, errHandlerFailed):
127+
respHdr.TransportStatus = protocol.StatusInternalError
128+
default:
129+
respHdr.TransportStatus = protocol.StatusBadEnvelope
130+
}
131+
132+
return respHdr, responseLen, false
133+
}
134+
135+
func (s *Server) dispatchServerResponse(
136+
hdr protocol.Header,
137+
payload []byte,
138+
respBuf []byte,
139+
itemRespBuf []byte,
140+
) (int, bool, error) {
141+
isBatch := (hdr.Flags&protocol.FlagBatch != 0) && hdr.ItemCount >= 1
142+
if !isBatch {
143+
responseLen, err := s.dispatchSingle(hdr.Code, payload, respBuf)
144+
if err != nil {
145+
return 0, false, err
146+
}
147+
if responseLen < 0 || responseLen > len(respBuf) {
148+
return 0, false, protocol.ErrOverflow
149+
}
150+
return responseLen, false, nil
151+
}
152+
153+
var bb protocol.BatchBuilder
154+
bb.Reset(respBuf, hdr.ItemCount)
155+
for i := uint32(0); i < hdr.ItemCount; i++ {
156+
itemData, err := protocol.BatchItemGet(payload, hdr.ItemCount, i)
157+
if err != nil {
158+
return 0, true, err
159+
}
160+
161+
itemResultLen, err := s.dispatchSingle(hdr.Code, itemData, itemRespBuf)
162+
if err != nil {
163+
return 0, true, err
164+
}
165+
if itemResultLen < 0 || itemResultLen > len(itemRespBuf) {
166+
return 0, true, protocol.ErrOverflow
167+
}
168+
if err := bb.Add(itemRespBuf[:itemResultLen]); err != nil {
169+
return 0, true, err
170+
}
171+
}
172+
173+
responseLen, _ := bb.Finish()
174+
return responseLen, true, nil
175+
}
176+
177+
func serverResponseHeader(hdr protocol.Header) protocol.Header {
178+
return protocol.Header{
179+
Kind: protocol.KindResponse,
180+
Code: hdr.Code,
181+
MessageID: hdr.MessageID,
182+
}
183+
}
184+
185+
func serverUnsupportedResponseHeader(hdr protocol.Header) protocol.Header {
186+
respHdr := serverResponseHeader(hdr)
187+
respHdr.TransportStatus = protocol.StatusUnsupported
188+
respHdr.ItemCount = 1
189+
return respHdr
190+
}
191+
192+
func serverEncodeSharedResponse(
193+
respHdr *protocol.Header,
194+
payload []byte,
195+
msgBuf *[]byte,
196+
) ([]byte, error) {
197+
payloadLen, err := checkedLookupU32(len(payload))
198+
if err != nil {
199+
return nil, err
200+
}
201+
msgLen := protocol.HeaderSize + len(payload)
202+
if len(*msgBuf) < msgLen {
203+
*msgBuf = make([]byte, msgLen)
204+
}
205+
msg := (*msgBuf)[:msgLen]
206+
207+
respHdr.Magic = protocol.MagicMsg
208+
respHdr.Version = protocol.Version
209+
respHdr.HeaderLen = protocol.HeaderLen
210+
respHdr.PayloadLen = payloadLen
211+
respHdr.Encode(msg[:protocol.HeaderSize])
212+
if len(payload) > 0 {
213+
copy(msg[protocol.HeaderSize:], payload)
214+
}
215+
return msg, nil
216+
}

0 commit comments

Comments
 (0)