From 9be4dcf098fc3de3d155769b6af9adeb70fbc447 Mon Sep 17 00:00:00 2001 From: Leonid Evdokimov Date: Mon, 13 Jan 2020 14:27:32 +0300 Subject: [PATCH 1/7] ndt7: measure and report L7 RTT based on websocket ping L7 ping may be significantly different from L4 pings and separate endpoint `/ping` is used to keep spec intact till further discussion happens. One L7-ping sent before `/download` test to get one sample that is not biased by the queue of bytes. L7 ping is logged as `WSInfo` sub-object as it's currently unclear if AppInfo should be extended or not. It's part of ndt7 spec :) See 3520181 and https://github.com/m-lab/ndt-server/issues/192 --- data/result.go | 1 + html/ndt7-ping.js | 22 +++++++++++++ html/ndt7.html | 25 ++++++++++++++- ndt-server.go | 1 + ndt7/download/download.go | 11 ++++--- ndt7/download/sender/sender.go | 32 +++++++++++++++---- ndt7/handler/handler.go | 20 ++++++++---- ndt7/measurer/measurer.go | 7 ++--- ndt7/model/measurement.go | 1 + ndt7/model/wsinfo.go | 10 ++++++ ndt7/ping/ping.go | 21 +++++++------ ndt7/receiver/receiver.go | 57 +++++++++++++++++++++++++--------- ndt7/results/file.go | 4 +-- ndt7/spec/spec.go | 6 ++++ ndt7/upload/sender/sender.go | 50 ++++++++++++++++++----------- ndt7/upload/upload.go | 11 ++++--- 16 files changed, 210 insertions(+), 69 deletions(-) create mode 100644 html/ndt7-ping.js create mode 100644 ndt7/model/wsinfo.go diff --git a/data/result.go b/data/result.go index 36bb1571..ad2eaa0e 100644 --- a/data/result.go +++ b/data/result.go @@ -45,4 +45,5 @@ type NDTResult struct { // ndt7 Upload *model.ArchivalData `json:",omitempty"` Download *model.ArchivalData `json:",omitempty"` + Ping *model.ArchivalData `json:",omitempty"` } diff --git a/html/ndt7-ping.js b/html/ndt7-ping.js new file mode 100644 index 00000000..b2ed6c04 --- /dev/null +++ b/html/ndt7-ping.js @@ -0,0 +1,22 @@ +/* jshint esversion: 6, asi: true, worker: true */ +// WebWorker that runs the ndt7 ping test +onmessage = function (ev) { + 'use strict' + 
let url = new URL(ev.data.href) + url.protocol = (url.protocol === 'https:') ? 'wss:' : 'ws:' + url.pathname = '/ndt/v7/ping' + const sock = new WebSocket(url.toString(), 'net.measurementlab.ndt.v7') + sock.onclose = function () { + postMessage(null) + } + sock.onopen = function () { + sock.onmessage = function (ev) { + if (!(ev.data instanceof Blob)) { + let m = JSON.parse(ev.data) + m.Origin = 'server' + m.Test = 'ping' + postMessage(m) + } + } + } +} diff --git a/html/ndt7.html b/html/ndt7.html index e5e18742..fbd3b107 100644 --- a/html/ndt7.html +++ b/html/ndt7.html @@ -24,8 +24,10 @@
+
[Ping]
[Download]
[Upload]
+
diff --git a/ndt-server.go b/ndt-server.go index e9790aed..48706eab 100644 --- a/ndt-server.go +++ b/ndt-server.go @@ -159,6 +159,7 @@ func main() { } ndt7Mux.Handle(spec.DownloadURLPath, http.HandlerFunc(ndt7Handler.Download)) ndt7Mux.Handle(spec.UploadURLPath, http.HandlerFunc(ndt7Handler.Upload)) + ndt7Mux.Handle(spec.PingURLPath, http.HandlerFunc(ndt7Handler.Ping)) ndt7Server := &http.Server{ Addr: *ndt7Addr, Handler: logging.MakeAccessLogHandler(ndt7Mux), diff --git a/ndt7/download/download.go b/ndt7/download/download.go index 832d7a21..9e3700d2 100644 --- a/ndt7/download/download.go +++ b/ndt7/download/download.go @@ -3,6 +3,7 @@ package download import ( "context" + "time" "github.com/gorilla/websocket" "github.com/m-lab/ndt-server/ndt7/download/sender" @@ -15,13 +16,15 @@ import ( // Do implements the download subtest. The ctx argument is the parent // context for the subtest. The conn argument is the open WebSocket // connection. The resultfp argument is the file where to save results. Both -// arguments are owned by the caller of this function. -func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File) { +// arguments are owned by the caller of this function. The start argument is +// the test start time used to calculate ElapsedTime and deadlines. 
+func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File, start time.Time) { // Implementation note: use child context so that, if we cannot save the // results in the loop below, we terminate the goroutines early wholectx, cancel := context.WithCancel(ctx) defer cancel() - senderch := sender.Start(conn, measurer.Start(wholectx, conn, resultfp.Data.UUID)) - receiverch := receiver.StartDownloadReceiver(wholectx, conn) + measurerch := measurer.Start(wholectx, conn, resultfp.Data.UUID, start) + receiverch, pongch := receiver.StartDownloadReceiver(wholectx, conn, start, measurerch) + senderch := sender.Start(conn, measurerch, start, pongch) saver.SaveAll(resultfp, senderch, receiverch) } diff --git a/ndt7/download/sender/sender.go b/ndt7/download/sender/sender.go index 5c4efc03..b7ce17fe 100644 --- a/ndt7/download/sender/sender.go +++ b/ndt7/download/sender/sender.go @@ -22,7 +22,10 @@ func makePreparedMessage(size int) (*websocket.PreparedMessage, error) { return websocket.NewPreparedMessage(websocket.BinaryMessage, data) } -func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.Measurement) { +func loop( + conn *websocket.Conn, src <-chan model.Measurement, + dst chan<- model.Measurement, start time.Time, pongch <-chan model.WSInfo, +) { logging.Logger.Debug("sender: start") defer logging.Logger.Debug("sender: stop") defer close(dst) @@ -30,6 +33,9 @@ func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.M for range src { // make sure we drain the channel } + for range pongch { + // it should be buffered channel, but let's drain it anyway + } }() logging.Logger.Debug("sender: generating random buffer") bulkMessageSize := 1 << 13 @@ -38,12 +44,17 @@ func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.M logging.Logger.WithError(err).Warn("sender: makePreparedMessage failed") return } - deadline := time.Now().Add(spec.MaxRuntime) + deadline := start.Add(spec.MaxRuntime) err = 
conn.SetWriteDeadline(deadline) // Liveness! if err != nil { logging.Logger.WithError(err).Warn("sender: conn.SetWriteDeadline failed") return } + // only the first RTT sample taken before flooding the conn is not affected by HOL + if err := ping.SendTicks(conn, start, deadline); err != nil { + logging.Logger.WithError(err).Warn("sender: ping.SendTicks failed") + return + } var totalSent int64 for { select { @@ -57,10 +68,19 @@ func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.M return } dst <- m // Liveness: this is blocking - if err := ping.SendTicks(conn, deadline); err != nil { + if err := ping.SendTicks(conn, start, deadline); err != nil { logging.Logger.WithError(err).Warn("sender: ping.SendTicks failed") return } + case wsinfo := <-pongch: + m := model.Measurement{ + WSInfo: &wsinfo, + } + if err := conn.WriteJSON(m); err != nil { + logging.Logger.WithError(err).Warn("sender: conn.WriteJSON failed") + return + } + dst <- m // Liveness: this is blocking write to log default: if err := conn.WritePreparedMessage(preparedMessage); err != nil { logging.Logger.WithError(err).Warn( @@ -99,9 +119,9 @@ func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.M // Liveness guarantee: the sender will not be stuck sending for more then // the MaxRuntime of the subtest, provided that the consumer will // continue reading from the returned channel. This is enforced by -// setting the write deadline to Time.Now() + MaxRuntime. -func Start(conn *websocket.Conn, src <-chan model.Measurement) <-chan model.Measurement { +// setting the write deadline to |start| + MaxRuntime. 
+func Start(conn *websocket.Conn, src <-chan model.Measurement, start time.Time, pongch <-chan model.WSInfo) <-chan model.Measurement { dst := make(chan model.Measurement) - go loop(conn, src, dst) + go loop(conn, src, dst, start, pongch) return dst } diff --git a/ndt7/handler/handler.go b/ndt7/handler/handler.go index 0d7a522f..dffaaca7 100644 --- a/ndt7/handler/handler.go +++ b/ndt7/handler/handler.go @@ -36,14 +36,15 @@ func warnAndClose(writer http.ResponseWriter, message string) { // testerFunc is the function implementing a subtest. The first argument // is the subtest context. The second argument is the connected websocket. The // third argument is the open file where to write results. This function does -// not own the second or the third argument. -type testerFunc = func(context.Context, *websocket.Conn, *results.File) +// not own the second or the third argument. The fourth argument is the base +// start time of the test. +type testerFunc = func(context.Context, *websocket.Conn, *results.File, time.Time) // downloadOrUpload implements both download and upload. The writer argument // is the HTTP response writer. The request argument is the HTTP request -// that we received. The kind argument must be spec.SubtestDownload or -// spec.SubtestUpload. The tester is a function actually implementing the -// requested ndt7 subtest. +// that we received. The kind argument must be spec.SubtestDownload, +// spec.SubtestUpload, or SubtestPing. The tester is a function actually +// implementing the requested ndt7 subtest. 
func (h Handler) downloadOrUpload(writer http.ResponseWriter, request *http.Request, kind spec.SubtestKind, tester testerFunc) { logging.Logger.Debug("downloadOrUpload: upgrading to WebSockets") if request.Header.Get("Sec-WebSocket-Protocol") != spec.SecWebSocketProtocol { @@ -106,6 +107,8 @@ func (h Handler) downloadOrUpload(writer http.ResponseWriter, request *http.Requ result.Download = resultfp.Data } else if kind == spec.SubtestUpload { result.Upload = resultfp.Data + } else if kind == spec.SubtestPing { + result.Ping = resultfp.Data } else { logging.Logger.Warn(string(kind) + ": data not saved") } @@ -114,7 +117,7 @@ func (h Handler) downloadOrUpload(writer http.ResponseWriter, request *http.Requ } warnonerror.Close(resultfp, string(kind)+": ignoring resultfp.Close error") }() - tester(request.Context(), conn, resultfp) + tester(request.Context(), conn, resultfp, result.StartTime) } // Download handles the download subtest. @@ -126,3 +129,8 @@ func (h Handler) Download(writer http.ResponseWriter, request *http.Request) { func (h Handler) Upload(writer http.ResponseWriter, request *http.Request) { h.downloadOrUpload(writer, request, spec.SubtestUpload, upload.Do) } + +// Ping handles the ping subtest. 
+func (h Handler) Ping(writer http.ResponseWriter, request *http.Request) { + h.downloadOrUpload(writer, request, spec.SubtestPing, upload.Do) +} diff --git a/ndt7/measurer/measurer.go b/ndt7/measurer/measurer.go index da1afa64..fc4c3f2b 100644 --- a/ndt7/measurer/measurer.go +++ b/ndt7/measurer/measurer.go @@ -54,7 +54,7 @@ func measure(measurement *model.Measurement, sockfp *os.File, elapsed time.Durat } } -func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- model.Measurement) { +func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- model.Measurement, start time.Time) { logging.Logger.Debug("measurer: start") defer logging.Logger.Debug("measurer: stop") defer close(dst) @@ -66,7 +66,6 @@ func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- mod return } defer sockfp.Close() - start := time.Now() connectionInfo := &model.ConnectionInfo{ Client: conn.RemoteAddr().String(), Server: conn.LocalAddr().String(), @@ -104,9 +103,9 @@ func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- mod // a timeout of DefaultRuntime seconds, provided that the consumer // continues reading from the returned channel. 
func Start( - ctx context.Context, conn *websocket.Conn, UUID string, + ctx context.Context, conn *websocket.Conn, UUID string, start time.Time, ) <-chan model.Measurement { dst := make(chan model.Measurement) - go loop(ctx, conn, UUID, dst) + go loop(ctx, conn, UUID, dst, start) return dst } diff --git a/ndt7/model/measurement.go b/ndt7/model/measurement.go index d0cee8c5..fe24eed7 100644 --- a/ndt7/model/measurement.go +++ b/ndt7/model/measurement.go @@ -8,4 +8,5 @@ type Measurement struct { ConnectionInfo *ConnectionInfo `json:",omitempty" bigquery:"-"` BBRInfo *BBRInfo `json:",omitempty"` TCPInfo *TCPInfo `json:",omitempty"` + WSInfo *WSInfo `json:",omitempty"` } diff --git a/ndt7/model/wsinfo.go b/ndt7/model/wsinfo.go new file mode 100644 index 00000000..53bc51d2 --- /dev/null +++ b/ndt7/model/wsinfo.go @@ -0,0 +1,10 @@ +package model + +// WSInfo contains an application level (websocket) ping measurement data. +// It may be melded into AppInfo. +// FIXME: describe this structure is in the ndt7 specification. +type WSInfo struct { + ElapsedTime int64 + LastRTT int64 // TCPInfo.RTT is smoothed RTT, LastRTT is just a sample. + MinRTT int64 +} diff --git a/ndt7/ping/ping.go b/ndt7/ping/ping.go index 2af7c7ff..cb0378b6 100644 --- a/ndt7/ping/ping.go +++ b/ndt7/ping/ping.go @@ -3,16 +3,15 @@ package ping import ( "encoding/json" + "errors" "time" "github.com/gorilla/websocket" ) // SendTicks sends the current ticks as a ping message. -func SendTicks(conn *websocket.Conn, deadline time.Time) error { - // TODO(bassosimone): when we'll have a unique base time.Time reference for - // the whole test, we should use that, since UnixNano() is not monotonic. 
- ticks := int64(time.Now().UnixNano()) +func SendTicks(conn *websocket.Conn, start time.Time, deadline time.Time) error { + var ticks int64 = time.Since(start).Nanoseconds() data, err := json.Marshal(ticks) if err == nil { err = conn.WriteControl(websocket.PingMessage, data, deadline) @@ -20,13 +19,17 @@ func SendTicks(conn *websocket.Conn, deadline time.Time) error { return err } -func ParseTicks(s string) (d int64, err error) { - // TODO(bassosimone): when we'll have a unique base time.Time reference for - // the whole test, we should use that, since UnixNano() is not monotonic. +func ParseTicks(s string, start time.Time) (elapsed time.Duration, d time.Duration, err error) { + elapsed = time.Since(start) var prev int64 err = json.Unmarshal([]byte(s), &prev) - if err == nil { - d = (int64(time.Now().UnixNano()) - prev) + if err != nil { + return + } + if 0 <= prev && prev <= elapsed.Nanoseconds() { + d = time.Duration(elapsed.Nanoseconds() - prev) + } else { + err = errors.New("RTT is negative") } return } diff --git a/ndt7/receiver/receiver.go b/ndt7/receiver/receiver.go index ab423ed2..3e8a6ee5 100644 --- a/ndt7/receiver/receiver.go +++ b/ndt7/receiver/receiver.go @@ -6,6 +6,7 @@ import ( "context" "encoding/json" "time" + "math" "github.com/gorilla/websocket" "github.com/m-lab/ndt-server/logging" @@ -19,28 +20,44 @@ type receiverKind int const ( downloadReceiver = receiverKind(iota) uploadReceiver + pingReceiver +) + +const ( + MaxDuration = math.MaxInt64 * time.Nanosecond ) func loop( ctx context.Context, conn *websocket.Conn, kind receiverKind, - dst chan<- model.Measurement, + dst chan<- model.Measurement, start time.Time, pongch chan<- model.WSInfo, ) { logging.Logger.Debug("receiver: start") defer logging.Logger.Debug("receiver: stop") defer close(dst) + defer close(pongch) conn.SetReadLimit(spec.MaxMessageSize) receiverctx, cancel := context.WithTimeout(ctx, spec.MaxRuntime) defer cancel() - err := conn.SetReadDeadline(time.Now().Add(spec.MaxRuntime)) // 
Liveness! + err := conn.SetReadDeadline(start.Add(spec.MaxRuntime)) // Liveness! if err != nil { logging.Logger.WithError(err).Warn("receiver: conn.SetReadDeadline failed") return } + minRTT := MaxDuration conn.SetPongHandler(func(s string) error { - rtt, err := ping.ParseTicks(s) + elapsed, rtt, err := ping.ParseTicks(s, start) if err == nil { - rtt /= int64(time.Millisecond) - logging.Logger.Debugf("receiver: ApplicationLevel RTT: %d ms", rtt) + logging.Logger.Debugf("receiver: ApplicationLevel RTT: %d ms", int64(rtt / time.Millisecond)) + if rtt < minRTT { + minRTT = rtt + } + + wsinfo := model.WSInfo{ + ElapsedTime: int64(elapsed / time.Microsecond), + LastRTT: int64(rtt / time.Microsecond), + MinRTT: int64(minRTT / time.Microsecond), + } + pongch <- wsinfo // Liveness: buffered (sender) } return err }) @@ -55,11 +72,11 @@ func loop( } if mtype != websocket.TextMessage { switch kind { - case downloadReceiver: + case uploadReceiver: + continue // No further processing required + default: // downloadReceiver and pingReceiver logging.Logger.Warn("receiver: got non-Text message") return // Unexpected message type - default: - continue // No further processing required } } var measurement model.Measurement @@ -72,10 +89,15 @@ func loop( } } -func start(ctx context.Context, conn *websocket.Conn, kind receiverKind) <-chan model.Measurement { +func startReceiver(ctx context.Context, conn *websocket.Conn, kind receiverKind, start time.Time) (<-chan model.Measurement, <-chan model.WSInfo) { + // |dst| is going to the log file dst := make(chan model.Measurement) - go loop(ctx, conn, kind, dst) - return dst + // |pongch| goes to the client, it's buffered to avoid blocking on `download.sender.loop` + // while `conn.WritePreparedMessage()` is active. + // TODO(darkk): is it possible to reduce buffer size or to avoiding blocking in some other way? May avoiding L7 pings at /download altogether be the way? 
+ pongch := make(chan model.WSInfo, 1 + spec.MaxRuntime / spec.MinPoissonSamplingInterval) + go loop(ctx, conn, kind, dst, start, pongch) + return dst, pongch } // StartDownloadReceiver starts the receiver in a background goroutine and @@ -87,13 +109,18 @@ func start(ctx context.Context, conn *websocket.Conn, kind receiverKind) <-chan // Liveness guarantee: the goroutine will always terminate after a // MaxRuntime timeout, provided that the consumer will keep reading // from the returned channel. -func StartDownloadReceiver(ctx context.Context, conn *websocket.Conn) <-chan model.Measurement { - return start(ctx, conn, downloadReceiver) +func StartDownloadReceiver(ctx context.Context, conn *websocket.Conn, start time.Time, msmch <-chan model.Measurement) (<-chan model.Measurement, <-chan model.WSInfo) { + return startReceiver(ctx, conn, downloadReceiver, start) } // StartUploadReceiver is like StartDownloadReceiver except that it // tolerates incoming binary messages, which are sent to cause // network load, and therefore must not be rejected. -func StartUploadReceiver(ctx context.Context, conn *websocket.Conn) <-chan model.Measurement { - return start(ctx, conn, uploadReceiver) +func StartUploadReceiver(ctx context.Context, conn *websocket.Conn, start time.Time) (<-chan model.Measurement, <-chan model.WSInfo) { + return startReceiver(ctx, conn, uploadReceiver, start) +} + +// StartPingReceiver is exactly like StartDownloadReceiver currently. +func StartPingReceiver(ctx context.Context, conn *websocket.Conn, start time.Time) (<-chan model.Measurement, <-chan model.WSInfo) { + return startReceiver(ctx, conn, pingReceiver, start) } diff --git a/ndt7/results/file.go b/ndt7/results/file.go index 8786c429..016dc293 100644 --- a/ndt7/results/file.go +++ b/ndt7/results/file.go @@ -66,8 +66,8 @@ func newFile(datadir, what, uuid string) (*File, error) { // containing the metadata. The conn argument is used to retrieve the local and // the remote endpoints addresses. 
The "datadir" argument specifies the // directory on disk to write the data into and the what argument should -// indicate whether this is a spec.SubtestDownload or a spec.SubtestUpload -// ndt7 measurement. +// indicate whether this is a spec.SubtestDownload, a spec.SubtestUpload +// or a spec.SubtestPing ndt7 measurement. func OpenFor(request *http.Request, conn *websocket.Conn, datadir string, what spec.SubtestKind) (*File, error) { meta := make(metadata, 0) netConn := conn.UnderlyingConn() diff --git a/ndt7/spec/spec.go b/ndt7/spec/spec.go index 6d47d812..8d7c8522 100644 --- a/ndt7/spec/spec.go +++ b/ndt7/spec/spec.go @@ -9,6 +9,9 @@ const DownloadURLPath = "/ndt/v7/download" // UploadURLPath selects the upload subtest. const UploadURLPath = "/ndt/v7/upload" +// PingURLPath selects the ping subtest. +const PingURLPath = "/ndt/v7/ping" + // SecWebSocketProtocol is the WebSocket subprotocol used by ndt7. const SecWebSocketProtocol = "net.measurementlab.ndt.v7" @@ -57,4 +60,7 @@ const ( // SubtestUpload is a upload subtest SubtestUpload = SubtestKind("upload") + + // SubtestPing is a ping subtest + SubtestPing = SubtestKind("ping") ) diff --git a/ndt7/upload/sender/sender.go b/ndt7/upload/sender/sender.go index ff46a130..eee5c007 100644 --- a/ndt7/upload/sender/sender.go +++ b/ndt7/upload/sender/sender.go @@ -14,7 +14,7 @@ import ( func loop( conn *websocket.Conn, src <-chan model.Measurement, - dst chan<- model.Measurement, + dst chan<- model.Measurement, start time.Time, pongch <-chan model.WSInfo, ) { logging.Logger.Debug("sender: start") defer logging.Logger.Debug("sender: stop") @@ -23,27 +23,41 @@ func loop( for range src { // make sure we drain the channel } + for range pongch { + // it should be buffered channel, but let's drain it anyway + } }() - deadline := time.Now().Add(spec.MaxRuntime) + deadline := start.Add(spec.MaxRuntime) err := conn.SetWriteDeadline(deadline) // Liveness! 
if err != nil { logging.Logger.WithError(err).Warn("sender: conn.SetWriteDeadline failed") return } for { - m, ok := <-src - if !ok { // This means that the previous step has terminated - closer.StartClosing(conn) - return - } - if err := conn.WriteJSON(m); err != nil { - logging.Logger.WithError(err).Warn("sender: conn.WriteJSON failed") - return - } - dst <- m // Liveness: this is blocking - if err := ping.SendTicks(conn, deadline); err != nil { - logging.Logger.WithError(err).Warn("sender: ping.SendTicks failed") - return + select { + case m, ok := <-src: + if !ok { // This means that the previous step has terminated + closer.StartClosing(conn) + return + } + if err := conn.WriteJSON(m); err != nil { + logging.Logger.WithError(err).Warn("sender: conn.WriteJSON failed") + return + } + dst <- m // Liveness: this is blocking + if err := ping.SendTicks(conn, start, deadline); err != nil { + logging.Logger.WithError(err).Warn("sender: ping.SendTicks failed") + return + } + case wsinfo := <-pongch: + m := model.Measurement{ + WSInfo: &wsinfo, + } + if err := conn.WriteJSON(m); err != nil { + logging.Logger.WithError(err).Warn("sender: conn.WriteJSON failed") + return + } + dst <- m // Liveness: this is blocking write to log } } } @@ -55,9 +69,9 @@ func loop( // Liveness guarantee: the sender will not be stuck sending for more then // the MaxRuntime of the subtest, provided that the consumer will // continue reading from the returned channel. This is enforced by -// setting the write deadline to MaxRuntime + time.Now. -func Start(conn *websocket.Conn, src <-chan model.Measurement) <-chan model.Measurement { +// setting the write deadline to |start| + MaxRuntime. 
+func Start(conn *websocket.Conn, src <-chan model.Measurement, start time.Time, pongch <-chan model.WSInfo) <-chan model.Measurement { dst := make(chan model.Measurement) - go loop(conn, src, dst) + go loop(conn, src, dst, start, pongch) return dst } diff --git a/ndt7/upload/upload.go b/ndt7/upload/upload.go index 511c3a70..11851b6c 100644 --- a/ndt7/upload/upload.go +++ b/ndt7/upload/upload.go @@ -3,6 +3,7 @@ package upload import ( "context" + "time" "github.com/gorilla/websocket" "github.com/m-lab/ndt-server/ndt7/results" @@ -15,13 +16,15 @@ import ( // Do implements the upload subtest. The ctx argument is the parent context // for the subtest. The conn argument is the open WebSocket connection. The // resultfp argument is the file where to save results. Both arguments are -// owned by the caller of this function. -func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File) { +// owned by the caller of this function. The start argument is the test +// start time used to calculate ElapsedTime and deadlines. 
+func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File, start time.Time) { // Implementation note: use child context so that, if we cannot save the // results in the loop below, we terminate the goroutines early wholectx, cancel := context.WithCancel(ctx) defer cancel() - senderch := sender.Start(conn, measurer.Start(wholectx, conn, resultfp.Data.UUID)) - receiverch := receiver.StartUploadReceiver(wholectx, conn) + measurerch := measurer.Start(wholectx, conn, resultfp.Data.UUID, start) + receiverch, pongch := receiver.StartUploadReceiver(wholectx, conn, start) + senderch := sender.Start(conn, measurerch, start, pongch) saver.SaveAll(resultfp, senderch, receiverch) } From dc0cdbf31f7de2fcd97a7ecf5a4a35a3dc1b70a5 Mon Sep 17 00:00:00 2001 From: Leonid Evdokimov Date: Tue, 21 Jan 2020 13:46:22 +0300 Subject: [PATCH 2/7] ndt: add websocket-based ping to the spec --- spec/ndt7-protocol.md | 63 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 51 insertions(+), 12 deletions(-) diff --git a/spec/ndt7-protocol.md b/spec/ndt7-protocol.md index 663bfa31..c761470d 100644 --- a/spec/ndt7-protocol.md +++ b/spec/ndt7-protocol.md @@ -7,18 +7,19 @@ protocol](https://github.com/ndt-project/ndt). Ndt7 is based on WebSocket and TLS, and takes advantage of TCP BBR, where this flavour of TCP is available. -This is version v0.8.3 of the ndt7 specification. +This is version v0.9.0 of the ndt7 specification. ## Design choices (This section is non-normative.) Ndt7 measures the application-level download and upload performance -using WebSockets over TLS. Each test type is independent, and -there are two types of test: the download and the upload tests. Ndt7 -always uses a single TCP connection. Whenever possible, ndt7 uses a recent -version of TCP BBR. Writing an ndt7 client is designed to be as simple -as possible. [A complete Go language ndt7 client]( +using WebSockets over TLS. 
Each test type is independent, and there are +three types of test: the download, the upload tests, and the latency +test. Ndt7 always uses a single new TCP connection for each type of +test. Whenever possible, ndt7 uses a recent version of TCP BBR. Writing +an ndt7 client is designed to be as simple as possible. [A complete Go +language ndt7 client]( https://github.com/bassosimone/ndt7-client-go-minimal) has been implemented in just 151 lines. We used 26 lines for the download, 33 for the upload, and 17 for establishing a connections. No code from the NDT server has been @@ -68,12 +69,14 @@ servers should behave during the download and the upload tests. The client connects to the server using HTTPS and requests to upgrade the connection to WebSockets. The same connection will be used to exchange control and measurement messages. The upgrade request URL will indicate -the type of test that the client wants to perform. Two tests and -hence two URLs are defined: +the type of test that the client wants to perform. Three tests and +hence three URLs are defined: - `/ndt/v7/download`, which selects the download test; -- `/ndt/v7/upload`, which selects the upload test. +- `/ndt/v7/upload`, which selects the upload test; + +- `/ndt/v7/ping`, which selects the ping test. The upgrade message MUST also contain the WebSocket subprotocol that identifies ndt7, which is `net.measurementlab.ndt.v7`. The URL in the @@ -199,14 +202,14 @@ provide information useful to diagnose performance issues. While in theory we could specify all `TCP_INFO` and `BBR_INFO` variables, different kernel versions provide different subsets of these measurements and we do not want to be needlessly restrictive regarding the underlying -kernel for the server. Instead, +kernel for the server. Instead, our guiding principle is to describe only the variables that in our experience are useful to understand performance issues. More variables could be added in the future. 
No variables should be removed, but, if some are removed, we should document them as being removed rather than removing them from this specification. -Since version v0.8.0 of this specification, the measurement message +Since version v0.9.0 of this specification, the measurement message has the following structure: ```json @@ -222,6 +225,11 @@ has the following structure: }, "Origin": "server", "Test": "download", + "WSInfo": { + "ElapsedTime": 1234, + "LastRTT": 134, + "MinRTT": 1234 + }, "TCPInfo": { "BusyTime": 1234, "BytesAcked": 1234, @@ -281,6 +289,18 @@ Where: current test. This field SHOULD only be used when the current test should otherwise not be obvious. +- `WSInfo` is an _optional_ `object` only included in the measurement + when a reasonable websocket-level measurement is available: + + - `ElapsedTime` (a `int64`) is the time elapsed since the beginning of + this test, measured in microseconds. + + - `LastRTT` (an _optional_ `int64`), the last observed RTT for the websocket + ping-pong exchange, measured in microseconds. + + - `MinRTT` (an _optional_ `int64`), the minimum observed RTT for the websocket + ping-pong exchange, measured in microseconds. + - `TCPInfo` is an _optional_ `object` only included in the measurement when it is possible to access `TCP_INFO` stats. It contains: @@ -387,9 +407,11 @@ When the server sends measurement messages, the download becomes: ``` > GET /ndt/v7/download Upgrade: websocket < 101 Switching Protocols +< PingMessage < BinaryMessage < BinaryMessage < TextMessage clientElapsedTime=0.30 s +> PongMessage < BinaryMessage < BinaryMessage < TextMessage clientElapsedTime=0.55 s @@ -645,7 +667,9 @@ of the round-trip time. The buildup of a large queue is unexpected when using BBR. It generally indicates the presence of a bottleneck with a large buffer that's filling as the test proceeds. The `MinRTT` can also be useful to verify we're using a reasonably nearby-server. 
Also, an unreasonably small RTT when -the link is 2G or 3G could indicate a performance enhancing proxy. +the link is 2G or 3G could indicate a performance enhancing proxy, one can +compare `TCPInfo.MinRTT` against `WSInfo.MinRTT` to get additional evidence +supporing this case. The times (`BusyTime`, `RWndLimited`, and `SndBufLimited`) are useful to understand where the bottleneck could be. In general we would like to see @@ -679,3 +703,18 @@ packet is uniformly distributed, which isn't likely the case. Yet, it may be an useful first order information to characterise a network as possibly very lossy. Some packet loss is normal and healthy, but too much packet loss is the sign of a network path with systemic problems. + +### Measuring latency + +The presence of TCP-level proxies leads to L7 means being needed to +measure end-to-end latency in addition to end-to-end bandwidth. Such +proxies may include ISP-level performance-enhancing proxies, OpenSSH, +Tor anonymity network and many others. + +`WSInfo.LastRTT` samples may be affected by the payload during download +and upload tests, as the queue of BinaryMessage may delay either ping +or pong frame. Ping test does not send BinaryMessage payload, so WSInfo +RTT measurements should be reasonably accurate (unless it's practical +for the client to delay pong frames). The very first `WSInfo` sample +collected during the download test also has a chance to be accurate as +the ping frame SHOULD precede any BinaryMessages in the case. 
From 1ac78b32baa83231200e9785c6aa216e2f5aba67 Mon Sep 17 00:00:00 2001 From: Leonid Evdokimov Date: Tue, 21 Jan 2020 20:36:12 +0300 Subject: [PATCH 3/7] Rename WSInfo to WSPingInfo --- html/ndt7.html | 4 ++-- ndt7/download/sender/sender.go | 6 +++--- ndt7/model/measurement.go | 2 +- ndt7/model/wsinfo.go | 10 ---------- ndt7/model/wspinginfo.go | 9 +++++++++ ndt7/receiver/receiver.go | 14 +++++++------- ndt7/upload/sender/sender.go | 6 +++--- spec/ndt7-protocol.md | 12 ++++++------ 8 files changed, 31 insertions(+), 32 deletions(-) delete mode 100644 ndt7/model/wsinfo.go create mode 100644 ndt7/model/wspinginfo.go diff --git a/html/ndt7.html b/html/ndt7.html index fbd3b107..548355c1 100644 --- a/html/ndt7.html +++ b/html/ndt7.html @@ -70,8 +70,8 @@ updateView(testName, val.AppInfo) } if (ev === 'measurement' && val.Origin === 'server' && testName === 'ping') { - if (val.WSInfo !== undefined) { - ws = val.WSInfo.MinRTT / 1e3 + if (val.WSPingInfo !== undefined) { + ws = val.WSPingInfo.MinRTT / 1e3 } if (val.TCPInfo !== undefined) { tcp = val.TCPInfo.MinRTT / 1e3 diff --git a/ndt7/download/sender/sender.go b/ndt7/download/sender/sender.go index b7ce17fe..dc25cf3e 100644 --- a/ndt7/download/sender/sender.go +++ b/ndt7/download/sender/sender.go @@ -24,7 +24,7 @@ func makePreparedMessage(size int) (*websocket.PreparedMessage, error) { func loop( conn *websocket.Conn, src <-chan model.Measurement, - dst chan<- model.Measurement, start time.Time, pongch <-chan model.WSInfo, + dst chan<- model.Measurement, start time.Time, pongch <-chan model.WSPingInfo, ) { logging.Logger.Debug("sender: start") defer logging.Logger.Debug("sender: stop") @@ -74,7 +74,7 @@ func loop( } case wsinfo := <-pongch: m := model.Measurement{ - WSInfo: &wsinfo, + WSPingInfo: &wsinfo, } if err := conn.WriteJSON(m); err != nil { logging.Logger.WithError(err).Warn("sender: conn.WriteJSON failed") @@ -120,7 +120,7 @@ func loop( // the MaxRuntime of the subtest, provided that the consumer will // continue 
reading from the returned channel. This is enforced by // setting the write deadline to |start| + MaxRuntime. -func Start(conn *websocket.Conn, src <-chan model.Measurement, start time.Time, pongch <-chan model.WSInfo) <-chan model.Measurement { +func Start(conn *websocket.Conn, src <-chan model.Measurement, start time.Time, pongch <-chan model.WSPingInfo) <-chan model.Measurement { dst := make(chan model.Measurement) go loop(conn, src, dst, start, pongch) return dst diff --git a/ndt7/model/measurement.go b/ndt7/model/measurement.go index fe24eed7..1e3b418a 100644 --- a/ndt7/model/measurement.go +++ b/ndt7/model/measurement.go @@ -8,5 +8,5 @@ type Measurement struct { ConnectionInfo *ConnectionInfo `json:",omitempty" bigquery:"-"` BBRInfo *BBRInfo `json:",omitempty"` TCPInfo *TCPInfo `json:",omitempty"` - WSInfo *WSInfo `json:",omitempty"` + WSPingInfo *WSPingInfo `json:",omitempty"` } diff --git a/ndt7/model/wsinfo.go b/ndt7/model/wsinfo.go deleted file mode 100644 index 53bc51d2..00000000 --- a/ndt7/model/wsinfo.go +++ /dev/null @@ -1,10 +0,0 @@ -package model - -// WSInfo contains an application level (websocket) ping measurement data. -// It may be melded into AppInfo. -// FIXME: describe this structure is in the ndt7 specification. -type WSInfo struct { - ElapsedTime int64 - LastRTT int64 // TCPInfo.RTT is smoothed RTT, LastRTT is just a sample. - MinRTT int64 -} diff --git a/ndt7/model/wspinginfo.go b/ndt7/model/wspinginfo.go new file mode 100644 index 00000000..d16ac226 --- /dev/null +++ b/ndt7/model/wspinginfo.go @@ -0,0 +1,9 @@ +package model + +// WSPingInfo contains an application level (websocket) ping measurement data. +// This structure is described in the ndt7 specification. +type WSPingInfo struct { + ElapsedTime int64 + LastRTT int64 // TCPInfo.RTT is smoothed RTT, LastRTT is just a sample. 
+ MinRTT int64 +} diff --git a/ndt7/receiver/receiver.go b/ndt7/receiver/receiver.go index 3e8a6ee5..2086c3df 100644 --- a/ndt7/receiver/receiver.go +++ b/ndt7/receiver/receiver.go @@ -29,7 +29,7 @@ const ( func loop( ctx context.Context, conn *websocket.Conn, kind receiverKind, - dst chan<- model.Measurement, start time.Time, pongch chan<- model.WSInfo, + dst chan<- model.Measurement, start time.Time, pongch chan<- model.WSPingInfo, ) { logging.Logger.Debug("receiver: start") defer logging.Logger.Debug("receiver: stop") @@ -52,7 +52,7 @@ func loop( minRTT = rtt } - wsinfo := model.WSInfo{ + wsinfo := model.WSPingInfo{ ElapsedTime: int64(elapsed / time.Microsecond), LastRTT: int64(rtt / time.Microsecond), MinRTT: int64(minRTT / time.Microsecond), @@ -89,13 +89,13 @@ func loop( } } -func startReceiver(ctx context.Context, conn *websocket.Conn, kind receiverKind, start time.Time) (<-chan model.Measurement, <-chan model.WSInfo) { +func startReceiver(ctx context.Context, conn *websocket.Conn, kind receiverKind, start time.Time) (<-chan model.Measurement, <-chan model.WSPingInfo) { // |dst| is going to the log file dst := make(chan model.Measurement) // |pongch| goes to the client, it's buffered to avoid blocking on `download.sender.loop` // while `conn.WritePreparedMessage()` is active. // TODO(darkk): is it possible to reduce buffer size or to avoiding blocking in some other way? May avoiding L7 pings at /download altogether be the way? - pongch := make(chan model.WSInfo, 1 + spec.MaxRuntime / spec.MinPoissonSamplingInterval) + pongch := make(chan model.WSPingInfo, 1 + spec.MaxRuntime / spec.MinPoissonSamplingInterval) go loop(ctx, conn, kind, dst, start, pongch) return dst, pongch } @@ -109,18 +109,18 @@ func startReceiver(ctx context.Context, conn *websocket.Conn, kind receiverKind, // Liveness guarantee: the goroutine will always terminate after a // MaxRuntime timeout, provided that the consumer will keep reading // from the returned channel. 
-func StartDownloadReceiver(ctx context.Context, conn *websocket.Conn, start time.Time, msmch <-chan model.Measurement) (<-chan model.Measurement, <-chan model.WSInfo) { +func StartDownloadReceiver(ctx context.Context, conn *websocket.Conn, start time.Time, msmch <-chan model.Measurement) (<-chan model.Measurement, <-chan model.WSPingInfo) { return startReceiver(ctx, conn, downloadReceiver, start) } // StartUploadReceiver is like StartDownloadReceiver except that it // tolerates incoming binary messages, which are sent to cause // network load, and therefore must not be rejected. -func StartUploadReceiver(ctx context.Context, conn *websocket.Conn, start time.Time) (<-chan model.Measurement, <-chan model.WSInfo) { +func StartUploadReceiver(ctx context.Context, conn *websocket.Conn, start time.Time) (<-chan model.Measurement, <-chan model.WSPingInfo) { return startReceiver(ctx, conn, uploadReceiver, start) } // StartPingReceiver is exactly like StartDownloadReceiver currently. -func StartPingReceiver(ctx context.Context, conn *websocket.Conn, start time.Time) (<-chan model.Measurement, <-chan model.WSInfo) { +func StartPingReceiver(ctx context.Context, conn *websocket.Conn, start time.Time) (<-chan model.Measurement, <-chan model.WSPingInfo) { return startReceiver(ctx, conn, pingReceiver, start) } diff --git a/ndt7/upload/sender/sender.go b/ndt7/upload/sender/sender.go index eee5c007..e6c30384 100644 --- a/ndt7/upload/sender/sender.go +++ b/ndt7/upload/sender/sender.go @@ -14,7 +14,7 @@ import ( func loop( conn *websocket.Conn, src <-chan model.Measurement, - dst chan<- model.Measurement, start time.Time, pongch <-chan model.WSInfo, + dst chan<- model.Measurement, start time.Time, pongch <-chan model.WSPingInfo, ) { logging.Logger.Debug("sender: start") defer logging.Logger.Debug("sender: stop") @@ -51,7 +51,7 @@ func loop( } case wsinfo := <-pongch: m := model.Measurement{ - WSInfo: &wsinfo, + WSPingInfo: &wsinfo, } if err := conn.WriteJSON(m); err != nil { 
logging.Logger.WithError(err).Warn("sender: conn.WriteJSON failed") @@ -70,7 +70,7 @@ func loop( // the MaxRuntime of the subtest, provided that the consumer will // continue reading from the returned channel. This is enforced by // setting the write deadline to |start| + MaxRuntime. -func Start(conn *websocket.Conn, src <-chan model.Measurement, start time.Time, pongch <-chan model.WSInfo) <-chan model.Measurement { +func Start(conn *websocket.Conn, src <-chan model.Measurement, start time.Time, pongch <-chan model.WSPingInfo) <-chan model.Measurement { dst := make(chan model.Measurement) go loop(conn, src, dst, start, pongch) return dst diff --git a/spec/ndt7-protocol.md b/spec/ndt7-protocol.md index c761470d..f863acd5 100644 --- a/spec/ndt7-protocol.md +++ b/spec/ndt7-protocol.md @@ -225,7 +225,7 @@ has the following structure: }, "Origin": "server", "Test": "download", - "WSInfo": { + "WSPingInfo": { "ElapsedTime": 1234, "LastRTT": 134, "MinRTT": 1234 @@ -289,7 +289,7 @@ Where: current test. This field SHOULD only be used when the current test should otherwise not be obvious. -- `WSInfo` is an _optional_ `object` only included in the measurement +- `WSPingInfo` is an _optional_ `object` only included in the measurement when a reasonable websocket-level measurement is available: - `ElapsedTime` (a `int64`) is the time elapsed since the beginning of @@ -668,7 +668,7 @@ BBR. It generally indicates the presence of a bottleneck with a large buffer that's filling as the test proceeds. The `MinRTT` can also be useful to verify we're using a reasonably nearby-server. Also, an unreasonably small RTT when the link is 2G or 3G could indicate a performance enhancing proxy, one can -compare `TCPInfo.MinRTT` against `WSInfo.MinRTT` to get additional evidence +compare `TCPInfo.MinRTT` against `WSPingInfo.MinRTT` to get additional evidence supporing this case. 
The times (`BusyTime`, `RWndLimited`, and `SndBufLimited`) are useful to @@ -711,10 +711,10 @@ measure end-to-end latency in addition to end-to-end bandwidth. Such proxies may include ISP-level performance-enhancing proxies, OpenSSH, Tor anonymity network and many others. -`WSInfo.LastRTT` samples may be affected by the payload during download +`WSPingInfo.LastRTT` samples may be affected by the payload during download and upload tests, as the queue of BinaryMessage may delay either ping -or pong frame. Ping test does not send BinaryMessage payload, so WSInfo +or pong frame. Ping test does not send BinaryMessage payload, so WSPingInfo RTT measurements should be reasonably accurate (unless it's practical -for the client to delay pong frames). The very first `WSInfo` sample +for the client to delay pong frames). The very first `WSPingInfo` sample collected during the download test also has a chance to be accurate as the ping frame SHOULD precede any BinaryMessages in the case. From da9614c341e48405414003cfd2b183e1eed505e9 Mon Sep 17 00:00:00 2001 From: Leonid Evdokimov Date: Fri, 24 Jan 2020 14:46:11 +0300 Subject: [PATCH 4/7] ndt7: wrap websocket ping into json to protect against unsolicited pong frames --- ndt7/ping/ping.go | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/ndt7/ping/ping.go b/ndt7/ping/ping.go index cb0378b6..a5b86f06 100644 --- a/ndt7/ping/ping.go +++ b/ndt7/ping/ping.go @@ -9,10 +9,21 @@ import ( "github.com/gorilla/websocket" ) +// The json object is used as a namespace to avoid erratic interpretation of +// unsolicited pong frames. Ping and pong frames are not a part of +// Sec-WebSocket-Protocol, they're part of RFC6455. Section 5.5.3 of the RFC +// allows unsolicited pong frames. Some browsers are known to send unsolicited +// pong frames, see golang/go#6377 . +type pingMessage struct { + Ndt7TS int64 +} + // SendTicks sends the current ticks as a ping message. 
func SendTicks(conn *websocket.Conn, start time.Time, deadline time.Time) error { - var ticks int64 = time.Since(start).Nanoseconds() - data, err := json.Marshal(ticks) + msg := pingMessage{ + Ndt7TS: time.Since(start).Nanoseconds(), + } + data, err := json.Marshal(msg) if err == nil { err = conn.WriteControl(websocket.PingMessage, data, deadline) } @@ -21,11 +32,12 @@ func SendTicks(conn *websocket.Conn, start time.Time, deadline time.Time) error func ParseTicks(s string, start time.Time) (elapsed time.Duration, d time.Duration, err error) { elapsed = time.Since(start) - var prev int64 - err = json.Unmarshal([]byte(s), &prev) + var msg pingMessage + err = json.Unmarshal([]byte(s), &msg) if err != nil { return } + prev := msg.Ndt7TS if 0 <= prev && prev <= elapsed.Nanoseconds() { d = time.Duration(elapsed.Nanoseconds() - prev) } else { From 1de3f7a54c19ba2b3b35ae05276efb484b6d1d51 Mon Sep 17 00:00:00 2001 From: Leonid Evdokimov Date: Tue, 28 Jan 2020 16:49:31 +0300 Subject: [PATCH 5/7] ndt7: implement /ping handler as a self-contained module --- ndt7/download/download.go | 6 +- ndt7/download/sender/sender.go | 31 +++------- ndt7/handler/handler.go | 3 +- ndt7/ping/message/message.go | 47 +++++++++++++++ ndt7/ping/mux/mux.go | 102 +++++++++++++++++++++++++++++++++ ndt7/ping/ping.go | 61 ++++++++------------ ndt7/ping/receiver/receiver.go | 101 ++++++++++++++++++++++++++++++++ ndt7/ping/sender/sender.go | 61 ++++++++++++++++++++ ndt7/receiver/receiver.go | 54 +++++------------ ndt7/upload/sender/sender.go | 48 ++++++---------- ndt7/upload/upload.go | 5 +- spec/ndt7-protocol.md | 4 +- 12 files changed, 382 insertions(+), 141 deletions(-) create mode 100644 ndt7/ping/message/message.go create mode 100644 ndt7/ping/mux/mux.go create mode 100644 ndt7/ping/receiver/receiver.go create mode 100644 ndt7/ping/sender/sender.go diff --git a/ndt7/download/download.go b/ndt7/download/download.go index 9e3700d2..6ae0e741 100644 --- a/ndt7/download/download.go +++ 
b/ndt7/download/download.go @@ -23,8 +23,8 @@ func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File, start // results in the loop below, we terminate the goroutines early wholectx, cancel := context.WithCancel(ctx) defer cancel() - measurerch := measurer.Start(wholectx, conn, resultfp.Data.UUID, start) - receiverch, pongch := receiver.StartDownloadReceiver(wholectx, conn, start, measurerch) - senderch := sender.Start(conn, measurerch, start, pongch) + senderch := sender.Start(conn, measurer.Start(wholectx, conn, resultfp.Data.UUID, start), start) + receiverch := receiver.StartDownloadReceiver(wholectx, conn, start) + saver.SaveAll(resultfp, senderch, receiverch) } diff --git a/ndt7/download/sender/sender.go b/ndt7/download/sender/sender.go index dc25cf3e..d161c423 100644 --- a/ndt7/download/sender/sender.go +++ b/ndt7/download/sender/sender.go @@ -9,7 +9,7 @@ import ( "github.com/m-lab/ndt-server/logging" "github.com/m-lab/ndt-server/ndt7/closer" "github.com/m-lab/ndt-server/ndt7/model" - "github.com/m-lab/ndt-server/ndt7/ping" + "github.com/m-lab/ndt-server/ndt7/ping/message" "github.com/m-lab/ndt-server/ndt7/spec" ) @@ -22,10 +22,7 @@ func makePreparedMessage(size int) (*websocket.PreparedMessage, error) { return websocket.NewPreparedMessage(websocket.BinaryMessage, data) } -func loop( - conn *websocket.Conn, src <-chan model.Measurement, - dst chan<- model.Measurement, start time.Time, pongch <-chan model.WSPingInfo, -) { +func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.Measurement, start time.Time) { logging.Logger.Debug("sender: start") defer logging.Logger.Debug("sender: stop") defer close(dst) @@ -33,9 +30,6 @@ func loop( for range src { // make sure we drain the channel } - for range pongch { - // it should be buffered channel, but let's drain it anyway - } }() logging.Logger.Debug("sender: generating random buffer") bulkMessageSize := 1 << 13 @@ -51,8 +45,8 @@ func loop( return } // only the first RTT 
sample taken before flooding the conn is not affected by HOL - if err := ping.SendTicks(conn, start, deadline); err != nil { - logging.Logger.WithError(err).Warn("sender: ping.SendTicks failed") + if err := message.SendTicks(conn, start, deadline); err != nil { + logging.Logger.WithError(err).Warn("sender: ping.message.SendTicks failed") return } var totalSent int64 @@ -68,19 +62,10 @@ func loop( return } dst <- m // Liveness: this is blocking - if err := ping.SendTicks(conn, start, deadline); err != nil { - logging.Logger.WithError(err).Warn("sender: ping.SendTicks failed") - return - } - case wsinfo := <-pongch: - m := model.Measurement{ - WSPingInfo: &wsinfo, - } - if err := conn.WriteJSON(m); err != nil { - logging.Logger.WithError(err).Warn("sender: conn.WriteJSON failed") + if err := message.SendTicks(conn, start, deadline); err != nil { + logging.Logger.WithError(err).Warn("sender: ping.message.SendTicks failed") return } - dst <- m // Liveness: this is blocking write to log default: if err := conn.WritePreparedMessage(preparedMessage); err != nil { logging.Logger.WithError(err).Warn( @@ -120,8 +105,8 @@ func loop( // the MaxRuntime of the subtest, provided that the consumer will // continue reading from the returned channel. This is enforced by // setting the write deadline to |start| + MaxRuntime. 
-func Start(conn *websocket.Conn, src <-chan model.Measurement, start time.Time, pongch <-chan model.WSPingInfo) <-chan model.Measurement { +func Start(conn *websocket.Conn, src <-chan model.Measurement, start time.Time) <-chan model.Measurement { dst := make(chan model.Measurement) - go loop(conn, src, dst, start, pongch) + go loop(conn, src, dst, start) return dst } diff --git a/ndt7/handler/handler.go b/ndt7/handler/handler.go index dffaaca7..e6288259 100644 --- a/ndt7/handler/handler.go +++ b/ndt7/handler/handler.go @@ -16,6 +16,7 @@ import ( "github.com/m-lab/ndt-server/ndt7/results" "github.com/m-lab/ndt-server/ndt7/spec" "github.com/m-lab/ndt-server/ndt7/upload" + "github.com/m-lab/ndt-server/ndt7/ping" "github.com/m-lab/ndt-server/version" ) @@ -132,5 +133,5 @@ func (h Handler) Upload(writer http.ResponseWriter, request *http.Request) { // Ping handles the ping subtest. func (h Handler) Ping(writer http.ResponseWriter, request *http.Request) { - h.downloadOrUpload(writer, request, spec.SubtestPing, upload.Do) + h.downloadOrUpload(writer, request, spec.SubtestPing, ping.Do) } diff --git a/ndt7/ping/message/message.go b/ndt7/ping/message/message.go new file mode 100644 index 00000000..d8cb5725 --- /dev/null +++ b/ndt7/ping/message/message.go @@ -0,0 +1,47 @@ +// Package message implements operations with WebSocket PING messages. +package message + +import ( + "encoding/json" + "errors" + "time" + + "github.com/gorilla/websocket" +) + +// The json object is used as a namespace to avoid erratic interpretation of +// unsolicited pong frames. Ping and pong frames are not a part of +// Sec-WebSocket-Protocol, they're part of RFC6455. Section 5.5.3 of the RFC +// allows unsolicited pong frames. Some browsers are known to send unsolicited +// pong frames, see golang/go#6377 . +type pingMessage struct { + Ndt7TS int64 +} + +// SendTicks sends the current ticks as a ping message. 
+func SendTicks(conn *websocket.Conn, start time.Time, deadline time.Time) error { + msg := pingMessage{ + Ndt7TS: time.Since(start).Nanoseconds(), + } + data, err := json.Marshal(msg) + if err == nil { + err = conn.WriteControl(websocket.PingMessage, data, deadline) + } + return err +} + +func ParseTicks(s string, start time.Time) (elapsed time.Duration, d time.Duration, err error) { + elapsed = time.Since(start) + var msg pingMessage + err = json.Unmarshal([]byte(s), &msg) + if err != nil { + return + } + prev := msg.Ndt7TS + if 0 <= prev && prev <= elapsed.Nanoseconds() { + d = time.Duration(elapsed.Nanoseconds() - prev) + } else { + err = errors.New("RTT is negative") + } + return +} diff --git a/ndt7/ping/mux/mux.go b/ndt7/ping/mux/mux.go new file mode 100644 index 00000000..2dd05e1e --- /dev/null +++ b/ndt7/ping/mux/mux.go @@ -0,0 +1,102 @@ +// Package mux implements the ping subtest channel multiplexor. +package mux + +import ( + "context" + + "github.com/m-lab/ndt-server/logging" + "github.com/m-lab/ndt-server/ndt7/model" + "github.com/m-lab/ndt-server/ndt7/ping/receiver" +) + +func upsert(self *model.Measurement, m model.Measurement) { + if m.AppInfo != nil { + self.AppInfo = m.AppInfo + } + if m.ConnectionInfo != nil { + self.ConnectionInfo = m.ConnectionInfo + } + if m.BBRInfo != nil { + self.BBRInfo = m.BBRInfo + } + if m.TCPInfo != nil { + self.TCPInfo = m.TCPInfo + } + if m.WSPingInfo != nil { + self.WSPingInfo = m.WSPingInfo + } +} + +func loop( + measurerch <-chan model.Measurement, receiverch <-chan receiver.Measurement, + senderch, serverlog, clientlog chan<- model.Measurement, + cancel context.CancelFunc, +) { + logging.Logger.Debug("mux: start") + defer logging.Logger.Debug("mux: stop") + + state := model.Measurement{} + for measurerch != nil || receiverch != nil { + select { + case m, ok := <-measurerch: + if ok { + serverlog <- m + upsert(&state, m) + if state.WSPingInfo != nil { + // Pong arrived, there is no in-flight ping. 
Perfect time for another sample. + senderch <- state + state = model.Measurement{} + } + } else { + logging.Logger.Debug("mux: measurerch closed") + measurerch = nil + // Propagate EOF to close websocket when measurer stops ticking. + close(senderch) + } + case m, ok := <-receiverch: + if ok { + if m.IsServerOrigin { // likely, pong frame + serverlog <- m.Measurement + // The sample is forwarded to the client with the next TCPInfo frame. + upsert(&state, m.Measurement) + } else { // json from the client + clientlog <- m.Measurement + } + } else { + logging.Logger.Debug("mux: receiverch closed") + receiverch = nil + // Stop measurer's ticker. Receiver has already finished its duties. + cancel() + } + } + } + close(serverlog) + close(clientlog) +} + +// MuxOutput is a return value of mux.Start to avoid confusion in argument ordering. +type MuxOutput struct { + SenderC <-chan model.Measurement + ServerLog <-chan model.Measurement + ClientLog <-chan model.Measurement +} + +// Start starts the channel multiplexor in a background goroutine. +// +// Liveness guarantees: +// 1) mux drains measurerch, otherwise measurer is deadlocked on send(); +// 2) mux drains receiverch, otherwise receiver MAY become deadlocked on send(clientMessage); +// 3) mux closes output channels when it's done; +// 4) EOF from receiverch is a signal to call |cancel| to terminate early. +func Start(measurerch <-chan model.Measurement, receiverch <-chan receiver.Measurement, cancel context.CancelFunc) MuxOutput { + senderch := make(chan model.Measurement) + serverlog := make(chan model.Measurement) + clientlog := make(chan model.Measurement) + go loop(measurerch, receiverch, senderch, serverlog, clientlog, cancel) + return MuxOutput{ + // ServerLog is not named "ServerC" to avoid visual confusion with "SenderC". 
+ SenderC: senderch, + ServerLog: serverlog, + ClientLog: clientlog, + } +} diff --git a/ndt7/ping/ping.go b/ndt7/ping/ping.go index a5b86f06..eacbff2f 100644 --- a/ndt7/ping/ping.go +++ b/ndt7/ping/ping.go @@ -1,47 +1,32 @@ -// Package ping implements WebSocket PING messages. +// Package ping implements the ndt7 ping test. package ping import ( - "encoding/json" - "errors" + "context" "time" "github.com/gorilla/websocket" + "github.com/m-lab/ndt-server/ndt7/measurer" + "github.com/m-lab/ndt-server/ndt7/ping/mux" + "github.com/m-lab/ndt-server/ndt7/ping/receiver" + "github.com/m-lab/ndt-server/ndt7/ping/sender" + "github.com/m-lab/ndt-server/ndt7/results" + "github.com/m-lab/ndt-server/ndt7/saver" + "github.com/m-lab/ndt-server/ndt7/spec" ) -// The json object is used as a namespace to avoid erratic interpretation of -// unsolicited pong frames. Ping and pong frames are not a part of -// Sec-WebSocket-Protocol, they're part of RFC6455. Section 5.5.3 of the RFC -// allows unsolicited pong frames. Some browsers are known to send unsolicited -// pong frames, see golang/go#6377 . -type pingMessage struct { - Ndt7TS int64 -} - -// SendTicks sends the current ticks as a ping message. -func SendTicks(conn *websocket.Conn, start time.Time, deadline time.Time) error { - msg := pingMessage{ - Ndt7TS: time.Since(start).Nanoseconds(), - } - data, err := json.Marshal(msg) - if err == nil { - err = conn.WriteControl(websocket.PingMessage, data, deadline) - } - return err -} - -func ParseTicks(s string, start time.Time) (elapsed time.Duration, d time.Duration, err error) { - elapsed = time.Since(start) - var msg pingMessage - err = json.Unmarshal([]byte(s), &msg) - if err != nil { - return - } - prev := msg.Ndt7TS - if 0 <= prev && prev <= elapsed.Nanoseconds() { - d = time.Duration(elapsed.Nanoseconds() - prev) - } else { - err = errors.New("RTT is negative") - } - return +// Do implements the ping subtest. The ctx argument is the parent +// context for the subtest. 
The conn argument is the open WebSocket +// connection. The resultfp argument is the file where to save results. Both +// arguments are owned by the caller of this function. The start argument is +// the test start time used to calculate ElapsedTime and deadlines. +func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File, start time.Time) { + wholectx, cancel := context.WithTimeout(ctx, spec.MaxRuntime) + // saver.SaveAll() blocks till channels are drained, so cancel() is just for consistency here. + defer cancel() + measurerch := measurer.Start(wholectx, conn, resultfp.Data.UUID, start) + receiverch := receiver.Start(wholectx, conn, start) + x := mux.Start(measurerch, receiverch, cancel) + sender.Start(conn, x.SenderC, start) + saver.SaveAll(resultfp, x.ServerLog, x.ClientLog) } diff --git a/ndt7/ping/receiver/receiver.go b/ndt7/ping/receiver/receiver.go new file mode 100644 index 00000000..f22668ed --- /dev/null +++ b/ndt7/ping/receiver/receiver.go @@ -0,0 +1,101 @@ +// Package receiver implements the ping subtest receiver.
+package receiver + +import ( + "context" + "encoding/json" + "math" + "time" + + "github.com/gorilla/websocket" + "github.com/m-lab/ndt-server/logging" + "github.com/m-lab/ndt-server/ndt7/model" + "github.com/m-lab/ndt-server/ndt7/ping/message" + "github.com/m-lab/ndt-server/ndt7/spec" +) + +type Measurement struct { + Measurement model.Measurement + IsServerOrigin bool +} + +const ( + maxDuration = math.MaxInt64 * time.Nanosecond +) + +func loop(ctx context.Context, conn *websocket.Conn, receiverch chan<- Measurement, start time.Time) { + logging.Logger.Debug("receiver: start") + defer logging.Logger.Debug("receiver: stop") + defer close(receiverch) + + conn.SetReadLimit(spec.MaxMessageSize) + + deadline, ok := ctx.Deadline() + if !ok { + panic("You passed me a context.Context without deadline") + } + + if err := conn.SetReadDeadline(deadline); err != nil { + logging.Logger.WithError(err).Warn("receiver: conn.SetReadDeadline failed") + return + } + + minRTT := maxDuration + + conn.SetPongHandler(func(s string) error { + elapsed, rtt, err := message.ParseTicks(s, start) + if err == nil { + if rtt < minRTT { + minRTT = rtt + } + m := Measurement{ + Measurement: model.Measurement{ + WSPingInfo: &model.WSPingInfo{ + ElapsedTime: int64(elapsed / time.Microsecond), + LastRTT: int64(rtt / time.Microsecond), + MinRTT: int64(minRTT / time.Microsecond), + }, + }, + IsServerOrigin: true, + } + receiverch <- m + } + return err + }) + + for ctx.Err() == nil { + mtype, mdata, err := conn.ReadMessage() + if err != nil { + if websocket.IsCloseError(err, websocket.CloseNormalClosure) { + return + } + logging.Logger.WithError(err).Warn("receiver: conn.ReadMessage failed") + return + } + if mtype != websocket.TextMessage { + logging.Logger.Warn("receiver: got non-Text message") + return // Unexpected message type + } + m := Measurement{ + IsServerOrigin: false, + } + err = json.Unmarshal(mdata, &m.Measurement) + if err != nil { + logging.Logger.WithError(err).Warn("receiver: 
json.Unmarshal failed") + return + } + receiverch <- m + } +} + +// Start starts the receiver in a background goroutine. The receiver processes pong frames +// and the measurement messages coming from conn. +// +// Liveness guarantees: +// 1) receiver uses ctx as the deadline for all conn operations and the goroutine itself, +// 2) receiver closes output channels when it's done. +func Start(ctx context.Context, conn *websocket.Conn, start time.Time) <-chan Measurement { + receiverch := make(chan Measurement) + go loop(ctx, conn, receiverch, start) + return receiverch +} diff --git a/ndt7/ping/sender/sender.go b/ndt7/ping/sender/sender.go new file mode 100644 index 00000000..76e44a3d --- /dev/null +++ b/ndt7/ping/sender/sender.go @@ -0,0 +1,61 @@ +// Package sender implements the ping subtest sender. +package sender + +import ( + "time" + + "github.com/gorilla/websocket" + "github.com/m-lab/ndt-server/logging" + "github.com/m-lab/ndt-server/ndt7/closer" + "github.com/m-lab/ndt-server/ndt7/model" + "github.com/m-lab/ndt-server/ndt7/ping/message" + "github.com/m-lab/ndt-server/ndt7/spec" +) + +func loop(conn *websocket.Conn, senderch <-chan model.Measurement, start time.Time) { + logging.Logger.Debug("sender: start") + defer logging.Logger.Debug("sender: stop") + + defer func() { + for range senderch { + // drain the channel (in case of error) + } + }() + + deadline := start.Add(spec.MaxRuntime) + + if err := conn.SetWriteDeadline(deadline); err != nil { + logging.Logger.WithError(err).Warn("sender: conn.SetWriteDeadline failed") + return + } + + if err := message.SendTicks(conn, start, deadline); err != nil { + logging.Logger.WithError(err).Warn("sender: ping.message.SendTicks failed") + return + } + + for m := range senderch { + if err := conn.WriteJSON(m); err != nil { + logging.Logger.WithError(err).Warn("sender: conn.WriteJSON failed") + return + } + if err := message.SendTicks(conn, start, deadline); err != nil { + logging.Logger.WithError(err).Warn("sender:
ping.message.SendTicks failed") + return + } + } + + closer.StartClosing(conn) +} + +// Start starts the sender in a background goroutine. The sender will send +// to the client the measurement messages coming from senderch. Websocket ping +// frame will be sent right after the message. The sender does not signal errors, +// early cancellation in case of a network error is delegated to the receiver. +// +// Liveness guarantees: +// 1) sender keeps MaxRuntime as a timeout for conn operations; +// 2) sender drains the senderch, otherwise mux is deadlocked on send(TCPInfo). +func Start(conn *websocket.Conn, senderch <-chan model.Measurement, start time.Time) { + go loop(conn, senderch, start) +} diff --git a/ndt7/receiver/receiver.go b/ndt7/receiver/receiver.go index 2086c3df..ffc99bfd 100644 --- a/ndt7/receiver/receiver.go +++ b/ndt7/receiver/receiver.go @@ -6,12 +6,11 @@ import ( "context" "encoding/json" "time" - "math" "github.com/gorilla/websocket" "github.com/m-lab/ndt-server/logging" "github.com/m-lab/ndt-server/ndt7/model" - "github.com/m-lab/ndt-server/ndt7/ping" + "github.com/m-lab/ndt-server/ndt7/ping/message" "github.com/m-lab/ndt-server/ndt7/spec" ) @@ -20,21 +19,15 @@ type receiverKind int const ( downloadReceiver = receiverKind(iota) uploadReceiver - pingReceiver -) - -const ( - MaxDuration = math.MaxInt64 * time.Nanosecond ) func loop( ctx context.Context, conn *websocket.Conn, kind receiverKind, - dst chan<- model.Measurement, start time.Time, pongch chan<- model.WSPingInfo, + dst chan<- model.Measurement, start time.Time, ) { logging.Logger.Debug("receiver: start") defer logging.Logger.Debug("receiver: stop") defer close(dst) - defer close(pongch) conn.SetReadLimit(spec.MaxMessageSize) receiverctx, cancel := context.WithTimeout(ctx, spec.MaxRuntime) defer cancel() @@ -43,21 +36,12 @@ func loop( logging.Logger.WithError(err).Warn("receiver: conn.SetReadDeadline failed") return } - minRTT := MaxDuration conn.SetPongHandler(func(s string) error { - 
elapsed, rtt, err := ping.ParseTicks(s, start) + _, rtt, err := message.ParseTicks(s, start) if err == nil { - logging.Logger.Debugf("receiver: ApplicationLevel RTT: %d ms", int64(rtt / time.Millisecond)) - if rtt < minRTT { - minRTT = rtt - } - - wsinfo := model.WSPingInfo{ - ElapsedTime: int64(elapsed / time.Microsecond), - LastRTT: int64(rtt / time.Microsecond), - MinRTT: int64(minRTT / time.Microsecond), - } - pongch <- wsinfo // Liveness: buffered (sender) + // Writing rtt to |dst| will write the Measurement to `ClientMeasurements` object. + // That goes against data format, so the value is just logged. + logging.Logger.Debugf("receiver: ApplicationLevel RTT: %d ms", int64(rtt / time.Microsecond)) } return err }) @@ -72,11 +56,11 @@ func loop( } if mtype != websocket.TextMessage { switch kind { - case uploadReceiver: - continue // No further processing required - default: // downloadReceiver and pingReceiver + case downloadReceiver: logging.Logger.Warn("receiver: got non-Text message") return // Unexpected message type + default: + continue // No further processing required } } var measurement model.Measurement @@ -89,15 +73,10 @@ func loop( } } -func startReceiver(ctx context.Context, conn *websocket.Conn, kind receiverKind, start time.Time) (<-chan model.Measurement, <-chan model.WSPingInfo) { - // |dst| is going to the log file +func startReceiver(ctx context.Context, conn *websocket.Conn, kind receiverKind, start time.Time) (<-chan model.Measurement) { dst := make(chan model.Measurement) - // |pongch| goes to the client, it's buffered to avoid blocking on `download.sender.loop` - // while `conn.WritePreparedMessage()` is active. - // TODO(darkk): is it possible to reduce buffer size or to avoiding blocking in some other way? May avoiding L7 pings at /download altogether be the way? 
- pongch := make(chan model.WSPingInfo, 1 + spec.MaxRuntime / spec.MinPoissonSamplingInterval) - go loop(ctx, conn, kind, dst, start, pongch) - return dst, pongch + go loop(ctx, conn, kind, dst, start) + return dst } // StartDownloadReceiver starts the receiver in a background goroutine and @@ -109,18 +88,13 @@ func startReceiver(ctx context.Context, conn *websocket.Conn, kind receiverKind, // Liveness guarantee: the goroutine will always terminate after a // MaxRuntime timeout, provided that the consumer will keep reading // from the returned channel. -func StartDownloadReceiver(ctx context.Context, conn *websocket.Conn, start time.Time, msmch <-chan model.Measurement) (<-chan model.Measurement, <-chan model.WSPingInfo) { +func StartDownloadReceiver(ctx context.Context, conn *websocket.Conn, start time.Time) (<-chan model.Measurement) { return startReceiver(ctx, conn, downloadReceiver, start) } // StartUploadReceiver is like StartDownloadReceiver except that it // tolerates incoming binary messages, which are sent to cause // network load, and therefore must not be rejected. -func StartUploadReceiver(ctx context.Context, conn *websocket.Conn, start time.Time) (<-chan model.Measurement, <-chan model.WSPingInfo) { +func StartUploadReceiver(ctx context.Context, conn *websocket.Conn, start time.Time) (<-chan model.Measurement) { return startReceiver(ctx, conn, uploadReceiver, start) } - -// StartPingReceiver is exactly like StartDownloadReceiver currently. 
-func StartPingReceiver(ctx context.Context, conn *websocket.Conn, start time.Time) (<-chan model.Measurement, <-chan model.WSPingInfo) { - return startReceiver(ctx, conn, pingReceiver, start) -} diff --git a/ndt7/upload/sender/sender.go b/ndt7/upload/sender/sender.go index e6c30384..7bc4672b 100644 --- a/ndt7/upload/sender/sender.go +++ b/ndt7/upload/sender/sender.go @@ -8,13 +8,13 @@ import ( "github.com/m-lab/ndt-server/logging" "github.com/m-lab/ndt-server/ndt7/closer" "github.com/m-lab/ndt-server/ndt7/model" - "github.com/m-lab/ndt-server/ndt7/ping" + "github.com/m-lab/ndt-server/ndt7/ping/message" "github.com/m-lab/ndt-server/ndt7/spec" ) func loop( conn *websocket.Conn, src <-chan model.Measurement, - dst chan<- model.Measurement, start time.Time, pongch <-chan model.WSPingInfo, + dst chan<- model.Measurement, start time.Time, ) { logging.Logger.Debug("sender: start") defer logging.Logger.Debug("sender: stop") @@ -23,9 +23,6 @@ func loop( for range src { // make sure we drain the channel } - for range pongch { - // it should be buffered channel, but let's drain it anyway - } }() deadline := start.Add(spec.MaxRuntime) err := conn.SetWriteDeadline(deadline) // Liveness! 
@@ -34,30 +31,19 @@ func loop( return } for { - select { - case m, ok := <-src: - if !ok { // This means that the previous step has terminated - closer.StartClosing(conn) - return - } - if err := conn.WriteJSON(m); err != nil { - logging.Logger.WithError(err).Warn("sender: conn.WriteJSON failed") - return - } - dst <- m // Liveness: this is blocking - if err := ping.SendTicks(conn, start, deadline); err != nil { - logging.Logger.WithError(err).Warn("sender: ping.SendTicks failed") - return - } - case wsinfo := <-pongch: - m := model.Measurement{ - WSPingInfo: &wsinfo, - } - if err := conn.WriteJSON(m); err != nil { - logging.Logger.WithError(err).Warn("sender: conn.WriteJSON failed") - return - } - dst <- m // Liveness: this is blocking write to log + m, ok := <-src + if !ok { // This means that the previous step has terminated + closer.StartClosing(conn) + return + } + if err := conn.WriteJSON(m); err != nil { + logging.Logger.WithError(err).Warn("sender: conn.WriteJSON failed") + return + } + dst <- m // Liveness: this is blocking + if err := message.SendTicks(conn, start, deadline); err != nil { + logging.Logger.WithError(err).Warn("sender: ping.message.SendTicks failed") + return } } } @@ -70,8 +56,8 @@ func loop( // the MaxRuntime of the subtest, provided that the consumer will // continue reading from the returned channel. This is enforced by // setting the write deadline to |start| + MaxRuntime. 
-func Start(conn *websocket.Conn, src <-chan model.Measurement, start time.Time, pongch <-chan model.WSPingInfo) <-chan model.Measurement { +func Start(conn *websocket.Conn, src <-chan model.Measurement, start time.Time) <-chan model.Measurement { dst := make(chan model.Measurement) - go loop(conn, src, dst, start, pongch) + go loop(conn, src, dst, start) return dst } diff --git a/ndt7/upload/upload.go b/ndt7/upload/upload.go index 11851b6c..e31dc83a 100644 --- a/ndt7/upload/upload.go +++ b/ndt7/upload/upload.go @@ -23,8 +23,7 @@ func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File, start // results in the loop below, we terminate the goroutines early wholectx, cancel := context.WithCancel(ctx) defer cancel() - measurerch := measurer.Start(wholectx, conn, resultfp.Data.UUID, start) - receiverch, pongch := receiver.StartUploadReceiver(wholectx, conn, start) - senderch := sender.Start(conn, measurerch, start, pongch) + senderch := sender.Start(conn, measurer.Start(wholectx, conn, resultfp.Data.UUID, start), start) + receiverch := receiver.StartUploadReceiver(wholectx, conn, start) saver.SaveAll(resultfp, senderch, receiverch) } diff --git a/spec/ndt7-protocol.md b/spec/ndt7-protocol.md index f863acd5..827d0f42 100644 --- a/spec/ndt7-protocol.md +++ b/spec/ndt7-protocol.md @@ -292,8 +292,8 @@ Where: - `WSPingInfo` is an _optional_ `object` only included in the measurement when a reasonable websocket-level measurement is available: - - `ElapsedTime` (a `int64`) is the time elapsed since the beginning of - this test, measured in microseconds. + - `ElapsedTime` (a `int64`) is the pong frame arrival time elapsed + since the beginning of this test, measured in microseconds. - `LastRTT` (an _optional_ `int64`), the last observed RTT for the websocket ping-pong exchange, measured in microseconds. 
From 1c72e32ad26780a33b10b22478bca9fbee6381c0 Mon Sep 17 00:00:00 2001 From: Leonid Evdokimov Date: Thu, 30 Jan 2020 12:05:59 +0300 Subject: [PATCH 6/7] ndt7: add comments on the initial ping frame --- ndt7/download/sender/sender.go | 3 ++- ndt7/ping/sender/sender.go | 2 ++ ndt7/upload/sender/sender.go | 3 +++ 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/ndt7/download/sender/sender.go b/ndt7/download/sender/sender.go index d161c423..9f9f86cc 100644 --- a/ndt7/download/sender/sender.go +++ b/ndt7/download/sender/sender.go @@ -44,7 +44,8 @@ func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.M logging.Logger.WithError(err).Warn("sender: conn.SetWriteDeadline failed") return } - // only the first RTT sample taken before flooding the conn is not affected by HOL + // One RTT sample is taken before flooding the connection with data. + // That sample is not affected by HOL, so it has additional value and is treated specially. if err := message.SendTicks(conn, start, deadline); err != nil { logging.Logger.WithError(err).Warn("sender: ping.message.SendTicks failed") return diff --git a/ndt7/ping/sender/sender.go b/ndt7/ping/sender/sender.go index 76e44a3d..2096e04f 100644 --- a/ndt7/ping/sender/sender.go +++ b/ndt7/ping/sender/sender.go @@ -29,6 +29,8 @@ func loop(conn *websocket.Conn, senderch <-chan model.Measurement, start time.Ti return } + // Initial ping is sent ASAP as all the following pings are sent if and only if pong frame + // has arrived. So, without this ping, no pong arrives and the test gets stuck till deadline. 
if err := message.SendTicks(conn, start, deadline); err != nil { logging.Logger.WithError(err).Warn("sender: ping.message.SendTicks failed") return diff --git a/ndt7/upload/sender/sender.go b/ndt7/upload/sender/sender.go index 7bc4672b..3b101208 100644 --- a/ndt7/upload/sender/sender.go +++ b/ndt7/upload/sender/sender.go @@ -30,6 +30,9 @@ func loop( logging.Logger.WithError(err).Warn("sender: conn.SetWriteDeadline failed") return } + // There is no "special" first RTT sample for /upload. The server can't control if the client + // has already started flooding the connection. So, if server sends ping frame here, + // the pong frame may be still affected by HOL, like every other ping-pong frame. for { m, ok := <-src if !ok { // This means that the previous step has terminated From 6f3810e4ee6d8c6c224a340771afc47bb1fa079a Mon Sep 17 00:00:00 2001 From: Leonid Evdokimov Date: Tue, 4 Feb 2020 16:38:40 +0300 Subject: [PATCH 7/7] Fix typo --- ndt7/receiver/receiver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ndt7/receiver/receiver.go b/ndt7/receiver/receiver.go index ffc99bfd..d649ec00 100644 --- a/ndt7/receiver/receiver.go +++ b/ndt7/receiver/receiver.go @@ -41,7 +41,7 @@ func loop( if err == nil { // Writing rtt to |dst| will write the Measurement to `ClientMeasurements` object. // That goes against data format, so the value is just logged. - logging.Logger.Debugf("receiver: ApplicationLevel RTT: %d ms", int64(rtt / time.Microsecond)) + logging.Logger.Debugf("receiver: ApplicationLevel RTT: %d ms", int64(rtt / time.Millisecond)) } return err })