Skip to content

Commit 9f00ae4

Browse files
authored
Merge pull request #7868 from TheThingsNetwork/fix/schedule-too-far-ahead
Do not schedule too far ahead
2 parents 124bc63 + a56452c commit 9f00ae4

10 files changed

Lines changed: 262 additions & 13 deletions

File tree

config/messages.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5687,6 +5687,15 @@
56875687
"file": "scheduler.go"
56885688
}
56895689
},
5690+
"error:pkg/gatewayserver/scheduling:schedule_too_far_ahead": {
5691+
"translations": {
5692+
"en": "schedule time is too far in the future"
5693+
},
5694+
"description": {
5695+
"package": "pkg/gatewayserver/scheduling",
5696+
"file": "sub_band.go"
5697+
}
5698+
},
56905699
"error:pkg/gatewayserver/scheduling:sub_band_not_found": {
56915700
"translations": {
56925701
"en": "sub-band not found for frequency `{frequency}` Hz"

pkg/gatewayserver/io/io.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -601,7 +601,7 @@ func (c *Connection) ScheduleDown(path *ttnpb.DownlinkPath, msg *ttnpb.DownlinkM
601601
logger := logger.WithFields(log.Fields(
602602
"rx_window", i+1,
603603
"frequency", rx.frequency,
604-
"data_rate", rx.dataRate,
604+
"data_rate", rx.dataRate.String(),
605605
))
606606
logger.Debug("Attempt to schedule downlink in receive window")
607607
// The maximum payload size is MACPayload only; for PHYPayload take MHDR (1 byte) and MIC (4 bytes) into account.
@@ -610,7 +610,7 @@ func (c *Connection) ScheduleDown(path *ttnpb.DownlinkPath, msg *ttnpb.DownlinkM
610610
return false, false, 0, errTooLong.WithAttributes(
611611
"payload_length", len(msg.RawPayload),
612612
"maximum_length", maxPHYLength,
613-
"data_rate", rx.dataRate,
613+
"data_rate", rx.dataRate.String(),
614614
)
615615
}
616616
eirp := phy.DefaultMaxEIRP

pkg/gatewayserver/io/udp/udp.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,16 @@ func (s *srv) handleDown(ctx context.Context, st *state) error {
530530
st.clockMu.RUnlock()
531531
d := time.Until(serverTime.Add(-s.config.ScheduleLateTime))
532532
logger.WithField("duration", d).Debug("Wait to schedule downlink message late")
533-
time.AfterFunc(d, write)
533+
go func() {
534+
t := time.NewTimer(d)
535+
defer t.Stop()
536+
select {
537+
case <-t.C:
538+
write()
539+
case <-ctx.Done():
540+
case <-st.io.Context().Done():
541+
}
542+
}()
534543
case <-healthCheck.C:
535544
if st.isPullPathActive(s.config.DownlinkPathExpires) {
536545
break

pkg/gatewayserver/io/udp/udp_test.go

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,161 @@ func TestConnection(t *testing.T) {
202202
cancelCtx()
203203
}
204204

205+
// TestScheduleLateCancel verifies that cancelling a connection context stops
206+
// a pending late-scheduled write goroutine before it fires, rather than
207+
// letting it write to the gateway after the connection has been torn down.
208+
// This is a regression test for the time.AfterFunc timer leak in handleDown.
209+
func TestScheduleLateCancel(t *testing.T) {
210+
t.Parallel()
211+
212+
var (
213+
registeredGatewayID = ttnpb.GatewayIdentifiers{GatewayId: "test-gateway"}
214+
timeout = (1 << 4) * test.Delay
215+
testConfig = Config{
216+
PacketHandlers: 2,
217+
PacketBuffer: 10,
218+
DownlinkPathExpires: 8 * timeout,
219+
ConnectionExpires: 20 * timeout,
220+
ScheduleLateTime: 0,
221+
}
222+
)
223+
224+
a, ctx := test.New(t)
225+
ctx, cancelCtx := context.WithCancel(ctx)
226+
defer cancelCtx()
227+
228+
is, _, closeIS := mockis.New(ctx)
229+
defer closeIS()
230+
231+
c := componenttest.NewComponent(t, &component.Config{
232+
ServiceBase: config.ServiceBase{
233+
FrequencyPlans: config.FrequencyPlansConfig{
234+
ConfigSource: "static",
235+
Static: test.StaticFrequencyPlans,
236+
},
237+
},
238+
})
239+
componenttest.StartComponent(t, c)
240+
defer c.Close()
241+
242+
gs := mock.NewServer(c, is)
243+
addr, _ := net.ResolveUDPAddr("udp", ":0")
244+
lis, err := net.ListenUDP("udp", addr)
245+
if !a.So(err, should.BeNil) {
246+
t.FailNow()
247+
}
248+
249+
go Serve(ctx, gs, lis, testConfig) // nolint:errcheck
250+
251+
connections := &sync.Map{}
252+
eui := types.EUI64{0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05}
253+
254+
udpConn, err := net.Dial("udp", lis.LocalAddr().String())
255+
if !a.So(err, should.BeNil) {
256+
t.FailNow()
257+
}
258+
defer udpConn.Close()
259+
260+
// Establish a downlink path by sending PULL_DATA.
261+
pullPacket := generatePullData(eui)
262+
pullPacket.Token = [2]byte{0x00, 0x01}
263+
pullBuf, err := pullPacket.MarshalBinary()
264+
if !a.So(err, should.BeNil) {
265+
t.FailNow()
266+
}
267+
_, err = udpConn.Write(pullBuf)
268+
if !a.So(err, should.BeNil) {
269+
t.FailNow()
270+
}
271+
expectAck(t, udpConn, true, encoding.PullAck, pullPacket.Token)
272+
273+
conn := expectConnection(t, gs, connections, eui, true)
274+
275+
// Sync the gateway clock by sending PUSH_DATA with a known concentrator timestamp.
276+
syncConcentratorTime := 300 * test.Delay
277+
pushPacket := generatePushData(eui, false, syncConcentratorTime)
278+
pushPacket.Token = [2]byte{0x00, 0x02}
279+
pushBuf, err := pushPacket.MarshalBinary()
280+
if !a.So(err, should.BeNil) {
281+
t.FailNow()
282+
}
283+
_, err = udpConn.Write(pushBuf)
284+
if !a.So(err, should.BeNil) {
285+
t.FailNow()
286+
}
287+
clockSynced := time.Now()
288+
expectAck(t, udpConn, true, encoding.PushAck, pushPacket.Token)
289+
time.Sleep(timeout) // ensure the clock sync is processed before scheduling
290+
291+
// Schedule a Class A downlink. No TxAck has been received yet, so
292+
// canImmediate=false; with a synced clock, handleDown takes the late-schedule
293+
// path and starts a goroutine with a timer for d = time.Until(serverTime).
294+
path := &ttnpb.DownlinkPath{
295+
Path: &ttnpb.DownlinkPath_UplinkToken{
296+
UplinkToken: io.MustUplinkToken(
297+
&ttnpb.GatewayAntennaIdentifiers{GatewayIds: &registeredGatewayID},
298+
uint32(syncConcentratorTime/time.Microsecond), // nolint:gosec
299+
scheduling.ConcentratorTime(syncConcentratorTime),
300+
time.Unix(0, int64(syncConcentratorTime)),
301+
nil,
302+
),
303+
},
304+
}
305+
msg := &ttnpb.DownlinkMessage{
306+
RawPayload: []byte{0x01},
307+
Settings: &ttnpb.DownlinkMessage_Request{
308+
Request: &ttnpb.TxRequest{
309+
Class: ttnpb.Class_CLASS_A,
310+
Priority: ttnpb.TxSchedulePriority_NORMAL,
311+
Rx1Delay: ttnpb.RxDelay_RX_DELAY_1,
312+
Rx1DataRate: &ttnpb.DataRate{
313+
Modulation: &ttnpb.DataRate_Lora{
314+
Lora: &ttnpb.LoRaDataRate{
315+
SpreadingFactor: 7,
316+
Bandwidth: 125000,
317+
CodingRate: band.Cr4_5,
318+
},
319+
},
320+
},
321+
Rx1Frequency: 868100000,
322+
FrequencyPlanId: test.EUFrequencyPlanID,
323+
},
324+
},
325+
}
326+
_, _, _, err = conn.ScheduleDown(path, msg)
327+
if !a.So(err, should.BeNil) {
328+
t.FailNow()
329+
}
330+
331+
// Compute the wall-clock time at which the timer goroutine would call write().
332+
// serverTime(T) = clockSynced + (T - syncConcentratorTime); with ScheduleLateTime=0,
333+
// d = time.Until(serverTime(scheduledTimestamp)).
334+
scheduledTimestamp := time.Duration(msg.GetScheduled().Timestamp) * time.Microsecond
335+
expectedFireTime := clockSynced.Add(-syncConcentratorTime).Add(scheduledTimestamp)
336+
337+
// Give handleDown time to dequeue the message and start the timer goroutine.
338+
time.Sleep(timeout)
339+
340+
// Cancel the connection. The goroutine must observe ctx.Done() and exit
341+
// without calling write(), so no PULL_RESP should be sent to the gateway.
342+
conn.Disconnect(context.Canceled)
343+
344+
// Read from the UDP connection until expectedFireTime + margin. A broken
345+
// implementation (time.AfterFunc) would deliver a PULL_RESP near
346+
// expectedFireTime. With the fix the goroutine exits on cancel and nothing
347+
// is written.
348+
var buf [65507]byte
349+
udpConn.SetReadDeadline(expectedFireTime.Add(2 * timeout)) // nolint:errcheck,gosec
350+
n, readErr := udpConn.Read(buf[:])
351+
if readErr == nil {
352+
var pkt encoding.Packet
353+
if unmarshalErr := pkt.UnmarshalBinary(buf[:n]); unmarshalErr == nil {
354+
a.So(pkt.PacketType, should.NotEqual, encoding.PullResp)
355+
}
356+
}
357+
// A deadline-exceeded error means nothing was written — the expected outcome.
358+
}
359+
205360
func TestFrontend(t *testing.T) {
206361
t.Parallel()
207362
iotest.Frontend(t, iotest.FrontendConfig{

pkg/gatewayserver/scheduling/scheduler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,7 @@ func (s *Scheduler) ScheduleAnytime(ctx context.Context, opts Options) (res Emis
459459
}
460460
return em.t
461461
}
462-
em, err = sb.ScheduleAnytime(em.d, next, opts.Priority)
462+
em, err = sb.ScheduleAnytime(em.d, next, opts.Priority, now)
463463
if err != nil {
464464
return Emission{}, 0, err
465465
}

pkg/gatewayserver/scheduling/scheduler_internal_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@
1515
package scheduling
1616

1717
var (
18-
ErrConflict = errConflict
19-
ErrDwellTime = errDwellTime
20-
ErrTooLate = errTooLate
21-
ErrDutyCycle = errDutyCycle
18+
ErrConflict = errConflict
19+
ErrDwellTime = errDwellTime
20+
ErrTooLate = errTooLate
21+
ErrDutyCycle = errDutyCycle
22+
ErrScheduleTooFarAhead = errScheduleTooFarAhead
2223
)

pkg/gatewayserver/scheduling/sub_band.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ import (
2727
// A lower value results in balancing capacity in time, while a higher value allows for bursts.
2828
var DutyCycleWindow = 1 * time.Hour
2929

30+
// MaxScheduleAhead is the maximum duration into the future that ScheduleAnytime may place an emission.
31+
// Emissions beyond this are rejected to avoid scheduling downlinks that would never actually be transmitted
32+
// in practice.
33+
var MaxScheduleAhead = 2 * DutyCycleWindow
34+
3035
// DutyCycleStyle represents the of duty cycle algorithm to be used by a sub band.
3136
type DutyCycleStyle int
3237

@@ -144,6 +149,10 @@ var (
144149
"blocked",
145150
"sub band is blocked for `{duration}`",
146151
)
152+
errScheduleTooFarAhead = errors.DefineResourceExhausted(
153+
"schedule_too_far_ahead",
154+
"schedule time is too far in the future",
155+
)
147156
)
148157

149158
// checkDutyCycle verifies if the emission complies with the duty cycle limitations, based on the style.
@@ -211,7 +220,12 @@ func (sb *SubBand) Schedule(em Emission, p ttnpb.TxSchedulePriority) error {
211220
// ScheduleAnytime schedules the given duration at a time when there is availability by accounting for duty-cycle.
212221
// The given next callback should return the next option that does not conflict with other scheduled downlinks.
213222
// If there is no duty-cycle limitation, this method returns the first option.
214-
func (sb *SubBand) ScheduleAnytime(d time.Duration, next func() ConcentratorTime, p ttnpb.TxSchedulePriority) (Emission, error) {
223+
func (sb *SubBand) ScheduleAnytime(
224+
d time.Duration,
225+
next func() ConcentratorTime,
226+
p ttnpb.TxSchedulePriority,
227+
now ConcentratorTime,
228+
) (Emission, error) {
215229
sb.mu.Lock()
216230
defer sb.mu.Unlock()
217231
em := NewEmission(next(), d)
@@ -241,7 +255,13 @@ func (sb *SubBand) ScheduleAnytime(d time.Duration, next func() ConcentratorTime
241255
other := sb.emissions[i]
242256
used += float32(other.d) / float32(DutyCycleWindow)
243257
if used > usable {
244-
em.t = other.Ends() + ConcentratorTime(DutyCycleWindow) - ConcentratorTime(em.d)
258+
newT := other.Ends() + ConcentratorTime(DutyCycleWindow) - ConcentratorTime(em.d)
259+
// If the new time is too far in the future, return an error instead of scheduling an emission that would
260+
// never be transmitted in practice.
261+
if newT > now+ConcentratorTime(MaxScheduleAhead) {
262+
return Emission{}, errScheduleTooFarAhead.New()
263+
}
264+
em.t = newT
245265
break
246266
}
247267
}

pkg/gatewayserver/scheduling/sub_band_test.go

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,8 @@ func TestSubBandScheduleRestricted(t *testing.T) {
172172
}
173173

174174
func TestScheduleAnytimeRestricted(t *testing.T) {
175+
t.Parallel()
176+
175177
a := assertions.New(t)
176178
params := scheduling.SubBandParameters{
177179
MinFrequency: 0,
@@ -204,7 +206,8 @@ func TestScheduleAnytimeRestricted(t *testing.T) {
204206
from += scheduling.ConcentratorTime(time.Second)
205207
return res
206208
}
207-
em, err := sb.ScheduleAnytime(time.Second, next, ttnpb.TxSchedulePriority_NORMAL)
209+
now := scheduling.ConcentratorTime(time.Now().UnixNano())
210+
em, err := sb.ScheduleAnytime(time.Second, next, ttnpb.TxSchedulePriority_NORMAL, now)
208211
a.So(err, should.BeNil)
209212
a.So(em.Starts(), should.Equal, 16*time.Second)
210213
// [ 1 2 4 3 ]
@@ -217,7 +220,8 @@ func TestScheduleAnytimeRestricted(t *testing.T) {
217220
next := func() scheduling.ConcentratorTime {
218221
return scheduling.ConcentratorTime(19 * time.Second)
219222
}
220-
em, err := sb.ScheduleAnytime(time.Second, next, ttnpb.TxSchedulePriority_NORMAL)
223+
now := scheduling.ConcentratorTime(time.Now().UnixNano())
224+
em, err := sb.ScheduleAnytime(time.Second, next, ttnpb.TxSchedulePriority_NORMAL, now)
221225
a.So(err, should.BeNil)
222226
a.So(em.Starts(), should.Equal, 26*time.Second)
223227
// [ 1 2 4 3 5]
@@ -229,11 +233,60 @@ func TestScheduleAnytimeRestricted(t *testing.T) {
229233
next := func() scheduling.ConcentratorTime {
230234
return scheduling.ConcentratorTime(19 * time.Second)
231235
}
232-
_, err := sb.ScheduleAnytime(5*time.Second, next, ttnpb.TxSchedulePriority_NORMAL)
236+
now := scheduling.ConcentratorTime(time.Now().UnixNano())
237+
_, err := sb.ScheduleAnytime(5*time.Second, next, ttnpb.TxSchedulePriority_NORMAL, now)
233238
a.So(err, should.HaveSameErrorDefinitionAs, scheduling.ErrDutyCycle)
234239
}
235240
}
236241

242+
func TestScheduleAnytimeTooFarAhead(t *testing.T) {
243+
t.Parallel()
244+
// In the test environment DutyCycleWindow = 10s and MaxScheduleAhead = 20s.
245+
a := assertions.New(t)
246+
params := scheduling.SubBandParameters{
247+
MinFrequency: 0,
248+
MaxFrequency: math.MaxUint64,
249+
DutyCycle: 0.5,
250+
}
251+
clock := &mockClock{}
252+
ceilings := map[ttnpb.TxSchedulePriority]float32{
253+
ttnpb.TxSchedulePriority_NORMAL: 0.5, // usable = 0.25
254+
ttnpb.TxSchedulePriority_HIGHEST: 1.0, // usable = 0.50
255+
}
256+
sb := scheduling.NewSubBand(params, clock, ceilings, scheduling.DefaultDutyCycleStyle)
257+
258+
// Schedule a 3-second emission at t=9s using HIGHEST priority.
259+
// It occupies 30% of the duty-cycle window (10s), within HIGHEST (50%) but above
260+
// NORMAL (25%). Its window [9s, 12s] falls within the checkDutyCycle range
261+
// [0, 10s] for an emission starting at t=0, causing that check to fail.
262+
err := sb.Schedule(
263+
scheduling.NewEmission(scheduling.ConcentratorTime(9*time.Second), 3*time.Second),
264+
ttnpb.TxSchedulePriority_HIGHEST,
265+
)
266+
a.So(err, should.BeNil)
267+
268+
// next() always returns the same ConcentratorTime, so ScheduleAnytime falls back
269+
// to the backwards scan. newT = 9s + 3s + 10s - 1s = 21s, which exceeds
270+
// now (0) + MaxScheduleAhead (20s).
271+
{
272+
next := func() scheduling.ConcentratorTime { return 0 }
273+
now := scheduling.ConcentratorTime(0)
274+
_, err := sb.ScheduleAnytime(time.Second, next, ttnpb.TxSchedulePriority_NORMAL, now)
275+
a.So(err, should.HaveSameErrorDefinitionAs, scheduling.ErrScheduleTooFarAhead)
276+
}
277+
278+
// With now shifted forward by 1 second, newT (21s) equals now + MaxScheduleAhead
279+
// (1s + 20s = 21s) exactly, so the check (strictly greater than) does not fire
280+
// and the emission is accepted.
281+
{
282+
next := func() scheduling.ConcentratorTime { return 0 }
283+
now := scheduling.ConcentratorTime(time.Second)
284+
em, err := sb.ScheduleAnytime(time.Second, next, ttnpb.TxSchedulePriority_NORMAL, now)
285+
a.So(err, should.BeNil)
286+
a.So(em.Starts(), should.Equal, 21*time.Second)
287+
}
288+
}
289+
237290
func TestBlockingScheduling(t *testing.T) {
238291
t.Parallel()
239292

pkg/gatewayserver/scheduling/util_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,5 @@ func float32Ptr(v float32) *float32 { return &v }
8080

8181
func init() {
8282
scheduling.DutyCycleWindow = 10 * time.Second
83+
scheduling.MaxScheduleAhead = 2 * scheduling.DutyCycleWindow
8384
}

pkg/webui/locales/ja.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2520,6 +2520,7 @@
25202520
"error:pkg/gatewayserver/scheduling:no_absolute_gateway_time": "アブソリュートゲートウェイタイムがありません",
25212521
"error:pkg/gatewayserver/scheduling:no_clock_sync": "クロックが未同期",
25222522
"error:pkg/gatewayserver/scheduling:no_server_time": "サーバの時間がありません",
2523+
"error:pkg/gatewayserver/scheduling:schedule_too_far_ahead": "",
25232524
"error:pkg/gatewayserver/scheduling:sub_band_not_found": "周波数 `{frequency}` Hz に対するサブ帯域が見つかりません",
25242525
"error:pkg/gatewayserver/scheduling:too_late": "予定された時間(`{delta}`)で送信するには遅すぎます",
25252526
"error:pkg/gatewayserver/upstream/ns:network_server_not_found": "ネットワークサーバが見つかりません",

0 commit comments

Comments
 (0)