Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: handle community shard unassignment and update #4627

Merged
merged 2 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions eth-node/bridge/geth/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ func (w *gethWakuWrapper) SubscribeToPubsubTopic(topic string, optPublicKey *ecd
return errors.New("not available in WakuV1")
}

func (w *gethWakuWrapper) UnsubscribeFromPubsubTopic(topic string) error {
// not available in WakuV1
return errors.New("not available in WakuV1")
}

func (w *gethWakuWrapper) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) {
// not available in WakuV1
return nil, errors.New("not available in WakuV1")
Expand All @@ -79,6 +84,11 @@ func (w *gethWakuWrapper) StorePubsubTopicKey(topic string, privKey *ecdsa.Priva
return errors.New("not available in WakuV1")
}

func (w *gethWakuWrapper) RemovePubsubTopicKey(topic string) error {
// not available in WakuV1
return errors.New("not available in WakuV1")
}

// AddRelayPeer function only added for compatibility with waku V2
func (w *gethWakuWrapper) AddRelayPeer(address string) (peer.ID, error) {
return "", errors.New("not available in WakuV1")
Expand Down
8 changes: 8 additions & 0 deletions eth-node/bridge/geth/wakuv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ func (w *gethWakuV2Wrapper) SubscribeToPubsubTopic(topic string, optPublicKey *e
return w.waku.SubscribeToPubsubTopic(topic, optPublicKey)
}

func (w *gethWakuV2Wrapper) UnsubscribeFromPubsubTopic(topic string) error {
return w.waku.UnsubscribeFromPubsubTopic(topic)
}

func (w *gethWakuV2Wrapper) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) {
return w.waku.RetrievePubsubTopicKey(topic)
}
Expand All @@ -245,6 +249,10 @@ func (w *gethWakuV2Wrapper) StorePubsubTopicKey(topic string, privKey *ecdsa.Pri
return w.waku.StorePubsubTopicKey(topic, privKey)
}

func (w *gethWakuV2Wrapper) RemovePubsubTopicKey(topic string) error {
return w.waku.RemovePubsubTopicKey(topic)
}

func (w *gethWakuV2Wrapper) AddStorePeer(address string) (peer.ID, error) {
return w.waku.AddStorePeer(address)
}
Expand Down
4 changes: 4 additions & 0 deletions eth-node/types/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,14 @@ type Waku interface {

SubscribeToPubsubTopic(topic string, optPublicKey *ecdsa.PublicKey) error

UnsubscribeFromPubsubTopic(topic string) error

StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) error

RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error)

RemovePubsubTopicKey(topic string) error

AddStorePeer(address string) (peer.ID, error)

AddRelayPeer(address string) (peer.ID, error)
Expand Down
18 changes: 7 additions & 11 deletions protocol/communities/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1077,6 +1077,10 @@ func (m *Manager) DeleteCommunity(id types.HexBytes) error {

func (m *Manager) UpdateShard(community *Community, shard *shard.Shard, clock uint64) error {
community.config.Shard = shard
if shard == nil {
return m.persistence.DeleteCommunityShard(community.ID())
}

return m.persistence.SaveCommunityShard(community.ID(), shard, clock)
}

Expand All @@ -1086,9 +1090,6 @@ func (m *Manager) SetShard(communityID types.HexBytes, shard *shard.Shard) (*Com
if err != nil {
return nil, err
}
if !community.IsControlNode() {
return nil, errors.New("not admin or owner")
}

community.increaseClock()

Expand All @@ -1105,17 +1106,12 @@ func (m *Manager) SetShard(communityID types.HexBytes, shard *shard.Shard) (*Com
return community, nil
}

func (m *Manager) UpdatePubsubTopicPrivateKey(community *Community, privKey *ecdsa.PrivateKey) error {
community.SetPubsubTopicPrivateKey(privKey)

func (m *Manager) UpdatePubsubTopicPrivateKey(topic string, privKey *ecdsa.PrivateKey) error {
if privKey != nil {
topic := community.PubsubTopic()
if err := m.transport.StorePubsubTopicKey(topic, privKey); err != nil {
return err
}
return m.transport.StorePubsubTopicKey(topic, privKey)
}

return nil
return m.transport.RemovePubsubTopicKey(topic)
}

// EditCommunity takes a description, updates the community with the description,
Expand Down
90 changes: 77 additions & 13 deletions protocol/messenger_communities.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,16 @@ func (m *Messenger) subscribeToCommunityShard(communityID []byte, shard *shard.S
return m.transport.SubscribeToPubsubTopic(pubsubTopic, pubK)
}

func (m *Messenger) unsubscribeFromShard(shard *shard.Shard) error {
if m.transport.WakuVersion() != 2 {
return nil
}

// TODO: this should probably be moved completely to transport once pubsub topic logic is implemented

return m.transport.UnsubscribeFromPubsubTopic(shard.PubsubTopic())
}

func (m *Messenger) joinCommunity(ctx context.Context, communityID types.HexBytes, forceJoin bool) (*MessengerResponse, error) {
logger := m.logger.Named("joinCommunity")

Expand Down Expand Up @@ -2053,26 +2063,57 @@ func (m *Messenger) SetCommunityShard(request *requests.SetCommunityShard) (*Mes
return nil, err
}

community, err := m.communitiesManager.SetShard(request.CommunityID, request.Shard)
community, err := m.communitiesManager.GetByID(request.CommunityID)
if err != nil {
return nil, err
}

var topicPrivKey *ecdsa.PrivateKey
if request.PrivateKey != nil {
topicPrivKey, err = crypto.ToECDSA(*request.PrivateKey)
} else {
topicPrivKey, err = crypto.GenerateKey()
if !community.IsControlNode() {
return nil, errors.New("not admin or owner")
}

// Reset the community private key
community.SetPubsubTopicPrivateKey(nil)

// Removing the private key (if it exist)
err = m.RemovePubsubTopicPrivateKey(community.PubsubTopic())
if err != nil {
return nil, err
}

err = m.communitiesManager.UpdatePubsubTopicPrivateKey(community, topicPrivKey)
// Unsubscribing from existing shard
if community.Shard() != nil {
err := m.unsubscribeFromShard(community.Shard())
if err != nil {
return nil, err
}
}

community, err = m.communitiesManager.SetShard(request.CommunityID, request.Shard)
if err != nil {
return nil, err
}

if request.Shard != nil {
var topicPrivKey *ecdsa.PrivateKey
if request.PrivateKey != nil {
topicPrivKey, err = crypto.ToECDSA(*request.PrivateKey)
} else {
topicPrivKey, err = crypto.GenerateKey()
}
if err != nil {
return nil, err
}

community.SetPubsubTopicPrivateKey(topicPrivKey)

err = m.communitiesManager.UpdatePubsubTopicPrivateKey(community.PubsubTopic(), topicPrivKey)
if err != nil {
return nil, err
}
}

// TODO: Check
err = m.UpdateCommunityFilters(community)
if err != nil {
return nil, err
Expand All @@ -2094,6 +2135,10 @@ func (m *Messenger) SetCommunityShard(request *requests.SetCommunityShard) (*Mes
return response, nil
}

func (m *Messenger) RemovePubsubTopicPrivateKey(topic string) error {
return m.transport.RemovePubsubTopicKey(topic)
}

func (m *Messenger) UpdateCommunityFilters(community *communities.Community) error {
defaultFilters := m.DefaultFilters(community)
publicFiltersToInit := make([]transport.FiltersToInitialize, 0, len(defaultFilters)+len(community.Chats()))
Expand Down Expand Up @@ -2480,15 +2525,16 @@ func (m *Messenger) SendCommunityShardKey(community *communities.Community, pubk
return nil
}

keyBytes := make([]byte, 0)
key := community.PubsubTopicPrivateKey()
if key == nil {
return nil // No community shard key available
if key != nil {
keyBytes = crypto.FromECDSA(key)
}

communityShardKey := &protobuf.CommunityShardKey{
Clock: community.Clock(),
CommunityId: community.ID(),
PrivateKey: crypto.FromECDSA(key),
PrivateKey: keyBytes,
Shard: community.Shard().Protobuffer(),
}

Expand Down Expand Up @@ -2831,14 +2877,32 @@ func (m *Messenger) handleCommunityShardAndFiltersFromProto(community *communiti
}

var privKey *ecdsa.PrivateKey = nil
if message.PrivateKey != nil {
privKey, err = crypto.ToECDSA(message.PrivateKey)
if message.Shard != nil {
if message.PrivateKey != nil {
privKey, err = crypto.ToECDSA(message.PrivateKey)
if err != nil {
return err
}
}
}

// Removing the existing private key (if any)
err = m.RemovePubsubTopicPrivateKey(community.PubsubTopic())
if err != nil {
return err
}

// Unsubscribing from existing shard
if community.Shard() != nil {
err := m.unsubscribeFromShard(community.Shard())
if err != nil {
return err
}
}

err = m.communitiesManager.UpdatePubsubTopicPrivateKey(community, privKey)
community.SetPubsubTopicPrivateKey(privKey)

err = m.communitiesManager.UpdatePubsubTopicPrivateKey(community.PubsubTopic(), privKey)
if err != nil {
return err
}
Expand Down
15 changes: 15 additions & 0 deletions protocol/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,10 +692,25 @@ func (t *Transport) SubscribeToPubsubTopic(topic string, optPublicKey *ecdsa.Pub
return nil
}

// Unsubscribe from a pubsub topic
func (t *Transport) UnsubscribeFromPubsubTopic(topic string) error {
if t.waku.Version() == 2 {
return t.waku.UnsubscribeFromPubsubTopic(topic)
}
return nil
}

func (t *Transport) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) error {
return t.waku.StorePubsubTopicKey(topic, privKey)
}

func (t *Transport) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) {
return t.waku.RetrievePubsubTopicKey(topic)
}

func (t *Transport) RemovePubsubTopicKey(topic string) error {
if t.waku.Version() == 2 {
return t.waku.RemovePubsubTopicKey(topic)
}
return nil
}
12 changes: 12 additions & 0 deletions wakuv2/persistence/signed_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type ProtectedTopicsStore struct {

insertStmt *sql.Stmt
fetchPrivKeyStmt *sql.Stmt
deleteStmt *sql.Stmt
}

// Creates a new DB store using the db specified via options.
Expand All @@ -33,11 +34,17 @@ func NewProtectedTopicsStore(log *zap.Logger, db *sql.DB) (*ProtectedTopicsStore
return nil, err
}

deleteStmt, err := db.Prepare("DELETE FROM pubsubtopic_signing_key WHERE topic = ?")
if err != nil {
return nil, err
}

result := new(ProtectedTopicsStore)
result.log = log.Named("protected-topics-store")
result.db = db
result.insertStmt = insertStmt
result.fetchPrivKeyStmt = fetchPrivKeyStmt
result.deleteStmt = deleteStmt

return result, nil
}
Expand All @@ -64,6 +71,11 @@ func (p *ProtectedTopicsStore) Insert(pubsubTopic string, privKey *ecdsa.Private
return err
}

func (p *ProtectedTopicsStore) Delete(pubsubTopic string) error {
_, err := p.deleteStmt.Exec(pubsubTopic)
return err
}

func (p *ProtectedTopicsStore) FetchPrivateKey(topic string) (privKey *ecdsa.PrivateKey, err error) {
var privKeyBytes []byte
err = p.fetchPrivKeyStmt.QueryRow(topic).Scan(&privKeyBytes)
Expand Down
33 changes: 33 additions & 0 deletions wakuv2/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,18 @@ func (w *Waku) getPubsubTopic(topic string) string {
return topic
}

func (w *Waku) unsubscribeFromPubsubTopicWithWakuRelay(topic string) error {
topic = w.getPubsubTopic(topic)

if !w.node.Relay().IsSubscribed(topic) {
return nil
}

contentFilter := protocol.NewContentFilter(topic)

return w.node.Relay().Unsubscribe(w.ctx, contentFilter)
}

func (w *Waku) subscribeToPubsubTopicWithWakuRelay(topic string, pubkey *ecdsa.PublicKey) error {
if w.settings.LightClient {
return errors.New("only available for full nodes")
Expand Down Expand Up @@ -1535,6 +1547,18 @@ func (w *Waku) SubscribeToPubsubTopic(topic string, pubkey *ecdsa.PublicKey) err
return nil
}

func (w *Waku) UnsubscribeFromPubsubTopic(topic string) error {
topic = w.getPubsubTopic(topic)

if !w.settings.LightClient {
err := w.unsubscribeFromPubsubTopicWithWakuRelay(topic)
if err != nil {
return err
}
}
return nil
}

func (w *Waku) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) {
topic = w.getPubsubTopic(topic)
if w.protectedTopicStore == nil {
Expand All @@ -1553,6 +1577,15 @@ func (w *Waku) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) erro
return w.protectedTopicStore.Insert(topic, privKey, &privKey.PublicKey)
}

func (w *Waku) RemovePubsubTopicKey(topic string) error {
topic = w.getPubsubTopic(topic)
if w.protectedTopicStore == nil {
return nil
}

return w.protectedTopicStore.Delete(topic)
}

func (w *Waku) StartDiscV5() error {
if w.node.DiscV5() == nil {
return errors.New("discv5 is not setup")
Expand Down