
feat(networking): prune peers from same ip beyond colocation limit #1765

Merged: 3 commits, May 31, 2023
tests/v2/test_peer_manager.nim: 37 additions, 0 deletions
@@ -727,3 +727,40 @@ procSuite "Peer Manager":
.build(),
maxFailedAttempts = 5,
storage = nil)

  asyncTest "colocationLimit is enforced by pruneConnsByIp()":
    # Create 5 nodes
    let nodes = toSeq(0..<5).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))

    # Start them with relay mounted
    await allFutures(nodes.mapIt(it.start()))
    await allFutures(nodes.mapIt(it.mountRelay()))

    let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())

    # 2 in connections
    discard await nodes[1].peerManager.connectRelay(pInfos[0])
    discard await nodes[2].peerManager.connectRelay(pInfos[0])

    # 2 out connections
    discard await nodes[0].peerManager.connectRelay(pInfos[3])
    discard await nodes[0].peerManager.connectRelay(pInfos[4])

    # force max 1 conn per ip
    nodes[0].peerManager.colocationLimit = 1
    nodes[0].peerManager.updateIpTable()

    # table is updated and we have 4 conns (2 in, 2 out)
    check:
      nodes[0].peerManager.ipTable["127.0.0.1"].len == 4
      nodes[0].peerManager.switch.connManager.getConnections().len == 4
      nodes[0].peerManager.peerStore.peers().len == 4

    await nodes[0].peerManager.pruneConnsByIp()

    # peers are pruned, max 1 conn per ip
    nodes[0].peerManager.updateIpTable()
    check:
      nodes[0].peerManager.ipTable["127.0.0.1"].len == 1
      nodes[0].peerManager.switch.connManager.getConnections().len == 1
      nodes[0].peerManager.peerStore.peers().len == 1
waku/v2/node/peer_manager/peer_manager.nim: 78 additions, 31 deletions
@@ -10,7 +10,8 @@ import
chronicles,
metrics,
libp2p/multistream,
libp2p/muxers/muxer
libp2p/muxers/muxer,
libp2p/nameresolving/nameresolver
import
../../../common/nimchronos,
../../waku_core,
@@ -54,11 +55,14 @@
# How often the peer store is pruned
PrunePeerStoreInterval = chronos.minutes(5)

# How often the peer store is updated with metrics
UpdateMetricsInterval = chronos.seconds(15)
# How often metrics and logs are shown/updated
LogAndMetricsInterval = chronos.seconds(60)

# How often to log peer manager metrics
LogSummaryInterval = chronos.seconds(60)
# Prune by ip interval
PruneByIpInterval = chronos.seconds(30)

# Max peers that we allow from the same IP
ColocationLimit = 5

type
PeerManager* = ref object of RootObj
@@ -70,6 +74,8 @@ type
storage: PeerStorage
serviceSlots*: Table[string, RemotePeerInfo]
outPeersTarget*: int
ipTable*: Table[string, seq[PeerId]]
colocationLimit*: int
started: bool

proc protocolMatcher*(codec: string): Matcher =
@@ -278,44 +284,60 @@ proc canBeConnected*(pm: PeerManager,
# Initialisation #
##################

# currently disabled. note that peer connection state connected/disconnected
# can't be tracked using this handler when more than one conn is allowed and
# when using autonat. e.g. if a peer has 2 conns and one is disconnected we can't
# assume that the peer is disconnected, because the other one might still be active.
# note that even with maxconn = 1, autonat forces more than one connection.
# called when a connection i) is created or ii) is closed
proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =

case event.kind
of ConnEventKind.Connected:
let direction = if event.incoming: Inbound else: Outbound
discard
of ConnEventKind.Disconnected:
discard

# called when a peer i) first connects to us ii) disconnects all connections from us
proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
Review comment (PR author): Minor refactor to avoid some duplicated code; no functionality change.

var direction: PeerDirection
var connectedness: Connectedness

if event.kind == PeerEventKind.Joined:
let direction = if event.initiator: Outbound else: Inbound
pm.peerStore[ConnectionBook][peerId] = Connected
pm.peerStore[DirectionBook][peerId] = direction
direction = if event.initiator: Outbound else: Inbound
connectedness = Connected
elif event.kind == PeerEventKind.Left:
direction = UnknownDirection
connectedness = CanConnect

if not pm.storage.isNil:
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), Connected)
return
pm.peerStore[ConnectionBook][peerId] = connectedness
pm.peerStore[DirectionBook][peerId] = direction
if not pm.storage.isNil:
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), connectedness, getTime().toUnix)

elif event.kind == PeerEventKind.Left:
pm.peerStore[DirectionBook][peerId] = UnknownDirection
pm.peerStore[ConnectionBook][peerId] = CanConnect
proc updateIpTable*(pm: PeerManager) =
  # clean table
  pm.ipTable = initTable[string, seq[PeerId]]()

  # populate ip->peerIds from existing out/in connections
  for peerId, conn in pm.switch.connManager.getConnections():
    if conn.len == 0:
      continue

    # we may want to enable it only in inbound peers
    #if conn[0].connection.transportDir != In:
    #  continue

    # assumes just one physical connection per peer
    let observedAddr = conn[0].connection.observedAddr
    if observedAddr.isSome:
      # TODO: think if circuit relay ips should be handled differently
      let ip = observedAddr.get.getHostname()
      pm.ipTable.mgetOrPut(ip, newSeq[PeerId]()).add(peerId)

if not pm.storage.isNil:
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CanConnect, getTime().toUnix)
return

proc new*(T: type PeerManager,
switch: Switch,
storage: PeerStorage = nil,
initialBackoffInSec = InitialBackoffInSec,
backoffFactor = BackoffFactor,
maxFailedAttempts = MaxFailedAttempts,): PeerManager =
maxFailedAttempts = MaxFailedAttempts,
colocationLimit = ColocationLimit,): PeerManager =

let capacity = switch.peerStore.capacity
let maxConnections = switch.connManager.inSema.size
@@ -338,7 +360,8 @@ proc new*(T: type PeerManager,
initialBackoffInSec: initialBackoffInSec,
backoffFactor: backoffFactor,
outPeersTarget: max(maxConnections div 10, 10),
maxFailedAttempts: maxFailedAttempts)
maxFailedAttempts: maxFailedAttempts,
colocationLimit: colocationLimit)

proc connHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
onConnEvent(pm, peerId, event)
@@ -360,6 +383,7 @@ proc new*(T: type PeerManager,
pm.peerStore[AddressBook].addHandler(peerStoreChanged)

pm.serviceSlots = initTable[string, RemotePeerInfo]()
pm.ipTable = initTable[string, seq[PeerId]]()

if not storage.isNil():
debug "found persistent peer storage"
@@ -520,6 +544,23 @@ proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} =
for p in inRelayPeers[0..<connsToPrune]:
await pm.switch.disconnect(p)

proc pruneConnsByIp*(pm: PeerManager) {.async.} =
  ## prunes connections based on ip colocation, allowing no more
  ## than colocationLimit inbound connections from the same ip
  ##

  # update the table tracking ips and their connected peers
  pm.updateIpTable()
Review comment (reviewer): Mmm. This rebuilds the IP table every time, so I don't see any reason to cache the ipTable on the pm object. In other words, why not have a getPeersPerIp that simply creates and populates such a table every time?
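
For illustration, a minimal sketch of the stateless alternative the reviewer describes (a hypothetical getPeersPerIp helper, not part of this PR, reusing the same calls as updateIpTable() above) might look like this:

# Hypothetical, not in the PR: build the ip -> peerIds mapping on demand
# instead of caching it on the PeerManager object.
proc getPeersPerIp(pm: PeerManager): Table[string, seq[PeerId]] =
  result = initTable[string, seq[PeerId]]()
  for peerId, conn in pm.switch.connManager.getConnections():
    if conn.len == 0:
      continue
    # same assumption as updateIpTable(): one physical connection per peer
    let observedAddr = conn[0].connection.observedAddr
    if observedAddr.isSome:
      result.mgetOrPut(observedAddr.get.getHostname(), newSeq[PeerId]()).add(peerId)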

Reply (PR author): Yup, but in the future I plan to use this table for other things. Still unclear, but some ideas:

  • intercept a connection: before even allowing a connection through, check whether the colocation limit is respected.
  • discovery: if we discover a peer violating the colocation limit, do not add it to the peer store.
  • out connections: don't start connections to peers violating the colocation limit.

So I'd like to leave it, if that's fine (a sketch of the kind of check the cached table could back is shown below).
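
To make those future uses concrete, a small hedged sketch (hypothetical helper name, not part of this PR) of a check that the cached ipTable and colocationLimit fields could back:

# Hypothetical, not in the PR: a predicate that future call sites
# (connection interception, discovery filtering, outbound dialing)
# could use against the cached table.
proc ipExceedsColocationLimit(pm: PeerManager, ip: string): bool =
  pm.ipTable.getOrDefault(ip, newSeq[PeerId]()).len >= pm.colocationLimit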


  # trigger disconnections based on colocationLimit
  for ip, peersInIp in pm.ipTable.pairs:
    if peersInIp.len > pm.colocationLimit:
      let connsToPrune = peersInIp.len - pm.colocationLimit
      for peerId in peersInIp[0..<connsToPrune]:
        debug "Pruning connection due to ip colocation", peerId = peerId, ip = ip
        await pm.switch.disconnect(peerId)
Review comment (reviewer): May be obvious, but presumably this removes the peer from all peer books (including our own?) in the peer store as well, so that we don't attempt an outgoing connection to it?

Reply (PR author): Actually it wasn't obvious, good catch: the peer was not being removed. Added f9c25b4 with the removal plus a test case.

Right now pruning is only applied to inbound connections, which is the attack this PR aims to mitigate, but I might implement it later on for outbound connections.

        pm.peerStore.delete(peerId)

proc connectToRelayPeers*(pm: PeerManager) {.async.} =
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let maxConnections = pm.switch.connManager.inSema.size
@@ -603,6 +644,12 @@ proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
debug "No peer found for protocol", protocol=proto
return none(RemotePeerInfo)

proc pruneConnsByIpLoop(pm: PeerManager) {.async.} =
  debug "Starting prune peer by ip loop"
  while pm.started:
    await pm.pruneConnsByIp()
    await sleepAsync(PruneByIpInterval)

# Prunes peers from peerstore to remove old/stale ones
proc prunePeerStoreLoop(pm: PeerManager) {.async.} =
debug "Starting prune peerstore loop"
@@ -617,8 +664,9 @@ proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
await pm.connectToRelayPeers()
await sleepAsync(ConnectivityLoopInterval)

proc logSummary*(pm: PeerManager) {.async.} =
heartbeat "Log peer manager summary", LogSummaryInterval:
proc logAndMetrics(pm: PeerManager) {.async.} =
heartbeat "Scheduling log and metrics run", LogAndMetricsInterval:
# log metrics
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let maxConnections = pm.switch.connManager.inSema.size
let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
@@ -634,8 +682,7 @@ proc logSummary*(pm: PeerManager) {.async.} =
notConnectedPeers = notConnectedPeers.len,
outsideBackoffPeers = outsideBackoffPeers.len

proc updateMetrics(pm: PeerManager) {.async.} =
heartbeat "Scheduling updateMetrics run", UpdateMetricsInterval:
# update prometheus metrics
for proto in pm.peerStore.getWakuProtos():
let (protoConnsIn, protoConnsOut) = pm.connectedPeers(proto)
let (protoStreamsIn, protoStreamsOut) = pm.getNumStreams(proto)
@@ -646,10 +693,10 @@ proc updateMetrics(pm: PeerManager) {.async.} =

proc start*(pm: PeerManager) =
pm.started = true
asyncSpawn pm.updateMetrics()
asyncSpawn pm.relayConnectivityLoop()
asyncSpawn pm.prunePeerStoreLoop()
asyncSpawn pm.logSummary()
asyncSpawn pm.pruneConnsByIpLoop()
asyncSpawn pm.logAndMetrics()

proc stop*(pm: PeerManager) =
pm.started = false