Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for storing logs in table #17598

Merged
merged 26 commits into from
Mar 29, 2025
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions src/binaries/query/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use databend_common_tracing::set_panic_hook;
use databend_enterprise_background_service::get_background_service_handler;
use databend_query::clusters::ClusterDiscovery;
use databend_query::local;
use databend_query::persistent_log::GlobalPersistentLog;
use databend_query::servers::admin::AdminService;
use databend_query::servers::flight::FlightService;
use databend_query::servers::metrics::MetricService;
Expand Down Expand Up @@ -282,6 +283,10 @@ pub async fn start_services(conf: &InnerConfig) -> Result<(), MainError> {
if conf.log.structlog.on {
println!(" structlog: {}", conf.log.structlog);
}
if conf.log.persistentlog.on {
GlobalPersistentLog::instance().initialized();
println!(" persistentlog: {}", conf.log.persistentlog);
}

println!();
println!(
Expand Down
5 changes: 5 additions & 0 deletions src/common/tracing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ doctest = false
test = true

[dependencies]
anyhow = { workspace = true }
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
backtrace = { workspace = true, features = ["std", "serialize-serde"] }
chrono = { workspace = true }
databend-common-base = { workspace = true }
Expand All @@ -22,9 +25,11 @@ itertools = { workspace = true }
libc = { workspace = true }
log = { workspace = true }
logforth = { workspace = true }
opendal = { workspace = true }
opentelemetry = { workspace = true }
opentelemetry-otlp = { workspace = true, features = ["reqwest-client"] }
opentelemetry_sdk = { workspace = true }
parquet = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tonic = { workspace = true }
Expand Down
32 changes: 32 additions & 0 deletions src/common/tracing/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub struct Config {
pub profile: ProfileLogConfig,
pub structlog: StructLogConfig,
pub tracing: TracingConfig,
pub persistentlog: PersistentLogConfig,
}

impl Config {
Expand Down Expand Up @@ -340,3 +341,34 @@ impl Default for OTLPEndpointConfig {
}
}
}

/// Settings for the `persistentlog` section of the logging `Config`.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize)]
pub struct PersistentLogConfig {
/// Whether the persistent log is enabled (rendered as `enabled=` by `Display`).
pub on: bool,
/// Interval setting for the persistent log (default `8`).
/// NOTE(review): the unit is not visible here — confirm against the consumer.
pub interval: usize,
/// Name of the stage used by the persistent log.
/// NOTE(review): presumably where persisted log files are kept — verify.
pub stage_name: String,
/// Retention setting for the persistent log (default `24`).
/// NOTE(review): the unit is not visible here — confirm against the consumer.
pub retention: usize,
/// Log level filter string for the persistent log (default `"WARN"`).
pub level: String,
}

impl Display for PersistentLogConfig {
    /// Renders the config as a single line of comma-separated `key=value`
    /// pairs; the `on` flag is shown under the key `enabled`.
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        // Exhaustive destructuring: adding a field to the struct forces this
        // impl to be revisited.
        let Self {
            on,
            interval,
            stage_name,
            retention,
            level,
        } = self;

        f.write_fmt(format_args!(
            "enabled={}, interval={}, stage_name={}, retention={}, level={}",
            on, interval, stage_name, retention, level
        ))
    }
}

impl Default for PersistentLogConfig {
    /// Builds the default configuration: disabled, `interval = 8`,
    /// `retention = 24`, level `"WARN"`, and a fixed default stage name.
    fn default() -> Self {
        let stage_name = String::from("log_1f93b76af0bd4b1d8e018667865fbc65");
        let level = String::from("WARN");

        Self {
            on: false,
            interval: 8,
            stage_name,
            retention: 24,
            level,
        }
    }
}
46 changes: 44 additions & 2 deletions src/common/tracing/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@

use std::borrow::Cow;
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;

use databend_common_base::base::tokio;
use databend_common_base::base::tokio::sync::RwLock;
use databend_common_base::base::GlobalInstance;
use databend_common_base::runtime::Thread;
use fastrace::prelude::*;
Expand All @@ -25,24 +27,54 @@ use logforth::filter::env::EnvFilterBuilder;
use logforth::filter::EnvFilter;
use logforth::Dispatch;
use logforth::Logger;
use opendal::Operator;
use opentelemetry_otlp::WithExportConfig;

use crate::config::OTLPProtocol;
use crate::loggers::get_layout;
use crate::loggers::new_rolling_file_appender;
use crate::remote_log::RemoteLog;
use crate::structlog::StructLogReporter;
use crate::Config;

const HEADER_TRACE_PARENT: &str = "traceparent";

/// Logger state created by `GlobalLogger::init` and stored through `GlobalInstance`.
pub struct GlobalLogger {
/// Guards returned by `init_logging`; kept here so they are only dropped
/// together with this instance.
_drop_guards: Vec<Box<dyn Send + Sync + 'static>>,
/// Operator used for the remote log. Starts as `None` and is filled in
/// later through `set_operator`.
pub remote_log_operator: RwLock<Option<Operator>>,
}

impl GlobalLogger {
    /// Initializes logging and registers the `GlobalLogger` instance.
    ///
    /// Calls `init_logging` with `name`, `cfg` and `labels`, keeps the
    /// returned guards inside the new instance, and stores that instance
    /// (wrapped in an `Arc`) through `GlobalInstance::set`. The instance is
    /// registered exactly once.
    pub fn init(name: &str, cfg: &Config, labels: BTreeMap<String, String>) {
        let _drop_guards = init_logging(name, cfg, labels);

        // GlobalLogger is initialized before DataOperator, so set the operator to None first
        let remote_log_operator = RwLock::new(None);

        let instance = Arc::new(Self {
            _drop_guards,
            remote_log_operator,
        });
        GlobalInstance::set(instance);
    }

    /// Fetches the `Arc<GlobalLogger>` from `GlobalInstance`.
    ///
    /// NOTE(review): presumably requires `GlobalLogger::init` to have run
    /// first — confirm against `GlobalInstance::get`.
    pub fn instance() -> Arc<GlobalLogger> {
        GlobalInstance::get()
    }

    /// Get the operator for remote log when it is ready.
    ///
    /// Returns a clone of the stored operator, or `None` while no operator
    /// has been stored via `set_operator` yet.
    pub(crate) async fn get_operator(&self) -> Option<Operator> {
        self.remote_log_operator.read().await.clone()
    }

    /// Set the operator for remote log, this should be only called once.
    ///
    /// The "once" contract is not enforced here: a later call simply
    /// replaces the previously stored operator.
    pub async fn set_operator(&self, operator: Operator) {
        *self.remote_log_operator.write().await = Some(operator);
    }
}

Expand Down Expand Up @@ -100,7 +132,6 @@ pub fn init_logging(
}
),
};

// initialize a tracing reporter
if cfg.tracing.on {
let endpoint = cfg.tracing.otlp.endpoint.clone();
Expand Down Expand Up @@ -345,6 +376,17 @@ pub fn init_logging(
logger = logger.dispatch(dispatch);
}

if cfg.persistentlog.on && log_name.starts_with("databend-query") {
let (remote_log, flush_guard) =
RemoteLog::new(&labels, cfg).expect("initialize remote logger");
let dispatch = Dispatch::new()
.filter(env_filter(&cfg.persistentlog.level))
.append(remote_log);

logger = logger.dispatch(dispatch);
_drop_guards.push(flush_guard);
}

// set global logger
if logger.apply().is_err() {
eprintln!("logger has already been set");
Expand Down
5 changes: 5 additions & 0 deletions src/common/tracing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod crash_hook;
mod init;
mod loggers;
mod panic_hook;
mod remote_log;
mod structlog;

pub use crash_hook::pipe_file;
Expand All @@ -32,6 +33,7 @@ pub use crate::config::FileConfig;
pub use crate::config::OTLPConfig;
pub use crate::config::OTLPEndpointConfig;
pub use crate::config::OTLPProtocol;
pub use crate::config::PersistentLogConfig;
pub use crate::config::ProfileLogConfig;
pub use crate::config::QueryLogConfig;
pub use crate::config::StderrConfig;
Expand All @@ -45,6 +47,9 @@ pub use crate::init::start_trace_for_remote_request;
pub use crate::init::GlobalLogger;
pub use crate::panic_hook::log_panic;
pub use crate::panic_hook::set_panic_hook;
pub use crate::remote_log::convert_to_batch;
pub use crate::remote_log::RemoteLog;
pub use crate::remote_log::RemoteLogElement;
pub use crate::structlog::DummyReporter;
pub use crate::structlog::StructLogReporter;

Expand Down
Loading
Loading