Skip to content

Commit b2c5b1d

Browse files
committed
gs: Cancel the context when downlink path expired
1 parent 9f00ae4 commit b2c5b1d

2 files changed

Lines changed: 166 additions & 4 deletions

File tree

pkg/gatewayserver/io/udp/udp.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,8 @@ var (
450450
)
451451

452452
func (s *srv) handleDown(ctx context.Context, st *state) error {
453+
ctx, cancel := context.WithCancel(ctx)
454+
defer cancel()
453455
defer func() {
454456
st.lastDownlinkPath.Store(nil)
455457
st.startHandleDownMu.Lock()

pkg/gatewayserver/io/udp/udp_test.go

Lines changed: 164 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,164 @@ func TestScheduleLateCancel(t *testing.T) {
357357
// A deadline-exceeded error means nothing was written — the expected outcome.
358358
}
359359

360+
// TestScheduleLateDownlinkPathExpired verifies that a pending late-scheduled
361+
// write goroutine is cancelled when handleDown exits due to the downlink path
362+
// expiring (errDownlinkPathExpired), rather than firing after the path has gone.
363+
// This is a regression test for the goroutine leak where func4 goroutines
364+
// accumulated across handleDown reconnect cycles.
365+
func TestScheduleLateDownlinkPathExpired(t *testing.T) {
366+
t.Parallel()
367+
368+
var (
369+
registeredGatewayID = ttnpb.GatewayIdentifiers{GatewayId: "test-gateway"}
370+
timeout = (1 << 4) * test.Delay
371+
testConfig = Config{
372+
PacketHandlers: 2,
373+
PacketBuffer: 10,
374+
DownlinkPathExpires: 8 * timeout,
375+
ConnectionExpires: 20 * timeout,
376+
ScheduleLateTime: 0,
377+
}
378+
)
379+
380+
a, ctx := test.New(t)
381+
ctx, cancelCtx := context.WithCancel(ctx)
382+
defer cancelCtx()
383+
384+
is, _, closeIS := mockis.New(ctx)
385+
defer closeIS()
386+
387+
c := componenttest.NewComponent(t, &component.Config{
388+
ServiceBase: config.ServiceBase{
389+
FrequencyPlans: config.FrequencyPlansConfig{
390+
ConfigSource: "static",
391+
Static: test.StaticFrequencyPlans,
392+
},
393+
},
394+
})
395+
componenttest.StartComponent(t, c)
396+
defer c.Close()
397+
398+
gs := mock.NewServer(c, is)
399+
addr, _ := net.ResolveUDPAddr("udp", ":0")
400+
lis, err := net.ListenUDP("udp", addr)
401+
if !a.So(err, should.BeNil) {
402+
t.FailNow()
403+
}
404+
405+
go Serve(ctx, gs, lis, testConfig) // nolint:errcheck
406+
407+
connections := &sync.Map{}
408+
eui := types.EUI64{0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06}
409+
410+
udpConn, err := net.Dial("udp", lis.LocalAddr().String())
411+
if !a.So(err, should.BeNil) {
412+
t.FailNow()
413+
}
414+
defer udpConn.Close()
415+
416+
// Establish a downlink path by sending PULL_DATA.
417+
pullPacket := generatePullData(eui)
418+
pullPacket.Token = [2]byte{0x00, 0x01}
419+
pullBuf, err := pullPacket.MarshalBinary()
420+
if !a.So(err, should.BeNil) {
421+
t.FailNow()
422+
}
423+
_, err = udpConn.Write(pullBuf)
424+
if !a.So(err, should.BeNil) {
425+
t.FailNow()
426+
}
427+
expectAck(t, udpConn, true, encoding.PullAck, pullPacket.Token)
428+
429+
conn := expectConnection(t, gs, connections, eui, true)
430+
431+
// Sync the gateway clock by sending PUSH_DATA with a known concentrator timestamp.
432+
syncConcentratorTime := 300 * test.Delay
433+
pushPacket := generatePushData(eui, false, syncConcentratorTime)
434+
pushPacket.Token = [2]byte{0x00, 0x02}
435+
pushBuf, err := pushPacket.MarshalBinary()
436+
if !a.So(err, should.BeNil) {
437+
t.FailNow()
438+
}
439+
_, err = udpConn.Write(pushBuf)
440+
if !a.So(err, should.BeNil) {
441+
t.FailNow()
442+
}
443+
clockSynced := time.Now()
444+
expectAck(t, udpConn, true, encoding.PushAck, pushPacket.Token)
445+
time.Sleep(timeout) // ensure the clock sync is processed before scheduling
446+
447+
// Schedule a Class A downlink whose concentrator time is set well beyond the
448+
// downlink-path expiry window. This makes the late-schedule timer (d) larger
449+
// than DownlinkPathExpires, so the timer will not fire on its own before the
450+
// path expires and handleDown exits.
451+
lateConcentratorTime := syncConcentratorTime + 2*testConfig.DownlinkPathExpires
452+
path := &ttnpb.DownlinkPath{
453+
Path: &ttnpb.DownlinkPath_UplinkToken{
454+
UplinkToken: io.MustUplinkToken(
455+
&ttnpb.GatewayAntennaIdentifiers{GatewayIds: &registeredGatewayID},
456+
uint32(lateConcentratorTime/time.Microsecond), // nolint:gosec
457+
scheduling.ConcentratorTime(lateConcentratorTime),
458+
time.Unix(0, int64(lateConcentratorTime)),
459+
nil,
460+
),
461+
},
462+
}
463+
msg := &ttnpb.DownlinkMessage{
464+
RawPayload: []byte{0x01},
465+
Settings: &ttnpb.DownlinkMessage_Request{
466+
Request: &ttnpb.TxRequest{
467+
Class: ttnpb.Class_CLASS_A,
468+
Priority: ttnpb.TxSchedulePriority_NORMAL,
469+
Rx1Delay: ttnpb.RxDelay_RX_DELAY_1,
470+
Rx1DataRate: &ttnpb.DataRate{
471+
Modulation: &ttnpb.DataRate_Lora{
472+
Lora: &ttnpb.LoRaDataRate{
473+
SpreadingFactor: 7,
474+
Bandwidth: 125000,
475+
CodingRate: band.Cr4_5,
476+
},
477+
},
478+
},
479+
Rx1Frequency: 868100000,
480+
FrequencyPlanId: test.EUFrequencyPlanID,
481+
},
482+
},
483+
}
484+
_, _, _, err = conn.ScheduleDown(path, msg)
485+
if !a.So(err, should.BeNil) {
486+
t.FailNow()
487+
}
488+
489+
// Compute when the timer goroutine would fire if not cancelled.
490+
// serverTime(T) = clockSynced + (T - syncConcentratorTime); with ScheduleLateTime=0,
491+
// d = time.Until(serverTime(scheduledTimestamp)).
492+
scheduledTimestamp := time.Duration(msg.GetScheduled().Timestamp) * time.Microsecond
493+
expectedFireTime := clockSynced.Add(-syncConcentratorTime).Add(scheduledTimestamp)
494+
495+
// Give handleDown time to dequeue the message and start the timer goroutine.
496+
// Then stop sending PULL_DATA so the downlink path expires. handleDown detects
497+
// the expired path via its health-check ticker and returns errDownlinkPathExpired,
498+
// which triggers defer cancel() and stops the timer goroutine before it fires.
499+
time.Sleep(timeout)
500+
time.Sleep(2 * testConfig.DownlinkPathExpires)
501+
502+
// Read from the UDP connection until expectedFireTime + margin. A broken
503+
// implementation (missing defer cancel in handleDown) would deliver a PULL_RESP
504+
// near expectedFireTime. With the fix the goroutine exits on path expiry and
505+
// nothing is written.
506+
var buf [65507]byte
507+
udpConn.SetReadDeadline(expectedFireTime.Add(2 * timeout)) // nolint:errcheck,gosec
508+
n, readErr := udpConn.Read(buf[:])
509+
if readErr == nil {
510+
var pkt encoding.Packet
511+
if unmarshalErr := pkt.UnmarshalBinary(buf[:n]); unmarshalErr == nil {
512+
a.So(pkt.PacketType, should.NotEqual, encoding.PullResp)
513+
}
514+
}
515+
// A deadline-exceeded error means nothing was written — the expected outcome.
516+
}
517+
360518
func TestFrontend(t *testing.T) {
361519
t.Parallel()
362520
iotest.Frontend(t, iotest.FrontendConfig{
@@ -533,10 +691,12 @@ func TestRawData(t *testing.T) {
533691
registeredGatewayID = ttnpb.GatewayIdentifiers{GatewayId: "test-gateway"}
534692
timeout = (1 << 4) * test.Delay
535693
testConfig = Config{
536-
PacketHandlers: 2,
537-
PacketBuffer: 10,
538-
DownlinkPathExpires: 8 * timeout,
539-
ConnectionExpires: 20 * timeout,
694+
PacketHandlers: 2,
695+
PacketBuffer: 10,
696+
// DownlinkPathExpires must exceed the ~300*test.Delay late-schedule timer
697+
// used by the TxScheduledLate test cases. 32*timeout gives enough margin.
698+
DownlinkPathExpires: 32 * timeout,
699+
ConnectionExpires: 64 * timeout,
540700
ScheduleLateTime: 0,
541701
}
542702
)

0 commit comments

Comments
 (0)