Skip to content

Commit b62c88b

Browse files
authored
Enhance P2P Layer: Stream Removal Tracking and Cooldown Handling #4853
2 parents d240733 + 36b6d22 commit b62c88b

File tree

2 files changed

+51
-1
lines changed

2 files changed

+51
-1
lines changed

p2p/stream/common/streammanager/config.go

+2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ const (
1212
connectTimeout = 60 * time.Second
1313
// MaxReservedStreams is the maximum number of reserved streams
1414
MaxReservedStreams = 100
15+
// RemovalCooldownDuration defines the cooldown period (in minutes) before a removed stream can reconnect.
16+
RemovalCooldownDuration = 60 // 1 hour
1517
)
1618

1719
// Config is the config for stream manager

p2p/stream/common/streammanager/streammanager.go

+49-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ var (
2626
ErrStreamAlreadyExist = errors.New("stream already exist")
2727
// ErrTooManyStreams is the error that the number of streams is exceeded the capacity
2828
ErrTooManyStreams = errors.New("too many streams")
29+
// ErrStreamRemovalNotExpired is the error that the stream was removed before and can't be added yet
30+
ErrStreamRemovalNotExpired = errors.New("stream removal not expired yet")
2931
)
3032

3133
// streamManager is the implementation of StreamManager. It manages streams on
@@ -44,6 +46,8 @@ type streamManager struct {
4446
// Note that it could happen that remote node does not share exactly the same
4547
// protocol ID (e.g. different version)
4648
streams *streamSet
49+
// tracks removed streams with cooldown
50+
removedStreams *sttypes.SafeMap[sttypes.StreamID, *RemovalInfo]
4751
// reserved streams
4852
reservedStreams *streamSet
4953
// libp2p utilities
@@ -66,6 +70,35 @@ type streamManager struct {
6670
cancel func()
6771
}
6872

73+
type RemovalInfo struct {
74+
count uint64
75+
removedAt time.Time
76+
expireAt time.Time
77+
}
78+
79+
// MarkAsRemoved resets the removal time and increments the removal count.
80+
func (rm *RemovalInfo) MarkAsRemoved() {
81+
now := time.Now()
82+
rm.removedAt = now
83+
rm.expireAt = now.Add(RemovalCooldownDuration * time.Minute)
84+
rm.count++
85+
}
86+
87+
// RemovedAt returns the timestamp when the stream was removed.
88+
func (rm *RemovalInfo) RemovedAt() time.Time {
89+
return rm.removedAt
90+
}
91+
92+
// HasExpired checks if the cooldown period has passed, allowing the stream to reconnect.
93+
func (rm *RemovalInfo) HasExpired() bool {
94+
return time.Now().After(rm.expireAt)
95+
}
96+
97+
// IncrementRemovalCount increases the removal count.
98+
func (rm *RemovalInfo) IncrementRemovalCount() {
99+
rm.count++
100+
}
101+
69102
// NewStreamManager creates a new stream manager for the given proto ID
70103
func NewStreamManager(pid sttypes.ProtoID, host host, pf peerFinder, handleStream func(network.Stream), c Config) StreamManager {
71104
return newStreamManager(pid, host, pf, handleStream, c)
@@ -92,6 +125,7 @@ func newStreamManager(pid sttypes.ProtoID, host host, pf peerFinder, handleStrea
92125
config: c,
93126
streams: newStreamSet(),
94127
reservedStreams: newStreamSet(),
128+
removedStreams: sttypes.NewSafeMap[sttypes.StreamID, *RemovalInfo](),
95129
host: host,
96130
pf: pf,
97131
handleStream: handleStream,
@@ -272,6 +306,12 @@ func (sm *streamManager) handleAddStream(st sttypes.Stream) error {
272306
if _, ok := sm.streams.get(id); ok {
273307
return ErrStreamAlreadyExist
274308
}
309+
// Check if stream was recently removed
310+
if removalInfo, exists := sm.removedStreams.Get(id); exists {
311+
if !removalInfo.HasExpired() {
312+
return ErrStreamRemovalNotExpired
313+
}
314+
}
275315

276316
// If the stream list has sufficient capacity, the stream can be added to the reserved list
277317
if sm.streams.size() >= sm.config.HiCap {
@@ -322,9 +362,17 @@ func (sm *streamManager) handleRemoveStream(id sttypes.StreamID) error {
322362
if !ok {
323363
return ErrStreamAlreadyRemoved
324364
}
325-
326365
sm.streams.deleteStream(st)
327366

367+
info, exist := sm.removedStreams.Get(id)
368+
if !exist {
369+
info = &RemovalInfo{
370+
count: 0,
371+
}
372+
sm.removedStreams.Set(id, info)
373+
}
374+
info.MarkAsRemoved()
375+
328376
// try to replace removed streams from reserved list
329377
requiredStreams := sm.hardRequiredStreams()
330378
if added, err := sm.addStreamFromReserved(requiredStreams); added > 0 {

0 commit comments

Comments
 (0)