Skip to content

Commit 02b3649

Browse files
committed
Doppler on Reactor Netty
This is the first component in the project to be moved from a Spring-based networking stack to Reactor Netty. In addition to the implementation for Doppler endpoints, this change also adds the common infrastructure and test infrastructure for other components.
1 parent 1ee5cd3 commit 02b3649

File tree

75 files changed

+4248
-49
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

75 files changed

+4248
-49
lines changed

.gitmodules

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
[submodule "vendor/dropsonde-protocol"]
2+
path = vendor/dropsonde-protocol
3+
url = https://github.com/cloudfoundry/dropsonde-protocol.git

.idea/dictionaries/bhale.xml

+16
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/vcs.xml

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bin/compile_log_message.sh

-6
This file was deleted.

cloudfoundry-client-spring/cloudfoundry-client-spring.iml

+11-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
<excludeFolder url="file://$MODULE_DIR$/target/maven-archiver" />
1919
<excludeFolder url="file://$MODULE_DIR$/target/maven-status" />
2020
<excludeFolder url="file://$MODULE_DIR$/target/site" />
21+
<excludeFolder url="file://$MODULE_DIR$/target/surefire" />
2122
<excludeFolder url="file://$MODULE_DIR$/target/surefire-reports" />
2223
<excludeFolder url="file://$MODULE_DIR$/target/test-classes" />
2324
</content>
@@ -30,13 +31,22 @@
3031
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-annotations:2.6.5" level="project" />
3132
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-core:2.6.5" level="project" />
3233
<orderEntry type="library" name="Maven: com.github.zafarkhaja:java-semver:0.9.0" level="project" />
34+
<orderEntry type="library" scope="TEST" name="Maven: com.squareup.okhttp3:mockwebserver:3.2.0" level="project" />
35+
<orderEntry type="library" scope="TEST" name="Maven: com.squareup.okhttp3:okhttp:3.2.0" level="project" />
36+
<orderEntry type="library" name="Maven: com.squareup.okio:okio:1.6.0" level="project" />
37+
<orderEntry type="library" scope="TEST" name="Maven: com.squareup.okhttp3:okhttp-ws:3.2.0" level="project" />
38+
<orderEntry type="library" scope="TEST" name="Maven: org.bouncycastle:bcprov-jdk15on:1.50" level="project" />
39+
<orderEntry type="library" name="Maven: io.projectreactor:reactor-netty:2.5.0.BUILD-SNAPSHOT" level="project" />
40+
<orderEntry type="library" name="Maven: io.projectreactor:reactor-ipc:2.5.0.BUILD-SNAPSHOT" level="project" />
41+
<orderEntry type="library" name="Maven: io.netty:netty-all:4.1.0.CR7" level="project" />
3342
<orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.12" level="project" />
3443
<orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" />
3544
<orderEntry type="library" name="Maven: org.apache.tomcat.embed:tomcat-embed-logging-juli:8.0.32" level="project" />
3645
<orderEntry type="library" name="Maven: org.apache.tomcat.embed:tomcat-embed-websocket:8.0.32" level="project" />
3746
<orderEntry type="library" name="Maven: org.apache.tomcat.embed:tomcat-embed-core:8.0.32" level="project" />
3847
<orderEntry type="module" module-name="cloudfoundry-client" />
3948
<orderEntry type="library" name="Maven: com.google.protobuf:protobuf-java:2.6.1" level="project" />
49+
<orderEntry type="library" name="Maven: com.squareup.wire:wire-runtime:2.1.2" level="project" />
4050
<orderEntry type="library" name="Maven: io.projectreactor:reactor-core:2.5.0.BUILD-SNAPSHOT" level="project" />
4151
<orderEntry type="library" name="Maven: org.reactivestreams:reactive-streams:1.0.0" level="project" />
4252
<orderEntry type="module" module-name="cloudfoundry-util" />
@@ -45,6 +55,7 @@
4555
<orderEntry type="library" scope="TEST" name="Maven: org.objenesis:objenesis:2.1" level="project" />
4656
<orderEntry type="library" scope="PROVIDED" name="Maven: org.projectlombok:lombok:1.16.8" level="project" />
4757
<orderEntry type="library" name="Maven: org.slf4j:jcl-over-slf4j:1.7.16" level="project" />
58+
<orderEntry type="library" scope="TEST" name="Maven: org.slf4j:jul-to-slf4j:1.7.16" level="project" />
4859
<orderEntry type="library" scope="TEST" name="Maven: org.springframework:spring-test:4.2.5.RELEASE" level="project" />
4960
<orderEntry type="library" name="Maven: org.springframework:spring-core:4.2.5.RELEASE" level="project" />
5061
<orderEntry type="library" name="Maven: org.springframework:spring-web:4.2.5.RELEASE" level="project" />
@@ -54,7 +65,6 @@
5465
<orderEntry type="library" name="Maven: org.springframework:spring-context:4.2.5.RELEASE" level="project" />
5566
<orderEntry type="library" name="Maven: org.springframework:spring-expression:4.2.5.RELEASE" level="project" />
5667
<orderEntry type="library" scope="TEST" name="Maven: org.springframework.boot:spring-boot-starter-logging:1.3.3.RELEASE" level="project" />
57-
<orderEntry type="library" scope="TEST" name="Maven: org.slf4j:jul-to-slf4j:1.7.16" level="project" />
5868
<orderEntry type="library" scope="TEST" name="Maven: org.slf4j:log4j-over-slf4j:1.7.16" level="project" />
5969
<orderEntry type="library" name="Maven: org.springframework.security.oauth:spring-security-oauth2:2.0.9.RELEASE" level="project" />
6070
<orderEntry type="library" name="Maven: org.springframework:spring-webmvc:4.2.5.RELEASE" level="project" />

cloudfoundry-client-spring/pom.xml

+18
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,19 @@
4747
<groupId>com.github.zafarkhaja</groupId>
4848
<artifactId>java-semver</artifactId>
4949
</dependency>
50+
<dependency>
51+
<groupId>com.squareup.okhttp3</groupId>
52+
<artifactId>mockwebserver</artifactId>
53+
<scope>test</scope>
54+
</dependency>
55+
<dependency>
56+
<groupId>io.projectreactor</groupId>
57+
<artifactId>reactor-netty</artifactId>
58+
</dependency>
59+
<dependency>
60+
<groupId>io.netty</groupId>
61+
<artifactId>netty-all</artifactId>
62+
</dependency>
5063
<dependency>
5164
<groupId>junit</groupId>
5265
<artifactId>junit</artifactId>
@@ -84,6 +97,11 @@
8497
<groupId>org.slf4j</groupId>
8598
<artifactId>jcl-over-slf4j</artifactId>
8699
</dependency>
100+
<dependency>
101+
<groupId>org.slf4j</groupId>
102+
<artifactId>jul-to-slf4j</artifactId>
103+
<scope>test</scope>
104+
</dependency>
87105
<dependency>
88106
<groupId>org.springframework</groupId>
89107
<artifactId>spring-test</artifactId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Copyright 2013-2016 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.cloudfoundry.reactor.doppler;
18+
19+
import lombok.Builder;
20+
import org.cloudfoundry.doppler.ContainerMetric;
21+
import org.cloudfoundry.doppler.ContainerMetricsRequest;
22+
import org.cloudfoundry.doppler.CounterEvent;
23+
import org.cloudfoundry.doppler.DopplerClient;
24+
import org.cloudfoundry.doppler.Error;
25+
import org.cloudfoundry.doppler.Event;
26+
import org.cloudfoundry.doppler.FirehoseRequest;
27+
import org.cloudfoundry.doppler.HttpStart;
28+
import org.cloudfoundry.doppler.HttpStartStop;
29+
import org.cloudfoundry.doppler.HttpStop;
30+
import org.cloudfoundry.doppler.LogMessage;
31+
import org.cloudfoundry.doppler.RecentLogsRequest;
32+
import org.cloudfoundry.doppler.StreamRequest;
33+
import org.cloudfoundry.doppler.ValueMetric;
34+
import org.cloudfoundry.dropsonde.events.Envelope;
35+
import org.cloudfoundry.reactor.util.AbstractReactorOperations;
36+
import org.cloudfoundry.reactor.util.AuthorizationProvider;
37+
import org.cloudfoundry.reactor.util.ConnectionContextSupplier;
38+
import reactor.core.publisher.Flux;
39+
import reactor.core.publisher.Mono;
40+
import reactor.core.util.Exceptions;
41+
import reactor.io.netty.http.HttpClient;
42+
import reactor.io.netty.http.HttpInbound;
43+
44+
import java.io.IOException;
45+
import java.io.InputStream;
46+
47+
import static org.cloudfoundry.util.tuple.TupleUtils.consumer;
48+
49+
/**
50+
* The Reactor-based implementation of {@link DopplerClient}
51+
*/
52+
public final class ReactorDopplerClient extends AbstractReactorOperations implements DopplerClient {
53+
54+
@Builder
55+
ReactorDopplerClient(ConnectionContextSupplier cloudFoundryClient) {
56+
this(cloudFoundryClient.getConnectionContext2().getAuthorizationProvider(), cloudFoundryClient.getConnectionContext2().getHttpClient(),
57+
cloudFoundryClient.getConnectionContext2().getRoot("doppler_logging_endpoint"));
58+
}
59+
60+
ReactorDopplerClient(AuthorizationProvider authorizationProvider, HttpClient httpClient, Mono<String> root) {
61+
super(authorizationProvider, httpClient, root);
62+
}
63+
64+
@Override
65+
public Flux<ContainerMetric> containerMetrics(ContainerMetricsRequest request) {
66+
return get(request, consumer((builder, validRequest) -> builder.pathSegment("apps", validRequest.getApplicationId(), "containermetrics")))
67+
.flatMap(inbound -> inbound.receiveMultipart().receiveInputStream())
68+
.map(ReactorDopplerClient::toEnvelope)
69+
.map(ReactorDopplerClient::toEvent);
70+
}
71+
72+
@Override
73+
public Flux<Event> firehose(FirehoseRequest request) {
74+
return ws(request, consumer((builder, validRequest) -> builder.pathSegment("firehose", validRequest.getSubscriptionId())))
75+
.flatMap(HttpInbound::receiveInputStream)
76+
.map(ReactorDopplerClient::toEnvelope)
77+
.map(ReactorDopplerClient::toEvent);
78+
}
79+
80+
@Override
81+
public Flux<LogMessage> recentLogs(RecentLogsRequest request) {
82+
return get(request, consumer((builder, validRequest) -> builder.pathSegment("apps", validRequest.getApplicationId(), "recentlogs")))
83+
.flatMap(inbound -> inbound.receiveMultipart().receiveInputStream())
84+
.map(ReactorDopplerClient::toEnvelope)
85+
.map(ReactorDopplerClient::toEvent);
86+
}
87+
88+
@Override
89+
public Flux<Event> stream(StreamRequest request) {
90+
return ws(request, consumer((builder, validRequest) -> builder.pathSegment("apps", validRequest.getApplicationId(), "stream")))
91+
.flatMap(HttpInbound::receiveInputStream)
92+
.map(ReactorDopplerClient::toEnvelope)
93+
.map(ReactorDopplerClient::toEvent);
94+
}
95+
96+
private static Envelope toEnvelope(InputStream inputStream) {
97+
try {
98+
return Envelope.ADAPTER.decode(inputStream);
99+
} catch (IOException e) {
100+
throw Exceptions.propagate(e);
101+
}
102+
}
103+
104+
@SuppressWarnings("unchecked")
105+
private static <T extends Event> T toEvent(Envelope envelope) {
106+
switch (envelope.eventType) {
107+
case HttpStart:
108+
return (T) HttpStart.builder()
109+
.dropsonde(envelope.httpStart)
110+
.build();
111+
case HttpStop:
112+
return (T) HttpStop.builder()
113+
.dropsonde(envelope.httpStop)
114+
.build();
115+
case HttpStartStop:
116+
return (T) HttpStartStop.builder()
117+
.dropsonde(envelope.httpStartStop)
118+
.build();
119+
case LogMessage:
120+
return (T) LogMessage.builder()
121+
.dropsonde(envelope.logMessage)
122+
.build();
123+
case ValueMetric:
124+
return (T) ValueMetric.builder()
125+
.dropsonde(envelope.valueMetric)
126+
.build();
127+
case CounterEvent:
128+
return (T) CounterEvent.builder()
129+
.dropsonde(envelope.counterEvent)
130+
.build();
131+
case Error:
132+
return (T) Error.builder()
133+
.dropsonde(envelope.error)
134+
.build();
135+
case ContainerMetric:
136+
return (T) ContainerMetric.builder()
137+
.dropsonde(envelope.containerMetric)
138+
.build();
139+
default:
140+
throw new IllegalStateException(String.format("Envelope event type %s is unsupported", envelope.eventType));
141+
}
142+
}
143+
144+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright 2013-2016 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.cloudfoundry.reactor.util;
18+
19+
20+
import org.cloudfoundry.Validatable;
21+
import org.cloudfoundry.spring.client.v2.CloudFoundryExceptionBuilder;
22+
import org.cloudfoundry.util.ExceptionUtils;
23+
import org.cloudfoundry.util.ValidationUtils;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
import org.springframework.web.util.UriComponentsBuilder;
27+
import reactor.core.publisher.Mono;
28+
import reactor.core.tuple.Tuple;
29+
import reactor.core.tuple.Tuple2;
30+
import reactor.io.netty.common.NettyInbound;
31+
import reactor.io.netty.http.HttpClient;
32+
import reactor.io.netty.http.HttpException;
33+
import reactor.io.netty.http.HttpInbound;
34+
import reactor.io.netty.http.HttpOutbound;
35+
36+
import java.util.function.Consumer;
37+
38+
import static org.cloudfoundry.util.tuple.TupleUtils.function;
39+
40+
public abstract class AbstractReactorOperations {
41+
42+
private final Logger logger = LoggerFactory.getLogger("cloudfoundry-client.request");
43+
44+
private final AuthorizationProvider authorizationProvider;
45+
46+
private final HttpClient httpClient;
47+
48+
private final Mono<String> root;
49+
50+
protected AbstractReactorOperations(AuthorizationProvider authorizationProvider, HttpClient httpClient, Mono<String> root) {
51+
this.authorizationProvider = authorizationProvider;
52+
this.httpClient = httpClient;
53+
this.root = root;
54+
}
55+
56+
protected final <REQ extends Validatable, RSP> Mono<RSP> get(REQ request, Class<RSP> responseType, Consumer<Tuple2<UriComponentsBuilder, REQ>> builderCallback) {
57+
return Mono
58+
.when(ValidationUtils.validate(request), this.root)
59+
.map(function((validRequest, root) -> buildUri(root, validRequest, builderCallback)))
60+
.doOnSuccess(uri -> this.logger.debug("GET {}", uri))
61+
.then(uri -> this.httpClient.get(uri,
62+
outbound -> this.authorizationProvider.addAuthorization(outbound)
63+
.then(HttpOutbound::sendHeaders)))
64+
.otherwise(ExceptionUtils.replace(HttpException.class, CloudFoundryExceptionBuilder::build))
65+
.flatMap(NettyInbound::receiveInputStream)
66+
.as(JsonCodec.decode(responseType))
67+
.single();
68+
}
69+
70+
protected final <REQ extends Validatable> Mono<HttpInbound> get(REQ request, Consumer<Tuple2<UriComponentsBuilder, REQ>> builderCallback) {
71+
return Mono
72+
.when(ValidationUtils.validate(request), this.root)
73+
.map(function((validRequest, root) -> buildUri(root, validRequest, builderCallback)))
74+
.doOnSuccess(uri -> this.logger.debug("GET {}", uri))
75+
.then(uri -> this.httpClient.get(uri,
76+
outbound -> this.authorizationProvider.addAuthorization(outbound)
77+
.then(HttpOutbound::sendHeaders)))
78+
.otherwise(ExceptionUtils.replace(HttpException.class, CloudFoundryExceptionBuilder::build));
79+
}
80+
81+
protected final <REQ extends Validatable> Mono<HttpInbound> ws(REQ request, Consumer<Tuple2<UriComponentsBuilder, REQ>> builderCallback) {
82+
return Mono
83+
.when(ValidationUtils.validate(request), this.root)
84+
.map(function((validRequest, root) -> buildUri(root, validRequest, builderCallback)))
85+
.doOnSuccess(uri -> this.logger.debug("WS {}", uri))
86+
.then(uri -> this.httpClient.get(uri,
87+
outbound -> this.authorizationProvider.addAuthorization(outbound)
88+
.then(HttpOutbound::upgradeToWebsocket)))
89+
.otherwise(ExceptionUtils.replace(HttpException.class, CloudFoundryExceptionBuilder::build));
90+
}
91+
92+
private static <REQ extends Validatable> String buildUri(String root, REQ validRequest, Consumer<Tuple2<UriComponentsBuilder, REQ>> builderCallback) {
93+
UriComponentsBuilder builder = UriComponentsBuilder.fromUriString(root);
94+
builderCallback.accept(Tuple.of(builder, validRequest));
95+
return builder.build().encode().toUriString();
96+
}
97+
98+
}

0 commit comments

Comments
 (0)