Skip to content

Commit 4f091a5

Browse files
committed
Revert "Remove CMux Server And Handling of Requests On Same Port"
This reverts commit 61cbdd7.
1 parent 61cbdd7 commit 4f091a5

File tree

2 files changed

+111
-33
lines changed

2 files changed

+111
-33
lines changed

cmd/query/app/server.go

+87-25
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"io"
1111
"net"
1212
"net/http"
13+
"strings"
1314
"sync"
1415

1516
"github.com/soheilhy/cmux"
@@ -36,18 +37,19 @@ import (
3637
"github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics"
3738
)
3839

39-
var ErrSameGRPCandHTTPPort = errors.New("cannot use same port for grpc and http server")
40-
4140
// Server runs HTTP, Mux and a grpc server
4241
type Server struct {
4342
querySvc *querysvc.QueryService
4443
queryOptions *QueryOptions
4544

46-
grpcConn net.Listener
47-
httpConn net.Listener
48-
grpcServer *grpc.Server
49-
httpServer *httpServer
50-
bgFinished sync.WaitGroup
45+
conn net.Listener
46+
grpcConn net.Listener
47+
httpConn net.Listener
48+
cmuxServer cmux.CMux
49+
grpcServer *grpc.Server
50+
httpServer *httpServer
51+
separatePorts bool
52+
bgFinished sync.WaitGroup
5153
telemetery.Setting
5254
}
5355

@@ -71,8 +73,8 @@ func NewServer(
7173
}
7274
separatePorts := grpcPort != httpPort || grpcPort == "0" || httpPort == "0"
7375

74-
if !separatePorts {
75-
return nil, ErrSameGRPCandHTTPPort
76+
if (options.HTTP.TLSSetting != nil || options.GRPC.TLSSetting != nil) && !separatePorts {
77+
return nil, errors.New("server with TLS enabled can not use same host ports for gRPC and HTTP. Use dedicated HTTP and gRPC host ports instead")
7678
}
7779

7880
grpcServer, err := createGRPCServer(ctx, host, querySvc, metricsQuerySvc, options, tm, telset)
@@ -86,11 +88,12 @@ func NewServer(
8688
}
8789

8890
return &Server{
89-
querySvc: querySvc,
90-
queryOptions: options,
91-
grpcServer: grpcServer,
92-
httpServer: httpServer,
93-
Setting: telset,
91+
querySvc: querySvc,
92+
queryOptions: options,
93+
grpcServer: grpcServer,
94+
httpServer: httpServer,
95+
separatePorts: separatePorts,
96+
Setting: telset,
9497
}, nil
9598
}
9699

@@ -233,32 +236,70 @@ func (hS httpServer) Close() error {
233236
}
234237

235238
// initListener initialises listeners of the server
236-
func (s *Server) initListener(ctx context.Context) error {
237-
var err error
238-
s.grpcConn, err = s.queryOptions.GRPC.NetAddr.Listen(ctx)
239-
if err != nil {
240-
return err
239+
func (s *Server) initListener(ctx context.Context) (cmux.CMux, error) {
240+
if s.separatePorts { // use separate ports and listeners each for gRPC and HTTP requests
241+
var err error
242+
s.grpcConn, err = s.queryOptions.GRPC.NetAddr.Listen(ctx)
243+
if err != nil {
244+
return nil, err
245+
}
246+
247+
s.httpConn, err = s.queryOptions.HTTP.ToListener(ctx)
248+
if err != nil {
249+
return nil, err
250+
}
251+
s.Logger.Info(
252+
"Query server started",
253+
zap.String("http_addr", s.HTTPAddr()),
254+
zap.String("grpc_addr", s.GRPCAddr()),
255+
)
256+
return nil, nil
241257
}
242258

243-
s.httpConn, err = s.queryOptions.HTTP.ToListener(ctx)
259+
// old behavior using cmux
260+
conn, err := net.Listen("tcp", s.queryOptions.HTTP.Endpoint)
244261
if err != nil {
245-
return err
262+
return nil, err
246263
}
264+
265+
s.conn = conn
266+
267+
var tcpPort int
268+
if port, err := netutils.GetPort(s.conn.Addr()); err == nil {
269+
tcpPort = port
270+
}
271+
247272
s.Logger.Info(
248273
"Query server started",
249-
zap.String("http_addr", s.HTTPAddr()),
250-
zap.String("grpc_addr", s.GRPCAddr()),
274+
zap.Int("port", tcpPort),
275+
zap.String("addr", s.queryOptions.HTTP.Endpoint))
276+
277+
// cmux server acts as a reverse-proxy between HTTP and GRPC backends.
278+
cmuxServer := cmux.New(s.conn)
279+
280+
s.grpcConn = cmuxServer.MatchWithWriters(
281+
cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"),
282+
cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc+proto"),
251283
)
252-
return nil
284+
s.httpConn = cmuxServer.Match(cmux.Any())
253285

286+
return cmuxServer, nil
254287
}
255288

256289
// Start http, GRPC and cmux servers concurrently
257290
func (s *Server) Start(ctx context.Context) error {
258-
err := s.initListener(ctx)
291+
cmuxServer, err := s.initListener(ctx)
259292
if err != nil {
260293
return fmt.Errorf("query server failed to initialize listener: %w", err)
261294
}
295+
s.cmuxServer = cmuxServer
296+
297+
var tcpPort int
298+
if !s.separatePorts {
299+
if port, err := netutils.GetPort(s.conn.Addr()); err == nil {
300+
tcpPort = port
301+
}
302+
}
262303

263304
var httpPort int
264305
if port, err := netutils.GetPort(s.httpConn.Addr()); err == nil {
@@ -303,6 +344,23 @@ func (s *Server) Start(ctx context.Context) error {
303344
s.Logger.Info("GRPC server stopped", zap.Int("port", grpcPort), zap.String("addr", s.queryOptions.GRPC.NetAddr.Endpoint))
304345
}()
305346

347+
// Start cmux server concurrently.
348+
if !s.separatePorts {
349+
s.bgFinished.Add(1)
350+
go func() {
351+
defer s.bgFinished.Done()
352+
s.Logger.Info("Starting CMUX server", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HTTP.Endpoint))
353+
354+
err := cmuxServer.Serve()
355+
// TODO: find a way to avoid string comparison. Even though cmux has ErrServerClosed, it's not returned here.
356+
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
357+
s.Logger.Error("Could not start multiplexed server", zap.Error(err))
358+
s.ReportStatus(componentstatus.NewFatalErrorEvent(err))
359+
return
360+
}
361+
s.Logger.Info("CMUX server stopped", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HTTP.Endpoint))
362+
}()
363+
}
306364
return nil
307365
}
308366

@@ -326,6 +384,10 @@ func (s *Server) Close() error {
326384
s.Logger.Info("Stopping gRPC server")
327385
s.grpcServer.Stop()
328386

387+
if !s.separatePorts {
388+
s.Logger.Info("Closing CMux server")
389+
s.cmuxServer.Close()
390+
}
329391
s.bgFinished.Wait()
330392

331393
s.Logger.Info("Server stopped")

cmd/query/app/server_test.go

+24-8
Original file line numberDiff line numberDiff line change
@@ -657,7 +657,7 @@ func TestServerSinglePort(t *testing.T) {
657657
hostPort := ports.PortToHostPort(ports.QueryHTTP)
658658
querySvc := makeQuerySvc()
659659
telset := initTelSet(flagsSvc.Logger, jtracer.NoOp(), flagsSvc.HC())
660-
_, err := NewServer(context.Background(), componenttest.NewNopHost(), querySvc.qs, nil,
660+
server, err := NewServer(context.Background(), componenttest.NewNopHost(), querySvc.qs, nil,
661661
&QueryOptions{
662662
BearerTokenPropagation: true,
663663
HTTP: confighttp.ServerConfig{
@@ -672,7 +672,24 @@ func TestServerSinglePort(t *testing.T) {
672672
},
673673
tenancy.NewManager(&tenancy.Options{}),
674674
telset)
675-
require.ErrorIs(t, err, ErrSameGRPCandHTTPPort)
675+
require.NoError(t, err)
676+
require.NoError(t, server.Start(context.Background()))
677+
t.Cleanup(func() {
678+
require.NoError(t, server.Close())
679+
})
680+
681+
client := newGRPCClient(t, hostPort)
682+
t.Cleanup(func() {
683+
require.NoError(t, client.conn.Close())
684+
})
685+
686+
// using generous timeout since grpc.NewClient no longer does a handshake.
687+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
688+
defer cancel()
689+
690+
res, err := client.GetServices(ctx, &api_v2.GetServicesRequest{})
691+
require.NoError(t, err)
692+
assert.Equal(t, querySvc.expectedServices, res.Services)
676693
}
677694

678695
func TestServerGracefulExit(t *testing.T) {
@@ -682,19 +699,18 @@ func TestServerGracefulExit(t *testing.T) {
682699
assert.Equal(t, 0, logs.Len(), "Expected initial ObservedLogs to have zero length.")
683700

684701
flagsSvc.Logger = zap.New(zapCore)
685-
httpHostPort := ports.PortToHostPort(ports.QueryHTTP)
686-
grpcHostPort := ports.PortToHostPort(ports.QueryGRPC)
702+
hostPort := ports.PortToHostPort(ports.QueryAdminHTTP)
687703

688704
querySvc := makeQuerySvc()
689705
telset := initTelSet(flagsSvc.Logger, jtracer.NoOp(), flagsSvc.HC())
690706
server, err := NewServer(context.Background(), componenttest.NewNopHost(), querySvc.qs, nil,
691707
&QueryOptions{
692708
HTTP: confighttp.ServerConfig{
693-
Endpoint: httpHostPort,
709+
Endpoint: hostPort,
694710
},
695711
GRPC: configgrpc.ServerConfig{
696712
NetAddr: confignet.AddrConfig{
697-
Endpoint: grpcHostPort,
713+
Endpoint: hostPort,
698714
Transport: "tcp",
699715
},
700716
},
@@ -705,7 +721,7 @@ func TestServerGracefulExit(t *testing.T) {
705721

706722
// Wait for servers to come up before we can call .Close()
707723
{
708-
client := newGRPCClient(t, grpcHostPort)
724+
client := newGRPCClient(t, hostPort)
709725
t.Cleanup(func() {
710726
require.NoError(t, client.conn.Close())
711727
})
@@ -789,7 +805,7 @@ func TestServerHTTPTenancy(t *testing.T) {
789805
},
790806
GRPC: configgrpc.ServerConfig{
791807
NetAddr: confignet.AddrConfig{
792-
Endpoint: ":8081",
808+
Endpoint: ":8080",
793809
Transport: "tcp",
794810
},
795811
},

0 commit comments

Comments
 (0)