Skip to content

Commit 19e0a36

Browse files
authored
add a stat for packets dropped from socket receive buffer (#214)
* wip: added stats for dropped packets (SO_RXQ_OVFL) * linux: add a socket->packets_dropped_by_kernel stat to detect receive buffer overrun. uses linux-only socket option SO_RXQ_OVFL * fixes
1 parent d0735c7 commit 19e0a36

File tree

6 files changed

+65
-0
lines changed

6 files changed

+65
-0
lines changed

include/udx.h

+3
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ struct udx_socket_s {
140140
udx_t *udx;
141141
udx_cirbuf_t *streams_by_id; // for convenience
142142

143+
bool cmsg_wanted; // include a control buffer for recvmsg
143144
int family;
144145
int status;
145146
int readers;
@@ -157,6 +158,8 @@ struct udx_socket_s {
157158

158159
uint64_t packets_rx;
159160
uint64_t packets_tx;
161+
162+
int64_t packets_dropped_by_kernel;
160163
};
161164

162165
typedef struct udx_cong_s {

src/io.h

+3
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,7 @@ udx__sendmsg (udx_socket_t *handle, const uv_buf_t bufs[], unsigned int bufs_len
1212
ssize_t
1313
udx__recvmsg (udx_socket_t *handle, uv_buf_t *buf, struct sockaddr *addr, int addr_len);
1414

15+
int
16+
udx__udp_set_rxq_ovfl (int fd);
17+
1518
#endif // UDX_IO_H

src/io_posix.c

+39
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,48 @@ udx__recvmsg (udx_socket_t *handle, uv_buf_t *buf, struct sockaddr *addr, int ad
9090
h.msg_iov = (struct iovec *) buf;
9191
h.msg_iovlen = 1;
9292

93+
union {
94+
struct cmsghdr align;
95+
uint8_t buf[2048];
96+
} u;
97+
98+
h.msg_control = u.buf;
99+
h.msg_controllen = sizeof(u.buf);
100+
93101
do {
94102
size = recvmsg(handle->io_poll.io_watcher.fd, &h, 0);
95103
} while (size == -1 && errno == EINTR);
96104

105+
#if defined(__linux__)
106+
107+
// relies on SO_RXQ_OVFL being set
108+
uint32_t packets_dropped_by_kernel = 0;
109+
110+
for (struct cmsghdr *cmsg = CMSG_FIRSTHDR(&h); cmsg != NULL; cmsg = CMSG_NXTHDR(&h, cmsg)) {
111+
if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SO_RXQ_OVFL) {
112+
packets_dropped_by_kernel = *(uint32_t *) CMSG_DATA(cmsg);
113+
}
114+
}
115+
116+
if (packets_dropped_by_kernel) {
117+
handle->packets_dropped_by_kernel = packets_dropped_by_kernel;
118+
}
119+
120+
#endif
121+
97122
return size == -1 ? uv_translate_sys_error(errno) : size;
98123
}
124+
125+
#if defined(__linux__)
126+
int
127+
udx__udp_set_rxq_ovfl (int fd) {
128+
int on = 1;
129+
return setsockopt(fd, SOL_SOCKET, SO_RXQ_OVFL, &on, sizeof(on));
130+
}
131+
#else
132+
int
133+
udx__udp_set_rxq_ovfl (int fd) {
134+
UDX_UNUSED(fd);
135+
return -1;
136+
}
137+
#endif

src/io_win.c

+6
Original file line numberDiff line numberDiff line change
@@ -81,3 +81,9 @@ udx__recvmsg (udx_socket_t *socket, uv_buf_t *buf, struct sockaddr *addr, int ad
8181

8282
return bytes;
8383
}
84+
85+
int
86+
udx__udp_set_rxq_ovfl (int fd) {
87+
UDX_UNUSED(fd);
88+
return -1;
89+
}

src/udx.c

+9
Original file line numberDiff line numberDiff line change
@@ -1991,6 +1991,9 @@ udx_socket_init (udx_t *udx, udx_socket_t *socket) {
19911991
socket->packets_rx = 0;
19921992
socket->packets_tx = 0;
19931993

1994+
socket->packets_dropped_by_kernel = -1;
1995+
socket->cmsg_wanted = false;
1996+
19941997
uv_udp_t *handle = &(socket->handle);
19951998
udx__queue_init(&socket->send_queue);
19961999

@@ -2080,6 +2083,12 @@ udx_socket_bind (udx_socket_t *socket, const struct sockaddr *addr, unsigned int
20802083
err = uv_poll_init_socket(socket->udx->loop, poll, (uv_os_sock_t) fd);
20812084
assert(err == 0);
20822085

2086+
err = udx__udp_set_rxq_ovfl(fd);
2087+
if (!err) {
2088+
socket->cmsg_wanted = true;
2089+
socket->packets_dropped_by_kernel = 0;
2090+
}
2091+
20832092
socket->status |= UDX_SOCKET_BOUND;
20842093
poll->data = socket;
20852094

test/stream-write-read-perf.c

+5
Original file line numberDiff line numberDiff line change
@@ -149,5 +149,10 @@ main () {
149149
printf("stats: stream a: bytes_rx=%" PRIu64 " packets_rx=%" PRIu64 " bytes_tx=%" PRIu64 " packets_tx=%" PRIu64 "\n", astream.bytes_rx, astream.packets_rx, astream.bytes_tx, astream.packets_tx);
150150
printf("stats: stream b: bytes_rx=%" PRIu64 " packets_rx=%" PRIu64 " bytes_tx=%" PRIu64 " packets_tx=%" PRIu64 "\n", bstream.bytes_rx, bstream.packets_rx, bstream.bytes_tx, bstream.packets_tx);
151151

152+
if (asock.packets_dropped_by_kernel != -1 && bsock.packets_dropped_by_kernel != -1) {
153+
printf("stats: socket a: packets_dropped=%" PRIi64 "\n", asock.packets_dropped_by_kernel);
154+
printf("stats: socket b: packets_dropped=%" PRIi64 "\n", bsock.packets_dropped_by_kernel);
155+
}
156+
152157
return 0;
153158
}

0 commit comments

Comments
 (0)