Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -359,20 +359,18 @@ func Test_ServerRequest_Evictable(t *testing.T) {

t.Run("expired but below minimum retention", func(t *testing.T) {
req := newRequest(20 * time.Millisecond)
time.Sleep(60 * time.Millisecond)
require.Eventually(t, func() bool { return req.Expired() }, time.Second, 10*time.Millisecond)
assert.False(t, req.Evictable(200*time.Millisecond))
})

t.Run("expired and retained past minimum retention", func(t *testing.T) {
req := newRequest(20 * time.Millisecond)
time.Sleep(60 * time.Millisecond)
assert.True(t, req.Evictable(10*time.Millisecond))
require.Eventually(t, func() bool { return req.Evictable(10 * time.Millisecond) }, time.Second, 10*time.Millisecond)
})

t.Run("minimum retention elapsed but request timeout still active", func(t *testing.T) {
req := newRequest(200 * time.Millisecond)
time.Sleep(60 * time.Millisecond)
assert.False(t, req.Evictable(10*time.Millisecond))
require.Never(t, func() bool { return req.Evictable(10 * time.Millisecond) }, 100*time.Millisecond, 10*time.Millisecond)
})
}

Expand Down
4 changes: 3 additions & 1 deletion core/capabilities/remote/executable/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,15 @@ func (r *server) expireRequests() {
defer r.receiveLock.Unlock()

for requestID, executeReq := range r.requestIDToRequest {
if executeReq.request.Evictable(commoncap.DefaultExecutableRequestTimeout) {
if executeReq.request.Expired() {
ctx, cancelFn := r.stopCh.NewCtx()
err := executeReq.request.Cancel(ctx, types.Error_TIMEOUT, "request expired by executable server")
cancelFn()
if err != nil {
r.lggr.Errorw("failed to cancel request", "request", executeReq, "err", err)
}
}
if executeReq.request.Evictable(commoncap.DefaultExecutableRequestTimeout) {
delete(r.requestIDToRequest, requestID)
delete(r.messageIDToRequestIDsCount, executeReq.messageID)
}
Expand Down
14 changes: 11 additions & 3 deletions core/capabilities/remote/executable/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1060,17 +1060,17 @@ func Test_Server_DuplicateRequestRemainsDedupedPastRequestTimeout(t *testing.T)
}

server.Receive(ctx, msg)
require.Eventually(t, func() bool { return len(dispatcher.sent) == 1 }, time.Second, 10*time.Millisecond)
require.Eventually(t, func() bool { return dispatcher.Len() == 1 }, time.Second, 10*time.Millisecond)

time.Sleep(2 * cfg.RequestTimeout)
server.Receive(ctx, msg)

time.Sleep(100 * time.Millisecond)
require.Len(t, dispatcher.sent, 1)
require.Never(t, func() bool { return dispatcher.Len() > 1 }, 100*time.Millisecond, 10*time.Millisecond)
}

type noopDispatcher struct {
services.StateMachine
mu sync.Mutex
sent []*remotetypes.MessageBody
}

Expand All @@ -1095,6 +1095,14 @@ func (n *noopDispatcher) SetReceiverForMethod(string, uint32, string, remotetype
func (n *noopDispatcher) RemoveReceiverForMethod(string, uint32, string) {}

func (n *noopDispatcher) Send(peerID p2ptypes.PeerID, msgBody *remotetypes.MessageBody) error {
n.mu.Lock()
defer n.mu.Unlock()
n.sent = append(n.sent, msgBody)
return nil
}

func (n *noopDispatcher) Len() int {
n.mu.Lock()
defer n.mu.Unlock()
return len(n.sent)
}
Loading