Skip to content

Commit 72efec3

Browse files
committed
ns: Retry confirmed class C downlink on timeout
1 parent 26cfe33 commit 72efec3

6 files changed

Lines changed: 181 additions & 16 deletions

File tree

cmd/ttn-lw-stack/commands/start.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,12 @@ import (
5353
"go.thethings.network/lorawan-stack/v3/pkg/web"
5454
)
5555

56-
const defaultLockTTL = 10 * time.Second
56+
const (
57+
defaultLockTTL = 10 * time.Second
58+
59+
downlinkKey = "downlink"
60+
pendingDownlinkKey = "pending-downlink"
61+
)
5762

5863
// NewComponentDeviceRegistryRedis instantiates a new redis client with the Component Device Registry namespace.
5964
func NewComponentDeviceRegistryRedis(conf *Config, name string) *redis.Client {
@@ -78,6 +83,12 @@ func NewNetworkServerDownlinkTaskRedis(conf *Config) *redis.Client {
7883
return redis.New(conf.Redis.WithNamespace("ns", "tasks"))
7984
}
8085

86+
// NewNetworkServerPendingDownlinkTaskRedis instantiates a new redis client
87+
// with the Network Server Pending Downlink Task namespace.
88+
func NewNetworkServerPendingDownlinkTaskRedis(conf *Config) *redis.Client {
89+
return redis.New(conf.Redis.WithNamespace("ns", "pending-tasks"))
90+
}
91+
8192
// NewNetworkServerMACSettingsProfileRegistryRedis instantiates a new redis client
8293
// with the Network Server MAC Settings Profile Registry namespace.
8394
func NewNetworkServerMACSettingsProfileRegistryRedis(conf *Config) *redis.Client {
@@ -372,12 +383,25 @@ var startCommand = &cobra.Command{
372383
100000,
373384
redisConsumerGroup,
374385
redis.DefaultStreamBlockLimit,
386+
downlinkKey,
375387
)
376388
if err := downlinkTasks.Init(ctx); err != nil {
377389
return shared.ErrInitializeNetworkServer.WithCause(err)
378390
}
379391
defer downlinkTasks.Close(ctx)
380392
config.NS.DownlinkTaskQueue.Queue = downlinkTasks
393+
pendingDownlinkTasks := nsredis.NewDownlinkTaskQueue(
394+
NewNetworkServerPendingDownlinkTaskRedis(config),
395+
100000,
396+
redisConsumerGroup,
397+
redis.DefaultStreamBlockLimit,
398+
pendingDownlinkKey,
399+
)
400+
if err := pendingDownlinkTasks.Init(ctx); err != nil {
401+
return shared.ErrInitializeNetworkServer.WithCause(err)
402+
}
403+
defer pendingDownlinkTasks.Close(ctx)
404+
config.NS.PendingDownlinkTaskQueue.Queue = pendingDownlinkTasks
381405
config.NS.ScheduledDownlinkMatcher = &nsredis.ScheduledDownlinkMatcher{
382406
Redis: redis.New(config.Cache.Redis.WithNamespace("ns", "scheduled-downlinks")),
383407
}

pkg/networkserver/config.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ type ApplicationUplinkQueueConfig struct {
3434
FastNumConsumers uint64 `name:"fast-num-consumers"`
3535
}
3636

37-
// ApplicationUplinkQueueConfig defines downlink task queue configuration.
37+
// DownlinkTaskQueueConfig defines downlink task queue configuration.
3838
type DownlinkTaskQueueConfig struct {
3939
Queue DownlinkTaskQueue `name:"-"`
4040
NumConsumers uint64 `name:"num-consumers"`
@@ -141,6 +141,7 @@ type Config struct {
141141
ApplicationUplinkQueue ApplicationUplinkQueueConfig `name:"application-uplink-queue"`
142142
Devices DeviceRegistry `name:"-"`
143143
DownlinkTaskQueue DownlinkTaskQueueConfig `name:"downlink-task-queue"`
144+
PendingDownlinkTaskQueue DownlinkTaskQueueConfig `name:"pending-downlink-task-queue"`
144145
UplinkDeduplicator UplinkDeduplicator `name:"-"`
145146
ScheduledDownlinkMatcher ScheduledDownlinkMatcher `name:"-"`
146147
NetID types.NetID `name:"net-id" description:"NetID of this Network Server"` // nolint: lll
@@ -170,6 +171,9 @@ var DefaultConfig = Config{
170171
DownlinkTaskQueue: DownlinkTaskQueueConfig{
171172
NumConsumers: 1,
172173
},
174+
PendingDownlinkTaskQueue: DownlinkTaskQueueConfig{
175+
NumConsumers: 1,
176+
},
173177
DeduplicationWindow: 200 * time.Millisecond,
174178
CooldownWindow: time.Second,
175179
DownlinkPriorities: DownlinkPriorityConfig{

pkg/networkserver/downlink.go

Lines changed: 127 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1473,7 +1473,15 @@ type downlinkAttemptResult struct {
14731473
DownlinkTaskUpdateStrategy downlinkTaskUpdateStrategy
14741474
}
14751475

1476-
func (ns *NetworkServer) attemptClassADataDownlink(ctx context.Context, dev *ttnpb.EndDevice, phy *band.Band, fp *frequencyplans.FrequencyPlan, slot *classADownlinkSlot, maxUpLength uint16) downlinkAttemptResult {
1476+
func (ns *NetworkServer) attemptClassADataDownlink( // nolint:gocyclo
1477+
ctx context.Context,
1478+
dev *ttnpb.EndDevice,
1479+
phy *band.Band,
1480+
fp *frequencyplans.FrequencyPlan,
1481+
slot *classADownlinkSlot,
1482+
maxUpLength uint16,
1483+
profile *ttnpb.MACSettings,
1484+
) downlinkAttemptResult {
14771485
ctx = events.ContextWithCorrelationID(ctx, slot.Uplink.CorrelationIds...)
14781486
if !dev.MacState.RxWindowsAvailable {
14791487
log.FromContext(ctx).Error("RX windows not available, skip class A downlink slot")
@@ -1661,6 +1669,16 @@ func (ns *NetworkServer) attemptClassADataDownlink(ctx context.Context, dev *ttn
16611669
dev.Session.QueuedApplicationDownlinks = dev.Session.QueuedApplicationDownlinks[:0:0]
16621670
}
16631671
recordDataDownlink(dev, genState, genDown.NeedsMACAnswer, down, ns.defaultMACSettings)
1672+
if genState.ApplicationDownlink != nil &&
1673+
genState.ApplicationDownlink.Confirmed &&
1674+
dev.MacState.DeviceClass == ttnpb.Class_CLASS_C {
1675+
timeout := mac.DeviceClassCTimeout(dev, ns.defaultMACSettings, profile)
1676+
taskAt := time.Now().UTC().Add(timeout)
1677+
log.FromContext(ctx).WithField("start_at", taskAt).Debug("Add pending downlink task")
1678+
if err := ns.pendingDownlinkTasks.Add(ctx, dev.Ids, taskAt, true); err != nil {
1679+
log.FromContext(ctx).WithError(err).Warn("Failed to add pending downlink task")
1680+
}
1681+
}
16641682
dev.MacState.PendingRelayDownlink = nil
16651683
return downlinkAttemptResult{
16661684
SetPaths: ttnpb.AddFields(sets,
@@ -1679,7 +1697,15 @@ func (ns *NetworkServer) attemptClassADataDownlink(ctx context.Context, dev *ttn
16791697
}
16801698
}
16811699

1682-
func (ns *NetworkServer) attemptNetworkInitiatedDataDownlink(ctx context.Context, dev *ttnpb.EndDevice, phy *band.Band, fp *frequencyplans.FrequencyPlan, slot *networkInitiatedDownlinkSlot, maxUpLength uint16) downlinkAttemptResult {
1700+
func (ns *NetworkServer) attemptNetworkInitiatedDataDownlink( // nolint:gocyclo
1701+
ctx context.Context,
1702+
dev *ttnpb.EndDevice,
1703+
phy *band.Band,
1704+
fp *frequencyplans.FrequencyPlan,
1705+
slot *networkInitiatedDownlinkSlot,
1706+
maxUpLength uint16,
1707+
profile *ttnpb.MACSettings,
1708+
) downlinkAttemptResult {
16831709
var drIdx ttnpb.DataRateIndex
16841710
var freq uint64
16851711
switch slot.Class {
@@ -1890,6 +1916,16 @@ func (ns *NetworkServer) attemptNetworkInitiatedDataDownlink(ctx context.Context
18901916
}
18911917

18921918
recordDataDownlink(dev, genState, genDown.NeedsMACAnswer, down, ns.defaultMACSettings)
1919+
if genState.ApplicationDownlink != nil &&
1920+
genState.ApplicationDownlink.Confirmed &&
1921+
dev.MacState.DeviceClass == ttnpb.Class_CLASS_C {
1922+
timeout := mac.DeviceClassCTimeout(dev, ns.defaultMACSettings, profile)
1923+
taskAt := time.Now().UTC().Add(timeout)
1924+
log.FromContext(ctx).WithField("start_at", taskAt).Debug("Add pending downlink task")
1925+
if err := ns.pendingDownlinkTasks.Add(ctx, dev.Ids, taskAt, true); err != nil {
1926+
log.FromContext(ctx).WithError(err).Warn("Failed to add pending downlink task")
1927+
}
1928+
}
18931929
if genState.ApplicationDownlink != nil || genState.EvictDownlinkQueueIfScheduled {
18941930
sets = ttnpb.AddFields(sets, "session.queued_application_downlinks")
18951931
}
@@ -2198,7 +2234,7 @@ func (ns *NetworkServer) processDownlinkTask(ctx context.Context, consumerID str
21982234
}
21992235
switch slot := v.(type) {
22002236
case *classADownlinkSlot:
2201-
a := ns.attemptClassADataDownlink(ctx, dev, phy, fp, slot, maxUpLength)
2237+
a := ns.attemptClassADataDownlink(ctx, dev, phy, fp, slot, maxUpLength, profile.GetMacSettings())
22022238
queuedEvents = append(queuedEvents, a.QueuedEvents...)
22032239
queuedApplicationUplinks = append(queuedApplicationUplinks, a.QueuedApplicationUplinks...)
22042240
taskUpdateStrategy = a.DownlinkTaskUpdateStrategy
@@ -2229,7 +2265,7 @@ func (ns *NetworkServer) processDownlinkTask(ctx context.Context, consumerID str
22292265
earliestAt = time.Now().Add(absoluteTimeSchedulingDelay / 2)
22302266
continue
22312267
}
2232-
a := ns.attemptNetworkInitiatedDataDownlink(ctx, dev, phy, fp, slot, maxUpLength)
2268+
a := ns.attemptNetworkInitiatedDataDownlink(ctx, dev, phy, fp, slot, maxUpLength, profile.GetMacSettings())
22332269
queuedEvents = append(queuedEvents, a.QueuedEvents...)
22342270
queuedApplicationUplinks = append(queuedApplicationUplinks, a.QueuedApplicationUplinks...)
22352271
taskUpdateStrategy = a.DownlinkTaskUpdateStrategy
@@ -2273,3 +2309,90 @@ func (ns *NetworkServer) processDownlinkTask(ctx context.Context, consumerID str
22732309
}
22742310
return err
22752311
}
2312+
2313+
const maxConfirmedDownlinkRetries = 8
2314+
2315+
func (ns *NetworkServer) createProcessPendingDownlinkTask(consumerID string) func(context.Context) error {
2316+
return func(ctx context.Context) error {
2317+
return ns.processPendingDownlinkTask(ctx, consumerID)
2318+
}
2319+
}
2320+
2321+
// processPendingDownlinkTask processes the most recent pending downlink task ready for execution,
2322+
// if such is available or wait until it is before processing it.
2323+
// NOTE: ctx.Done() is not guaranteed to be respected by processPendingDownlinkTask.
2324+
// The processPendingDownlinkTask receives the consumerID that will be used for popping
2325+
// from the pending downlink task queue.
2326+
func (ns *NetworkServer) processPendingDownlinkTask(ctx context.Context, consumerID string) error {
2327+
var setErr bool
2328+
err := ns.pendingDownlinkTasks.Pop(
2329+
ctx,
2330+
consumerID,
2331+
func(ctx context.Context, devID *ttnpb.EndDeviceIdentifiers, t time.Time,
2332+
) (time.Time, error) {
2333+
ctx = log.NewContextWithFields(ctx, log.Fields(
2334+
"device_uid", unique.ID(ctx, devID),
2335+
"started_at", time.Now().UTC(),
2336+
))
2337+
logger := log.FromContext(ctx)
2338+
logger.WithField("start_at", t).Debug("Process pending downlink task")
2339+
2340+
dev, ctx, err := ns.devices.SetByID(ctx, devID.ApplicationIds, devID.DeviceId,
2341+
[]string{
2342+
"mac_state",
2343+
"session",
2344+
},
2345+
func(_ context.Context, dev *ttnpb.EndDevice) (*ttnpb.EndDevice, []string, error) {
2346+
if dev == nil {
2347+
logger.Warn("Device not found")
2348+
return nil, nil, nil
2349+
}
2350+
2351+
pendingAppDown := dev.MacState.GetPendingApplicationDownlink()
2352+
if pendingAppDown != nil {
2353+
pendingAppDown.FCnt = dev.Session.LastNFCntDown + 1
2354+
pendingAppDown.ConfirmedRetry.Attempt++
2355+
2356+
if pendingAppDown.ConfirmedRetry.Attempt > maxConfirmedDownlinkRetries {
2357+
dev.MacState.PendingApplicationDownlink = nil
2358+
logger.Warn("Max confirmed downlink retries reached, drop pending application downlink")
2359+
return dev, []string{
2360+
"mac_state.pending_application_downlink",
2361+
}, nil
2362+
}
2363+
2364+
// Enqueue the pending application downlink at the front of the queue.
2365+
// This preserves the order of other queued application downlinks.
2366+
// The pending application downlink will be processed first in the next downlink task.
2367+
// This is important for confirmed downlinks, as we need to ensure that the ACK is received
2368+
// before processing other downlinks.
2369+
dev.Session.QueuedApplicationDownlinks = append(
2370+
[]*ttnpb.ApplicationDownlink{pendingAppDown},
2371+
dev.Session.QueuedApplicationDownlinks...,
2372+
)
2373+
dev.MacState.PendingApplicationDownlink = nil
2374+
}
2375+
return dev, []string{
2376+
"mac_state",
2377+
"session",
2378+
}, nil
2379+
},
2380+
)
2381+
if err != nil {
2382+
setErr = true
2383+
logger.WithError(err).Error("Failed to update device in registry")
2384+
return time.Time{}, err
2385+
}
2386+
2387+
if err := ns.updateDataDownlinkTask(ctx, dev, time.Time{}); err != nil {
2388+
log.FromContext(ctx).WithError(err).Error(
2389+
"Failed to update downlink task queue after processing pending downlink")
2390+
}
2391+
2392+
return time.Time{}, nil
2393+
})
2394+
if err != nil && !setErr {
2395+
log.FromContext(ctx).WithError(err).Error("Failed to pop entry from pending downlink task queue")
2396+
}
2397+
return err
2398+
}

pkg/networkserver/internal/test/shared/redis.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func NewRedisDeviceRegistry(ctx context.Context) (DeviceRegistry, func()) {
7070
func NewRedisDownlinkTaskQueue(ctx context.Context) (DownlinkTaskQueue, func()) {
7171
tb := test.MustTBFromContext(ctx)
7272
cl, flush := test.NewRedis(ctx, append(redisNamespace[:], "downlink-tasks")...)
73-
q := redis.NewDownlinkTaskQueue(cl, 10000, redisConsumerGroup, testStreamBlockLimit())
73+
q := redis.NewDownlinkTaskQueue(cl, 10000, redisConsumerGroup, testStreamBlockLimit(), "downlink")
7474
if err := q.Init(ctx); err != nil {
7575
tb.Fatalf("Failed to initialize Redis downlink task queue: %s", test.FormatError(err))
7676
}

pkg/networkserver/networkserver.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,9 @@ type NetworkServer struct {
172172

173173
applicationUplinks ApplicationUplinkQueue
174174

175-
downlinkTasks DownlinkTaskQueue
176-
downlinkPriorities DownlinkPriorities
175+
downlinkTasks DownlinkTaskQueue
176+
downlinkPriorities DownlinkPriorities
177+
pendingDownlinkTasks DownlinkTaskQueue
177178

178179
deduplicationWindow windowDurationFunc
179180
collectionWindow windowDurationFunc
@@ -211,6 +212,7 @@ const (
211212
downlinkProcessTaskName = "process_downlink"
212213
applicationUplinkDispatchTaskName = "dispatch_application_uplink"
213214
downlinkDispatchTaskName = "dispatch_downlink"
215+
pendingDownlinkProcessTaskName = "process_pending_downlink"
214216

215217
maxInt = int(^uint(0) >> 1)
216218
)
@@ -230,10 +232,14 @@ func New(c *component.Component, conf *Config, opts ...Option) (*NetworkServer,
230232
panic(errInvalidConfiguration.WithCause(errors.New("Devices is not specified")))
231233
case conf.DownlinkTaskQueue.NumConsumers == 0:
232234
return nil, errInvalidConfiguration.WithCause(errors.New("DownlinkTaskQueue.NumConsumers must be greater than 0"))
235+
case conf.PendingDownlinkTaskQueue.NumConsumers == 0:
236+
return nil, errInvalidConfiguration.WithCause(errors.New("PendingDownlinkTaskQueue.NumConsumers must be greater than 0")) // nolint:lll
233237
case conf.ApplicationUplinkQueue.NumConsumers == 0:
234238
return nil, errInvalidConfiguration.WithCause(errors.New("ApplicationUplinkQueue.NumConsumers must be greater than 0"))
235239
case conf.DownlinkTaskQueue.Queue == nil:
236240
panic(errInvalidConfiguration.WithCause(errors.New("DownlinkTaskQueue is not specified")))
241+
case conf.PendingDownlinkTaskQueue.Queue == nil:
242+
panic(errInvalidConfiguration.WithCause(errors.New("PendingDownlinkTaskQueue is not specified")))
237243
case conf.UplinkDeduplicator == nil:
238244
panic(errInvalidConfiguration.WithCause(errors.New("UplinkDeduplicator is not specified")))
239245
case conf.ScheduledDownlinkMatcher == nil:
@@ -297,6 +303,7 @@ func New(c *component.Component, conf *Config, opts ...Option) (*NetworkServer,
297303
macSettingsProfile: &NsMACSettingsProfileRegistry{registry: conf.MACSettingsProfileRegistry},
298304
macSettingsProfiles: conf.MACSettingsProfileRegistry,
299305
downlinkTasks: conf.DownlinkTaskQueue.Queue,
306+
pendingDownlinkTasks: conf.PendingDownlinkTaskQueue.Queue,
300307
downlinkPriorities: downlinkPriorities,
301308
defaultMACSettings: defaultMACSettings,
302309
interopClient: interopCl,
@@ -358,7 +365,8 @@ func New(c *component.Component, conf *Config, opts ...Option) (*NetworkServer,
358365
for id, dispatcher := range map[string]interface {
359366
Dispatch(context.Context, string) error
360367
}{
361-
downlinkDispatchTaskName: ns.downlinkTasks,
368+
downlinkDispatchTaskName: ns.downlinkTasks,
369+
pendingDownlinkProcessTaskName: ns.pendingDownlinkTasks,
362370
} {
363371
dispatcher := dispatcher
364372
ns.RegisterTask(&task.Config{
@@ -391,6 +399,16 @@ func New(c *component.Component, conf *Config, opts ...Option) (*NetworkServer,
391399
Backoff: processTaskBackoff,
392400
})
393401
}
402+
for i := uint64(0); i < conf.PendingDownlinkTaskQueue.NumConsumers; i++ {
403+
consumerID := fmt.Sprintf("%s:%d", consumerIDPrefix, i)
404+
ns.RegisterTask(&task.Config{
405+
Context: ctx,
406+
ID: fmt.Sprintf("%s_%d", pendingDownlinkProcessTaskName, i),
407+
Func: ns.createProcessPendingDownlinkTask(consumerID),
408+
Restart: task.RestartAlways,
409+
Backoff: processTaskBackoff,
410+
})
411+
}
394412
c.RegisterGRPC(ns)
395413
return ns, nil
396414
}

pkg/networkserver/redis/downlink_task_queue.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,20 +29,16 @@ type DownlinkTaskQueue struct {
2929
queue *ttnredis.TaskQueue
3030
}
3131

32-
const (
33-
downlinkKey = "downlink"
34-
)
35-
3632
// NewDownlinkTaskQueue returns new downlink task queue.
3733
func NewDownlinkTaskQueue(
38-
cl *ttnredis.Client, maxLen int64, group string, streamBlockLimit time.Duration,
34+
cl *ttnredis.Client, maxLen int64, group string, streamBlockLimit time.Duration, key string,
3935
) *DownlinkTaskQueue {
4036
return &DownlinkTaskQueue{
4137
queue: &ttnredis.TaskQueue{
4238
Redis: cl,
4339
MaxLen: maxLen,
4440
Group: group,
45-
Key: cl.Key(downlinkKey),
41+
Key: cl.Key(key),
4642
StreamBlockLimit: streamBlockLimit,
4743
},
4844
}

0 commit comments

Comments
 (0)