Skip to content

Commit 0e2a6f4

Browse files
committed
fixup
1 parent d9bab31 commit 0e2a6f4

File tree

12 files changed

+185
-93
lines changed

12 files changed

+185
-93
lines changed

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -276,8 +276,8 @@ chrono-tz = { version = "0.8", features = ["serde"] }
276276
cidr = { version = "0.2.2" }
277277
clap = { version = "4.4.2", features = ["derive"] }
278278
comfy-table = "7"
279-
convert_case = "0.6.0"
280279
concurrent-queue = "2.5.0"
280+
convert_case = "0.6.0"
281281
cookie = "0.18.1"
282282
crc32fast = "1.3.2"
283283
criterion = "0.5"

scripts/ci/deploy/config/databend-query-node-otlp-logs.toml

+1
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ pkey2 = "pvalue2"
6767

6868
[log.persistentlog]
6969
on = true
70+
level = "INFO"
7071

7172
[meta]
7273
endpoints = ["0.0.0.0:9191"]

src/common/tracing/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ arrow-schema = { workspace = true }
1717
async-channel = { workspace = true }
1818
backtrace = { workspace = true, features = ["std", "serialize-serde"] }
1919
chrono = { workspace = true }
20+
concurrent-queue = { workspace = true }
2021
databend-common-base = { workspace = true }
2122
databend-common-exception = { workspace = true }
2223
defer = { workspace = true }
@@ -34,7 +35,6 @@ parquet = { workspace = true }
3435
serde = { workspace = true }
3536
serde_json = { workspace = true }
3637
tonic = { workspace = true }
37-
concurrent-queue = { workspace = true }
3838

3939
[dev-dependencies]
4040

src/common/tracing/src/config.rs

+4-6
Original file line numberDiff line numberDiff line change
@@ -349,15 +349,14 @@ pub struct PersistentLogConfig {
349349
pub stage_name: String,
350350
pub level: String,
351351
pub retention: usize,
352-
pub retention_frequency: usize,
353352
}
354353

355354
impl Display for PersistentLogConfig {
356355
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
357356
write!(
358357
f,
359-
"enabled={}, interval={}, stage_name={}, level={}, retention={}, retention_frequency={}",
360-
self.on, self.interval, self.stage_name, self.level, self.retention, self.retention_frequency
358+
"enabled={}, interval={}, stage_name={}, level={}, retention={}",
359+
self.on, self.interval, self.stage_name, self.level, self.retention
361360
)
362361
}
363362
}
@@ -366,11 +365,10 @@ impl Default for PersistentLogConfig {
366365
fn default() -> Self {
367366
Self {
368367
on: false,
369-
interval: 8,
368+
interval: 2,
370369
stage_name: "log_1f93b76af0bd4b1d8e018667865fbc65".to_string(),
371-
retention: 24,
370+
retention: 72,
372371
level: "WARN".to_string(),
373-
retention_frequency: 40,
374372
}
375373
}
376374
}

src/common/tracing/src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,10 @@ pub use crate::init::GlobalLogger;
4848
pub use crate::panic_hook::log_panic;
4949
pub use crate::panic_hook::set_panic_hook;
5050
pub use crate::remote_log::convert_to_batch;
51+
pub use crate::remote_log::LogBuffer as RemoteLogBuffer;
5152
pub use crate::remote_log::RemoteLog;
5253
pub use crate::remote_log::RemoteLogElement;
54+
pub use crate::remote_log::RemoteLogGuard;
5355
pub use crate::structlog::DummyReporter;
5456
pub use crate::structlog::StructLogReporter;
5557

src/common/tracing/src/remote_log.rs

+10-10
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ pub struct RemoteLogGuard {
9292
impl Drop for RemoteLogGuard {
9393
fn drop(&mut self) {
9494
// buffer collect will send all logs and trigger the log flush
95-
if let Ok(_) = self.buffer.collect() {
95+
if self.buffer.collect().is_ok() {
9696
// wait for the log to be flushed
9797
std::thread::sleep(Duration::from_secs(3));
9898
}
@@ -104,6 +104,7 @@ impl RemoteLog {
104104
labels: &BTreeMap<String, String>,
105105
cfg: &Config,
106106
) -> Result<(RemoteLog, Box<RemoteLogGuard>)> {
107+
// all interval in RemoteLog is microseconds
107108
let interval = Duration::from_secs(cfg.persistentlog.interval as u64).as_micros();
108109
let stage_name = cfg.persistentlog.stage_name.clone();
109110
let node_id = labels.get("node_id").cloned().unwrap_or_default();
@@ -130,7 +131,6 @@ impl RemoteLog {
130131
while let Ok(log_element) = receiver.recv().await {
131132
Self::flush(log_element, stage_name).await;
132133
}
133-
// receiver close will trigger interval_flush exit
134134
let _ = receiver.close();
135135
}
136136

@@ -174,13 +174,13 @@ impl RemoteLog {
174174
Ok(())
175175
}
176176

177-
fn prepare_log_element(&self, record: &Record) -> RemoteLogElement {
177+
pub fn prepare_log_element(&self, record: &Record) -> RemoteLogElement {
178178
let mut query_id = ThreadTracker::query_id().cloned();
179179
let mut fields = Map::new();
180180
let target = record.target().to_string();
181181
match target.as_str() {
182182
"databend::log::profile" | "databend::log::query" => {
183-
if let Err(_) = handle_profile_and_query(&mut fields, &mut query_id, &record) {
183+
if handle_profile_and_query(&mut fields, &mut query_id, record).is_err() {
184184
fields.insert("message".to_string(), format!("{}", record.args()).into());
185185
}
186186
}
@@ -237,15 +237,16 @@ impl LogBuffer {
237237
pub fn new(sender: Sender<Vec<RemoteLogElement>>, interval: u64) -> Self {
238238
Self {
239239
queue: ConcurrentQueue::unbounded(),
240-
last_collect: AtomicU64::new(0),
240+
last_collect: AtomicU64::new(chrono::Local::now().timestamp_micros() as u64),
241241
sender,
242242
interval,
243243
}
244244
}
245245

246+
/// log will trigger a collect either when the buffer is full or the interval is reached
246247
pub fn log(&self, log_element: RemoteLogElement) -> anyhow::Result<()> {
247248
self.queue.push(log_element)?;
248-
if self.queue.len() > Self::MAX_BUFFER_SIZE {
249+
if self.queue.len() >= Self::MAX_BUFFER_SIZE {
249250
self.last_collect.store(
250251
chrono::Local::now().timestamp_micros() as u64,
251252
Ordering::SeqCst,
@@ -254,10 +255,9 @@ impl LogBuffer {
254255
}
255256
let now = chrono::Local::now().timestamp_micros() as u64;
256257
let last = self.last_collect.load(Ordering::SeqCst);
257-
if now - last > self.interval {
258-
if self.last_collect.fetch_max(now, Ordering::SeqCst) == last {
259-
self.collect()?;
260-
}
258+
if now - last > self.interval && self.last_collect.fetch_max(now, Ordering::SeqCst) == last
259+
{
260+
self.collect()?;
261261
}
262262
Ok(())
263263
}

src/common/tracing/tests/it/remote_log.rs

+111-27
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,124 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::BTreeMap;
16+
use std::sync::Arc;
17+
use std::time::Duration;
18+
19+
use async_channel::bounded;
1520
use databend_common_base::base::tokio;
1621
use databend_common_exception::Result;
1722
use databend_common_tracing::convert_to_batch;
23+
use databend_common_tracing::Config;
1824
use databend_common_tracing::RemoteLog;
25+
use databend_common_tracing::RemoteLogBuffer;
1926
use databend_common_tracing::RemoteLogElement;
27+
use databend_common_tracing::RemoteLogGuard;
28+
use log::Level;
29+
use log::Record;
2030
use opendal::services;
2131
use opendal::Operator;
32+
use serde_json::Value;
33+
34+
fn setup() -> Result<(RemoteLog, Box<RemoteLogGuard>)> {
35+
let mut labels = BTreeMap::new();
36+
labels.insert("cluster_id".into(), "cluster_id1".into());
37+
labels.insert("node_id".into(), "node_id1".into());
38+
let (remote_log, guard) = RemoteLog::new(&labels, &Config::default())?;
39+
40+
Ok((remote_log, guard))
41+
}
42+
43+
fn get_remote_log_elements() -> RemoteLogElement {
44+
RemoteLogElement {
45+
timestamp: chrono::Local::now().timestamp_micros(),
46+
path: "databend_query::interpreters::common::query_log: query_log.rs:71".to_string(),
47+
target: "databend::log::query".to_string(),
48+
cluster_id: "test_cluster".to_string(),
49+
node_id: "izs9przqhAN4n5hbJanJm2".to_string(),
50+
query_id: Some("89ad07ad-83fe-4424-8005-4c5b318a7212".to_string()),
51+
warehouse_id: None,
52+
log_level: "INFO".to_string(),
53+
fields: r#"{"message":"test"}"#.to_string(),
54+
}
55+
}
56+
57+
#[test]
58+
fn test_basic_parse() -> Result<()> {
59+
let (remote_log, _guard) = setup()?;
60+
let record = Record::builder()
61+
.args(format_args!("begin to list files"))
62+
.level(Level::Info)
63+
.target("databend_query::sessions::query_ctx")
64+
.module_path(Some("databend_query::sessions::query_ctx"))
65+
.file(Some("query_ctx.rs"))
66+
.line(Some(656))
67+
.build();
68+
69+
let remote_log_element = remote_log.prepare_log_element(&record);
70+
71+
assert_eq!(remote_log_element.cluster_id, "cluster_id1");
72+
assert_eq!(remote_log_element.node_id, "node_id1");
73+
assert_eq!(
74+
remote_log_element.path,
75+
"databend_query::sessions::query_ctx: query_ctx.rs:656"
76+
);
77+
78+
let fields: Value = serde_json::from_str(&remote_log_element.fields)?;
79+
80+
assert_eq!(fields["message"], "begin to list files");
81+
82+
Ok(())
83+
}
2284

2385
#[test]
24-
fn test_convert_to_batch() {
25-
let _ = convert_to_batch(get_remote_log()).unwrap();
86+
fn test_convert_to_batch() -> Result<()> {
87+
let elements = vec![get_remote_log_elements()];
88+
let _ = convert_to_batch(elements).unwrap();
89+
Ok(())
90+
}
91+
92+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
93+
async fn test_buffer_flush_with_buffer_limit() -> Result<()> {
94+
let (tx, rx) = bounded(10);
95+
let interval = Duration::from_secs(100).as_micros() as u64;
96+
let buffer = Arc::new(RemoteLogBuffer::new(tx.clone(), interval));
97+
for _i in 0..5005 {
98+
buffer.log(get_remote_log_elements())?
99+
}
100+
let res = rx.recv().await.unwrap();
101+
assert_eq!(res.len(), 5000);
102+
Ok(())
103+
}
104+
105+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
106+
async fn test_buffer_flush_with_buffer_interval() -> Result<()> {
107+
let (tx, rx) = bounded(10);
108+
let interval = Duration::from_secs(1).as_micros() as u64;
109+
let buffer = Arc::new(RemoteLogBuffer::new(tx.clone(), interval));
110+
for _i in 0..5 {
111+
buffer.log(get_remote_log_elements())?
112+
}
113+
tokio::time::sleep(Duration::from_secs(1)).await;
114+
buffer.log(get_remote_log_elements())?;
115+
let res = rx.recv().await.unwrap();
116+
assert_eq!(res.len(), 6);
117+
Ok(())
118+
}
119+
120+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
121+
async fn test_buffer_flush_with_force_collect() -> Result<()> {
122+
// This simulates guard is dropped and collect is called
123+
let (tx, rx) = bounded(10);
124+
let interval = Duration::from_secs(100).as_micros() as u64;
125+
let buffer = Arc::new(RemoteLogBuffer::new(tx.clone(), interval));
126+
for _i in 0..500 {
127+
buffer.log(get_remote_log_elements())?
128+
}
129+
buffer.collect()?;
130+
let res = rx.recv().await.unwrap();
131+
assert_eq!(res.len(), 500);
132+
Ok(())
26133
}
27134

28135
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
@@ -31,35 +138,12 @@ async fn test_do_flush() -> Result<()> {
31138
let op = Operator::new(builder)?.finish();
32139

33140
let path = "test_logs.parquet";
34-
35-
let result = RemoteLog::do_flush(op.clone(), get_remote_log(), path).await;
141+
let elements = vec![get_remote_log_elements()];
142+
let result = RemoteLog::do_flush(op.clone(), elements, path).await;
36143
assert!(result.is_ok());
37144

38145
let exists = op.exists(path).await?;
39146
assert!(exists);
40147

41148
Ok(())
42149
}
43-
44-
fn get_remote_log() -> Vec<RemoteLogElement> {
45-
vec![
46-
RemoteLogElement {
47-
timestamp: 1741680446186595,
48-
path: "test_path".to_string(),
49-
messages: r#"{"message":"MetaGrpcClient(0.0.0.0:9191)::worker spawned"}"#.to_string(),
50-
cluster_id: "cluster_id".to_string(),
51-
node_id: "node_id".to_string(),
52-
query_id: None,
53-
log_level: "INFO".to_string(),
54-
},
55-
RemoteLogElement {
56-
timestamp: 1741680446186596,
57-
path: "test_path2".to_string(),
58-
messages: r#"{"conf":"RpcClientConf { endpoints: [\"0.0.0.0:9191\"], username: \"root\", password: \"root\", tls_conf: None, timeout: Some(60s), auto_sync_interval: Some(60s), unhealthy_endpoint_evict_time: 120s }","message":"use remote meta"}"#.to_string(),
59-
cluster_id: "cluster_id".to_string(),
60-
node_id: "node_id".to_string(),
61-
query_id: Some("query_id".to_string()),
62-
log_level: "DEBUG".to_string(),
63-
},
64-
]
65-
}

src/query/config/src/config.rs

+4-18
Original file line numberDiff line numberDiff line change
@@ -2081,10 +2081,8 @@ impl TryInto<InnerLogConfig> for LogConfig {
20812081

20822082
let mut persistentlog: InnerPersistentLogConfig = self.persistentlog.try_into()?;
20832083

2084-
if persistentlog.on && persistentlog.level.is_empty() {
2085-
if file.on && !file.level.is_empty() {
2086-
persistentlog.level = file.level.clone();
2087-
}
2084+
if persistentlog.on && persistentlog.level.is_empty() && file.on && !file.level.is_empty() {
2085+
persistentlog.level = file.level.clone();
20882086
}
20892087

20902088
Ok(InnerLogConfig {
@@ -2518,7 +2516,7 @@ pub struct PersistentLogConfig {
25182516
#[clap(
25192517
long = "log-persistentlog-interval",
25202518
value_name = "VALUE",
2521-
default_value = "8"
2519+
default_value = "2"
25222520
)]
25232521
#[serde(rename = "interval")]
25242522
pub log_persistentlog_interval: usize,
@@ -2536,7 +2534,7 @@ pub struct PersistentLogConfig {
25362534
#[clap(
25372535
long = "log-persistentlog-retention",
25382536
value_name = "VALUE",
2539-
default_value = "24"
2537+
default_value = "72"
25402538
)]
25412539
#[serde(rename = "retention")]
25422540
pub log_persistentlog_retention: usize,
@@ -2549,16 +2547,6 @@ pub struct PersistentLogConfig {
25492547
)]
25502548
#[serde(rename = "level")]
25512549
pub log_persistentlog_level: String,
2552-
2553-
/// Specifies how often the persistent log retention operation should be triggered, in terms of the number of `interval`
2554-
/// e.g. for default value of `interval` 8 and `retention_frequency` 40, the retention operation will be triggered every 320 seconds
2555-
#[clap(
2556-
long = "log-persistentlog-retention-frequency",
2557-
value_name = "VALUE",
2558-
default_value = "40"
2559-
)]
2560-
#[serde(rename = "retention_frequency")]
2561-
pub log_persistentlog_retention_frequency: usize,
25622550
}
25632551

25642552
impl Default for PersistentLogConfig {
@@ -2577,7 +2565,6 @@ impl TryInto<InnerPersistentLogConfig> for PersistentLogConfig {
25772565
stage_name: self.log_persistentlog_stage_name,
25782566
level: self.log_persistentlog_level,
25792567
retention: self.log_persistentlog_retention,
2580-
retention_frequency: self.log_persistentlog_retention_frequency,
25812568
})
25822569
}
25832570
}
@@ -2590,7 +2577,6 @@ impl From<InnerPersistentLogConfig> for PersistentLogConfig {
25902577
log_persistentlog_stage_name: inner.stage_name,
25912578
log_persistentlog_level: inner.level,
25922579
log_persistentlog_retention: inner.retention,
2593-
log_persistentlog_retention_frequency: inner.retention_frequency,
25942580
}
25952581
}
25962582
}

0 commit comments

Comments
 (0)