Skip to content

Changes to hook API to make it more Observable-friendly #683

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 18, 2015
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
470 changes: 360 additions & 110 deletions hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.hystrix;

import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy;

public interface AbstractTestHystrixCommand<R> extends HystrixObservable<R>, InspectableBuilder {

public static enum ExecutionResult {
SUCCESS, FAILURE, ASYNC_FAILURE, BAD_REQUEST, ASYNC_BAD_REQUEST, MULTIPLE_EMITS_THEN_SUCCESS, MULTIPLE_EMITS_THEN_FAILURE, NO_EMITS_THEN_SUCCESS
}

public static enum FallbackResult {
UNIMPLEMENTED, SUCCESS, FAILURE, ASYNC_FAILURE, MULTIPLE_EMITS_THEN_SUCCESS, MULTIPLE_EMITS_THEN_FAILURE, NO_EMITS_THEN_SUCCESS
}

public static enum CacheEnabled {
YES, NO
}

static HystrixPropertiesStrategy TEST_PROPERTIES_FACTORY = new TestPropertiesFactory();

static class TestPropertiesFactory extends HystrixPropertiesStrategy {

@Override
public HystrixCommandProperties getCommandProperties(HystrixCommandKey commandKey, HystrixCommandProperties.Setter builder) {
if (builder == null) {
builder = HystrixCommandPropertiesTest.getUnitTestPropertiesSetter();
}
return HystrixCommandPropertiesTest.asMock(builder);
}

@Override
public HystrixThreadPoolProperties getThreadPoolProperties(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter builder) {
if (builder == null) {
builder = HystrixThreadPoolProperties.Setter.getUnitTestPropertiesBuilder();
}
return HystrixThreadPoolProperties.Setter.asMock(builder);
}

@Override
public HystrixCollapserProperties getCollapserProperties(HystrixCollapserKey collapserKey, HystrixCollapserProperties.Setter builder) {
throw new IllegalStateException("not expecting collapser properties");
}

@Override
public String getCommandPropertiesCacheKey(HystrixCommandKey commandKey, HystrixCommandProperties.Setter builder) {
return null;
}

@Override
public String getThreadPoolPropertiesCacheKey(HystrixThreadPoolKey threadPoolKey, com.netflix.hystrix.HystrixThreadPoolProperties.Setter builder) {
return null;
}

@Override
public String getCollapserPropertiesCacheKey(HystrixCollapserKey collapserKey, com.netflix.hystrix.HystrixCollapserProperties.Setter builder) {
return null;
}

}

}
1,481 changes: 1,481 additions & 0 deletions hystrix-core/src/test/java/com/netflix/hystrix/CommonHystrixCommandTests.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -33,7 +33,6 @@
import org.junit.Test;

import com.netflix.hystrix.HystrixCollapser.CollapsedRequest;
import com.netflix.hystrix.HystrixCommandTest.TestHystrixCommand;
import com.netflix.hystrix.collapser.CollapserTimer;
import com.netflix.hystrix.collapser.RealCollapserTimer;
import com.netflix.hystrix.collapser.RequestCollapser;
Original file line number Diff line number Diff line change
@@ -20,8 +20,6 @@

import org.junit.Test;

import com.netflix.hystrix.HystrixCommandTest.CommandGroupForUnitTest;
import com.netflix.hystrix.HystrixCommandTest.CommandKeyForUnitTest;
import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifierDefault;


@@ -131,7 +129,7 @@ protected Boolean getFallback() {
* Utility method for creating {@link HystrixCommandMetrics} for unit tests.
*/
private static HystrixCommandMetrics getMetrics(HystrixCommandProperties.Setter properties) {
return new HystrixCommandMetrics(CommandKeyForUnitTest.KEY_ONE, CommandGroupForUnitTest.OWNER_ONE, HystrixCommandPropertiesTest.asMock(properties), HystrixEventNotifierDefault.getInstance());
return new HystrixCommandMetrics(InspectableBuilder.CommandKeyForUnitTest.KEY_ONE, InspectableBuilder.CommandGroupForUnitTest.OWNER_ONE, HystrixCommandPropertiesTest.asMock(properties), HystrixEventNotifierDefault.getInstance());
}

}
2,588 changes: 448 additions & 2,140 deletions hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -19,10 +19,8 @@
import static org.junit.Assert.assertSame;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import com.netflix.hystrix.strategy.properties.HystrixPropertiesCollapserDefault;
import com.netflix.hystrix.util.HystrixRollingNumberEvent;
@@ -38,7 +36,6 @@

import com.netflix.hystrix.HystrixCollapser.CollapsedRequest;
import com.netflix.hystrix.HystrixCollapserTest.TestCollapserTimer;
import com.netflix.hystrix.HystrixObservableCommandTest.TestHystrixCommand;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;

public class HystrixObservableCollapserTest {
@@ -205,7 +202,7 @@ public String name() {
};
}

private static class TestCollapserCommand extends TestHystrixCommand<String> {
private static class TestCollapserCommand extends TestHystrixObservableCommand<String> {

private final Collection<CollapsedRequest<String, String>> requests;

3,228 changes: 774 additions & 2,454 deletions hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java

Large diffs are not rendered by default.

107 changes: 107 additions & 0 deletions hystrix-core/src/test/java/com/netflix/hystrix/InspectableBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.hystrix;

public interface InspectableBuilder {
public TestCommandBuilder getBuilder();

public enum CommandKeyForUnitTest implements HystrixCommandKey {
KEY_ONE, KEY_TWO
}

public enum CommandGroupForUnitTest implements HystrixCommandGroupKey {
OWNER_ONE, OWNER_TWO
}

public enum ThreadPoolKeyForUnitTest implements HystrixThreadPoolKey {
THREAD_POOL_ONE, THREAD_POOL_TWO
}

public static class TestCommandBuilder {
HystrixCircuitBreakerTest.TestCircuitBreaker _cb = new HystrixCircuitBreakerTest.TestCircuitBreaker();
HystrixCommandGroupKey owner = CommandGroupForUnitTest.OWNER_ONE;
HystrixCommandKey dependencyKey = null;
HystrixThreadPoolKey threadPoolKey = null;
HystrixCircuitBreaker circuitBreaker = _cb;
HystrixThreadPool threadPool = null;
HystrixCommandProperties.Setter commandPropertiesDefaults = HystrixCommandPropertiesTest.getUnitTestPropertiesSetter();
HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults = HystrixThreadPoolProperties.Setter.getUnitTestPropertiesBuilder();
HystrixCommandMetrics metrics = _cb.metrics;
AbstractCommand.TryableSemaphore fallbackSemaphore = null;
AbstractCommand.TryableSemaphore executionSemaphore = null;
TestableExecutionHook executionHook = new TestableExecutionHook();

TestCommandBuilder(HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy) {
this.commandPropertiesDefaults = HystrixCommandPropertiesTest.getUnitTestPropertiesSetter().withExecutionIsolationStrategy(isolationStrategy);
}

TestCommandBuilder setOwner(HystrixCommandGroupKey owner) {
this.owner = owner;
return this;
}

TestCommandBuilder setCommandKey(HystrixCommandKey dependencyKey) {
this.dependencyKey = dependencyKey;
return this;
}

TestCommandBuilder setThreadPoolKey(HystrixThreadPoolKey threadPoolKey) {
this.threadPoolKey = threadPoolKey;
return this;
}

TestCommandBuilder setCircuitBreaker(HystrixCircuitBreaker circuitBreaker) {
this.circuitBreaker = circuitBreaker;
return this;
}

TestCommandBuilder setThreadPool(HystrixThreadPool threadPool) {
this.threadPool = threadPool;
return this;
}

TestCommandBuilder setCommandPropertiesDefaults(HystrixCommandProperties.Setter commandPropertiesDefaults) {
this.commandPropertiesDefaults = commandPropertiesDefaults;
return this;
}

TestCommandBuilder setThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) {
this.threadPoolPropertiesDefaults = threadPoolPropertiesDefaults;
return this;
}

TestCommandBuilder setMetrics(HystrixCommandMetrics metrics) {
this.metrics = metrics;
return this;
}

TestCommandBuilder setFallbackSemaphore(AbstractCommand.TryableSemaphore fallbackSemaphore) {
this.fallbackSemaphore = fallbackSemaphore;
return this;
}

TestCommandBuilder setExecutionSemaphore(AbstractCommand.TryableSemaphore executionSemaphore) {
this.executionSemaphore = executionSemaphore;
return this;
}

TestCommandBuilder setExecutionHook(TestableExecutionHook executionHook) {
this.executionHook = executionHook;
return this;
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.hystrix;

abstract public class TestHystrixCommand<T> extends HystrixCommand<T> implements AbstractTestHystrixCommand<T> {

private final TestCommandBuilder builder;

public TestHystrixCommand(TestCommandBuilder builder) {
super(builder.owner, builder.dependencyKey, builder.threadPoolKey, builder.circuitBreaker, builder.threadPool,
builder.commandPropertiesDefaults, builder.threadPoolPropertiesDefaults, builder.metrics,
builder.fallbackSemaphore, builder.executionSemaphore, TEST_PROPERTIES_FACTORY, builder.executionHook);
this.builder = builder;
}

public TestCommandBuilder getBuilder() {
return builder;
}

static TestCommandBuilder testPropsBuilder() {
return new TestCommandBuilder(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD);
}

static TestCommandBuilder testPropsBuilder(HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy) {
return new TestCommandBuilder(isolationStrategy);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.hystrix;

abstract public class TestHystrixObservableCommand<T> extends HystrixObservableCommand<T> implements AbstractTestHystrixCommand<T> {

private final TestCommandBuilder builder;

public TestHystrixObservableCommand(TestCommandBuilder builder) {
super(builder.owner, builder.dependencyKey, builder.threadPoolKey, builder.circuitBreaker, builder.threadPool,
builder.commandPropertiesDefaults, builder.threadPoolPropertiesDefaults, builder.metrics,
builder.fallbackSemaphore, builder.executionSemaphore, TEST_PROPERTIES_FACTORY, builder.executionHook);
this.builder = builder;
}

public TestCommandBuilder getBuilder() {
return builder;
}

static TestCommandBuilder testPropsBuilder() {
return new TestCommandBuilder(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE);
}

static TestCommandBuilder testPropsBuilder(HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy) {
return new TestCommandBuilder(isolationStrategy);
}
}
Original file line number Diff line number Diff line change
@@ -18,7 +18,10 @@
import com.netflix.hystrix.exception.HystrixRuntimeException;
import com.netflix.hystrix.exception.HystrixRuntimeException.FailureType;
import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;
import rx.Notification;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class TestableExecutionHook extends HystrixCommandExecutionHook {
@@ -28,113 +31,214 @@ private static void recordHookCall(StringBuilder sequenceRecorder, String method
}

StringBuilder executionSequence = new StringBuilder();
AtomicInteger startExecute = new AtomicInteger();
List<Notification<?>> commandEmissions = new ArrayList<>();
List<Notification<?>> executionEmissions = new ArrayList<>();
List<Notification<?>> fallbackEmissions = new ArrayList<>();

public boolean commandEmissionsMatch(int numOnNext, int numOnError, int numOnCompleted) {
return eventsMatch(commandEmissions, numOnNext, numOnError, numOnCompleted);
}

public boolean executionEventsMatch(int numOnNext, int numOnError, int numOnCompleted) {
return eventsMatch(executionEmissions, numOnNext, numOnError, numOnCompleted);
}

public boolean fallbackEventsMatch(int numOnNext, int numOnError, int numOnCompleted) {
return eventsMatch(fallbackEmissions, numOnNext, numOnError, numOnCompleted);
}

private boolean eventsMatch(List<Notification<?>> l, int numOnNext, int numOnError, int numOnCompleted) {
if (numOnNext + numOnError + numOnCompleted != l.size()) {
System.err.println("Events : " + l.size() + " don't add up to the events you asked to verify");
return false;
}
boolean matchSoFar = true;
for (int n = 0; n < numOnNext; n++) {
Notification<?> current = l.get(n);
if (!current.isOnNext()) {
matchSoFar = false;
}
}
for (int e = numOnNext; e < numOnNext + numOnError; e++) {
Notification<?> current = l.get(e);
if (!current.isOnError()) {
matchSoFar = false;
}
}
for (int c = numOnNext + numOnError; c < numOnNext + numOnError + numOnCompleted; c++) {
Notification<?> current = l.get(c);
if (!current.isOnCompleted()) {
matchSoFar = false;
}
}
return matchSoFar;
}

public Throwable getCommandException() {
return getException(commandEmissions);
}

public Throwable getExecutionException() {
return getException(executionEmissions);
}

public Throwable getFallbackException() {
return getException(fallbackEmissions);
}

private Throwable getException(List<Notification<?>> l) {
for (Notification<?> n: l) {
if (n.isOnError()) {
return n.getThrowable();
}
}
return null;
}

@Override
public <T> void onStart(HystrixInvokable<T> commandInstance) {
super.onStart(commandInstance);
recordHookCall(executionSequence, "onStart");
startExecute.incrementAndGet();
}

Object endExecuteSuccessResponse = null;

@Override
public <T> T onComplete(HystrixInvokable<T> commandInstance, T response) {
endExecuteSuccessResponse = response;
recordHookCall(executionSequence, "onComplete");
return super.onComplete(commandInstance, response);
public <T> T onEmit(HystrixInvokable<T> commandInstance, T value) {
commandEmissions.add(Notification.createOnNext(value));
recordHookCall(executionSequence, "onEmit");
return super.onEmit(commandInstance, value);
}

Exception endExecuteFailureException = null;
HystrixRuntimeException.FailureType endExecuteFailureType = null;

@Override
public <T> Exception onError(HystrixInvokable<T> commandInstance, FailureType failureType, Exception e) {
endExecuteFailureException = e;
endExecuteFailureType = failureType;
commandEmissions.add(Notification.createOnError(e));
recordHookCall(executionSequence, "onError");
return super.onError(commandInstance, failureType, e);
}

AtomicInteger startRun = new AtomicInteger();
@Override
public <T> void onSuccess(HystrixInvokable<T> commandInstance) {
commandEmissions.add(Notification.createOnCompleted());
recordHookCall(executionSequence, "onSuccess");
super.onSuccess(commandInstance);
}

@Override
public <T> void onRunStart(HystrixInvokable<T> commandInstance) {
super.onRunStart(commandInstance);
recordHookCall(executionSequence, "onRunStart");
startRun.incrementAndGet();
public <T> void onThreadStart(HystrixInvokable<T> commandInstance) {
super.onThreadStart(commandInstance);
recordHookCall(executionSequence, "onThreadStart");
}

Object runSuccessResponse = null;
@Override
public <T> void onThreadComplete(HystrixInvokable<T> commandInstance) {
super.onThreadComplete(commandInstance);
recordHookCall(executionSequence, "onThreadComplete");
}

@Override
public <T> T onRunSuccess(HystrixInvokable<T> commandInstance, T response) {
runSuccessResponse = response;
recordHookCall(executionSequence, "onRunSuccess");
return super.onRunSuccess(commandInstance, response);
public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) {
recordHookCall(executionSequence, "onExecutionStart");
super.onExecutionStart(commandInstance);
}

Exception runFailureException = null;
@Override
public <T> T onExecutionEmit(HystrixInvokable<T> commandInstance, T value) {
executionEmissions.add(Notification.createOnNext(value));
recordHookCall(executionSequence, "onExecutionEmit");
return super.onExecutionEmit(commandInstance, value);
}

@Override
public <T> Exception onRunError(HystrixInvokable<T> commandInstance, Exception e) {
runFailureException = e;
recordHookCall(executionSequence, "onRunError");
return super.onRunError(commandInstance, e);
public <T> Exception onExecutionError(HystrixInvokable<T> commandInstance, Exception e) {
executionEmissions.add(Notification.createOnError(e));
recordHookCall(executionSequence, "onExecutionError");
return super.onExecutionError(commandInstance, e);
}

AtomicInteger startFallback = new AtomicInteger();
@Override
public <T> void onExecutionSuccess(HystrixInvokable<T> commandInstance) {
executionEmissions.add(Notification.createOnCompleted());
recordHookCall(executionSequence, "onExecutionSuccess");
super.onExecutionSuccess(commandInstance);
}

@Override
public <T> void onFallbackStart(HystrixInvokable<T> commandInstance) {
super.onFallbackStart(commandInstance);
recordHookCall(executionSequence, "onFallbackStart");
startFallback.incrementAndGet();
}

Object fallbackSuccessResponse = null;

@Override
public <T> T onFallbackSuccess(HystrixInvokable<T> commandInstance, T response) {
fallbackSuccessResponse = response;
recordHookCall(executionSequence, "onFallbackSuccess");
return super.onFallbackSuccess(commandInstance, response);
public <T> T onFallbackEmit(HystrixInvokable<T> commandInstance, T value) {
fallbackEmissions.add(Notification.createOnNext(value));
recordHookCall(executionSequence, "onFallbackEmit");
return super.onFallbackEmit(commandInstance, value);
}

Exception fallbackFailureException = null;

@Override
public <T> Exception onFallbackError(HystrixInvokable<T> commandInstance, Exception e) {
fallbackFailureException = e;
fallbackEmissions.add(Notification.createOnError(e));
recordHookCall(executionSequence, "onFallbackError");
return super.onFallbackError(commandInstance, e);
}

AtomicInteger threadStart = new AtomicInteger();
@Override
public <T> void onFallbackSuccess(HystrixInvokable<T> commandInstance) {
fallbackEmissions.add(Notification.createOnCompleted());
recordHookCall(executionSequence, "onFallbackSuccess");
super.onFallbackSuccess(commandInstance);
}

@Override
public <T> void onThreadStart(HystrixInvokable<T> commandInstance) {
super.onThreadStart(commandInstance);
recordHookCall(executionSequence, "onThreadStart");
threadStart.incrementAndGet();
public <T> void onCacheHit(HystrixInvokable<T> commandInstance) {
super.onCacheHit(commandInstance);
recordHookCall(executionSequence, "onCacheHit");
}

/**
* DEPRECATED METHODS FOLLOW. The string representation starts with `!D!` to distinguish
*/

AtomicInteger startExecute = new AtomicInteger();
Object endExecuteSuccessResponse = null;
Exception endExecuteFailureException = null;
HystrixRuntimeException.FailureType endExecuteFailureType = null;
AtomicInteger startRun = new AtomicInteger();
Object runSuccessResponse = null;
Exception runFailureException = null;
AtomicInteger startFallback = new AtomicInteger();
Object fallbackSuccessResponse = null;
Exception fallbackFailureException = null;
AtomicInteger threadStart = new AtomicInteger();
AtomicInteger threadComplete = new AtomicInteger();
AtomicInteger cacheHit = new AtomicInteger();

@Override
public <T> void onThreadComplete(HystrixInvokable<T> commandInstance) {
super.onThreadComplete(commandInstance);
recordHookCall(executionSequence, "onThreadComplete");
threadComplete.incrementAndGet();
public <T> T onFallbackSuccess(HystrixInvokable<T> commandInstance, T response) {
recordHookCall(executionSequence, "!onFallbackSuccess");
return super.onFallbackSuccess(commandInstance, response);
}

AtomicInteger cacheHit = new AtomicInteger();
@Override
public <T> T onComplete(HystrixInvokable<T> commandInstance, T response) {
recordHookCall(executionSequence, "!onComplete");
return super.onComplete(commandInstance, response);
}

@Override
public <T> void onCacheHit(HystrixInvokable<T> commandInstance) {
super.onCacheHit(commandInstance);
recordHookCall(executionSequence, "onCacheHit");
cacheHit.incrementAndGet();
public <T> void onRunStart(HystrixInvokable<T> commandInstance) {
super.onRunStart(commandInstance);
recordHookCall(executionSequence, "!onRunStart");
}

@Override
public <T> T onRunSuccess(HystrixInvokable<T> commandInstance, T response) {
recordHookCall(executionSequence, "!onRunSuccess");
return super.onRunSuccess(commandInstance, response);
}

@Override
public <T> Exception onRunError(HystrixInvokable<T> commandInstance, Exception e) {
recordHookCall(executionSequence, "!onRunError");
return super.onRunError(commandInstance, e);
}
}