From ee9b3ca5f4a97db198a83268cd9c5710eca09bf8 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Thu, 25 Jan 2024 14:41:51 -0400 Subject: [PATCH 1/2] fix: handle community shard unassignment and update --- eth-node/bridge/geth/waku.go | 10 +++ eth-node/bridge/geth/wakuv2.go | 8 +++ eth-node/types/waku.go | 4 ++ protocol/communities/manager.go | 18 +++--- protocol/messenger_communities.go | 90 +++++++++++++++++++++++---- protocol/transport/transport.go | 15 +++++ wakuv2/persistence/signed_messages.go | 12 ++++ wakuv2/waku.go | 39 ++++++++++++ 8 files changed, 172 insertions(+), 24 deletions(-) diff --git a/eth-node/bridge/geth/waku.go b/eth-node/bridge/geth/waku.go index 55c72917c6..868644ec85 100644 --- a/eth-node/bridge/geth/waku.go +++ b/eth-node/bridge/geth/waku.go @@ -69,6 +69,11 @@ func (w *gethWakuWrapper) SubscribeToPubsubTopic(topic string, optPublicKey *ecd return errors.New("not available in WakuV1") } +func (w *gethWakuWrapper) UnsubscribeFromPubsubTopic(topic string) error { + // not available in WakuV1 + return errors.New("not available in WakuV1") +} + func (w *gethWakuWrapper) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) { // not available in WakuV1 return nil, errors.New("not available in WakuV1") @@ -79,6 +84,11 @@ func (w *gethWakuWrapper) StorePubsubTopicKey(topic string, privKey *ecdsa.Priva return errors.New("not available in WakuV1") } +func (w *gethWakuWrapper) RemovePubsubTopicKey(topic string) error { + // not available in WakuV1 + return errors.New("not available in WakuV1") +} + // AddRelayPeer function only added for compatibility with waku V2 func (w *gethWakuWrapper) AddRelayPeer(address string) (peer.ID, error) { return "", errors.New("not available in WakuV1") diff --git a/eth-node/bridge/geth/wakuv2.go b/eth-node/bridge/geth/wakuv2.go index 3449b180a3..24e6ce838c 100644 --- a/eth-node/bridge/geth/wakuv2.go +++ b/eth-node/bridge/geth/wakuv2.go @@ -237,6 +237,10 @@ func (w *gethWakuV2Wrapper) SubscribeToPubsubTopic(topic string, optPublicKey *e return w.waku.SubscribeToPubsubTopic(topic, optPublicKey) } +func (w *gethWakuV2Wrapper) UnsubscribeFromPubsubTopic(topic string) error { + return w.waku.UnsubscribeFromPubsubTopic(topic) +} + func (w *gethWakuV2Wrapper) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) { return w.waku.RetrievePubsubTopicKey(topic) } @@ -245,6 +249,10 @@ func (w *gethWakuV2Wrapper) StorePubsubTopicKey(topic string, privKey *ecdsa.Pri return w.waku.StorePubsubTopicKey(topic, privKey) } +func (w *gethWakuV2Wrapper) RemovePubsubTopicKey(topic string) error { + return w.waku.RemovePubsubTopicKey(topic) +} + func (w *gethWakuV2Wrapper) AddStorePeer(address string) (peer.ID, error) { return w.waku.AddStorePeer(address) } diff --git a/eth-node/types/waku.go b/eth-node/types/waku.go index 1d737e5134..213da7d0b9 100644 --- a/eth-node/types/waku.go +++ b/eth-node/types/waku.go @@ -90,10 +90,14 @@ type Waku interface { SubscribeToPubsubTopic(topic string, optPublicKey *ecdsa.PublicKey) error + UnsubscribeFromPubsubTopic(topic string) error + StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) error RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) + RemovePubsubTopicKey(topic string) error + AddStorePeer(address string) (peer.ID, error) AddRelayPeer(address string) (peer.ID, error) diff --git a/protocol/communities/manager.go b/protocol/communities/manager.go index e62f60cc1a..0a16cc4dbc 100644 --- a/protocol/communities/manager.go +++ b/protocol/communities/manager.go @@ -1077,6 +1077,10 @@ func (m *Manager) DeleteCommunity(id types.HexBytes) error { func (m *Manager) UpdateShard(community *Community, shard *shard.Shard, clock uint64) error { community.config.Shard = shard + if shard == nil { + return m.persistence.DeleteCommunityShard(community.ID()) + } + return m.persistence.SaveCommunityShard(community.ID(), shard, clock) } @@ -1086,9 +1090,6 @@ func (m *Manager) SetShard(communityID types.HexBytes, shard *shard.Shard) (*Com if err != nil { return nil, err } - if !community.IsControlNode() { - return nil, errors.New("not admin or owner") - } community.increaseClock() @@ -1105,17 +1106,12 @@ func (m *Manager) SetShard(communityID types.HexBytes, shard *shard.Shard) (*Com return community, nil } -func (m *Manager) UpdatePubsubTopicPrivateKey(community *Community, privKey *ecdsa.PrivateKey) error { - community.SetPubsubTopicPrivateKey(privKey) - +func (m *Manager) UpdatePubsubTopicPrivateKey(topic string, privKey *ecdsa.PrivateKey) error { if privKey != nil { - topic := community.PubsubTopic() - if err := m.transport.StorePubsubTopicKey(topic, privKey); err != nil { - return err - } + return m.transport.StorePubsubTopicKey(topic, privKey) } - return nil + return m.transport.RemovePubsubTopicKey(topic) } // EditCommunity takes a description, updates the community with the description, diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index 12b37954b8..9949b6e014 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -715,6 +715,16 @@ func (m *Messenger) subscribeToCommunityShard(communityID []byte, shard *shard.S return m.transport.SubscribeToPubsubTopic(pubsubTopic, pubK) } +func (m *Messenger) unsubscribeFromShard(shard *shard.Shard) error { + if m.transport.WakuVersion() != 2 { + return nil + } + + // TODO: this should probably be moved completely to transport once pubsub topic logic is implemented + + return m.transport.UnsubscribeFromPubsubTopic(shard.PubsubTopic()) +} + func (m *Messenger) joinCommunity(ctx context.Context, communityID types.HexBytes, forceJoin bool) (*MessengerResponse, error) { logger := m.logger.Named("joinCommunity") @@ -2053,26 +2063,57 @@ func (m *Messenger) SetCommunityShard(request *requests.SetCommunityShard) (*Mes return nil, err } - community, err := m.communitiesManager.SetShard(request.CommunityID, request.Shard) + community, err := m.communitiesManager.GetByID(request.CommunityID) if err != nil { return nil, err } - var topicPrivKey *ecdsa.PrivateKey - if request.PrivateKey != nil { - topicPrivKey, err = crypto.ToECDSA(*request.PrivateKey) - } else { - topicPrivKey, err = crypto.GenerateKey() + if !community.IsControlNode() { + return nil, errors.New("not admin or owner") } + + // Reset the community private key + community.SetPubsubTopicPrivateKey(nil) + + // Removing the private key (if it exist) + err = m.RemovePubsubTopicPrivateKey(community.PubsubTopic()) if err != nil { return nil, err } - err = m.communitiesManager.UpdatePubsubTopicPrivateKey(community, topicPrivKey) + // Unsubscribing from existing shard + if community.Shard() != nil { + err := m.unsubscribeFromShard(community.Shard()) + if err != nil { + return nil, err + } + } + + community, err = m.communitiesManager.SetShard(request.CommunityID, request.Shard) if err != nil { return nil, err } + if request.Shard != nil { + var topicPrivKey *ecdsa.PrivateKey + if request.PrivateKey != nil { + topicPrivKey, err = crypto.ToECDSA(*request.PrivateKey) + } else { + topicPrivKey, err = crypto.GenerateKey() + } + if err != nil { + return nil, err + } + + community.SetPubsubTopicPrivateKey(topicPrivKey) + + err = m.communitiesManager.UpdatePubsubTopicPrivateKey(community.PubsubTopic(), topicPrivKey) + if err != nil { + return nil, err + } + } + + // TODO: Check err = m.UpdateCommunityFilters(community) if err != nil { return nil, err @@ -2094,6 +2135,10 @@ func (m *Messenger) SetCommunityShard(request *requests.SetCommunityShard) (*Mes return response, nil } +func (m *Messenger) RemovePubsubTopicPrivateKey(topic string) error { + return m.transport.RemovePubsubTopicKey(topic) +} + func (m *Messenger) UpdateCommunityFilters(community *communities.Community) error { defaultFilters := m.DefaultFilters(community) publicFiltersToInit := make([]transport.FiltersToInitialize, 0, len(defaultFilters)+len(community.Chats())) @@ -2480,15 +2525,16 @@ func (m *Messenger) SendCommunityShardKey(community *communities.Community, pubk return nil } + keyBytes := make([]byte, 0) key := community.PubsubTopicPrivateKey() - if key == nil { - return nil // No community shard key available + if key != nil { + keyBytes = crypto.FromECDSA(key) } communityShardKey := &protobuf.CommunityShardKey{ Clock: community.Clock(), CommunityId: community.ID(), - PrivateKey: crypto.FromECDSA(key), + PrivateKey: keyBytes, Shard: community.Shard().Protobuffer(), } @@ -2831,14 +2877,32 @@ func (m *Messenger) handleCommunityShardAndFiltersFromProto(community *communiti } var privKey *ecdsa.PrivateKey = nil - if message.PrivateKey != nil { - privKey, err = crypto.ToECDSA(message.PrivateKey) + if message.Shard != nil { + if message.PrivateKey != nil { + privKey, err = crypto.ToECDSA(message.PrivateKey) + if err != nil { + return err + } + } + } + + // Removing the existing private key (if any) + err = m.RemovePubsubTopicPrivateKey(community.PubsubTopic()) + if err != nil { + return err + } + + // Unsubscribing from existing shard + if community.Shard() != nil { + err := m.unsubscribeFromShard(community.Shard()) if err != nil { return err } } - err = m.communitiesManager.UpdatePubsubTopicPrivateKey(community, privKey) + community.SetPubsubTopicPrivateKey(privKey) + + err = m.communitiesManager.UpdatePubsubTopicPrivateKey(community.PubsubTopic(), privKey) if err != nil { return err } diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index 0fe6fa0f01..3157377f00 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -692,6 +692,14 @@ func (t *Transport) SubscribeToPubsubTopic(topic string, optPublicKey *ecdsa.Pub return nil } +// Unsubscribe from a pubsub topic +func (t *Transport) UnsubscribeFromPubsubTopic(topic string) error { + if t.waku.Version() == 2 { + return t.waku.UnsubscribeFromPubsubTopic(topic) + } + return nil +} + func (t *Transport) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) error { return t.waku.StorePubsubTopicKey(topic, privKey) } @@ -699,3 +707,10 @@ func (t *Transport) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) func (t *Transport) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) { return t.waku.RetrievePubsubTopicKey(topic) } + +func (t *Transport) RemovePubsubTopicKey(topic string) error { + if t.waku.Version() == 2 { + return t.waku.RemovePubsubTopicKey(topic) + } + return nil +} diff --git a/wakuv2/persistence/signed_messages.go b/wakuv2/persistence/signed_messages.go index e9522f4644..aad6d812a4 100644 --- a/wakuv2/persistence/signed_messages.go +++ b/wakuv2/persistence/signed_messages.go @@ -17,6 +17,7 @@ type ProtectedTopicsStore struct { insertStmt *sql.Stmt fetchPrivKeyStmt *sql.Stmt + deleteStmt *sql.Stmt } // Creates a new DB store using the db specified via options. @@ -33,11 +34,17 @@ func NewProtectedTopicsStore(log *zap.Logger, db *sql.DB) (*ProtectedTopicsStore return nil, err } + deleteStmt, err := db.Prepare("DELETE FROM pubsubtopic_signing_key WHERE topic = ?") + if err != nil { + return nil, err + } + result := new(ProtectedTopicsStore) result.log = log.Named("protected-topics-store") result.db = db result.insertStmt = insertStmt result.fetchPrivKeyStmt = fetchPrivKeyStmt + result.deleteStmt = deleteStmt return result, nil } @@ -64,6 +71,11 @@ func (p *ProtectedTopicsStore) Insert(pubsubTopic string, privKey *ecdsa.Private return err } +func (p *ProtectedTopicsStore) Delete(pubsubTopic string) error { + _, err := p.deleteStmt.Exec(pubsubTopic) + return err +} + func (p *ProtectedTopicsStore) FetchPrivateKey(topic string) (privKey *ecdsa.PrivateKey, err error) { var privKeyBytes []byte err = p.fetchPrivKeyStmt.QueryRow(topic).Scan(&privKeyBytes) diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 870bb2f863..57e77d2d97 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -610,6 +610,24 @@ func (w *Waku) getPubsubTopic(topic string) string { return topic } +func (w *Waku) unsubscribeFromPubsubTopicWithWakuRelay(topic string) error { + if w.settings.LightClient { + return errors.New("only available for full nodes") + } + + topic = w.getPubsubTopic(topic) + + if !w.node.Relay().IsSubscribed(topic) { + return nil + } + + w.node.Relay().RemoveTopicValidator(topic) + + contentFilter := protocol.NewContentFilter(topic) + + return w.node.Relay().Unsubscribe(w.ctx, contentFilter) +} + func (w *Waku) subscribeToPubsubTopicWithWakuRelay(topic string, pubkey *ecdsa.PublicKey) error { if w.settings.LightClient { return errors.New("only available for full nodes") @@ -1535,6 +1553,18 @@ func (w *Waku) SubscribeToPubsubTopic(topic string, pubkey *ecdsa.PublicKey) err return nil } +func (w *Waku) UnsubscribeFromPubsubTopic(topic string) error { + topic = w.getPubsubTopic(topic) + + if !w.settings.LightClient { + err := w.unsubscribeFromPubsubTopicWithWakuRelay(topic) + if err != nil { + return err + } + } + return nil +} + func (w *Waku) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) { topic = w.getPubsubTopic(topic) if w.protectedTopicStore == nil { @@ -1553,6 +1583,15 @@ func (w *Waku) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) erro return w.protectedTopicStore.Insert(topic, privKey, &privKey.PublicKey) } +func (w *Waku) RemovePubsubTopicKey(topic string) error { + topic = w.getPubsubTopic(topic) + if w.protectedTopicStore == nil { + return nil + } + + return w.protectedTopicStore.Delete(topic) +} + func (w *Waku) StartDiscV5() error { if w.node.DiscV5() == nil { return errors.New("discv5 is not setup") From d3265e57aff3763a2b6aec109934c2c477f4726b Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Tue, 30 Jan 2024 13:12:59 -0400 Subject: [PATCH 2/2] fix: code review --- wakuv2/waku.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 57e77d2d97..814ea60549 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -611,18 +611,12 @@ func (w *Waku) getPubsubTopic(topic string) string { } func (w *Waku) unsubscribeFromPubsubTopicWithWakuRelay(topic string) error { - if w.settings.LightClient { - return errors.New("only available for full nodes") - } - topic = w.getPubsubTopic(topic) if !w.node.Relay().IsSubscribed(topic) { return nil } - w.node.Relay().RemoveTopicValidator(topic) - contentFilter := protocol.NewContentFilter(topic) return w.node.Relay().Unsubscribe(w.ctx, contentFilter)