Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add db metrics #445

Merged
merged 9 commits into from
Mar 24, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 127 additions & 29 deletions auction-server/src/auction/repository/models.rs
Original file line number Diff line number Diff line change
@@ -59,10 +59,7 @@ use {
PrimitiveDateTime,
UtcOffset,
},
tracing::{
info_span,
Instrument,
},
tracing::instrument,
};

#[derive(Clone, Debug, PartialEq, PartialOrd, sqlx::Type)]
@@ -648,6 +645,17 @@ pub trait Database<T: ChainTrait>: Debug + Send + Sync + 'static {

#[async_trait]
impl<T: ChainTrait> Database<T> for DB {
#[instrument(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RN in the traces, the name is add_auction. I think it's a good idea to set the name as db_add_auction in the traces. In the metrics I think it's good to keep the name as it is RN, because all of these queries are in the same category (db_queries).

If you set the name in the outer scope (same level as the target), it'll be used for tracing. the name in the fields variable is being used only by metrics as the traces is not using this field name for the trace name :D

So you can update the code like below and this issue should be fixed:

#[instrument(
        target = "metrics",
        name = "db_add_auction",
        fields(
            category = "db_queries",
            result = "success",
            name = "add_auction",
            tracing_enabled
        ),
        skip_all
    )]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup sgtm

target = "metrics",
name = "db_add_auction",
fields(
category = "db_queries",
result = "success",
name = "add_auction",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we should name this something else, to distinguish it from the instrument name. This name field influences the name of the metrics, but the instrument/span name is what determines the naming of the traces.

tracing_enabled
),
skip_all
)]
async fn add_auction(&self, auction: &entities::Auction<T>) -> anyhow::Result<()> {
sqlx::query!(
"INSERT INTO auction (id, creation_time, permission_key, chain_id, chain_type, bid_collection_time, tx_hash) VALUES ($1, $2, $3, $4, $5, $6, $7)",
@@ -660,11 +668,24 @@ impl<T: ChainTrait> Database<T> for DB {
auction.tx_hash.clone().map(|tx_hash| T::BidStatusType::convert_tx_hash(&tx_hash)),
)
.execute(self)
.instrument(info_span!("db_add_auction"))
.await?;
.await
.inspect_err(|_| {
tracing::Span::current().record("result", "error");
})?;
Ok(())
}

#[instrument(
target = "metrics",
name = "db_add_bid",
fields(
category = "db_queries",
result = "success",
name = "add_bid",
tracing_enabled
),
skip_all
)]
async fn add_bid(&self, bid: &Bid<T>) -> Result<(), RestError> {
sqlx::query!("INSERT INTO bid (id, creation_time, permission_key, chain_id, chain_type, bid_amount, status, initiation_time, profile_id, metadata) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
bid.id,
@@ -678,14 +699,26 @@ impl<T: ChainTrait> Database<T> for DB {
bid.profile_id,
serde_json::to_value(bid.metadata.clone()).expect("Failed to serialize metadata"),
).execute(self)
.instrument(info_span!("db_add_bid"))
.await.map_err(|e| {
.await
.map_err(|e| {
tracing::Span::current().record("result", "error");
tracing::error!(error = e.to_string(), bid = ?bid, "DB: Failed to insert bid");
RestError::TemporarilyUnavailable
})?;
Ok(())
}

#[instrument(
target = "metrics",
name = "db_conclude_auction",
fields(
category = "db_queries",
result = "success",
name = "conclude_auction",
tracing_enabled
),
skip_all
)]
async fn conclude_auction(&self, auction_id: entities::AuctionId) -> anyhow::Result<()> {
let now = OffsetDateTime::now_utc();
sqlx::query!(
@@ -694,25 +727,38 @@ impl<T: ChainTrait> Database<T> for DB {
auction_id,
)
.execute(self)
.instrument(info_span!("db_conclude_auction"))
.await?;
.await
.inspect_err(|_| {
tracing::Span::current().record("result", "error");
})?;
Ok(())
}

#[instrument(
target = "metrics",
name = "db_get_bid",
fields(
category = "db_queries",
result = "success",
name = "get_bid",
tracing_enabled
),
skip_all
)]
async fn get_bid(
&self,
bid_id: entities::BidId,
chain_id: ChainId,
) -> Result<Bid<T>, RestError> {
sqlx::query_as("SELECT * FROM bid WHERE id = $1 AND chain_id = $2")
.bind(bid_id)
.bind(chain_id)
.bind(&chain_id)
.fetch_one(self)
.instrument(info_span!("db_get_bid"))
.await
.map_err(|e| match e {
sqlx::Error::RowNotFound => RestError::BidNotFound,
_ => {
tracing::Span::current().record("result", "error");
tracing::error!(
error = e.to_string(),
bid_id = bid_id.to_string(),
@@ -723,13 +769,24 @@ impl<T: ChainTrait> Database<T> for DB {
})
}

#[instrument(
target = "metrics",
name = "db_get_auction",
fields(
category = "db_queries",
result = "success",
name = "get_auction",
tracing_enabled
),
skip_all
)]
async fn get_auction(&self, auction_id: entities::AuctionId) -> Result<Auction, RestError> {
sqlx::query_as("SELECT * FROM auction WHERE id = $1")
.bind(auction_id)
.fetch_one(self)
.instrument(info_span!("db_get_auction"))
.await
.map_err(|e| {
tracing::Span::current().record("result", "error");
tracing::error!(
error = e.to_string(),
auction_id = auction_id.to_string(),
@@ -739,20 +796,42 @@ impl<T: ChainTrait> Database<T> for DB {
})
}

#[instrument(
target = "metrics",
name = "db_get_auctions_by_bids",
fields(
category = "db_queries",
result = "success",
name = "get_auctions_by_bids",
tracing_enabled
),
skip_all
)]
async fn get_auctions_by_bids(&self, bids: &[Bid<T>]) -> Result<Vec<Auction>, RestError> {
let auction_ids: Vec<entities::AuctionId> =
bids.iter().filter_map(|bid| bid.auction_id).collect();
sqlx::query_as("SELECT * FROM auction WHERE id = ANY($1)")
.bind(auction_ids)
.fetch_all(self)
.instrument(info_span!("db_get_auctions_by_bids"))
.await
.map_err(|e| {
tracing::Span::current().record("result", "error");
tracing::error!("DB: Failed to fetch auctions: {}", e);
RestError::TemporarilyUnavailable
})
}

#[instrument(
target = "metrics",
name = "db_get_bids",
fields(
category = "db_queries",
result = "success",
name = "get_bids",
tracing_enabled
),
skip_all
)]
async fn get_bids(
&self,
chain_id: ChainId,
@@ -769,17 +848,24 @@ impl<T: ChainTrait> Database<T> for DB {
query.push_bind(from_time);
}
query.push(" ORDER BY initiation_time ASC LIMIT 20");
query
.build_query_as()
.fetch_all(self)
.instrument(info_span!("db_get_bids"))
.await
.map_err(|e| {
tracing::error!("DB: Failed to fetch bids: {}", e);
RestError::TemporarilyUnavailable
})
query.build_query_as().fetch_all(self).await.map_err(|e| {
tracing::Span::current().record("result", "error");
tracing::error!("DB: Failed to fetch bids: {}", e);
RestError::TemporarilyUnavailable
})
}

#[instrument(
target = "metrics",
name = "db_submit_auction",
fields(
category = "db_queries",
result = "success",
name = "submit_auction",
tracing_enabled
),
skip_all
)]
async fn submit_auction(
&self,
auction: &entities::Auction<T>,
@@ -793,20 +879,32 @@ impl<T: ChainTrait> Database<T> for DB {
PrimitiveDateTime::new(now.date(), now.time()),
T::BidStatusType::convert_tx_hash(transaction_hash),
auction.id,
).execute(self).instrument(info_span!("db_update_auction")).await?;
).execute(self).await.inspect_err(|_| {
tracing::Span::current().record("result", "error");
})?;
Ok(auction)
}

#[instrument(
target = "metrics",
name = "db_update_bid_status",
fields(
category = "db_queries",
result = "success",
name = "update_bid_status",
tracing_enabled
),
skip_all
)]
async fn update_bid_status(
&self,
bid: &entities::Bid<T>,
new_status: &T::BidStatusType,
) -> anyhow::Result<bool> {
let update_query = T::get_update_bid_query(bid, new_status.clone())?;
let query_result = update_query
.execute(self)
.instrument(info_span!("db_update_bid_status"))
.await?;
Ok(query_result.rows_affected() > 0)
let result = update_query.execute(self).await.inspect_err(|_| {
tracing::Span::current().record("result", "error");
})?;
Ok(result.rows_affected() > 0)
}
}
8 changes: 4 additions & 4 deletions auction-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -74,9 +74,9 @@ async fn main() -> Result<()> {
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);

let registry = tracing_subscriber::registry()
.with(MetricsLayer.with_filter(filter::filter_fn(is_metrics)))
.with(MetricsLayer.with_filter(filter::filter_fn(|metadata| is_metrics(metadata, false))))
.with(telemetry.with_filter(filter::filter_fn(|metadata| {
!is_metrics(metadata) && is_internal(metadata)
is_internal(metadata) || is_metrics(metadata, true)
})));

if std::io::stderr().is_terminal() {
@@ -86,7 +86,7 @@ async fn main() -> Result<()> {
.compact()
.with_filter(LevelFilter::INFO)
.with_filter(filter::filter_fn(|metadata| {
!is_metrics(metadata) && is_internal(metadata)
is_internal(metadata) || is_metrics(metadata, true)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

later we can add more flags to distinguish btwn logging and telemetry

})),
)
.init();
@@ -97,7 +97,7 @@ async fn main() -> Result<()> {
.json()
.with_filter(LevelFilter::INFO)
.with_filter(filter::filter_fn(|metadata| {
!is_metrics(metadata) && is_internal(metadata)
is_internal(metadata) || is_metrics(metadata, true)
})),
)
.init();
70 changes: 57 additions & 13 deletions auction-server/src/opportunity/repository/models.rs
Original file line number Diff line number Diff line change
@@ -55,10 +55,7 @@ use {
OffsetDateTime,
PrimitiveDateTime,
},
tracing::{
info_span,
Instrument,
},
tracing::instrument,
uuid::Uuid,
};

@@ -178,6 +175,17 @@ pub trait Database<T: InMemoryStore>: Debug + Send + Sync + 'static {
}
#[async_trait]
impl<T: InMemoryStore> Database<T> for DB {
#[instrument(
target = "metrics",
name = "db_add_opportunity",
fields(
category = "db_queries",
result = "success",
name = "add_opportunity",
tracing_enabled
),
skip_all
)]
async fn add_opportunity(&self, opportunity: &T::Opportunity) -> Result<(), RestError> {
let metadata = opportunity.get_models_metadata();
let chain_type = <T::Opportunity as entities::Opportunity>::ModelMetadata::get_chain_type();
@@ -198,15 +206,25 @@ impl<T: InMemoryStore> Database<T> for DB {
serde_json::to_value(&opportunity.sell_tokens).expect("Failed to serialize sell_tokens"),
serde_json::to_value(&opportunity.buy_tokens).expect("Failed to serialize buy_tokens"))
.execute(self)
.instrument(info_span!("db_add_opportunity"))
.await
.map_err(|e| {
.await.map_err(|e| {
tracing::Span::current().record("result", "error");
tracing::error!("DB: Failed to insert opportunity: {}", e);
RestError::TemporarilyUnavailable
})?;
Ok(())
}

#[instrument(
target = "metrics",
name = "db_get_opportunities",
fields(
category = "db_queries",
result = "success",
name = "get_opportunities",
tracing_enabled
),
skip_all
)]
async fn get_opportunities(
&self,
chain_id: ChainId,
@@ -232,9 +250,9 @@ impl<T: InMemoryStore> Database<T> for DB {
let opps: Vec<models::Opportunity<<T::Opportunity as entities::Opportunity>::ModelMetadata>> = query
.build_query_as()
.fetch_all(self)
.instrument(info_span!("db_get_opportunities"))
.await
.map_err(|e| {
tracing::Span::current().record("result", "error");
tracing::error!(
"DB: Failed to fetch opportunities: {} - chain_id: {:?} - permission_key: {:?} - from_time: {:?}",
e,
@@ -259,6 +277,17 @@ impl<T: InMemoryStore> Database<T> for DB {
)).collect()
}

#[instrument(
target = "metrics",
name = "db_remove_opportunities",
fields(
category = "db_queries",
result = "success",
name = "remove_opportunities",
tracing_enabled
),
skip_all
)]
async fn remove_opportunities(
&self,
permission_key: PermissionKey,
@@ -270,13 +299,26 @@ impl<T: InMemoryStore> Database<T> for DB {
.bind(PrimitiveDateTime::new(now.date(), now.time()))
.bind(reason)
.bind(permission_key.as_ref())
.bind(chain_id)
.bind(&chain_id)
.execute(self)
.instrument(info_span!("db_remove_opportunities"))
.await?;
.await
.inspect_err(|_| {
tracing::Span::current().record("result", "error");
})?;
Ok(())
}

#[instrument(
target = "metrics",
name = "db_remove_opportunity",
fields(
category = "db_queries",
result = "success",
name = "remove_opportunity",
tracing_enabled
),
skip_all
)]
async fn remove_opportunity(
&self,
opportunity: &T::Opportunity,
@@ -288,8 +330,10 @@ impl<T: InMemoryStore> Database<T> for DB {
.bind(reason)
.bind(opportunity.id)
.execute(self)
.instrument(info_span!("db_remove_opportunity"))
.await?;
.await
.inspect_err(|_| {
tracing::Span::current().record("result", "error");
})?;
Ok(())
}
}
9 changes: 7 additions & 2 deletions auction-server/src/per_metrics.rs
Original file line number Diff line number Diff line change
@@ -91,8 +91,13 @@ impl MetricsLayerData {
}
}

pub fn is_metrics(metadata: &Metadata) -> bool {
metadata.target().starts_with("metrics")
pub fn is_metrics(metadata: &Metadata, check_tracing_enabled: bool) -> bool {
let tracing_check = !check_tracing_enabled
|| metadata
.fields()
.iter()
.any(|f| f.name() == "tracing_enabled");
tracing_check && (metadata.target().starts_with("metrics"))
}

impl<S> Layer<S> for MetricsLayer