Skip to content

Add tapir path matching within pekko instrumentation #13386

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 51 additions & 4 deletions instrumentation/pekko/pekko-http-1.0/javaagent/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -10,35 +10,79 @@ muzzle {
versions.set("[1.0,)")
assertInverse.set(true)
extraDependency("org.apache.pekko:pekko-stream_2.12:1.0.1")
excludeInstrumentationName("tapir-pekko-http-server")
}
pass {
group.set("org.apache.pekko")
module.set("pekko-http_2.13")
versions.set("[1.0,)")
assertInverse.set(true)
extraDependency("org.apache.pekko:pekko-stream_2.13:1.0.1")
excludeInstrumentationName("tapir-pekko-http-server")
}
pass {
group.set("org.apache.pekko")
module.set("pekko-http_3")
versions.set("[1.0,)")
assertInverse.set(true)
extraDependency("org.apache.pekko:pekko-stream_3:1.0.1")
excludeInstrumentationName("tapir-pekko-http-server")
}
pass {
group.set("com.softwaremill.sttp.tapir")
module.set("tapir-pekko-http-server_2.12")
versions.set("[1.7,)")
assertInverse.set(true)
excludeInstrumentationName("pekko-http-server")
}
pass {
group.set("com.softwaremill.sttp.tapir")
module.set("tapir-pekko-http-server_2.13")
versions.set("[1.7,)")
assertInverse.set(true)
excludeInstrumentationName("pekko-http-server")
}
pass {
group.set("com.softwaremill.sttp.tapir")
module.set("tapir-pekko-http-server_3")
versions.set("[1.7,)")
assertInverse.set(true)
excludeInstrumentationName("pekko-http-server")
}
}

dependencies {
library("org.apache.pekko:pekko-http_2.12:1.0.0")
library("org.apache.pekko:pekko-stream_2.12:1.0.1")

testImplementation("com.softwaremill.sttp.tapir:tapir-pekko-http-server_2.12:1.7.0")
compileOnly("com.softwaremill.sttp.tapir:tapir-pekko-http-server_2.12:1.7.0")

testInstrumentation(project(":instrumentation:pekko:pekko-actor-1.0:javaagent"))
testInstrumentation(project(":instrumentation:executors:javaagent"))

latestDepTestLibrary("org.apache.pekko:pekko-http_2.13:latest.release")
latestDepTestLibrary("org.apache.pekko:pekko-stream_2.13:latest.release")
latestDepTestLibrary("com.softwaremill.sttp.tapir:tapir-pekko-http-server_2.13:latest.release")
}

testing {
suites {
val tapirTest by registering(JvmTestSuite::class) {
dependencies {
// this only exists to make Intellij happy since it doesn't (currently at least) understand our
// inclusion of this artifact inside :testing-common
compileOnly(project.dependencies.project(":testing:armeria-shaded-for-testing", configuration = "shadow"))

if (findProperty("testLatestDeps") as Boolean) {
implementation("com.typesafe.akka:akka-http_2.13:latest.release")
implementation("com.typesafe.akka:akka-stream_2.13:latest.release")
implementation("com.softwaremill.sttp.tapir:tapir-pekko-http-server_2.13:latest.release")
} else {
implementation("org.apache.pekko:pekko-http_2.12:1.0.0")
implementation("org.apache.pekko:pekko-stream_2.12:1.0.1")
implementation("com.softwaremill.sttp.tapir:tapir-pekko-http-server_2.12:1.7.0")
}
}
}
}
}

tasks {
Expand All @@ -51,6 +95,10 @@ tasks {

systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
}

check {
dependsOn(testing.suites)
}
}

if (findProperty("testLatestDeps") as Boolean) {
Expand All @@ -59,7 +107,6 @@ if (findProperty("testLatestDeps") as Boolean) {
testImplementation {
exclude("org.apache.pekko", "pekko-http_2.12")
exclude("org.apache.pekko", "pekko-stream_2.12")
exclude("com.softwaremill.sttp.tapir", "tapir-pekko-http-server_2.12")
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server.tapir;

import io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server.route.PekkoRouteHolder;
import org.apache.pekko.http.scaladsl.server.RequestContext;
import org.apache.pekko.http.scaladsl.server.RouteResult;
import scala.Function1;
import scala.Function2;
import scala.Option;
import scala.PartialFunction;
import scala.Unit;
import scala.concurrent.Future;
import scala.util.Try;
import sttp.tapir.EndpointInput;
import sttp.tapir.server.ServerEndpoint;

public class RouteWrapper implements Function1<RequestContext, Future<RouteResult>> {
private final Function1<RequestContext, Future<RouteResult>> route;
private final ServerEndpoint<?, ?> serverEndpoint;

public RouteWrapper(
ServerEndpoint<?, ?> serverEndpoint, Function1<RequestContext, Future<RouteResult>> route) {
this.route = route;
this.serverEndpoint = serverEndpoint;
}

public class Finalizer implements PartialFunction<Try<RouteResult>, Unit> {
@Override
public boolean isDefinedAt(Try<RouteResult> tryResult) {
return tryResult.isSuccess();
}

@Override
public Unit apply(Try<RouteResult> tryResult) {
if (tryResult.isSuccess()) {
RouteResult result = tryResult.get();
if (result.getClass() == RouteResult.Complete.class) {
String path =
serverEndpoint.showPathTemplate(
(index, pc) ->
pc.name().isDefined() ? "{" + pc.name().get() + "}" : "{param" + index + "}",
Option.apply(
(Function2<Object, EndpointInput.Query<?>, String>)
(index, q) -> q.name() + "={" + q.name() + "}"),
false,
"*",
Option.apply("*"),
Option.apply("*"));

PekkoRouteHolder.push(path);
PekkoRouteHolder.endMatched();
}
}
return null;
}
}

@Override
public Future<RouteResult> apply(RequestContext ctx) {
return route.apply(ctx).andThen(new Finalizer(), ctx.executionContext());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server.tapir;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.pekko.http.scaladsl.server.RequestContext;
import org.apache.pekko.http.scaladsl.server.RouteResult;
import scala.Function1;
import scala.concurrent.Future;
import sttp.tapir.server.ServerEndpoint;

public class TapirPathInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("sttp.tapir.server.pekkohttp.PekkoHttpServerInterpreter");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("toRoute").and(takesArgument(0, named("sttp.tapir.server.ServerEndpoint"))),
this.getClass().getName() + "$ApplyAdvice");
}

@SuppressWarnings("unused")
public static class ApplyAdvice {
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onExit(
@Advice.Argument(0) ServerEndpoint<?, ?> endpoint,
@Advice.Return(readOnly = false) Function1<RequestContext, Future<RouteResult>> route) {
route = new RouteWrapper(endpoint, route);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server.tapir;

import static java.util.Collections.singletonList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule;
import java.util.List;

@AutoService(InstrumentationModule.class)
public class TapirPekkoHttpServerRouteInstrumentationModule extends InstrumentationModule
implements ExperimentalInstrumentationModule {
public TapirPekkoHttpServerRouteInstrumentationModule() {
super(
"pekko-http",
"pekko-http-1.0",
"pekko-http-server",
"pekko-http-server-route",
"tapir-pekko-http-server",
"tapir-pekko-http-server-route");
}

@Override
public String getModuleGroup() {
return "pekko-server";
}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new TapirPathInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0

import io.opentelemetry.instrumentation.test.utils.PortUtils
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension
import io.opentelemetry.sdk.testing.assertj.{SpanDataAssert, TraceAssert}
import io.opentelemetry.testing.internal.armeria.client.WebClient
import io.opentelemetry.testing.internal.armeria.common.{
AggregatedHttpRequest,
HttpMethod
}
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.http.scaladsl.server.Directives.{
IntNumber,
complete,
concat,
path,
pathEndOrSingleSlash,
pathPrefix,
pathSingleSlash
}
import org.apache.pekko.http.scaladsl.server.Route
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.extension.RegisterExtension
import org.junit.jupiter.api.{AfterAll, Test, TestInstance}
import sttp.tapir._
import sttp.tapir.server.pekkohttp.PekkoHttpServerInterpreter

import java.net.{URI, URISyntaxException}
import java.util.function.Consumer
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class TapirHttpServerRouteTest {
@RegisterExtension private val testing: AgentInstrumentationExtension =
AgentInstrumentationExtension.create
private val client: WebClient = WebClient.of()

implicit val system: ActorSystem = ActorSystem("my-system")

private def buildAddress(port: Int): URI = try
new URI("http://localhost:" + port + "/")
catch {
case exception: URISyntaxException =>
throw new IllegalStateException(exception)
}

@Test def testSimple(): Unit = {
val route = path("test") {
complete("ok")
}

test(route, "/test", "GET /test")
}

@Test def testRoute(): Unit = {
val route = concat(
pathEndOrSingleSlash {
complete("root")
},
pathPrefix("test") {
concat(
pathSingleSlash {
complete("test")
},
path(IntNumber) { _ =>
complete("ok")
}
)
}
)

test(route, "/test/1", "GET /test/*")
}

@Test def testTapirRoutes(): Unit = {
val interpreter = PekkoHttpServerInterpreter()(system.dispatcher)
def makeRoute(input: EndpointInput[Unit]) = {
interpreter.toRoute(
endpoint.get
.in(input)
.errorOut(stringBody)
.out(stringBody)
.serverLogicPure[Future](_ => Right("ok"))
)
}

val routes = concat(
concat(makeRoute("test" / "1"), makeRoute("test" / "2")),
concat(makeRoute("test" / "3"), makeRoute("test" / "4"))
)

test(routes, "/test/4", "GET /test/4")
}

def test(route: Route, path: String, spanName: String): Unit = {
val port = PortUtils.findOpenPort
val address: URI = buildAddress(port)
val binding =
Await.result(Http().bindAndHandle(route, "localhost", port), 10.seconds)
try {
val request = AggregatedHttpRequest.of(
HttpMethod.GET,
address.resolve(path).toString
)
val response = client.execute(request).aggregate.join
assertThat(response.status.code).isEqualTo(200)
assertThat(response.contentUtf8).isEqualTo("ok")

testing.waitAndAssertTraces(new Consumer[TraceAssert] {
override def accept(trace: TraceAssert): Unit =
trace.hasSpansSatisfyingExactly(new Consumer[SpanDataAssert] {
override def accept(span: SpanDataAssert): Unit = {
span.hasName(spanName)
}
})
})
} finally {
binding.unbind()
}
}

@AfterAll
def cleanUp(): Unit = {
system.terminate()
}
}
Loading
Loading