Skip to content

Commit f206435

Browse files
committed
tcp,udp,icmp: impl post flow
1 parent f18478d commit f206435

5 files changed

Lines changed: 20 additions & 0 deletions

File tree

intra/common.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ var _ SocketListener = (*zeroListener)(nil)
6060
func (*zeroListener) Preflow(_, _ int32, _, _ *x.Gostr) *PreMark { return nil }
6161
func (*zeroListener) Flow(_, _ int32, _, _, _, _, _, _ *x.Gostr) *Mark { return nil }
6262
func (*zeroListener) Inflow(_, _ int32, _, _ *x.Gostr) *Mark { return nil }
63+
func (*zeroListener) PostFlow(*Mark) {}
6364
func (*zeroListener) OnSocketClosed(*SocketSummary) {}
6465

6566
var nooplistener = new(zeroListener)

intra/icmp.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ func (h *icmpHandler) Ping(msg []byte, source, target netip.AddrPort) (echoed bo
115115
h.conntracker.Track(cid, uid, pidstr(px), uc)
116116
defer h.conntracker.Untrack(cid)
117117

118+
h.listener.PostFlow(smm.postMark())
119+
118120
tx = len(msg)
119121
// todo: construct ICMP header? github.com/prometheus-community/pro-bing/blob/0bacb2d5e7/ping.go#L717
120122
reply, from, err := core.Echo(uc, msg, net.UDPAddrFromAddrPort(dst), target.Addr().Is4())

intra/listener.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ type SocketListener interface {
6868
// except for the CID field, which is sent back via OnSocketClosed, and "Block" proxy which
6969
// will drop this connection on the floor.
7070
Inflow(protocol, uid int32, src, dst *x.Gostr) *Mark
71+
// PostFlow is called after a flow is marked by Flow or Inflow.
72+
// It denotes the final Mark that was applied to the flow.
73+
// The only major discernable effect is PIDCSV has a single PID.
74+
PostFlow(m *Mark)
7175
// OnSocketClosed reports summary after a socket closes.
7276
OnSocketClosed(*SocketSummary)
7377
}
@@ -135,6 +139,17 @@ func udpSummary(id, uid string, dst netip.Addr) *SocketSummary {
135139
return s
136140
}
137141

142+
func (s *SocketSummary) postMark() *Mark {
143+
if s == nil {
144+
return nil
145+
}
146+
return &Mark{
147+
PIDCSV: s.PID,
148+
CID: s.ID,
149+
UID: s.UID,
150+
}
151+
}
152+
138153
// String implements fmt.Stringer.
139154
func (s *SocketSummary) String() string {
140155
if s != nil {

intra/tcp.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,7 @@ func (h *tcpHandler) handle(px ipn.Proxy, src net.Conn, boundSrc, target netip.A
289289
}
290290

291291
core.Go("tcp.forward."+smm.ID, func() {
292+
h.listener.PostFlow(smm.postMark())
292293
h.forward(src, rwext{dst, tcptimeout}, smm) // src always *gonet.TCPConn
293294
})
294295
return nil // handled; takes ownership of src

intra/udp.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ func (h *udpHandler) proxy(gconn *netstack.GUDPConn, src, dst netip.AddrPort, dm
169169

170170
cid := smm.ID
171171
core.Go("udp.forward."+cid, func() {
172+
h.listener.PostFlow(smm.postMark())
172173
h.forward(gconn, rwext{remote, udptimeout}, smm)
173174
})
174175
return true // ok

0 commit comments

Comments
 (0)