Skip to content

Commit 2a52bba

Browse files
committed
Initial commit
0 parents  commit 2a52bba

File tree

3 files changed

+202
-0
lines changed

3 files changed

+202
-0
lines changed

Diff for: .gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
/target
2+
/Cargo.lock

Diff for: Cargo.toml

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
[package]
2+
name = "tower-trace-metrics"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
7+
8+
[dependencies]
9+
metrics = "0.20.1"
10+
tracing = "0.1.35"
11+
http = "0.2.8"
12+
13+
[dependencies.tower-http]
14+
version = "0.3.0"
15+
features = ["trace"]

Diff for: src/lib.rs

+185
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
// Copyright 2023 Jeremy Wall ([email protected])
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//! A [metrics] powered [TraceLayer] that works as [Tower](https://crates.io/crates/tower) middleware.
15+
use http::{Request, Response};
16+
use metrics::{histogram, increment_counter, Label};
17+
use std::{
18+
marker::PhantomData,
19+
sync::{
20+
atomic::{AtomicU64, Ordering},
21+
Arc, Mutex,
22+
},
23+
};
24+
use tower_http::{
25+
classify::{ServerErrorsAsFailures, SharedClassifier},
26+
trace::{
27+
DefaultMakeSpan, DefaultOnEos, OnBodyChunk, OnFailure, OnRequest, OnResponse, TraceLayer,
28+
},
29+
};
30+
use tracing;
31+
32+
/// A Metrics Trace Layer using a [MetricsRecorder].
33+
///
34+
/// The layer will record 4 different metrics:
35+
///
36+
/// * http_request_counter
37+
/// * http_request_failure_counter
38+
/// * http_request_size_bytes_hist
39+
/// * http_request_request_time_micros_hist
40+
///
41+
/// Each of the metrics are labled by host, method, and path
42+
pub type MetricsTraceLayer<B, F> = TraceLayer<
43+
SharedClassifier<ServerErrorsAsFailures>,
44+
DefaultMakeSpan,
45+
MetricsRecorder<B, F>,
46+
MetricsRecorder<B, F>,
47+
MetricsRecorder<B, F>,
48+
DefaultOnEos,
49+
MetricsRecorder<B, F>,
50+
>;
51+
52+
/// Holds the state required for recording metrics on a given request.
53+
pub struct MetricsRecorder<B, F>
54+
where
55+
F: Fn(&B) -> u64,
56+
{
57+
/// The labels for each metric we record on this request.
58+
pub labels: Arc<Mutex<Vec<Label>>>,
59+
/// The accumulator for the number of bytes on this request.
60+
pub size: Arc<AtomicU64>,
61+
/// The mapper function to extract the size from a chunk in [OnBodyChunk<B>],
62+
pub chunk_len: Arc<F>,
63+
_phantom: PhantomData<B>,
64+
}
65+
66+
impl<B, F> Clone for MetricsRecorder<B, F>
67+
where
68+
F: Fn(&B) -> u64,
69+
{
70+
fn clone(&self) -> Self {
71+
Self {
72+
labels: self.labels.clone(),
73+
size: self.size.clone(),
74+
chunk_len: self.chunk_len.clone(),
75+
_phantom: self._phantom.clone(),
76+
}
77+
}
78+
}
79+
80+
impl<B, F> MetricsRecorder<B, F>
81+
where
82+
F: Fn(&B) -> u64,
83+
{
84+
/// Construct a new [MetricsRecorder] using the installed [metrics::Recorder].
85+
/// The function passed in is used to extract the size from the chunks in a
86+
/// response for all [OnBodyChunk] calls.
87+
pub fn new(f: F) -> Self {
88+
Self {
89+
labels: Arc::new(Mutex::new(Vec::new())),
90+
size: Arc::new(AtomicU64::new(0)),
91+
chunk_len: Arc::new(f),
92+
_phantom: PhantomData,
93+
}
94+
}
95+
}
96+
97+
impl<B, F> OnBodyChunk<B> for MetricsRecorder<B, F>
98+
where
99+
F: Fn(&B) -> u64,
100+
{
101+
fn on_body_chunk(&mut self, chunk: &B, _latency: std::time::Duration, _span: &tracing::Span) {
102+
let _ = self
103+
.size
104+
.fetch_add(self.chunk_len.as_ref()(chunk), Ordering::SeqCst);
105+
}
106+
}
107+
108+
impl<B, FailureClass, F> OnFailure<FailureClass> for MetricsRecorder<B, F>
109+
where
110+
F: Fn(&B) -> u64,
111+
{
112+
fn on_failure(
113+
&mut self,
114+
_failure_classification: FailureClass,
115+
_latency: std::time::Duration,
116+
_span: &tracing::Span,
117+
) {
118+
let labels = self.labels.lock().expect("Failed to unlock labels").clone();
119+
increment_counter!("http_request_failure_counter", labels);
120+
}
121+
}
122+
123+
impl<B, RB, F> OnResponse<RB> for MetricsRecorder<B, F>
124+
where
125+
F: Fn(&B) -> u64,
126+
{
127+
fn on_response(
128+
self,
129+
_response: &Response<RB>,
130+
latency: std::time::Duration,
131+
_span: &tracing::Span,
132+
) {
133+
let labels = self.labels.lock().expect("Failed to unlock labels").clone();
134+
histogram!(
135+
"http_request_time_micros_hist",
136+
latency.as_micros() as f64,
137+
labels.clone()
138+
);
139+
histogram!(
140+
"http_request_size_bytes_hist",
141+
self.size.as_ref().load(Ordering::SeqCst) as f64,
142+
labels
143+
)
144+
}
145+
}
146+
147+
fn make_request_lables(path: String, host: String, method: String) -> Vec<Label> {
148+
vec![
149+
Label::new("path", path),
150+
Label::new("host", host),
151+
Label::new("method", method),
152+
]
153+
}
154+
155+
impl<B, RB, F> OnRequest<RB> for MetricsRecorder<B, F>
156+
where
157+
F: Fn(&B) -> u64,
158+
{
159+
fn on_request(&mut self, request: &Request<RB>, _span: &tracing::Span) {
160+
let path = request.uri().path().to_lowercase();
161+
let host = request.uri().host().unwrap_or("").to_lowercase();
162+
let method = request.method().to_string();
163+
164+
let labels = make_request_lables(path, host, method);
165+
let mut labels_lock = self.labels.lock().expect("Failed to unlock labels");
166+
(*labels_lock.as_mut()) = labels.clone();
167+
increment_counter!("http_request_counter", labels);
168+
}
169+
}
170+
171+
/// Construct a [TraceLayer] that will use an installed [metrics::Recorder] to record metrics per request.
172+
/// The provided [Fn] is used to extract the size from the chunks in a
173+
/// response for all [OnBodyChunk] calls.
174+
pub fn make_layer<B, F>(f: F) -> MetricsTraceLayer<B, F>
175+
where
176+
F: Fn(&B) -> u64,
177+
{
178+
let metrics_recorder = MetricsRecorder::new(f);
179+
let layer = TraceLayer::new_for_http()
180+
.on_body_chunk(metrics_recorder.clone())
181+
.on_request(metrics_recorder.clone())
182+
.on_response(metrics_recorder.clone())
183+
.on_failure(metrics_recorder.clone());
184+
layer
185+
}

0 commit comments

Comments
 (0)