Commit 7731e97

grid: Remove allocs

Removes some minor allocations, cleans up the benchmarks, and adds typed roundtrip benchmarks. The websocket library is updated to include gobwas/ws#189.

1 parent 85af24e · commit 7731e97

8 files changed: +143 -74 lines

go.mod (+1 -1)

@@ -26,7 +26,7 @@ require (
     github.com/go-ldap/ldap/v3 v3.4.6
     github.com/go-openapi/loads v0.21.2
     github.com/go-sql-driver/mysql v1.7.1
-    github.com/gobwas/ws v1.2.1
+    github.com/gobwas/ws v1.3.1-0.20231030152437-516805a9f3b3
     github.com/golang-jwt/jwt/v4 v4.5.0
     github.com/gomodule/redigo v1.8.9
     github.com/google/uuid v1.3.1

go.sum (+2 -2)

@@ -235,8 +235,8 @@ github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU
 github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM=
 github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og=
 github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
-github.com/gobwas/ws v1.2.1 h1:F2aeBZrm2NDsc7vbovKrWSogd4wvfAxg0FQ89/iqOTk=
-github.com/gobwas/ws v1.2.1/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY=
+github.com/gobwas/ws v1.3.1-0.20231030152437-516805a9f3b3 h1:u5on5kZjHKikhx6d2IAGOxFf4BAcJhUb2v8VJFHBgFA=
+github.com/gobwas/ws v1.3.1-0.20231030152437-516805a9f3b3/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY=
 github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
 github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
 github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=

internal/grid/benchmark_test.go (+121 -49)

@@ -46,15 +46,24 @@ func benchmarkGridRequests(b *testing.B, n int) {
             b.Fatal(err)
         }
     }
+    rpc := NewSingleHandler[*testRequest, *testResponse](handlerTest2, newTestRequest, newTestResponse)
     grid, err := SetupTestGrid(n)
     errFatal(err)
     b.Cleanup(grid.Cleanup)
     // Create n managers.
     for _, remote := range grid.Managers {
         // Register a single handler which echos the payload.
         errFatal(remote.RegisterSingleHandler(handlerTest, func(payload []byte) ([]byte, *RemoteErr) {
+            defer PutByteBuffer(payload)
             return append(GetByteBuffer()[:0], payload...), nil
         }))
+        errFatal(rpc.Register(remote, func(req *testRequest) (resp *testResponse, err *RemoteErr) {
+            return &testResponse{
+                OrgNum:    req.Num,
+                OrgString: req.String,
+                Embedded:  *req,
+            }, nil
+        }))
         errFatal(err)
     }
     const payloadSize = 512
@@ -65,61 +74,124 @@
 
     // Wait for all to connect
     // Parallel writes per server.
-    for par := 1; par <= 32; par *= 2 {
-        b.Run("par="+strconv.Itoa(par*runtime.GOMAXPROCS(0)), func(b *testing.B) {
-            defer timeout(30 * time.Second)()
-            b.ReportAllocs()
-            b.SetBytes(int64(len(payload) * 2))
-            b.ResetTimer()
-            t := time.Now()
-            var ops int64
-            var lat int64
-            b.SetParallelism(par)
-            b.RunParallel(func(pb *testing.PB) {
-                rng := rand.New(rand.NewSource(time.Now().UnixNano()))
-                n := 0
-                var latency int64
-                managers := grid.Managers
-                hosts := grid.Hosts
-                for pb.Next() {
-                    // Pick a random manager.
-                    src, dst := rng.Intn(len(managers)), rng.Intn(len(managers))
-                    if src == dst {
-                        dst = (dst + 1) % len(managers)
+    b.Run("bytes", func(b *testing.B) {
+        for par := 1; par <= 32; par *= 2 {
+            b.Run("par="+strconv.Itoa(par*runtime.GOMAXPROCS(0)), func(b *testing.B) {
+                defer timeout(60 * time.Second)()
+                ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+                defer cancel()
+                b.ReportAllocs()
+                b.SetBytes(int64(len(payload) * 2))
+                b.ResetTimer()
+                t := time.Now()
+                var ops int64
+                var lat int64
+                b.SetParallelism(par)
+                b.RunParallel(func(pb *testing.PB) {
+                    rng := rand.New(rand.NewSource(time.Now().UnixNano()))
+                    n := 0
+                    var latency int64
+                    managers := grid.Managers
+                    hosts := grid.Hosts
+                    for pb.Next() {
+                        // Pick a random manager.
+                        src, dst := rng.Intn(len(managers)), rng.Intn(len(managers))
+                        if src == dst {
+                            dst = (dst + 1) % len(managers)
+                        }
+                        local := managers[src]
+                        conn := local.Connection(hosts[dst])
+                        if conn == nil {
+                            b.Fatal("No connection")
+                        }
+                        // Send the payload.
+                        t := time.Now()
+                        resp, err := conn.Request(ctx, handlerTest, payload)
+                        latency += time.Since(t).Nanoseconds()
+                        if err != nil {
+                            if debugReqs {
+                                fmt.Println(err.Error())
+                            }
+                            b.Fatal(err.Error())
+                        }
+                        PutByteBuffer(resp)
+                        n++
                     }
-                    local := managers[src]
-                    conn := local.Connection(hosts[dst])
-                    if conn == nil {
-                        b.Fatal("No connection")
+                    atomic.AddInt64(&ops, int64(n))
+                    atomic.AddInt64(&lat, latency)
+                })
+                spent := time.Since(t)
+                if spent > 0 && n > 0 {
+                    // Since we are benchmarking n parallel servers we need to multiply by n.
+                    // This will give an estimate of the total ops/s.
+                    latency := float64(atomic.LoadInt64(&lat)) / float64(time.Millisecond)
+                    b.ReportMetric(float64(n)*float64(ops)/spent.Seconds(), "vops/s")
+                    b.ReportMetric(latency/float64(ops), "ms/op")
+                }
+            })
+        }
+    })
+    return
+    b.Run("rpc", func(b *testing.B) {
+        for par := 1; par <= 32; par *= 2 {
+            b.Run("par="+strconv.Itoa(par*runtime.GOMAXPROCS(0)), func(b *testing.B) {
+                defer timeout(60 * time.Second)()
+                ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+                defer cancel()
+                b.ReportAllocs()
+                b.ResetTimer()
+                t := time.Now()
+                var ops int64
+                var lat int64
+                b.SetParallelism(par)
+                b.RunParallel(func(pb *testing.PB) {
+                    rng := rand.New(rand.NewSource(time.Now().UnixNano()))
+                    n := 0
+                    var latency int64
+                    managers := grid.Managers
+                    hosts := grid.Hosts
+                    req := testRequest{
+                        Num:    rng.Int(),
+                        String: "hello",
                     }
-                    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
-                    // Send the payload.
-                    t := time.Now()
-                    resp, err := conn.Request(ctx, handlerTest, payload)
-                    latency += time.Since(t).Nanoseconds()
-                    cancel()
-                    if err != nil {
-                        if debugReqs {
-                            fmt.Println(err.Error())
+                    for pb.Next() {
+                        // Pick a random manager.
+                        src, dst := rng.Intn(len(managers)), rng.Intn(len(managers))
+                        if src == dst {
+                            dst = (dst + 1) % len(managers)
                         }
-                        b.Fatal(err.Error())
+                        local := managers[src]
+                        conn := local.Connection(hosts[dst])
+                        if conn == nil {
+                            b.Fatal("No connection")
+                        }
+                        // Send the payload.
+                        t := time.Now()
+                        resp, err := rpc.Call(ctx, conn, &req)
+                        latency += time.Since(t).Nanoseconds()
+                        if err != nil {
+                            if debugReqs {
+                                fmt.Println(err.Error())
+                            }
+                            b.Fatal(err.Error())
+                        }
+                        rpc.PutResponse(resp)
+                        n++
                     }
-                    PutByteBuffer(resp)
-                    n++
+                    atomic.AddInt64(&ops, int64(n))
+                    atomic.AddInt64(&lat, latency)
+                })
+                spent := time.Since(t)
+                if spent > 0 && n > 0 {
+                    // Since we are benchmarking n parallel servers we need to multiply by n.
+                    // This will give an estimate of the total ops/s.
+                    latency := float64(atomic.LoadInt64(&lat)) / float64(time.Millisecond)
+                    b.ReportMetric(float64(n)*float64(ops)/spent.Seconds(), "vops/s")
+                    b.ReportMetric(latency/float64(ops), "ms/op")
                 }
-                atomic.AddInt64(&ops, int64(n))
-                atomic.AddInt64(&lat, latency)
             })
-            spent := time.Since(t)
-            if spent > 0 && n > 0 {
-                // Since we are benchmarking n parallel servers we need to multiply by n.
-                // This will give an estimate of the total ops/s.
-                latency := float64(atomic.LoadInt64(&lat)) / float64(time.Millisecond)
-                b.ReportMetric(float64(n)*float64(ops)/spent.Seconds(), "vops/s")
-                b.ReportMetric(latency/float64(ops), "ms/op")
-            }
-        })
-    }
+        }
+    })
 }
 
 func BenchmarkStream(b *testing.B) {
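For orientation, the new "rpc" sub-benchmark above exercises the typed roundtrip path added in this commit. The sketch below condenses that cycle — build a typed handler, register it on a manager, call it over a connection, recycle the response — and is an illustration only: it assumes it sits in a test file inside internal/grid (so testRequest, testResponse, handlerTest2 and the new newTestRequest/newTestResponse constructors are in scope), and the exampleTypedRoundtrip name plus the remote.HostName() connection lookup are assumptions, not code from the commit.

// exampleTypedRoundtrip is a hypothetical helper mirroring the "rpc" benchmark body.
// Assumes "context" and "fmt" are imported and local/remote come from SetupTestGrid.
func exampleTypedRoundtrip(local, remote *Manager) error {
    // Typed single handler keyed by handlerTest2, using the new constructors.
    rpc := NewSingleHandler[*testRequest, *testResponse](handlerTest2, newTestRequest, newTestResponse)

    // Register an echo-style handler on the remote manager.
    if err := rpc.Register(remote, func(req *testRequest) (*testResponse, *RemoteErr) {
        return &testResponse{OrgNum: req.Num, OrgString: req.String, Embedded: *req}, nil
    }); err != nil {
        return err
    }

    // Call it from the local side over an established connection.
    conn := local.Connection(remote.HostName())
    resp, err := rpc.Call(context.Background(), conn, &testRequest{Num: 1, String: "hello"})
    if err != nil {
        return err
    }
    fmt.Println(resp.OrgNum, resp.OrgString)

    // Hand the response back to the handler's pool, as the benchmark does.
    rpc.PutResponse(resp)
    return nil
}

The PutResponse call matches what the benchmark loop does after every rpc.Call, presumably for the same reason the raw []byte path recycles buffers with PutByteBuffer.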

internal/grid/connection.go (+7 -1)

@@ -1187,6 +1187,10 @@ func (c *Connection) handleConnectMux(ctx context.Context, m message, subID *sub
     // Stream:
     var handler *StreamHandler
     if subID == nil {
+        if !m.Handler.valid() {
+            logger.LogIf(ctx, c.queueMsg(m, muxConnectError{Error: "Invalid Handler"}))
+            return
+        }
         handler = c.handlers.streams[m.Handler]
     } else {
         handler = c.handlers.subStreams[*subID]
@@ -1237,6 +1241,9 @@ func (c *Connection) handleRequest(ctx context.Context, m message, subID *subHan
         logger.LogIf(ctx, c.queueMsg(m, muxConnectError{Error: "Invalid Handler for type"}))
         return
     }
+
+    // TODO: This causes allocations, but escape analysis doesn't really show the cause.
+    // If another faithful engineer wants to take a stab, feel free.
     go func(m message) {
         var start time.Time
         if m.DeadlineMS > 0 {
@@ -1258,7 +1265,6 @@ func (c *Connection) handleRequest(ctx context.Context, m message, subID *subHan
             }
         }()
 
-        // TODO: Maybe recycle m.Payload - should be free here.
         if m.DeadlineMS > 0 && time.Since(start).Milliseconds()+c.addDeadline.Milliseconds() > int64(m.DeadlineMS) {
             if debugReqs {
                 fmt.Println(m.MuxID, c.StringReverse(), "DEADLINE EXCEEDED")

internal/grid/grid_test.go (+3 -19)

@@ -160,11 +160,7 @@ func TestSingleRoundtripGenerics(t *testing.T) {
         return resp, nil
     }
     // Return error
-    h2 := NewSingleHandler[*testRequest, *testResponse](handlerTest2, func() *testRequest {
-        return &testRequest{}
-    }, func() *testResponse {
-        return &testResponse{}
-    })
+    h2 := NewSingleHandler[*testRequest, *testResponse](handlerTest2, newTestRequest, newTestResponse)
     handler2 := func(req *testRequest) (resp *testResponse, err *RemoteErr) {
         r := RemoteErr(req.String)
         return nil, &r
@@ -682,13 +678,7 @@ func testGenericsStreamRoundtrip(t *testing.T, local, remote *Manager) {
 
     // We fake a local and remote server.
     remoteHost := remote.HostName()
-    handler := NewStream[*testRequest, *testRequest, *testResponse](handlerTest, func() *testRequest {
-        return &testRequest{}
-    }, func() *testRequest {
-        return &testRequest{}
-    }, func() *testResponse {
-        return &testResponse{}
-    })
+    handler := NewStream[*testRequest, *testRequest, *testResponse](handlerTest, newTestRequest, newTestRequest, newTestResponse)
     handler.InCapacity = 1
     handler.OutCapacity = 1
     const payloads = 10
@@ -759,13 +749,7 @@ func testGenericsStreamRoundtripSubroute(t *testing.T, local, remote *Manager) {
 
     // We fake a local and remote server.
     remoteHost := remote.HostName()
-    handler := NewStream[*testRequest, *testRequest, *testResponse](handlerTest, func() *testRequest {
-        return &testRequest{}
-    }, func() *testRequest {
-        return &testRequest{}
-    }, func() *testResponse {
-        return &testResponse{}
-    })
+    handler := NewStream[*testRequest, *testRequest, *testResponse](handlerTest, newTestRequest, newTestRequest, newTestResponse)
     handler.InCapacity = 1
     handler.OutCapacity = 1
     const payloads = 10

internal/grid/grid_types_test.go (+8)

@@ -29,3 +29,11 @@ type testResponse struct {
     OrgString string
     Embedded  testRequest
 }
+
+func newTestRequest() *testRequest {
+    return &testRequest{}
+}
+
+func newTestResponse() *testResponse {
+    return &testResponse{}
+}

internal/grid/handlers.go (+1)

@@ -147,6 +147,7 @@ type (
     // A non-nil error value will be returned as RemoteErr(msg) to client.
     // No client information or cancellation (deadline) is available.
     // Include this in payload if needed.
+    // Payload should be recycled with PutByteBuffer if not needed after the call.
     SingleHandlerFn func(payload []byte) ([]byte, *RemoteErr)
 
     // StatelessHandlerFn must handle incoming stateless request.
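The new doc line codifies the ownership rule that the updated echo handler in the benchmark already follows: a SingleHandlerFn may recycle its payload once it is done with it, and should return a pooled buffer so the framework can recycle the response as well. A minimal sketch, lifted from the benchmark's handler and illustrative only: manager stands in for a *Manager such as those returned by SetupTestGrid, and handlerTest is the test HandlerID used there.

// Echo handler honoring the new contract: the incoming payload is recycled once
// copied, and the returned slice comes from the shared buffer pool.
echo := func(payload []byte) ([]byte, *RemoteErr) {
    defer PutByteBuffer(payload) // payload is not referenced after the copy below
    return append(GetByteBuffer()[:0], payload...), nil
}
// Registration as in the benchmark, where failures are fatal (errFatal(err)).
if err := manager.RegisterSingleHandler(handlerTest, echo); err != nil {
    panic(err)
}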

internal/grid/muxclient.go (-2)

@@ -35,7 +35,6 @@ type muxClient struct {
     MuxID            uint64
     SendSeq, RecvSeq uint32
     LastPong         int64
-    Resp             chan []byte
     BaseFlags        Flags
     ctx              context.Context
     cancelFn         context.CancelCauseFunc
@@ -62,7 +61,6 @@ func newMuxClient(ctx context.Context, muxID uint64, parent *Connection) *muxCli
     ctx, cancelFn := context.WithCancelCause(ctx)
     return &muxClient{
         MuxID:    muxID,
-        Resp:     make(chan []byte, 1),
         ctx:      ctx,
         cancelFn: cancelFn,
         parent:   parent,