Skip to content

Commit 78c1bc6

Browse files
jthomas43mafintosh
authored andcommitted
Feature/combined writes with writev (#177)
1 parent 7269ab5 commit 78c1bc6

14 files changed

+354
-233
lines changed

examples/server.c

+2-2
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ on_ack_end (udx_stream_write_t *req, int status, int unordered) {
5353
static void
5454
pump_writes () {
5555
while (bytes_sent < PUMP_BYTES) {
56-
udx_stream_write_t *req = malloc(sizeof(udx_stream_write_t));
56+
udx_stream_write_t *req = malloc(udx_stream_write_sizeof(1));
5757
bytes_sent += chunk.len;
5858

5959
if (udx_stream_write(req, &stream, &chunk, 1, on_ack)) continue;
@@ -62,7 +62,7 @@ pump_writes () {
6262
return;
6363
}
6464

65-
udx_stream_write_t *req = malloc(sizeof(udx_stream_write_t));
65+
udx_stream_write_t *req = malloc(udx_stream_write_sizeof(1));
6666
udx_stream_write_end(req, &stream, &empty, 1, on_ack_end);
6767
}
6868

include/udx.h

+28-10
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ typedef struct udx_packet_s udx_packet_t;
9494
typedef struct udx_socket_send_s udx_socket_send_t;
9595
typedef struct udx_stream_send_s udx_stream_send_t;
9696
typedef struct udx_stream_write_s udx_stream_write_t;
97+
typedef struct udx_stream_write_buf_s udx_stream_write_buf_t;
9798

9899
typedef enum {
99100
UDX_LOOKUP_FAMILY_IPV4 = 1,
@@ -270,54 +271,68 @@ struct udx_packet_s {
270271
int ttl;
271272
int is_retransmit;
272273

273-
udx_stream_t *stream; // pointer to the stream if stream packet
274-
275274
uint8_t transmits;
276275
bool is_mtu_probe;
277276
uint16_t size;
278277
uint64_t time_sent;
279278

280-
void *ctx;
279+
void *ctx; // stream_send_t | socket_send_t | stream_t
281280

282281
struct sockaddr_storage dest;
283282
int dest_len;
284283

285284
uint32_t fifo_gc; // for removing from inflight / retransmit queue
286-
// udx_packet_t *prev; // alternative for inflight / retransmit queues
287-
// udx_packet_t *next; // alternative for inflight / retransmit queues
288285

289286
// just alloc it in place here, easier to manage
290287
char header[UDX_HEADER_SIZE];
291-
unsigned int bufs_len;
292-
uv_buf_t bufs[3];
288+
unsigned short nbufs;
289+
290+
// inefficient - only relevant for stream_t packets
291+
unsigned short nwbufs;
292+
udx_stream_write_buf_t **wbufs;
293293
};
294294

295295
struct udx_socket_send_s {
296296
udx_packet_t pkt;
297+
uv_buf_t bufs[1]; // buf_t[] must be after packet_t
297298
udx_socket_t *socket;
298299

299300
udx_socket_send_cb on_send;
300301

301302
void *data;
302303
};
303304

304-
struct udx_stream_write_s {
305-
// immutable, original write
305+
struct udx_stream_write_buf_s {
306+
// immutable original buf
306307
uv_buf_t buf;
307308

308-
size_t bytes_acked;
309+
// 1. remove from write_queue when bytes_inflight + bytes_acked == buf.len
310+
// 2. free when bytes_acked == buf.len
309311
size_t bytes_inflight;
312+
size_t bytes_acked;
313+
314+
udx_stream_write_t *write;
315+
316+
bool is_write_end;
317+
};
310318

319+
struct udx_stream_write_s {
320+
size_t size;
321+
size_t bytes_acked;
311322
bool is_write_end;
312323

313324
udx_stream_t *stream;
314325
udx_stream_ack_cb on_ack;
315326

316327
void *data;
328+
329+
unsigned int nwbufs;
330+
udx_stream_write_buf_t wbuf[];
317331
};
318332

319333
struct udx_stream_send_s {
320334
udx_packet_t pkt;
335+
uv_buf_t bufs[3]; // buf_t[] must be after packet_t
321336
udx_stream_t *stream;
322337

323338
udx_stream_send_cb on_send;
@@ -449,6 +464,9 @@ udx_stream_send (udx_stream_send_t *req, udx_stream_t *stream, const uv_buf_t bu
449464
int
450465
udx_stream_write_resume (udx_stream_t *stream, udx_stream_drain_cb drain_cb);
451466

467+
int
468+
udx_stream_write_sizeof (int nwbufs);
469+
452470
int
453471
udx_stream_write (udx_stream_write_t *req, udx_stream_t *stream, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_ack_cb ack_cb);
454472

src/io_posix.c

+3-3
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,8 @@ udx__on_writable (udx_socket_t *socket) {
142142
p->msg_hdr.msg_name = &pkt->dest;
143143
p->msg_hdr.msg_namelen = pkt->dest_len;
144144

145-
p->msg_hdr.msg_iov = (struct iovec *) pkt->bufs;
146-
p->msg_hdr.msg_iovlen = pkt->bufs_len;
145+
p->msg_hdr.msg_iov = (struct iovec *) (pkt + 1);
146+
p->msg_hdr.msg_iovlen = pkt->nbufs;
147147

148148
npkts++;
149149
}
@@ -198,7 +198,7 @@ udx__on_writable (udx_socket_t *socket) {
198198
pkt->dest_len = sizeof(struct sockaddr_in6);
199199
}
200200

201-
ssize_t size = udx__sendmsg(socket, pkt->bufs, pkt->bufs_len, (struct sockaddr *) &(pkt->dest), pkt->dest_len);
201+
ssize_t size = udx__sendmsg(socket, (uv_buf_t *) (pkt + 1), pkt->nbufs, (struct sockaddr *) &(pkt->dest), pkt->dest_len);
202202

203203
if (adjust_ttl) uv_udp_set_ttl((uv_udp_t *) socket, socket->ttl);
204204

src/io_win.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ udx__on_writable (udx_socket_t *socket) {
9898
pkt->dest_len = sizeof(struct sockaddr_in6);
9999
}
100100

101-
ssize_t size = udx__sendmsg(socket, pkt->bufs, pkt->bufs_len, (struct sockaddr *) &(pkt->dest), pkt->dest_len);
101+
ssize_t size = udx__sendmsg(socket, (uv_buf_t *) (pkt + 1), pkt->nbufs, (struct sockaddr *) &(pkt->dest), pkt->dest_len);
102102

103103
if (adjust_ttl) uv_udp_set_ttl((uv_udp_t *) socket, socket->ttl);
104104

0 commit comments

Comments
 (0)