Skip to content

Commit e9d8276

Browse files
author
Justin Jose
committed
Adding hystrix-metrics-event-stream-jaxrs module
1 parent 8f16eb9 commit e9d8276

16 files changed

+1106
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
# hystrix-metrics-event-stream-jaxrs
2+
3+
This module is a JAX-RS implementation of [hystrix-metrics-event-stream](https://github.com/Netflix/Hystrix/tree/master/hystrix-contrib/hystrix-metrics-event-stream) module without any Servlet API dependency and exposes metrics in a [text/event-stream](https://developer.mozilla.org/en-US/docs/Server-sent_events/Using_server-sent_events) formatted stream that continues as long as a client holds the connection.
4+
5+
6+
# Binaries
7+
8+
Binaries and dependency information for Maven, Ivy, Gradle and others can be found at [http://search.maven.org](http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22hystrix-metrics-event-stream-jaxrs%22).
9+
10+
Example for Maven ([lookup latest version](http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22hystrix-metrics-event-stream-jaxrs%22)):
11+
12+
```xml
13+
<dependency>
14+
<groupId>com.netflix.hystrix</groupId>
15+
<artifactId>hystrix-metrics-event-stream-jaxrs</artifactId>
16+
<version>1.6.0</version>
17+
</dependency>
18+
```
19+
and for Ivy:
20+
21+
```xml
22+
<dependency org="com.netflix.hystrix" name="hystrix-metrics-event-stream-jaxrs" rev="1.6.0" />
23+
```
24+
25+
# Installation
26+
27+
1) Include hystrix-metrics-event-stream-jaxrs*.jar in your classpath (such as /WEB-INF/lib).
28+
2) Register `HystrixStreamFeature` in your `javax.ws.rs.core.Application` as shown below.
29+
30+
```java
31+
32+
public class HystrixStreamApplication extends Application{
33+
34+
@Override
35+
public Set<Class<?>> getClasses() {
36+
Set<Class<?>> clazzes = new HashSet<Class<?>>();
37+
clazzes.add(HystrixStreamFeature.class);
38+
return clazzes;
39+
}
40+
}
41+
```
42+
43+
3) Following end-points are available
44+
* /hystrix.stream - Stream Hystrix Metrics
45+
* /hystrix/utilization.stream - Stream Hystrix Utilization
46+
* /hystrix/config.stream - Stream Hystrix configuration
47+
* /hystrix/request.stream - Stream Hystrix SSE events
48+
49+
50+
51+
# Test
52+
53+
To test your installation you can use curl like this:
54+
55+
```
56+
$ curl http://hostname:port/appname/hystrix.stream
57+
58+
data: {"rollingCountFailure":0,"propertyValue_executionIsolationThreadInterruptOnTimeout":true,"rollingCountTimeout":0,"rollingCountExceptionsThrown":0,"rollingCountFallbackSuccess":0,"errorCount":0,"type":"HystrixCommand","propertyValue_circuitBreakerEnabled":true,"reportingHosts":1,"latencyTotal":{"0":0,"95":0,"99.5":0,"90":0,"25":0,"99":0,"75":0,"100":0,"50":0},"currentConcurrentExecutionCount":0,"rollingCountSemaphoreRejected":0,"rollingCountFallbackRejection":0,"rollingCountShortCircuited":0,"rollingCountResponsesFromCache":0,"propertyValue_circuitBreakerForceClosed":false,"name":"IdentityCookieAuthSwitchProfile","propertyValue_executionIsolationThreadPoolKeyOverride":"null","rollingCountSuccess":0,"propertyValue_requestLogEnabled":true,"requestCount":0,"rollingCountCollapsedRequests":0,"errorPercentage":0,"propertyValue_circuitBreakerSleepWindowInMilliseconds":5000,"latencyTotal_mean":0,"propertyValue_circuitBreakerForceOpen":false,"propertyValue_circuitBreakerRequestVolumeThreshold":20,"propertyValue_circuitBreakerErrorThresholdPercentage":50,"propertyValue_executionIsolationStrategy":"THREAD","rollingCountFallbackFailure":0,"isCircuitBreakerOpen":false,"propertyValue_executionIsolationSemaphoreMaxConcurrentRequests":20,"propertyValue_executionIsolationThreadTimeoutInMilliseconds":1000,"propertyValue_metricsRollingStatisticalWindowInMilliseconds":10000,"propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests":10,"latencyExecute":{"0":0,"95":0,"99.5":0,"90":0,"25":0,"99":0,"75":0,"100":0,"50":0},"group":"IDENTITY","latencyExecute_mean":0,"propertyValue_requestCacheEnabled":true,"rollingCountThreadPoolRejected":0}
59+
60+
data: {"rollingCountFailure":0,"propertyValue_executionIsolationThreadInterruptOnTimeout":true,"rollingCountTimeout":0,"rollingCountExceptionsThrown":0,"rollingCountFallbackSuccess":0,"errorCount":0,"type":"HystrixCommand","propertyValue_circuitBreakerEnabled":true,"reportingHosts":3,"latencyTotal":{"0":1,"95":1,"99.5":1,"90":1,"25":1,"99":1,"75":1,"100":1,"50":1},"currentConcurrentExecutionCount":0,"rollingCountSemaphoreRejected":0,"rollingCountFallbackRejection":0,"rollingCountShortCircuited":0,"rollingCountResponsesFromCache":0,"propertyValue_circuitBreakerForceClosed":false,"name":"CryptexDecrypt","propertyValue_executionIsolationThreadPoolKeyOverride":"null","rollingCountSuccess":1,"propertyValue_requestLogEnabled":true,"requestCount":1,"rollingCountCollapsedRequests":0,"errorPercentage":0,"propertyValue_circuitBreakerSleepWindowInMilliseconds":15000,"latencyTotal_mean":1,"propertyValue_circuitBreakerForceOpen":false,"propertyValue_circuitBreakerRequestVolumeThreshold":60,"propertyValue_circuitBreakerErrorThresholdPercentage":150,"propertyValue_executionIsolationStrategy":"THREAD","rollingCountFallbackFailure":0,"isCircuitBreakerOpen":false,"propertyValue_executionIsolationSemaphoreMaxConcurrentRequests":60,"propertyValue_executionIsolationThreadTimeoutInMilliseconds":3000,"propertyValue_metricsRollingStatisticalWindowInMilliseconds":30000,"propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests":30,"latencyExecute":{"0":0,"95":0,"99.5":0,"90":0,"25":0,"99":0,"75":0,"100":0,"50":0},"group":"CRYPTEX","latencyExecute_mean":0,"propertyValue_requestCacheEnabled":true,"rollingCountThreadPoolRejected":0}
61+
```
62+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
dependencies {
2+
compileApi project(':hystrix-core')
3+
compile project(':hystrix-serialization')
4+
provided 'javax.ws.rs:javax.ws.rs-api:2.0.1'
5+
testCompile 'junit:junit-dep:4.10'
6+
testCompile 'org.glassfish.jersey.test-framework.providers:jersey-test-framework-provider-grizzly2:2.25.1'
7+
testCompile 'org.glassfish.jersey.media:jersey-media-sse:2.25.1'
8+
9+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.netflix.hystrix.contrib.metrics;
17+
18+
import java.util.concurrent.atomic.AtomicInteger;
19+
20+
import rx.Observable;
21+
22+
/**
23+
* @author justinjose28
24+
*
25+
*/
26+
public final class HystrixStream {
27+
private final Observable<String> sampleStream;
28+
private final int pausePollerThreadDelayInMs;
29+
private final AtomicInteger concurrentConnections;
30+
31+
public HystrixStream(Observable<String> sampleStream, int pausePollerThreadDelayInMs, AtomicInteger concurrentConnections) {
32+
this.sampleStream = sampleStream;
33+
this.pausePollerThreadDelayInMs = pausePollerThreadDelayInMs;
34+
this.concurrentConnections = concurrentConnections;
35+
}
36+
37+
public Observable<String> getSampleStream() {
38+
return sampleStream;
39+
}
40+
41+
public int getPausePollerThreadDelayInMs() {
42+
return pausePollerThreadDelayInMs;
43+
}
44+
45+
public AtomicInteger getConcurrentConnections() {
46+
return concurrentConnections;
47+
}
48+
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.netflix.hystrix.contrib.metrics;
17+
18+
import javax.ws.rs.core.Feature;
19+
import javax.ws.rs.core.FeatureContext;
20+
21+
import com.netflix.hystrix.contrib.metrics.controller.HystrixConfigSseController;
22+
import com.netflix.hystrix.contrib.metrics.controller.HystrixMetricsStreamController;
23+
import com.netflix.hystrix.contrib.metrics.controller.HystrixRequestEventsSseController;
24+
import com.netflix.hystrix.contrib.metrics.controller.HystrixUtilizationSseController;
25+
26+
/**
27+
* @author justinjose28
28+
*
29+
*/
30+
public class HystrixStreamFeature implements Feature {
31+
32+
@Override
33+
public boolean configure(FeatureContext context) {
34+
context.register(new HystrixMetricsStreamController());
35+
context.register(new HystrixUtilizationSseController());
36+
context.register(new HystrixRequestEventsSseController());
37+
context.register(new HystrixConfigSseController());
38+
context.register(HystrixStreamingOutputProvider.class);
39+
return true;
40+
}
41+
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.netflix.hystrix.contrib.metrics;
17+
18+
import java.io.IOException;
19+
import java.io.OutputStream;
20+
import java.lang.annotation.Annotation;
21+
import java.lang.reflect.Type;
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
24+
import javax.ws.rs.core.MediaType;
25+
import javax.ws.rs.core.MultivaluedMap;
26+
import javax.ws.rs.ext.MessageBodyWriter;
27+
import javax.ws.rs.ext.Provider;
28+
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
import rx.Subscriber;
33+
import rx.Subscription;
34+
import rx.schedulers.Schedulers;
35+
36+
/**
37+
* {@link MessageBodyWriter} implementation which handles serialization of HystrixStream
38+
*
39+
*
40+
* @author justinjose28
41+
*
42+
*/
43+
44+
@Provider
45+
public class HystrixStreamingOutputProvider implements MessageBodyWriter<HystrixStream> {
46+
47+
private static final Logger LOGGER = LoggerFactory.getLogger(HystrixStreamingOutputProvider.class);
48+
49+
@Override
50+
public boolean isWriteable(Class<?> t, Type gt, Annotation[] as, MediaType mediaType) {
51+
return HystrixStream.class.isAssignableFrom(t);
52+
}
53+
54+
@Override
55+
public long getSize(HystrixStream o, Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) {
56+
return -1;
57+
}
58+
59+
@Override
60+
public void writeTo(HystrixStream o, Class<?> t, Type gt, Annotation[] as, MediaType mediaType, MultivaluedMap<String, Object> httpHeaders, final OutputStream entity) throws IOException {
61+
Subscription sampleSubscription = null;
62+
final AtomicBoolean moreDataWillBeSent = new AtomicBoolean(true);
63+
try {
64+
65+
sampleSubscription = o.getSampleStream().observeOn(Schedulers.io()).subscribe(new Subscriber<String>() {
66+
@Override
67+
public void onCompleted() {
68+
LOGGER.error("HystrixSampleSseServlet: ({}) received unexpected OnCompleted from sample stream", getClass().getSimpleName());
69+
moreDataWillBeSent.set(false);
70+
}
71+
72+
@Override
73+
public void onError(Throwable e) {
74+
moreDataWillBeSent.set(false);
75+
}
76+
77+
@Override
78+
public void onNext(String sampleDataAsString) {
79+
if (sampleDataAsString != null) {
80+
try {
81+
entity.write(("data: " + sampleDataAsString + "\n\n").getBytes());
82+
entity.flush();
83+
} catch (IOException ioe) {
84+
moreDataWillBeSent.set(false);
85+
}
86+
}
87+
}
88+
});
89+
90+
while (moreDataWillBeSent.get()) {
91+
try {
92+
Thread.sleep(o.getPausePollerThreadDelayInMs());
93+
} catch (InterruptedException e) {
94+
moreDataWillBeSent.set(false);
95+
}
96+
}
97+
} finally {
98+
o.getConcurrentConnections().decrementAndGet();
99+
if (sampleSubscription != null && !sampleSubscription.isUnsubscribed()) {
100+
sampleSubscription.unsubscribe();
101+
}
102+
}
103+
}
104+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.netflix.hystrix.contrib.metrics.controller;
17+
18+
import java.util.concurrent.atomic.AtomicInteger;
19+
20+
import javax.ws.rs.core.HttpHeaders;
21+
import javax.ws.rs.core.Response;
22+
import javax.ws.rs.core.Response.ResponseBuilder;
23+
import javax.ws.rs.core.Response.Status;
24+
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
import rx.Observable;
29+
30+
import com.netflix.hystrix.contrib.metrics.HystrixStream;
31+
import com.netflix.hystrix.contrib.metrics.HystrixStreamingOutputProvider;
32+
33+
/**
34+
* @author justinjose28
35+
*
36+
*/
37+
public abstract class AbstractHystrixStreamController {
38+
protected final Observable<String> sampleStream;
39+
40+
static final Logger logger = LoggerFactory.getLogger(AbstractHystrixStreamController.class);
41+
42+
// wake up occasionally and check that poller is still alive. this value controls how often
43+
protected static final int DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS = 500;
44+
45+
private final int pausePollerThreadDelayInMs;
46+
47+
private static final AtomicInteger concurrentConnections = new AtomicInteger(0);
48+
49+
protected AbstractHystrixStreamController(Observable<String> sampleStream) {
50+
this(sampleStream, DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS);
51+
}
52+
53+
protected AbstractHystrixStreamController(Observable<String> sampleStream, int pausePollerThreadDelayInMs) {
54+
this.sampleStream = sampleStream;
55+
this.pausePollerThreadDelayInMs = pausePollerThreadDelayInMs;
56+
}
57+
58+
protected abstract int getMaxNumberConcurrentConnectionsAllowed();
59+
60+
protected final AtomicInteger getCurrentConnections() {
61+
return concurrentConnections;
62+
}
63+
64+
/**
65+
* Maintain an open connection with the client. On initial connection send latest data of each requested event type and subsequently send all changes for each requested event type.
66+
*
67+
* @return JAX-RS Response - Serialization will be handled by {@link HystrixStreamingOutputProvider}
68+
*/
69+
protected Response handleRequest() {
70+
ResponseBuilder builder = null;
71+
/* ensure we aren't allowing more connections than we want */
72+
int numberConnections = getCurrentConnections().get();
73+
int maxNumberConnectionsAllowed = getMaxNumberConcurrentConnectionsAllowed(); // may change at runtime, so look this up for each request
74+
if (numberConnections >= maxNumberConnectionsAllowed) {
75+
builder = Response.status(Status.SERVICE_UNAVAILABLE).entity("MaxConcurrentConnections reached: " + maxNumberConnectionsAllowed);
76+
} else {
77+
/* initialize response */
78+
builder = Response.status(Status.OK);
79+
builder.header(HttpHeaders.CONTENT_TYPE, "text/event-stream;charset=UTF-8");
80+
builder.header(HttpHeaders.CACHE_CONTROL, "no-cache, no-store, max-age=0, must-revalidate");
81+
builder.header("Pragma", "no-cache");
82+
getCurrentConnections().incrementAndGet();
83+
builder.entity(new HystrixStream(sampleStream, pausePollerThreadDelayInMs, getCurrentConnections()));
84+
}
85+
return builder.build();
86+
87+
}
88+
89+
}

0 commit comments

Comments
 (0)