Skip to content

Commit 8d10fa2

Browse files
committed
transport(ptp): move from protobuf to fixed-size c-struct
This commit moves the abstraction of a PTP message from a protobuf object to a fixed-size C-struct with a heap pointer. The rationale is that these PTP messages move through the system, and even when careful it is challenging to keep track of the copies allocations that protobuf is doing under the hood. In exchange, we are very explicity about the copies and allocations we do.
1 parent 7483943 commit 8d10fa2

15 files changed

+502
-239
lines changed

include/faabric/transport/PointToPointBroker.h

+6-15
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <faabric/batch-scheduler/SchedulingDecision.h>
44
#include <faabric/transport/PointToPointClient.h>
5+
#include <faabric/transport/PointToPointMessage.h>
56
#include <faabric/util/config.h>
67
#include <faabric/util/locks.h>
78

@@ -120,27 +121,16 @@ class PointToPointBroker
120121

121122
void updateHostForIdx(int groupId, int groupIdx, std::string newHost);
122123

123-
void sendMessage(int groupId,
124-
int sendIdx,
125-
int recvIdx,
126-
const uint8_t* buffer,
127-
size_t bufferSize,
124+
void sendMessage(const PointToPointMessage& msg,
128125
std::string hostHint,
129126
bool mustOrderMsg = false);
130127

131-
void sendMessage(int groupId,
132-
int sendIdx,
133-
int recvIdx,
134-
const uint8_t* buffer,
135-
size_t bufferSize,
128+
void sendMessage(const PointToPointMessage& msg,
136129
bool mustOrderMsg = false,
137130
int sequenceNum = NO_SEQUENCE_NUM,
138131
std::string hostHint = "");
139132

140-
std::vector<uint8_t> recvMessage(int groupId,
141-
int sendIdx,
142-
int recvIdx,
143-
bool mustOrderMsg = false);
133+
void recvMessage(PointToPointMessage& msg, bool mustOrderMsg = false);
144134

145135
void clearGroup(int groupId);
146136

@@ -163,7 +153,8 @@ class PointToPointBroker
163153

164154
std::shared_ptr<faabric::util::FlagWaiter> getGroupFlag(int groupId);
165155

166-
Message doRecvMessage(int groupId, int sendIdx, int recvIdx);
156+
// Returns the message response code and the sequence number
157+
std::pair<MessageResponseCode, int> doRecvMessage(PointToPointMessage& msg);
167158

168159
void initSequenceCounters(int groupId);
169160

include/faabric/transport/PointToPointClient.h

+4-3
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,19 @@
33
#include <faabric/proto/faabric.pb.h>
44
#include <faabric/transport/MessageEndpointClient.h>
55
#include <faabric/transport/PointToPointCall.h>
6+
#include <faabric/transport/PointToPointMessage.h>
67

78
namespace faabric::transport {
89

910
std::vector<std::pair<std::string, faabric::PointToPointMappings>>
1011
getSentMappings();
1112

12-
std::vector<std::pair<std::string, faabric::PointToPointMessage>>
13+
std::vector<std::pair<std::string, PointToPointMessage>>
1314
getSentPointToPointMessages();
1415

1516
std::vector<std::tuple<std::string,
1617
faabric::transport::PointToPointCall,
17-
faabric::PointToPointMessage>>
18+
PointToPointMessage>>
1819
getSentLockMessages();
1920

2021
void clearSentMessages();
@@ -26,7 +27,7 @@ class PointToPointClient : public faabric::transport::MessageEndpointClient
2627

2728
void sendMappings(faabric::PointToPointMappings& mappings);
2829

29-
void sendMessage(faabric::PointToPointMessage& msg,
30+
void sendMessage(const PointToPointMessage& msg,
3031
int sequenceNum = NO_SEQUENCE_NUM);
3132

3233
void groupLock(int appId,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
#pragma once
2+
3+
#include <cstdint>
4+
#include <span>
5+
6+
namespace faabric::transport {
7+
8+
/* Simple fixed-size C-struct to capture the state of a PTP message moving
9+
* through Faabric.
10+
*
11+
* We require fixed-size, and no unique pointers to be able to use
12+
* high-throughput ring-buffers to send the messages around. This also means
13+
* that we manually malloc/free the data pointer. The message size is:
14+
* 4 * int32_t = 4 * 4 bytes = 16 bytes
15+
* 1 * size_t = 1 * 8 bytes = 8 bytes
16+
* 1 * void* = 1 * 8 bytes = 8 bytes
17+
* total = 32 bytes = 4 * 8 so the struct is naturally 8 byte-aligned
18+
*/
19+
struct PointToPointMessage
20+
{
21+
int32_t appId;
22+
int32_t groupId;
23+
int32_t sendIdx;
24+
int32_t recvIdx;
25+
size_t dataSize;
26+
void* dataPtr;
27+
};
28+
static_assert((sizeof(PointToPointMessage) % 8) == 0,
29+
"PTP message mus be 8-aligned!");
30+
31+
// The wire format for a PTP message is very simple: the fixed-size struct,
32+
// followed by dataSize bytes containing the payload.
33+
void serializePtpMsg(std::span<uint8_t> buffer, const PointToPointMessage& msg);
34+
35+
// This parsing function mallocs space for the message payload. This is to
36+
// keep the PTP message at fixed-size, and be able to efficiently move it
37+
// around in-memory queues
38+
void parsePtpMsg(std::span<const uint8_t> bytes, PointToPointMessage* msg);
39+
40+
// Alternative signature for parsing PTP messages for when the caller can
41+
// provide an already-allocated buffer to write into
42+
void parsePtpMessage(std::span<const uint8_t> bytes,
43+
PointToPointMessage* msg,
44+
std::span<uint8_t> preAllocBuffer);
45+
}

src/mpi/MpiWorld.cpp

+26-13
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include <faabric/mpi/MpiWorld.h>
33
#include <faabric/mpi/mpi.pb.h>
44
#include <faabric/planner/PlannerClient.h>
5+
#include <faabric/transport/PointToPointMessage.h>
56
#include <faabric/transport/macros.h>
67
#include <faabric/util/ExecGraph.h>
78
#include <faabric/util/batch.h>
@@ -60,14 +61,16 @@ void MpiWorld::sendRemoteMpiMessage(std::string dstHost,
6061
throw std::runtime_error("Error serialising message");
6162
}
6263
try {
63-
broker.sendMessage(
64-
thisRankMsg->groupid(),
65-
sendRank,
66-
recvRank,
67-
reinterpret_cast<const uint8_t*>(serialisedBuffer.data()),
68-
serialisedBuffer.size(),
69-
dstHost,
70-
true);
64+
// It is safe to send a pointer to a stack-allocated object
65+
// because the broker will make an additional copy (and so will NNG!)
66+
faabric::transport::PointToPointMessage msg(
67+
{ .groupId = thisRankMsg->groupid(),
68+
.sendIdx = sendRank,
69+
.recvIdx = recvRank,
70+
.dataSize = serialisedBuffer.size(),
71+
.dataPtr = (void*)serialisedBuffer.data() });
72+
73+
broker.sendMessage(msg, dstHost, true);
7174
} catch (std::runtime_error& e) {
7275
SPDLOG_ERROR("{}:{}:{} Timed out with: MPI - send {} -> {}",
7376
thisRankMsg->appid(),
@@ -82,10 +85,12 @@ void MpiWorld::sendRemoteMpiMessage(std::string dstHost,
8285
std::shared_ptr<MPIMessage> MpiWorld::recvRemoteMpiMessage(int sendRank,
8386
int recvRank)
8487
{
85-
std::vector<uint8_t> msg;
88+
faabric::transport::PointToPointMessage msg(
89+
{ .groupId = thisRankMsg->groupid(),
90+
.sendIdx = sendRank,
91+
.recvIdx = recvRank });
8692
try {
87-
msg =
88-
broker.recvMessage(thisRankMsg->groupid(), sendRank, recvRank, true);
93+
broker.recvMessage(msg, true);
8994
} catch (std::runtime_error& e) {
9095
SPDLOG_ERROR("{}:{}:{} Timed out with: MPI - recv (remote) {} -> {}",
9196
thisRankMsg->appid(),
@@ -95,7 +100,12 @@ std::shared_ptr<MPIMessage> MpiWorld::recvRemoteMpiMessage(int sendRank,
95100
recvRank);
96101
throw e;
97102
}
98-
PARSE_MSG(MPIMessage, msg.data(), msg.size());
103+
104+
// Parsing into the protobuf makes a copy of the message, so we can
105+
// free the heap pointer after
106+
PARSE_MSG(MPIMessage, msg.dataPtr, msg.dataSize);
107+
faabric::util::free(msg.dataPtr);
108+
99109
return std::make_shared<MPIMessage>(parsedMsg);
100110
}
101111

@@ -599,7 +609,10 @@ void MpiWorld::doRecv(std::shared_ptr<MPIMessage>& m,
599609
// Assert message integrity
600610
// Note - this checks won't happen in Release builds
601611
if (m->messagetype() != messageType) {
602-
SPDLOG_ERROR("Different message types (got: {}, expected: {})",
612+
SPDLOG_ERROR("{}:{}:{} Different message types (got: {}, expected: {})",
613+
m->worldid(),
614+
m->sender(),
615+
m->destination(),
603616
m->messagetype(),
604617
messageType);
605618
}

src/proto/faabric.proto

-8
Original file line numberDiff line numberDiff line change
@@ -199,14 +199,6 @@ message StateAppendedResponse {
199199
// POINT-TO-POINT
200200
// ---------------------------------------------
201201

202-
message PointToPointMessage {
203-
int32 appId = 1;
204-
int32 groupId = 2;
205-
int32 sendIdx = 3;
206-
int32 recvIdx = 4;
207-
bytes data = 5;
208-
}
209-
210202
message PointToPointMappings {
211203
int32 appId = 1;
212204
int32 groupId = 2;

src/scheduler/Scheduler.cpp

+23-3
Original file line numberDiff line numberDiff line change
@@ -456,12 +456,32 @@ Scheduler::checkForMigrationOpportunities(faabric::Message& msg,
456456
auto groupIdxs = broker.getIdxsRegisteredForGroup(groupId);
457457
groupIdxs.erase(0);
458458
for (const auto& recvIdx : groupIdxs) {
459-
broker.sendMessage(
460-
groupId, 0, recvIdx, BYTES_CONST(&newGroupId), sizeof(int));
459+
// It is safe to send a pointer to the stack, because the
460+
// transport layer will perform an additional copy of the PTP
461+
// message to put it in the message body
462+
// TODO(no-inproc): this may not be true once we move the inproc
463+
// sockets to in-memory queues
464+
faabric::transport::PointToPointMessage msg(
465+
{ .groupId = groupId,
466+
.sendIdx = 0,
467+
.recvIdx = recvIdx,
468+
.dataSize = sizeof(int),
469+
.dataPtr = &newGroupId });
470+
471+
broker.sendMessage(msg);
461472
}
462473
} else if (overwriteNewGroupId == 0) {
463-
std::vector<uint8_t> bytes = broker.recvMessage(groupId, 0, groupIdx);
474+
faabric::transport::PointToPointMessage msg(
475+
{ .groupId = groupId, .sendIdx = 0, .recvIdx = groupIdx });
476+
// TODO(no-order): when we remove the need to order ptp messages we
477+
// should be able to call recv giving it a pre-allocated buffer,
478+
// avoiding the hassle of malloc-ing and free-ing
479+
broker.recvMessage(msg);
480+
std::vector<uint8_t> bytes((uint8_t*)msg.dataPtr,
481+
(uint8_t*)msg.dataPtr + msg.dataSize);
464482
newGroupId = faabric::util::bytesToInt(bytes);
483+
// The previous call makes a copy, so safe to free now
484+
faabric::util::free(msg.dataPtr);
465485
} else {
466486
// In some settings, like tests, we already know the new group id, so
467487
// we can set it here (and in fact, we need to do so when faking two

src/transport/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ faabric_lib(transport
99
MessageEndpointServer.cpp
1010
PointToPointBroker.cpp
1111
PointToPointClient.cpp
12+
PointToPointMessage.cpp
1213
PointToPointServer.cpp
1314
)
1415

src/transport/MessageEndpointClient.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ void MessageEndpointClient::asyncSend(int header,
3636
sequenceNum);
3737
}
3838

39+
// TODO: consider making an iovec-style scatter/gather alternative signature
3940
void MessageEndpointClient::asyncSend(int header,
4041
const uint8_t* buffer,
4142
size_t bufferSize,

0 commit comments

Comments
 (0)