Skip to content

Commit 9c31898

Browse files
committed
Fixed ws connection issues and added concurrent pin fetching to fetchPins
1 parent 4da5fb3 commit 9c31898

File tree

4 files changed

+101
-43
lines changed

4 files changed

+101
-43
lines changed

ipfs/ipfs.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -256,10 +256,7 @@ func SyncNode(allPins map[string]ipfs.PinInfo, name string) {
256256
go func(i int, key string) {
257257
defer wg.Done()
258258
size, _ := FileSize(key)
259-
fmt.Println("Size: ", size)
260-
fmt.Println("Name: ", name)
261259
lock.Lock()
262-
fmt.Println("PeerSize: ", localdata.PeerSize[name])
263260
peersize = peersize + size
264261
lock.Unlock()
265262
// Check if the key exists in Pins
@@ -297,6 +294,7 @@ func SyncNode(allPins map[string]ipfs.PinInfo, name string) {
297294
lock.Lock()
298295
localdata.PeerSize[name] = peersize
299296
fmt.Println("Synced: ", name)
297+
fmt.Println("PeerSize: ", peersize)
300298
localdata.NodesStatus[name] = "Synced"
301299
lock.Unlock()
302300
return

main.go

+58-23
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"proofofaccess/localdata"
1313
"proofofaccess/proofcrypto"
1414
"strconv"
15+
"sync"
1516
"syscall"
1617
"time"
1718

@@ -64,7 +65,15 @@ func initialize(ctx context.Context) {
6465
} else {
6566
go fetchPins(ctx)
6667
}
68+
if *useWS && *nodeType == 2 {
69+
localdata.UseWS = *useWS
70+
fmt.Println()
71+
go messaging.StartWsClient()
6772

73+
} else {
74+
go pubsubHandler(ctx)
75+
go connectToValidators(ctx, nodeType)
76+
}
6877
go api.StartAPI(ctx)
6978

7079
}
@@ -126,12 +135,18 @@ func pubsubHandler(ctx context.Context) {
126135
}
127136
}
128137
func fetchPins(ctx context.Context) {
138+
var lock sync.Mutex // Mutex lock
139+
newPins := false // Assuming this is a boolean based on your usage
140+
129141
for {
130142
select {
131143
case <-ctx.Done():
132144
return
133145
default:
146+
lock.Lock()
134147
ipfs.Pins = ipfs.NewPins
148+
lock.Unlock()
149+
135150
fmt.Println("Fetching pins...")
136151
allPins, err := ipfs.Shell.Pins()
137152
for _, cid := range messaging.PinFileCids {
@@ -143,10 +158,16 @@ func fetchPins(ctx context.Context) {
143158
fmt.Println("Error fetching pins:", err)
144159
continue
145160
}
161+
162+
lock.Lock()
146163
ipfs.NewPins = make(map[string]interface{})
164+
lock.Unlock()
165+
147166
for key, pinInfo := range allPins {
148167
if pinInfo.Type == "recursive" {
168+
lock.Lock()
149169
ipfs.NewPins[key] = key
170+
lock.Unlock()
150171
}
151172
}
152173

@@ -155,44 +176,58 @@ func fetchPins(ctx context.Context) {
155176

156177
keysNotFound := 0
157178

179+
// Create a WaitGroup to wait for the function to finish
180+
var wg sync.WaitGroup
181+
158182
// Iterate through the keys in NewPins
159183
for key := range ipfs.NewPins {
160-
// Check if the key exists in Pins
184+
wg.Add(1)
185+
go func(key string) {
186+
defer wg.Done()
187+
// Check if the key exists in Pins
188+
lock.Lock()
189+
_, exists := ipfs.Pins[key]
190+
lock.Unlock()
161191

162-
if _, exists := ipfs.Pins[key]; !exists {
163-
size, _ := ipfs.FileSize(key)
164-
localdata.PeerSize[localdata.NodeName] = localdata.PeerSize[localdata.NodeName] + size
165-
newPins = true
166-
// If the key doesn't exist in Pins, add it to the pinsNotIncluded map
167-
localdata.SavedRefs[key], _ = ipfs.Refs(key)
168-
if localdata.Synced == false {
192+
if !exists {
193+
size, _ := ipfs.FileSize(key)
194+
lock.Lock()
195+
localdata.PeerSize[localdata.NodeName] += size
196+
newPins = true
197+
lock.Unlock()
169198

199+
// If the key doesn't exist in Pins, add it to the pinsNotIncluded map
200+
savedRefs, _ := ipfs.Refs(key)
201+
lock.Lock()
202+
localdata.SavedRefs[key] = savedRefs
203+
lock.Unlock()
204+
lock.Lock()
170205
localdata.SyncedPercentage = float32(keysNotFound) / float32(mapLength) * 100
171206
fmt.Println("Synced: ", localdata.SyncedPercentage, "%")
207+
lock.Unlock()
208+
keysNotFound++
172209
}
173-
keysNotFound++
174-
}
210+
}(key)
175211
}
176-
if localdata.Synced == false {
177-
fmt.Println("Synced: ", 100)
178-
localdata.SyncedPercentage = 100
179-
if *useWS && *nodeType == 2 {
180-
localdata.UseWS = *useWS
181-
go messaging.StartWsClient()
182-
} else {
183-
go pubsubHandler(ctx)
184-
go connectToValidators(ctx, nodeType)
185-
}
186-
}
187-
if newPins == true {
212+
213+
wg.Wait()
214+
215+
fmt.Println("Synced: ", 100)
216+
lock.Lock()
217+
localdata.SyncedPercentage = 100
218+
lock.Unlock()
219+
220+
if newPins {
188221
fmt.Println("New pins found")
189222
messaging.SendCIDS("validator1")
223+
newPins = false
190224
}
191-
newPins = false
225+
192226
time.Sleep(60 * time.Second)
193227
}
194228
}
195229
}
230+
196231
func checkSynced(ctx context.Context) {
197232
for {
198233
select {

messaging/ws.go

+42-16
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,10 @@ func StartWsClient() {
2929
go func() {
3030
for {
3131
if isConnected {
32-
_, message, err := c.ReadMessage()
32+
_, message, err := localdata.WsValidators["Validator1"].ReadMessage()
3333
if err != nil {
3434
log.Println("read:", err)
35+
fmt.Println("Connection lost. Reconnecting...")
3536
isConnected = false
3637
continue
3738
}
@@ -46,27 +47,51 @@ func StartWsClient() {
4647

4748
// Connection or reconnection loop
4849
for {
49-
if !isConnected {
50-
c, _, err = websocket.DefaultDialer.Dial("ws://spk.tv/messaging", nil)
51-
if err != nil {
52-
log.Println("dial:", err)
53-
time.Sleep(1 * time.Second) // Sleep for a second before next reconnection attempt
54-
continue
50+
for {
51+
if !isConnected {
52+
c, _, err = websocket.DefaultDialer.Dial("ws://spk.tv/messaging", nil)
53+
if err != nil {
54+
log.Println("dial:", err)
55+
time.Sleep(1 * time.Second)
56+
continue
57+
}
58+
isConnected = true
59+
log.Println("Connected to the server")
60+
salt, _ := proofcrypto.CreateRandomHash()
61+
localdata.WsValidators["Validator1"] = c
62+
fmt.Println("Connected to validator1")
63+
wsPing(salt)
64+
} else {
65+
// Ping the server to check if still connected
66+
err = localdata.WsValidators["Validator1"].WriteMessage(websocket.PingMessage, nil)
67+
if err != nil {
68+
log.Println("write:", err)
69+
fmt.Println("Connection lost. Reconnecting...")
70+
isConnected = false
71+
}
5572
}
56-
isConnected = true
57-
log.Println("Connected to the server")
58-
salt, _ := proofcrypto.CreateRandomHash()
59-
localdata.WsValidators["Validator1"] = c
60-
fmt.Println("Connected to validator1")
61-
wsPing(salt, c)
6273

74+
select {
75+
case <-interrupt:
76+
log.Println("interrupt")
77+
if isConnected {
78+
err = localdata.WsValidators["Validator1"].WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
79+
if err != nil {
80+
log.Println("write close:", err)
81+
return
82+
}
83+
}
84+
return
85+
default:
86+
time.Sleep(1 * time.Second)
87+
}
6388
}
6489

6590
select {
6691
case <-interrupt:
6792
log.Println("interrupt")
6893
if isConnected {
69-
err = c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
94+
err = localdata.WsValidators["Validator1"].WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
7095
if err != nil {
7196
log.Println("write close:", err)
7297
return
@@ -80,7 +105,8 @@ func StartWsClient() {
80105
}
81106
}
82107

83-
func wsPing(hash string, c *websocket.Conn) {
108+
func wsPing(hash string) {
109+
localdata.Synced = true
84110
data := map[string]string{
85111
"type": TypePingPongPing,
86112
"hash": hash,
@@ -92,5 +118,5 @@ func wsPing(hash string, c *websocket.Conn) {
92118
return
93119
}
94120
fmt.Println("Client send: ", string(jsonData))
95-
err = c.WriteMessage(websocket.TextMessage, jsonData)
121+
err = localdata.WsValidators["Validator1"].WriteMessage(websocket.TextMessage, jsonData)
96122
}

validation/validation.go

-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ func CreatProofHash(hash string, CID string) string {
3535
// Get all the file blocks CIDs from the Target Files CID
3636
fmt.Println("CID: ", CID)
3737
cids := localdata.SavedRefs[CID]
38-
fmt.Println("cids: ", cids)
3938
// Get the length of the CIDs
4039
length := len(cids)
4140
fmt.Println("length", length)

0 commit comments

Comments
 (0)