Skip to content

Commit 115862e

Browse files
committed
First pass: simplest thing that could possibly work.
0 parents  commit 115862e

File tree

6 files changed

+380
-0
lines changed

6 files changed

+380
-0
lines changed

LICENSE

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
Copyright (c) 2015 New Relic, Inc.
2+
3+
Permission is hereby granted, free of charge, to any person obtaining a copy
4+
of this software and associated documentation files (the "Software"), to deal
5+
in the Software without restriction, including without limitation the rights
6+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
copies of the Software, and to permit persons to whom the Software is
8+
furnished to do so, subject to the following conditions:
9+
10+
The above copyright notice and this permission notice shall be included in
11+
all copies or substantial portions of the Software.
12+
13+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
THE SOFTWARE.

bosun.go

+199
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"log"
6+
"sync"
7+
"time"
8+
9+
"github.com/hashicorp/memberlist"
10+
)
11+
12+
var broadcasts chan [][]byte
13+
var servicesState map[string]*Server
14+
15+
type Server struct {
16+
Name string
17+
Services map[string]*ServiceContainer
18+
LastUpdated time.Time
19+
}
20+
21+
func (p *Server) Init(name string) {
22+
p.Name = ""
23+
// Pre-create for 5 services per host
24+
p.Services = make(map[string]*ServiceContainer, 5)
25+
p.LastUpdated = time.Unix(0, 0)
26+
}
27+
28+
type servicesDelegate struct {}
29+
30+
func (d *servicesDelegate) NodeMeta(limit int) []byte {
31+
fmt.Printf("NodeMeta(): %d\n", limit)
32+
return []byte(`{ "State": "Running" }`)
33+
}
34+
35+
func (d *servicesDelegate) NotifyMsg(message []byte) {
36+
if len(message) < 1 {
37+
fmt.Println("NotifyMsg(): empty")
38+
return
39+
}
40+
41+
fmt.Printf("NotifyMsg(): %s\n", string(message))
42+
43+
// TODO don't just send container structs, send message structs
44+
data := Decode(message)
45+
if data == nil {
46+
fmt.Printf("NotifyMsg(): error decoding!\n")
47+
return
48+
}
49+
50+
addServiceEntry(*data)
51+
}
52+
53+
func (d *servicesDelegate) GetBroadcasts(overhead, limit int) [][]byte {
54+
fmt.Printf("GetBroadcasts(): %d %d\n", overhead, limit)
55+
56+
select {
57+
case broadcast := <-broadcasts:
58+
println("Sending broadcast")
59+
return broadcast
60+
default:
61+
return nil
62+
}
63+
}
64+
65+
func (d *servicesDelegate) LocalState(join bool) []byte {
66+
fmt.Printf("LocalState(): %b\n", join)
67+
return []byte("some state")
68+
}
69+
70+
func (d *servicesDelegate) MergeRemoteState(buf []byte, join bool) {
71+
fmt.Printf("MergeRemoteState(): %s %b\n", string(buf), join)
72+
}
73+
74+
func updateState() {
75+
for ;; {
76+
containerList := containers()
77+
prepared := make([][]byte, len(containerList))
78+
79+
for _, container := range containerList {
80+
addServiceEntry(container)
81+
encoded, err := container.Encode()
82+
if err != nil {
83+
log.Printf("ERROR encoding container: (%s)", err.Error())
84+
continue
85+
}
86+
87+
prepared = append(prepared, encoded)
88+
}
89+
broadcasts <- prepared
90+
91+
time.Sleep(2 * time.Second)
92+
}
93+
}
94+
95+
func updateMetaData(list *memberlist.Memberlist, metaUpdates chan []byte) {
96+
for ;; {
97+
list.LocalNode().Meta = <-metaUpdates // Blocking
98+
fmt.Printf("Got update: %s\n", string(list.LocalNode().Meta))
99+
err := list.UpdateNode(10 * time.Second)
100+
if err != nil {
101+
fmt.Printf("Error pushing node update!")
102+
}
103+
}
104+
}
105+
106+
func announceMembers(list *memberlist.Memberlist) {
107+
for ;; {
108+
// Ask for members of the cluster
109+
for _, member := range list.Members() {
110+
fmt.Printf("Member: %s %s\n", member.Name, member.Addr)
111+
fmt.Printf(" Meta:\n %s\n", string(member.Meta))
112+
}
113+
114+
printServices(list);
115+
116+
time.Sleep(2 * time.Second)
117+
}
118+
}
119+
120+
func formatServices(list *memberlist.Memberlist) string {
121+
var output string
122+
123+
output += "Services ------------------------------\n"
124+
for hostname, server := range servicesState {
125+
output += fmt.Sprintf(" %s: (%s)\n", hostname, server.LastUpdated.String())
126+
for _, service := range server.Services {
127+
output += fmt.Sprintf(" %s %-20s %-30s %s\n",
128+
service.ID,
129+
service.Name,
130+
service.Image,
131+
service.Created,
132+
)
133+
}
134+
output += "\n"
135+
}
136+
137+
output += "\nCluster Hosts -------------------------\n"
138+
for _, host := range list.Members() {
139+
output += fmt.Sprintf(" %s\n", host.Name)
140+
}
141+
142+
output += "---------------------------------------"
143+
144+
return output
145+
}
146+
147+
func printServices(list *memberlist.Memberlist) {
148+
println(formatServices(list))
149+
}
150+
151+
func addServiceEntry(data ServiceContainer) {
152+
// Lazily create the maps
153+
if servicesState == nil {
154+
// Pre-create for 5 hosts
155+
servicesState = make(map[string]*Server, 5)
156+
}
157+
if servicesState[data.Hostname] == nil {
158+
var server Server
159+
server.Init(data.Hostname)
160+
servicesState[data.Hostname] = &server
161+
}
162+
163+
servicesState[data.Hostname].Services[data.ID] = &data
164+
// TODO Use the message time!
165+
servicesState[data.Hostname].LastUpdated = time.Now().UTC()
166+
}
167+
168+
func main() {
169+
opts := parseCommandLine()
170+
171+
var delegate servicesDelegate
172+
173+
broadcasts = make(chan [][]byte)
174+
175+
config := memberlist.DefaultLANConfig()
176+
config.Delegate = &delegate
177+
178+
list, err := memberlist.Create(config)
179+
exitWithError(err, "Failed to create memberlist")
180+
181+
// Join an existing cluster by specifying at least one known member.
182+
_, err = list.Join([]string{ opts.ClusterIP })
183+
exitWithError(err, "Failed to join cluster")
184+
185+
metaUpdates := make(chan []byte)
186+
var wg sync.WaitGroup
187+
wg.Add(1)
188+
189+
go announceMembers(list)
190+
go updateState()
191+
go updateMetaData(list, metaUpdates)
192+
193+
serveHttp(list)
194+
195+
time.Sleep(4 * time.Second)
196+
metaUpdates <-[]byte("A message!")
197+
198+
wg.Wait() // forever... nothing will decrement the wg
199+
}

cli.go

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package main
2+
3+
import (
4+
"errors"
5+
"flag"
6+
"log"
7+
)
8+
9+
type CliOpts struct {
10+
ClusterIP string
11+
}
12+
13+
func exitWithError(err error, message string) {
14+
if err != nil {
15+
log.Fatalf("%s (%s)", message, err.Error())
16+
}
17+
}
18+
19+
func parseCommandLine() *CliOpts {
20+
var opts CliOpts
21+
22+
flag.StringVar(&opts.ClusterIP, "cluster-ip", "", "The initial cluster bootstrap IP")
23+
24+
flag.Parse()
25+
if opts.ClusterIP == "" {
26+
exitWithError(errors.New(""), "-cluster-ip is required!")
27+
}
28+
29+
return &opts
30+
}

docker.go

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package main
2+
3+
import (
4+
"encoding/json"
5+
"os"
6+
"time"
7+
8+
"github.com/fsouza/go-dockerclient"
9+
)
10+
11+
type ServiceContainer struct {
12+
ID string
13+
Name string
14+
Image string
15+
Created time.Time
16+
Hostname string
17+
}
18+
19+
func (container ServiceContainer) Encode() ([]byte, error) {
20+
return json.Marshal(container)
21+
}
22+
23+
func Decode(data []byte) *ServiceContainer {
24+
var container ServiceContainer
25+
json.Unmarshal(data, &container)
26+
27+
return &container
28+
}
29+
30+
// Format an APIContainers struct into a more compact struct we
31+
// can ship over the wire in a broadcast.
32+
func toServiceContainer(container docker.APIContainers) ServiceContainer {
33+
var svcContainer ServiceContainer
34+
hostname, _ := os.Hostname()
35+
36+
svcContainer.ID = container.ID[0:7] // Use short IDs
37+
svcContainer.Name = container.Names[0] // Use the first name
38+
svcContainer.Image = container.Image
39+
svcContainer.Created = time.Unix(container.Created, 0)
40+
svcContainer.Hostname = hostname
41+
42+
return svcContainer
43+
}
44+
45+
func containers() []ServiceContainer {
46+
endpoint := "tcp://localhost:2375"
47+
client, _ := docker.NewClient(endpoint)
48+
containers, err := client.ListContainers(docker.ListContainersOptions{All: false})
49+
if err != nil {
50+
return nil
51+
}
52+
53+
var containerList []ServiceContainer
54+
55+
for _, container := range containers {
56+
containerList = append(containerList, toServiceContainer(container))
57+
}
58+
59+
return containerList
60+
}

http.go

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package main
2+
3+
import (
4+
"net/http"
5+
6+
"github.com/gorilla/mux"
7+
"github.com/hashicorp/memberlist"
8+
)
9+
10+
11+
func makeQueryHandler(fn func (http.ResponseWriter, *http.Request, *memberlist.Memberlist), list *memberlist.Memberlist) http.HandlerFunc {
12+
return func(response http.ResponseWriter, req *http.Request) {
13+
fn(response, req, list)
14+
}
15+
}
16+
17+
func servicesQueryHandler(response http.ResponseWriter, req *http.Request, list *memberlist.Memberlist) {
18+
//params := mux.Vars(req)
19+
20+
defer req.Body.Close()
21+
22+
response.Header().Set("Content-Type", "text/html")
23+
response.Write(
24+
[]byte(`
25+
<head>
26+
<meta http-equiv="refresh" content="4">
27+
</head>
28+
<pre>` + formatServices(list) + "</pre>"))
29+
}
30+
31+
func serveHttp(list *memberlist.Memberlist) {
32+
router := mux.NewRouter()
33+
34+
router.HandleFunc(
35+
"/services", makeQueryHandler(servicesQueryHandler, list),
36+
).Methods("GET")
37+
38+
http.Handle("/", router)
39+
40+
err := http.ListenAndServe("0.0.0.0:7777", nil)
41+
exitWithError(err, "Can't start HTTP server")
42+
}

printing_delegate.go

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
)
6+
7+
type printingDelegate struct {}
8+
9+
func (d *printingDelegate) NodeMeta(limit int) []byte {
10+
fmt.Printf("NodeMeta(): %d\n", limit)
11+
return []byte(`{ "State": "Running" }`)
12+
}
13+
14+
func (d *printingDelegate) NotifyMsg(message []byte) {
15+
fmt.Printf("NotifyMsg(): %s\n", string(message))
16+
}
17+
18+
func (d *printingDelegate) GetBroadcasts(overhead, limit int) [][]byte {
19+
fmt.Printf("GetBroadcasts(): %d %d\n", overhead, limit)
20+
return nil
21+
}
22+
23+
func (d *printingDelegate) LocalState(join bool) []byte {
24+
fmt.Printf("LocalState(): %b\n", join)
25+
return []byte("Some state")
26+
}
27+
28+
func (d *printingDelegate) MergeRemoteState(buf []byte, join bool) {
29+
fmt.Printf("MergeRemoteState(): %s %b\n", string(buf), join)
30+
}

0 commit comments

Comments
 (0)