22
22
import io .emeraldpay .api .proto .BlockchainOuterClass ;
23
23
import io .emeraldpay .api .proto .Common ;
24
24
import io .emeraldpay .api .Chain ;
25
+ import io .emeraldpay .api .proto .ReactorBlockchainGrpc ;
25
26
import io .grpc .*;
26
27
import io .grpc .netty .NettyChannelBuilder ;
27
28
import io .emeraldpay .etherjar .rpc .*;
55
56
*/
56
57
public class EmeraldTransport implements RpcTransport <DefaultBatch .FutureBatchItem > {
57
58
58
- private final Channel channel ;
59
59
private final BlockchainGrpc .BlockchainBlockingStub blockingStub ;
60
60
61
61
private final ResponseJsonConverter responseJsonConverter = new ResponseJsonConverter ();
@@ -67,17 +67,16 @@ public class EmeraldTransport implements RpcTransport<DefaultBatch.FutureBatchIt
67
67
private final Common .ChainRef chainRef ;
68
68
private BlockchainOuterClass .Selector selector ;
69
69
70
- public EmeraldTransport (Channel channel ,
70
+ public EmeraldTransport (BlockchainGrpc . BlockchainBlockingStub stub ,
71
71
ObjectMapper objectMapper ,
72
72
JacksonRpcConverter rpcConverter ,
73
73
ExecutorService executorService ,
74
74
Common .ChainRef chainRef ) {
75
- this .channel = channel ;
76
75
this .objectMapper = objectMapper ;
77
76
this .rpcConverter = rpcConverter ;
78
77
this .executorService = executorService ;
79
78
this .chainRef = chainRef ;
80
- blockingStub = BlockchainGrpc . newBlockingStub ( channel ) ;
79
+ blockingStub = stub ;
81
80
}
82
81
83
82
/**
@@ -97,7 +96,7 @@ public static EmeraldTransport.Builder newBuilder() {
97
96
* @return new instance of EmeraldGrpcTransport configured for new chain
98
97
*/
99
98
public EmeraldTransport copyForChain (Chain chain ) {
100
- return new EmeraldTransport (channel , objectMapper , rpcConverter , executorService , Common .ChainRef .forNumber (chain .getId ()));
99
+ return new EmeraldTransport (blockingStub , objectMapper , rpcConverter , executorService , Common .ChainRef .forNumber (chain .getId ()));
101
100
}
102
101
103
102
/**
@@ -135,7 +134,7 @@ public EmeraldTransport copyForChain(Chain chain) {
135
134
* @return new instance of EmeraldGrpcTransport configured with new selector
136
135
*/
137
136
public EmeraldTransport copyWithSelector (BlockchainOuterClass .Selector selector ) {
138
- EmeraldTransport copy = new EmeraldTransport (channel , objectMapper , rpcConverter , executorService , chainRef );
137
+ EmeraldTransport copy = new EmeraldTransport (blockingStub , objectMapper , rpcConverter , executorService , chainRef );
139
138
copy .selector = selector ;
140
139
return copy ;
141
140
}
@@ -237,7 +236,7 @@ public <JS, RES> JS read(ByteString bytes, DefaultBatch.FutureBatchItem<JS, RES>
237
236
238
237
@ Override
239
238
public void close () throws IOException {
240
- Channel channel = this .channel ;
239
+ Channel channel = this .blockingStub . getChannel () ;
241
240
if (channel instanceof ManagedChannel ) {
242
241
((ManagedChannel ) channel ).shutdownNow ();
243
242
try {
@@ -258,6 +257,8 @@ public static class Builder {
258
257
259
258
private SslContextBuilder sslContextBuilder ;
260
259
private Channel channel ;
260
+ private BlockchainGrpc .BlockchainBlockingStub stub ;
261
+ private ClientInterceptor [] interceptors ;
261
262
262
263
private ObjectMapper objectMapper ;
263
264
private JacksonRpcConverter rpcConverter ;
@@ -278,6 +279,20 @@ public Builder connectUsing(Channel channel) {
278
279
return this ;
279
280
}
280
281
282
+ /**
283
+ * Setup with an existing stub. All other settings related to connection will be ignored
284
+ *
285
+ * @param stub existing stub
286
+ * @return builder
287
+ */
288
+ public Builder connectUsing (BlockchainGrpc .BlockchainBlockingStub stub ) {
289
+ this .stub = stub ;
290
+ this .channel = null ;
291
+ this .channelBuilder = null ;
292
+ this .sslContextBuilder = null ;
293
+ return this ;
294
+ }
295
+
281
296
/**
282
297
* Apply a custom modification for the default NettyChannelBuilder
283
298
*
@@ -455,32 +470,49 @@ public Builder chain(Chain chain) {
455
470
return this ;
456
471
}
457
472
473
+ /**
474
+ * Add interceptors to the client calls
475
+ *
476
+ * @param interceptors interceptors
477
+ * @return builder
478
+ */
479
+ public Builder interceptors (ClientInterceptor ... interceptors ) {
480
+ this .interceptors = interceptors ;
481
+ return this ;
482
+ }
483
+
458
484
/**
459
485
* Validates configuration and builds GrpcTransport
460
486
*
461
487
* @return configured grpc transport
462
488
* @throws SSLException if problem with TLS certificates
463
489
*/
464
490
public EmeraldTransport build () throws SSLException {
465
- if (channel == null ) {
466
- NettyChannelBuilder nettyBuilder = channelBuilder ;
467
- if (sslContextBuilder != null ) {
468
- nettyBuilder = nettyBuilder .useTransportSecurity ()
469
- .sslContext (sslContextBuilder .build ());
470
- }
471
- if (useLoadBalancing ) {
472
- String policy = "round_robin" ;
473
- if (LoadBalancerRegistry .getDefaultRegistry ().getProvider (policy ) != null ) {
474
- nettyBuilder = nettyBuilder .defaultLoadBalancingPolicy (policy );
491
+ if (stub == null ) {
492
+ if (channel == null ) {
493
+ NettyChannelBuilder nettyBuilder = channelBuilder ;
494
+ if (sslContextBuilder != null ) {
495
+ nettyBuilder = nettyBuilder .useTransportSecurity ()
496
+ .sslContext (sslContextBuilder .build ());
497
+ }
498
+ if (useLoadBalancing ) {
499
+ String policy = "round_robin" ;
500
+ if (LoadBalancerRegistry .getDefaultRegistry ().getProvider (policy ) != null ) {
501
+ nettyBuilder = nettyBuilder .defaultLoadBalancingPolicy (policy );
502
+ }
475
503
}
504
+ ManagedChannelBuilder <?> finalBuilder ;
505
+ if (this .channelUpdate != null ) {
506
+ finalBuilder = this .channelUpdate .apply (nettyBuilder );
507
+ } else {
508
+ finalBuilder = nettyBuilder ;
509
+ }
510
+ channel = finalBuilder .build ();
476
511
}
477
- ManagedChannelBuilder <?> finalBuilder ;
478
- if (this .channelUpdate != null ) {
479
- finalBuilder = this .channelUpdate .apply (nettyBuilder );
480
- } else {
481
- finalBuilder = nettyBuilder ;
512
+ stub = BlockchainGrpc .newBlockingStub (channel );
513
+ if (interceptors != null ) {
514
+ stub = stub .withInterceptors (interceptors );
482
515
}
483
- channel = finalBuilder .build ();
484
516
}
485
517
if (executorService == null ) {
486
518
threadsCount (2 );
@@ -499,7 +531,7 @@ public EmeraldTransport build() throws SSLException {
499
531
chain = Chain .UNSPECIFIED ;
500
532
}
501
533
Common .ChainRef chainRef = Common .ChainRef .forNumber (chain .getId ());
502
- return new EmeraldTransport (channel , objectMapper , rpcConverter , executorService , chainRef );
534
+ return new EmeraldTransport (stub , objectMapper , rpcConverter , executorService , chainRef );
503
535
}
504
536
}
505
537
}
0 commit comments