Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

basic support bidirectional stream #9

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add missing stream.go
thesyncim committed Aug 6, 2018
commit bf7601a4ba6ba3d650809394d9d6542019fe8861
124 changes: 124 additions & 0 deletions stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package exposed

import (
"errors"
"github.com/thesyncim/exposed/encoding"
)

type StreamServer struct {
ID uint32
isServer bool
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isServer is unused


codec encoding.Codec

inMessages chan *request
errOutCh chan error
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

errOutCh is unused


sendOutMessages func(id uint32, m *response, resp chan error)
inClosed bool
outClosed bool
}

func NewServerStream(id uint32, codec encoding.Codec, inMessages chan *request, sendf func(id uint32, m *response, resp chan error)) *StreamServer {
return &StreamServer{
ID: id,
codec: codec,
inMessages: inMessages,
sendOutMessages: sendf,
}
}

func (sc *StreamServer) SendMsg(m Message) error {
if sc.outClosed {
return errClosedWriteStream
}
v, err := sc.codec.Marshal(m)
if err != nil {

return err
}
resp := AcquireResponse()
resp.SwapPayload(v)
errch := make(chan error, 1)
sc.sendOutMessages(sc.ID, resp, errch)
return <-errch
}

func (sc *StreamServer) RecvMsg(m Message) (err error) {
if sc.inClosed {
return errClosedReadStream
}
//todo timout
var mr *request
var open bool
if mr, open = <-sc.inMessages; !open {
return errClosedReadChannel
}
err = sc.codec.Unmarshal(mr.payload, m)
releaseRequest(mr)
return
}

type StreamClient struct {
ID uint32
isServer bool
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isServer is unused


op uint64

codec encoding.Codec

inMessages <-chan *response
errOutCh chan error
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

errOutCh is unused


sendOutMessages func(id uint32, m *request, resp chan error)
inClosed bool
outClosed bool
}

func NewStreamClient(id uint32, op uint64, codec encoding.Codec, inMessages <-chan *response, sendf func(id uint32, m *request, resp chan error)) *StreamClient {
return &StreamClient{
ID: id,
op: op,
codec: codec,
inMessages: inMessages,
sendOutMessages: sendf,
}
}

func (sc *StreamClient) SendMsg(m Message) error {
if sc.outClosed {
return errClosedWriteStream
}
v, err := sc.codec.Marshal(m)
if err != nil {
return err
}
req := acquireRequest()
req.SetOperation(sc.op)
req.SwapPayload(v)
errch := make(chan error, 1)
sc.sendOutMessages(sc.ID, req, errch)
return <-errch
}

func (sc *StreamClient) RecvMsg(m Message) (err error) {
if sc.inClosed {
return errClosedReadStream
}
//todo timout
var mr *response
var ok bool
if mr, ok = <-sc.inMessages; !ok {
return errClosedReadChannel
}

err = sc.codec.Unmarshal(mr.payload, m)
ReleaseResponse(mr)
return err
}

var (
errClosedReadStream = errors.New("closed read StreamServer")
errClosedReadChannel = errors.New("closed read StreamServer channel")
errClosedWriteStream = errors.New("closed write StreamServer")
)