Skip to content

Commit e4bd371

Browse files
committed
Add multithreaded example for fossa
PUBLISHED_FROM=680a37157fa24884f27d0255379b54193c21f4e8
1 parent e12b02f commit e4bd371

File tree

12 files changed

+329
-20
lines changed

12 files changed

+329
-20
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
PROG = multithreaded_restful_server
2+
SOURCES = $(PROG).c ../../fossa.c
3+
APP_FLAGS = -DNS_ENABLE_THREADS $(CFLAGS_EXTRA)
4+
5+
ifeq ($(OS), Windows_NT)
6+
APP_FLAGS += advapi32.lib
7+
endif
8+
9+
all: $(PROG)
10+
11+
$(PROG): $(SOURCES)
12+
$(CC) $(SOURCES) -o $@ -W -Wall -I../.. -pthread $(APP_FLAGS)
13+
14+
$(PROG).exe: $(SOURCES)
15+
cl $(SOURCES) /I../.. /MD /Fe$@ $(APP_FLAGS)
16+
17+
clean:
18+
rm -rf *.gc* *.dSYM *.exe *.obj *.o a.out $(PROG)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Copyright (c) 2015 Cesanta Software Limited
2+
// All rights reserved
3+
4+
// This example shows how to handle long, blocking requests by
5+
// handing off computation to different threads. Here, each
6+
// request spawns a new thread. In a production scenario, a thread
7+
// pools can be used for efficiency, if required.
8+
// Long computation is simulated by sleeping for a random interval.
9+
10+
#include "fossa.h"
11+
12+
static const char *s_http_port = "8000";
13+
14+
static void ev_handler(struct ns_connection *c, int ev, void *p) {
15+
if (ev == NS_HTTP_REQUEST) {
16+
struct http_message *hm = (struct http_message *) p;
17+
char reply[100];
18+
19+
/* Simulate long calculation */
20+
sleep(3);
21+
22+
/* Send the reply */
23+
snprintf(reply, sizeof(reply), "{ \"uri\": \"%.*s\" }\n",
24+
(int) hm->uri.len, hm->uri.p);
25+
ns_printf(c, "HTTP/1.1 200 OK\r\n"
26+
"Content-Type: application/json\r\n"
27+
"Content-Length: %d\r\n"
28+
"\r\n"
29+
"%s",
30+
(int) strlen(reply), reply);
31+
}
32+
}
33+
34+
int main(void) {
35+
struct ns_mgr mgr;
36+
struct ns_connection *nc;
37+
38+
ns_mgr_init(&mgr, NULL);
39+
nc = ns_bind(&mgr, s_http_port, ev_handler);
40+
ns_set_protocol_http_websocket(nc);
41+
42+
/* For each new connection, execute ev_handler in a separate thread */
43+
ns_enable_multithreading(nc);
44+
45+
printf("Starting multi-threaded server on port %s\n", s_http_port);
46+
for (;;) {
47+
ns_mgr_poll(&mgr, 3000);
48+
}
49+
ns_mgr_free(&mgr);
50+
51+
return 0;
52+
}

fossa.c

+119-10
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ NS_INTERNAL struct ns_connection *ns_finish_connect(struct ns_connection *nc,
7272
NS_INTERNAL int ns_parse_address(const char *str, union socket_address *sa,
7373
int *proto, char *host, size_t host_len);
7474
NS_INTERNAL void ns_call(struct ns_connection *, int ev, void *ev_data);
75+
NS_INTERNAL void ns_forward(struct ns_connection *, struct ns_connection *);
76+
NS_INTERNAL void ns_add_conn(struct ns_mgr *mgr, struct ns_connection *c);
77+
NS_INTERNAL void ns_remove_conn(struct ns_connection *c);
7578

7679
#ifndef NS_DISABLE_FILESYSTEM
7780
NS_INTERNAL int find_index_file(char *, size_t, const char *, ns_stat_t *);
@@ -1649,15 +1652,16 @@ static void ns_ev_mgr_free(struct ns_mgr *mgr);
16491652
static void ns_ev_mgr_add_conn(struct ns_connection *nc);
16501653
static void ns_ev_mgr_remove_conn(struct ns_connection *nc);
16511654

1652-
static void ns_add_conn(struct ns_mgr *mgr, struct ns_connection *c) {
1655+
NS_INTERNAL void ns_add_conn(struct ns_mgr *mgr, struct ns_connection *c) {
1656+
c->mgr = mgr;
16531657
c->next = mgr->active_connections;
16541658
mgr->active_connections = c;
16551659
c->prev = NULL;
16561660
if (c->next != NULL) c->next->prev = c;
16571661
ns_ev_mgr_add_conn(c);
16581662
}
16591663

1660-
static void ns_remove_conn(struct ns_connection *conn) {
1664+
NS_INTERNAL void ns_remove_conn(struct ns_connection *conn) {
16611665
if (conn->prev == NULL) conn->mgr->active_connections = conn->next;
16621666
if (conn->prev) conn->prev->next = conn->next;
16631667
if (conn->next) conn->next->prev = conn->prev;
@@ -3063,6 +3067,118 @@ int ns_check_ip_acl(const char *acl, uint32_t remote_ip) {
30633067

30643068
return allowed == '+';
30653069
}
3070+
3071+
/* Move data from one connection to another */
3072+
void ns_forward(struct ns_connection *from, struct ns_connection *to) {
3073+
ns_send(to, from->recv_mbuf.buf, from->recv_mbuf.len);
3074+
mbuf_remove(&from->recv_mbuf, from->recv_mbuf.len);
3075+
}
3076+
#ifdef NS_MODULE_LINES
3077+
#line 1 "src/multithreading.c"
3078+
/**/
3079+
#endif
3080+
/*
3081+
* Copyright (c) 2014 Cesanta Software Limited
3082+
* All rights reserved
3083+
*/
3084+
3085+
/* Amalgamated: #include "internal.h" */
3086+
3087+
#ifdef NS_ENABLE_THREADS
3088+
3089+
static void multithreaded_ev_handler(struct ns_connection *c, int ev, void *p);
3090+
3091+
/*
3092+
* This thread function executes user event handler.
3093+
* It runs an event manager that has only one connection, until that
3094+
* connection is alive.
3095+
*/
3096+
static void *per_connection_thread_function(void *param) {
3097+
struct ns_connection *c = (struct ns_connection *) param;
3098+
struct ns_mgr m;
3099+
3100+
ns_mgr_init(&m, NULL);
3101+
ns_add_conn(&m, c);
3102+
while (m.active_connections != NULL) {
3103+
ns_mgr_poll(&m, 1000);
3104+
}
3105+
ns_mgr_free(&m);
3106+
3107+
return param;
3108+
}
3109+
3110+
static void link_conns(struct ns_connection *c1, struct ns_connection *c2) {
3111+
c1->priv_2 = c2;
3112+
c2->priv_2 = c1;
3113+
}
3114+
3115+
static void unlink_conns(struct ns_connection *c) {
3116+
struct ns_connection *peer = (struct ns_connection *) c->priv_2;
3117+
if (peer != NULL) {
3118+
peer->flags |= NSF_SEND_AND_CLOSE;
3119+
peer->priv_2 = NULL;
3120+
}
3121+
c->priv_2 = NULL;
3122+
}
3123+
3124+
static void forwarder_ev_handler(struct ns_connection *c, int ev, void *p) {
3125+
(void) p;
3126+
if (ev == NS_RECV && c->priv_2) {
3127+
ns_forward(c, c->priv_2);
3128+
} else if (ev == NS_CLOSE) {
3129+
unlink_conns(c);
3130+
}
3131+
}
3132+
3133+
static void spawn_handling_thread(struct ns_connection *nc) {
3134+
struct ns_mgr dummy = {};
3135+
sock_t sp[2];
3136+
struct ns_connection *c[2];
3137+
3138+
/*
3139+
* Create a socket pair, and wrap each socket into the connection with
3140+
* dummy event manager.
3141+
* c[0] stays in this thread, c[1] goes to another thread.
3142+
*/
3143+
ns_socketpair(sp, SOCK_STREAM);
3144+
c[0] = ns_add_sock(&dummy, sp[0], forwarder_ev_handler);
3145+
c[1] = ns_add_sock(&dummy, sp[1], nc->listener->priv_1);
3146+
3147+
/* Interlink client connection with c[0] */
3148+
link_conns(c[0], nc);
3149+
3150+
/*
3151+
* Switch c[0] manager from the dummy one to the real one. c[1] manager
3152+
* will be set in another thread, allocated on stack of that thread.
3153+
*/
3154+
ns_add_conn(nc->mgr, c[0]);
3155+
3156+
/*
3157+
* Dress c[1] as nc.
3158+
* TODO(lsm): code in accept_conn() looks similar. Refactor.
3159+
*/
3160+
c[1]->listener = nc->listener;
3161+
c[1]->proto_handler = nc->proto_handler;
3162+
c[1]->proto_data = nc->proto_data;
3163+
c[1]->user_data = nc->user_data;
3164+
3165+
ns_start_thread(per_connection_thread_function, c[1]);
3166+
}
3167+
3168+
static void multithreaded_ev_handler(struct ns_connection *c, int ev, void *p) {
3169+
(void) p;
3170+
if (ev == NS_ACCEPT) {
3171+
spawn_handling_thread(c);
3172+
c->handler = forwarder_ev_handler;
3173+
}
3174+
}
3175+
3176+
void ns_enable_multithreading(struct ns_connection *nc) {
3177+
/* Wrap user event handler into our multithreaded_ev_handler */
3178+
nc->priv_1 = nc->handler;
3179+
nc->handler = multithreaded_ev_handler;
3180+
}
3181+
#endif
30663182
#ifdef NS_MODULE_LINES
30673183
#line 1 "src/http.c"
30683184
/**/
@@ -3625,12 +3741,6 @@ static void free_http_proto_data(struct ns_connection *nc) {
36253741
}
36263742
}
36273743

3628-
/* Move data from one connection to another */
3629-
static void ns_forward(struct ns_connection *from, struct ns_connection *to) {
3630-
ns_send(to, from->recv_mbuf.buf, from->recv_mbuf.len);
3631-
mbuf_remove(&from->recv_mbuf, from->recv_mbuf.len);
3632-
}
3633-
36343744
static void transfer_file_data(struct ns_connection *nc) {
36353745
struct proto_data_http *dp = (struct proto_data_http *) nc->proto_data;
36363746
char buf[NS_MAX_HTTP_SEND_IOBUF];
@@ -5833,8 +5943,7 @@ void ns_hexdump_connection(struct ns_connection *nc, const char *path,
58335943
if ((fp = fopen(path, "a")) != NULL) {
58345944
ns_sock_to_str(nc->sock, src, sizeof(src), 3);
58355945
ns_sock_to_str(nc->sock, dst, sizeof(dst), 7);
5836-
fprintf(fp, "%lu %p %s %s %s %d\n", (unsigned long) time(NULL),
5837-
nc->user_data, src,
5946+
fprintf(fp, "%lu %p %s %s %s %d\n", (unsigned long) time(NULL), nc, src,
58385947
ev == NS_RECV ? "<-" : ev == NS_SEND ? "->" : ev == NS_ACCEPT
58395948
? "<A"
58405949
: ev == NS_CONNECT

fossa.h

+12
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@
137137
#else
138138
#define fseeko(x, y, z) fseek((x), (y), (z))
139139
#endif
140+
#define random() rand()
140141
typedef int socklen_t;
141142
typedef unsigned char uint8_t;
142143
typedef unsigned int uint32_t;
@@ -626,6 +627,8 @@ struct ns_connection {
626627
void *proto_data; /* Protocol-specific data */
627628
ns_event_handler_t handler; /* Event handler function */
628629
void *user_data; /* User-specific data */
630+
void *priv_1; /* Used by ns_enable_multithreading() */
631+
void *priv_2; /* Used by ns_enable_multithreading() */
629632
void *mgr_data; /* Implementation-specific event manager's data. */
630633

631634
unsigned long flags;
@@ -922,6 +925,15 @@ int ns_resolve(const char *domain_name, char *ip_addr_buf, size_t buf_len);
922925
*/
923926
int ns_check_ip_acl(const char *acl, uint32_t remote_ip);
924927

928+
/*
929+
* Enable multi-threaded handling for the given listening connection `nc`.
930+
* For each accepted connection, Mongoose will create a separate thread
931+
* and run event handler in that thread. Thus, if an event hanler is doing
932+
* a blocking call or some long computation, that will not slow down
933+
* other connections.
934+
*/
935+
void ns_enable_multithreading(struct ns_connection *nc);
936+
925937
#ifdef __cplusplus
926938
}
927939
#endif /* __cplusplus */

src/http.c

-6
Original file line numberDiff line numberDiff line change
@@ -556,12 +556,6 @@ static void free_http_proto_data(struct ns_connection *nc) {
556556
}
557557
}
558558

559-
/* Move data from one connection to another */
560-
static void ns_forward(struct ns_connection *from, struct ns_connection *to) {
561-
ns_send(to, from->recv_mbuf.buf, from->recv_mbuf.len);
562-
mbuf_remove(&from->recv_mbuf, from->recv_mbuf.len);
563-
}
564-
565559
static void transfer_file_data(struct ns_connection *nc) {
566560
struct proto_data_http *dp = (struct proto_data_http *) nc->proto_data;
567561
char buf[NS_MAX_HTTP_SEND_IOBUF];

src/internal.h

+3
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ NS_INTERNAL struct ns_connection *ns_finish_connect(struct ns_connection *nc,
6767
NS_INTERNAL int ns_parse_address(const char *str, union socket_address *sa,
6868
int *proto, char *host, size_t host_len);
6969
NS_INTERNAL void ns_call(struct ns_connection *, int ev, void *ev_data);
70+
NS_INTERNAL void ns_forward(struct ns_connection *, struct ns_connection *);
71+
NS_INTERNAL void ns_add_conn(struct ns_mgr *mgr, struct ns_connection *c);
72+
NS_INTERNAL void ns_remove_conn(struct ns_connection *c);
7073

7174
#ifndef NS_DISABLE_FILESYSTEM
7275
NS_INTERNAL int find_index_file(char *, size_t, const char *, ns_stat_t *);

src/modules.mk

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ SOURCES = $(COMMON)/mbuf.c \
2828
$(COMMON)/dirent.c \
2929
$(FROZEN)/frozen.c \
3030
net.c \
31+
multithreading.c \
3132
http.c \
3233
util.c \
3334
json-rpc.c \

0 commit comments

Comments
 (0)