Skip to content

Fix retval semantics for Gossiper #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 18, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions examples/increment-only-counter/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (p *peer) set(key string, value int) (result int) {
c := make(chan struct{})
p.actions <- func() {
defer close(c)
st := newState().completeMerge(map[string]int{key: value})
st := newState().mergeComplete(map[string]int{key: value})
data := p.st.Merge(st)
if p.send != nil {
p.send.GossipBroadcast(st)
Expand Down Expand Up @@ -104,7 +104,7 @@ func (p *peer) OnGossip(buf []byte) (delta mesh.GossipData, err error) {
return nil, err
}

delta = p.st.deltaMerge(set)
delta = p.st.mergeDelta(set)
if delta == nil {
p.logger.Printf("OnGossip %v => delta %v", set, delta)
} else {
Expand All @@ -114,16 +114,20 @@ func (p *peer) OnGossip(buf []byte) (delta mesh.GossipData, err error) {
}

// Merge the gossiped data represented by buf into our state.
// Return our complete resulting state.
func (p *peer) OnGossipBroadcast(src mesh.PeerName, buf []byte) (complete mesh.GossipData, err error) {
// Return the state information that was modified.
func (p *peer) OnGossipBroadcast(src mesh.PeerName, buf []byte) (received mesh.GossipData, err error) {
var set map[string]int
if err := json.Unmarshal(buf, &set); err != nil {
return nil, err
}

complete = p.st.completeMerge(set)
p.logger.Printf("OnGossipBroadcast %s %v => complete %v", src, set, complete.(*state).set)
return complete, nil
received = p.st.mergeReceived(set)
if received == nil {
p.logger.Printf("OnGossipBroadcast %s %v => delta %v", src, set, received)
} else {
p.logger.Printf("OnGossipBroadcast %s %v => delta %v", src, set, received.(*state).set)
}
return received, nil
}

// Merge the gossiped data represented by buf into our state.
Expand All @@ -133,7 +137,7 @@ func (p *peer) OnGossipUnicast(src mesh.PeerName, buf []byte) error {
return err
}

complete := p.st.completeMerge(set)
complete := p.st.mergeComplete(set)
p.logger.Printf("OnGossipUnicast %s %v => complete %v", src, set, complete)
return nil
}
10 changes: 5 additions & 5 deletions examples/increment-only-counter/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestPeerOnGossip(t *testing.T) {
},
} {
p := newPeer(log.New(ioutil.Discard, "", 0))
p.st.completeMerge(testcase.initial)
p.st.mergeComplete(testcase.initial)
buf, _ := json.Marshal(testcase.msg)
delta, err := p.OnGossip(buf)
if err != nil {
Expand Down Expand Up @@ -66,16 +66,16 @@ func TestPeerOnGossipBroadcast(t *testing.T) {
{
map[string]int{"a": 1},
map[string]int{"a": 0, "b": 2},
map[string]int{"a": 1, "b": 2},
map[string]int{"b": 2},
},
{
map[string]int{"a": 9},
map[string]int{"a": 8},
map[string]int{"a": 9},
map[string]int{}, // OnGossipBroadcast returns received, which should never be nil
},
} {
p := newPeer(log.New(ioutil.Discard, "", 0))
p.st.completeMerge(testcase.initial)
p.st.mergeComplete(testcase.initial)
buf, _ := json.Marshal(testcase.msg)
delta, err := p.OnGossipBroadcast(mesh.UnknownPeerName, buf)
if err != nil {
Expand Down Expand Up @@ -111,7 +111,7 @@ func TestPeerOnGossipUnicast(t *testing.T) {
},
} {
p := newPeer(log.New(ioutil.Discard, "", 0))
p.st.completeMerge(testcase.initial)
p.st.mergeComplete(testcase.initial)
buf, _ := json.Marshal(testcase.msg)
if err := p.OnGossipUnicast(mesh.UnknownPeerName, buf); err != nil {
t.Errorf("%v OnGossipBroadcast %v: %v", testcase.initial, testcase.msg, err)
Expand Down
27 changes: 23 additions & 4 deletions examples/increment-only-counter/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (st *state) Encode() [][]byte {
// Merge merges the other GossipData into this one,
// and returns our resulting, complete state.
func (st *state) Merge(other mesh.GossipData) (complete mesh.GossipData) {
return st.completeMerge(other.(*state).copy().set)
return st.mergeComplete(other.(*state).copy().set)
}

func (st *state) copy() *state {
Expand All @@ -51,15 +51,34 @@ func (st *state) copy() *state {
}
}

// Merge the set into our state, abiding increment-only semantics.
// Return a non-nil mesh.GossipData representation of the received set.
func (st *state) mergeReceived(set map[string]int) (received mesh.GossipData) {
st.mtx.Lock()
defer st.mtx.Unlock()

for k, v := range set {
if v <= st.set[k] {
delete(set, k) // optimization: make the forwarded data smaller
continue
}
st.set[k] = v
}

return &state{
set: set, // all remaining elements were novel to us
}
}

// Merge the set into our state, abiding increment-only semantics.
// Return any key/values that have been mutated, or nil if nothing changed.
func (st *state) deltaMerge(set map[string]int) (delta mesh.GossipData) {
func (st *state) mergeDelta(set map[string]int) (delta mesh.GossipData) {
st.mtx.Lock()
defer st.mtx.Unlock()

for k, v := range set {
if v <= st.set[k] {
delete(set, k)
delete(set, k) // requirement: it's not part of a delta
continue
}
st.set[k] = v
Expand All @@ -75,7 +94,7 @@ func (st *state) deltaMerge(set map[string]int) (delta mesh.GossipData) {

// Merge the set into our state, abiding increment-only semantics.
// Return our resulting, complete state.
func (st *state) completeMerge(set map[string]int) (complete mesh.GossipData) {
func (st *state) mergeComplete(set map[string]int) (complete mesh.GossipData) {
st.mtx.Lock()
defer st.mtx.Unlock()

Expand Down
62 changes: 49 additions & 13 deletions examples/increment-only-counter/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestStateConvergence(t *testing.T) {
var debug bytes.Buffer

merge := func(index int) {
st.Merge(newState().completeMerge(ops[index]))
st.Merge(newState().mergeComplete(ops[index]))
fmt.Fprintf(&debug, "Merge %v\n", ops[index])
}

Expand All @@ -67,7 +67,7 @@ func TestStateConvergence(t *testing.T) {
}
}

func TestStateCompleteMerge(t *testing.T) {
func TestStateMergeReceived(t *testing.T) {
for _, testcase := range []struct {
initial map[string]int
merge map[string]int
Expand All @@ -81,27 +81,28 @@ func TestStateCompleteMerge(t *testing.T) {
{
map[string]int{"a": 1, "b": 2},
map[string]int{"a": 1, "b": 2},
map[string]int{"a": 1, "b": 2},
map[string]int{},
},
{
map[string]int{"a": 1, "b": 2},
map[string]int{"c": 3},
map[string]int{"a": 1, "b": 2, "c": 3},
map[string]int{"c": 3},
},
{
map[string]int{"b": 3},
map[string]int{"a": 1, "b": 2},
map[string]int{"a": 0, "b": 3},
map[string]int{"a": 1, "b": 3},
map[string]int{"a": 1}, // we drop keys that don't semantically merge
},
} {
st := newState().completeMerge(testcase.initial).(*state).completeMerge(testcase.merge).(*state)
if want, have := testcase.want, st.set; !reflect.DeepEqual(want, have) {
t.Errorf("%v completeMerge %v: want %v, have %v", testcase.initial, testcase.merge, want, have)
initial, merge := testcase.initial, testcase.merge // mergeReceived modifies arguments
delta := newState().mergeComplete(initial).(*state).mergeReceived(merge)
if want, have := testcase.want, delta.(*state).set; !reflect.DeepEqual(want, have) {
t.Errorf("%v mergeReceived %v: want %v, have %v", testcase.initial, testcase.merge, want, have)
}
}
}

func TestStateDeltaMerge(t *testing.T) {
func TestStateMergeDelta(t *testing.T) {
for _, testcase := range []struct {
initial map[string]int
merge map[string]int
Expand All @@ -128,15 +129,50 @@ func TestStateDeltaMerge(t *testing.T) {
map[string]int{"b": 3},
},
} {
delta := newState().completeMerge(testcase.initial).(*state).deltaMerge(testcase.merge)
initial, merge := testcase.initial, testcase.merge // mergeDelta modifies arguments
delta := newState().mergeComplete(initial).(*state).mergeDelta(merge)
if want := testcase.want; want == nil {
if delta != nil {
t.Errorf("%v deltaMerge %v: want nil, have non-nil", testcase.initial, testcase.merge)
t.Errorf("%v mergeDelta %v: want nil, have non-nil", testcase.initial, testcase.merge)
}
} else {
if have := delta.(*state).set; !reflect.DeepEqual(want, have) {
t.Errorf("%v deltaMerge %v: want %v, have %v", testcase.initial, testcase.merge, want, have)
t.Errorf("%v mergeDelta %v: want %v, have %v", testcase.initial, testcase.merge, want, have)
}
}
}
}

func TestStateMergeComplete(t *testing.T) {
for _, testcase := range []struct {
initial map[string]int
merge map[string]int
want map[string]int
}{
{
map[string]int{},
map[string]int{"a": 1, "b": 2},
map[string]int{"a": 1, "b": 2},
},
{
map[string]int{"a": 1, "b": 2},
map[string]int{"a": 1, "b": 2},
map[string]int{"a": 1, "b": 2},
},
{
map[string]int{"a": 1, "b": 2},
map[string]int{"c": 3},
map[string]int{"a": 1, "b": 2, "c": 3},
},
{
map[string]int{"a": 1, "b": 2},
map[string]int{"a": 0, "b": 3},
map[string]int{"a": 1, "b": 3},
},
} {
st := newState().mergeComplete(testcase.initial).(*state).mergeComplete(testcase.merge).(*state)
if want, have := testcase.want, st.set; !reflect.DeepEqual(want, have) {
t.Errorf("%v mergeComplete %v: want %v, have %v", testcase.initial, testcase.merge, want, have)
}
}
}
17 changes: 13 additions & 4 deletions gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,49 @@ package mesh
import "sync"

// Gossip is the sending interface.
//
// TODO(pb): rename to e.g. Sender
type Gossip interface {
// GossipUnicast emits a single message to a peer in the mesh.
//
// TODO(pb): rename to Unicast?
//
// Unicast takes []byte instead of GossipData because "to date there has
// been no compelling reason [in practice] to do merging on unicast."
// But there may be some motivation to have unicast Mergeable; see
// https://github.com/weaveworks/weave/issues/1764
//
// TODO(pb): for uniformity of interface, rather take GossipData?
GossipUnicast(dst PeerName, msg []byte) error

// GossipBroadcast emits a message to all peers in the mesh.
//
// TODO(pb): rename to Broadcast?
GossipBroadcast(update GossipData)
}

// Gossiper is the receiving interface.
//
// TODO(pb): rename to e.g. Receiver
type Gossiper interface {
// OnGossipUnicast merges received data into state.
//
// TODO(pb): rename to e.g. OnUnicast
OnGossipUnicast(src PeerName, msg []byte) error

// OnGossipBroadcast merges received data into state and returns a
// representation of the received data, for further propagation.
// representation of the received data (typically a delta) for further
// propagation.
//
// TODO(pb): rename to e.g. OnBroadcast
OnGossipBroadcast(src PeerName, update []byte) (GossipData, error)
OnGossipBroadcast(src PeerName, update []byte) (received GossipData, err error)

// Gossip returns the state of everything we know; gets called periodically.
Gossip() GossipData
Gossip() (complete GossipData)

// OnGossip merges received data into state and returns "everything new
// I've just learnt", or nil if nothing in the received data was new.
OnGossip(msg []byte) (GossipData, error)
OnGossip(msg []byte) (delta GossipData, err error)
}

// GossipData is a merge-able dataset.
Expand All @@ -47,6 +55,7 @@ type GossipData interface {
Encode() [][]byte

// Merge combines another GossipData into this one and returns the result.
//
// TODO(pb): does it need to be leave the original unmodified?
Merge(GossipData) GossipData
}
Expand Down
8 changes: 8 additions & 0 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ type peerSummary struct {
HasShortID bool
}

// PeerDescription collects information about peers that is useful to clients.
type PeerDescription struct {
Name PeerName
NickName string
Self bool
NumConnections int
}

type connectionSet map[Connection]struct{}

func newPeerFromSummary(summary peerSummary) *Peer {
Expand Down
16 changes: 16 additions & 0 deletions peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,22 @@ func newPeers(ourself *localPeer) *Peers {
return peers
}

// Descriptions returns descriptions for all known peers.
func (peers *Peers) Descriptions() []PeerDescription {
peers.RLock()
defer peers.RUnlock()
descriptions := make([]PeerDescription, 0, len(peers.byName))
for _, peer := range peers.byName {
descriptions = append(descriptions, PeerDescription{
Name: peer.Name,
NickName: peer.peerSummary.NickName,
Self: peer.Name == peers.ourself.Name,
NumConnections: len(peer.connections),
})
}
return descriptions
}

This comment was marked as abuse.

This comment was marked as abuse.

// OnGC adds a new function to be set of functions that will be executed on
// all subsequent GC runs, receiving the GC'd peer.
func (peers *Peers) OnGC(callback func(*Peer)) {
Expand Down