Skip to content

Commit b3dcd70

Browse files
nonsenseraulk
andauthored
Add ChannelStages to keep track of history of lifecycle of a DataTransfer (#163)
* add ChannelStage to keep track of lifecycle of DataTransfer * better errors * fix tests * backward compat in case Stages is nil * remove logger as we dont use it * add godocs to stages objects. * make linter happy * remove `queued data` log Co-authored-by: raulk <raul@protocol.ai>
1 parent 1bccff0 commit b3dcd70

8 files changed

Lines changed: 572 additions & 28 deletions

File tree

channelmonitor/channelmonitor_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -806,6 +806,10 @@ func (m *mockChannelState) LastVoucherResult() datatransfer.VoucherResult {
806806
panic("implement me")
807807
}
808808

809+
func (m *mockChannelState) Stages() *datatransfer.ChannelStages {
810+
panic("implement me")
811+
}
812+
809813
func (m *mockChannelState) ReceivedCids() []cid.Cid {
810814
panic("implement me")
811815
}

channels/channel_state.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ type channelState struct {
4949
voucherResultDecoder DecoderByTypeFunc
5050
voucherDecoder DecoderByTypeFunc
5151
channelCIDsReader ChannelCIDsReader
52+
53+
// stages tracks the timeline of events related to a data transfer, for
54+
// traceability purposes.
55+
stages *datatransfer.ChannelStages
5256
}
5357

5458
// EmptyChannelState is the zero value for channel state, meaning not present
@@ -171,6 +175,21 @@ func (c channelState) OtherPeer() peer.ID {
171175
return c.sender
172176
}
173177

178+
// Stages returns the current ChannelStages object, or an empty object.
179+
// It is unsafe for the caller to modify the return value, and changes may not
180+
// be persisted. It should be treated as immutable.
181+
//
182+
// EXPERIMENTAL; subject to change.
183+
func (c channelState) Stages() *datatransfer.ChannelStages {
184+
if c.stages == nil {
185+
// return an empty placeholder; it will be discarded because the caller
186+
// is not supposed to mutate the value anyway.
187+
return &datatransfer.ChannelStages{}
188+
}
189+
190+
return c.stages
191+
}
192+
174193
func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByTypeFunc, voucherResultDecoder DecoderByTypeFunc, channelCIDsReader ChannelCIDsReader) datatransfer.ChannelState {
175194
return channelState{
176195
selfPeer: c.SelfPeer,
@@ -191,6 +210,7 @@ func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByT
191210
voucherResultDecoder: voucherResultDecoder,
192211
voucherDecoder: voucherDecoder,
193212
channelCIDsReader: channelCIDsReader,
213+
stages: c.Stages,
194214
}
195215
}
196216

channels/channels.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ func (c *Channels) CreateNew(selfPeer peer.ID, tid datatransfer.TransferID, base
165165
Selector: &cbg.Deferred{Raw: selBytes},
166166
Sender: dataSender,
167167
Recipient: dataReceiver,
168+
Stages: &datatransfer.ChannelStages{},
168169
Vouchers: []internal.EncodedVoucher{
169170
{
170171
Type: voucher.Type(),

channels/channels_fsm.go

Lines changed: 78 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -25,102 +25,139 @@ var transferringStates = []fsm.StateKey{
2525
// ChannelEvents describe the events taht can
2626
var ChannelEvents = fsm.Events{
2727
// Open a channel
28-
fsm.Event(datatransfer.Open).FromAny().To(datatransfer.Requested),
29-
28+
fsm.Event(datatransfer.Open).FromAny().To(datatransfer.Requested).Action(func(chst *internal.ChannelState) error {
29+
chst.AddLog("")
30+
return nil
31+
}),
3032
// Remote peer has accepted the Open channel request
31-
fsm.Event(datatransfer.Accept).From(datatransfer.Requested).To(datatransfer.Ongoing),
32-
33+
fsm.Event(datatransfer.Accept).From(datatransfer.Requested).To(datatransfer.Ongoing).Action(func(chst *internal.ChannelState) error {
34+
chst.AddLog("")
35+
return nil
36+
}),
3337
fsm.Event(datatransfer.Restart).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error {
3438
chst.Message = ""
39+
chst.AddLog("")
40+
return nil
41+
}),
42+
fsm.Event(datatransfer.Cancel).FromAny().To(datatransfer.Cancelling).Action(func(chst *internal.ChannelState) error {
43+
chst.AddLog("")
44+
return nil
45+
}),
46+
fsm.Event(datatransfer.DataReceived).FromMany(transferringStates...).ToNoChange().Action(func(chst *internal.ChannelState) error {
47+
chst.AddLog("")
3548
return nil
3649
}),
37-
38-
fsm.Event(datatransfer.Cancel).FromAny().To(datatransfer.Cancelling),
39-
40-
fsm.Event(datatransfer.DataReceived).FromMany(transferringStates...).ToNoChange(),
4150
fsm.Event(datatransfer.DataReceivedProgress).FromMany(transferringStates...).ToNoChange().
4251
Action(func(chst *internal.ChannelState, delta uint64) error {
4352
chst.Received += delta
53+
chst.AddLog("received data")
4454
return nil
4555
}),
46-
47-
fsm.Event(datatransfer.DataSent).FromMany(transferringStates...).ToNoChange(),
56+
fsm.Event(datatransfer.DataSent).FromMany(transferringStates...).ToNoChange().Action(func(chst *internal.ChannelState) error {
57+
chst.AddLog("")
58+
return nil
59+
}),
4860
fsm.Event(datatransfer.DataSentProgress).FromMany(transferringStates...).ToNoChange().
4961
Action(func(chst *internal.ChannelState, delta uint64) error {
5062
chst.Sent += delta
63+
chst.AddLog("sending data")
5164
return nil
5265
}),
53-
fsm.Event(datatransfer.DataQueued).FromMany(transferringStates...).ToNoChange(),
66+
fsm.Event(datatransfer.DataQueued).FromMany(transferringStates...).ToNoChange().Action(func(chst *internal.ChannelState) error {
67+
chst.AddLog("")
68+
return nil
69+
}),
5470
fsm.Event(datatransfer.DataQueuedProgress).FromMany(transferringStates...).ToNoChange().
5571
Action(func(chst *internal.ChannelState, delta uint64) error {
5672
chst.Queued += delta
73+
chst.AddLog("")
5774
return nil
5875
}),
59-
6076
fsm.Event(datatransfer.Disconnected).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error {
6177
chst.Message = err.Error()
78+
chst.AddLog("data transfer disconnected: %s", chst.Message)
6279
return nil
6380
}),
64-
6581
fsm.Event(datatransfer.SendDataError).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error {
6682
chst.Message = err.Error()
83+
chst.AddLog("data transfer send error: %s", chst.Message)
6784
return nil
6885
}),
69-
7086
fsm.Event(datatransfer.RequestTimedOut).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error {
7187
chst.Message = err.Error()
88+
chst.AddLog("data transfer request timed out: %s", chst.Message)
7289
return nil
7390
}),
74-
7591
fsm.Event(datatransfer.Error).FromAny().To(datatransfer.Failing).Action(func(chst *internal.ChannelState, err error) error {
7692
chst.Message = err.Error()
93+
chst.AddLog("data transfer erred: %s", chst.Message)
7794
return nil
7895
}),
7996

8097
fsm.Event(datatransfer.NewVoucher).FromAny().ToNoChange().
8198
Action(func(chst *internal.ChannelState, vtype datatransfer.TypeIdentifier, voucherBytes []byte) error {
8299
chst.Vouchers = append(chst.Vouchers, internal.EncodedVoucher{Type: vtype, Voucher: &cbg.Deferred{Raw: voucherBytes}})
100+
chst.AddLog("got new voucher")
83101
return nil
84102
}),
85103
fsm.Event(datatransfer.NewVoucherResult).FromAny().ToNoChange().
86104
Action(func(chst *internal.ChannelState, vtype datatransfer.TypeIdentifier, voucherResultBytes []byte) error {
87105
chst.VoucherResults = append(chst.VoucherResults,
88106
internal.EncodedVoucherResult{Type: vtype, VoucherResult: &cbg.Deferred{Raw: voucherResultBytes}})
107+
chst.AddLog("got new voucher result")
89108
return nil
90109
}),
91110

92111
fsm.Event(datatransfer.PauseInitiator).
93112
FromMany(datatransfer.Requested, datatransfer.Ongoing).To(datatransfer.InitiatorPaused).
94113
From(datatransfer.ResponderPaused).To(datatransfer.BothPaused).
95-
FromAny().ToJustRecord(),
114+
FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error {
115+
chst.AddLog("")
116+
return nil
117+
}),
96118

97119
fsm.Event(datatransfer.PauseResponder).
98120
FromMany(datatransfer.Requested, datatransfer.Ongoing).To(datatransfer.ResponderPaused).
99121
From(datatransfer.InitiatorPaused).To(datatransfer.BothPaused).
100-
FromAny().ToJustRecord(),
122+
FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error {
123+
chst.AddLog("")
124+
return nil
125+
}),
101126

102127
fsm.Event(datatransfer.ResumeInitiator).
103128
From(datatransfer.InitiatorPaused).To(datatransfer.Ongoing).
104129
From(datatransfer.BothPaused).To(datatransfer.ResponderPaused).
105-
FromAny().ToJustRecord(),
130+
FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error {
131+
chst.AddLog("")
132+
return nil
133+
}),
106134

107135
fsm.Event(datatransfer.ResumeResponder).
108136
From(datatransfer.ResponderPaused).To(datatransfer.Ongoing).
109137
From(datatransfer.BothPaused).To(datatransfer.InitiatorPaused).
110138
From(datatransfer.Finalizing).To(datatransfer.Completing).
111-
FromAny().ToJustRecord(),
139+
FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error {
140+
chst.AddLog("")
141+
return nil
142+
}),
112143

113144
// The transfer has finished on the local node - all data was sent / received
114145
fsm.Event(datatransfer.FinishTransfer).
115146
FromAny().To(datatransfer.TransferFinished).
116147
FromMany(datatransfer.Failing, datatransfer.Cancelling).ToJustRecord().
117148
From(datatransfer.ResponderCompleted).To(datatransfer.Completing).
118-
From(datatransfer.ResponderFinalizing).To(datatransfer.ResponderFinalizingTransferFinished),
149+
From(datatransfer.ResponderFinalizing).To(datatransfer.ResponderFinalizingTransferFinished).Action(func(chst *internal.ChannelState) error {
150+
chst.AddLog("")
151+
return nil
152+
}),
119153

120154
fsm.Event(datatransfer.ResponderBeginsFinalization).
121155
FromAny().To(datatransfer.ResponderFinalizing).
122156
FromMany(datatransfer.Failing, datatransfer.Cancelling).ToJustRecord().
123-
From(datatransfer.TransferFinished).To(datatransfer.ResponderFinalizingTransferFinished),
157+
From(datatransfer.TransferFinished).To(datatransfer.ResponderFinalizingTransferFinished).Action(func(chst *internal.ChannelState) error {
158+
chst.AddLog("")
159+
return nil
160+
}),
124161

125162
// The remote peer sent a Complete message, meaning it has sent / received all data
126163
fsm.Event(datatransfer.ResponderCompletes).
@@ -129,20 +166,35 @@ var ChannelEvents = fsm.Events{
129166
From(datatransfer.ResponderPaused).To(datatransfer.ResponderFinalizing).
130167
From(datatransfer.TransferFinished).To(datatransfer.Completing).
131168
From(datatransfer.ResponderFinalizing).To(datatransfer.ResponderCompleted).
132-
From(datatransfer.ResponderFinalizingTransferFinished).To(datatransfer.Completing),
169+
From(datatransfer.ResponderFinalizingTransferFinished).To(datatransfer.Completing).Action(func(chst *internal.ChannelState) error {
170+
chst.AddLog("")
171+
return nil
172+
}),
133173

134-
fsm.Event(datatransfer.BeginFinalizing).FromAny().To(datatransfer.Finalizing),
174+
fsm.Event(datatransfer.BeginFinalizing).FromAny().To(datatransfer.Finalizing).Action(func(chst *internal.ChannelState) error {
175+
chst.AddLog("")
176+
return nil
177+
}),
135178

136179
// Both the local node and the remote peer have completed the transfer
137-
fsm.Event(datatransfer.Complete).FromAny().To(datatransfer.Completing),
180+
fsm.Event(datatransfer.Complete).FromAny().To(datatransfer.Completing).Action(func(chst *internal.ChannelState) error {
181+
chst.AddLog("")
182+
return nil
183+
}),
138184

139185
fsm.Event(datatransfer.CleanupComplete).
140186
From(datatransfer.Cancelling).To(datatransfer.Cancelled).
141187
From(datatransfer.Failing).To(datatransfer.Failed).
142-
From(datatransfer.Completing).To(datatransfer.Completed),
188+
From(datatransfer.Completing).To(datatransfer.Completed).Action(func(chst *internal.ChannelState) error {
189+
chst.AddLog("")
190+
return nil
191+
}),
143192

144193
// will kickoff state handlers for channels that were cleaning up
145-
fsm.Event(datatransfer.CompleteCleanupOnRestart).FromAny().ToNoChange(),
194+
fsm.Event(datatransfer.CompleteCleanupOnRestart).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error {
195+
chst.AddLog("")
196+
return nil
197+
}),
146198
}
147199

148200
// ChannelStateEntryFuncs are handlers called as we enter different states

channels/internal/internalchannel.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package internal
22

33
import (
4+
"fmt"
5+
46
"github.com/ipfs/go-cid"
57
peer "github.com/libp2p/go-libp2p-core/peer"
68
cbg "github.com/whyrusleeping/cbor-gen"
@@ -58,4 +60,23 @@ type ChannelState struct {
5860
Message string
5961
Vouchers []EncodedVoucher
6062
VoucherResults []EncodedVoucherResult
63+
64+
// Stages traces the execution fo a data transfer.
65+
//
66+
// EXPERIMENTAL; subject to change.
67+
Stages *datatransfer.ChannelStages
68+
}
69+
70+
// AddLog takes an fmt string with arguments, and adds the formatted string to
71+
// the logs for the current deal stage.
72+
//
73+
// EXPERIMENTAL; subject to change.
74+
func (cs *ChannelState) AddLog(msg string, a ...interface{}) {
75+
if len(a) > 0 {
76+
msg = fmt.Sprintf(msg, a...)
77+
}
78+
79+
stage := datatransfer.Statuses[cs.Status]
80+
81+
cs.Stages.AddLog(stage, msg)
6182
}

channels/internal/internalchannel_cbor_gen.go

Lines changed: 38 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)