-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
basic support bidirectional stream #9
base: master
Are you sure you want to change the base?
Conversation
client.go
Outdated
return 1 | ||
} | ||
|
||
func (c *Client) CallStream(opname string, req, resp Message, handleStream func(client *StreamClient) error) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
undeclared name: StreamClient
server.go
Outdated
} | ||
} | ||
|
||
func (s *Server) handleRequest(wi *serverWorkItem, pendingResponses chan<- *serverWorkItem, stopCh <-chan struct{}) { | ||
func (s *Server) handleRequest(wi *serverWorkItem, pendingResponses chan<- *serverWorkItem, stream *StreamServer, stopCh <-chan struct{}) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
undeclared name: StreamServer
client.go
Outdated
// | ||
// This saves memory and CPU resources. | ||
|
||
s := NewStreamClient(sid, xxhash.Sum64String(opname), c.opts.Codec, in, func(id uint32, m *request, resp chan error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
undeclared name: NewStreamClient
server.go
Outdated
@@ -301,19 +374,63 @@ func (s *Server) connReader(br *bufio.Reader, pipeline bool, conn net.Conn, pend | |||
} | |||
continue | |||
} | |||
|
|||
var stream *StreamServer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
undeclared name: StreamServer
server.go
Outdated
in := make(chan *request, 1) | ||
streamID := binary.BigEndian.Uint32(wi.streamID[:]) | ||
s.inStreamMsg[streamID] = in | ||
stream = NewServerStream(streamID, s.opts.Codec, in, func(id uint32, m *response, resp chan error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
undeclared name: NewServerStream
stream_test.go
Outdated
var request = strconv.Itoa(t.N / 2) | ||
var response = "" | ||
|
||
err := c.CallStream("stream", &request, &response, func(client *StreamClient) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
undeclared name: StreamClient
stream_test.go
Outdated
t.ResetTimer() | ||
t.ReportAllocs() | ||
|
||
err := client.SendMsg(request) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
invalid operation: client (variable of type *invalid type) has no field or method SendMsg
stream_test.go
Outdated
var r string | ||
|
||
for i := 0; i < t.N/2; i++ { | ||
err = client.RecvMsg(&r) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
invalid operation: client (variable of type *invalid type) has no field or method RecvMsg
@@ -53,7 +53,7 @@ func (h *exposedCtx) Handle(ctxv *exposedCtx) (rctxv *exposedCtx) { | |||
} | |||
reply := opinfo.ReplyType() | |||
|
|||
if err = handler(nil, args, reply); err != nil { | |||
if err = handler(&Context{stream}, args, reply); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
github.com/thesyncim/exposed.Context composite literal uses unkeyed fields
client.go
Outdated
ReleaseResponse(rawResp) | ||
releaseClientWorkItem(wi) | ||
c.decPendingRequests() | ||
c.getError(err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Error return value of c.getError
is not checked
client.go
Outdated
ReleaseResponse(rawResp) | ||
releaseClientWorkItem(wi) | ||
c.decPendingRequests() | ||
c.getError(err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Error return value of c.getError
is not checked
client.go
Outdated
ReleaseResponse(rawResp) | ||
releaseClientWorkItem(wi) | ||
c.decPendingRequests() | ||
c.getError(err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Error return value of c.getError
is not checked
codec encoding.Codec | ||
|
||
inMessages chan *request | ||
errOutCh chan error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
errOutCh
is unused
|
||
type StreamClient struct { | ||
ID uint32 | ||
isServer bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isServer
is unused
stream.go
Outdated
codec encoding.Codec | ||
|
||
inMessages <-chan *response | ||
errOutCh chan error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
errOutCh
is unused
client.go
Outdated
@@ -159,6 +159,11 @@ type Client struct { | |||
pendingResponses map[uint32]*clientWorkItem | |||
pendingResponsesLock sync.Mutex | |||
|
|||
incomingStreamMsg sync.Map | |||
incomingStreamMsgLock sync.Mutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
incomingStreamMsgLock
is unused
server.go
Outdated
@@ -87,6 +88,10 @@ var defaultServerOptions = serverOptions{ | |||
MaxBatchDelay: 0, | |||
} | |||
|
|||
var ( | |||
typeUnarycall = []byte{byte(0)} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typeUnarycall
is unused
stream_test.go
Outdated
if err != nil { | ||
t.Fatal(err) | ||
} | ||
if i == 99 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
empty branch
make stream thread safe improve and reorganize code
@@ -236,23 +276,34 @@ func (s *Server) serveConn(conn net.Conn) error { | |||
case err = <-readerDone: | |||
conn.Close() | |||
close(stopCh) | |||
inStreamMsg.Range(func(key, value interface{}) bool { | |||
panic("das") | |||
close(value.(chan *request)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unreachable code
<-writerDone | ||
case err = <-writerDone: | ||
conn.Close() | ||
inStreamMsg.Range(func(key, value interface{}) bool { | ||
panic("das") | ||
close(value.(chan *request)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unreachable code
streamMessage = packetControl(1) | ||
streamStart = packetControl(2) | ||
///todo implement | ||
streamClose = packetControl(3) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
streamClose
is unused
streamStart = packetControl(2) | ||
///todo implement | ||
streamClose = packetControl(3) | ||
streamCloseRead = packetControl(4) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
streamCloseRead
is unused
///todo implement | ||
streamClose = packetControl(3) | ||
streamCloseRead = packetControl(4) | ||
streamCloseWrite = packetControl(5) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
streamCloseWrite
is unused
inMessages <-chan *response | ||
serverOutMessages chan<- WorkItem | ||
|
||
errOutCh chan error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
errOutCh
is unused
codec encoding.Codec | ||
|
||
inMessages <-chan *response | ||
serverOutMessages chan<- WorkItem |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
serverOutMessages
is unused
reqID [4]byte | ||
type serverUnaryWorkItem struct { | ||
ctx *exposedCtx | ||
startStream bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
startStream
is unused
if err != nil { | ||
t.Fatal(err) | ||
} | ||
if i == 99 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
empty branch
}) | ||
sid := c.nextStreamID() | ||
inStream, _ := c.incomingStreamMsg.LoadOrStore(sid, make(chan *response, 100)) | ||
var in chan *response |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should merge variable declaration with assignment on next line
Codecov Report
@@ Coverage Diff @@
## master #9 +/- ##
===========================================
- Coverage 66.84% 53.08% -13.77%
===========================================
Files 9 10 +1
Lines 926 1217 +291
===========================================
+ Hits 619 646 +27
- Misses 239 487 +248
- Partials 68 84 +16
Continue to review full report at Codecov.
|
No description provided.