Skip to content

Commit 796cec1

Browse files
committed
speed up net pumps
1 parent b29521d commit 796cec1

3 files changed

Lines changed: 42 additions & 51 deletions

File tree

src/modules/net/ZeroHttpPump.h

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ class ZeroHttpPump {
6464
};
6565

6666
static const uint8_t kQueueCapacity = ZEROKERNEL_HTTP_PUMP_QUEUE_CAPACITY;
67+
static const uint8_t kImmediatePhaseBudget = 4;
6768

6869
ZeroHttpPump()
6970
: kernel_(NULL),
@@ -164,29 +165,34 @@ class ZeroHttpPump {
164165
hasTicked_ = true;
165166
lastTickAtMs_ = nowMs;
166167

167-
if (!active_) {
168-
if (queueCount_ == 0) {
169-
return;
168+
for (uint8_t stepBudget = 0; stepBudget < kImmediatePhaseBudget; ++stepBudget) {
169+
if (!active_) {
170+
if (queueCount_ == 0) {
171+
return;
172+
}
173+
174+
startNextRequest_(nowMs);
170175
}
171176

172-
startNextRequest_(nowMs);
173-
}
177+
if (phase_ == kPhaseConnecting && nextRetryAtMs_ != 0 && nowMs < nextRetryAtMs_) {
178+
return;
179+
}
174180

175-
if (phase_ == kPhaseConnecting && nextRetryAtMs_ != 0 && nowMs < nextRetryAtMs_) {
176-
return;
177-
}
181+
const StepResult result = stepCurrentPhase_();
182+
if (result == kStepPending) {
183+
return;
184+
}
178185

179-
const StepResult result = stepCurrentPhase_();
180-
if (result == kStepPending) {
181-
return;
182-
}
186+
if (result == kStepFailed) {
187+
handlePhaseFailure_(nowMs);
188+
return;
189+
}
183190

184-
if (result == kStepFailed) {
185-
handlePhaseFailure_(nowMs);
186-
return;
191+
handlePhaseSuccess_(nowMs);
192+
if (!active_) {
193+
return;
194+
}
187195
}
188-
189-
handlePhaseSuccess_(nowMs);
190196
}
191197

192198
bool isBusy() const {

src/modules/net/ZeroMqttPump.h

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ class ZeroMqttPump {
4949
};
5050

5151
static const uint8_t kQueueCapacity = ZEROKERNEL_MQTT_PUMP_QUEUE_CAPACITY;
52+
static const uint8_t kPublishBurstPerTick = 2;
5253

5354
ZeroMqttPump()
5455
: kernel_(NULL),
@@ -182,21 +183,24 @@ class ZeroMqttPump {
182183
}
183184

184185
pendingRetryAtMs_ = 0;
185-
metrics_.recordSendAttempt();
186-
if (publishStep_(queue_[0].topicKey, queue_[0].value, context_)) {
187-
metrics_.recordSendResult(true, 0, nowMs - queue_[0].queuedAtMs);
188-
popFront_();
189-
return;
190-
}
186+
for (uint8_t burst = 0; burst < kPublishBurstPerTick && queueCount_ > 0; ++burst) {
187+
metrics_.recordSendAttempt();
188+
if (publishStep_(queue_[0].topicKey, queue_[0].value, context_)) {
189+
metrics_.recordSendResult(true, 0, nowMs - queue_[0].queuedAtMs);
190+
popFront_();
191+
continue;
192+
}
191193

192-
metrics_.recordSendResult(false, 0, nowMs - queue_[0].queuedAtMs);
193-
if (queue_[0].retriesRemaining == 0) {
194-
popFront_();
194+
metrics_.recordSendResult(false, 0, nowMs - queue_[0].queuedAtMs);
195+
if (queue_[0].retriesRemaining == 0) {
196+
popFront_();
197+
return;
198+
}
199+
200+
--queue_[0].retriesRemaining;
201+
scheduleBackoff_(nowMs);
195202
return;
196203
}
197-
198-
--queue_[0].retriesRemaining;
199-
scheduleBackoff_(nowMs);
200204
}
201205

202206
bool isConnected() const {

tests/desktop/KernelTests.cpp

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -940,20 +940,10 @@ int testHttpPumpModule() {
940940
g_fakeNowMs = 2;
941941
isolatedKernel.tick();
942942
pump.tick();
943-
expectTrue(pump.phase() == zerokernel::modules::net::ZeroHttpPump::kPhaseWriting,
944-
"http pump advances to write phase");
945-
946-
g_fakeNowMs = 3;
947-
isolatedKernel.tick();
948-
pump.tick();
949-
expectTrue(pump.phase() == zerokernel::modules::net::ZeroHttpPump::kPhaseReading,
950-
"http pump advances to read phase");
951-
952-
g_fakeNowMs = 4;
953-
isolatedKernel.tick();
954-
pump.tick();
955943
expectTrue(pump.phase() == zerokernel::modules::net::ZeroHttpPump::kPhaseConnecting,
956-
"http pump retries after read failure");
944+
"http pump retries after collapsed phase failure");
945+
expectTrue(g_httpWriteCalls == 1, "http pump collapses write phase in same tick");
946+
expectTrue(g_httpReadCalls == 1, "http pump collapses read phase in same tick");
957947

958948
g_fakeNowMs = 10;
959949
isolatedKernel.tick();
@@ -963,15 +953,6 @@ int testHttpPumpModule() {
963953
g_fakeNowMs = 24;
964954
isolatedKernel.tick();
965955
pump.tick();
966-
g_fakeNowMs = 25;
967-
isolatedKernel.tick();
968-
pump.tick();
969-
g_fakeNowMs = 26;
970-
isolatedKernel.tick();
971-
pump.tick();
972-
g_fakeNowMs = 27;
973-
isolatedKernel.tick();
974-
pump.tick();
975956

976957
const zerokernel::modules::net::ZeroTransportMetrics::Snapshot httpMetrics =
977958
pump.metrics().snapshot();

0 commit comments

Comments
 (0)