Skip to content

Commit 389d4df

Browse files
committed
Add support for at-most-once message delivery
By default, Homa provides at-least-once message delivery. Add an option for applications to request at-most-once message delivery.
1 parent 040b57c commit 389d4df

4 files changed

Lines changed: 103 additions & 12 deletions

File tree

include/Homa/Homa.h

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,16 @@ class OutMessage {
132132
FAILED, //< The message failed to be delivered and processed.
133133
};
134134

135+
/**
136+
* Options with which an OutMessage can be sent.
137+
*/
138+
enum Options {
139+
NONE = 0, //< Default send behavior.
140+
NO_RETRY = 1 << 0, //< Message will not be resent if recoverable send
141+
//< failure occurs; provides at-most-once delivery
142+
//< of messages.
143+
};
144+
135145
/**
136146
* Custom deleter for use with std::unique_ptr.
137147
*/
@@ -206,8 +216,11 @@ class OutMessage {
206216
*
207217
* @param destination
208218
* Address of the transport to which this message will be sent.
219+
* @param options
220+
* Flags to request non-default sending behavior.
209221
*/
210-
virtual void send(Driver::Address destination) = 0;
222+
virtual void send(Driver::Address destination,
223+
Options options = Options::NONE) = 0;
211224

212225
protected:
213226
/**

src/Sender.cc

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -332,8 +332,32 @@ Sender::handleUnknownPacket(Driver::Packet* packet, Driver* driver)
332332
}
333333

334334
OutMessage::Status status = message->getStatus();
335-
if (status == OutMessage::Status::IN_PROGRESS ||
336-
status == OutMessage::Status::SENT) {
335+
assert(status != OutMessage::Status::NOT_STARTED);
336+
if (status != OutMessage::Status::IN_PROGRESS &&
337+
status != OutMessage::Status::SENT) {
338+
// The message is already considered "done" so the UNKNOWN packet
339+
// must be a stale response to a ping.
340+
} else if (message->options & OutMessage::Options::NO_RETRY) {
341+
// Option: NO_RETRY
342+
343+
// Either the Message or the DONE packet was lost; consider the message
344+
// failed since the application asked for the message not to be retried.
345+
346+
// Remove Message from sendQueue.
347+
if (message->numPackets > 1) {
348+
SpinLock::Lock lock_queue(queueMutex);
349+
QueuedMessageInfo* info = &message->queuedMessageInfo;
350+
if (message->state == OutMessage::Status::IN_PROGRESS) {
351+
assert(sendQueue.contains(&info->sendQueueNode));
352+
sendQueue.remove(&info->sendQueueNode);
353+
}
354+
assert(!sendQueue.contains(&info->sendQueueNode));
355+
}
356+
357+
bucket->messageTimeouts.cancelTimeout(&message->messageTimeout);
358+
bucket->pingTimeouts.cancelTimeout(&message->pingTimeout);
359+
message->state.store(OutMessage::Status::FAILED);
360+
} else {
337361
// Message isn't done yet so we will restart sending the message.
338362

339363
// Make sure the message is not in the sendQueue before making any
@@ -405,9 +429,6 @@ Sender::handleUnknownPacket(Driver::Packet* packet, Driver* driver)
405429
QueuedMessageInfo::ComparePriority());
406430
sendReady.store(true);
407431
}
408-
} else {
409-
// The message is already considered "done" so the UNKNOWN packet
410-
// must be a stale response to a ping.
411432
}
412433

413434
driver->releasePackets(&packet, 1);
@@ -676,9 +697,10 @@ Sender::Message::reserve(size_t count)
676697
* @copydoc Homa::OutMessage::send()
677698
*/
678699
void
679-
Sender::Message::send(Driver::Address destination)
700+
Sender::Message::send(Driver::Address destination,
701+
Sender::Message::Options options)
680702
{
681-
sender->sendMessage(this, destination);
703+
sender->sendMessage(this, destination, options);
682704
}
683705

684706
/**
@@ -730,11 +752,14 @@ Sender::Message::getOrAllocPacket(size_t index)
730752
* Sender::Message to be sent.
731753
* @param destination
732754
* Destination address for this message.
755+
* @param options
756+
* Flags indicating requested non-default send behavior.
733757
*
734758
* @sa dropMessage()
735759
*/
736760
void
737-
Sender::sendMessage(Sender::Message* message, Driver::Address destination)
761+
Sender::sendMessage(Sender::Message* message, Driver::Address destination,
762+
Sender::Message::Options options)
738763
{
739764
// Prepare the message
740765
assert(message->driver == driver);
@@ -749,6 +774,7 @@ Sender::sendMessage(Sender::Message* message, Driver::Address destination)
749774

750775
message->id = id;
751776
message->destination = destination;
777+
message->options = options;
752778
message->state.store(OutMessage::Status::IN_PROGRESS);
753779

754780
int actualMessageLen = 0;

src/Sender.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ class Sender {
139139
TRANSPORT_HEADER_LENGTH)
140140
, id(0, 0)
141141
, destination()
142+
, options(Options::NONE)
142143
, start(0)
143144
, messageLength(0)
144145
, numPackets(0)
@@ -160,7 +161,8 @@ class Sender {
160161
virtual void prepend(const void* source, size_t count);
161162
virtual void release();
162163
virtual void reserve(size_t count);
163-
virtual void send(Driver::Address destination);
164+
virtual void send(Driver::Address destination,
165+
Options options = Options::NONE);
164166

165167
private:
166168
/// Define the maximum number of packets that a message can hold.
@@ -189,6 +191,9 @@ class Sender {
189191
/// Contains destination address this message.
190192
Driver::Address destination;
191193

194+
/// Contains flags for any requested optional send behavior.
195+
Options options;
196+
192197
/// First byte where data is or will go if empty.
193198
int start;
194199

@@ -379,7 +384,8 @@ class Sender {
379384
Protocol::MessageId::Hasher hasher;
380385
};
381386

382-
void sendMessage(Sender::Message* message, Driver::Address destination);
387+
void sendMessage(Sender::Message* message, Driver::Address destination,
388+
Message::Options options = Message::Options::NONE);
383389
void cancelMessage(Sender::Message* message);
384390
void dropMessage(Sender::Message* message);
385391
uint64_t checkMessageTimeouts();

src/SenderTest.cc

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -751,6 +751,50 @@ TEST_F(SenderTest, handleUnknownPacket_singlePacketMessage)
751751
EXPECT_FALSE(sender->sendReady.load());
752752
}
753753

754+
TEST_F(SenderTest, handleUnknownPacket_NO_RETRY)
755+
{
756+
Protocol::MessageId id = {42, 1};
757+
Driver::Address destination = (Driver::Address)22;
758+
759+
Sender::MessageBucket* bucket = sender->messageBuckets.getBucket(id);
760+
Sender::Message* message =
761+
dynamic_cast<Sender::Message*>(sender->allocMessage());
762+
message->options = OutMessage::Options::NO_RETRY;
763+
std::vector<Homa::Mock::MockDriver::MockPacket*> packets;
764+
char payload[5][1028];
765+
for (int i = 0; i < 5; ++i) {
766+
Homa::Mock::MockDriver::MockPacket* packet =
767+
new Homa::Mock::MockDriver::MockPacket(payload[i]);
768+
packets.push_back(packet);
769+
setMessagePacket(message, i, packet);
770+
}
771+
message->destination = destination;
772+
message->messageLength = 4500;
773+
message->state.store(Homa::OutMessage::Status::IN_PROGRESS);
774+
SenderTest::addMessage(sender, id, message, true);
775+
bucket->messageTimeouts.setTimeout(&message->messageTimeout);
776+
bucket->pingTimeouts.setTimeout(&message->pingTimeout);
777+
EXPECT_TRUE(
778+
sender->sendQueue.contains(&message->queuedMessageInfo.sendQueueNode));
779+
780+
Protocol::Packet::UnknownHeader* header =
781+
static_cast<Protocol::Packet::UnknownHeader*>(mockPacket.payload);
782+
header->common.messageId = id;
783+
784+
EXPECT_CALL(mockDriver, releasePackets(Pointee(&mockPacket), Eq(1)))
785+
.Times(1);
786+
787+
sender->handleUnknownPacket(&mockPacket, &mockDriver);
788+
789+
EXPECT_FALSE(
790+
sender->sendQueue.contains(&message->queuedMessageInfo.sendQueueNode));
791+
EXPECT_EQ(nullptr, message->messageTimeout.node.list);
792+
EXPECT_EQ(nullptr, message->pingTimeout.node.list);
793+
EXPECT_EQ(Homa::OutMessage::Status::FAILED, message->state);
794+
EXPECT_EQ(Homa::OutMessage::Status::FAILED, message->state);
795+
EXPECT_FALSE(sender->sendReady.load());
796+
}
797+
754798
TEST_F(SenderTest, handleUnknownPacket_no_message)
755799
{
756800
Protocol::MessageId id = {42, 1};
@@ -1303,11 +1347,13 @@ TEST_F(SenderTest, sendMessage_basic)
13031347
.WillOnce(Return(policy));
13041348
EXPECT_CALL(mockDriver, sendPacket(Eq(&mockPacket))).Times(1);
13051349

1306-
sender->sendMessage(message, destination);
1350+
sender->sendMessage(message, destination,
1351+
Sender::Message::Options::NO_RETRY);
13071352

13081353
// Check Message metadata
13091354
EXPECT_EQ(id, message->id);
13101355
EXPECT_EQ(destination, message->destination);
1356+
EXPECT_EQ(Sender::Message::Options::NO_RETRY, message->options);
13111357

13121358
// Check packet metadata
13131359
Protocol::Packet::DataHeader* header =

0 commit comments

Comments
 (0)