From bfa6ccb5416592260e2bdc4d2071e37c083fab2d Mon Sep 17 00:00:00 2001 From: Leonid Evdokimov Date: Mon, 13 Jan 2020 14:27:32 +0300 Subject: [PATCH 1/4] ndt7: measure and report L7 RTT based on websocket ping L7 ping may be significantly different from L4 pings and separate endpoint `/ping` is used to keep spec intact till further discussion happens. See 3520181 and https://github.com/m-lab/ndt-server/issues/192 --- data/result.go | 1 + html/ndt7-ping.js | 22 +++++++++++++++++ html/ndt7.html | 20 ++++++++++++++- ndt-server.go | 1 + ndt7/download/download.go | 11 ++++++--- ndt7/download/sender/sender.go | 18 +++++++++----- ndt7/handler/handler.go | 20 ++++++++++----- ndt7/measurer/measurer.go | 45 ++++++++++++++++++++++++---------- ndt7/model/appinfo.go | 2 ++ ndt7/ping/ping.go | 15 +++++------- ndt7/receiver/receiver.go | 37 ++++++++++++++++------------ ndt7/results/file.go | 4 +-- ndt7/spec/spec.go | 6 +++++ ndt7/upload/sender/sender.go | 12 ++++----- ndt7/upload/upload.go | 11 ++++++--- 15 files changed, 159 insertions(+), 66 deletions(-) create mode 100644 html/ndt7-ping.js diff --git a/data/result.go b/data/result.go index 36bb1571..ad2eaa0e 100644 --- a/data/result.go +++ b/data/result.go @@ -45,4 +45,5 @@ type NDTResult struct { // ndt7 Upload *model.ArchivalData `json:",omitempty"` Download *model.ArchivalData `json:",omitempty"` + Ping *model.ArchivalData `json:",omitempty"` } diff --git a/html/ndt7-ping.js b/html/ndt7-ping.js new file mode 100644 index 00000000..b2ed6c04 --- /dev/null +++ b/html/ndt7-ping.js @@ -0,0 +1,22 @@ +/* jshint esversion: 6, asi: true, worker: true */ +// WebWorker that runs the ndt7 ping test +onmessage = function (ev) { + 'use strict' + let url = new URL(ev.data.href) + url.protocol = (url.protocol === 'https:') ? 
'wss:' : 'ws:' + url.pathname = '/ndt/v7/ping' + const sock = new WebSocket(url.toString(), 'net.measurementlab.ndt.v7') + sock.onclose = function () { + postMessage(null) + } + sock.onopen = function () { + sock.onmessage = function (ev) { + if (!(ev.data instanceof Blob)) { + let m = JSON.parse(ev.data) + m.Origin = 'server' + m.Test = 'ping' + postMessage(m) + } + } + } +} diff --git a/html/ndt7.html b/html/ndt7.html index e5e18742..e151be8b 100644 --- a/html/ndt7.html +++ b/html/ndt7.html @@ -24,8 +24,10 @@
+
[Ping]
[Download]
[Upload]
+
diff --git a/ndt-server.go b/ndt-server.go index e9790aed..48706eab 100644 --- a/ndt-server.go +++ b/ndt-server.go @@ -159,6 +159,7 @@ func main() { } ndt7Mux.Handle(spec.DownloadURLPath, http.HandlerFunc(ndt7Handler.Download)) ndt7Mux.Handle(spec.UploadURLPath, http.HandlerFunc(ndt7Handler.Upload)) + ndt7Mux.Handle(spec.PingURLPath, http.HandlerFunc(ndt7Handler.Ping)) ndt7Server := &http.Server{ Addr: *ndt7Addr, Handler: logging.MakeAccessLogHandler(ndt7Mux), diff --git a/ndt7/download/download.go b/ndt7/download/download.go index 832d7a21..616b37e4 100644 --- a/ndt7/download/download.go +++ b/ndt7/download/download.go @@ -3,6 +3,7 @@ package download import ( "context" + "time" "github.com/gorilla/websocket" "github.com/m-lab/ndt-server/ndt7/download/sender" @@ -15,13 +16,15 @@ import ( // Do implements the download subtest. The ctx argument is the parent // context for the subtest. The conn argument is the open WebSocket // connection. The resultfp argument is the file where to save results. Both -// arguments are owned by the caller of this function. -func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File) { +// arguments are owned by the caller of this function. The start argument is +// the test start time used to calculate ElapsedTime and deadlines. 
+func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File, start time.Time) { // Implementation note: use child context so that, if we cannot save the // results in the loop below, we terminate the goroutines early wholectx, cancel := context.WithCancel(ctx) defer cancel() - senderch := sender.Start(conn, measurer.Start(wholectx, conn, resultfp.Data.UUID)) - receiverch := receiver.StartDownloadReceiver(wholectx, conn) + receiverch, rttch := receiver.StartDownloadReceiver(wholectx, conn, start) + measurerch := measurer.Start(wholectx, conn, resultfp.Data.UUID, start, rttch) + senderch := sender.Start(conn, measurerch, start) saver.SaveAll(resultfp, senderch, receiverch) } diff --git a/ndt7/download/sender/sender.go b/ndt7/download/sender/sender.go index 5c4efc03..77ce812e 100644 --- a/ndt7/download/sender/sender.go +++ b/ndt7/download/sender/sender.go @@ -22,7 +22,10 @@ func makePreparedMessage(size int) (*websocket.PreparedMessage, error) { return websocket.NewPreparedMessage(websocket.BinaryMessage, data) } -func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.Measurement) { +func loop( + conn *websocket.Conn, src <-chan model.Measurement, + dst chan<- model.Measurement, start time.Time, +) { logging.Logger.Debug("sender: start") defer logging.Logger.Debug("sender: stop") defer close(dst) @@ -38,7 +41,7 @@ func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.M logging.Logger.WithError(err).Warn("sender: makePreparedMessage failed") return } - deadline := time.Now().Add(spec.MaxRuntime) + deadline := start.Add(spec.MaxRuntime) err = conn.SetWriteDeadline(deadline) // Liveness! 
if err != nil { logging.Logger.WithError(err).Warn("sender: conn.SetWriteDeadline failed") @@ -52,12 +55,15 @@ func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.M closer.StartClosing(conn) return } + if m.AppInfo != nil { + m.AppInfo.NumBytes = totalSent + } if err := conn.WriteJSON(m); err != nil { logging.Logger.WithError(err).Warn("sender: conn.WriteJSON failed") return } dst <- m // Liveness: this is blocking - if err := ping.SendTicks(conn, deadline); err != nil { + if err := ping.SendTicks(conn, start, deadline); err != nil { logging.Logger.WithError(err).Warn("sender: ping.SendTicks failed") return } @@ -99,9 +105,9 @@ func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.M // Liveness guarantee: the sender will not be stuck sending for more then // the MaxRuntime of the subtest, provided that the consumer will // continue reading from the returned channel. This is enforced by -// setting the write deadline to Time.Now() + MaxRuntime. -func Start(conn *websocket.Conn, src <-chan model.Measurement) <-chan model.Measurement { +// setting the write deadline to |start| + MaxRuntime. +func Start(conn *websocket.Conn, src <-chan model.Measurement, start time.Time) <-chan model.Measurement { dst := make(chan model.Measurement) - go loop(conn, src, dst) + go loop(conn, src, dst, start) return dst } diff --git a/ndt7/handler/handler.go b/ndt7/handler/handler.go index 0d7a522f..dffaaca7 100644 --- a/ndt7/handler/handler.go +++ b/ndt7/handler/handler.go @@ -36,14 +36,15 @@ func warnAndClose(writer http.ResponseWriter, message string) { // testerFunc is the function implementing a subtest. The first argument // is the subtest context. The second argument is the connected websocket. The // third argument is the open file where to write results. This function does -// not own the second or the third argument. 
-type testerFunc = func(context.Context, *websocket.Conn, *results.File) +// not own the second or the third argument. The fourth argument is the base +// start time of the test. +type testerFunc = func(context.Context, *websocket.Conn, *results.File, time.Time) // downloadOrUpload implements both download and upload. The writer argument // is the HTTP response writer. The request argument is the HTTP request -// that we received. The kind argument must be spec.SubtestDownload or -// spec.SubtestUpload. The tester is a function actually implementing the -// requested ndt7 subtest. +// that we received. The kind argument must be spec.SubtestDownload, +// spec.SubtestUpload, or SubtestPing. The tester is a function actually +// implementing the requested ndt7 subtest. func (h Handler) downloadOrUpload(writer http.ResponseWriter, request *http.Request, kind spec.SubtestKind, tester testerFunc) { logging.Logger.Debug("downloadOrUpload: upgrading to WebSockets") if request.Header.Get("Sec-WebSocket-Protocol") != spec.SecWebSocketProtocol { @@ -106,6 +107,8 @@ func (h Handler) downloadOrUpload(writer http.ResponseWriter, request *http.Requ result.Download = resultfp.Data } else if kind == spec.SubtestUpload { result.Upload = resultfp.Data + } else if kind == spec.SubtestPing { + result.Ping = resultfp.Data } else { logging.Logger.Warn(string(kind) + ": data not saved") } @@ -114,7 +117,7 @@ func (h Handler) downloadOrUpload(writer http.ResponseWriter, request *http.Requ } warnonerror.Close(resultfp, string(kind)+": ignoring resultfp.Close error") }() - tester(request.Context(), conn, resultfp) + tester(request.Context(), conn, resultfp, result.StartTime) } // Download handles the download subtest. 
@@ -126,3 +129,8 @@ func (h Handler) Download(writer http.ResponseWriter, request *http.Request) { func (h Handler) Upload(writer http.ResponseWriter, request *http.Request) { h.downloadOrUpload(writer, request, spec.SubtestUpload, upload.Do) } + +// Ping handles the ping subtest. +func (h Handler) Ping(writer http.ResponseWriter, request *http.Request) { + h.downloadOrUpload(writer, request, spec.SubtestPing, upload.Do) +} diff --git a/ndt7/measurer/measurer.go b/ndt7/measurer/measurer.go index da1afa64..0069334d 100644 --- a/ndt7/measurer/measurer.go +++ b/ndt7/measurer/measurer.go @@ -7,6 +7,7 @@ import ( "errors" "os" "time" + "math" "github.com/gorilla/websocket" "github.com/m-lab/go/memoryless" @@ -18,6 +19,10 @@ import ( "github.com/m-lab/ndt-server/tcpinfox" ) +const ( + MaxDuration = math.MaxInt64 * time.Nanosecond +) + func getSocketAndPossiblyEnableBBR(conn *websocket.Conn) (*os.File, error) { fp := fdcache.GetAndForgetFile(conn.UnderlyingConn()) // Implementation note: in theory fp SHOULD always be non-nil because @@ -39,7 +44,7 @@ func getSocketAndPossiblyEnableBBR(conn *websocket.Conn) (*os.File, error) { func measure(measurement *model.Measurement, sockfp *os.File, elapsed time.Duration) { // Implementation note: we always want to sample BBR before TCPInfo so we // will know from TCPInfo if the connection has been closed. 
- t := int64(elapsed / time.Microsecond) + t := elapsed.Microseconds() bbrinfo, err := bbr.GetMaxBandwidthAndMinRTT(sockfp) if err == nil { bbrinfo.ElapsedTime = t @@ -54,7 +59,7 @@ func measure(measurement *model.Measurement, sockfp *os.File, elapsed time.Durat } } -func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- model.Measurement) { +func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- model.Measurement, start time.Time, rttch <-chan time.Duration) { logging.Logger.Debug("measurer: start") defer logging.Logger.Debug("measurer: stop") defer close(dst) @@ -66,7 +71,6 @@ func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- mod return } defer sockfp.Close() - start := time.Now() connectionInfo := &model.ConnectionInfo{ Client: conn.RemoteAddr().String(), Server: conn.LocalAddr().String(), @@ -83,17 +87,31 @@ func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- mod logging.Logger.WithError(err).Warn("memoryless.NewTicker failed") return } + lastRTT, minRTT := MaxDuration, MaxDuration defer ticker.Stop() for { - now, active := <-ticker.C - if !active { - return + select { + case now, active := <-ticker.C: + if !active { + return + } + var measurement model.Measurement + measure(&measurement, sockfp, now.Sub(start)) + measurement.ConnectionInfo = connectionInfo + connectionInfo = nil + if minRTT < MaxDuration { + measurement.AppInfo = &model.AppInfo{ + ElapsedTime: now.Sub(start).Microseconds(), + LastRTT: lastRTT.Microseconds(), + MinRTT: minRTT.Microseconds(), + } + } + dst <- measurement // Liveness: this is blocking + case lastRTT = <-rttch: + if (lastRTT < minRTT) { + minRTT = lastRTT + } } - var measurement model.Measurement - measure(&measurement, sockfp, now.Sub(start)) - measurement.ConnectionInfo = connectionInfo - connectionInfo = nil - dst <- measurement // Liveness: this is blocking } } @@ -104,9 +122,10 @@ func loop(ctx context.Context, conn 
*websocket.Conn, UUID string, dst chan<- mod // a timeout of DefaultRuntime seconds, provided that the consumer // continues reading from the returned channel. func Start( - ctx context.Context, conn *websocket.Conn, UUID string, + ctx context.Context, conn *websocket.Conn, UUID string, start time.Time, + rttch <-chan time.Duration, ) <-chan model.Measurement { dst := make(chan model.Measurement) - go loop(ctx, conn, UUID, dst) + go loop(ctx, conn, UUID, dst, start, rttch) return dst } diff --git a/ndt7/model/appinfo.go b/ndt7/model/appinfo.go index 5c6f6fb7..3cf044f1 100644 --- a/ndt7/model/appinfo.go +++ b/ndt7/model/appinfo.go @@ -5,4 +5,6 @@ package model type AppInfo struct { NumBytes int64 ElapsedTime int64 + LastRTT int64 + MinRTT int64 } diff --git a/ndt7/ping/ping.go b/ndt7/ping/ping.go index 2af7c7ff..ac41e27a 100644 --- a/ndt7/ping/ping.go +++ b/ndt7/ping/ping.go @@ -9,10 +9,8 @@ import ( ) // SendTicks sends the current ticks as a ping message. -func SendTicks(conn *websocket.Conn, deadline time.Time) error { - // TODO(bassosimone): when we'll have a unique base time.Time reference for - // the whole test, we should use that, since UnixNano() is not monotonic. - ticks := int64(time.Now().UnixNano()) +func SendTicks(conn *websocket.Conn, start time.Time, deadline time.Time) error { + var ticks int64 = time.Since(start).Nanoseconds() data, err := json.Marshal(ticks) if err == nil { err = conn.WriteControl(websocket.PingMessage, data, deadline) @@ -20,13 +18,12 @@ func SendTicks(conn *websocket.Conn, deadline time.Time) error { return err } -func ParseTicks(s string) (d int64, err error) { - // TODO(bassosimone): when we'll have a unique base time.Time reference for - // the whole test, we should use that, since UnixNano() is not monotonic. 
+func ParseTicks(s string, start time.Time) (d time.Duration, err error) { + elapsed := time.Since(start).Nanoseconds() var prev int64 err = json.Unmarshal([]byte(s), &prev) - if err == nil { - d = (int64(time.Now().UnixNano()) - prev) + if err == nil && prev <= elapsed { + d = time.Duration(elapsed - prev) } return } diff --git a/ndt7/receiver/receiver.go b/ndt7/receiver/receiver.go index ab423ed2..7b5061bd 100644 --- a/ndt7/receiver/receiver.go +++ b/ndt7/receiver/receiver.go @@ -19,11 +19,12 @@ type receiverKind int const ( downloadReceiver = receiverKind(iota) uploadReceiver + pingReceiver ) func loop( ctx context.Context, conn *websocket.Conn, kind receiverKind, - dst chan<- model.Measurement, + dst chan<- model.Measurement, start time.Time, rttch chan<- time.Duration, ) { logging.Logger.Debug("receiver: start") defer logging.Logger.Debug("receiver: stop") @@ -31,16 +32,16 @@ func loop( conn.SetReadLimit(spec.MaxMessageSize) receiverctx, cancel := context.WithTimeout(ctx, spec.MaxRuntime) defer cancel() - err := conn.SetReadDeadline(time.Now().Add(spec.MaxRuntime)) // Liveness! + err := conn.SetReadDeadline(start.Add(spec.MaxRuntime)) // Liveness! 
if err != nil { logging.Logger.WithError(err).Warn("receiver: conn.SetReadDeadline failed") return } conn.SetPongHandler(func(s string) error { - rtt, err := ping.ParseTicks(s) + rtt, err := ping.ParseTicks(s, start) if err == nil { - rtt /= int64(time.Millisecond) - logging.Logger.Debugf("receiver: ApplicationLevel RTT: %d ms", rtt) + rttch <- rtt // Possibly blocking, but `measurer` + logging.Logger.Debugf("receiver: ApplicationLevel RTT: %d ms", rtt.Milliseconds()) } return err }) @@ -55,11 +56,11 @@ func loop( } if mtype != websocket.TextMessage { switch kind { - case downloadReceiver: + case uploadReceiver: + continue // No further processing required + default: // downloadReceiver and pingReceiver logging.Logger.Warn("receiver: got non-Text message") return // Unexpected message type - default: - continue // No further processing required } } var measurement model.Measurement @@ -72,10 +73,11 @@ func loop( } } -func start(ctx context.Context, conn *websocket.Conn, kind receiverKind) <-chan model.Measurement { +func startReceiver(ctx context.Context, conn *websocket.Conn, kind receiverKind, start time.Time) (<-chan model.Measurement, <-chan time.Duration) { dst := make(chan model.Measurement) - go loop(ctx, conn, kind, dst) - return dst + rttch := make(chan time.Duration) + go loop(ctx, conn, kind, dst, start, rttch) + return dst, rttch } // StartDownloadReceiver starts the receiver in a background goroutine and @@ -87,13 +89,18 @@ func start(ctx context.Context, conn *websocket.Conn, kind receiverKind) <-chan // Liveness guarantee: the goroutine will always terminate after a // MaxRuntime timeout, provided that the consumer will keep reading // from the returned channel. 
-func StartDownloadReceiver(ctx context.Context, conn *websocket.Conn) <-chan model.Measurement { - return start(ctx, conn, downloadReceiver) +func StartDownloadReceiver(ctx context.Context, conn *websocket.Conn, start time.Time) (<-chan model.Measurement, <-chan time.Duration) { + return startReceiver(ctx, conn, downloadReceiver, start) } // StartUploadReceiver is like StartDownloadReceiver except that it // tolerates incoming binary messages, which are sent to cause // network load, and therefore must not be rejected. -func StartUploadReceiver(ctx context.Context, conn *websocket.Conn) <-chan model.Measurement { - return start(ctx, conn, uploadReceiver) +func StartUploadReceiver(ctx context.Context, conn *websocket.Conn, start time.Time) (<-chan model.Measurement, <-chan time.Duration) { + return startReceiver(ctx, conn, uploadReceiver, start) +} + +// StartPingReceiver is exactly like StartDownloadReceiver currently. +func StartPingReceiver(ctx context.Context, conn *websocket.Conn, start time.Time) (<-chan model.Measurement, <-chan time.Duration) { + return startReceiver(ctx, conn, pingReceiver, start) } diff --git a/ndt7/results/file.go b/ndt7/results/file.go index 8786c429..016dc293 100644 --- a/ndt7/results/file.go +++ b/ndt7/results/file.go @@ -66,8 +66,8 @@ func newFile(datadir, what, uuid string) (*File, error) { // containing the metadata. The conn argument is used to retrieve the local and // the remote endpoints addresses. The "datadir" argument specifies the // directory on disk to write the data into and the what argument should -// indicate whether this is a spec.SubtestDownload or a spec.SubtestUpload -// ndt7 measurement. +// indicate whether this is a spec.SubtestDownload, a spec.SubtestUpload +// or a spec.SubtestPing ndt7 measurement. 
func OpenFor(request *http.Request, conn *websocket.Conn, datadir string, what spec.SubtestKind) (*File, error) { meta := make(metadata, 0) netConn := conn.UnderlyingConn() diff --git a/ndt7/spec/spec.go b/ndt7/spec/spec.go index 6d47d812..8d7c8522 100644 --- a/ndt7/spec/spec.go +++ b/ndt7/spec/spec.go @@ -9,6 +9,9 @@ const DownloadURLPath = "/ndt/v7/download" // UploadURLPath selects the upload subtest. const UploadURLPath = "/ndt/v7/upload" +// PingURLPath selects the ping subtest. +const PingURLPath = "/ndt/v7/ping" + // SecWebSocketProtocol is the WebSocket subprotocol used by ndt7. const SecWebSocketProtocol = "net.measurementlab.ndt.v7" @@ -57,4 +60,7 @@ const ( // SubtestUpload is a upload subtest SubtestUpload = SubtestKind("upload") + + // SubtestPing is a ping subtest + SubtestPing = SubtestKind("ping") ) diff --git a/ndt7/upload/sender/sender.go b/ndt7/upload/sender/sender.go index ff46a130..4d03f959 100644 --- a/ndt7/upload/sender/sender.go +++ b/ndt7/upload/sender/sender.go @@ -14,7 +14,7 @@ import ( func loop( conn *websocket.Conn, src <-chan model.Measurement, - dst chan<- model.Measurement, + dst chan<- model.Measurement, start time.Time, ) { logging.Logger.Debug("sender: start") defer logging.Logger.Debug("sender: stop") @@ -24,7 +24,7 @@ func loop( // make sure we drain the channel } }() - deadline := time.Now().Add(spec.MaxRuntime) + deadline := start.Add(spec.MaxRuntime) err := conn.SetWriteDeadline(deadline) // Liveness! 
if err != nil { logging.Logger.WithError(err).Warn("sender: conn.SetWriteDeadline failed") @@ -41,7 +41,7 @@ func loop( return } dst <- m // Liveness: this is blocking - if err := ping.SendTicks(conn, deadline); err != nil { + if err := ping.SendTicks(conn, start, deadline); err != nil { logging.Logger.WithError(err).Warn("sender: ping.SendTicks failed") return } @@ -55,9 +55,9 @@ func loop( // Liveness guarantee: the sender will not be stuck sending for more then // the MaxRuntime of the subtest, provided that the consumer will // continue reading from the returned channel. This is enforced by -// setting the write deadline to MaxRuntime + time.Now. -func Start(conn *websocket.Conn, src <-chan model.Measurement) <-chan model.Measurement { +// setting the write deadline to |start| + MaxRuntime. +func Start(conn *websocket.Conn, src <-chan model.Measurement, start time.Time) <-chan model.Measurement { dst := make(chan model.Measurement) - go loop(conn, src, dst) + go loop(conn, src, dst, start) return dst } diff --git a/ndt7/upload/upload.go b/ndt7/upload/upload.go index 511c3a70..f164e922 100644 --- a/ndt7/upload/upload.go +++ b/ndt7/upload/upload.go @@ -3,6 +3,7 @@ package upload import ( "context" + "time" "github.com/gorilla/websocket" "github.com/m-lab/ndt-server/ndt7/results" @@ -15,13 +16,15 @@ import ( // Do implements the upload subtest. The ctx argument is the parent context // for the subtest. The conn argument is the open WebSocket connection. The // resultfp argument is the file where to save results. Both arguments are -// owned by the caller of this function. -func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File) { +// owned by the caller of this function. The start argument is the test +// start time used to calculate ElapsedTime and deadlines. 
+func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File, start time.Time) { // Implementation note: use child context so that, if we cannot save the // results in the loop below, we terminate the goroutines early wholectx, cancel := context.WithCancel(ctx) defer cancel() - senderch := sender.Start(conn, measurer.Start(wholectx, conn, resultfp.Data.UUID)) - receiverch := receiver.StartUploadReceiver(wholectx, conn) + receiverch, rttch := receiver.StartUploadReceiver(wholectx, conn, start) + measurerch := measurer.Start(wholectx, conn, resultfp.Data.UUID, start, rttch) + senderch := sender.Start(conn, measurerch, start) saver.SaveAll(resultfp, senderch, receiverch) } From 3a8800008c9f8bc56800c5fb9728b5aa63d84d34 Mon Sep 17 00:00:00 2001 From: Leonid Evdokimov Date: Mon, 13 Jan 2020 20:34:39 +0300 Subject: [PATCH 2/4] ndt7: fix liveness bug in L7-RTT measurement Also, move that measurement data into the WSInfo sub-object as it's currently unclear if AppInfo should be extended or not.
It's part of ndt7 spec :) TODO: squash this commit into previous one --- html/ndt7.html | 15 +++++++---- ndt7/download/download.go | 6 ++--- ndt7/download/sender/sender.go | 21 +++++++++++----- ndt7/measurer/measurer.go | 40 ++++++++--------------------- ndt7/model/appinfo.go | 2 -- ndt7/model/measurement.go | 1 + ndt7/model/wsinfo.go | 10 ++++++++ ndt7/ping/ping.go | 14 ++++++++--- ndt7/receiver/receiver.go | 39 ++++++++++++++++++++-------- ndt7/upload/sender/sender.go | 46 ++++++++++++++++++++++------------ ndt7/upload/upload.go | 6 ++--- 11 files changed, 121 insertions(+), 79 deletions(-) create mode 100644 ndt7/model/wsinfo.go diff --git a/html/ndt7.html b/html/ndt7.html index e151be8b..fbd3b107 100644 --- a/html/ndt7.html +++ b/html/ndt7.html @@ -51,6 +51,8 @@ } function runSomething(testName, callback) { + let ws = Number.NaN; + let tcp = Number.NaN; ndt7core.run(location.href, testName, function(ev, val) { console.log(ev, val) if (ev === 'complete') { @@ -67,12 +69,15 @@ val.Origin === 'client') { updateView(testName, val.AppInfo) } - if (ev === 'measurement' && val.AppInfo !== undefined && - val.Origin === 'server' && testName === 'ping') { + if (ev === 'measurement' && val.Origin === 'server' && testName === 'ping') { + if (val.WSInfo !== undefined) { + ws = val.WSInfo.MinRTT / 1e3 + } + if (val.TCPInfo !== undefined) { + tcp = val.TCPInfo.MinRTT / 1e3 + } withElementDo('ping', function (elem) { - const ws = val.AppInfo.MinRTT / 1e3 - const tcp = val.TCPInfo.MinRTT / 1e3 - elem.innerHTML = ws.toFixed(1) + ' / ' + tcp.toFixed(1) + ' ms' + elem.innerHTML = '⓻ ' + ws.toFixed(1) + ' / ⓸ ' + tcp.toFixed(1) + ' ms' }) } }) diff --git a/ndt7/download/download.go b/ndt7/download/download.go index 616b37e4..9e3700d2 100644 --- a/ndt7/download/download.go +++ b/ndt7/download/download.go @@ -23,8 +23,8 @@ func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File, start // results in the loop below, we terminate the goroutines early wholectx, cancel 
:= context.WithCancel(ctx) defer cancel() - receiverch, rttch := receiver.StartDownloadReceiver(wholectx, conn, start) - measurerch := measurer.Start(wholectx, conn, resultfp.Data.UUID, start, rttch) - senderch := sender.Start(conn, measurerch, start) + measurerch := measurer.Start(wholectx, conn, resultfp.Data.UUID, start) + receiverch, pongch := receiver.StartDownloadReceiver(wholectx, conn, start, measurerch) + senderch := sender.Start(conn, measurerch, start, pongch) saver.SaveAll(resultfp, senderch, receiverch) } diff --git a/ndt7/download/sender/sender.go b/ndt7/download/sender/sender.go index 77ce812e..0440ea46 100644 --- a/ndt7/download/sender/sender.go +++ b/ndt7/download/sender/sender.go @@ -24,7 +24,7 @@ func makePreparedMessage(size int) (*websocket.PreparedMessage, error) { func loop( conn *websocket.Conn, src <-chan model.Measurement, - dst chan<- model.Measurement, start time.Time, + dst chan<- model.Measurement, start time.Time, pongch <-chan model.WSInfo, ) { logging.Logger.Debug("sender: start") defer logging.Logger.Debug("sender: stop") @@ -33,6 +33,9 @@ func loop( for range src { // make sure we drain the channel } + for range pongch { + // it should be buffered channel, but let's drain it anyway + } }() logging.Logger.Debug("sender: generating random buffer") bulkMessageSize := 1 << 13 @@ -55,9 +58,6 @@ func loop( closer.StartClosing(conn) return } - if m.AppInfo != nil { - m.AppInfo.NumBytes = totalSent - } if err := conn.WriteJSON(m); err != nil { logging.Logger.WithError(err).Warn("sender: conn.WriteJSON failed") return @@ -67,6 +67,15 @@ func loop( logging.Logger.WithError(err).Warn("sender: ping.SendTicks failed") return } + case wsinfo := <-pongch: + m := model.Measurement{ + WSInfo: &wsinfo, + } + if err := conn.WriteJSON(m); err != nil { + logging.Logger.WithError(err).Warn("sender: conn.WriteJSON failed") + return + } + dst <- m // Liveness: this is blocking write to log default: if err := conn.WritePreparedMessage(preparedMessage); 
err != nil { logging.Logger.WithError(err).Warn( @@ -106,8 +115,8 @@ func loop( // the MaxRuntime of the subtest, provided that the consumer will // continue reading from the returned channel. This is enforced by // setting the write deadline to |start| + MaxRuntime. -func Start(conn *websocket.Conn, src <-chan model.Measurement, start time.Time) <-chan model.Measurement { +func Start(conn *websocket.Conn, src <-chan model.Measurement, start time.Time, pongch <-chan model.WSInfo) <-chan model.Measurement { dst := make(chan model.Measurement) - go loop(conn, src, dst, start) + go loop(conn, src, dst, start, pongch) return dst } diff --git a/ndt7/measurer/measurer.go b/ndt7/measurer/measurer.go index 0069334d..3d2ac31e 100644 --- a/ndt7/measurer/measurer.go +++ b/ndt7/measurer/measurer.go @@ -7,7 +7,6 @@ import ( "errors" "os" "time" - "math" "github.com/gorilla/websocket" "github.com/m-lab/go/memoryless" @@ -19,10 +18,6 @@ import ( "github.com/m-lab/ndt-server/tcpinfox" ) -const ( - MaxDuration = math.MaxInt64 * time.Nanosecond -) - func getSocketAndPossiblyEnableBBR(conn *websocket.Conn) (*os.File, error) { fp := fdcache.GetAndForgetFile(conn.UnderlyingConn()) // Implementation note: in theory fp SHOULD always be non-nil because @@ -59,7 +54,7 @@ func measure(measurement *model.Measurement, sockfp *os.File, elapsed time.Durat } } -func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- model.Measurement, start time.Time, rttch <-chan time.Duration) { +func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- model.Measurement, start time.Time) { logging.Logger.Debug("measurer: start") defer logging.Logger.Debug("measurer: stop") defer close(dst) @@ -87,31 +82,17 @@ func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- mod logging.Logger.WithError(err).Warn("memoryless.NewTicker failed") return } - lastRTT, minRTT := MaxDuration, MaxDuration defer ticker.Stop() for { - select { - case now, active := 
<-ticker.C: - if !active { - return - } - var measurement model.Measurement - measure(&measurement, sockfp, now.Sub(start)) - measurement.ConnectionInfo = connectionInfo - connectionInfo = nil - if minRTT < MaxDuration { - measurement.AppInfo = &model.AppInfo{ - ElapsedTime: now.Sub(start).Microseconds(), - LastRTT: lastRTT.Microseconds(), - MinRTT: minRTT.Microseconds(), - } - } - dst <- measurement // Liveness: this is blocking - case lastRTT = <-rttch: - if (lastRTT < minRTT) { - minRTT = lastRTT - } + now, active := <-ticker.C + if !active { + return } + var measurement model.Measurement + measure(&measurement, sockfp, now.Sub(start)) + measurement.ConnectionInfo = connectionInfo + connectionInfo = nil + dst <- measurement // Liveness: this is blocking } } @@ -123,9 +104,8 @@ func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- mod // continues reading from the returned channel. func Start( ctx context.Context, conn *websocket.Conn, UUID string, start time.Time, - rttch <-chan time.Duration, ) <-chan model.Measurement { dst := make(chan model.Measurement) - go loop(ctx, conn, UUID, dst, start, rttch) + go loop(ctx, conn, UUID, dst, start) return dst } diff --git a/ndt7/model/appinfo.go b/ndt7/model/appinfo.go index 3cf044f1..5c6f6fb7 100644 --- a/ndt7/model/appinfo.go +++ b/ndt7/model/appinfo.go @@ -5,6 +5,4 @@ package model type AppInfo struct { NumBytes int64 ElapsedTime int64 - LastRTT int64 - MinRTT int64 } diff --git a/ndt7/model/measurement.go b/ndt7/model/measurement.go index d0cee8c5..fe24eed7 100644 --- a/ndt7/model/measurement.go +++ b/ndt7/model/measurement.go @@ -8,4 +8,5 @@ type Measurement struct { ConnectionInfo *ConnectionInfo `json:",omitempty" bigquery:"-"` BBRInfo *BBRInfo `json:",omitempty"` TCPInfo *TCPInfo `json:",omitempty"` + WSInfo *WSInfo `json:",omitempty"` } diff --git a/ndt7/model/wsinfo.go b/ndt7/model/wsinfo.go new file mode 100644 index 00000000..53bc51d2 --- /dev/null +++ b/ndt7/model/wsinfo.go @@ -0,0 
+1,10 @@ +package model + +// WSInfo contains application-level (websocket) ping measurement data. +// It may be melded into AppInfo. +// FIXME: describe this structure in the ndt7 specification. +type WSInfo struct { + ElapsedTime int64 + LastRTT int64 // TCPInfo.RTT is smoothed RTT, LastRTT is just a sample. + MinRTT int64 +} diff --git a/ndt7/ping/ping.go b/ndt7/ping/ping.go index ac41e27a..cb0378b6 100644 --- a/ndt7/ping/ping.go +++ b/ndt7/ping/ping.go @@ -3,6 +3,7 @@ package ping import ( "encoding/json" + "errors" "time" "github.com/gorilla/websocket" @@ -18,12 +19,17 @@ func SendTicks(conn *websocket.Conn, start time.Time, deadline time.Time) error return err } -func ParseTicks(s string, start time.Time) (d time.Duration, err error) { - elapsed := time.Since(start).Nanoseconds() +func ParseTicks(s string, start time.Time) (elapsed time.Duration, d time.Duration, err error) { + elapsed = time.Since(start) var prev int64 err = json.Unmarshal([]byte(s), &prev) - if err == nil && prev <= elapsed { - d = time.Duration(elapsed - prev) + if err != nil { + return + } + if 0 <= prev && prev <= elapsed.Nanoseconds() { + d = time.Duration(elapsed.Nanoseconds() - prev) + } else { + err = errors.New("RTT is negative") } return } diff --git a/ndt7/receiver/receiver.go b/ndt7/receiver/receiver.go index 7b5061bd..1a56d5c1 100644 --- a/ndt7/receiver/receiver.go +++ b/ndt7/receiver/receiver.go @@ -6,6 +6,7 @@ import ( "context" "encoding/json" "time" + "math" "github.com/gorilla/websocket" "github.com/m-lab/ndt-server/logging" @@ -22,13 +23,18 @@ const ( pingReceiver ) +const ( + MaxDuration = math.MaxInt64 * time.Nanosecond +) + func loop( ctx context.Context, conn *websocket.Conn, kind receiverKind, - dst chan<- model.Measurement, start time.Time, rttch chan<- time.Duration, + dst chan<- model.Measurement, start time.Time, pongch chan<- model.WSInfo, ) { logging.Logger.Debug("receiver: start") defer logging.Logger.Debug("receiver: stop") defer close(dst) + defer
close(pongch) conn.SetReadLimit(spec.MaxMessageSize) receiverctx, cancel := context.WithTimeout(ctx, spec.MaxRuntime) defer cancel() @@ -37,11 +43,20 @@ func loop( logging.Logger.WithError(err).Warn("receiver: conn.SetReadDeadline failed") return } + minRTT := MaxDuration conn.SetPongHandler(func(s string) error { - rtt, err := ping.ParseTicks(s, start) + elapsed, rtt, err := ping.ParseTicks(s, start) if err == nil { - rttch <- rtt // Possibly blocking, but `measurer` logging.Logger.Debugf("receiver: ApplicationLevel RTT: %d ms", rtt.Milliseconds()) + if rtt < minRTT { + minRTT = rtt + } + wsinfo := model.WSInfo{ + ElapsedTime: elapsed.Microseconds(), + LastRTT: rtt.Microseconds(), + MinRTT: minRTT.Microseconds(), + } + pongch <- wsinfo // Liveness: buffered (sender) } return err }) @@ -73,11 +88,15 @@ func loop( } } -func startReceiver(ctx context.Context, conn *websocket.Conn, kind receiverKind, start time.Time) (<-chan model.Measurement, <-chan time.Duration) { +func startReceiver(ctx context.Context, conn *websocket.Conn, kind receiverKind, start time.Time) (<-chan model.Measurement, <-chan model.WSInfo) { + // |dst| is going to the log file dst := make(chan model.Measurement) - rttch := make(chan time.Duration) - go loop(ctx, conn, kind, dst, start, rttch) - return dst, rttch + // |pongch| goes to the client, it's buffered to avoid blocking on `download.sender.loop` + // while `conn.WritePreparedMessage()` is active. + // TODO(darkk): is it possible to reduce the buffer size or to avoid blocking in some other way? Might avoiding L7 pings at /download altogether be the way? 
+ pongch := make(chan model.WSInfo, 1 + spec.MaxRuntime / spec.MinPoissonSamplingInterval) + go loop(ctx, conn, kind, dst, start, pongch) + return dst, pongch } // StartDownloadReceiver starts the receiver in a background goroutine and @@ -89,18 +108,18 @@ func startReceiver(ctx context.Context, conn *websocket.Conn, kind receiverKind, // Liveness guarantee: the goroutine will always terminate after a // MaxRuntime timeout, provided that the consumer will keep reading // from the returned channel. -func StartDownloadReceiver(ctx context.Context, conn *websocket.Conn, start time.Time) (<-chan model.Measurement, <-chan time.Duration) { +func StartDownloadReceiver(ctx context.Context, conn *websocket.Conn, start time.Time, msmch <-chan model.Measurement) (<-chan model.Measurement, <-chan model.WSInfo) { return startReceiver(ctx, conn, downloadReceiver, start) } // StartUploadReceiver is like StartDownloadReceiver except that it // tolerates incoming binary messages, which are sent to cause // network load, and therefore must not be rejected. -func StartUploadReceiver(ctx context.Context, conn *websocket.Conn, start time.Time) (<-chan model.Measurement, <-chan time.Duration) { +func StartUploadReceiver(ctx context.Context, conn *websocket.Conn, start time.Time) (<-chan model.Measurement, <-chan model.WSInfo) { return startReceiver(ctx, conn, uploadReceiver, start) } // StartPingReceiver is exactly like StartDownloadReceiver currently. 
-func StartPingReceiver(ctx context.Context, conn *websocket.Conn, start time.Time) (<-chan model.Measurement, <-chan time.Duration) { +func StartPingReceiver(ctx context.Context, conn *websocket.Conn, start time.Time) (<-chan model.Measurement, <-chan model.WSInfo) { return startReceiver(ctx, conn, pingReceiver, start) } diff --git a/ndt7/upload/sender/sender.go b/ndt7/upload/sender/sender.go index 4d03f959..eee5c007 100644 --- a/ndt7/upload/sender/sender.go +++ b/ndt7/upload/sender/sender.go @@ -14,7 +14,7 @@ import ( func loop( conn *websocket.Conn, src <-chan model.Measurement, - dst chan<- model.Measurement, start time.Time, + dst chan<- model.Measurement, start time.Time, pongch <-chan model.WSInfo, ) { logging.Logger.Debug("sender: start") defer logging.Logger.Debug("sender: stop") @@ -23,6 +23,9 @@ func loop( for range src { // make sure we drain the channel } + for range pongch { + // it should be buffered channel, but let's drain it anyway + } }() deadline := start.Add(spec.MaxRuntime) err := conn.SetWriteDeadline(deadline) // Liveness! 
@@ -31,19 +34,30 @@ func loop( return } for { - m, ok := <-src - if !ok { // This means that the previous step has terminated - closer.StartClosing(conn) - return - } - if err := conn.WriteJSON(m); err != nil { - logging.Logger.WithError(err).Warn("sender: conn.WriteJSON failed") - return - } - dst <- m // Liveness: this is blocking - if err := ping.SendTicks(conn, start, deadline); err != nil { - logging.Logger.WithError(err).Warn("sender: ping.SendTicks failed") - return + select { + case m, ok := <-src: + if !ok { // This means that the previous step has terminated + closer.StartClosing(conn) + return + } + if err := conn.WriteJSON(m); err != nil { + logging.Logger.WithError(err).Warn("sender: conn.WriteJSON failed") + return + } + dst <- m // Liveness: this is blocking + if err := ping.SendTicks(conn, start, deadline); err != nil { + logging.Logger.WithError(err).Warn("sender: ping.SendTicks failed") + return + } + case wsinfo := <-pongch: + m := model.Measurement{ + WSInfo: &wsinfo, + } + if err := conn.WriteJSON(m); err != nil { + logging.Logger.WithError(err).Warn("sender: conn.WriteJSON failed") + return + } + dst <- m // Liveness: this is blocking write to log } } } @@ -56,8 +70,8 @@ func loop( // the MaxRuntime of the subtest, provided that the consumer will // continue reading from the returned channel. This is enforced by // setting the write deadline to |start| + MaxRuntime. 
-func Start(conn *websocket.Conn, src <-chan model.Measurement, start time.Time) <-chan model.Measurement { +func Start(conn *websocket.Conn, src <-chan model.Measurement, start time.Time, pongch <-chan model.WSInfo) <-chan model.Measurement { dst := make(chan model.Measurement) - go loop(conn, src, dst, start) + go loop(conn, src, dst, start, pongch) return dst } diff --git a/ndt7/upload/upload.go b/ndt7/upload/upload.go index f164e922..11851b6c 100644 --- a/ndt7/upload/upload.go +++ b/ndt7/upload/upload.go @@ -23,8 +23,8 @@ func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File, start // results in the loop below, we terminate the goroutines early wholectx, cancel := context.WithCancel(ctx) defer cancel() - receiverch, rttch := receiver.StartUploadReceiver(wholectx, conn, start) - measurerch := measurer.Start(wholectx, conn, resultfp.Data.UUID, start, rttch) - senderch := sender.Start(conn, measurerch, start) + measurerch := measurer.Start(wholectx, conn, resultfp.Data.UUID, start) + receiverch, pongch := receiver.StartUploadReceiver(wholectx, conn, start) + senderch := sender.Start(conn, measurerch, start, pongch) saver.SaveAll(resultfp, senderch, receiverch) } From 6cee443a37070f45bc32f14ba33c5eff57b72d3c Mon Sep 17 00:00:00 2001 From: Leonid Evdokimov Date: Tue, 14 Jan 2020 17:48:58 +0300 Subject: [PATCH 3/4] Fix compatibility with go1.12 --- ndt7/measurer/measurer.go | 2 +- ndt7/receiver/receiver.go | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/ndt7/measurer/measurer.go b/ndt7/measurer/measurer.go index 3d2ac31e..fc4c3f2b 100644 --- a/ndt7/measurer/measurer.go +++ b/ndt7/measurer/measurer.go @@ -39,7 +39,7 @@ func getSocketAndPossiblyEnableBBR(conn *websocket.Conn) (*os.File, error) { func measure(measurement *model.Measurement, sockfp *os.File, elapsed time.Duration) { // Implementation note: we always want to sample BBR before TCPInfo so we // will know from TCPInfo if the connection has been closed. 
- t := elapsed.Microseconds() + t := int64(elapsed / time.Microsecond) bbrinfo, err := bbr.GetMaxBandwidthAndMinRTT(sockfp) if err == nil { bbrinfo.ElapsedTime = t diff --git a/ndt7/receiver/receiver.go b/ndt7/receiver/receiver.go index 1a56d5c1..3e8a6ee5 100644 --- a/ndt7/receiver/receiver.go +++ b/ndt7/receiver/receiver.go @@ -47,14 +47,15 @@ func loop( conn.SetPongHandler(func(s string) error { elapsed, rtt, err := ping.ParseTicks(s, start) if err == nil { - logging.Logger.Debugf("receiver: ApplicationLevel RTT: %d ms", rtt.Milliseconds()) + logging.Logger.Debugf("receiver: ApplicationLevel RTT: %d ms", int64(rtt / time.Millisecond)) if rtt < minRTT { minRTT = rtt } + wsinfo := model.WSInfo{ - ElapsedTime: elapsed.Microseconds(), - LastRTT: rtt.Microseconds(), - MinRTT: minRTT.Microseconds(), + ElapsedTime: int64(elapsed / time.Microsecond), + LastRTT: int64(rtt / time.Microsecond), + MinRTT: int64(minRTT / time.Microsecond), } pongch <- wsinfo // Liveness: buffered (sender) } From d7e80651bea6b9ea991d513d6315b8f397ff54e4 Mon Sep 17 00:00:00 2001 From: Leonid Evdokimov Date: Tue, 14 Jan 2020 18:03:51 +0300 Subject: [PATCH 4/4] ndt7: take one L7-RTT sample right before flooding the connection --- ndt7/download/sender/sender.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/ndt7/download/sender/sender.go b/ndt7/download/sender/sender.go index 0440ea46..b7ce17fe 100644 --- a/ndt7/download/sender/sender.go +++ b/ndt7/download/sender/sender.go @@ -50,6 +50,11 @@ func loop( logging.Logger.WithError(err).Warn("sender: conn.SetWriteDeadline failed") return } + // only the first RTT sample taken before flooding the conn is not affected by HOL + if err := ping.SendTicks(conn, start, deadline); err != nil { + logging.Logger.WithError(err).Warn("sender: ping.SendTicks failed") + return + } var totalSent int64 for { select {