58
58
import rx .subjects .ReplaySubject ;
59
59
import rx .subscriptions .Subscriptions ;
60
60
import rx .util .functions .Action0 ;
61
+ import rx .util .functions .Action1 ;
61
62
import rx .util .functions .Func1 ;
62
63
import rx .util .functions .Func2 ;
63
64
@@ -1006,6 +1007,24 @@ public Subscription onSubscribe(final Observer<? super R> observer) {
1006
1007
// TODO better yet, get TimeoutObservable part of Rx
1007
1008
final SafeObservableSubscription s = new SafeObservableSubscription ();
1008
1009
1010
+ /*
1011
+ * Define the action to perform on timeout outside of the TimerListener to it can capture the HystrixRequestContext
1012
+ * of the calling thread which doesn't exist on the Timer thread.
1013
+ */
1014
+ final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable (originalCommand .concurrencyStrategy , new Runnable () {
1015
+
1016
+ @ Override
1017
+ public void run () {
1018
+ try {
1019
+ R v = originalCommand .getFallbackOrThrowException (HystrixEventType .TIMEOUT , FailureType .TIMEOUT , "timed-out" , new TimeoutException ());
1020
+ observer .onNext (v );
1021
+ observer .onCompleted ();
1022
+ } catch (HystrixRuntimeException re ) {
1023
+ observer .onError (re );
1024
+ }
1025
+ }
1026
+ });
1027
+
1009
1028
TimerListener listener = new TimerListener () {
1010
1029
1011
1030
@ Override
@@ -1021,13 +1040,7 @@ public void tick() {
1021
1040
// we record execution time because we are returning before
1022
1041
originalCommand .recordTotalExecutionTime (originalCommand .invocationStartTime );
1023
1042
1024
- try {
1025
- R v = originalCommand .getFallbackOrThrowException (HystrixEventType .TIMEOUT , FailureType .TIMEOUT , "timed-out" , new TimeoutException ());
1026
- observer .onNext (v );
1027
- observer .onCompleted ();
1028
- } catch (HystrixRuntimeException re ) {
1029
- observer .onError (re );
1030
- }
1043
+ timeoutRunnable .run ();
1031
1044
}
1032
1045
1033
1046
s .unsubscribe ();
@@ -1146,7 +1159,7 @@ private Subscription subscribeWithThreadIsolation(final Observer<? super R> obse
1146
1159
}
1147
1160
1148
1161
// wrap the synchronous execute() method in a Callable and execute in the threadpool
1149
- final Future <R > f = threadPool .getExecutor ().submit (concurrencyStrategy . wrapCallable ( new HystrixContextCallable <R >(new Callable <R >() {
1162
+ final Future <R > f = threadPool .getExecutor ().submit (new HystrixContextCallable <R >(concurrencyStrategy , new Callable <R >() {
1150
1163
1151
1164
@ Override
1152
1165
public R call () throws Exception {
@@ -1215,7 +1228,7 @@ private void preTerminationWork(boolean recordDuration) {
1215
1228
}
1216
1229
}
1217
1230
1218
- }))) ;
1231
+ }));
1219
1232
1220
1233
return new Subscription () {
1221
1234
@@ -3818,7 +3831,7 @@ public void testExecutionSemaphoreWithQueue() {
3818
3831
final TryableSemaphore semaphore =
3819
3832
new TryableSemaphore (HystrixProperty .Factory .asProperty (1 ));
3820
3833
3821
- Runnable r = new HystrixContextRunnable (new Runnable () {
3834
+ Runnable r = new HystrixContextRunnable (HystrixPlugins . getInstance (). getConcurrencyStrategy (), new Runnable () {
3822
3835
3823
3836
@ Override
3824
3837
public void run () {
@@ -3890,7 +3903,7 @@ public void testExecutionSemaphoreWithExecution() {
3890
3903
final TryableSemaphore semaphore =
3891
3904
new TryableSemaphore (HystrixProperty .Factory .asProperty (1 ));
3892
3905
3893
- Runnable r = new HystrixContextRunnable (new Runnable () {
3906
+ Runnable r = new HystrixContextRunnable (HystrixPlugins . getInstance (). getConcurrencyStrategy (), new Runnable () {
3894
3907
3895
3908
@ Override
3896
3909
public void run () {
@@ -3953,7 +3966,7 @@ public void testRejectedExecutionSemaphoreWithFallback() {
3953
3966
3954
3967
final AtomicBoolean exceptionReceived = new AtomicBoolean ();
3955
3968
3956
- Runnable r = new HystrixContextRunnable (new Runnable () {
3969
+ Runnable r = new HystrixContextRunnable (HystrixPlugins . getInstance (). getConcurrencyStrategy (), new Runnable () {
3957
3970
3958
3971
@ Override
3959
3972
public void run () {
@@ -4026,7 +4039,7 @@ public void testSemaphorePermitsInUse() {
4026
4039
// used to signal that all command can finish
4027
4040
final CountDownLatch sharedLatch = new CountDownLatch (1 );
4028
4041
4029
- final Runnable sharedSemaphoreRunnable = new HystrixContextRunnable (new Runnable () {
4042
+ final Runnable sharedSemaphoreRunnable = new HystrixContextRunnable (HystrixPlugins . getInstance (). getConcurrencyStrategy (), new Runnable () {
4030
4043
public void run () {
4031
4044
try {
4032
4045
new LatchedSemaphoreCommand (circuitBreaker , sharedSemaphore , startLatch , sharedLatch ).execute ();
@@ -4054,7 +4067,7 @@ public void run() {
4054
4067
// tracks failures to obtain semaphores
4055
4068
final AtomicInteger failureCount = new AtomicInteger ();
4056
4069
4057
- final Thread isolatedThread = new Thread (new HystrixContextRunnable (new Runnable () {
4070
+ final Thread isolatedThread = new Thread (new HystrixContextRunnable (HystrixPlugins . getInstance (). getConcurrencyStrategy (), new Runnable () {
4058
4071
public void run () {
4059
4072
try {
4060
4073
new LatchedSemaphoreCommand (circuitBreaker , isolatedSemaphore , startLatch , isolatedLatch ).execute ();
@@ -6133,6 +6146,64 @@ protected String getFallback() {
6133
6146
6134
6147
}
6135
6148
6149
+ /**
6150
+ * See https://github.com/Netflix/Hystrix/issues/212
6151
+ */
6152
+ @ Test
6153
+ public void testObservableTimeoutNoFallbackThreadContext () {
6154
+ final AtomicReference <Thread > onErrorThread = new AtomicReference <Thread >();
6155
+ final AtomicBoolean isRequestContextInitialized = new AtomicBoolean ();
6156
+ TestHystrixCommand <Boolean > command = new TestCommandWithTimeout (50 , TestCommandWithTimeout .FALLBACK_NOT_IMPLEMENTED );
6157
+ try {
6158
+ command .toObservable ().doOnError (new Action1 <Throwable >() {
6159
+
6160
+ @ Override
6161
+ public void call (Throwable t1 ) {
6162
+ System .out .println ("onError: " + t1 );
6163
+ System .out .println ("onError Thread: " + Thread .currentThread ());
6164
+ System .out .println ("ThreadContext in onError: " + HystrixRequestContext .isCurrentThreadInitialized ());
6165
+ onErrorThread .set (Thread .currentThread ());
6166
+ isRequestContextInitialized .set (HystrixRequestContext .isCurrentThreadInitialized ());
6167
+ }
6168
+
6169
+ }).toBlockingObservable ().single ();
6170
+ throw new RuntimeException ("expected error to be thrown" );
6171
+ } catch (Throwable e ) {
6172
+ assertTrue (isRequestContextInitialized .get ());
6173
+ assertTrue (onErrorThread .get ().getName ().startsWith ("RxComputationThreadPool" ));
6174
+
6175
+ if (e instanceof HystrixRuntimeException ) {
6176
+ HystrixRuntimeException de = (HystrixRuntimeException ) e ;
6177
+ assertNotNull (de .getFallbackException ());
6178
+ assertTrue (de .getFallbackException () instanceof UnsupportedOperationException );
6179
+ assertNotNull (de .getImplementingClass ());
6180
+ assertNotNull (de .getCause ());
6181
+ assertTrue (de .getCause () instanceof TimeoutException );
6182
+ } else {
6183
+ fail ("the exception should be ExecutionException with cause as HystrixRuntimeException" );
6184
+ }
6185
+ }
6186
+
6187
+ assertTrue (command .getExecutionTimeInMilliseconds () > -1 );
6188
+ assertTrue (command .isResponseTimedOut ());
6189
+
6190
+ assertEquals (0 , command .builder .metrics .getRollingCount (HystrixRollingNumberEvent .SUCCESS ));
6191
+ assertEquals (1 , command .builder .metrics .getRollingCount (HystrixRollingNumberEvent .EXCEPTION_THROWN ));
6192
+ assertEquals (0 , command .builder .metrics .getRollingCount (HystrixRollingNumberEvent .FAILURE ));
6193
+ assertEquals (0 , command .builder .metrics .getRollingCount (HystrixRollingNumberEvent .FALLBACK_REJECTION ));
6194
+ assertEquals (0 , command .builder .metrics .getRollingCount (HystrixRollingNumberEvent .FALLBACK_FAILURE ));
6195
+ assertEquals (0 , command .builder .metrics .getRollingCount (HystrixRollingNumberEvent .FALLBACK_SUCCESS ));
6196
+ assertEquals (0 , command .builder .metrics .getRollingCount (HystrixRollingNumberEvent .SEMAPHORE_REJECTED ));
6197
+ assertEquals (0 , command .builder .metrics .getRollingCount (HystrixRollingNumberEvent .SHORT_CIRCUITED ));
6198
+ assertEquals (0 , command .builder .metrics .getRollingCount (HystrixRollingNumberEvent .THREAD_POOL_REJECTED ));
6199
+ assertEquals (1 , command .builder .metrics .getRollingCount (HystrixRollingNumberEvent .TIMEOUT ));
6200
+ assertEquals (0 , command .builder .metrics .getRollingCount (HystrixRollingNumberEvent .RESPONSE_FROM_CACHE ));
6201
+
6202
+ assertEquals (100 , command .builder .metrics .getHealthCounts ().getErrorPercentage ());
6203
+
6204
+ assertEquals (1 , HystrixRequestLog .getCurrentRequest ().getExecutedCommands ().size ());
6205
+ }
6206
+
6136
6207
/* ******************************************************************************** */
6137
6208
/* ******************************************************************************** */
6138
6209
/* private HystrixCommand class implementations for unit testing */
0 commit comments