Skip to content

Commit 080e510

Browse files
mihaitodorrelistan
authored andcommitted
Speed up the Envoy adapter implementation
Use state.EachService() instead of state.ByService() when populating the clusters and listeners for the Envoy gRPC API. This is much more efficient, since state.ByService() does a lot of unnecessary manipulation, including sorting. Also merge EnvoyClustersFromState and EnvoyListenersFromState into one function to avoid locking the state several times.
1 parent 9a2a755 commit 080e510

File tree

2 files changed

+93
-128
lines changed

2 files changed

+93
-128
lines changed

envoy/adapter/adapter.go

+84-113
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,12 @@ const (
2828
ServiceNameSeparator = ":"
2929
)
3030

31+
// EnvoyResources is a collection of Enovy API resource definitions
32+
type EnvoyResources struct {
33+
Clusters []cache.Resource
34+
Listeners []cache.Resource
35+
}
36+
3137
// SvcName formats an Envoy service name from our service name and port
3238
func SvcName(name string, port int64) string {
3339
return fmt.Sprintf("%s%s%d", name, ServiceNameSeparator, port)
@@ -61,57 +67,94 @@ func LookupHost(hostname string) (string, error) {
6167
return addrs[0], nil
6268
}
6369

64-
// EnvoyListenersFromState creates a set of Enovy API listener
65-
// definitions from all the ServicePorts in the Sidecar state.
66-
func EnvoyListenersFromState(state *catalog.ServicesState, bindIP string) ([]cache.Resource, error) {
67-
var listeners []cache.Resource
68-
69-
state.RLock()
70-
defer state.RUnlock()
71-
72-
svcs := state.ByService()
73-
// Loop over all the services by service name
74-
for _, endpoints := range svcs {
75-
if len(endpoints) < 1 {
76-
continue
77-
}
70+
// EnvoyResourcesFromState creates a set of Enovy API resource definitions from all
71+
// the ServicePorts in the Sidecar state. The Sidecar state needs to be locked by the
72+
// caller before calling this function.
73+
func EnvoyResourcesFromState(state *catalog.ServicesState, bindIP string,
74+
useHostnames bool) EnvoyResources {
7875

79-
var svc *service.Service
80-
// Find the first alive service and use that as the definition.
81-
// If none are alive, we won't open the port.
82-
for _, endpoint := range endpoints {
83-
if endpoint.IsAlive() {
84-
svc = endpoint
85-
break
86-
}
87-
}
76+
clusterMap := make(map[string]*api.Cluster)
77+
listenerMap := make(map[string]cache.Resource)
8878

89-
if svc == nil {
90-
continue
79+
state.EachService(func(hostname *string, id *string, svc *service.Service) {
80+
if svc == nil || !svc.IsAlive() {
81+
return
9182
}
9283

93-
// Loop over the ports and generate a named listener for
94-
// each port.
84+
// Loop over the ports and generate a named listener for each port
9585
for _, port := range svc.Ports {
9686
// Only listen on ServicePorts
9787
if port.ServicePort < 1 {
9888
continue
9989
}
10090

101-
listener, err := EnvoyListenerFromService(svc, port.ServicePort, bindIP)
102-
if err != nil {
103-
return nil, fmt.Errorf("failed to create listener from service: %s", err)
91+
envoyServiceName := SvcName(svc.Name, port.ServicePort)
92+
93+
if cluster, ok := clusterMap[envoyServiceName]; ok {
94+
cluster.LoadAssignment.Endpoints[0].LbEndpoints =
95+
append(cluster.LoadAssignment.Endpoints[0].LbEndpoints,
96+
envoyServiceFromService(svc, port.ServicePort, useHostnames)...)
97+
} else {
98+
envoyCluster := &api.Cluster{
99+
Name: envoyServiceName,
100+
ConnectTimeout: &duration.Duration{Nanos: 500000000}, // 500ms
101+
ClusterDiscoveryType: &api.Cluster_Type{Type: api.Cluster_STATIC}, // Use IPs only
102+
ProtocolSelection: api.Cluster_USE_CONFIGURED_PROTOCOL,
103+
// Setting the endpoints here directly bypasses EDS, so we can
104+
// avoid having to configure that as well
105+
// Note that in `EnvoyClustersFromState()` for the REST API we only need
106+
// the first non-nil alive endpoint instance to construct the cluster
107+
// because, in that case, SDS (now EDS) fetches the actual endpoints in a
108+
// separate call.
109+
LoadAssignment: &api.ClusterLoadAssignment{
110+
ClusterName: envoyServiceName,
111+
Endpoints: []*endpoint.LocalityLbEndpoints{{
112+
LbEndpoints: envoyServiceFromService(svc, port.ServicePort, useHostnames),
113+
}},
114+
},
115+
// Contour believes the IdleTimeout should be set to 60s. Not sure if we also need to enable these.
116+
// See here: https://github.com/projectcontour/contour/blob/2858fec20d26f56cc75a19d91b61d625a86f36de/internal/envoy/listener.go#L102-L106
117+
// CommonHttpProtocolOptions: &core.HttpProtocolOptions{
118+
// IdleTimeout: &duration.Duration{Seconds: 60},
119+
// MaxConnectionDuration: &duration.Duration{Seconds: 60},
120+
// },
121+
// If this needs to be enabled, we might also need to set `ProtocolSelection: api.USE_DOWNSTREAM_PROTOCOL`.
122+
// Http2ProtocolOptions: &core.Http2ProtocolOptions{},
123+
}
124+
125+
clusterMap[envoyServiceName] = envoyCluster
126+
}
127+
128+
if _, ok := listenerMap[envoyServiceName]; !ok {
129+
listener, err := envoyListenerFromService(svc, envoyServiceName, port.ServicePort, bindIP)
130+
if err != nil {
131+
log.Errorf("Failed to create Envoy listener for service %q and port %d: %s", svc.Name, port.ServicePort, err)
132+
continue
133+
}
134+
listenerMap[envoyServiceName] = listener
104135
}
105-
listeners = append(listeners, listener)
106136
}
137+
})
138+
139+
clusters := make([]cache.Resource, 0, len(clusterMap))
140+
for _, cluster := range clusterMap {
141+
clusters = append(clusters, cluster)
142+
}
143+
144+
listeners := make([]cache.Resource, 0, len(listenerMap))
145+
for _, listener := range listenerMap {
146+
listeners = append(listeners, listener)
107147
}
108148

109-
return listeners, nil
149+
return EnvoyResources{
150+
Clusters: clusters,
151+
Listeners: listeners,
152+
}
110153
}
111154

112-
// EnvoyListenerFromService creates an Envoy listener from a service instance
113-
func EnvoyListenerFromService(svc *service.Service, port int64, bindIP string) (cache.Resource, error) {
114-
apiName := SvcName(svc.Name, port)
155+
// envoyListenerFromService creates an Envoy listener from a service instance
156+
func envoyListenerFromService(svc *service.Service, envoyServiceName string,
157+
servicePort int64, bindIP string) (cache.Resource, error) {
115158

116159
var connectionManagerName string
117160
var connectionManager proto.Message
@@ -127,7 +170,7 @@ func EnvoyListenerFromService(svc *service.Service, port int64, bindIP string) (
127170
RouteSpecifier: &hcm.HttpConnectionManager_RouteConfig{
128171
RouteConfig: &api.RouteConfiguration{
129172
VirtualHosts: []*route.VirtualHost{{
130-
Name: apiName,
173+
Name: envoyServiceName,
131174
Domains: []string{"*"},
132175
Routes: []*route.Route{{
133176
Match: &route.RouteMatch{
@@ -138,7 +181,7 @@ func EnvoyListenerFromService(svc *service.Service, port int64, bindIP string) (
138181
Action: &route.Route_Route{
139182
Route: &route.RouteAction{
140183
ClusterSpecifier: &route.RouteAction_Cluster{
141-
Cluster: apiName,
184+
Cluster: envoyServiceName,
142185
},
143186
Timeout: &duration.Duration{},
144187
},
@@ -154,7 +197,7 @@ func EnvoyListenerFromService(svc *service.Service, port int64, bindIP string) (
154197
connectionManager = &tcpp.TcpProxy{
155198
StatPrefix: "ingress_tcp",
156199
ClusterSpecifier: &tcpp.TcpProxy_Cluster{
157-
Cluster: apiName,
200+
Cluster: envoyServiceName,
158201
},
159202
}
160203
default:
@@ -173,7 +216,7 @@ func EnvoyListenerFromService(svc *service.Service, port int64, bindIP string) (
173216
SocketAddress: &core.SocketAddress{
174217
Address: bindIP,
175218
PortSpecifier: &core.SocketAddress_PortValue{
176-
PortValue: uint32(port),
219+
PortValue: uint32(servicePort),
177220
},
178221
},
179222
},
@@ -189,80 +232,8 @@ func EnvoyListenerFromService(svc *service.Service, port int64, bindIP string) (
189232
}, nil
190233
}
191234

192-
// EnvoyClustersFromState genenerates a list of Envoy clusters from the
193-
// current Sidecar state
194-
func EnvoyClustersFromState(state *catalog.ServicesState, useHostnames bool) []cache.Resource {
195-
state.RLock()
196-
defer state.RUnlock()
197-
198-
// `s.state.ByService()` returns the list of service endpoints for each service.
199-
// Since some services can expose multiple service ports, we need to create a
200-
// separate cluster for each (service, servicePort) pair. If a service doesn't
201-
// have any endpoints that are alive, we don't want to create a cluster for it.
202-
//
203-
// Note that in `EnvoyClustersFromState()` for the REST API we only need
204-
// the first non-nil alive endpoint instance to construct the cluster
205-
// because, in that case, SDS (now EDS) fetches the actual endpoints in a
206-
// separate call.
207-
var clusters []cache.Resource
208-
clustersMap := make(map[string]*api.Cluster)
209-
for svcName, svcEndpoints := range state.ByService() {
210-
if len(svcEndpoints) < 1 {
211-
continue
212-
}
213-
214-
for _, svcEndpoint := range svcEndpoints {
215-
if svcEndpoint == nil || !svcEndpoint.IsAlive() {
216-
continue
217-
}
218-
219-
for _, port := range svcEndpoint.Ports {
220-
if port.ServicePort < 1 {
221-
continue
222-
}
223-
224-
envoyServiceName := SvcName(svcName, port.ServicePort)
225-
226-
if cluster, ok := clustersMap[envoyServiceName]; ok {
227-
cluster.LoadAssignment.Endpoints[0].LbEndpoints =
228-
append(cluster.LoadAssignment.Endpoints[0].LbEndpoints,
229-
envoyServiceFromService(svcEndpoint, port.ServicePort, useHostnames)...)
230-
} else {
231-
envoyCluster := &api.Cluster{
232-
Name: envoyServiceName,
233-
ConnectTimeout: &duration.Duration{Nanos: 500000000}, // 500ms
234-
ClusterDiscoveryType: &api.Cluster_Type{Type: api.Cluster_STATIC}, // Use IPs only
235-
ProtocolSelection: api.Cluster_USE_CONFIGURED_PROTOCOL,
236-
// Setting the endpoints here directly bypasses EDS, so we can
237-
// avoid having to configure that as well
238-
LoadAssignment: &api.ClusterLoadAssignment{
239-
ClusterName: envoyServiceName,
240-
Endpoints: []*endpoint.LocalityLbEndpoints{{
241-
LbEndpoints: envoyServiceFromService(svcEndpoint, port.ServicePort, useHostnames),
242-
}},
243-
},
244-
// Contour believes the IdleTimeout should be set to 60s. Not sure if we also need to enable these.
245-
// See here: https://github.com/projectcontour/contour/blob/2858fec20d26f56cc75a19d91b61d625a86f36de/internal/envoy/listener.go#L102-L106
246-
// CommonHttpProtocolOptions: &core.HttpProtocolOptions{
247-
// IdleTimeout: &duration.Duration{Seconds: 60},
248-
// MaxConnectionDuration: &duration.Duration{Seconds: 60},
249-
// },
250-
// If this needs to be enabled, we might also need to set `ProtocolSelection: api.USE_DOWNSTREAM_PROTOCOL`.
251-
// Http2ProtocolOptions: &core.Http2ProtocolOptions{},
252-
}
253-
254-
clustersMap[envoyServiceName] = envoyCluster
255-
clusters = append(clusters, envoyCluster)
256-
}
257-
}
258-
}
259-
}
260-
261-
return clusters
262-
}
263-
264-
// envoyServiceFromService converts a Sidecar service to an Envoy
265-
// API service for reporting to the proxy
235+
// envoyServiceFromService converts a Sidecar service to an Envoy API service for
236+
// reporting to the proxy
266237
func envoyServiceFromService(svc *service.Service, svcPort int64, useHostnames bool) []*endpoint.LbEndpoint {
267238
var endpoints []*endpoint.LbEndpoint
268239
for _, port := range svc.Ports {

envoy/server.go

+9-15
Original file line numberDiff line numberDiff line change
@@ -70,19 +70,19 @@ func (s *Server) Run(ctx context.Context, looper director.Looper, grpcListener n
7070
go looper.Loop(func() error {
7171
s.state.RLock()
7272
lastChanged := s.state.LastChanged
73-
s.state.RUnlock()
7473

7574
// Do nothing if the state hasn't changed
7675
if lastChanged == prevStateLastChanged {
76+
s.state.RUnlock()
7777
return nil
7878
}
79+
resources := adapter.EnvoyResourcesFromState(s.state, s.config.BindIP, s.config.UseHostnames)
80+
s.state.RUnlock()
7981

8082
prevStateLastChanged = lastChanged
8183

8284
snapshotVersion := newSnapshotVersion()
8385

84-
clusters := adapter.EnvoyClustersFromState(s.state, s.config.UseHostnames)
85-
8686
// Set the new clusters in the current snapshot to send them along with the
8787
// previous listeners to Envoy. If we would pass in the new listeners too, Envoy
8888
// will complain if it happens to receive the new listeners before the new clusters
@@ -93,39 +93,33 @@ func (s *Server) Run(ctx context.Context, looper director.Looper, grpcListener n
9393
snapshot, err := s.snapshotCache.GetSnapshot(hostname)
9494
if err != nil {
9595
// During the first iteration, there is no existing snapshot, so we create one
96-
snapshot = cache.NewSnapshot(snapshotVersion, nil, clusters, nil, nil, nil)
96+
snapshot = cache.NewSnapshot(snapshotVersion, nil, resources.Clusters, nil, nil, nil)
9797
} else {
98-
snapshot.Resources[cache.Cluster] = cache.NewResources(snapshotVersion, clusters)
98+
snapshot.Resources[cache.Cluster] = cache.NewResources(snapshotVersion, resources.Clusters)
9999
}
100100

101101
err = s.snapshotCache.SetSnapshot(hostname, snapshot)
102102
if err != nil {
103103
log.Errorf("Failed to set new Envoy cache snapshot: %s", err)
104104
return nil
105105
}
106-
log.Infof("Sent %d clusters to Envoy with version %s", len(clusters), snapshotVersion)
107-
108-
listeners, err := adapter.EnvoyListenersFromState(s.state, s.config.BindIP)
109-
if err != nil {
110-
log.Errorf("Failed to create Envoy listeners: %s", err)
111-
return nil
112-
}
106+
log.Infof("Sent %d clusters to Envoy with version %s", len(resources.Clusters), snapshotVersion)
113107

114108
// Create a new snapshot version and, finally, send the updated listeners to Envoy
115109
snapshotVersion = newSnapshotVersion()
116110
err = s.snapshotCache.SetSnapshot(hostname, cache.NewSnapshot(
117111
snapshotVersion,
118112
nil,
119-
clusters,
113+
resources.Clusters,
120114
nil,
121-
listeners,
115+
resources.Listeners,
122116
nil,
123117
))
124118
if err != nil {
125119
log.Errorf("Failed to set new Envoy cache snapshot: %s", err)
126120
return nil
127121
}
128-
log.Infof("Sent %d listeners to Envoy with version %s", len(listeners), snapshotVersion)
122+
log.Infof("Sent %d listeners to Envoy with version %s", len(resources.Listeners), snapshotVersion)
129123

130124
return nil
131125
})

0 commit comments

Comments
 (0)