Skip to content

Commit 1e203e5

Browse files
authored
Kubernetes: Support Discovery Only for this Node (#70)
* Support limiting K8s disco to this node * Augment README
1 parent d309fc2 commit 1e203e5

File tree

5 files changed

+139
-61
lines changed

5 files changed

+139
-61
lines changed

README.md

+3
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,9 @@ Defaults are in bold at the end of the line:
232232
* `KUBE_TIMEOUT`: How long until we time out calling the Kube API? **`3s`**
233233
* `CREDS_PATH`: Where do we find the token file containing API auth credentials?
234234
**`/var/run/secrets/kubernetes.io/serviceaccount`**
235+
* `ANNOUNCE_ALL_NODES`: Should we query the API and announce every node is running
236+
the service? This is useful to represent the K8s cluster with a single Sidecar.
237+
**`false`**
235238

236239
### Ports
237240

config/config.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,12 @@ type StaticConfig struct {
6666
}
6767

6868
type K8sAPIConfig struct {
69-
KubeAPIIP string `envconfig:"KUBE_API_IP" default:"127.0.0.1"`
70-
KubeAPIPort int `envconfig:"KUBE_API_PORT" default:"8080"`
71-
Namespace string `envconfig:"NAMESPACE" default:"default"`
72-
KubeTimeout time.Duration `envconfig:"KUBE_TIMEOUT" default:"3s"`
73-
CredsPath string `envconfig:"CREDS_PATH" default:"/var/run/secrets/kubernetes.io/serviceaccount"`
69+
KubeAPIIP string `envconfig:"KUBE_API_IP" default:"127.0.0.1"`
70+
KubeAPIPort int `envconfig:"KUBE_API_PORT" default:"8080"`
71+
Namespace string `envconfig:"NAMESPACE" default:"default"`
72+
KubeTimeout time.Duration `envconfig:"KUBE_TIMEOUT" default:"3s"`
73+
CredsPath string `envconfig:"CREDS_PATH" default:"/var/run/secrets/kubernetes.io/serviceaccount"`
74+
AnnounceAllNodes bool `envconfig:"ANNOUNCE_ALL_NODES" default:"false"`
7475
}
7576

7677
type Config struct {

discovery/kubernetes_api_discovery.go

+67-42
Original file line numberDiff line numberDiff line change
@@ -19,69 +19,94 @@ type K8sAPIDiscoverer struct {
1919

2020
Command K8sDiscoveryAdapter
2121

22-
discoveredSvcs *K8sServices
23-
discoveredNodes *K8sNodes
24-
lock sync.RWMutex
22+
discoveredSvcs *K8sServices
23+
discoveredNodes *K8sNodes
24+
lock sync.RWMutex
25+
announceAllNodes bool
26+
hostname string
2527
}
2628

2729
// NewK8sAPIDiscoverer returns a properly configured K8sAPIDiscoverer
28-
func NewK8sAPIDiscoverer(kubeHost string, kubePort int, namespace string, timeout time.Duration, credsPath string) *K8sAPIDiscoverer {
30+
func NewK8sAPIDiscoverer(kubeHost string, kubePort int, namespace string, timeout time.Duration,
31+
credsPath string, announceAllNodes bool, hostname string) *K8sAPIDiscoverer {
2932

3033
cmd := NewKubeAPIDiscoveryCommand(kubeHost, kubePort, namespace, timeout, credsPath)
3134

3235
return &K8sAPIDiscoverer{
33-
discoveredSvcs: &K8sServices{},
34-
discoveredNodes: &K8sNodes{},
35-
Namespace: namespace,
36-
Command: cmd,
36+
discoveredSvcs: &K8sServices{},
37+
discoveredNodes: &K8sNodes{},
38+
Namespace: namespace,
39+
Command: cmd,
40+
announceAllNodes: announceAllNodes,
41+
hostname: hostname,
3742
}
3843
}
3944

45+
// servicesForNode will emit all the services that we previously discovered.
46+
// This means we will attempt to hit the NodePort for each of the nodes when
47+
// looking for this service over HTTP/TCP.
48+
func (k *K8sAPIDiscoverer) servicesForNode(hostname, ip string) []service.Service {
49+
var services []service.Service
50+
51+
for _, item := range k.discoveredSvcs.Items {
52+
// We require an annotation called 'ServiceName' to make sure this is
53+
// a service we want to announce.
54+
if item.Metadata.Labels.ServiceName == "" {
55+
continue
56+
}
57+
58+
svc := service.Service{
59+
ID: item.Metadata.UID,
60+
Name: item.Metadata.Labels.ServiceName,
61+
Image: item.Metadata.Labels.ServiceName + ":kubernetes-hosted",
62+
Created: item.Metadata.CreationTimestamp,
63+
Hostname: hostname,
64+
ProxyMode: "http",
65+
Status: service.ALIVE,
66+
Updated: time.Now().UTC(),
67+
}
68+
69+
for _, port := range item.Spec.Ports {
70+
// We only support entries with NodePort defined
71+
if port.NodePort < 1 {
72+
continue
73+
}
74+
svc.Ports = append(svc.Ports, service.Port{
75+
Type: "tcp",
76+
Port: int64(port.NodePort),
77+
ServicePort: int64(port.Port),
78+
IP: ip,
79+
})
80+
}
81+
services = append(services, svc)
82+
}
83+
84+
return services
85+
}
86+
4087
// Services implements part of the Discoverer interface and looks at the last
4188
// cached data from the Command (`kubectl`) and returns services in a format
4289
// that Sidecar can manage.
4390
func (k *K8sAPIDiscoverer) Services() []service.Service {
4491
k.lock.RLock()
4592
defer k.lock.RUnlock()
4693

47-
// Enumerate all the K8s nodes we discovered, and for each one, emit all the
48-
// services that we separately discovered. This means we will attempt to hit
49-
// the NodePort for each of the nodes when looking for this service.
94+
// Enumerate all the K8s nodes we discovered, and for each one, enumerate
95+
// all the services.
5096
var services []service.Service
5197
for _, node := range k.discoveredNodes.Items {
5298
hostname, ip := getIPHostForNode(&node)
99+
if k.announceAllNodes {
100+
nodeServices := k.servicesForNode(hostname, ip)
101+
services = append(services, nodeServices...)
102+
continue
103+
}
53104

54-
for _, item := range k.discoveredSvcs.Items {
55-
// We require an annotation called 'ServiceName' to make sure this is
56-
// a service we want to announce.
57-
if item.Metadata.Labels.ServiceName == "" {
58-
continue
59-
}
60-
61-
svc := service.Service{
62-
ID: item.Metadata.UID,
63-
Name: item.Metadata.Labels.ServiceName,
64-
Image: item.Metadata.Labels.ServiceName + ":kubernetes-hosted",
65-
Created: item.Metadata.CreationTimestamp,
66-
Hostname: hostname,
67-
ProxyMode: "http",
68-
Status: service.ALIVE,
69-
Updated: time.Now().UTC(),
70-
}
71-
72-
for _, port := range item.Spec.Ports {
73-
// We only support entries with NodePort defined
74-
if port.NodePort < 1 {
75-
continue
76-
}
77-
svc.Ports = append(svc.Ports, service.Port{
78-
Type: "tcp",
79-
Port: int64(port.NodePort),
80-
ServicePort: int64(port.Port),
81-
IP: ip,
82-
})
83-
}
84-
services = append(services, svc)
105+
// Don't discover all nodes, only this one. Short circuit if we found it
106+
// since we'll only be in the list once.
107+
if hostname == k.hostname {
108+
services = k.servicesForNode(hostname, ip)
109+
break
85110
}
86111
}
87112

discovery/kubernetes_api_discovery_test.go

+59-11
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,24 @@ func (m *mockK8sDiscoveryCommand) GetNodes() ([]byte, error) {
103103
}
104104
]
105105
}
106+
},
107+
{
108+
"status" : {
109+
"addresses" : [
110+
{
111+
"address" : "10.100.69.147",
112+
"type" : "InternalIP"
113+
},
114+
{
115+
"address" : "heorot.example.com",
116+
"type" : "Hostname"
117+
},
118+
{
119+
"address" : "heorot.example.com",
120+
"type" : "InternalDNS"
121+
}
122+
]
123+
}
106124
}
107125
]
108126
}
@@ -113,22 +131,25 @@ func (m *mockK8sDiscoveryCommand) GetNodes() ([]byte, error) {
113131
func Test_NewK8sAPIDiscoverer(t *testing.T) {
114132
Convey("NewK8sAPIDiscoverer()", t, func() {
115133
Convey("returns a properly configured K8sAPIDiscoverer", func() {
116-
disco := NewK8sAPIDiscoverer("127.0.0.1", 443, "heorot", 3*time.Second, credsPath)
134+
disco := NewK8sAPIDiscoverer("127.0.0.1", 443, "heorot", 3*time.Second, credsPath, true, "hrothgar")
117135

118136
So(disco.discoveredSvcs, ShouldNotBeNil)
119137
So(disco.Namespace, ShouldEqual, "heorot")
138+
So(disco.announceAllNodes, ShouldBeTrue)
139+
So(disco.hostname, ShouldEqual, "hrothgar")
120140
So(disco.Command, ShouldNotBeNil)
121141

122142
command := disco.Command.(*KubeAPIDiscoveryCommand)
123143
So(command.KubeHost, ShouldEqual, "127.0.0.1")
124144
So(command.KubePort, ShouldEqual, 443)
145+
125146
})
126147
})
127148
}
128149

129150
func Test_K8sHealthCheck(t *testing.T) {
130151
Convey("HealthCheck() always returns 'AlwaysSuccessful'", t, func() {
131-
disco := NewK8sAPIDiscoverer("127.0.0.1", 443, "heorot", 3*time.Second, credsPath)
152+
disco := NewK8sAPIDiscoverer("127.0.0.1", 443, "heorot", 3*time.Second, credsPath, true, "hrothgar")
132153
check, args := disco.HealthCheck(nil)
133154
So(check, ShouldEqual, "AlwaysSuccessful")
134155
So(args, ShouldBeEmpty)
@@ -137,15 +158,15 @@ func Test_K8sHealthCheck(t *testing.T) {
137158

138159
func Test_K8sListeners(t *testing.T) {
139160
Convey("Listeners() always returns and empty slice", t, func() {
140-
disco := NewK8sAPIDiscoverer("127.0.0.1", 443, "heorot", 3*time.Second, credsPath)
161+
disco := NewK8sAPIDiscoverer("127.0.0.1", 443, "heorot", 3*time.Second, credsPath, true, "hrothgar")
141162
listeners := disco.Listeners()
142163
So(listeners, ShouldBeEmpty)
143164
})
144165
}
145166

146167
func Test_K8sGetServices(t *testing.T) {
147168
Convey("GetServices()", t, func() {
148-
disco := NewK8sAPIDiscoverer("127.0.0.1", 443, "heorot", 3*time.Second, credsPath)
169+
disco := NewK8sAPIDiscoverer("127.0.0.1", 443, "heorot", 3*time.Second, credsPath, true, "hrothgar")
149170
mock := &mockK8sDiscoveryCommand{}
150171
disco.Command = mock
151172

@@ -188,7 +209,7 @@ func Test_K8sGetServices(t *testing.T) {
188209

189210
func Test_K8sGetNodes(t *testing.T) {
190211
Convey("GetNodes()", t, func() {
191-
disco := NewK8sAPIDiscoverer("127.0.0.1", 443, "heorot", 3*time.Second, credsPath)
212+
disco := NewK8sAPIDiscoverer("127.0.0.1", 443, "heorot", 3*time.Second, credsPath, true, "hrothgar")
192213
mock := &mockK8sDiscoveryCommand{}
193214
disco.Command = mock
194215

@@ -203,7 +224,7 @@ func Test_K8sGetNodes(t *testing.T) {
203224
So(capture.String(), ShouldNotContainSubstring, "error")
204225
So(disco.discoveredNodes, ShouldNotBeNil)
205226
So(disco.discoveredNodes, ShouldNotEqual, &K8sNodes{})
206-
So(len(disco.discoveredNodes.Items), ShouldEqual, 1)
227+
So(len(disco.discoveredNodes.Items), ShouldEqual, 2)
207228
So(len(disco.discoveredNodes.Items[0].Status.Addresses), ShouldEqual, 3)
208229
})
209230

@@ -231,16 +252,43 @@ func Test_K8sGetNodes(t *testing.T) {
231252

232253
func Test_K8sServices(t *testing.T) {
233254
Convey("Services()", t, func() {
234-
disco := NewK8sAPIDiscoverer("127.0.0.1", 443, "heorot", 3*time.Second, credsPath)
235255
mock := &mockK8sDiscoveryCommand{}
236-
disco.Command = mock
237256

238257
Convey("works on a newly-created Discoverer", func() {
258+
disco := NewK8sAPIDiscoverer("127.0.0.1", 443, "heorot", 3*time.Second, credsPath, true, "hrothgar")
259+
disco.Command = mock
260+
239261
services := disco.Services()
240262
So(len(services), ShouldEqual, 0)
241263
})
242264

243-
Convey("returns the list of cached services", func() {
265+
Convey("when discovering for all nodes", func() {
266+
disco := NewK8sAPIDiscoverer("127.0.0.1", 443, "heorot", 3*time.Second, credsPath, true, "hrothgar")
267+
disco.Command = mock
268+
269+
Convey("returns the list of cached services", func() {
270+
disco.Run(director.NewFreeLooper(director.ONCE, nil))
271+
services := disco.Services()
272+
273+
So(len(services), ShouldEqual, 2)
274+
svc := services[0]
275+
So(svc.ID, ShouldEqual, "107b5bbf-9640-4fd0-b5de-1e898e8ae9f7")
276+
So(svc.Name, ShouldEqual, "chopper")
277+
So(svc.Image, ShouldEqual, "chopper:kubernetes-hosted")
278+
So(svc.Created.String(), ShouldEqual, "2022-11-07 13:18:03 +0000 UTC")
279+
So(svc.Hostname, ShouldEqual, "beowulf.example.com")
280+
So(svc.ProxyMode, ShouldEqual, "http")
281+
So(svc.Status, ShouldEqual, service.ALIVE)
282+
So(svc.Updated.Unix(), ShouldBeGreaterThan, time.Now().UTC().Add(-2*time.Second).Unix())
283+
So(len(svc.Ports), ShouldEqual, 1)
284+
So(svc.Ports[0].IP, ShouldEqual, "10.100.69.136")
285+
})
286+
})
287+
288+
Convey("when discovering for only this node", func() {
289+
disco := NewK8sAPIDiscoverer("127.0.0.1", 443, "heorot", 3*time.Second, credsPath, false, "heorot.example.com")
290+
disco.Command = mock
291+
244292
disco.Run(director.NewFreeLooper(director.ONCE, nil))
245293
services := disco.Services()
246294

@@ -250,12 +298,12 @@ func Test_K8sServices(t *testing.T) {
250298
So(svc.Name, ShouldEqual, "chopper")
251299
So(svc.Image, ShouldEqual, "chopper:kubernetes-hosted")
252300
So(svc.Created.String(), ShouldEqual, "2022-11-07 13:18:03 +0000 UTC")
253-
So(svc.Hostname, ShouldEqual, "beowulf.example.com")
301+
So(svc.Hostname, ShouldEqual, "heorot.example.com")
254302
So(svc.ProxyMode, ShouldEqual, "http")
255303
So(svc.Status, ShouldEqual, service.ALIVE)
256304
So(svc.Updated.Unix(), ShouldBeGreaterThan, time.Now().UTC().Add(-2*time.Second).Unix())
257305
So(len(svc.Ports), ShouldEqual, 1)
258-
So(svc.Ports[0].IP, ShouldEqual, "10.100.69.136")
306+
So(svc.Ports[0].IP, ShouldEqual, "10.100.69.147")
259307
})
260308
})
261309
}

main.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func configureHAproxy(config *config.Config) *haproxy.HAproxy {
9191
return proxy
9292
}
9393

94-
func configureDiscovery(config *config.Config, publishedIP string) discovery.Discoverer {
94+
func configureDiscovery(config *config.Config, publishedIP string, localNode *memberlist.Node) discovery.Discoverer {
9595
disco := new(discovery.MultiDiscovery)
9696

9797
var svcNamer discovery.ServiceNamer
@@ -142,7 +142,8 @@ func configureDiscovery(config *config.Config, publishedIP string) discovery.Dis
142142
discovery.NewK8sAPIDiscoverer(
143143
config.K8sAPIDiscovery.KubeAPIIP, config.K8sAPIDiscovery.KubeAPIPort,
144144
config.K8sAPIDiscovery.Namespace, config.K8sAPIDiscovery.KubeTimeout,
145-
config.K8sAPIDiscovery.CredsPath,
145+
config.K8sAPIDiscovery.CredsPath, config.K8sAPIDiscovery.AnnounceAllNodes,
146+
localNode.Name,
146147
),
147148
)
148149
default:
@@ -339,7 +340,7 @@ func main() {
339340
// Register the cluster name with the state object
340341
state.ClusterName = config.Sidecar.ClusterName
341342

342-
disco := configureDiscovery(config, mlConfig.AdvertiseAddr)
343+
disco := configureDiscovery(config, mlConfig.AdvertiseAddr, list.LocalNode())
343344
go disco.Run(discoLooper)
344345

345346
// Configure the monitor and use the public address as the default

0 commit comments

Comments
 (0)