Skip to content

Commit b397bc6

Browse files
authored
[core] New moving average value for sent/recv rates over last second (#3009)
* [core] Average values for sent/recv bitrate are now over last second. * [core] Adding packet header size in average measure * [core] Removing inheritance and fixing issue in rate compute
1 parent 75f56d9 commit b397bc6

4 files changed

Lines changed: 166 additions & 5 deletions

File tree

srtcore/buffer_tools.cpp

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,5 +273,65 @@ int CSndRateEstimator::incSampleIdx(int val, int inc) const
273273
return val;
274274
}
275275

276+
CMovingRateEstimator::CMovingRateEstimator()
277+
: m_tsFirstSampleTime(sync::steady_clock::now())
278+
, m_iCurSampleIdx(0)
279+
, m_iRateBps(0)
280+
, m_Samples(NUM_PERIODS)
281+
{
282+
resetRate(0, NUM_PERIODS);
276283
}
277284

285+
void CMovingRateEstimator::addSample(int pkts, double bytes)
286+
{
287+
const time_point now = steady_clock::now();
288+
const int iSampleDeltaIdx = int(count_milliseconds(now - m_tsLastSlotTimestamp) / SAMPLE_DURATION_MS);
289+
290+
if (iSampleDeltaIdx == 0)
291+
{
292+
m_Samples[m_iCurSampleIdx].m_iBytesCount += bytes;
293+
m_Samples[m_iCurSampleIdx].m_iPktsCount += pkts;
294+
}
295+
else
296+
{
297+
if ((m_iCurSampleIdx + iSampleDeltaIdx) < NUM_PERIODS)
298+
resetRate(m_iCurSampleIdx + 1, m_iCurSampleIdx + iSampleDeltaIdx);
299+
else
300+
{
301+
int loopbackDiff = m_iCurSampleIdx + iSampleDeltaIdx - NUM_PERIODS;
302+
resetRate(m_iCurSampleIdx + 1, NUM_PERIODS);
303+
resetRate(0, loopbackDiff);
304+
}
305+
306+
m_iCurSampleIdx = ((m_iCurSampleIdx + iSampleDeltaIdx) % NUM_PERIODS);
307+
m_Samples[m_iCurSampleIdx].m_iBytesCount = bytes;
308+
m_Samples[m_iCurSampleIdx].m_iPktsCount = pkts;
309+
310+
m_tsLastSlotTimestamp += milliseconds_from(SAMPLE_DURATION_MS * iSampleDeltaIdx);
311+
312+
computeAverageValue();
313+
}
314+
}
315+
316+
void CMovingRateEstimator::resetRate(int from, int to)
317+
{
318+
for (int i = max(0, from); i < min(int(NUM_PERIODS), to); i++)
319+
m_Samples[i].reset();
320+
}
321+
322+
void CMovingRateEstimator::computeAverageValue()
323+
{
324+
const time_point now = steady_clock::now();
325+
const int startDelta = count_milliseconds(now - m_tsFirstSampleTime);
326+
const bool isFirstPeriod = startDelta < (SAMPLE_DURATION_MS * NUM_PERIODS);
327+
int newRateBps = 0;
328+
329+
for (int i = 0; i < NUM_PERIODS; i++)
330+
newRateBps += (m_Samples[i].m_iBytesCount + (CPacket::HDR_SIZE * m_Samples[i].m_iPktsCount));
331+
332+
if (isFirstPeriod)
333+
newRateBps = newRateBps * SAMPLE_DURATION_MS * NUM_PERIODS / max(1, startDelta);
334+
335+
m_iRateBps = newRateBps;
336+
}
337+
} // namespace srt

srtcore/buffer_tools.h

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,10 +192,87 @@ class CSndRateEstimator
192192

193193
Sample m_Samples[NUM_PERIODS];
194194

195-
time_point m_tsFirstSampleTime; //< Start time of the first sample.
195+
time_point m_tsFirstSampleTime; //< Start time of the first sameple.
196196
int m_iFirstSampleIdx; //< Index of the first sample.
197197
int m_iCurSampleIdx; //< Index of the current sample being collected.
198-
int m_iRateBps; //< Rate in Bytes/sec.
198+
int m_iRateBps; // Input Rate in Bytes/sec
199+
};
200+
201+
class CMovingRateEstimator
202+
{
203+
typedef sync::steady_clock::time_point time_point;
204+
205+
public:
206+
CMovingRateEstimator();
207+
208+
/// Add sample.
209+
/// @param [in] pkts number of packets in the sample.
210+
/// @param [in] bytes number of payload bytes in the sample.
211+
void addSample(int pkts = 0, double bytes = 0);
212+
213+
/// Clean the mobile measures table to reset average value.
214+
void resetRate() { resetRate(0, NUM_PERIODS); };
215+
216+
/// Retrieve estimated bitrate in bytes per second with 16-byte packet header.
217+
int getRate() const { return m_iRateBps; }
218+
219+
private:
220+
// We would like responsiveness (accuracy) of rate estimation higher than 100 ms
221+
// (ideally around 50 ms) for network adaptive algorithms.
222+
static const int NUM_PERIODS = 100; // To get 1s of values
223+
static const int SAMPLE_DURATION_MS = 10; // 10 ms
224+
time_point m_tsFirstSampleTime; //< Start time of the first sample.
225+
time_point m_tsLastSlotTimestamp; // Used to compute the delta between 2 calls
226+
int m_iCurSampleIdx; //< Index of the current sample being collected.
227+
int m_iRateBps; //< Rate in Bytes/sec.
228+
229+
struct Sample
230+
{
231+
int m_iPktsCount; // number of payload packets
232+
int m_iBytesCount; // number of payload bytes
233+
234+
void reset()
235+
{
236+
m_iPktsCount = 0;
237+
m_iBytesCount = 0;
238+
}
239+
240+
Sample()
241+
: m_iPktsCount(0)
242+
, m_iBytesCount(0)
243+
{
244+
}
245+
246+
Sample(int iPkts, int iBytes)
247+
: m_iPktsCount(iPkts)
248+
, m_iBytesCount(iBytes)
249+
{
250+
}
251+
252+
Sample operator+(const Sample& other)
253+
{
254+
return Sample(m_iPktsCount + other.m_iPktsCount, m_iBytesCount + other.m_iBytesCount);
255+
}
256+
257+
Sample& operator+=(const Sample& other)
258+
{
259+
*this = *this + other;
260+
return *this;
261+
}
262+
263+
bool empty() const { return m_iPktsCount == 0; }
264+
};
265+
266+
srt::FixedArray<Sample> m_Samples; // Table of stored data
267+
268+
/// This method will compute the average value based on all table's measures and the period
269+
/// (NUM_PERIODS*SAMPLE_DURATION_MS)
270+
void computeAverageValue();
271+
272+
/// Reset a part of the stored measures
273+
/// @param from The beginning where the reset have to be applied
274+
/// @param to The last data that have to be reset
275+
void resetRate(int from, int to);
199276
};
200277

201278
} // namespace srt

srtcore/core.cpp

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7665,10 +7665,15 @@ void srt::CUDT::bstats(CBytePerfMon *perf, bool clear, bool instantaneous)
76657665
perf->pktRcvUndecryptTotal = m_stats.rcvr.undecrypted.total.count();
76667666
perf->byteRcvUndecryptTotal = m_stats.rcvr.undecrypted.total.bytes();
76677667

7668+
7669+
// Average values management
7670+
// We are updating rate with 0 Byte 0 packet to ensure an up to date compute in case we are not sending packet for a while.
7671+
m_stats.sndr.updateRate(0, 0);
7672+
m_stats.rcvr.updateRate(0, 0);
7673+
perf->mbpsSendRate = Bps2Mbps(m_stats.sndr.getAverageValue());
7674+
perf->mbpsRecvRate = Bps2Mbps(m_stats.rcvr.getAverageValue());
7675+
76687676
// TODO: The following class members must be protected with a different mutex, not the m_StatsLock.
7669-
const double interval = (double) count_microseconds(currtime - m_stats.tsLastSampleTime);
7670-
perf->mbpsSendRate = double(perf->byteSent) * 8.0 / interval;
7671-
perf->mbpsRecvRate = double(perf->byteRecv) * 8.0 / interval;
76727677
perf->usPktSndPeriod = (double) count_microseconds(m_tdSendInterval.load());
76737678
perf->pktFlowWindow = m_iFlowWindowSize.load();
76747679
perf->pktCongestionWindow = m_iCongestionWindow;
@@ -9836,6 +9841,7 @@ bool srt::CUDT::packData(CPacket& w_packet, steady_clock::time_point& w_nexttime
98369841
m_stats.sndr.sent.count(payload);
98379842
if (new_packet_packed)
98389843
m_stats.sndr.sentUnique.count(payload);
9844+
m_stats.sndr.updateRate(1, payload);
98399845
leaveCS(m_StatsLock);
98409846

98419847
const duration sendint = m_tdSendInterval;
@@ -10520,6 +10526,7 @@ int srt::CUDT::processData(CUnit* in_unit)
1052010526

1052110527
enterCS(m_StatsLock);
1052210528
m_stats.rcvr.recvd.count(pktsz);
10529+
m_stats.rcvr.updateRate(1, pktsz);
1052310530
leaveCS(m_StatsLock);
1052410531

1052510532
loss_seqs_t filter_loss_seqs;

srtcore/stats.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
#include "platform_sys.h"
1515
#include "packet.h"
16+
#include "buffer_tools.h"
1617

1718
namespace srt
1819
{
@@ -144,6 +145,8 @@ struct Sender
144145
Metric<Packets> recvdAck; // The number of ACK packets received by the sender.
145146
Metric<Packets> recvdNak; // The number of ACK packets received by the sender.
146147

148+
CMovingRateEstimator mavgRateEstimator; // The average Mbps over last second
149+
147150
void reset()
148151
{
149152
sent.reset();
@@ -167,6 +170,12 @@ struct Sender
167170
recvdNak.resetTrace();
168171
sentFilterExtra.resetTrace();
169172
}
173+
174+
void updateRate(int pkts, double bytes) { mavgRateEstimator.addSample(pkts, bytes); }
175+
176+
void resetRate() { mavgRateEstimator.resetRate(); }
177+
178+
int getAverageValue() { return mavgRateEstimator.getRate(); }
170179
};
171180

172181
/// Receiver-side statistics.
@@ -187,6 +196,8 @@ struct Receiver
187196
Metric<Packets> sentAck; // The number of ACK packets sent by the receiver.
188197
Metric<Packets> sentNak; // The number of NACK packets sent by the receiver.
189198

199+
CMovingRateEstimator mavgRateEstimator; // The average Mbps over last second
200+
190201
void reset()
191202
{
192203
recvd.reset();
@@ -218,6 +229,12 @@ struct Receiver
218229
sentAck.resetTrace();
219230
sentNak.resetTrace();
220231
}
232+
233+
void updateRate(int pkts, double bytes) { mavgRateEstimator.addSample(pkts, bytes); }
234+
235+
void resetRate() { mavgRateEstimator.resetRate(); }
236+
237+
int getAverageValue() { return mavgRateEstimator.getRate(); }
221238
};
222239

223240
} // namespace stats

0 commit comments

Comments
 (0)