Skip to content

Commit c96790a

Browse files
[query] Use OTEL's helpers for grpc server (#6055)
<!-- !! Please DELETE this comment before posting. We appreciate your contribution to the Jaeger project! πŸ‘‹πŸŽ‰ --> ## Which problem is this PR solving? - Towards #6026 ## Description of the changes - Migrates the GRPC query server to create the server using OTEL rather than using a custom implementation - Adds a log to warn users that having the same port for GRPC and HTTP is now deprecated. We'll be removing this functionality after Feb 2025. ## How was this change tested? - Unit tests / CI ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `yarn lint` and `yarn test` --------- Signed-off-by: Mahad Zaryab <[email protected]> Signed-off-by: Mahad Zaryab <[email protected]> Signed-off-by: Yuri Shkuro <[email protected]> Co-authored-by: Yuri Shkuro <[email protected]>
1 parent f9474f9 commit c96790a

File tree

11 files changed

+175
-71
lines changed

11 files changed

+175
-71
lines changed

β€Žcmd/all-in-one/main.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,8 @@ by default uses only in-memory database.`,
133133
if err != nil {
134134
logger.Fatal("Failed to initialize collector", zap.Error(err))
135135
}
136-
qOpts, err := new(queryApp.QueryOptions).InitFromViper(v, logger)
136+
defaultOpts := queryApp.DefaultQueryOptions()
137+
qOpts, err := defaultOpts.InitFromViper(v, logger)
137138
if err != nil {
138139
logger.Fatal("Failed to configure query service", zap.Error(err))
139140
}
@@ -220,11 +221,11 @@ func startQuery(
220221
spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, telset.Metrics)
221222
qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts)
222223

223-
server, err := queryApp.NewServer(qs, metricsQueryService, qOpts, tm, telset)
224+
server, err := queryApp.NewServer(context.Background(), qs, metricsQueryService, qOpts, tm, telset)
224225
if err != nil {
225226
svc.Logger.Fatal("Could not create jaeger-query", zap.Error(err))
226227
}
227-
if err := server.Start(); err != nil {
228+
if err := server.Start(context.Background()); err != nil {
228229
svc.Logger.Fatal("Could not start jaeger-query", zap.Error(err))
229230
}
230231

β€Žcmd/jaeger/internal/extension/jaegerquery/factory.go

+1-15
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,9 @@ import (
77
"context"
88

99
"go.opentelemetry.io/collector/component"
10-
"go.opentelemetry.io/collector/config/configgrpc"
11-
"go.opentelemetry.io/collector/config/confighttp"
12-
"go.opentelemetry.io/collector/config/confignet"
1310
"go.opentelemetry.io/collector/extension"
1411

1512
"github.com/jaegertracing/jaeger/cmd/query/app"
16-
"github.com/jaegertracing/jaeger/ports"
1713
)
1814

1915
// componentType is the name of this extension in configuration.
@@ -28,17 +24,7 @@ func NewFactory() extension.Factory {
2824

2925
func createDefaultConfig() component.Config {
3026
return &Config{
31-
QueryOptions: app.QueryOptions{
32-
HTTP: confighttp.ServerConfig{
33-
Endpoint: ports.PortToHostPort(ports.QueryHTTP),
34-
},
35-
GRPC: configgrpc.ServerConfig{
36-
NetAddr: confignet.AddrConfig{
37-
Endpoint: ports.PortToHostPort(ports.QueryGRPC),
38-
Transport: confignet.TransportTypeTCP,
39-
},
40-
},
41-
},
27+
QueryOptions: app.DefaultQueryOptions(),
4228
}
4329
}
4430

β€Žcmd/jaeger/internal/extension/jaegerquery/server.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func (*server) Dependencies() []component.ID {
5151
return []component.ID{jaegerstorage.ID}
5252
}
5353

54-
func (s *server) Start(_ context.Context, host component.Host) error {
54+
func (s *server) Start(ctx context.Context, host component.Host) error {
5555
mf := otelmetrics.NewFactory(s.telset.MeterProvider)
5656
baseFactory := mf.Namespace(metrics.NSOptions{Name: "jaeger"})
5757
queryMetricsFactory := baseFactory.Namespace(metrics.NSOptions{Name: "query"})
@@ -100,11 +100,11 @@ func (s *server) Start(_ context.Context, host component.Host) error {
100100
ReportStatus: func(event *componentstatus.Event) {
101101
componentstatus.ReportStatus(host, event)
102102
},
103+
Host: host,
103104
}
104105

105-
// TODO contextcheck linter complains about next line that context is not passed. It is not wrong.
106-
//nolint
107106
s.server, err = queryApp.NewServer(
107+
ctx,
108108
// TODO propagate healthcheck updates up to the collector's runtime
109109
qs,
110110
mqs,
@@ -116,7 +116,7 @@ func (s *server) Start(_ context.Context, host component.Host) error {
116116
return fmt.Errorf("could not create jaeger-query: %w", err)
117117
}
118118

119-
if err := s.server.Start(); err != nil {
119+
if err := s.server.Start(ctx); err != nil {
120120
return fmt.Errorf("could not start jaeger-query: %w", err)
121121
}
122122

β€Žcmd/jaeger/internal/extension/jaegerquery/server_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/stretchr/testify/require"
1616
"go.opentelemetry.io/collector/component"
1717
"go.opentelemetry.io/collector/component/componenttest"
18+
"go.opentelemetry.io/collector/config/confignet"
1819
"go.opentelemetry.io/collector/config/configtelemetry"
1920
"go.opentelemetry.io/otel/metric"
2021
noopmetric "go.opentelemetry.io/otel/metric/noop"
@@ -211,6 +212,7 @@ func TestServerStart(t *testing.T) {
211212
}
212213
tt.config.HTTP.Endpoint = ":0"
213214
tt.config.GRPC.NetAddr.Endpoint = ":0"
215+
tt.config.GRPC.NetAddr.Transport = confignet.TransportTypeTCP
214216
server := newServer(tt.config, telemetrySettings)
215217
err := server.Start(context.Background(), host)
216218
if tt.expectedErr == "" {

β€Žcmd/query/app/flags.go

+20
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/spf13/viper"
1919
"go.opentelemetry.io/collector/config/configgrpc"
2020
"go.opentelemetry.io/collector/config/confighttp"
21+
"go.opentelemetry.io/collector/config/confignet"
2122
"go.opentelemetry.io/collector/config/configopaque"
2223
"go.uber.org/zap"
2324

@@ -100,6 +101,11 @@ func AddFlags(flagSet *flag.FlagSet) {
100101
func (qOpts *QueryOptions) InitFromViper(v *viper.Viper, logger *zap.Logger) (*QueryOptions, error) {
101102
qOpts.HTTP.Endpoint = v.GetString(queryHTTPHostPort)
102103
qOpts.GRPC.NetAddr.Endpoint = v.GetString(queryGRPCHostPort)
104+
// TODO: drop support for same host ports
105+
// https://github.com/jaegertracing/jaeger/issues/6117
106+
if qOpts.HTTP.Endpoint == qOpts.GRPC.NetAddr.Endpoint {
107+
logger.Warn("using the same port for gRPC and HTTP is deprecated; please use dedicated ports instead; support for shared ports will be removed in Feb 2025")
108+
}
103109
tlsGrpc, err := tlsGRPCFlagsConfig.InitFromViper(v)
104110
if err != nil {
105111
return qOpts, fmt.Errorf("failed to process gRPC TLS options: %w", err)
@@ -169,3 +175,17 @@ func mapHTTPHeaderToOTELHeaders(h http.Header) map[string]configopaque.String {
169175

170176
return otelHeaders
171177
}
178+
179+
func DefaultQueryOptions() QueryOptions {
180+
return QueryOptions{
181+
HTTP: confighttp.ServerConfig{
182+
Endpoint: ports.PortToHostPort(ports.QueryHTTP),
183+
},
184+
GRPC: configgrpc.ServerConfig{
185+
NetAddr: confignet.AddrConfig{
186+
Endpoint: ports.PortToHostPort(ports.QueryGRPC),
187+
Transport: confignet.TransportTypeTCP,
188+
},
189+
},
190+
}
191+
}

β€Žcmd/query/app/flags_test.go

+25
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"go.uber.org/zap"
1616

1717
"github.com/jaegertracing/jaeger/pkg/config"
18+
"github.com/jaegertracing/jaeger/pkg/testutils"
1819
"github.com/jaegertracing/jaeger/ports"
1920
"github.com/jaegertracing/jaeger/storage/mocks"
2021
spanstore_mocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
@@ -183,3 +184,27 @@ func TestQueryOptions_FailedTLSFlags(t *testing.T) {
183184
})
184185
}
185186
}
187+
188+
func TestQueryOptions_SamePortsLogsWarning(t *testing.T) {
189+
logger, logBuf := testutils.NewLogger()
190+
v, command := config.Viperize(AddFlags)
191+
command.ParseFlags([]string{
192+
"--query.http-server.host-port=127.0.0.1:8081",
193+
"--query.grpc-server.host-port=127.0.0.1:8081",
194+
})
195+
_, err := new(QueryOptions).InitFromViper(v, logger)
196+
require.NoError(t, err)
197+
198+
require.Contains(
199+
t,
200+
logBuf.String(),
201+
"using the same port for gRPC and HTTP is deprecated",
202+
)
203+
}
204+
205+
func TestDefaultQueryOptions(t *testing.T) {
206+
qo := DefaultQueryOptions()
207+
require.Equal(t, ":16686", qo.HTTP.Endpoint)
208+
require.Equal(t, ":16685", qo.GRPC.NetAddr.Endpoint)
209+
require.EqualValues(t, "tcp", qo.GRPC.NetAddr.Transport)
210+
}

β€Žcmd/query/app/server.go

+65-12
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,12 @@ import (
1616

1717
"github.com/gorilla/handlers"
1818
"github.com/soheilhy/cmux"
19+
"go.opentelemetry.io/collector/component"
1920
"go.opentelemetry.io/collector/component/componentstatus"
21+
"go.opentelemetry.io/collector/config/configgrpc"
22+
"go.opentelemetry.io/collector/config/configtelemetry"
23+
"go.opentelemetry.io/otel/metric"
24+
"go.opentelemetry.io/otel/metric/noop"
2025
"go.uber.org/zap"
2126
"go.uber.org/zap/zapcore"
2227
"google.golang.org/grpc"
@@ -54,7 +59,9 @@ type Server struct {
5459
}
5560

5661
// NewServer creates and initializes Server
57-
func NewServer(querySvc *querysvc.QueryService,
62+
func NewServer(
63+
ctx context.Context,
64+
querySvc *querysvc.QueryService,
5865
metricsQuerySvc querysvc.MetricsQueryService,
5966
options *QueryOptions,
6067
tm *tenancy.Manager,
@@ -74,12 +81,18 @@ func NewServer(querySvc *querysvc.QueryService,
7481
return nil, errors.New("server with TLS enabled can not use same host ports for gRPC and HTTP. Use dedicated HTTP and gRPC host ports instead")
7582
}
7683

77-
grpcServer, err := createGRPCServer(querySvc, metricsQuerySvc, options, tm, telset)
84+
var grpcServer *grpc.Server
85+
if separatePorts {
86+
grpcServer, err = createGRPCServerLegacy(ctx, options, tm)
87+
} else {
88+
grpcServer, err = createGRPCServerOTEL(ctx, options, tm, telset)
89+
}
7890
if err != nil {
7991
return nil, err
8092
}
93+
registerGRPCHandlers(grpcServer, querySvc, metricsQuerySvc, telset)
8194

82-
httpServer, err := createHTTPServer(querySvc, metricsQuerySvc, options, tm, telset)
95+
httpServer, err := createHTTPServer(ctx, querySvc, metricsQuerySvc, options, tm, telset)
8396
if err != nil {
8497
return nil, err
8598
}
@@ -94,11 +107,15 @@ func NewServer(querySvc *querysvc.QueryService,
94107
}, nil
95108
}
96109

97-
func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, telset telemetery.Setting) (*grpc.Server, error) {
110+
func createGRPCServerLegacy(
111+
ctx context.Context,
112+
options *QueryOptions,
113+
tm *tenancy.Manager,
114+
) (*grpc.Server, error) {
98115
var grpcOpts []grpc.ServerOption
99116

100117
if options.GRPC.TLSSetting != nil {
101-
tlsCfg, err := options.GRPC.TLSSetting.LoadTLSConfig(context.Background())
118+
tlsCfg, err := options.GRPC.TLSSetting.LoadTLSConfig(ctx)
102119
if err != nil {
103120
return nil, err
104121
}
@@ -108,15 +125,24 @@ func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.
108125
grpcOpts = append(grpcOpts, grpc.Creds(creds))
109126
}
110127
if tm.Enabled {
128+
//nolint:contextcheck
111129
grpcOpts = append(grpcOpts,
112130
grpc.StreamInterceptor(tenancy.NewGuardingStreamInterceptor(tm)),
113131
grpc.UnaryInterceptor(tenancy.NewGuardingUnaryInterceptor(tm)),
114132
)
115133
}
116134

117135
server := grpc.NewServer(grpcOpts...)
118-
reflection.Register(server)
136+
return server, nil
137+
}
119138

139+
func registerGRPCHandlers(
140+
server *grpc.Server,
141+
querySvc *querysvc.QueryService,
142+
metricsQuerySvc querysvc.MetricsQueryService,
143+
telset telemetery.Setting,
144+
) {
145+
reflection.Register(server)
120146
handler := NewGRPCHandler(querySvc, metricsQuerySvc, GRPCHandlerOptions{
121147
Logger: telset.Logger,
122148
})
@@ -131,7 +157,33 @@ func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.
131157
healthServer.SetServingStatus("jaeger.api_v3.QueryService", grpc_health_v1.HealthCheckResponse_SERVING)
132158

133159
grpc_health_v1.RegisterHealthServer(server, healthServer)
134-
return server, nil
160+
}
161+
162+
func createGRPCServerOTEL(
163+
ctx context.Context,
164+
options *QueryOptions,
165+
tm *tenancy.Manager,
166+
telset telemetery.Setting,
167+
) (*grpc.Server, error) {
168+
var grpcOpts []configgrpc.ToServerOption
169+
if tm.Enabled {
170+
//nolint:contextcheck
171+
grpcOpts = append(grpcOpts,
172+
configgrpc.WithGrpcServerOption(grpc.StreamInterceptor(tenancy.NewGuardingStreamInterceptor(tm))),
173+
configgrpc.WithGrpcServerOption(grpc.UnaryInterceptor(tenancy.NewGuardingUnaryInterceptor(tm))),
174+
)
175+
}
176+
return options.GRPC.ToServer(
177+
ctx,
178+
telset.Host,
179+
component.TelemetrySettings{
180+
Logger: telset.Logger,
181+
TracerProvider: telset.TracerProvider,
182+
LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider {
183+
return noop.NewMeterProvider()
184+
},
185+
},
186+
grpcOpts...)
135187
}
136188

137189
type httpServer struct {
@@ -142,6 +194,7 @@ type httpServer struct {
142194
var _ io.Closer = (*httpServer)(nil)
143195

144196
func createHTTPServer(
197+
ctx context.Context,
145198
querySvc *querysvc.QueryService,
146199
metricsQuerySvc querysvc.MetricsQueryService,
147200
queryOpts *QueryOptions,
@@ -189,7 +242,7 @@ func createHTTPServer(
189242
}
190243

191244
if queryOpts.HTTP.TLSSetting != nil {
192-
tlsCfg, err := queryOpts.HTTP.TLSSetting.LoadTLSConfig(context.Background()) // This checks if the certificates are correctly provided
245+
tlsCfg, err := queryOpts.HTTP.TLSSetting.LoadTLSConfig(ctx) // This checks if the certificates are correctly provided
193246
if err != nil {
194247
return nil, err
195248
}
@@ -209,10 +262,10 @@ func (hS httpServer) Close() error {
209262
}
210263

211264
// initListener initialises listeners of the server
212-
func (s *Server) initListener() (cmux.CMux, error) {
265+
func (s *Server) initListener(ctx context.Context) (cmux.CMux, error) {
213266
if s.separatePorts { // use separate ports and listeners each for gRPC and HTTP requests
214267
var err error
215-
s.grpcConn, err = net.Listen("tcp", s.queryOptions.GRPC.NetAddr.Endpoint)
268+
s.grpcConn, err = s.queryOptions.GRPC.NetAddr.Listen(ctx)
216269
if err != nil {
217270
return nil, err
218271
}
@@ -260,8 +313,8 @@ func (s *Server) initListener() (cmux.CMux, error) {
260313
}
261314

262315
// Start http, GRPC and cmux servers concurrently
263-
func (s *Server) Start() error {
264-
cmuxServer, err := s.initListener()
316+
func (s *Server) Start(ctx context.Context) error {
317+
cmuxServer, err := s.initListener(ctx)
265318
if err != nil {
266319
return fmt.Errorf("query server failed to initialize listener: %w", err)
267320
}

0 commit comments

Comments
Β (0)