forked from frevib/io_uring-echo-server
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathlink_recv.c
156 lines (122 loc) · 3.52 KB
/
link_recv.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/poll.h>
#include <netinet/in.h>
#include <liburing.h>
#include "global.h"
/* Number of per-connection buffers; buffers are indexed directly by socket fd. */
#define MAX_CONNECTIONS 1024
/* Operation kinds stored in conn_info.type to tell completions apart. */
enum { ACCEPT, POLL, READ, WRITE };
/*
 * Per-request tag. It is memcpy'd into sqe->user_data when a request is
 * queued and memcpy'd back out of cqe->user_data when it completes.
 */
struct conn_info {
/* Socket the request was issued on. */
__u32 fd;
/* One of ACCEPT, POLL, READ, WRITE. */
__u32 type;
};
/*
 * One message buffer per possible fd.
 * NOTE(review): MAX_MESSAGE_LEN is not defined in this file; presumably it
 * comes from "global.h" - confirm.
 */
typedef char buf_type[MAX_CONNECTIONS][MAX_MESSAGE_LEN];
/* The single io_uring instance shared by every helper in this file. */
static struct io_uring ring;
/*
 * Number of completions iterated over but not yet released with
 * io_uring_cq_advance(); incremented in main's completion loop and reset
 * whenever the completion queue is advanced.
 */
static unsigned cqe_count = 0;
/*
 * Return a free submission-queue entry; never returns NULL.
 *
 * Fast path: hand out the next free SQE. If the submission queue is full,
 * release the completions counted so far in cqe_count, submit the pending
 * SQEs to free slots, and try once more.
 *
 * Every caller dereferences the returned pointer immediately, so when no
 * entry can be obtained even after submitting, the process prints a
 * diagnostic and exits instead of returning NULL.
 */
static struct io_uring_sqe *get_sqe_safe(void) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
    if (__builtin_expect(sqe != NULL, 1)) {
        return sqe;
    }

    /* Release the completions already counted before submitting more work. */
    io_uring_cq_advance(&ring, cqe_count);
    cqe_count = 0;

    int ret = io_uring_submit(&ring);
    sqe = io_uring_get_sqe(&ring);
    if (sqe == NULL) {
        fprintf(stderr, "get_sqe_safe: no free submission entry after submit (%s)\n",
                ret < 0 ? strerror(-ret) : "queue still full");
        exit(EXIT_FAILURE);
    }
    return sqe;
}
static void add_accept(int fd, struct sockaddr *client_addr, socklen_t *client_len) {
struct io_uring_sqe *sqe = get_sqe_safe();
struct conn_info conn_i = {
.fd = fd,
.type = ACCEPT,
};
io_uring_prep_accept(sqe, fd, client_addr, client_len, 0);
memcpy(&sqe->user_data, &conn_i, sizeof(conn_i));
}
static void add_poll(int fd, int poll_mask, unsigned flags) {
struct io_uring_sqe *sqe = get_sqe_safe();
struct conn_info conn_i = {
.fd = fd,
.type = POLL,
};
io_uring_prep_poll_add(sqe, fd, poll_mask);
io_uring_sqe_set_flags(sqe, flags);
memcpy(&sqe->user_data, &conn_i, sizeof(conn_i));
}
/*
 * Queue a recv of up to `size` bytes from `fd` into the buffer slot
 * (*bufs)[fd].
 *
 * The completion is tagged with { fd, READ } in user_data.
 */
static void add_socket_read(int fd, size_t size, buf_type *bufs) {
    struct io_uring_sqe *entry = get_sqe_safe();
    char *slot = (*bufs)[fd];
    struct conn_info tag;

    tag.fd = fd;
    tag.type = READ;

    io_uring_prep_recv(entry, fd, slot, size, MSG_NOSIGNAL);
    memcpy(&entry->user_data, &tag, sizeof tag);
}
/*
 * Queue a send of `size` bytes from the buffer slot (*bufs)[fd] to `fd`.
 *
 * `flags` are applied to the SQE with io_uring_sqe_set_flags. The completion
 * is tagged with { fd, WRITE } in user_data.
 */
static void add_socket_write(int fd, size_t size, buf_type *bufs, unsigned flags) {
    struct io_uring_sqe *entry = get_sqe_safe();
    char *slot = (*bufs)[fd];
    struct conn_info tag;

    tag.fd = fd;
    tag.type = WRITE;

    io_uring_prep_send(entry, fd, slot, size, MSG_NOSIGNAL);
    io_uring_sqe_set_flags(entry, flags);
    memcpy(&entry->user_data, &tag, sizeof tag);
}
/*
 * Echo server entry point.
 *
 * Usage: <program> [port]
 *
 * Opens the listening socket with init_socket(), sets up the io_uring
 * instance and then loops forever over completions:
 *   ACCEPT -> arm a linked poll+recv on the new connection, re-arm accept
 *   POLL   -> nothing to do; the linked recv follows
 *   READ   -> echo the received bytes back, or close the connection on
 *             EOF / error
 *   WRITE  -> arm the next linked poll+recv, or close the connection on
 *             a send error
 *
 * Returns 1 on a usage or port error, -1 on a setup or fatal ring error.
 */
int main(int argc, char *argv[]) {
    if (argc < 2) {
        fprintf(stderr, "Please give a port number: %s [port]\n", argv[0]);
        return 1;
    }

    /* Reject non-numeric input and values outside the 16-bit port range. */
    char *end = NULL;
    long port = strtol(argv[1], &end, 10);
    if (end == argv[1] || *end != '\0' || port < 0 || port > 65535) {
        fprintf(stderr, "Invalid port number: %s\n", argv[1]);
        return 1;
    }
    int portno = (int)port;

    int sock_listen_fd = init_socket(portno);
    if (sock_listen_fd < 0) return -1;
    printf("io_uring echo server listening for connections on port: %d\n", portno);

    int ret = io_uring_queue_init(BACKLOG, &ring, 0);
    if (ret < 0) {
        fprintf(stderr, "queue_init: %s\n", strerror(-ret));
        close(sock_listen_fd);
        return -1;
    }

    buf_type *bufs = malloc(sizeof(*bufs));
    if (bufs == NULL) {
        fprintf(stderr, "malloc: cannot allocate connection buffers\n");
        close(sock_listen_fd);
        return -1;
    }

    struct sockaddr_in client_addr;
    socklen_t client_len = sizeof(client_addr);
    add_accept(sock_listen_fd, (struct sockaddr *)&client_addr, &client_len);

    int rc = 0;
    while (1) {
        ret = io_uring_submit_and_wait(&ring, 1);
        /*
         * Interrupted or temporarily busy: fall through, reap whatever has
         * completed and retry. Anything else is treated as fatal.
         */
        if (ret < 0 && ret != -EINTR && ret != -EAGAIN && ret != -EBUSY) {
            fprintf(stderr, "submit_and_wait: %s\n", strerror(-ret));
            rc = -1;
            break;
        }

        struct io_uring_cqe *cqe;
        unsigned head;
        io_uring_for_each_cqe(&ring, head, cqe) {
            ++cqe_count;
            /* Copy out everything needed: get_sqe_safe() may advance the CQ. */
            struct conn_info conn_i;
            memcpy(&conn_i, &cqe->user_data, sizeof(conn_i));
            int result = cqe->res;

            switch (conn_i.type) {
            case ACCEPT:
                if (result >= 0 && result < MAX_CONNECTIONS) {
                    add_poll(result, POLLIN, IOSQE_IO_LINK);
                    add_socket_read(result, MAX_MESSAGE_LEN, bufs);
                } else if (result >= 0) {
                    /* No buffer slot exists for this fd; refuse the connection. */
                    close(result);
                }
                /* A negative result is a failed accept: nothing to set up. */
                client_len = sizeof(client_addr);
                add_accept(sock_listen_fd, (struct sockaddr *)&client_addr, &client_len);
                break;
            case POLL:
                /* Do nothing */
                break;
            case READ:
                if (__builtin_expect(result <= 0, 0)) {
                    /* EOF or error: release the descriptor, not just shut it down. */
                    close(conn_i.fd);
                } else {
                    add_socket_write(conn_i.fd, result, bufs, 0);
                }
                break;
            case WRITE:
                if (__builtin_expect(result < 0, 0)) {
                    /* Send failed: drop the connection instead of re-arming it. */
                    close(conn_i.fd);
                } else {
                    add_poll(conn_i.fd, POLLIN, IOSQE_IO_LINK);
                    add_socket_read(conn_i.fd, MAX_MESSAGE_LEN, bufs);
                }
                break;
            default:
                /* Unknown tag: ignore the completion. */
                break;
            }
        }

        io_uring_cq_advance(&ring, cqe_count);
        cqe_count = 0;
    }

    close(sock_listen_fd);
    free(bufs);
    return rc;
}