Skip to content

Commit c2b7a54

Browse files
authored
Optimize DialStream with route caching, latency sorting, and entry caching (skycoin#361)
* Optimize DialStream with route caching, latency sorting, and entry caching
  - Add route cache: remember which server successfully reached a destination, try it first on subsequent dials, evict on failure
  - Sort sessions by measured ping latency so lowest-latency server is tried first instead of random map iteration order
  - Cache discovery entry lookups with 30s TTL to avoid re-querying HTTP discovery on every request
  - Background ping loop measures all session RTTs every 30s
* Fix dmsgweb proxy: propagate request context and fix error handling
  - Use http.NewRequestWithContext to propagate browser request context to dmsg dial, so cancellations stop the stream dial immediately instead of waiting for the full 20s HandshakeTimeout
  - Remove impossible c.String(500) after c.Status() was already written, which caused "Headers were already written" warnings in gin
* Refactor HTTPTransport to use http.Transport with dmsg DialContext
  Replace manual stream-per-request dial/write/read pattern with Go's http.Transport using a custom DialContext. Keep-alives are disabled because dmsg streams use noise-encrypted per-stream handshakes that make connection reuse unreliable (server ReadTimeout can expire between requests, and POST requests cannot be retried on stale connections).
  Benefits:
  - Proper request context propagation through the transport
  - Standard error handling and timeout support
  - Removes manual wrappedBody response draining hack
  - Normalizes dmsg:// URLs to http:// for Go's transport
  - Cleans up idle connections on context cancellation
* Fix TCP proxy race, gin server leak, ReverseProxy Director, and HTTP timeouts
  - Fix TCP proxy io.Copy race: close both connections after first copy returns to unblock the second, preventing goroutine leak
  - Replace dlog.Fatal with error return on port overflow (was killing process)
  - Replace gin r.Run() with http.Server and graceful Shutdown on context cancel, preventing goroutine leak on shutdown
  - Pass context to proxyTCPConn/proxyHTTPConn for proper cancellation
  - Fix silent ReverseProxy Director failure: parse URL before creating proxy, return 500 on parse error instead of forwarding to wrong URL
  - Add 30s timeout to HTTP clients in dmsghttp/util.go to prevent hanging
* Harden dmsgweb: connection limits, body limits, close error logging
  - Add connection semaphore (max 256) to server-side TCP proxy to prevent unbounded goroutine growth from many simultaneous connections
  - Fix server-side TCP proxy io.Copy race: close both connections after first copy returns, wait for goroutine with done channel
  - Add 10MB request body limit via http.MaxBytesReader in HTTP proxy
  - Log close errors at debug level instead of silently ignoring them
* Fix CI lint errors: gosec, misspell, and unhandled errors
  - Fix G104 (gosec): handle Close() errors with debug logging instead of ignoring them in TCP proxy
  - Fix G112 (gosec): add ReadHeaderTimeout to HTTP server to prevent Slowloris attacks
  - Fix G118 (gosec): use parent context for DialStream instead of context.Background(); add nolint for intentional Background in graceful shutdown
  - Fix misspell: cancelled -> canceled in comment
* Revert HTTPTransport to direct stream approach for CI compatibility
  The http.Transport wrapper with DisableKeepAlives caused timeouts on Windows CI and hangs on Linux CI due to Go's transport adding overhead (Connection: close headers, persistConn goroutines) that interacts poorly with noise-encrypted streams under concurrent load. Revert to the proven direct approach: dial stream, write request, read response, wrap body to close stream. Keep the dmsg:// URL normalization.
* Remove unnecessary nolint:govet directive
1 parent 8206a02 commit c2b7a54

7 files changed

Lines changed: 333 additions & 79 deletions

File tree

cmd/dmsgweb/commands/dmsgweb.go

Lines changed: 60 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,7 @@ import (
1111
"regexp"
1212
"strconv"
1313
"strings"
14-
"sync"
14+
"time"
1515

1616
"github.com/chen3feng/safecast"
1717
"github.com/confiant-inc/go-socks5"
@@ -288,28 +288,28 @@ dmsgweb conf file detected: ` + dwcfg
288288
if len(resolveDmsgAddr) == 0 && len(webPort) == 1 {
289289
if len(rawTCP) > 0 && rawTCP[0] {
290290
dlog.Debug("proxyTCPConn(-1)")
291-
proxyTCPConn(-1)
291+
proxyTCPConn(ctx, -1)
292292
} else {
293293
dlog.Debug("proxyHTTPConn(-1)")
294-
proxyHTTPConn(-1)
294+
proxyHTTPConn(ctx, -1)
295295
}
296296
} else {
297297
for i := range resolveDmsgAddr {
298298
wg.Add(1)
299299
if rawTCP[i] {
300300
dlog.Debug("proxyTCPConn(" + fmt.Sprintf("%v", i) + ")")
301-
go proxyTCPConn(i)
301+
go proxyTCPConn(ctx, i)
302302
} else {
303303
dlog.Debug("proxyHTTPConn(" + fmt.Sprintf("%v", i) + ")")
304-
go proxyHTTPConn(i)
304+
go proxyHTTPConn(ctx, i)
305305
}
306306
}
307307
}
308308
wg.Wait()
309309
},
310310
}
311311

312-
func proxyTCPConn(n int) {
312+
func proxyTCPConn(ctx context.Context, n int) { //nolint:unparam
313313
var thiswebport uint
314314
if n == -1 {
315315
thiswebport = webPort[0]
@@ -337,49 +337,56 @@ func proxyTCPConn(n int) {
337337
defer ioutil.CloseQuietly(conn, dlog)
338338
dp, ok := safecast.To[uint16](dmsgPorts[n])
339339
if !ok {
340-
dlog.Fatal("uint16 overflow when converting dmsg port")
340+
dlog.WithError(fmt.Errorf("uint16 overflow for port %v", dmsgPorts[n])).
341+
Warn("Failed to convert dmsg port")
342+
return
341343
}
342344
dlog.Debug(fmt.Sprintf("Dialing %v:%v", dialPK[n].String(), dp))
343-
dmsgConn, err := dmsgC.DialStream(context.Background(), dmsg.Addr{PK: dialPK[n], Port: dp}) //nolint
345+
dmsgConn, err := dmsgC.DialStream(ctx, dmsg.Addr{PK: dialPK[n], Port: dp})
344346
if err != nil {
345347
dlog.WithError(err).Warn(fmt.Sprintf("Failed to dial dmsg address %v port %v", dialPK[n].String(), dmsgPorts[n]))
346348
return
347349
}
348-
349350
defer ioutil.CloseQuietly(dmsgConn, dlog)
350351

351-
var wg sync.WaitGroup
352-
wg.Add(2)
353-
352+
done := make(chan struct{})
354353
go func() {
355-
defer wg.Done()
354+
defer close(done)
356355
_, err := io.Copy(dmsgConn, conn)
357356
if err != nil {
358-
dlog.WithError(err).Warn("Error on io.Copy(dmsgConn, conn)")
357+
dlog.WithError(err).Debug("io.Copy(dmsgConn, conn) ended")
359358
}
360359
}()
361360

362-
go func() {
363-
defer wg.Done()
364-
_, err := io.Copy(conn, dmsgConn)
365-
if err != nil {
366-
dlog.WithError(err).Warn("Error on io.Copy(conn, dmsgConn)")
367-
}
368-
}()
361+
_, err = io.Copy(conn, dmsgConn)
362+
if err != nil {
363+
dlog.WithError(err).Debug("io.Copy(conn, dmsgConn) ended")
364+
}
369365

370-
wg.Wait()
366+
// Close both to unblock the goroutine's io.Copy.
367+
if err := conn.Close(); err != nil {
368+
dlog.WithError(err).Debug("Error closing client conn")
369+
}
370+
if err := dmsgConn.Close(); err != nil {
371+
dlog.WithError(err).Debug("Error closing dmsg conn")
372+
}
373+
<-done
371374
}(conn, n, dmsgC)
372375
}
373376
}
374377

375-
func proxyHTTPConn(n int) {
378+
func proxyHTTPConn(ctx context.Context, n int) { //nolint:unparam
376379
r := gin.New()
377380

378381
r.Use(gin.Recovery())
379382

380383
r.Use(loggingMiddleware())
381384

382385
r.Any("/*path", func(c *gin.Context) {
386+
// Limit request body to 10MB to prevent resource exhaustion.
387+
const maxBodySize = 10 << 20
388+
c.Request.Body = http.MaxBytesReader(c.Writer, c.Request.Body, maxBodySize)
389+
383390
var urlStr string
384391
if n > -1 {
385392
urlStr = fmt.Sprintf("dmsg://%s%s", resolveDmsgAddr[n], c.Param("path"))
@@ -401,7 +408,7 @@ func proxyHTTPConn(n int) {
401408
}
402409

403410
dlog.Debug(fmt.Sprintf("Proxying request: %s %s", c.Request.Method, urlStr))
404-
req, err := http.NewRequest(c.Request.Method, urlStr, c.Request.Body)
411+
req, err := http.NewRequestWithContext(c.Request.Context(), c.Request.Method, urlStr, c.Request.Body)
405412
if err != nil {
406413
c.String(http.StatusInternalServerError, "Failed to create HTTP request")
407414
dlog.WithError(err).Warn("Failed to create HTTP request")
@@ -430,23 +437,44 @@ func proxyHTTPConn(n int) {
430437

431438
c.Status(resp.StatusCode)
432439
if _, err := io.Copy(c.Writer, resp.Body); err != nil {
433-
c.String(http.StatusInternalServerError, "Failed to copy response body")
440+
// Status header is already written; cannot override with 500.
441+
// Just log the error.
434442
dlog.WithError(err).Warn("Failed to copy response body")
435443
}
436444
})
445+
446+
var thiswebport uint
447+
if n == -1 {
448+
thiswebport = webPort[0]
449+
} else {
450+
thiswebport = webPort[n]
451+
}
452+
453+
srv := &http.Server{
454+
Addr: fmt.Sprintf(":%v", thiswebport),
455+
Handler: r,
456+
ReadHeaderTimeout: 5 * time.Second,
457+
}
458+
437459
wg.Add(1)
438460
go func() {
439461
defer wg.Done()
440-
var thiswebport uint
441-
if n == -1 {
442-
thiswebport = webPort[0]
443-
} else {
444-
thiswebport = webPort[n]
445-
}
446462
dlog.Debug(fmt.Sprintf("Serving http on: http://127.0.0.1:%v", thiswebport))
447-
r.Run(":" + fmt.Sprintf("%v", thiswebport)) //nolint
463+
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
464+
dlog.WithError(err).Error("HTTP server error")
465+
}
448466
dlog.Debug(fmt.Sprintf("Stopped serving http on: http://127.0.0.1:%v", thiswebport))
449467
}()
468+
469+
// Graceful shutdown on context cancellation.
470+
go func() { //nolint:gosec // G118: context.Background is intentional — shutdown must outlive parent ctx
471+
<-ctx.Done()
472+
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
473+
defer cancel()
474+
if err := srv.Shutdown(shutdownCtx); err != nil {
475+
dlog.WithError(err).Warn("HTTP server shutdown error")
476+
}
477+
}()
450478
}
451479

452480
const envfileLinux = //nolint unused

cmd/dmsgweb/commands/dmsgwebsrv.go

Lines changed: 44 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -177,12 +177,13 @@ func proxyHTTPConnections(ctx context.Context, localPort uint, listener net.List
177177
}
178178
authRoute.Any("/*path", func(c *gin.Context) {
179179
targetURL := fmt.Sprintf("http://127.0.0.1:%d%s?%s", localPort, c.Request.URL.Path, c.Request.URL.RawQuery)
180+
parsed, err := url.Parse(targetURL)
181+
if err != nil {
182+
dlog.Errorf("failed to parse target URL %q: %v", targetURL, err)
183+
c.String(http.StatusInternalServerError, "Bad target URL")
184+
return
185+
}
180186
proxy := httputil.ReverseProxy{Director: func(req *http.Request) {
181-
parsed, err := url.Parse(targetURL)
182-
if err != nil {
183-
dlog.Errorf("failed to parse target URL %q: %v", targetURL, err)
184-
return
185-
}
186187
req.URL = parsed
187188
req.Host = req.URL.Host
188189
}}
@@ -211,12 +212,16 @@ func proxyHTTPConnections(ctx context.Context, localPort uint, listener net.List
211212
}
212213
}
213214

215+
// maxTCPConns is the maximum number of concurrent TCP proxy connections.
216+
const maxTCPConns = 256
217+
214218
func proxyTCPConnections(ctx context.Context, localPort uint, listener net.Listener) {
215219
// To track active connections for cleanup
216220
var connWg sync.WaitGroup
217221
connChan := make(chan net.Conn)
218222
activeConns := make(map[net.Conn]struct{})
219223
connMutex := &sync.Mutex{} // Protect access to activeConns
224+
sem := make(chan struct{}, maxTCPConns)
220225

221226
// Goroutine to accept new connections
222227
go func() {
@@ -241,11 +246,15 @@ func proxyTCPConnections(ctx context.Context, localPort uint, listener net.Liste
241246
select {
242247
case <-ctx.Done():
243248
dlog.Info("Shutting down TCP proxy connections...")
244-
listener.Close() //nolint
249+
if err := listener.Close(); err != nil {
250+
dlog.WithError(err).Debug("Error closing TCP listener")
251+
}
245252

246253
connMutex.Lock()
247254
for conn := range activeConns {
248-
conn.Close() //nolint
255+
if err := conn.Close(); err != nil {
256+
dlog.WithError(err).Debug("Error closing active connection")
257+
}
249258
}
250259
connMutex.Unlock()
251260

@@ -257,14 +266,30 @@ func proxyTCPConnections(ctx context.Context, localPort uint, listener net.Liste
257266
return
258267
}
259268

269+
// Limit concurrent connections.
270+
select {
271+
case sem <- struct{}{}:
272+
default:
273+
dlog.Warn("Max TCP connections reached, rejecting connection")
274+
if err := conn.Close(); err != nil {
275+
dlog.WithError(err).Debug("Error closing rejected connection")
276+
}
277+
continue
278+
}
279+
260280
connMutex.Lock()
261281
activeConns[conn] = struct{}{}
262282
connMutex.Unlock()
263283

264284
connWg.Add(1)
265285
go func(dmsgConn net.Conn) {
286+
defer func() { <-sem }()
266287
defer connWg.Done()
267-
defer dmsgConn.Close() //nolint
288+
defer func() {
289+
if err := dmsgConn.Close(); err != nil {
290+
dlog.WithError(err).Debug("Error closing dmsg connection")
291+
}
292+
}()
268293

269294
localConn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", localPort))
270295
if err != nil {
@@ -276,21 +301,27 @@ func proxyTCPConnections(ctx context.Context, localPort uint, listener net.Liste
276301

277302
return
278303
}
279-
defer localConn.Close() //nolint
280304

305+
done := make(chan struct{})
281306
go func() {
307+
defer close(done)
282308
_, err1 := io.Copy(dmsgConn, localConn)
283309
if err1 != nil {
284-
dlog.WithError(err1).Warn("Error on io.Copy(dmsgConn, localConn)")
310+
dlog.WithError(err1).Debug("io.Copy(dmsgConn, localConn) ended")
285311
}
286312
}()
287313
_, err2 := io.Copy(localConn, dmsgConn)
288314
if err2 != nil {
289-
dlog.WithError(err2).Warn("Error on io.Copy(localConn, dmsgConn)")
315+
dlog.WithError(err2).Debug("io.Copy(localConn, dmsgConn) ended")
290316
}
291317
// Close both to unblock the goroutine
292-
dmsgConn.Close() //nolint
293-
localConn.Close() //nolint
318+
if err := dmsgConn.Close(); err != nil {
319+
dlog.WithError(err).Debug("Error closing dmsg conn")
320+
}
321+
if err := localConn.Close(); err != nil {
322+
dlog.WithError(err).Debug("Error closing local conn")
323+
}
324+
<-done
294325

295326
connMutex.Lock()
296327
delete(activeConns, dmsgConn)

0 commit comments

Comments
 (0)