@@ -10,6 +10,7 @@ import (
10
10
"io"
11
11
"net"
12
12
"net/http"
13
+ "strings"
13
14
"sync"
14
15
15
16
"github.com/soheilhy/cmux"
@@ -36,18 +37,19 @@ import (
36
37
"github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics"
37
38
)
38
39
39
- var ErrSameGRPCandHTTPPort = errors .New ("cannot use same port for grpc and http server" )
40
-
41
40
// Server runs HTTP, Mux and a grpc server
42
41
type Server struct {
43
42
querySvc * querysvc.QueryService
44
43
queryOptions * QueryOptions
45
44
46
- grpcConn net.Listener
47
- httpConn net.Listener
48
- grpcServer * grpc.Server
49
- httpServer * httpServer
50
- bgFinished sync.WaitGroup
45
+ conn net.Listener
46
+ grpcConn net.Listener
47
+ httpConn net.Listener
48
+ cmuxServer cmux.CMux
49
+ grpcServer * grpc.Server
50
+ httpServer * httpServer
51
+ separatePorts bool
52
+ bgFinished sync.WaitGroup
51
53
telemetery.Setting
52
54
}
53
55
@@ -71,8 +73,8 @@ func NewServer(
71
73
}
72
74
separatePorts := grpcPort != httpPort || grpcPort == "0" || httpPort == "0"
73
75
74
- if ! separatePorts {
75
- return nil , ErrSameGRPCandHTTPPort
76
+ if ( options . HTTP . TLSSetting != nil || options . GRPC . TLSSetting != nil ) && ! separatePorts {
77
+ return nil , errors . New ( "server with TLS enabled can not use same host ports for gRPC and HTTP. Use dedicated HTTP and gRPC host ports instead" )
76
78
}
77
79
78
80
grpcServer , err := createGRPCServer (ctx , host , querySvc , metricsQuerySvc , options , tm , telset )
@@ -86,11 +88,12 @@ func NewServer(
86
88
}
87
89
88
90
return & Server {
89
- querySvc : querySvc ,
90
- queryOptions : options ,
91
- grpcServer : grpcServer ,
92
- httpServer : httpServer ,
93
- Setting : telset ,
91
+ querySvc : querySvc ,
92
+ queryOptions : options ,
93
+ grpcServer : grpcServer ,
94
+ httpServer : httpServer ,
95
+ separatePorts : separatePorts ,
96
+ Setting : telset ,
94
97
}, nil
95
98
}
96
99
@@ -233,32 +236,70 @@ func (hS httpServer) Close() error {
233
236
}
234
237
235
238
// initListener initialises listeners of the server
236
- func (s * Server ) initListener (ctx context.Context ) error {
237
- var err error
238
- s .grpcConn , err = s .queryOptions .GRPC .NetAddr .Listen (ctx )
239
- if err != nil {
240
- return err
239
+ func (s * Server ) initListener (ctx context.Context ) (cmux.CMux , error ) {
240
+ if s .separatePorts { // use separate ports and listeners each for gRPC and HTTP requests
241
+ var err error
242
+ s .grpcConn , err = s .queryOptions .GRPC .NetAddr .Listen (ctx )
243
+ if err != nil {
244
+ return nil , err
245
+ }
246
+
247
+ s .httpConn , err = s .queryOptions .HTTP .ToListener (ctx )
248
+ if err != nil {
249
+ return nil , err
250
+ }
251
+ s .Logger .Info (
252
+ "Query server started" ,
253
+ zap .String ("http_addr" , s .HTTPAddr ()),
254
+ zap .String ("grpc_addr" , s .GRPCAddr ()),
255
+ )
256
+ return nil , nil
241
257
}
242
258
243
- s .httpConn , err = s .queryOptions .HTTP .ToListener (ctx )
259
+ // old behavior using cmux
260
+ conn , err := net .Listen ("tcp" , s .queryOptions .HTTP .Endpoint )
244
261
if err != nil {
245
- return err
262
+ return nil , err
246
263
}
264
+
265
+ s .conn = conn
266
+
267
+ var tcpPort int
268
+ if port , err := netutils .GetPort (s .conn .Addr ()); err == nil {
269
+ tcpPort = port
270
+ }
271
+
247
272
s .Logger .Info (
248
273
"Query server started" ,
249
- zap .String ("http_addr" , s .HTTPAddr ()),
250
- zap .String ("grpc_addr" , s .GRPCAddr ()),
274
+ zap .Int ("port" , tcpPort ),
275
+ zap .String ("addr" , s .queryOptions .HTTP .Endpoint ))
276
+
277
+ // cmux server acts as a reverse-proxy between HTTP and GRPC backends.
278
+ cmuxServer := cmux .New (s .conn )
279
+
280
+ s .grpcConn = cmuxServer .MatchWithWriters (
281
+ cmux .HTTP2MatchHeaderFieldSendSettings ("content-type" , "application/grpc" ),
282
+ cmux .HTTP2MatchHeaderFieldSendSettings ("content-type" , "application/grpc+proto" ),
251
283
)
252
- return nil
284
+ s . httpConn = cmuxServer . Match ( cmux . Any ())
253
285
286
+ return cmuxServer , nil
254
287
}
255
288
256
289
// Start http, GRPC and cmux servers concurrently
257
290
func (s * Server ) Start (ctx context.Context ) error {
258
- err := s .initListener (ctx )
291
+ cmuxServer , err := s .initListener (ctx )
259
292
if err != nil {
260
293
return fmt .Errorf ("query server failed to initialize listener: %w" , err )
261
294
}
295
+ s .cmuxServer = cmuxServer
296
+
297
+ var tcpPort int
298
+ if ! s .separatePorts {
299
+ if port , err := netutils .GetPort (s .conn .Addr ()); err == nil {
300
+ tcpPort = port
301
+ }
302
+ }
262
303
263
304
var httpPort int
264
305
if port , err := netutils .GetPort (s .httpConn .Addr ()); err == nil {
@@ -303,6 +344,23 @@ func (s *Server) Start(ctx context.Context) error {
303
344
s .Logger .Info ("GRPC server stopped" , zap .Int ("port" , grpcPort ), zap .String ("addr" , s .queryOptions .GRPC .NetAddr .Endpoint ))
304
345
}()
305
346
347
+ // Start cmux server concurrently.
348
+ if ! s .separatePorts {
349
+ s .bgFinished .Add (1 )
350
+ go func () {
351
+ defer s .bgFinished .Done ()
352
+ s .Logger .Info ("Starting CMUX server" , zap .Int ("port" , tcpPort ), zap .String ("addr" , s .queryOptions .HTTP .Endpoint ))
353
+
354
+ err := cmuxServer .Serve ()
355
+ // TODO: find a way to avoid string comparison. Even though cmux has ErrServerClosed, it's not returned here.
356
+ if err != nil && ! strings .Contains (err .Error (), "use of closed network connection" ) {
357
+ s .Logger .Error ("Could not start multiplexed server" , zap .Error (err ))
358
+ s .ReportStatus (componentstatus .NewFatalErrorEvent (err ))
359
+ return
360
+ }
361
+ s .Logger .Info ("CMUX server stopped" , zap .Int ("port" , tcpPort ), zap .String ("addr" , s .queryOptions .HTTP .Endpoint ))
362
+ }()
363
+ }
306
364
return nil
307
365
}
308
366
@@ -326,6 +384,10 @@ func (s *Server) Close() error {
326
384
s .Logger .Info ("Stopping gRPC server" )
327
385
s .grpcServer .Stop ()
328
386
387
+ if ! s .separatePorts {
388
+ s .Logger .Info ("Closing CMux server" )
389
+ s .cmuxServer .Close ()
390
+ }
329
391
s .bgFinished .Wait ()
330
392
331
393
s .Logger .Info ("Server stopped" )
0 commit comments