Skip to content

Commit b2c018f

Browse files
authored
Fix binary ws transport and add new callback types (#41)
This commit fixes returning binary data from an Emit or Ack callback. It also adds an exported generic Callback wrapper that can be used for On events. So callbacks can accept interface parameters if needed. The binary websocket transport can offer a buffered or unbuffered reader.
1 parent ea5aa32 commit b2c018f

14 files changed

+258
-86
lines changed

Diff for: callback/callback.go

+18-4
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,25 @@ func (ErrorWrap) Unserialize(string) error { return ErrStubUnserial
1717

1818
type FuncAny func(...interface{}) error
1919

20-
func (fn FuncAny) Callback(v ...interface{}) error {
21-
return fn(v...)
20+
func (fn FuncAny) Callback(v ...interface{}) error { return fn(v...) }
21+
func (FuncAny) Serialize() (string, error) { return "", ErrStubSerialize }
22+
func (FuncAny) Unserialize(string) error { return ErrStubUnserialize }
23+
24+
type FuncAnyAck func(...interface{}) []seri.Serializable
25+
26+
func (fn FuncAnyAck) Callback(v ...interface{}) error { return ErrStubSerialize }
27+
func (fn FuncAnyAck) CallbackAck(v ...interface{}) []interface{} {
28+
slice := fn(v...)
29+
out := make([]interface{}, len(v))
30+
for i, ice := range slice {
31+
if x, ok := ice.(interface{ Interface() interface{} }); ok {
32+
out[i] = x.Interface()
33+
}
34+
}
35+
return out
2236
}
23-
func (FuncAny) Serialize() (string, error) { return "", ErrStubSerialize }
24-
func (FuncAny) Unserialize(string) error { return ErrStubUnserialize }
37+
func (FuncAnyAck) Serialize() (string, error) { return "", ErrStubSerialize }
38+
func (FuncAnyAck) Unserialize(string) error { return ErrStubUnserialize }
2539

2640
type FuncString func(string)
2741

Diff for: engineio/option.v2.go

+4
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,13 @@ func WithTransportChannelBuffer(n int) Option {
8181

8282
func WithTransportOption(opts ...eiot.Option) Option {
8383
return func(svr Server) {
84+
ServerCheck: // makes things an O(2^n) check...
8485
switch v := svr.(type) {
8586
case *serverV2:
8687
v.eto = append(v.eto, opts...)
88+
case interface{ prev() Server }:
89+
svr = v.prev()
90+
goto ServerCheck
8791
}
8892
}
8993
}

Diff for: engineio/server.v2.go

+1
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ func (v2 *serverV2) new(opts ...Option) *serverV2 {
8888
WithTransport("polling", eiot.NewPollingTransport(v2.transportChanBuf))(v2)
8989
WithTransport("websocket", eiot.NewWebsocketTransport(v2.transportChanBuf))(v2)
9090

91+
v2.eto = []eiot.Option{eiot.WithGovernor(1500*time.Microsecond, 500*time.Microsecond)}
9192
v2.With(v2, opts...)
9293

9394
return v2

Diff for: engineio/session/manage.go

+6-8
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,16 @@ import "time"
55
type sessionCtxKey string
66

77
const (
8-
SessionTimeoutKey sessionCtxKey = "timeout"
9-
SessionIntervalKey sessionCtxKey = "interval"
10-
SessionExtendTimeoutKey sessionCtxKey = "timeout-extend"
11-
SessionExtendIntervalKey sessionCtxKey = "interval-extend"
8+
SessionTimeoutKey sessionCtxKey = "timeout"
9+
SessionIntervalKey sessionCtxKey = "interval"
10+
SessionExtendTimeoutKey sessionCtxKey = "timeout-extend"
1211

1312
SessionCloseChannelKey sessionCtxKey = "cancel-channel"
1413
SessionCloseFunctionKey sessionCtxKey = "cancel-function"
1514
)
1615

1716
type (
18-
TimeoutChannel func() <-chan struct{}
19-
IntervalChannel func() <-chan time.Time
20-
ExtendTimeoutFunc func()
21-
ExtendIntervalFunc func(time.Duration)
17+
TimeoutChannel func() <-chan struct{}
18+
IntervalChannel func() <-chan time.Time
19+
ExtendTimeoutFunc func()
2220
)

Diff for: engineio/sessions.go

-10
Original file line numberDiff line numberDiff line change
@@ -130,16 +130,6 @@ func (c *lifecycle) WithInterval(ctx context.Context, d time.Duration) context.C
130130
c.id = d
131131
c.i.LoadOrStore(sessionID, time.NewTicker(c.id))
132132

133-
ctx = context.WithValue(ctx, eios.SessionExtendIntervalKey, eios.ExtendIntervalFunc(func(d time.Duration) {
134-
if val, ok := c.i.Load(sessionID); ok {
135-
if d != 0 {
136-
val.(*time.Ticker).Reset(d)
137-
} else {
138-
val.(*time.Ticker).Reset(c.id)
139-
}
140-
}
141-
}))
142-
143133
var interval eios.IntervalChannel = func() <-chan time.Time {
144134
if val, ok := c.i.Load(sessionID); ok {
145135
val.(*time.Ticker).Reset(c.id)

Diff for: engineio/transport/option.go

+22
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package transport
22

3+
import "time"
4+
35
type Option func(Transporter)
46

57
func WithCodec(codec Codec) Option {
@@ -37,3 +39,23 @@ func WithNoPing() Option {
3739
}
3840
}
3941
}
42+
43+
func WithBufferedReader() Option {
44+
return func(t Transporter) {
45+
switch v := t.(type) {
46+
// TODO(njones): case *PollingTransport: ...
47+
case *WebsocketTransport:
48+
v.buffered = true
49+
}
50+
}
51+
}
52+
53+
func WithGovernor(minTime, sleep time.Duration) Option {
54+
return func(t Transporter) {
55+
switch v := t.(type) {
56+
case *WebsocketTransport:
57+
v.governor.minTime = minTime
58+
v.governor.sleep = sleep
59+
}
60+
}
61+
}

Diff for: engineio/transport/transport.websocket.go

+67-23
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package transport
22

33
import (
4+
"bytes"
45
"context"
6+
"errors"
57
"fmt"
68
"io"
79
"net/http"
@@ -24,8 +26,13 @@ type WebsocketTransport struct {
2426

2527
origin []string
2628
PingMsg string
29+
buffered bool // default: false
2730
isInitProbe bool
2831
fnOnUpgrade func() error
32+
governor struct {
33+
sleep time.Duration
34+
minTime time.Duration
35+
}
2936
}
3037

3138
func NewWebsocketTransport(chanBuf int) func(SessionID, Codec) Transporter {
@@ -80,7 +87,9 @@ func (t *WebsocketTransport) Run(w http.ResponseWriter, r *http.Request, opts ..
8087
grp.Go(func() error { return t.incoming(ctx) })
8188
grp.Go(func() error { return t.outgoing(r.WithContext(ctx)) })
8289

83-
return grp.Wait()
90+
err = grp.Wait()
91+
t.conn.Close(ws.StatusNormalClosure, "done")
92+
return err
8493
}
8594

8695
func (t *WebsocketTransport) probe(w http.ResponseWriter, r *http.Request) error {
@@ -142,13 +151,12 @@ func (t *WebsocketTransport) incoming(ctx context.Context) (err error) {
142151
if !ok {
143152
extendTimeout = func() {}
144153
}
145-
extendInterval, ok := ctx.Value(eios.SessionExtendIntervalKey).(eios.ExtendIntervalFunc)
146-
if !ok {
147-
extendTimeout = func() {}
148-
}
149154

150-
defer t.conn.Close(ws.StatusNormalClosure, "write")
151155
var done func()
156+
var reason string
157+
defer func() { t.conn.Close(ws.StatusNormalClosure, reason) }()
158+
159+
var start = time.Now()
152160
Write:
153161

154162
for {
@@ -157,25 +165,29 @@ Write:
157165
if stop != nil {
158166
done = stop
159167
}
168+
reason = "stop"
160169
break Write
161170
case <-timeout:
171+
reason = "timeout"
162172
break Write
163173
case <-interval:
174+
reason = "interval"
164175
cw, err := t.conn.Writer(ctx, ws.MessageText)
165176
if err != nil {
166177
if cw != nil {
167178
cw.Close()
168179
}
169180
return err
170181
}
182+
171183
if err = t.codec.PacketEncoder.To(cw).WritePacket(eiop.Packet{T: eiop.PingPacket, D: nil}); err != nil {
172184
cw.Close()
173185
return err
174186
}
175187
cw.Close()
176188
case packet := <-t.receive:
189+
reason = "receive"
177190
extendTimeout()
178-
extendInterval(0)
179191
if packet.T == eiop.BinaryPacket {
180192
cw, err := t.conn.Writer(ctx, ws.MessageBinary)
181193
if err != nil {
@@ -185,11 +197,20 @@ Write:
185197
io.Copy(cw, packet.D.(io.Reader))
186198
cw.Close()
187199
} else {
200+
188201
cw, err := t.conn.Writer(ctx, ws.MessageText)
189202
if err != nil {
190203
return err
191204
}
192205

206+
if t.governor.minTime > 0 {
207+
// we need to slow things down sometimes...
208+
if time.Since(start) < t.governor.minTime {
209+
time.Sleep(t.governor.sleep)
210+
}
211+
start = time.Now()
212+
}
213+
193214
t.codec.PacketEncoder.To(cw).WritePacket(packet)
194215
cw.Close()
195216
}
@@ -206,40 +227,63 @@ Write:
206227
}
207228
default:
208229
}
230+
209231
return nil
210232
}
211233

212-
func (t *WebsocketTransport) outgoing(r *http.Request) error {
213-
ctx := r.Context()
214-
enc := t.codec.PacketEncoder
215-
dec := t.codec.PacketDecoder
234+
type syncReader struct {
235+
r io.Reader
236+
s *sync.WaitGroup
237+
}
216238

217-
extendTimeout, ok := ctx.Value(eios.SessionExtendTimeoutKey).(eios.ExtendTimeoutFunc)
218-
if !ok {
219-
extendTimeout = func() {}
239+
func (r syncReader) Read(p []byte) (n int, err error) {
240+
n, err = r.r.Read(p)
241+
if errors.Is(err, io.EOF) {
242+
r.s.Done()
220243
}
221-
extendInterval, ok := ctx.Value(eios.SessionExtendIntervalKey).(eios.ExtendIntervalFunc)
244+
return n, err
245+
}
246+
247+
func (t *WebsocketTransport) outgoing(r *http.Request) (err error) {
248+
ctx, enc, dec := r.Context(), t.codec.PacketEncoder, t.codec.PacketDecoder
249+
extendTimeout, ok := ctx.Value(eios.SessionExtendTimeoutKey).(eios.ExtendTimeoutFunc)
222250
if !ok {
223251
extendTimeout = func() {}
224252
}
225253

226-
defer t.conn.Close(ws.StatusNormalClosure, "write")
254+
var unbuffered = new(sync.WaitGroup)
255+
defer t.conn.Close(ws.StatusNormalClosure, "read")
227256

228257
for {
258+
if !t.buffered {
259+
unbuffered.Wait()
260+
}
261+
229262
// - /* blocking */ read a packet off the wire...
230-
mt, cr, err := t.conn.Reader(ctx) // this will close when shutdown() is called.
263+
msgType, cr, err := t.conn.Reader(ctx) // this will close when shutdown() is called.
231264
if err != nil {
232265
return err
233266
}
234267

235268
extendTimeout()
236-
extendInterval(0)
237-
238-
if mt != ws.MessageText {
269+
if msgType != ws.MessageText {
239270
// this is binary data
240-
t.send <- eiop.Packet{
241-
T: eiop.BinaryPacket,
242-
D: cr,
271+
if t.buffered {
272+
var buf = new(bytes.Buffer)
273+
_, err := buf.ReadFrom(cr)
274+
if err != nil {
275+
return err
276+
}
277+
t.send <- eiop.Packet{
278+
T: eiop.BinaryPacket,
279+
D: buf,
280+
}
281+
} else {
282+
unbuffered.Add(1)
283+
t.send <- eiop.Packet{
284+
T: eiop.BinaryPacket,
285+
D: syncReader{r: cr, s: unbuffered},
286+
}
243287
}
244288
continue
245289
}

Diff for: server.v1.runback.go

+16
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,10 @@ func doDisconnectPacket(v1 *ServerV1) func(SocketID, siot.Socket, *Request) erro
115115

116116
func doEventPacket(v1 *ServerV1) func(SocketID, siot.Socket) error {
117117
return func(socketID SocketID, socket siot.Socket) (err error) {
118+
type callbackAck interface {
119+
CallbackAck(...interface{}) []interface{}
120+
}
121+
118122
switch data := socket.Data.(type) {
119123
case []interface{}:
120124
event, ok := data[0].(string)
@@ -126,9 +130,21 @@ func doEventPacket(v1 *ServerV1) func(SocketID, siot.Socket) error {
126130
}
127131

128132
if fn, ok := v1.events[socket.Namespace][event][socketID]; ok {
133+
if socket.AckID > 0 {
134+
if fn, ok := fn.(callbackAck); ok {
135+
vals := fn.CallbackAck(data...)
136+
return v1.tr().Send(socketID, vals, siop.WithNamespace(socket.Namespace), siop.WithAckID(socket.AckID), siop.WithType(byte(siop.AckPacket)))
137+
}
138+
}
129139
return fn.Callback(data...)
130140
}
131141
if fn, ok := v1.events[socket.Namespace][event][serverEvent]; ok {
142+
if socket.AckID > 0 {
143+
if fn, ok := fn.(callbackAck); ok {
144+
vals := fn.CallbackAck(data...)
145+
return v1.tr().Send(socketID, vals, siop.WithNamespace(socket.Namespace), siop.WithAckID(socket.AckID), siop.WithType(byte(siop.AckPacket)))
146+
}
147+
}
132148
return fn.Callback(data...)
133149
}
134150
case []string:

0 commit comments

Comments
 (0)