Skip to content

Commit 4f9dfda

Browse files
committed
rnet: export listener & summary via pkg backend
1 parent e8e2a76 commit 4f9dfda

6 files changed

Lines changed: 63 additions & 55 deletions

File tree

intra/backend/rnet_services.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,28 @@ type Services interface {
5353
// Refresh re-registers servces and returns a csv of active ones.
5454
RefreshServers() (active string)
5555
}
56+
57+
// Summary is a summary of a DNS transaction, reported when it is complete.
58+
type ServerSummary struct {
59+
Type string // http1, socks5, etc.
60+
SID string // Server ID.
61+
PID string // Proxy ID (hop) that handled egress, if any.
62+
CID string // Connection id
63+
Tx int64 // Total uploaded (bytes).
64+
Rx int64 // Total downloaded (bytes).
65+
Duration int64 // Conn open duration (millis).
66+
Msg string // Error message, if any.
67+
}
68+
69+
// ServerListener receives Server events.
70+
type ServerListener interface {
71+
// SvcRoute decides how to forward an incoming connection over service (sid).
72+
SvcRoute(sid, pid, network, sipport, dipport string) *Tab
73+
// OnSvcComplete reports summary after a connection closes.
74+
OnSvcComplete(*ServerSummary)
75+
}
76+
77+
type Tab struct {
78+
CID string // CID is the ID of this connection.
79+
Block bool // Block is true if this connection should be blocked.
80+
}

intra/dns53/dot_test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525
"github.com/celzero/firestack/intra/ipn/warp"
2626
ilog "github.com/celzero/firestack/intra/log"
2727
"github.com/celzero/firestack/intra/protect"
28-
"github.com/celzero/firestack/intra/rnet"
2928
"github.com/celzero/firestack/intra/settings"
3029
"github.com/celzero/firestack/intra/x64"
3130
"github.com/celzero/firestack/intra/xdns"
@@ -111,7 +110,7 @@ type fakeBdg struct {
111110

112111
var (
113112
// baseNsOpts = &x.DNSOpts{PIDCSV: dnsx.NetBaseProxy, IPCSV: "", TIDCSV: x.CT + "test0"}
114-
baseTab = &rnet.Tab{CID: "testcid", Block: false}
113+
baseTab = &x.Tab{CID: "testcid", Block: false}
115114
seNsOpts = &x.DNSOpts{PIDCSV: ipn.RpnSE, IPCSV: "", TIDCSV: x.CT + "test0"}
116115
)
117116

@@ -121,8 +120,8 @@ func (*fakeBdg) OnDNSAdded(string) {}
121120
func (*fakeBdg) OnDNSRemoved(string) {}
122121
func (*fakeBdg) OnDNSStopped() {}
123122

124-
func (*fakeBdg) Route(a, b, c, d, e string) *rnet.Tab { return baseTab }
125-
func (*fakeBdg) OnComplete(*rnet.ServerSummary) {}
123+
func (*fakeBdg) Route(a, b, c, d, e string) *x.Tab { return baseTab }
124+
func (*fakeBdg) OnComplete(*x.ServerSummary) {}
126125

127126
const minmtu = 1280
128127
const dualstack = settings.IP46

intra/rnet/http.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ func (h *httpx) route(req *http.Request, ctx *tx.ProxyCtx) (*http.Request, *http
138138
src := req.RemoteAddr
139139
sid := h.id
140140
pid := h.pid()
141-
tab := h.listener.Route(sid, pid, "tcp", src, req.Host)
141+
tab := h.listener.SvcRoute(sid, pid, "tcp", src, req.Host)
142142
log.D("svchttp: route: tab(%v) id(%s) p(%s) src(%s) dst(%s)", tab, h.id, pid, src, req.Host)
143143
if tab.Block {
144144
return req, tx.NewResponse(req, tx.ContentTypeText, http.StatusForbidden, "Forbidden")
@@ -161,12 +161,12 @@ func (h *httpx) summarize(res *http.Response, ctx *tx.ProxyCtx) *http.Response {
161161
log.W("svchttp: summarize: invalid userdata %v", ctx.UserData)
162162
return res
163163
}
164-
ssu.Rx = int(res.ContentLength)
164+
ssu.Rx = res.ContentLength
165165
if req != nil {
166-
ssu.Tx = int(req.ContentLength)
166+
ssu.Tx = req.ContentLength
167167
}
168168
ssu.done(errNop)
169-
go h.listener.OnComplete(ssu)
169+
go h.listener.OnSvcComplete(ssu.ServerSummary)
170170
return res
171171
}
172172

@@ -175,7 +175,7 @@ func (h *httpx) routeConnect(host string, ctx *tx.ProxyCtx) (*tx.ConnectAction,
175175
dst := ctx.Req.Host
176176
sid := h.id
177177
pid := h.pid()
178-
tab := h.listener.Route(sid, pid, "tcp", src, host)
178+
tab := h.listener.SvcRoute(sid, pid, "tcp", src, host)
179179
log.D("svchttp: routeConnect: tab(%v) id(%s) p(%s) src(%s) dst(%s)", tab, h.id, pid, src, dst)
180180
if tab.Block {
181181
return tx.RejectConnect, host
@@ -224,7 +224,7 @@ func (h *httpx) hijackConnect(req *http.Request, client net.Conn, ctx *tx.ProxyC
224224
wg.Wait()
225225
clos(client, target)
226226
}
227-
h.listener.OnComplete(ssu)
227+
h.listener.OnSvcComplete(ssu.ServerSummary)
228228
}()
229229
}
230230

intra/rnet/listener.go

Lines changed: 11 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -11,30 +11,23 @@ import (
1111
"fmt"
1212
"time"
1313

14+
x "github.com/celzero/firestack/intra/backend"
1415
"github.com/celzero/firestack/intra/core"
1516
)
1617

1718
var errNop = errors.New("no error")
1819

19-
// Summary is a summary of a DNS transaction, reported when it is complete.
2020
type ServerSummary struct {
21-
Type string // http1, socks5, etc.
22-
SID string // Server id
23-
PID string // Proxy ID (hop) that handled egress, if any.
24-
CID string // Connection id
25-
Tx int // Amount uploaded (bytes).
26-
Rx int // Amount downloaded (bytes).
27-
Duration int32 // Conn open duration (seconds).
28-
start time.Time // Tracks start time; unexported.
29-
Msg string // Error message, if any.
21+
*x.ServerSummary
22+
start time.Time // Tracks start time; unexported.
3023
}
3124

3225
func (s *ServerSummary) done(errs ...error) {
3326
if s == nil {
3427
return
3528
}
3629

37-
s.Duration = int32(time.Since(s.start).Seconds())
30+
s.Duration = time.Since(s.start).Milliseconds()
3831

3932
err := core.JoinErr(errs...) // errs may be nil
4033
if err != nil {
@@ -59,24 +52,13 @@ func (s *ServerSummary) String() string {
5952

6053
func serverSummary(typ, sid, pid, cid string) *ServerSummary {
6154
return &ServerSummary{
62-
Type: typ,
63-
SID: sid,
64-
PID: pid,
65-
CID: cid,
55+
ServerSummary: &x.ServerSummary{
56+
Type: typ,
57+
SID: sid,
58+
PID: pid,
59+
CID: cid,
60+
Msg: errNop.Error(),
61+
},
6662
start: time.Now(),
67-
Msg: errNop.Error(),
6863
}
6964
}
70-
71-
// ServerListener receives Server events.
72-
type ServerListener interface {
73-
// Route decides how to forward an incoming connection over service (sid).
74-
Route(sid, pid, network, sipport, dipport string) *Tab
75-
// OnComplete reports summary after a connection closes.
76-
OnComplete(*ServerSummary)
77-
}
78-
79-
type Tab struct {
80-
CID string // CID is the ID of this connection.
81-
Block bool // Block is true if this connection should be blocked.
82-
}

intra/rnet/servers.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,12 @@ var (
4949

5050
// todo: github.com/txthinking/brook/blob/master/pac.go
5151

52-
type Server = x.Server
52+
type Server x.Server
5353

5454
type Services x.Services
5555

56+
type ServerListener x.ServerListener
57+
5658
var _ Services = (*services)(nil)
5759
var _ Server = (*httpx)(nil)
5860
var _ Server = (*socks5)(nil)
@@ -79,7 +81,7 @@ func NewServices(pctx context.Context, proxies ipn.Proxies, ctl protect.Controll
7981
return svc
8082
}
8183

82-
func (s *services) AddServer(id, url string) (svc Server, err error) {
84+
func (s *services) AddServer(id, url string) (svc x.Server, err error) {
8385
s.RemoveServer(id)
8486

8587
switch id {
@@ -146,7 +148,7 @@ func (s *services) RemoveServer(id string) bool {
146148
return false
147149
}
148150

149-
func (s *services) GetServer(id string) (Server, error) {
151+
func (s *services) GetServer(id string) (x.Server, error) {
150152
s.RLock()
151153
defer s.RUnlock()
152154

intra/rnet/socks5.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -237,8 +237,8 @@ func (h *socks5) pid() (x string) {
237237
return
238238
}
239239

240-
func (h *socks5) route(network, src, dst string) *Tab {
241-
return h.listener.Route(h.id, h.pid(), network, src, dst)
240+
func (h *socks5) route(network, src, dst string) *x.Tab {
241+
return h.listener.SvcRoute(h.id, h.pid(), network, src, dst)
242242
}
243243

244244
func (h *socks5) candial() error {
@@ -260,7 +260,7 @@ func (h *socks5) setDeadline(c net.Conn, secs int) error {
260260
}
261261

262262
type pipefin struct {
263-
ex int // bytes exchanged
263+
ex int64 // bytes exchanged
264264
err error // error, if any
265265
}
266266

@@ -272,7 +272,7 @@ func (h *socks5) pipe(r, w net.Conn, finch chan<- pipefin) {
272272
*bptr = bf
273273
core.Recycle(bptr)
274274
}()
275-
ex := 0
275+
ex := int64(0)
276276
laddr := r.LocalAddr()
277277
raddr := w.RemoteAddr()
278278
for {
@@ -281,7 +281,7 @@ func (h *socks5) pipe(r, w net.Conn, finch chan<- pipefin) {
281281
break
282282
}
283283
n, err := r.Read(bf[:])
284-
ex += n
284+
ex += int64(n)
285285
if err != nil {
286286
log.E("svcsocks5: tcp: %s; read %s; err: %v", h.ID(), laddr, err)
287287
finch <- pipefin{ex, err}
@@ -304,10 +304,10 @@ func (h *socks5) tcphandle(s *tx.Server, ingress *net.TCPConn, r *tx.Request) (e
304304
var cid string
305305
var egress *net.TCPConn
306306
cid, egress, err = h.Connect(r, ingress)
307-
summary := serverSummary(h.Type(), h.ID(), h.pid(), cid)
307+
ssu := serverSummary(h.Type(), h.ID(), h.pid(), cid)
308308
defer func() {
309-
summary.done(err)
310-
go h.listener.OnComplete(summary)
309+
ssu.done(err)
310+
go h.listener.OnSvcComplete(ssu.ServerSummary)
311311
}()
312312

313313
log.D("svcsocks5: proxy-tcp: %s; socks5-connect %s", cid, r.Address())
@@ -329,8 +329,8 @@ func (h *socks5) tcphandle(s *tx.Server, ingress *net.TCPConn, r *tx.Request) (e
329329

330330
err = core.JoinErr(finrx.err, fintx.err)
331331

332-
summary.Rx = finrx.ex
333-
summary.Tx = fintx.ex
332+
ssu.Rx = finrx.ex
333+
ssu.Tx = fintx.ex
334334

335335
return err
336336
}
@@ -386,7 +386,7 @@ func (h *socks5) udphandle(s *tx.Server, addr *net.UDPAddr, pkt *tx.Datagram) (e
386386
// writing to egress conn
387387
n, werr := egress.RemoteConn.Write(data)
388388
if ssu != nil {
389-
ssu.Tx += n
389+
ssu.Tx += int64(n)
390390
}
391391
log.D("svcsocks5: udp: %s; data sent; (err: %v / summary? %t)? client: %s server: %s remote: %s sz: %d", cid, werr, ssu != nil, uecaddr, ueladdr, ueraddr, n)
392392
if werr != nil {
@@ -409,7 +409,7 @@ func (h *socks5) udphandle(s *tx.Server, addr *net.UDPAddr, pkt *tx.Datagram) (e
409409
ssu := serverSummary(h.Type(), h.ID(), h.pid(), cid)
410410
defer func() {
411411
ssu.done(err)
412-
go h.listener.OnComplete(ssu)
412+
go h.listener.OnSvcComplete(ssu.ServerSummary)
413413
}()
414414

415415
log.D("svcsocks5: udp: %s; dst %s", cid, dst)
@@ -473,7 +473,7 @@ func (h *socks5) udphandle(s *tx.Server, addr *net.UDPAddr, pkt *tx.Datagram) (e
473473
log.E("svcsocks5: udp: %s; read err: %v", cid, err)
474474
return
475475
}
476-
ssu.Rx += n
476+
ssu.Rx += int64(n)
477477
log.D("svcsocks5: udp: %s; got data; client: %s server: %s remote: %s data: %d", cid, uecaddr, ueladdr, ueraddr, n)
478478
a, addr, port, err := tx.ParseAddress(dst)
479479
if err != nil {

0 commit comments

Comments
 (0)