16
16
package io .infinitape .etherjar .rpc .ws ;
17
17
18
18
import com .fasterxml .jackson .core .JsonProcessingException ;
19
+ import io .infinitape .etherjar .rpc .RpcResponseException ;
19
20
import io .netty .channel .*;
20
21
import io .netty .handler .codec .http .FullHttpResponse ;
21
22
import io .netty .handler .codec .http .websocketx .*;
22
23
import io .netty .util .CharsetUtil ;
23
24
25
+ import java .nio .charset .Charset ;
24
26
import java .util .ArrayList ;
25
27
import java .util .HashMap ;
26
28
import java .util .List ;
@@ -47,6 +49,8 @@ public class SocketApiHandler extends SimpleChannelInboundHandler<Object> {
47
49
private List <Subscription > subscriptions = new ArrayList <>();
48
50
private JacksonWsConverter rpcConverter = new JacksonWsConverter ();
49
51
52
+ private List <String > buffer = new ArrayList <>();
53
+
50
54
public ChannelFuture handshakeFuture () {
51
55
return handshakeFuture ;
52
56
}
@@ -75,52 +79,79 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Except
75
79
}
76
80
77
81
final WebSocketFrame frame = (WebSocketFrame ) msg ;
78
- if (frame instanceof TextWebSocketFrame ) {
79
- final TextWebSocketFrame textFrame = (TextWebSocketFrame ) frame ;
80
- SubscriptionJson json = rpcConverter .readSubscription (textFrame .text ());
81
- if (json .getSubscription () != null ) {
82
- boolean consumed = false ;
83
- String id = json .getSubscription ();
84
- for (int i = 0 ; i < subscriptions .size () && !consumed ; i ++) {
85
- Subscription s = subscriptions .get (i );
86
- if (id .equals (s .getId ())) {
87
- consumed = true ;
88
- s .onReceive (json );
89
- }
90
- }
91
- if (!consumed ) {
92
- System .err .println ("Unknown subscription:" + id );
93
- }
94
- } else if (json .getId () != null ) {
95
- subscribeLock .lock ();
96
- try {
97
- Subscription s = initializing .remove (json .getId ());
98
- if (s != null ) {
99
- s .setId (json .getStringResult ());
100
- if (json .getError () != null ) {
101
- subscriptions .remove (s );
102
- s .onClose (json .extractError ());
103
- }
104
- } else {
105
- System .err .println ("Cannot find subscriber " + json .getId ());
106
- }
107
- } finally {
108
- subscribeLock .unlock ();
109
- }
82
+ if (frame instanceof TextWebSocketFrame || frame instanceof ContinuationWebSocketFrame ) {
83
+ String textFrameContent = frame .content ().toString (CharsetUtil .UTF_8 );
84
+ if (frame .isFinalFragment ()) {
85
+ String fullMessage = extractBuffer (textFrameContent );
86
+ processMessage (fullMessage );
87
+ } else {
88
+ buffer .add (textFrameContent );
110
89
}
111
90
} else if (frame instanceof CloseWebSocketFrame ) {
112
91
ch .close ();
92
+ } else {
93
+ System .err .println ("Invalid frame: " + frame .getClass ());
94
+ }
95
+ }
96
+
97
+ protected String extractBuffer (String last ) {
98
+ String fullMessage ;
99
+ if (buffer .isEmpty ()) {
100
+ fullMessage = last ;
101
+ } else {
102
+ StringBuilder sb = new StringBuilder ();
103
+ for (String s : buffer ) {
104
+ sb .append (s );
105
+ }
106
+ buffer = new ArrayList <>();
107
+ sb .append (last );
108
+ fullMessage = sb .toString ();
113
109
}
110
+ return fullMessage ;
111
+ }
114
112
113
+ public void processMessage (String message ) throws RpcResponseException {
114
+ SubscriptionJson json = rpcConverter .readSubscription (message );
115
+ if (json .getSubscription () != null ) {
116
+ boolean consumed = false ;
117
+ String id = json .getSubscription ();
118
+ for (int i = 0 ; i < subscriptions .size () && !consumed ; i ++) {
119
+ Subscription s = subscriptions .get (i );
120
+ if (id .equals (s .getId ())) {
121
+ consumed = true ;
122
+ s .onReceive (json );
123
+ }
124
+ }
125
+ if (!consumed ) {
126
+ System .err .println ("Unknown subscription:" + id );
127
+ }
128
+ } else if (json .getId () != null ) {
129
+ subscribeLock .lock ();
130
+ try {
131
+ Subscription s = initializing .remove (json .getId ());
132
+ if (s != null ) {
133
+ s .setId (json .getStringResult ());
134
+ if (json .getError () != null ) {
135
+ subscriptions .remove (s );
136
+ s .onClose (json .extractError ());
137
+ }
138
+ } else {
139
+ System .err .println ("Cannot find subscriber " + json .getId ());
140
+ }
141
+ } finally {
142
+ subscribeLock .unlock ();
143
+ }
144
+ }
115
145
}
116
146
117
147
public void stop () {
118
148
subscribeLock .lock ();
119
149
try {
120
- for (Subscription s : subscriptions ) {
150
+ for (Subscription s : subscriptions ) {
121
151
try {
122
152
s .stop (rpcConverter .getObjectMapper (), sequence .getAndIncrement ());
123
- } catch (Exception e ) { }
153
+ } catch (Exception e ) {
154
+ }
124
155
}
125
156
} finally {
126
157
subscribeLock .unlock ();
0 commit comments