forked from kofj/naivechain
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
329 lines (298 loc) · 9.23 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
package main
import (
"crypto/sha256"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"net/http"
"sort"
"strings"
"time"
"golang.org/x/net/websocket"
)
/*
iota是一个静态计数器,只能用在常量声明中,作用是,每次用过iota之后,当前常量值被置为0,随后出现的常量依次+1
比如下面常量值queryLatest = 0, queryAll = 1, responseBlockchain = 2
*/
const (
queryLatest = iota
queryAll
responseBlockchain
)
/*
&的写法表示当前声明的变量genesisBlock是Block实例的一个指针,指向了创世块Block的地址, Block是golang中的一个数据结构struct,
在后面进行了声明,对于struct的声明形式是:
type <struct name> struct{ 注意大括号必须和声明语句在同一行
<variable name> <varible type> 声明时候每一行没有逗号
}
实例化的时候,如下面,每一行有逗号,最后一行也必须有逗号
*/
var genesisBlock = &Block{
Index: 0,
PreviousHash: "0",
Timestamp: 1465154705,
Data: "my genesis block!!",
Hash: "816534932c2b7154836da6afc367695e6337db8a921823784c14378abed4f7d7",
}
/*
flag 是第三方的库,作用是解析程序启动时候传入的参数,需要注意的是,具体什么参数可以被解析是需要提前注册的
如下面的使用例子这样,并且可以给定一个默认值,和一个解释
*/
var (
sockets []*websocket.Conn
blockchain = []*Block{genesisBlock}
httpAddr = flag.String("api", ":3001", "api server address.")
p2pAddr = flag.String("p2p", ":6001", "p2p server address.")
initialPeers = flag.String("peers", "ws://localhost:6001", "initial peers")
)
/*
结构体的声明,其中需要注意的是`` 符号的使用, 这是golang语句中的TAG用法, TAG可以使用反射机制去获取, 如:
myBlock := Block{}
myBlockType := reflect.TypeOf(myBlock)
field := myBlockType.Field(0)
fmt.Println(field.Tag.Get("json"))
TAG也可以有多个,中间用空格隔开,如下面Index字段也可以这样写
Index int64 `json:"index" primaryKey: true`
取的时候,对应使用
fmt.Println(field.Tag.Get("json"), field.TAG.Get("primaryKey"))
*/
type Block struct {
Index int64 `json:"index"`
PreviousHash string `json:"previousHash"`
Timestamp int64 `json:"timestamp"`
Data string `json:"data"`
Hash string `json:"hash"`
}
/*
打印一个区块, 注意返回语句中使用的是 Sprintf 和Println不同的是,Println是往os.Stdout输出,Sprintf直接返回字符串,不进行控制台输出
*/
func (b *Block) String() string {
return fmt.Sprintf("index: %d,previousHash:%s,timestamp:%d,data:%s,hash:%s", b.Index, b.PreviousHash, b.Timestamp, b.Data, b.Hash)
}
type ByIndex []*Block
func (b ByIndex) Len() int { return len(b) }
func (b ByIndex) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
func (b ByIndex) Less(i, j int) bool { return b[i].Index < b[j].Index }
type ResponseBlockchain struct {
Type int `json:"type"`
Data string `json:"data"`
}
func errFatal(msg string, err error) {
if err != nil {
log.Fatalln(msg, err)
}
}
func connectToPeers(peersAddr []string) {
for _, peer := range peersAddr {
if peer == "" {
continue
}
ws, err := websocket.Dial(peer, "", peer)
if err != nil {
log.Println("dial to peer", err)
continue
}
initConnection(ws)
}
}
func initConnection(ws *websocket.Conn) {
go wsHandleP2P(ws)
log.Println("query latest block.")
ws.Write(queryLatestMsg())
}
func handleBlocks(w http.ResponseWriter, r *http.Request) {
bs, _ := json.Marshal(blockchain)
w.Write(bs)
}
func handleMineBlock(w http.ResponseWriter, r *http.Request) {
var v struct {
Data string `json:"data"`
}
decoder := json.NewDecoder(r.Body)
defer r.Body.Close()
err := decoder.Decode(&v)
if err != nil {
w.WriteHeader(http.StatusGone)
log.Println("[API] invalid block data : ", err.Error())
w.Write([]byte("invalid block data. " + err.Error() + "\n"))
return
}
block := generateNextBlock(v.Data)
addBlock(block)
broadcast(responseLatestMsg())
}
func handlePeers(w http.ResponseWriter, r *http.Request) {
var slice []string
for _, socket := range sockets {
if socket.IsClientConn() {
slice = append(slice, strings.Replace(socket.LocalAddr().String(), "ws://", "", 1))
} else {
slice = append(slice, socket.Request().RemoteAddr)
}
}
bs, _ := json.Marshal(slice)
w.Write(bs)
}
func handleAddPeer(w http.ResponseWriter, r *http.Request) {
var v struct {
Peer string `json:"peer"`
}
decoder := json.NewDecoder(r.Body)
defer r.Body.Close()
err := decoder.Decode(&v)
if err != nil {
w.WriteHeader(http.StatusGone)
log.Println("[API] invalid peer data : ", err.Error())
w.Write([]byte("invalid peer data. " + err.Error()))
return
}
connectToPeers([]string{v.Peer})
}
func wsHandleP2P(ws *websocket.Conn) {
var (
v = &ResponseBlockchain{}
peer = ws.LocalAddr().String()
)
sockets = append(sockets, ws)
for {
var msg []byte
err := websocket.Message.Receive(ws, &msg)
if err == io.EOF {
log.Printf("p2p Peer[%s] shutdown, remove it form peers pool.\n", peer)
break
}
if err != nil {
log.Println("Can't receive p2p msg from ", peer, err.Error())
break
}
log.Printf("Received[from %s]: %s.\n", peer, msg)
err = json.Unmarshal(msg, v)
errFatal("invalid p2p msg", err)
switch v.Type {
case queryLatest:
v.Type = responseBlockchain
bs := responseLatestMsg()
log.Printf("responseLatestMsg: %s\n", bs)
ws.Write(bs)
case queryAll:
d, _ := json.Marshal(blockchain)
v.Type = responseBlockchain
v.Data = string(d)
bs, _ := json.Marshal(v)
log.Printf("responseChainMsg: %s\n", bs)
ws.Write(bs)
case responseBlockchain:
handleBlockchainResponse([]byte(v.Data))
}
}
}
func getLatestBlock() (block *Block) { return blockchain[len(blockchain)-1] }
func responseLatestMsg() (bs []byte) {
var v = &ResponseBlockchain{Type: responseBlockchain}
d, _ := json.Marshal(blockchain[len(blockchain)-1:])
v.Data = string(d)
bs, _ = json.Marshal(v)
return
}
func queryLatestMsg() []byte { return []byte(fmt.Sprintf("{\"type\": %d}", queryLatest)) }
func queryAllMsg() []byte { return []byte(fmt.Sprintf("{\"type\": %d}", queryAll)) }
func calculateHashForBlock(b *Block) string {
return fmt.Sprintf("%x", sha256.Sum256([]byte(fmt.Sprintf("%d%s%d%s", b.Index, b.PreviousHash, b.Timestamp, b.Data))))
}
func generateNextBlock(data string) (nb *Block) {
var previousBlock = getLatestBlock()
nb = &Block{
Data: data,
PreviousHash: previousBlock.Hash,
Index: previousBlock.Index + 1,
Timestamp: time.Now().Unix(),
}
nb.Hash = calculateHashForBlock(nb)
return
}
func addBlock(b *Block) {
if isValidNewBlock(b, getLatestBlock()) {
blockchain = append(blockchain, b)
}
}
func isValidNewBlock(nb, pb *Block) (ok bool) {
if nb.Hash == calculateHashForBlock(nb) &&
pb.Index+1 == nb.Index &&
pb.Hash == nb.PreviousHash {
ok = true
}
return
}
func isValidChain(bc []*Block) bool {
if bc[0].String() != genesisBlock.String() {
log.Println("No same GenesisBlock.", bc[0].String())
return false
}
var temp = []*Block{bc[0]}
for i := 1; i < len(bc); i++ {
if isValidNewBlock(bc[i], temp[i-1]) {
temp = append(temp, bc[i])
} else {
return false
}
}
return true
}
func replaceChain(bc []*Block) {
if isValidChain(bc) && len(bc) > len(blockchain) {
log.Println("Received blockchain is valid. Replacing current blockchain with received blockchain.")
blockchain = bc
broadcast(responseLatestMsg())
} else {
log.Println("Received blockchain invalid.")
}
}
func broadcast(msg []byte) {
for n, socket := range sockets {
_, err := socket.Write(msg)
if err != nil {
log.Printf("peer [%s] disconnected.", socket.RemoteAddr().String())
sockets = append(sockets[0:n], sockets[n+1:]...)
}
}
}
func handleBlockchainResponse(msg []byte) {
var receivedBlocks = []*Block{}
err := json.Unmarshal(msg, &receivedBlocks)
errFatal("invalid blockchain", err)
sort.Sort(ByIndex(receivedBlocks))
latestBlockReceived := receivedBlocks[len(receivedBlocks)-1]
latestBlockHeld := getLatestBlock()
if latestBlockReceived.Index > latestBlockHeld.Index {
log.Printf("blockchain possibly behind. We got: %d Peer got: %d", latestBlockHeld.Index, latestBlockReceived.Index)
if latestBlockHeld.Hash == latestBlockReceived.PreviousHash {
log.Println("We can append the received block to our chain.")
blockchain = append(blockchain, latestBlockReceived)
} else if len(receivedBlocks) == 1 {
log.Println("We have to query the chain from our peer.")
broadcast(queryAllMsg())
} else {
log.Println("Received blockchain is longer than current blockchain.")
replaceChain(receivedBlocks)
}
} else {
log.Println("received blockchain is not longer than current blockchain. Do nothing.")
}
}
func main() {
flag.Parse()
connectToPeers(strings.Split(*initialPeers, ","))
http.HandleFunc("/blocks", handleBlocks)
http.HandleFunc("/mine_block", handleMineBlock)
http.HandleFunc("/peers", handlePeers)
http.HandleFunc("/add_peer", handleAddPeer)
go func() {
log.Println("Listen HTTP on", *httpAddr)
errFatal("start api server", http.ListenAndServe(*httpAddr, nil))
}()
http.Handle("/", websocket.Handler(wsHandleP2P))
log.Println("Listen P2P on ", *p2pAddr)
errFatal("start p2p server", http.ListenAndServe(*p2pAddr, nil))
}