diff --git a/data/result.go b/data/result.go index 36bb1571..ad2eaa0e 100644 --- a/data/result.go +++ b/data/result.go @@ -45,4 +45,5 @@ type NDTResult struct { // ndt7 Upload *model.ArchivalData `json:",omitempty"` Download *model.ArchivalData `json:",omitempty"` + Ping *model.ArchivalData `json:",omitempty"` } diff --git a/html/ndt7-ping.js b/html/ndt7-ping.js new file mode 100644 index 00000000..b2ed6c04 --- /dev/null +++ b/html/ndt7-ping.js @@ -0,0 +1,22 @@ +/* jshint esversion: 6, asi: true, worker: true */ +// WebWorker that runs the ndt7 ping test +onmessage = function (ev) { + 'use strict' + let url = new URL(ev.data.href) + url.protocol = (url.protocol === 'https:') ? 'wss:' : 'ws:' + url.pathname = '/ndt/v7/ping' + const sock = new WebSocket(url.toString(), 'net.measurementlab.ndt.v7') + sock.onclose = function () { + postMessage(null) + } + sock.onopen = function () { + sock.onmessage = function (ev) { + if (!(ev.data instanceof Blob)) { + let m = JSON.parse(ev.data) + m.Origin = 'server' + m.Test = 'ping' + postMessage(m) + } + } + } +} diff --git a/html/ndt7.html b/html/ndt7.html index e5e18742..fbd3b107 100644 --- a/html/ndt7.html +++ b/html/ndt7.html @@ -24,8 +24,10 @@
+
[Ping]
[Download]
[Upload]
+
diff --git a/ndt-server.go b/ndt-server.go index e9790aed..48706eab 100644 --- a/ndt-server.go +++ b/ndt-server.go @@ -159,6 +159,7 @@ func main() { } ndt7Mux.Handle(spec.DownloadURLPath, http.HandlerFunc(ndt7Handler.Download)) ndt7Mux.Handle(spec.UploadURLPath, http.HandlerFunc(ndt7Handler.Upload)) + ndt7Mux.Handle(spec.PingURLPath, http.HandlerFunc(ndt7Handler.Ping)) ndt7Server := &http.Server{ Addr: *ndt7Addr, Handler: logging.MakeAccessLogHandler(ndt7Mux), diff --git a/ndt7/download/download.go b/ndt7/download/download.go index 832d7a21..9e3700d2 100644 --- a/ndt7/download/download.go +++ b/ndt7/download/download.go @@ -3,6 +3,7 @@ package download import ( "context" + "time" "github.com/gorilla/websocket" "github.com/m-lab/ndt-server/ndt7/download/sender" @@ -15,13 +16,15 @@ import ( // Do implements the download subtest. The ctx argument is the parent // context for the subtest. The conn argument is the open WebSocket // connection. The resultfp argument is the file where to save results. Both -// arguments are owned by the caller of this function. -func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File) { +// arguments are owned by the caller of this function. The start argument is +// the test start time used to calculate ElapsedTime and deadlines. +func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File, start time.Time) { // Implementation note: use child context so that, if we cannot save the // results in the loop below, we terminate the goroutines early wholectx, cancel := context.WithCancel(ctx) defer cancel() - senderch := sender.Start(conn, measurer.Start(wholectx, conn, resultfp.Data.UUID)) - receiverch := receiver.StartDownloadReceiver(wholectx, conn) + measurerch := measurer.Start(wholectx, conn, resultfp.Data.UUID, start) + receiverch, pongch := receiver.StartDownloadReceiver(wholectx, conn, start, measurerch) + senderch := sender.Start(conn, measurerch, start, pongch) saver.SaveAll(resultfp, senderch, receiverch) } diff --git a/ndt7/download/sender/sender.go b/ndt7/download/sender/sender.go index 5c4efc03..b7ce17fe 100644 --- a/ndt7/download/sender/sender.go +++ b/ndt7/download/sender/sender.go @@ -22,7 +22,10 @@ func makePreparedMessage(size int) (*websocket.PreparedMessage, error) { return websocket.NewPreparedMessage(websocket.BinaryMessage, data) } -func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.Measurement) { +func loop( + conn *websocket.Conn, src <-chan model.Measurement, + dst chan<- model.Measurement, start time.Time, pongch <-chan model.WSInfo, +) { logging.Logger.Debug("sender: start") defer logging.Logger.Debug("sender: stop") defer close(dst) @@ -30,6 +33,9 @@ func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.M for range src { // make sure we drain the channel } + for range pongch { + // it should be buffered channel, but let's drain it anyway + } }() logging.Logger.Debug("sender: generating random buffer") bulkMessageSize := 1 << 13 @@ -38,12 +44,17 @@ func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.M logging.Logger.WithError(err).Warn("sender: makePreparedMessage failed") return } - deadline := time.Now().Add(spec.MaxRuntime) + deadline := start.Add(spec.MaxRuntime) err = conn.SetWriteDeadline(deadline) // Liveness! if err != nil { logging.Logger.WithError(err).Warn("sender: conn.SetWriteDeadline failed") return } + // only the first RTT sample taken before flooding the conn is not affected by HOL + if err := ping.SendTicks(conn, start, deadline); err != nil { + logging.Logger.WithError(err).Warn("sender: ping.SendTicks failed") + return + } var totalSent int64 for { select { @@ -57,10 +68,19 @@ func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.M return } dst <- m // Liveness: this is blocking - if err := ping.SendTicks(conn, deadline); err != nil { + if err := ping.SendTicks(conn, start, deadline); err != nil { logging.Logger.WithError(err).Warn("sender: ping.SendTicks failed") return } + case wsinfo := <-pongch: + m := model.Measurement{ + WSInfo: &wsinfo, + } + if err := conn.WriteJSON(m); err != nil { + logging.Logger.WithError(err).Warn("sender: conn.WriteJSON failed") + return + } + dst <- m // Liveness: this is blocking write to log default: if err := conn.WritePreparedMessage(preparedMessage); err != nil { logging.Logger.WithError(err).Warn( @@ -99,9 +119,9 @@ func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.M // Liveness guarantee: the sender will not be stuck sending for more then // the MaxRuntime of the subtest, provided that the consumer will // continue reading from the returned channel. This is enforced by -// setting the write deadline to Time.Now() + MaxRuntime. -func Start(conn *websocket.Conn, src <-chan model.Measurement) <-chan model.Measurement { +// setting the write deadline to |start| + MaxRuntime. +func Start(conn *websocket.Conn, src <-chan model.Measurement, start time.Time, pongch <-chan model.WSInfo) <-chan model.Measurement { dst := make(chan model.Measurement) - go loop(conn, src, dst) + go loop(conn, src, dst, start, pongch) return dst } diff --git a/ndt7/handler/handler.go b/ndt7/handler/handler.go index 0d7a522f..dffaaca7 100644 --- a/ndt7/handler/handler.go +++ b/ndt7/handler/handler.go @@ -36,14 +36,15 @@ func warnAndClose(writer http.ResponseWriter, message string) { // testerFunc is the function implementing a subtest. The first argument // is the subtest context. The second argument is the connected websocket. The // third argument is the open file where to write results. This function does -// not own the second or the third argument. -type testerFunc = func(context.Context, *websocket.Conn, *results.File) +// not own the second or the third argument. The fourth argument is the base +// start time of the test. +type testerFunc = func(context.Context, *websocket.Conn, *results.File, time.Time) // downloadOrUpload implements both download and upload. The writer argument // is the HTTP response writer. The request argument is the HTTP request -// that we received. The kind argument must be spec.SubtestDownload or -// spec.SubtestUpload. The tester is a function actually implementing the -// requested ndt7 subtest. +// that we received. The kind argument must be spec.SubtestDownload, +// spec.SubtestUpload, or SubtestPing. The tester is a function actually +// implementing the requested ndt7 subtest. func (h Handler) downloadOrUpload(writer http.ResponseWriter, request *http.Request, kind spec.SubtestKind, tester testerFunc) { logging.Logger.Debug("downloadOrUpload: upgrading to WebSockets") if request.Header.Get("Sec-WebSocket-Protocol") != spec.SecWebSocketProtocol { @@ -106,6 +107,8 @@ func (h Handler) downloadOrUpload(writer http.ResponseWriter, request *http.Requ result.Download = resultfp.Data } else if kind == spec.SubtestUpload { result.Upload = resultfp.Data + } else if kind == spec.SubtestPing { + result.Ping = resultfp.Data } else { logging.Logger.Warn(string(kind) + ": data not saved") } @@ -114,7 +117,7 @@ func (h Handler) downloadOrUpload(writer http.ResponseWriter, request *http.Requ } warnonerror.Close(resultfp, string(kind)+": ignoring resultfp.Close error") }() - tester(request.Context(), conn, resultfp) + tester(request.Context(), conn, resultfp, result.StartTime) } // Download handles the download subtest. @@ -126,3 +129,8 @@ func (h Handler) Download(writer http.ResponseWriter, request *http.Request) { func (h Handler) Upload(writer http.ResponseWriter, request *http.Request) { h.downloadOrUpload(writer, request, spec.SubtestUpload, upload.Do) } + +// Ping handles the ping subtest. +func (h Handler) Ping(writer http.ResponseWriter, request *http.Request) { + h.downloadOrUpload(writer, request, spec.SubtestPing, upload.Do) +} diff --git a/ndt7/measurer/measurer.go b/ndt7/measurer/measurer.go index da1afa64..fc4c3f2b 100644 --- a/ndt7/measurer/measurer.go +++ b/ndt7/measurer/measurer.go @@ -54,7 +54,7 @@ func measure(measurement *model.Measurement, sockfp *os.File, elapsed time.Durat } } -func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- model.Measurement) { +func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- model.Measurement, start time.Time) { logging.Logger.Debug("measurer: start") defer logging.Logger.Debug("measurer: stop") defer close(dst) @@ -66,7 +66,6 @@ func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- mod return } defer sockfp.Close() - start := time.Now() connectionInfo := &model.ConnectionInfo{ Client: conn.RemoteAddr().String(), Server: conn.LocalAddr().String(), @@ -104,9 +103,9 @@ func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- mod // a timeout of DefaultRuntime seconds, provided that the consumer // continues reading from the returned channel. func Start( - ctx context.Context, conn *websocket.Conn, UUID string, + ctx context.Context, conn *websocket.Conn, UUID string, start time.Time, ) <-chan model.Measurement { dst := make(chan model.Measurement) - go loop(ctx, conn, UUID, dst) + go loop(ctx, conn, UUID, dst, start) return dst } diff --git a/ndt7/model/measurement.go b/ndt7/model/measurement.go index d0cee8c5..fe24eed7 100644 --- a/ndt7/model/measurement.go +++ b/ndt7/model/measurement.go @@ -8,4 +8,5 @@ type Measurement struct { ConnectionInfo *ConnectionInfo `json:",omitempty" bigquery:"-"` BBRInfo *BBRInfo `json:",omitempty"` TCPInfo *TCPInfo `json:",omitempty"` + WSInfo *WSInfo `json:",omitempty"` } diff --git a/ndt7/model/wsinfo.go b/ndt7/model/wsinfo.go new file mode 100644 index 00000000..53bc51d2 --- /dev/null +++ b/ndt7/model/wsinfo.go @@ -0,0 +1,10 @@ +package model + +// WSInfo contains an application level (websocket) ping measurement data. +// It may be melded into AppInfo. +// FIXME: describe this structure is in the ndt7 specification. +type WSInfo struct { + ElapsedTime int64 + LastRTT int64 // TCPInfo.RTT is smoothed RTT, LastRTT is just a sample. + MinRTT int64 +} diff --git a/ndt7/ping/ping.go b/ndt7/ping/ping.go index 2af7c7ff..cb0378b6 100644 --- a/ndt7/ping/ping.go +++ b/ndt7/ping/ping.go @@ -3,16 +3,15 @@ package ping import ( "encoding/json" + "errors" "time" "github.com/gorilla/websocket" ) // SendTicks sends the current ticks as a ping message. -func SendTicks(conn *websocket.Conn, deadline time.Time) error { - // TODO(bassosimone): when we'll have a unique base time.Time reference for - // the whole test, we should use that, since UnixNano() is not monotonic. - ticks := int64(time.Now().UnixNano()) +func SendTicks(conn *websocket.Conn, start time.Time, deadline time.Time) error { + var ticks int64 = time.Since(start).Nanoseconds() data, err := json.Marshal(ticks) if err == nil { err = conn.WriteControl(websocket.PingMessage, data, deadline) @@ -20,13 +19,17 @@ func SendTicks(conn *websocket.Conn, deadline time.Time) error { return err } -func ParseTicks(s string) (d int64, err error) { - // TODO(bassosimone): when we'll have a unique base time.Time reference for - // the whole test, we should use that, since UnixNano() is not monotonic. +func ParseTicks(s string, start time.Time) (elapsed time.Duration, d time.Duration, err error) { + elapsed = time.Since(start) var prev int64 err = json.Unmarshal([]byte(s), &prev) - if err == nil { - d = (int64(time.Now().UnixNano()) - prev) + if err != nil { + return + } + if 0 <= prev && prev <= elapsed.Nanoseconds() { + d = time.Duration(elapsed.Nanoseconds() - prev) + } else { + err = errors.New("RTT is negative") } return } diff --git a/ndt7/receiver/receiver.go b/ndt7/receiver/receiver.go index ab423ed2..3e8a6ee5 100644 --- a/ndt7/receiver/receiver.go +++ b/ndt7/receiver/receiver.go @@ -6,6 +6,7 @@ import ( "context" "encoding/json" "time" + "math" "github.com/gorilla/websocket" "github.com/m-lab/ndt-server/logging" @@ -19,28 +20,44 @@ type receiverKind int const ( downloadReceiver = receiverKind(iota) uploadReceiver + pingReceiver +) + +const ( + MaxDuration = math.MaxInt64 * time.Nanosecond ) func loop( ctx context.Context, conn *websocket.Conn, kind receiverKind, - dst chan<- model.Measurement, + dst chan<- model.Measurement, start time.Time, pongch chan<- model.WSInfo, ) { logging.Logger.Debug("receiver: start") defer logging.Logger.Debug("receiver: stop") defer close(dst) + defer close(pongch) conn.SetReadLimit(spec.MaxMessageSize) receiverctx, cancel := context.WithTimeout(ctx, spec.MaxRuntime) defer cancel() - err := conn.SetReadDeadline(time.Now().Add(spec.MaxRuntime)) // Liveness! + err := conn.SetReadDeadline(start.Add(spec.MaxRuntime)) // Liveness! if err != nil { logging.Logger.WithError(err).Warn("receiver: conn.SetReadDeadline failed") return } + minRTT := MaxDuration conn.SetPongHandler(func(s string) error { - rtt, err := ping.ParseTicks(s) + elapsed, rtt, err := ping.ParseTicks(s, start) if err == nil { - rtt /= int64(time.Millisecond) - logging.Logger.Debugf("receiver: ApplicationLevel RTT: %d ms", rtt) + logging.Logger.Debugf("receiver: ApplicationLevel RTT: %d ms", int64(rtt / time.Millisecond)) + if rtt < minRTT { + minRTT = rtt + } + + wsinfo := model.WSInfo{ + ElapsedTime: int64(elapsed / time.Microsecond), + LastRTT: int64(rtt / time.Microsecond), + MinRTT: int64(minRTT / time.Microsecond), + } + pongch <- wsinfo // Liveness: buffered (sender) } return err }) @@ -55,11 +72,11 @@ func loop( } if mtype != websocket.TextMessage { switch kind { - case downloadReceiver: + case uploadReceiver: + continue // No further processing required + default: // downloadReceiver and pingReceiver logging.Logger.Warn("receiver: got non-Text message") return // Unexpected message type - default: - continue // No further processing required } } var measurement model.Measurement @@ -72,10 +89,15 @@ func loop( } } -func start(ctx context.Context, conn *websocket.Conn, kind receiverKind) <-chan model.Measurement { +func startReceiver(ctx context.Context, conn *websocket.Conn, kind receiverKind, start time.Time) (<-chan model.Measurement, <-chan model.WSInfo) { + // |dst| is going to the log file dst := make(chan model.Measurement) - go loop(ctx, conn, kind, dst) - return dst + // |pongch| goes to the client, it's buffered to avoid blocking on `download.sender.loop` + // while `conn.WritePreparedMessage()` is active. + // TODO(darkk): is it possible to reduce buffer size or to avoiding blocking in some other way? May avoiding L7 pings at /download altogether be the way? + pongch := make(chan model.WSInfo, 1 + spec.MaxRuntime / spec.MinPoissonSamplingInterval) + go loop(ctx, conn, kind, dst, start, pongch) + return dst, pongch } // StartDownloadReceiver starts the receiver in a background goroutine and @@ -87,13 +109,18 @@ func start(ctx context.Context, conn *websocket.Conn, kind receiverKind) <-chan // Liveness guarantee: the goroutine will always terminate after a // MaxRuntime timeout, provided that the consumer will keep reading // from the returned channel. -func StartDownloadReceiver(ctx context.Context, conn *websocket.Conn) <-chan model.Measurement { - return start(ctx, conn, downloadReceiver) +func StartDownloadReceiver(ctx context.Context, conn *websocket.Conn, start time.Time, msmch <-chan model.Measurement) (<-chan model.Measurement, <-chan model.WSInfo) { + return startReceiver(ctx, conn, downloadReceiver, start) } // StartUploadReceiver is like StartDownloadReceiver except that it // tolerates incoming binary messages, which are sent to cause // network load, and therefore must not be rejected. -func StartUploadReceiver(ctx context.Context, conn *websocket.Conn) <-chan model.Measurement { - return start(ctx, conn, uploadReceiver) +func StartUploadReceiver(ctx context.Context, conn *websocket.Conn, start time.Time) (<-chan model.Measurement, <-chan model.WSInfo) { + return startReceiver(ctx, conn, uploadReceiver, start) +} + +// StartPingReceiver is exactly like StartDownloadReceiver currently. +func StartPingReceiver(ctx context.Context, conn *websocket.Conn, start time.Time) (<-chan model.Measurement, <-chan model.WSInfo) { + return startReceiver(ctx, conn, pingReceiver, start) } diff --git a/ndt7/results/file.go b/ndt7/results/file.go index 8786c429..016dc293 100644 --- a/ndt7/results/file.go +++ b/ndt7/results/file.go @@ -66,8 +66,8 @@ func newFile(datadir, what, uuid string) (*File, error) { // containing the metadata. The conn argument is used to retrieve the local and // the remote endpoints addresses. The "datadir" argument specifies the // directory on disk to write the data into and the what argument should -// indicate whether this is a spec.SubtestDownload or a spec.SubtestUpload -// ndt7 measurement. +// indicate whether this is a spec.SubtestDownload, a spec.SubtestUpload +// or a spec.SubtestPing ndt7 measurement. func OpenFor(request *http.Request, conn *websocket.Conn, datadir string, what spec.SubtestKind) (*File, error) { meta := make(metadata, 0) netConn := conn.UnderlyingConn() diff --git a/ndt7/spec/spec.go b/ndt7/spec/spec.go index 6d47d812..8d7c8522 100644 --- a/ndt7/spec/spec.go +++ b/ndt7/spec/spec.go @@ -9,6 +9,9 @@ const DownloadURLPath = "/ndt/v7/download" // UploadURLPath selects the upload subtest. const UploadURLPath = "/ndt/v7/upload" +// PingURLPath selects the ping subtest. +const PingURLPath = "/ndt/v7/ping" + // SecWebSocketProtocol is the WebSocket subprotocol used by ndt7. const SecWebSocketProtocol = "net.measurementlab.ndt.v7" @@ -57,4 +60,7 @@ const ( // SubtestUpload is a upload subtest SubtestUpload = SubtestKind("upload") + + // SubtestPing is a ping subtest + SubtestPing = SubtestKind("ping") ) diff --git a/ndt7/upload/sender/sender.go b/ndt7/upload/sender/sender.go index ff46a130..eee5c007 100644 --- a/ndt7/upload/sender/sender.go +++ b/ndt7/upload/sender/sender.go @@ -14,7 +14,7 @@ import ( func loop( conn *websocket.Conn, src <-chan model.Measurement, - dst chan<- model.Measurement, + dst chan<- model.Measurement, start time.Time, pongch <-chan model.WSInfo, ) { logging.Logger.Debug("sender: start") defer logging.Logger.Debug("sender: stop") @@ -23,27 +23,41 @@ func loop( for range src { // make sure we drain the channel } + for range pongch { + // it should be buffered channel, but let's drain it anyway + } }() - deadline := time.Now().Add(spec.MaxRuntime) + deadline := start.Add(spec.MaxRuntime) err := conn.SetWriteDeadline(deadline) // Liveness! if err != nil { logging.Logger.WithError(err).Warn("sender: conn.SetWriteDeadline failed") return } for { - m, ok := <-src - if !ok { // This means that the previous step has terminated - closer.StartClosing(conn) - return - } - if err := conn.WriteJSON(m); err != nil { - logging.Logger.WithError(err).Warn("sender: conn.WriteJSON failed") - return - } - dst <- m // Liveness: this is blocking - if err := ping.SendTicks(conn, deadline); err != nil { - logging.Logger.WithError(err).Warn("sender: ping.SendTicks failed") - return + select { + case m, ok := <-src: + if !ok { // This means that the previous step has terminated + closer.StartClosing(conn) + return + } + if err := conn.WriteJSON(m); err != nil { + logging.Logger.WithError(err).Warn("sender: conn.WriteJSON failed") + return + } + dst <- m // Liveness: this is blocking + if err := ping.SendTicks(conn, start, deadline); err != nil { + logging.Logger.WithError(err).Warn("sender: ping.SendTicks failed") + return + } + case wsinfo := <-pongch: + m := model.Measurement{ + WSInfo: &wsinfo, + } + if err := conn.WriteJSON(m); err != nil { + logging.Logger.WithError(err).Warn("sender: conn.WriteJSON failed") + return + } + dst <- m // Liveness: this is blocking write to log } } } @@ -55,9 +69,9 @@ func loop( // Liveness guarantee: the sender will not be stuck sending for more then // the MaxRuntime of the subtest, provided that the consumer will // continue reading from the returned channel. This is enforced by -// setting the write deadline to MaxRuntime + time.Now. -func Start(conn *websocket.Conn, src <-chan model.Measurement) <-chan model.Measurement { +// setting the write deadline to |start| + MaxRuntime. +func Start(conn *websocket.Conn, src <-chan model.Measurement, start time.Time, pongch <-chan model.WSInfo) <-chan model.Measurement { dst := make(chan model.Measurement) - go loop(conn, src, dst) + go loop(conn, src, dst, start, pongch) return dst } diff --git a/ndt7/upload/upload.go b/ndt7/upload/upload.go index 511c3a70..11851b6c 100644 --- a/ndt7/upload/upload.go +++ b/ndt7/upload/upload.go @@ -3,6 +3,7 @@ package upload import ( "context" + "time" "github.com/gorilla/websocket" "github.com/m-lab/ndt-server/ndt7/results" @@ -15,13 +16,15 @@ import ( // Do implements the upload subtest. The ctx argument is the parent context // for the subtest. The conn argument is the open WebSocket connection. The // resultfp argument is the file where to save results. Both arguments are -// owned by the caller of this function. -func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File) { +// owned by the caller of this function. The start argument is the test +// start time used to calculate ElapsedTime and deadlines. +func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File, start time.Time) { // Implementation note: use child context so that, if we cannot save the // results in the loop below, we terminate the goroutines early wholectx, cancel := context.WithCancel(ctx) defer cancel() - senderch := sender.Start(conn, measurer.Start(wholectx, conn, resultfp.Data.UUID)) - receiverch := receiver.StartUploadReceiver(wholectx, conn) + measurerch := measurer.Start(wholectx, conn, resultfp.Data.UUID, start) + receiverch, pongch := receiver.StartUploadReceiver(wholectx, conn, start) + senderch := sender.Start(conn, measurerch, start, pongch) saver.SaveAll(resultfp, senderch, receiverch) }