@@ -196,9 +196,52 @@ func needsSplitting(m *raftpb.Message) bool {
196196}
197197
198198func (p * peer ) sendProcessMessage (ctx context.Context , m raftpb.Message ) error {
199- ctx , cancel := context .WithTimeout (ctx , p .tr .config .SendTimeout )
199+ // These lines used to be in the code, but they've been removed. They are
200+ // kept here in a comment, along with why they were removed, in case their
201+ // removal causes some unforeseen breakage later.
202+ //
203+ // ctx, cancel := context.WithTimeout(ctx, p.tr.config.SendTimeout)
204+ // defer cancel()
205+ //
206+ // Basically, these lines created a timeout that applied not to each chunk
207+ // of a streaming message, but to the whole streaming process. With a
208+ // sufficiently large raft log, some connections physically do not have
209+ // enough bandwidth to finish the send within the default 2 second timeout.
210+ // Further, it seems that because of some gRPC magic, the timeout was
211+ // getting propagated to the stream *server*, meaning it wasn't even the
212+ // sender timing out, it was the receiver.
213+ //
214+ // It should be fine to remove this timeout. The whole purpose of this
215+ // method is to send very large raft messages that could take several
216+ // seconds to send.
217+
218+ ctx , cancel := context .WithCancel (ctx )
200219 defer cancel ()
201220
221+ // This is a bootleg watchdog timer. If the timer elapses without something
222+ // being written to the bump channel, it will cancel the context.
223+ //
224+ // We use this because the operations on this stream *must* either time out
225+ // or succeed for raft to function correctly. We can't just time out the
226+ // whole operation, because of the reasons stated above. But we also only
227+ // set the context once, when we create the stream, and so can't set an
228+ // individual timeout for each stream operation.
229+ //
230+ // By doing it as this watchdog-type structure, we can time out individual
231+ // operations by canceling the context on our own terms.
232+ bump := make (chan struct {})
233+ go func () {
234+ for {
235+ select {
236+ case <- bump :
237+ case <- time .After (p .tr .config .SendTimeout ):
238+ cancel ()
239+ case <- ctx .Done ():
240+ return
241+ }
242+ }
243+ }()
244+
202245 var err error
203246 var stream api.Raft_StreamRaftMessageClient
204247 stream , err = api .NewRaftClient (p .conn ()).StreamRaftMessage (ctx )
@@ -217,6 +260,17 @@ func (p *peer) sendProcessMessage(ctx context.Context, m raftpb.Message) error {
217260 // Stream
218261 for _ , msg := range msgs {
219262 err = stream .Send (& msg )
263+
264+ // The send has returned (successfully or not), so bump the watchdog timer.
265+ //
266+ // We cannot just do a naked send to the bump channel. If we try to
267+ // send, for example, and the timer has elapsed, then the context
268+ // will have been canceled, the watchdog loop will have exited, and
269+ // there would be no receiver. We'd block here forever.
270+ select {
271+ case bump <- struct {}{}:
272+ case <- ctx .Done ():
273+ }
220274 if err != nil {
221275 log .G (ctx ).WithError (err ).Error ("error streaming message to peer" )
222276 stream .CloseAndRecv ()
0 commit comments