Skip to content

Commit c293617

Browse files
committed
as: Use read-only client for webhook List without pagination
Avoid using Watch() for non-paginated List() calls in the webhook registry. Watch() holds a Redis connection for the entire transaction duration, which contributes to connection spikes during high traffic. When pagination is not required (the common case for handleUp), we now use ReadOnlyClient() directly, matching the pattern used in other registries (packages/redis, pubsub/redis).
1 parent fc2c2bd commit c293617

3 files changed

Lines changed: 54 additions & 17 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ For details about compatibility between different releases, see the **Commitment
2828
- Add HSTS response headers.
2929
- Draft band definition for Uzbekistan 923Mhz band.
3030

31+
### Fixed
32+
33+
- Application Server webhook registry now uses read-only Redis client for non-paginated list operations, reducing connection holding time during high traffic.
34+
3135
## [3.35.1] - 2025-12-19
3236

3337
## [3.35.0] - 2025-11-19

pkg/applicationserver/io/web/redis/registry.go

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -102,16 +102,19 @@ func (r WebhookRegistry) List(ctx context.Context, ids *ttnpb.ApplicationIdentif
102102
appUID := unique.ID(ctx, ids)
103103
uidKey := r.appKey(appUID)
104104

105-
opts := []ttnredis.FindProtosOption{}
105+
defer trace.StartRegion(ctx, "list webhooks by application id").End()
106+
106107
limit, offset := ttnredis.PaginationLimitAndOffsetFromContext(ctx)
107-
if limit != 0 {
108-
opts = append(opts,
109-
ttnredis.FindProtosSorted(true),
110-
ttnredis.FindProtosWithOffsetAndCount(offset, limit),
111-
)
112-
}
113108

114109
rangeProtos := func(c redis.Cmdable) error {
110+
var opts []ttnredis.FindProtosOption
111+
if limit != 0 {
112+
opts = append(opts,
113+
ttnredis.FindProtosSorted(true),
114+
ttnredis.FindProtosWithOffsetAndCount(offset, limit),
115+
)
116+
}
117+
pbs = make([]*ttnpb.ApplicationWebhook, 0)
115118
return ttnredis.FindProtos(ctx, c, uidKey, r.makeIDKeyFunc(appUID), opts...).Range(
116119
func() (proto.Message, func() (bool, error)) {
117120
pb := &ttnpb.ApplicationWebhook{}
@@ -126,27 +129,21 @@ func (r WebhookRegistry) List(ctx context.Context, ids *ttnpb.ApplicationIdentif
126129
})
127130
}
128131

129-
defer trace.StartRegion(ctx, "list webhooks by application id").End()
130-
131132
var err error
132133
if limit != 0 {
133-
var lockerID string
134-
lockerID, err = ttnredis.GenerateLockerID()
135-
if err != nil {
136-
return nil, err
137-
}
138-
err = ttnredis.LockedWatch(ctx, r.Redis, uidKey, lockerID, r.LockTTL, func(tx *redis.Tx) (err error) {
134+
// Pagination requires Watch() for consistency between SCard and SORT.
135+
err = r.Redis.Watch(ctx, func(tx *redis.Tx) error {
139136
total, err := tx.SCard(ctx, uidKey).Result()
140137
if err != nil {
141138
return err
142139
}
143140
ttnredis.SetPaginationTotal(ctx, total)
144141
return rangeProtos(tx)
145-
})
142+
}, uidKey)
146143
} else {
144+
// No pagination - use client without Watch() to reduce connection holding time.
147145
err = rangeProtos(r.Redis)
148146
}
149-
150147
if err != nil {
151148
return nil, ttnredis.ConvertError(err)
152149
}

pkg/applicationserver/io/web/redis/registry_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,42 @@ func TestWebhookRegistry(t *testing.T) {
179179
a.So(webhooks[0].Format, should.Equal, format)
180180
})
181181

182+
t.Run("ListMultipleWithoutPagination", func(t *testing.T) {
183+
t.Parallel()
184+
a, ctx := test.New(t)
185+
186+
appID := &ttnpb.ApplicationIdentifiers{
187+
ApplicationId: "myapp-list-multiple",
188+
}
189+
190+
// Create multiple webhooks
191+
for i := 1; i <= 5; i++ {
192+
whIDs := &ttnpb.ApplicationWebhookIdentifiers{
193+
ApplicationIds: appID,
194+
WebhookId: fmt.Sprintf("webhook-%02d", i),
195+
}
196+
_, err := registry.Set(ctx, whIDs, paths,
197+
func(*ttnpb.ApplicationWebhook) (*ttnpb.ApplicationWebhook, []string, error) {
198+
return &ttnpb.ApplicationWebhook{
199+
Ids: whIDs,
200+
Format: format,
201+
BaseUrl: baseURL,
202+
}, paths, nil
203+
},
204+
)
205+
a.So(err, should.BeNil)
206+
}
207+
208+
// List without pagination context - exercises ReadOnlyClient() path
209+
webhooks, err := registry.List(ctx, appID, paths)
210+
a.So(err, should.BeNil)
211+
a.So(webhooks, should.HaveLength, 5)
212+
for _, wh := range webhooks {
213+
a.So(wh.Ids.ApplicationIds, should.Resemble, appID)
214+
a.So(wh.Format, should.Equal, format)
215+
}
216+
})
217+
182218
t.Run("Pagination", func(t *testing.T) {
183219
t.Parallel()
184220
a, ctx := test.New(t)

0 commit comments

Comments
 (0)