Commit a124c96

Merge branch 'release/v1.2.0'
2 parents: 6468f77 + bf7a95a
32 files changed, +591 −804 lines

Diff for: CHANGELOG.md

+27 −5

@@ -2,7 +2,29 @@
 All notable changes to this project will be documented in this file.
 This project adheres to [Semantic Versioning](http://semver.org/).
 
-## v1.1.4
+## v1.2.0 - 2015-11-19
+### Added
+- Added `UUID` term
+- Added `Values` term
+- Added `IncludeInitial` and `ChangefeedQueueSize` to `ChangesOpts`
+- Added `UseJSONNumber` to `ConnectOpts`, which changes how JSON is unmarshalled when decoding into interface{}; json.Number is preferred over float64 as it preserves the original precision.
+- Added `HostDecayDuration` to `ConnectOpts` to configure how hosts are selected. For more information see the godoc.
+
+### Changed
+- Timezones from `time.Time` are now stored in the database; previously all times were stored as UTC. To convert a Go `time.Time` back to UTC you can call `t.In(time.UTC)`.
+- Improved host selection to use `hailocab/go-hostpool` to select nodes based on recent responses and timings.
+- Changed the connection pool to use `fatih/pool` instead of a custom connection pool. This has caused some internal API changes, and the behaviour of `MaxIdle` and `MaxOpen` has changed slightly. This change was made mostly to make driver maintenance easier.
+  + `MaxIdle` now configures the initial size of the pool; the name of this field will likely change in the future.
+  + Not setting `MaxOpen` no longer creates an unbounded connection pool per host but instead creates a pool with a maximum capacity of 2 per host.
+
+### Deprecated
+- Deprecated the option `NodeRefreshInterval` in `ConnectOpts`.
+- Deprecated `SetMaxIdleConns` and `SetMaxOpenConns`; these options should now only be set when creating the session.
+
+### Fixed
+- Fixed some type aliases not being correctly encoded when using `Expr`.
+
+## v1.1.4 - 2015-10-02
 ### Added
 - Added root table terms (`r.TableCreate`, `r.TableList` and `r.TableDrop`)
 
@@ -16,22 +38,22 @@ This project adheres to [Semantic Versioning](http://semver.org/).
 - Fixed stop query incorrectly waiting for response
 - Fixed pointers not being properly decoded
 
-## v1.1.3
+## v1.1.3 - 2015-09-06
 ### Fixed
 - Fixed pointers not being properly decoded
 - Fixed queries always timing out when Timeout ConnectOpt is set.
 
-## v1.1.2
+## v1.1.2 - 2015-08-28
 ### Fixed
 - Fixed issue when encoding some maps
 
-## v1.1.1
+## v1.1.1 - 2015-08-21
 ### Fixed
 - Corrected protobuf import
 - Fixed documentation
 - Fixed issues with time pseudotype conversion that caused issues with milliseconds
 
-## v1.1.0
+## v1.1.0 - 2015-08-19
 ### Added
 - Replaced `UseOutdated` with `ReadMode`
 - Added `EmergencyRepair` and `NonVotingReplicaTags` to `ReconfigureOpts`
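To make the new connection options concrete, here is a minimal sketch of opening a session with the fields added or changed in v1.2.0. It assumes the driver is imported as `r` from `github.com/dancannon/gorethink` and a node listening on `localhost:28015`; the durations and pool sizes are illustrative values, not recommendations.

```go
package main

import (
	"log"
	"time"

	r "github.com/dancannon/gorethink"
)

func main() {
	session, err := r.Connect(r.ConnectOpts{
		Address:           "localhost:28015", // assumed local node
		UseJSONNumber:     true,              // decode JSON numbers into json.Number, preserving precision
		HostDecayDuration: 5 * time.Minute,   // window the host pool uses when scoring nodes (illustrative)
		MaxIdle:           10,                // now the initial size of the connection pool
		MaxOpen:           20,                // pool capacity per host; defaults to 2 when unset
	})
	if err != nil {
		log.Fatalln(err)
	}
	defer session.Close()
}
```

Since `SetMaxIdleConns` and `SetMaxOpenConns` are deprecated in this release, pool sizing now belongs in `ConnectOpts` at session creation, as above.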

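The timezone change means a value read back from the database keeps the zone it was written with. Below is a sketch of converting back to UTC after a read, reusing the `r` alias and session from the previous example; the `events` table, key and struct tags are hypothetical.

```go
// readEventTime fetches a document and normalises its timestamp to UTC.
// Table name, key and struct shape are placeholders for illustration.
func readEventTime(session *r.Session) (time.Time, error) {
	var event struct {
		ID   string    `gorethink:"id"`
		Time time.Time `gorethink:"time"`
	}

	res, err := r.Table("events").Get("some-id").Run(session)
	if err != nil {
		return time.Time{}, err
	}
	defer res.Close()

	if err := res.One(&event); err != nil {
		return time.Time{}, err
	}

	// The stored zone is preserved; convert explicitly when UTC is required.
	return event.Time.In(time.UTC), nil
}
```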
Diff for: README.md

+1 −1

@@ -8,7 +8,7 @@
 
 ![GoRethink Logo](https://raw.github.com/wiki/dancannon/gorethink/gopher-and-thinker-s.png "Golang Gopher and RethinkDB Thinker")
 
-Current version: v1.1.4 (RethinkDB v2.1)
+Current version: v1.2.0 (RethinkDB v2.2)
 
 Please note that this version of the driver only supports versions of RethinkDB using the v0.4 protocol (any versions of the driver older than RethinkDB 2.0 will not work).

Diff for: cluster.go

+81 −50

@@ -2,14 +2,14 @@ package gorethink
 
 import (
 	"fmt"
-	"math"
 	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
 
 	"github.com/Sirupsen/logrus"
 	"github.com/cenkalti/backoff"
+	"github.com/hailocab/go-hostpool"
 )
 
 // A Cluster represents a connection to a RethinkDB cluster, a cluster is created
@@ -23,8 +23,9 @@ type Cluster struct {
 	opts *ConnectOpts
 
 	mu     sync.RWMutex
-	seeds  []Host  // Initial host nodes specified by user.
-	nodes  []*Node // Active nodes in cluster.
+	seeds  []Host // Initial host nodes specified by user.
+	hp     hostpool.HostPool
+	nodes  map[string]*Node // Active nodes in cluster.
 	closed bool
 
 	nodeIndex int64
@@ -33,6 +34,7 @@ type Cluster struct {
 // NewCluster creates a new cluster by connecting to the given hosts.
 func NewCluster(hosts []Host, opts *ConnectOpts) (*Cluster, error) {
 	c := &Cluster{
+		hp:    hostpool.NewEpsilonGreedy([]string{}, opts.HostDecayDuration, &hostpool.LinearEpsilonValueCalculator{}),
 		seeds: hosts,
 		opts:  opts,
 	}
@@ -52,22 +54,38 @@ func NewCluster(hosts []Host, opts *ConnectOpts) (*Cluster, error) {
 
 // Query executes a ReQL query using the cluster to connect to the database
 func (c *Cluster) Query(q Query) (cursor *Cursor, err error) {
-	node, err := c.GetRandomNode()
+	node, hpr, err := c.GetNextNode()
 	if err != nil {
 		return nil, err
 	}
 
-	return node.Query(q)
+	cursor, err = node.Query(q)
+	hpr.Mark(err)
+	return cursor, err
 }
 
 // Exec executes a ReQL query using the cluster to connect to the database
 func (c *Cluster) Exec(q Query) (err error) {
-	node, err := c.GetRandomNode()
+	node, hpr, err := c.GetNextNode()
 	if err != nil {
 		return err
 	}
 
-	return node.Exec(q)
+	err = node.Exec(q)
+	hpr.Mark(err)
+	return err
+}
+
+// Server returns the server name and server UUID being used by a connection.
+func (c *Cluster) Server() (response ServerResponse, err error) {
+	node, hpr, err := c.GetNextNode()
+	if err != nil {
+		return ServerResponse{}, err
+	}
+
+	response, err = node.Server()
+	hpr.Mark(err)
+	return response, err
 }
 
 // SetMaxIdleConns sets the maximum number of connections in the idle
@@ -120,7 +138,7 @@ func (c *Cluster) discover() {
 
 			return c.listenForNodeChanges()
 		}, b, func(err error, wait time.Duration) {
-			Log.Debugf("Error discovering hosts %s, waiting %s", err, wait)
+			Log.Debugf("Error discovering hosts %s, waiting: %s", err, wait)
 		})
 	}
 }
@@ -129,7 +147,7 @@ func (c *Cluster) discover() {
 // This function will block until the query fails
 func (c *Cluster) listenForNodeChanges() error {
 	// Start listening to changes from a random active node
-	node, err := c.GetRandomNode()
+	node, hpr, err := c.GetNextNode()
 	if err != nil {
 		return err
 	}
@@ -140,11 +158,12 @@ func (c *Cluster) listenForNodeChanges() error {
 		c.opts,
 	)
 	if err != nil {
-		return fmt.Errorf("Error building query %s", err)
+		return fmt.Errorf("Error building query: %s", err)
 	}
 
 	cursor, err := node.Query(q)
 	if err != nil {
+		hpr.Mark(err)
 		return err
 	}
 
@@ -182,7 +201,9 @@ func (c *Cluster) listenForNodeChanges() error {
 		}
 	}
 
-	return cursor.Err()
+	err = cursor.Err()
+	hpr.Mark(err)
+	return err
 }
 
 func (c *Cluster) connectNodes(hosts []Host) {
@@ -196,7 +217,7 @@ func (c *Cluster) connectNodes(hosts []Host) {
 	for _, host := range hosts {
 		conn, err := NewConnection(host.String(), c.opts)
 		if err != nil {
-			Log.Warnf("Error creating connection %s", err.Error())
+			Log.Warnf("Error creating connection: %s", err.Error())
 			continue
 		}
 		defer conn.Close()
@@ -207,16 +228,21 @@ func (c *Cluster) connectNodes(hosts []Host) {
 			c.opts,
 		)
 		if err != nil {
-			Log.Warnf("Error building query %s", err)
+			Log.Warnf("Error building query: %s", err)
 			continue
 		}
 
 		_, cursor, err := conn.Query(q)
 		if err != nil {
-			Log.Warnf("Error fetching cluster status %s", err)
+			Log.Warnf("Error fetching cluster status: %s", err)
 			continue
 		}
 
+		// TODO: connect to seed hosts using `.Server()` to get server ID. Need
+		// some way of making this backwards compatible
+
+		// TODO: AFTER try to discover hosts
+
 		if c.opts.DiscoverHosts {
 			var results []nodeStatus
 			err = cursor.All(&results)
@@ -234,6 +260,8 @@ func (c *Cluster) connectNodes(hosts []Host) {
 					}).Debug("Connected to node")
 					nodeSet[node.ID] = node
 				}
+			} else {
+				Log.Warnf("Error connecting to node: %s", err)
 			}
 		}
 	} else {
@@ -246,6 +274,8 @@ func (c *Cluster) connectNodes(hosts []Host) {
 				}).Debug("Connected to node")
 				nodeSet[node.ID] = node
 			}
+		} else {
+			Log.Warnf("Error connecting to node: %s", err)
 		}
 	}
 }
@@ -322,44 +352,31 @@ func (c *Cluster) getSeeds() []Host {
 	return seeds
 }
 
-// GetRandomNode returns a random node on the cluster
-// TODO(dancannon) replace with hostpool
-func (c *Cluster) GetRandomNode() (*Node, error) {
+// GetNextNode returns a random node on the cluster
+func (c *Cluster) GetNextNode() (*Node, hostpool.HostPoolResponse, error) {
 	if !c.IsConnected() {
-		return nil, ErrNoConnections
+		return nil, nil, ErrNoConnections
 	}
-	// Must copy array reference for copy on write semantics to work.
-	nodeArray := c.GetNodes()
-	length := len(nodeArray)
-	for i := 0; i < length; i++ {
-		// Must handle concurrency with other non-tending goroutines, so nodeIndex is consistent.
-		index := int(math.Abs(float64(c.nextNodeIndex() % int64(length))))
-		node := nodeArray[index]
-
-		if !node.Closed() && node.IsHealthy() {
-			return node, nil
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	nodes := c.nodes
+	hpr := c.hp.Get()
+	if n, ok := nodes[hpr.Host()]; ok {
+		if !n.Closed() {
+			return n, hpr, nil
 		}
 	}
-	return nil, ErrNoConnections
+
+	return nil, nil, ErrNoConnections
 }
 
 // GetNodes returns a list of all nodes in the cluster
 func (c *Cluster) GetNodes() []*Node {
 	c.mu.RLock()
-	nodes := c.nodes
-	c.mu.RUnlock()
-
-	return nodes
-}
-
-// GetHealthyNodes returns a list of all healthy nodes in the cluster
-func (c *Cluster) GetHealthyNodes() []*Node {
-	c.mu.RLock()
-	nodes := []*Node{}
-	for _, node := range c.nodes {
-		if node.IsHealthy() {
-			nodes = append(nodes, node)
-		}
+	nodes := make([]*Node, 0, len(c.nodes))
+	for _, n := range c.nodes {
+		nodes = append(nodes, n)
 	}
 	c.mu.RUnlock()
 
@@ -376,20 +393,34 @@ func (c *Cluster) nodeExists(search *Node) bool {
 }
 
 func (c *Cluster) addNode(node *Node) {
-	c.mu.Lock()
-	c.nodes = append(c.nodes, node)
-	c.mu.Unlock()
+	c.mu.RLock()
+	nodes := append(c.GetNodes(), node)
+	c.mu.RUnlock()
+
+	c.setNodes(nodes)
 }
 
 func (c *Cluster) addNodes(nodesToAdd []*Node) {
-	c.mu.Lock()
-	c.nodes = append(c.nodes, nodesToAdd...)
-	c.mu.Unlock()
+	c.mu.RLock()
+	nodes := append(c.GetNodes(), nodesToAdd...)
+	c.mu.RUnlock()
+
+	c.setNodes(nodes)
 }
 
 func (c *Cluster) setNodes(nodes []*Node) {
+	nodesMap := make(map[string]*Node, len(nodes))
+	hosts := make([]string, len(nodes))
+	for i, node := range nodes {
+		host := node.Host.String()
+
+		nodesMap[host] = node
+		hosts[i] = host
+	}
+
 	c.mu.Lock()
-	c.nodes = nodes
+	c.nodes = nodesMap
+	c.hp.SetHosts(hosts)
 	c.mu.Unlock()
 }

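The cluster.go changes above swap the old random node picker for `hailocab/go-hostpool`'s epsilon-greedy pool: `GetNextNode` asks the pool for a host, and every caller reports the outcome back with `Mark(err)` so that slow or failing nodes are chosen less often. Below is a standalone sketch of that pattern outside the driver, using placeholder host addresses and a stubbed request function.

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/hailocab/go-hostpool"
)

// doRequest stands in for running a query against a specific host.
func doRequest(host string) error {
	if host == "10.0.0.3:28015" {
		return errors.New("connection refused") // simulate a bad node
	}
	return nil
}

func main() {
	// Epsilon-greedy pool: usually picks the best-scoring host, occasionally
	// explores the others. The decay duration plays the role of HostDecayDuration.
	hp := hostpool.NewEpsilonGreedy(
		[]string{"10.0.0.1:28015", "10.0.0.2:28015", "10.0.0.3:28015"},
		5*time.Minute,
		&hostpool.LinearEpsilonValueCalculator{},
	)

	for i := 0; i < 10; i++ {
		hpr := hp.Get()              // pick a host
		err := doRequest(hpr.Host()) // run the request against it
		hpr.Mark(err)                // feed the outcome back into the scoring
		fmt.Println(hpr.Host(), err)
	}
}
```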
Diff for: cluster_integration_test.go

+2 −2

@@ -63,7 +63,7 @@ func (s *RethinkSuite) TestClusterRecoverAfterNoNodes(c *test.C) {
 
 func (s *RethinkSuite) TestClusterNodeHealth(c *test.C) {
 	session, err := Connect(ConnectOpts{
-		Addresses:           []string{url, url2, url3},
+		Addresses:           []string{url1, url2, url3},
 		DiscoverHosts:       true,
 		NodeRefreshInterval: time.Second,
 		MaxIdle:             50,
@@ -75,7 +75,7 @@ func (s *RethinkSuite) TestClusterNodeHealth(c *test.C) {
 	failed := 0
 	seconds := 0
 
-	t := time.NewTimer(time.Second * 10)
+	t := time.NewTimer(time.Second * 30)
 	tick := time.NewTicker(time.Second)
 	for {
 		select {
