-
Notifications
You must be signed in to change notification settings - Fork 246
/
Copy pathlib.rs
105 lines (92 loc) · 3.46 KB
/
lib.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
use hdrhistogram::serialization::interval_log;
use hdrhistogram::Histogram;
use std::time::Duration;
#[derive(Default, Clone)]
pub struct Timeline {
// these are logarithmically spaced
// the first histogram is 0-1s after start, the second 1-2s after start, then 2-4s, etc.
histograms: Vec<Histograms>,
total_duration: Duration,
}
#[derive(Clone)]
pub struct Histograms {
processing: Histogram<u64>,
sojourn: Histogram<u64>,
}
impl Default for Histograms {
fn default() -> Self {
Self {
processing: Histogram::new_with_bounds(1, 60_000_000, 3).unwrap(),
sojourn: Histogram::new_with_bounds(1, 60_000_000, 3).unwrap(),
}
}
}
impl Histograms {
pub fn processing(&mut self, time: u64) {
self.processing.saturating_record(time);
}
pub fn sojourn(&mut self, time: u64) {
self.sojourn.saturating_record(time);
}
pub fn merge(&mut self, other: &Self) {
self.processing.add(&other.processing).expect("same bounds");
self.sojourn.add(&other.sojourn).expect("same bounds");
}
}
impl Timeline {
pub fn set_total_duration(&mut self, total: Duration) {
self.total_duration = total;
}
pub fn histogram_for(&mut self, issued_at: Duration) -> &mut Histograms {
let hist = ((issued_at.as_secs_f64() + 0.000000000001).ceil() as usize)
.next_power_of_two()
.trailing_zeros() as usize;
if hist >= self.histograms.len() {
self.histograms.resize(hist + 1, Histograms::default());
}
self.histograms.get_mut(hist).unwrap()
}
pub fn merge(&mut self, other: &Self) {
for (ti, other_hs) in other.histograms.iter().enumerate() {
if let Some(self_hs) = self.histograms.get_mut(ti) {
self_hs.merge(other_hs);
} else {
self.histograms.push(other_hs.clone());
}
}
}
pub fn write<W: std::io::Write, S: hdrhistogram::serialization::Serializer>(
&self,
w: &mut interval_log::IntervalLogWriter<W, S>,
) -> Result<(), interval_log::IntervalLogWriterError<S::SerializeError>> {
let proc_tag = interval_log::Tag::new("processing").unwrap();
let sjrn_tag = interval_log::Tag::new("sojourn").unwrap();
for (i, hs) in self.histograms.iter().enumerate() {
let start = Duration::from_secs((1 << i) >> 1);
let mut dur = Duration::from_secs(1 << i) - start;
if self.total_duration != Duration::new(0, 0) && start + dur > self.total_duration {
dur = self.total_duration - start;
}
w.write_histogram(&hs.processing, start, dur, Some(proc_tag))?;
w.write_histogram(&hs.sojourn, start, dur, Some(sjrn_tag))?;
}
Ok(())
}
pub fn last(&self) -> Option<(&Histogram<u64>, &Histogram<u64>)> {
self.histograms.last().map(|h| (&h.processing, &h.sojourn))
}
pub fn collapse(&self) -> (Histogram<u64>, Histogram<u64>) {
let mut hists = self.histograms.iter();
if let Some(hs) = hists.next() {
let mut proc = hs.processing.clone();
let mut sjrn = hs.sojourn.clone();
for hs in hists {
proc.add(&hs.processing).expect("same bounds");
sjrn.add(&hs.sojourn).expect("same bounds");
}
(proc, sjrn)
} else {
(Histogram::new(1).unwrap(), Histogram::new(1).unwrap())
}
}
}