Skip to content
This repository was archived by the owner on Jul 16, 2019. It is now read-only.

Connection refactor #11

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 10 additions & 20 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,47 +20,37 @@ var (
dbClient *bolt.Client
)

// Options holds the basic configuration of the http server
// TODO implement reading options from a config file
type Options struct {
Port string `json:"port"`
RWTimeout int64 `json:"timeout"`
Verbose bool `json:"verbose"`
Debug bool `json:"debug"`
Database *bolt.Options `json:"database"`
}

// Configure takes a http.Server and configures it with the specified Options
func Configure(server *http.Server, opts *Options) error {
// setup router
apilog.Println("starting server on :" + opts.Port)
server.Addr = ":" + opts.Port
server.ReadTimeout = time.Duration(opts.RWTimeout)
server.WriteTimeout = time.Duration(opts.RWTimeout)

// TODO can we move this to pkg/session?
dbClient = bolt.NewClient(opts.Database)
if err := dbClient.Open(); err != nil {
apilog.Println("Error opening database:", err)
return err
}
canvas.Init()

// initialize canvas service
canvas.Init(opts.Verbose)

// setup router
routes, err := setupRoutes(opts)
if err != nil {
apilog.Println("Error setting up router:", err)
return err
}

server.Addr = ":" + opts.Port
server.ReadTimeout = time.Duration(opts.RWTimeout)
server.WriteTimeout = time.Duration(opts.RWTimeout)
server.Handler = routes

apilog.Println("starting server on :" + opts.Port)
return nil
}

// Cleanup closes the database client
func Cleanup() error {
// TODO notify websocket clients about server shutdown
// close db
dbClient.Close()
dbClient.Close() // close db client

if r := recover(); r != nil {
err, ok := r.(error)
Expand Down
13 changes: 13 additions & 0 deletions pkg/api/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package api

import "github.com/drawr-team/drawrserver/pkg/bolt"

// Options holds the basic configuration of the http server
// TODO implement reading options from a config file
type Options struct {
Port string `json:"port"`
RWTimeout int64 `json:"timeout"`
Verbose bool `json:"verbose"`
Debug bool `json:"debug"`
Database *bolt.Options `json:"database"`
}
13 changes: 12 additions & 1 deletion pkg/canvas/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,30 @@ import (
var svc canvasService

type canvasService struct {
verbose bool

hubs map[string]*websock.Hub
}

// Init initializes the message service
func Init() {
func Init(verbose bool) {
svc.verbose = verbose
svc.hubs = make(map[string]*websock.Hub)
}

// Close notifies all hubs about the server going offline
func Close() {
for _, h := range svc.hubs {
h.Close()
}
}

// Connect adds a new client connection to the session hub
func Connect(w http.ResponseWriter, r *http.Request, s service.Session) error {
_, ok := svc.hubs[s.ID]
if !ok {
svc.hubs[s.ID] = websock.NewHub()
svc.hubs[s.ID].Verbose = svc.verbose
}

c, err := websock.Upgrade(w, r, w.Header())
Expand Down
8 changes: 2 additions & 6 deletions pkg/websock/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,5 @@
* Hub maps connections to their ID
* 1 hub per session
* read and write channels for every connection
* read and write channel for hub
* connection worker instead of hub worker

## Ideas:

* 1 Hub per server?
* broadcast channel for hub
* connection workers instead of hub worker
102 changes: 60 additions & 42 deletions pkg/websock/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ import (

// Connection wraps a messenger connection
type Connection struct {
messenger Messenger
wg *sync.WaitGroup
done bool
ws *websocket.Conn

wg *sync.WaitGroup
// connection message channels
send chan []byte
received chan []byte
Expand All @@ -21,40 +20,33 @@ type Connection struct {

// NewConnection constructs a new Connection from a websocket.Conn
func NewConnection(ws *websocket.Conn) *Connection {
println("NewConnection:", ws.LocalAddr())
c := &Connection{
messenger: WebsocketMessenger{ws},
wg: new(sync.WaitGroup),
done: false,
send: make(chan []byte, 256),
received: make(chan []byte, 256),
ws: ws,
wg: new(sync.WaitGroup),
send: make(chan []byte),
received: make(chan []byte),
}
if ws != nil {
c.Addr = ws.RemoteAddr().String()
} else {
c.Addr = "none"
}
return c
}

// RunWorkers starts the reader and writer in seperate goroutines
// for the connection and returns a sync.WaitGroup
func (c *Connection) RunWorkers() {
go c.Reader()
go c.Writer()
}
go c.reader()
go c.writer()

// Wait blocks until the Read and Write workers finish
func (c *Connection) Wait() {
c.wg.Wait()
}
c.ws.SetCloseHandler(func(code int, text string) error {
switch code {
case websocket.CloseGoingAway:
println("peer going away")
case websocket.CloseNormalClosure:
println("peer closing normally")
}
return nil
})

// StopWorkers sends the done signal to the workers
func (c *Connection) Close() error {
close(c.send)
c.done = true
c.wg.Wait() // wait for Reader to finish
close(c.received)
return c.messenger.Close()
return c
}

// SendChan returns the send channel
Expand All @@ -67,32 +59,58 @@ func (c *Connection) ReceiveChan() chan []byte {
return c.received
}

// Close sends the done signal to the workers
func (c *Connection) Close() error {
println("closing connection", c.Addr)

payload := websocket.FormatCloseMessage(websocket.CloseGoingAway, "server shutting down")
if err := c.ws.WriteMessage(websocket.CloseMessage, payload); err != nil {
return err
}

println("waiting for workers to quit")
close(c.send) // closing send channel causes writer to stop
c.wg.Wait()

println("closing connection was successfull")
return c.ws.Close()
}

// Reader reads a message from the websocket connection
func (c *Connection) Reader() {
func (c *Connection) reader() {
c.wg.Add(1)
defer c.wg.Done()

for !c.done {
message, err := c.messenger.ReadMessage()
for {
t, msg, err := c.ws.ReadMessage()
if err != nil {
if websocket.IsCloseError(err, websocket.CloseGoingAway) {
println("connection to peer closed as expected")
break
}
// TODO handle ReadMessage errors
panic(err)
} else {
c.received <- message
panic("Failed to read message: " + err.Error())
}

switch t {
case websocket.TextMessage:
c.received <- msg
case websocket.BinaryMessage:
panic("cannot handle binary message")
}
}
c.wg.Done()
println("reader ended")
}

// Writer writes a message to the websocket connection
func (c *Connection) Writer() {
// writer writes a message to the websocket connection
func (c *Connection) writer() {
c.wg.Add(1)
defer c.wg.Done()

for message := range c.send {
err := c.messenger.WriteMessage(message)
for msg := range c.send {
err := c.ws.WriteMessage(websocket.TextMessage, msg)
if err != nil {
// TODO handle WriteMessage errors
panic(err)
panic("Failed to write message: " + err.Error())
}
}
c.wg.Done()
println("writer ended")
}
78 changes: 62 additions & 16 deletions pkg/websock/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"log"
"os"
"sync"

"github.com/gorilla/websocket"
)

var ErrConnectionNotFound = errors.New("Connection not found")
Expand All @@ -18,9 +20,10 @@ type Hub struct {
connectionsMx sync.RWMutex

// TODO hub-wide message channel?
// broadcast chan []byte
broadcast chan []byte

log log.Logger
quit chan chan struct{}
log log.Logger
}

// NewHub creates a new hub
Expand All @@ -29,39 +32,60 @@ func NewHub() *Hub {
Verbose: false,
connections: make(map[string]Connection),
connectionsMx: sync.RWMutex{},
// broadcast: make(chan []byte, 2048),
log: *log.New(os.Stdout, "[websock]", log.LstdFlags),
broadcast: make(chan []byte),
quit: make(chan chan struct{}),
log: *log.New(os.Stdout, "[ws]\t", log.LstdFlags),
}
go h.broadcaster()
return h
}

func (h *Hub) BroadcastChan() chan []byte {
return h.broadcast
}

// Close sends the quit signal to the monitor worker
func (h *Hub) Close() {
h.log.Println("closing hub")
q := make(chan struct{})
h.quit <- q

for cID := range h.connections {
h.RemoveConnection(cID)
}
<-q
}

// AddConnection remembers a connection
func (h *Hub) AddConnection(id string, c Connection) {
h.connectionsMx.Lock()
defer h.connectionsMx.Unlock()

h.connections[id] = c

if h.Verbose {
h.log.Println("new connection:", id)
h.log.Printf("add connection: %v (%v)", id, c.Addr)
}

h.connections[id] = c
}

// RemoveConnection forgets a connection
func (h *Hub) RemoveConnection(id string) {
h.connectionsMx.Lock()
defer h.connectionsMx.Unlock()

if h.Verbose {
h.log.Println("remove connection:", id)
}

if c, ok := h.connections[id]; ok {
c, ok := h.connections[id]
if ok {
if err := c.Close(); err != nil {
panic(err)
panic("Failed to remove connection" + id + " from hub: " + err.Error())
}
delete(h.connections, id)
}

if h.Verbose {
h.log.Printf("remove connection: %v (%v)", id, c.Addr)
}

}

func (h *Hub) GetConnection(id string) (*Connection, error) {
Expand All @@ -72,9 +96,31 @@ func (h *Hub) GetConnection(id string) (*Connection, error) {
return &c, nil
}

// Broadcast sends a message to all connections
func (h *Hub) Broadcast(m []byte) {
for _, conn := range h.connections {
conn.SendChan() <- m
// broadcast sends a message to all connections
func (h *Hub) sendBroadcastMessage(m []byte) error {
for cID, conn := range h.connections {
h.log.Println("notified:", cID)
pm, err := websocket.NewPreparedMessage(websocket.TextMessage, m)
if err != nil {
return err
}
if err := conn.ws.WritePreparedMessage(pm); err != nil {
return err
}
}
return nil
}

func (h *Hub) broadcaster() {
h.log.Println("starting broadcaster worker...")
for {
select {
case msg := <-h.broadcast:
h.log.Println("broadcasting:", string(msg))
h.sendBroadcastMessage(msg)
case q := <-h.quit:
close(q)
return
}
}
}
Loading