From 3d31131e20e09bd20fa6c5eebd8c32fe65f251df Mon Sep 17 00:00:00 2001 From: --systemdf Date: Fri, 21 Mar 2025 16:29:00 +0900 Subject: [PATCH 1/9] add db metrics --- .../auction/repository/conclude_auction.rs | 2 +- .../src/auction/repository/get_bid.rs | 2 +- .../src/auction/repository/get_bids.rs | 2 +- .../src/auction/repository/models.rs | 169 +++++++++++++++--- .../src/opportunity/repository/models.rs | 50 +++++- 5 files changed, 196 insertions(+), 29 deletions(-) diff --git a/auction-server/src/auction/repository/conclude_auction.rs b/auction-server/src/auction/repository/conclude_auction.rs index 84828114..2fe38e5d 100644 --- a/auction-server/src/auction/repository/conclude_auction.rs +++ b/auction-server/src/auction/repository/conclude_auction.rs @@ -12,7 +12,7 @@ impl Repository { #[tracing::instrument(skip_all, name = "conclude_auction_repo", fields(auction_id))] pub async fn conclude_auction(&self, auction_id: entities::AuctionId) -> anyhow::Result<()> { tracing::Span::current().record("auction_id", auction_id.to_string()); - self.db.conclude_auction(auction_id).await?; + self.db.conclude_auction(auction_id, &self.chain_id).await?; self.remove_in_memory_auction(auction_id).await; Ok(()) } diff --git a/auction-server/src/auction/repository/get_bid.rs b/auction-server/src/auction/repository/get_bid.rs index 2c54dbb6..d49f2cfe 100644 --- a/auction-server/src/auction/repository/get_bid.rs +++ b/auction-server/src/auction/repository/get_bid.rs @@ -13,7 +13,7 @@ impl Repository { pub async fn get_bid(&self, bid_id: entities::BidId) -> Result, RestError> { let bid = self.db.get_bid(bid_id, self.chain_id.clone()).await?; let auction = match bid.auction_id { - Some(auction_id) => Some(self.db.get_auction(auction_id).await?), + Some(auction_id) => Some(self.db.get_auction(auction_id, &self.chain_id).await?), None => None, }; diff --git a/auction-server/src/auction/repository/get_bids.rs b/auction-server/src/auction/repository/get_bids.rs index 8176b2b3..75ed1ad3 100644 --- a/auction-server/src/auction/repository/get_bids.rs +++ b/auction-server/src/auction/repository/get_bids.rs @@ -21,7 +21,7 @@ impl Repository { .db .get_bids(self.chain_id.clone(), profile_id, from_time) .await?; - let auctions = self.db.get_auctions_by_bids(&bids).await?; + let auctions = self.db.get_auctions_by_bids(&bids, &self.chain_id).await?; Ok(bids .into_iter() diff --git a/auction-server/src/auction/repository/models.rs b/auction-server/src/auction/repository/models.rs index 3bf3a1be..b0b5721b 100644 --- a/auction-server/src/auction/repository/models.rs +++ b/auction-server/src/auction/repository/models.rs @@ -21,6 +21,7 @@ use { models::ProfileId, }, axum::async_trait, + axum_prometheus::metrics, ethers::types::{ Address, Bytes, @@ -53,6 +54,7 @@ use { num::ParseIntError, ops::Deref, str::FromStr, + time::Instant, }, time::{ OffsetDateTime, @@ -99,6 +101,21 @@ pub enum BidStatus { Cancelled, } +impl std::fmt::Display for BidStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + BidStatus::Pending => write!(f, "pending"), + BidStatus::AwaitingSignature => write!(f, "awaiting_signature"), + BidStatus::Submitted => write!(f, "submitted"), + BidStatus::Lost => write!(f, "lost"), + BidStatus::Won => write!(f, "won"), + BidStatus::Failed => write!(f, "failed"), + BidStatus::Expired => write!(f, "expired"), + BidStatus::Cancelled => write!(f, "cancelled"), + } + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct BundleIndex(pub Option); impl Deref for BundleIndex { @@ -620,14 +637,26 @@ impl> Bid { pub trait Database: Debug + Send + Sync + 'static { async fn add_auction(&self, auction: &entities::Auction) -> anyhow::Result<()>; async fn add_bid(&self, bid: &Bid) -> Result<(), RestError>; - async fn conclude_auction(&self, auction_id: entities::AuctionId) -> anyhow::Result<()>; + async fn conclude_auction( + &self, + auction_id: entities::AuctionId, + chain_id: &ChainId, + ) -> anyhow::Result<()>; async fn get_bid( &self, bid_id: entities::BidId, chain_id: ChainId, ) -> Result, RestError>; - async fn get_auction(&self, auction_id: entities::AuctionId) -> Result; - async fn get_auctions_by_bids(&self, bids: &[Bid]) -> Result, RestError>; + async fn get_auction( + &self, + auction_id: entities::AuctionId, + chain_id: &ChainId, + ) -> Result; + async fn get_auctions_by_bids( + &self, + bids: &[Bid], + chain_id: &ChainId, + ) -> Result, RestError>; async fn get_bids( &self, chain_id: ChainId, @@ -649,7 +678,8 @@ pub trait Database: Debug + Send + Sync + 'static { #[async_trait] impl Database for DB { async fn add_auction(&self, auction: &entities::Auction) -> anyhow::Result<()> { - sqlx::query!( + let start = Instant::now(); + let query_result = sqlx::query!( "INSERT INTO auction (id, creation_time, permission_key, chain_id, chain_type, bid_collection_time, tx_hash) VALUES ($1, $2, $3, $4, $5, $6, $7)", auction.id, PrimitiveDateTime::new(auction.creation_time.date(), auction.creation_time.time()), @@ -662,11 +692,23 @@ impl Database for DB { .execute(self) .instrument(info_span!("db_add_auction")) .await?; + let latency = start.elapsed().as_secs_f64(); + let labels = [ + ("chain_id", auction.chain_id.to_string()), + ("db_query", "add_auction".to_string()), + ( + "made_change", + (query_result.rows_affected() > 0).to_string(), + ), + ]; + metrics::counter!("db_queries_total", &labels).increment(1); + metrics::histogram!("db_queries_latency_seconds", &labels).record(latency); Ok(()) } async fn add_bid(&self, bid: &Bid) -> Result<(), RestError> { - sqlx::query!("INSERT INTO bid (id, creation_time, permission_key, chain_id, chain_type, bid_amount, status, initiation_time, profile_id, metadata) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)", + let start = Instant::now(); + let query_result = sqlx::query!("INSERT INTO bid (id, creation_time, permission_key, chain_id, chain_type, bid_amount, status, initiation_time, profile_id, metadata) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)", bid.id, bid.creation_time, bid.permission_key, @@ -683,12 +725,28 @@ impl Database for DB { tracing::error!(error = e.to_string(), bid = ?bid, "DB: Failed to insert bid"); RestError::TemporarilyUnavailable })?; + let latency = start.elapsed().as_secs_f64(); + let labels = [ + ("chain_id", bid.chain_id.to_string()), + ("db_query", "add_bid".to_string()), + ( + "made_change", + (query_result.rows_affected() > 0).to_string(), + ), + ]; + metrics::counter!("db_queries_total", &labels).increment(1); + metrics::histogram!("db_queries_latency_seconds", &labels).record(latency); Ok(()) } - async fn conclude_auction(&self, auction_id: entities::AuctionId) -> anyhow::Result<()> { + async fn conclude_auction( + &self, + auction_id: entities::AuctionId, + chain_id: &ChainId, + ) -> anyhow::Result<()> { + let start = Instant::now(); let now = OffsetDateTime::now_utc(); - sqlx::query!( + let query_result = sqlx::query!( "UPDATE auction SET conclusion_time = $1 WHERE id = $2 AND conclusion_time IS NULL", PrimitiveDateTime::new(now.date(), now.time()), auction_id, @@ -696,6 +754,17 @@ impl Database for DB { .execute(self) .instrument(info_span!("db_conclude_auction")) .await?; + let latency = start.elapsed().as_secs_f64(); + let labels = [ + ("chain_id", chain_id.to_string()), + ("db_query", "conclude_auction".to_string()), + ( + "made_change", + (query_result.rows_affected() > 0).to_string(), + ), + ]; + metrics::counter!("db_queries_total", &labels).increment(1); + metrics::histogram!("db_queries_latency_seconds", &labels).record(latency); Ok(()) } @@ -704,9 +773,10 @@ impl Database for DB { bid_id: entities::BidId, chain_id: ChainId, ) -> Result, RestError> { - sqlx::query_as("SELECT * FROM bid WHERE id = $1 AND chain_id = $2") + let start = Instant::now(); + let result = sqlx::query_as("SELECT * FROM bid WHERE id = $1 AND chain_id = $2") .bind(bid_id) - .bind(chain_id) + .bind(&chain_id) .fetch_one(self) .instrument(info_span!("db_get_bid")) .await @@ -720,11 +790,24 @@ impl Database for DB { ); RestError::TemporarilyUnavailable } - }) - } - - async fn get_auction(&self, auction_id: entities::AuctionId) -> Result { - sqlx::query_as("SELECT * FROM auction WHERE id = $1") + }); + let latency = start.elapsed().as_secs_f64(); + let labels = [ + ("chain_id", chain_id.to_string()), + ("db_query", "get_bid".to_string()), + ]; + metrics::counter!("db_queries_total", &labels).increment(1); + metrics::histogram!("db_queries_latency_seconds", &labels).record(latency); + result + } + + async fn get_auction( + &self, + auction_id: entities::AuctionId, + chain_id: &ChainId, + ) -> Result { + let start = Instant::now(); + let result = sqlx::query_as("SELECT * FROM auction WHERE id = $1") .bind(auction_id) .fetch_one(self) .instrument(info_span!("db_get_auction")) @@ -736,13 +819,26 @@ impl Database for DB { "Failed to get auction from db" ); RestError::TemporarilyUnavailable - }) - } - - async fn get_auctions_by_bids(&self, bids: &[Bid]) -> Result, RestError> { + }); + let latency = start.elapsed().as_secs_f64(); + let labels = [ + ("chain_id", chain_id.to_string()), + ("db_query", "get_auction".to_string()), + ]; + metrics::counter!("db_queries_total", &labels).increment(1); + metrics::histogram!("db_queries_latency_seconds", &labels).record(latency); + result + } + + async fn get_auctions_by_bids( + &self, + bids: &[Bid], + chain_id: &ChainId, + ) -> Result, RestError> { let auction_ids: Vec = bids.iter().filter_map(|bid| bid.auction_id).collect(); - sqlx::query_as("SELECT * FROM auction WHERE id = ANY($1)") + let start = Instant::now(); + let result = sqlx::query_as("SELECT * FROM auction WHERE id = ANY($1)") .bind(auction_ids) .fetch_all(self) .instrument(info_span!("db_get_auctions_by_bids")) @@ -750,7 +846,15 @@ impl Database for DB { .map_err(|e| { tracing::error!("DB: Failed to fetch auctions: {}", e); RestError::TemporarilyUnavailable - }) + }); + let latency = start.elapsed().as_secs_f64(); + let labels = [ + ("chain_id", chain_id.to_string()), + ("db_query", "get_auctions_by_bids".to_string()), + ]; + metrics::counter!("db_queries_total", &labels).increment(1); + metrics::histogram!("db_queries_latency_seconds", &labels).record(latency); + result } async fn get_bids( @@ -789,11 +893,23 @@ impl Database for DB { let now = OffsetDateTime::now_utc(); auction.tx_hash = Some(transaction_hash.clone()); auction.submission_time = Some(now); - sqlx::query!("UPDATE auction SET submission_time = $1, tx_hash = $2 WHERE id = $3 AND submission_time IS NULL", + let start = Instant::now(); + let query_result = sqlx::query!("UPDATE auction SET submission_time = $1, tx_hash = $2 WHERE id = $3 AND submission_time IS NULL", PrimitiveDateTime::new(now.date(), now.time()), T::BidStatusType::convert_tx_hash(transaction_hash), auction.id, ).execute(self).instrument(info_span!("db_update_auction")).await?; + let latency = start.elapsed().as_secs_f64(); + let labels = [ + ("chain_id", auction.chain_id.to_string()), + ("db_query", "submit_auction".to_string()), + ( + "made_change", + (query_result.rows_affected() > 0).to_string(), + ), + ]; + metrics::counter!("db_queries_total", &labels).increment(1); + metrics::histogram!("db_queries_latency_seconds", &labels).record(latency); Ok(auction) } @@ -803,10 +919,21 @@ impl Database for DB { new_status: &T::BidStatusType, ) -> anyhow::Result { let update_query = T::get_update_bid_query(bid, new_status.clone())?; + let start = Instant::now(); let query_result = update_query .execute(self) .instrument(info_span!("db_update_bid_status")) .await?; + let latency = start.elapsed().as_secs_f64(); + let made_change = query_result.rows_affected() > 0; + let labels: [(&str, String); 4] = [ + ("chain_id", bid.chain_id.to_string()), + ("db_query", "update_bid_status".to_string()), + ("status", T::convert_bid_status(new_status).to_string()), + ("made_change", made_change.to_string()), + ]; + metrics::counter!("db_queries_total", &labels).increment(1); + metrics::histogram!("db_queries_latency_seconds", &labels).record(latency); Ok(query_result.rows_affected() > 0) } } diff --git a/auction-server/src/opportunity/repository/models.rs b/auction-server/src/opportunity/repository/models.rs index 9e0968f7..e5172763 100644 --- a/auction-server/src/opportunity/repository/models.rs +++ b/auction-server/src/opportunity/repository/models.rs @@ -23,6 +23,7 @@ use { }, }, axum::async_trait, + axum_prometheus::metrics, ethers::types::{ Address, Bytes, @@ -50,7 +51,10 @@ use { }, QueryBuilder, }, - std::fmt::Debug, + std::{ + fmt::Debug, + time::Instant, + }, time::{ OffsetDateTime, PrimitiveDateTime, @@ -181,7 +185,8 @@ impl Database for DB { async fn add_opportunity(&self, opportunity: &T::Opportunity) -> Result<(), RestError> { let metadata = opportunity.get_models_metadata(); let chain_type = ::ModelMetadata::get_chain_type(); - sqlx::query!("INSERT INTO opportunity (id, + let start = Instant::now(); + let query_result = sqlx::query!("INSERT INTO opportunity (id, creation_time, permission_key, chain_id, @@ -204,6 +209,17 @@ impl Database for DB { tracing::error!("DB: Failed to insert opportunity: {}", e); RestError::TemporarilyUnavailable })?; + let latency = start.elapsed().as_secs_f64(); + let labels = [ + ("chain_id", opportunity.chain_id.to_string()), + ("db_query", "add_opportunity".to_string()), + ( + "made_change", + (query_result.rows_affected() > 0).to_string(), + ), + ]; + metrics::counter!("db_queries_total", &labels).increment(1); + metrics::histogram!("db_queries_duration_seconds", &labels).record(latency); Ok(()) } @@ -265,15 +281,27 @@ impl Database for DB { chain_id: ChainId, reason: OpportunityRemovalReason, ) -> anyhow::Result<()> { + let start = Instant::now(); let now = OffsetDateTime::now_utc(); - sqlx::query("UPDATE opportunity SET removal_time = $1, removal_reason = $2 WHERE permission_key = $3 AND chain_id = $4 and removal_time IS NULL") + let query_result = sqlx::query("UPDATE opportunity SET removal_time = $1, removal_reason = $2 WHERE permission_key = $3 AND chain_id = $4 and removal_time IS NULL") .bind(PrimitiveDateTime::new(now.date(), now.time())) .bind(reason) .bind(permission_key.as_ref()) - .bind(chain_id) + .bind(&chain_id) .execute(self) .instrument(info_span!("db_remove_opportunities")) .await?; + let latency = start.elapsed().as_secs_f64(); + let labels = [ + ("chain_id", chain_id.to_string()), + ("db_query", "remove_opportunities".to_string()), + ( + "made_change", + (query_result.rows_affected() > 0).to_string(), + ), + ]; + metrics::counter!("db_queries_total", &labels).increment(1); + metrics::histogram!("db_queries_duration_seconds", &labels).record(latency); Ok(()) } @@ -282,14 +310,26 @@ impl Database for DB { opportunity: &T::Opportunity, reason: OpportunityRemovalReason, ) -> anyhow::Result<()> { + let start = Instant::now(); let now = OffsetDateTime::now_utc(); - sqlx::query("UPDATE opportunity SET removal_time = $1, removal_reason = $2 WHERE id = $3 AND removal_time IS NULL") + let query_result = sqlx::query("UPDATE opportunity SET removal_time = $1, removal_reason = $2 WHERE id = $3 AND removal_time IS NULL") .bind(PrimitiveDateTime::new(now.date(), now.time())) .bind(reason) .bind(opportunity.id) .execute(self) .instrument(info_span!("db_remove_opportunity")) .await?; + let latency = start.elapsed().as_secs_f64(); + let labels = [ + ("chain_id", opportunity.chain_id.to_string()), + ("db_query", "remove_opportunity".to_string()), + ( + "made_change", + (query_result.rows_affected() > 0).to_string(), + ), + ]; + metrics::counter!("db_queries_total", &labels).increment(1); + metrics::histogram!("db_queries_duration_seconds", &labels).record(latency); Ok(()) } } From 7ceb3f95db378165256bfd55bf5cfbf807753e67 Mon Sep 17 00:00:00 2001 From: --systemdf Date: Fri, 21 Mar 2025 18:32:44 +0900 Subject: [PATCH 2/9] convert to macro --- .../auction/repository/conclude_auction.rs | 2 +- .../src/auction/repository/get_bid.rs | 2 +- .../src/auction/repository/get_bids.rs | 2 +- .../src/auction/repository/models.rs | 222 ++++++++---------- .../src/opportunity/repository/models.rs | 79 +++---- 5 files changed, 131 insertions(+), 176 deletions(-) diff --git a/auction-server/src/auction/repository/conclude_auction.rs b/auction-server/src/auction/repository/conclude_auction.rs index 2fe38e5d..84828114 100644 --- a/auction-server/src/auction/repository/conclude_auction.rs +++ b/auction-server/src/auction/repository/conclude_auction.rs @@ -12,7 +12,7 @@ impl Repository { #[tracing::instrument(skip_all, name = "conclude_auction_repo", fields(auction_id))] pub async fn conclude_auction(&self, auction_id: entities::AuctionId) -> anyhow::Result<()> { tracing::Span::current().record("auction_id", auction_id.to_string()); - self.db.conclude_auction(auction_id, &self.chain_id).await?; + self.db.conclude_auction(auction_id).await?; self.remove_in_memory_auction(auction_id).await; Ok(()) } diff --git a/auction-server/src/auction/repository/get_bid.rs b/auction-server/src/auction/repository/get_bid.rs index d49f2cfe..2c54dbb6 100644 --- a/auction-server/src/auction/repository/get_bid.rs +++ b/auction-server/src/auction/repository/get_bid.rs @@ -13,7 +13,7 @@ impl Repository { pub async fn get_bid(&self, bid_id: entities::BidId) -> Result, RestError> { let bid = self.db.get_bid(bid_id, self.chain_id.clone()).await?; let auction = match bid.auction_id { - Some(auction_id) => Some(self.db.get_auction(auction_id, &self.chain_id).await?), + Some(auction_id) => Some(self.db.get_auction(auction_id).await?), None => None, }; diff --git a/auction-server/src/auction/repository/get_bids.rs b/auction-server/src/auction/repository/get_bids.rs index 75ed1ad3..8176b2b3 100644 --- a/auction-server/src/auction/repository/get_bids.rs +++ b/auction-server/src/auction/repository/get_bids.rs @@ -21,7 +21,7 @@ impl Repository { .db .get_bids(self.chain_id.clone(), profile_id, from_time) .await?; - let auctions = self.db.get_auctions_by_bids(&bids, &self.chain_id).await?; + let auctions = self.db.get_auctions_by_bids(&bids).await?; Ok(bids .into_iter() diff --git a/auction-server/src/auction/repository/models.rs b/auction-server/src/auction/repository/models.rs index b0b5721b..d911500a 100644 --- a/auction-server/src/auction/repository/models.rs +++ b/auction-server/src/auction/repository/models.rs @@ -21,7 +21,6 @@ use { models::ProfileId, }, axum::async_trait, - axum_prometheus::metrics, ethers::types::{ Address, Bytes, @@ -54,7 +53,6 @@ use { num::ParseIntError, ops::Deref, str::FromStr, - time::Instant, }, time::{ OffsetDateTime, @@ -63,6 +61,7 @@ use { }, tracing::{ info_span, + instrument, Instrument, }, }; @@ -637,26 +636,14 @@ impl> Bid { pub trait Database: Debug + Send + Sync + 'static { async fn add_auction(&self, auction: &entities::Auction) -> anyhow::Result<()>; async fn add_bid(&self, bid: &Bid) -> Result<(), RestError>; - async fn conclude_auction( - &self, - auction_id: entities::AuctionId, - chain_id: &ChainId, - ) -> anyhow::Result<()>; + async fn conclude_auction(&self, auction_id: entities::AuctionId) -> anyhow::Result<()>; async fn get_bid( &self, bid_id: entities::BidId, chain_id: ChainId, ) -> Result, RestError>; - async fn get_auction( - &self, - auction_id: entities::AuctionId, - chain_id: &ChainId, - ) -> Result; - async fn get_auctions_by_bids( - &self, - bids: &[Bid], - chain_id: &ChainId, - ) -> Result, RestError>; + async fn get_auction(&self, auction_id: entities::AuctionId) -> Result; + async fn get_auctions_by_bids(&self, bids: &[Bid]) -> Result, RestError>; async fn get_bids( &self, chain_id: ChainId, @@ -677,9 +664,13 @@ pub trait Database: Debug + Send + Sync + 'static { #[async_trait] impl Database for DB { + #[instrument( + target = "metrics", + fields(category = "db_queries", result = "success", name = "add_auction"), + skip_all + )] async fn add_auction(&self, auction: &entities::Auction) -> anyhow::Result<()> { - let start = Instant::now(); - let query_result = sqlx::query!( + if let Err(e) = sqlx::query!( "INSERT INTO auction (id, creation_time, permission_key, chain_id, chain_type, bid_collection_time, tx_hash) VALUES ($1, $2, $3, $4, $5, $6, $7)", auction.id, PrimitiveDateTime::new(auction.creation_time.date(), auction.creation_time.time()), @@ -691,24 +682,20 @@ impl Database for DB { ) .execute(self) .instrument(info_span!("db_add_auction")) - .await?; - let latency = start.elapsed().as_secs_f64(); - let labels = [ - ("chain_id", auction.chain_id.to_string()), - ("db_query", "add_auction".to_string()), - ( - "made_change", - (query_result.rows_affected() > 0).to_string(), - ), - ]; - metrics::counter!("db_queries_total", &labels).increment(1); - metrics::histogram!("db_queries_latency_seconds", &labels).record(latency); + .await { + tracing::Span::current().record("result", "error"); + return Err(e.into()); + }; Ok(()) } + #[instrument( + target = "metrics", + fields(category = "db_queries", result = "success", name = "add_bid"), + skip_all + )] async fn add_bid(&self, bid: &Bid) -> Result<(), RestError> { - let start = Instant::now(); - let query_result = sqlx::query!("INSERT INTO bid (id, creation_time, permission_key, chain_id, chain_type, bid_amount, status, initiation_time, profile_id, metadata) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)", + if let Err(e) = sqlx::query!("INSERT INTO bid (id, creation_time, permission_key, chain_id, chain_type, bid_amount, status, initiation_time, profile_id, metadata) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)", bid.id, bid.creation_time, bid.permission_key, @@ -724,56 +711,45 @@ impl Database for DB { .await.map_err(|e| { tracing::error!(error = e.to_string(), bid = ?bid, "DB: Failed to insert bid"); RestError::TemporarilyUnavailable - })?; - let latency = start.elapsed().as_secs_f64(); - let labels = [ - ("chain_id", bid.chain_id.to_string()), - ("db_query", "add_bid".to_string()), - ( - "made_change", - (query_result.rows_affected() > 0).to_string(), - ), - ]; - metrics::counter!("db_queries_total", &labels).increment(1); - metrics::histogram!("db_queries_latency_seconds", &labels).record(latency); + }) { + tracing::Span::current().record("result", "error"); + return Err(e); + }; Ok(()) } - async fn conclude_auction( - &self, - auction_id: entities::AuctionId, - chain_id: &ChainId, - ) -> anyhow::Result<()> { - let start = Instant::now(); + #[instrument( + target = "metrics", + fields(category = "db_queries", result = "success", name = "conclude_auction"), + skip_all + )] + async fn conclude_auction(&self, auction_id: entities::AuctionId) -> anyhow::Result<()> { let now = OffsetDateTime::now_utc(); - let query_result = sqlx::query!( + if let Err(e) = sqlx::query!( "UPDATE auction SET conclusion_time = $1 WHERE id = $2 AND conclusion_time IS NULL", PrimitiveDateTime::new(now.date(), now.time()), auction_id, ) .execute(self) .instrument(info_span!("db_conclude_auction")) - .await?; - let latency = start.elapsed().as_secs_f64(); - let labels = [ - ("chain_id", chain_id.to_string()), - ("db_query", "conclude_auction".to_string()), - ( - "made_change", - (query_result.rows_affected() > 0).to_string(), - ), - ]; - metrics::counter!("db_queries_total", &labels).increment(1); - metrics::histogram!("db_queries_latency_seconds", &labels).record(latency); + .await + { + tracing::Span::current().record("result", "error"); + return Err(e.into()); + }; Ok(()) } + #[instrument( + target = "metrics", + fields(category = "db_queries", result = "success", name = "get_bid"), + skip_all + )] async fn get_bid( &self, bid_id: entities::BidId, chain_id: ChainId, ) -> Result, RestError> { - let start = Instant::now(); let result = sqlx::query_as("SELECT * FROM bid WHERE id = $1 AND chain_id = $2") .bind(bid_id) .bind(&chain_id) @@ -791,22 +767,20 @@ impl Database for DB { RestError::TemporarilyUnavailable } }); - let latency = start.elapsed().as_secs_f64(); - let labels = [ - ("chain_id", chain_id.to_string()), - ("db_query", "get_bid".to_string()), - ]; - metrics::counter!("db_queries_total", &labels).increment(1); - metrics::histogram!("db_queries_latency_seconds", &labels).record(latency); + + if let Err(e) = result { + tracing::Span::current().record("result", "error"); + return Err(e); + }; result } - async fn get_auction( - &self, - auction_id: entities::AuctionId, - chain_id: &ChainId, - ) -> Result { - let start = Instant::now(); + #[instrument( + target = "metrics", + fields(category = "db_queries", result = "success", name = "get_auction"), + skip_all + )] + async fn get_auction(&self, auction_id: entities::AuctionId) -> Result { let result = sqlx::query_as("SELECT * FROM auction WHERE id = $1") .bind(auction_id) .fetch_one(self) @@ -820,24 +794,25 @@ impl Database for DB { ); RestError::TemporarilyUnavailable }); - let latency = start.elapsed().as_secs_f64(); - let labels = [ - ("chain_id", chain_id.to_string()), - ("db_query", "get_auction".to_string()), - ]; - metrics::counter!("db_queries_total", &labels).increment(1); - metrics::histogram!("db_queries_latency_seconds", &labels).record(latency); + if let Err(e) = result { + tracing::Span::current().record("result", "error"); + return Err(e); + }; result } - async fn get_auctions_by_bids( - &self, - bids: &[Bid], - chain_id: &ChainId, - ) -> Result, RestError> { + #[instrument( + target = "metrics", + fields( + category = "db_queries", + result = "success", + name = "get_auctions_by_bids" + ), + skip_all + )] + async fn get_auctions_by_bids(&self, bids: &[Bid]) -> Result, RestError> { let auction_ids: Vec = bids.iter().filter_map(|bid| bid.auction_id).collect(); - let start = Instant::now(); let result = sqlx::query_as("SELECT * FROM auction WHERE id = ANY($1)") .bind(auction_ids) .fetch_all(self) @@ -847,13 +822,10 @@ impl Database for DB { tracing::error!("DB: Failed to fetch auctions: {}", e); RestError::TemporarilyUnavailable }); - let latency = start.elapsed().as_secs_f64(); - let labels = [ - ("chain_id", chain_id.to_string()), - ("db_query", "get_auctions_by_bids".to_string()), - ]; - metrics::counter!("db_queries_total", &labels).increment(1); - metrics::histogram!("db_queries_latency_seconds", &labels).record(latency); + if let Err(e) = result { + tracing::Span::current().record("result", "error"); + return Err(e); + } result } @@ -884,6 +856,11 @@ impl Database for DB { }) } + #[instrument( + target = "metrics", + fields(category = "db_queries", result = "success", name = "submit_auction"), + skip_all + )] async fn submit_auction( &self, auction: &entities::Auction, @@ -893,47 +870,40 @@ impl Database for DB { let now = OffsetDateTime::now_utc(); auction.tx_hash = Some(transaction_hash.clone()); auction.submission_time = Some(now); - let start = Instant::now(); - let query_result = sqlx::query!("UPDATE auction SET submission_time = $1, tx_hash = $2 WHERE id = $3 AND submission_time IS NULL", + if let Err(e) = sqlx::query!("UPDATE auction SET submission_time = $1, tx_hash = $2 WHERE id = $3 AND submission_time IS NULL", PrimitiveDateTime::new(now.date(), now.time()), T::BidStatusType::convert_tx_hash(transaction_hash), auction.id, - ).execute(self).instrument(info_span!("db_update_auction")).await?; - let latency = start.elapsed().as_secs_f64(); - let labels = [ - ("chain_id", auction.chain_id.to_string()), - ("db_query", "submit_auction".to_string()), - ( - "made_change", - (query_result.rows_affected() > 0).to_string(), - ), - ]; - metrics::counter!("db_queries_total", &labels).increment(1); - metrics::histogram!("db_queries_latency_seconds", &labels).record(latency); + ).execute(self).instrument(info_span!("db_update_auction")).await { + tracing::Span::current().record("result", "error"); + return Err(e.into()); + }; Ok(auction) } + #[instrument( + target = "metrics", + fields( + category = "db_queries", + result = "success", + name = "update_bid_status" + ), + skip_all + )] async fn update_bid_status( &self, bid: &entities::Bid, new_status: &T::BidStatusType, ) -> anyhow::Result { let update_query = T::get_update_bid_query(bid, new_status.clone())?; - let start = Instant::now(); - let query_result = update_query + let result = update_query .execute(self) .instrument(info_span!("db_update_bid_status")) - .await?; - let latency = start.elapsed().as_secs_f64(); - let made_change = query_result.rows_affected() > 0; - let labels: [(&str, String); 4] = [ - ("chain_id", bid.chain_id.to_string()), - ("db_query", "update_bid_status".to_string()), - ("status", T::convert_bid_status(new_status).to_string()), - ("made_change", made_change.to_string()), - ]; - metrics::counter!("db_queries_total", &labels).increment(1); - metrics::histogram!("db_queries_latency_seconds", &labels).record(latency); - Ok(query_result.rows_affected() > 0) + .await; + if let Err(e) = result { + tracing::Span::current().record("result", "error"); + return Err(e.into()); + } + Ok(result?.rows_affected() > 0) } } diff --git a/auction-server/src/opportunity/repository/models.rs b/auction-server/src/opportunity/repository/models.rs index e5172763..4387d21e 100644 --- a/auction-server/src/opportunity/repository/models.rs +++ b/auction-server/src/opportunity/repository/models.rs @@ -23,7 +23,6 @@ use { }, }, axum::async_trait, - axum_prometheus::metrics, ethers::types::{ Address, Bytes, @@ -51,16 +50,14 @@ use { }, QueryBuilder, }, - std::{ - fmt::Debug, - time::Instant, - }, + std::fmt::Debug, time::{ OffsetDateTime, PrimitiveDateTime, }, tracing::{ info_span, + instrument, Instrument, }, uuid::Uuid, @@ -182,11 +179,15 @@ pub trait Database: Debug + Send + Sync + 'static { } #[async_trait] impl Database for DB { + #[instrument( + target = "metrics", + fields(category = "db_queries", result = "success", name = "add_opportunity"), + skip_all + )] async fn add_opportunity(&self, opportunity: &T::Opportunity) -> Result<(), RestError> { let metadata = opportunity.get_models_metadata(); let chain_type = ::ModelMetadata::get_chain_type(); - let start = Instant::now(); - let query_result = sqlx::query!("INSERT INTO opportunity (id, + if let Err(e) = sqlx::query!("INSERT INTO opportunity (id, creation_time, permission_key, chain_id, @@ -208,18 +209,10 @@ impl Database for DB { .map_err(|e| { tracing::error!("DB: Failed to insert opportunity: {}", e); RestError::TemporarilyUnavailable - })?; - let latency = start.elapsed().as_secs_f64(); - let labels = [ - ("chain_id", opportunity.chain_id.to_string()), - ("db_query", "add_opportunity".to_string()), - ( - "made_change", - (query_result.rows_affected() > 0).to_string(), - ), - ]; - metrics::counter!("db_queries_total", &labels).increment(1); - metrics::histogram!("db_queries_duration_seconds", &labels).record(latency); + }) { + tracing::Span::current().record("result", "error"); + return Err(e); + } Ok(()) } @@ -275,33 +268,34 @@ impl Database for DB { )).collect() } + #[instrument( + target = "metrics", + fields( + category = "db_queries", + result = "success", + name = "remove_opportunities" + ), + skip_all + )] async fn remove_opportunities( &self, permission_key: PermissionKey, chain_id: ChainId, reason: OpportunityRemovalReason, ) -> anyhow::Result<()> { - let start = Instant::now(); let now = OffsetDateTime::now_utc(); - let query_result = sqlx::query("UPDATE opportunity SET removal_time = $1, removal_reason = $2 WHERE permission_key = $3 AND chain_id = $4 and removal_time IS NULL") + if let Err(e) = sqlx::query("UPDATE opportunity SET removal_time = $1, removal_reason = $2 WHERE permission_key = $3 AND chain_id = $4 and removal_time IS NULL") .bind(PrimitiveDateTime::new(now.date(), now.time())) .bind(reason) .bind(permission_key.as_ref()) .bind(&chain_id) .execute(self) .instrument(info_span!("db_remove_opportunities")) - .await?; - let latency = start.elapsed().as_secs_f64(); - let labels = [ - ("chain_id", chain_id.to_string()), - ("db_query", "remove_opportunities".to_string()), - ( - "made_change", - (query_result.rows_affected() > 0).to_string(), - ), - ]; - metrics::counter!("db_queries_total", &labels).increment(1); - metrics::histogram!("db_queries_duration_seconds", &labels).record(latency); + .await { + tracing::Span::current().record("result", "error"); + return Err(e.into()); + }; + Ok(()) } @@ -310,26 +304,17 @@ impl Database for DB { opportunity: &T::Opportunity, reason: OpportunityRemovalReason, ) -> anyhow::Result<()> { - let start = Instant::now(); let now = OffsetDateTime::now_utc(); - let query_result = sqlx::query("UPDATE opportunity SET removal_time = $1, removal_reason = $2 WHERE id = $3 AND removal_time IS NULL") + if let Err(e) = sqlx::query("UPDATE opportunity SET removal_time = $1, removal_reason = $2 WHERE id = $3 AND removal_time IS NULL") .bind(PrimitiveDateTime::new(now.date(), now.time())) .bind(reason) .bind(opportunity.id) .execute(self) .instrument(info_span!("db_remove_opportunity")) - .await?; - let latency = start.elapsed().as_secs_f64(); - let labels = [ - ("chain_id", opportunity.chain_id.to_string()), - ("db_query", "remove_opportunity".to_string()), - ( - "made_change", - (query_result.rows_affected() > 0).to_string(), - ), - ]; - metrics::counter!("db_queries_total", &labels).increment(1); - metrics::histogram!("db_queries_duration_seconds", &labels).record(latency); + .await { + tracing::Span::current().record("result", "error"); + return Err(e.into()); + }; Ok(()) } } From 8f0ba45b91ed2b1aa0a3a76dea30c126b441ec0f Mon Sep 17 00:00:00 2001 From: --systemdf Date: Fri, 21 Mar 2025 19:05:15 +0900 Subject: [PATCH 3/9] address query builders + clean up --- .../src/auction/repository/models.rs | 27 ++++++++++------ .../src/opportunity/repository/models.rs | 31 ++++++++++++++----- 2 files changed, 41 insertions(+), 17 deletions(-) diff --git a/auction-server/src/auction/repository/models.rs b/auction-server/src/auction/repository/models.rs index d911500a..b3a84f0e 100644 --- a/auction-server/src/auction/repository/models.rs +++ b/auction-server/src/auction/repository/models.rs @@ -708,13 +708,11 @@ impl Database for DB { serde_json::to_value(bid.metadata.clone()).expect("Failed to serialize metadata"), ).execute(self) .instrument(info_span!("db_add_bid")) - .await.map_err(|e| { - tracing::error!(error = e.to_string(), bid = ?bid, "DB: Failed to insert bid"); - RestError::TemporarilyUnavailable - }) { - tracing::Span::current().record("result", "error"); - return Err(e); - }; + .await { + tracing::Span::current().record("result", "error"); + tracing::error!(error = e.to_string(), bid = ?bid, "DB: Failed to insert bid"); + return Err(RestError::TemporarilyUnavailable); + }; Ok(()) } @@ -829,6 +827,11 @@ impl Database for DB { result } + #[instrument( + target = "metrics", + fields(category = "db_queries", result = "success", name = "get_bids"), + skip_all + )] async fn get_bids( &self, chain_id: ChainId, @@ -845,7 +848,7 @@ impl Database for DB { query.push_bind(from_time); } query.push(" ORDER BY initiation_time ASC LIMIT 20"); - query + let result = query .build_query_as() .fetch_all(self) .instrument(info_span!("db_get_bids")) @@ -853,7 +856,13 @@ impl Database for DB { .map_err(|e| { tracing::error!("DB: Failed to fetch bids: {}", e); RestError::TemporarilyUnavailable - }) + }); + if let Err(e) = result { + tracing::Span::current().record("result", "error"); + return Err(e); + }; + let bids: Vec> = result?; + Ok(bids) } #[instrument( diff --git a/auction-server/src/opportunity/repository/models.rs b/auction-server/src/opportunity/repository/models.rs index 4387d21e..de2c0028 100644 --- a/auction-server/src/opportunity/repository/models.rs +++ b/auction-server/src/opportunity/repository/models.rs @@ -205,17 +205,23 @@ impl Database for DB { serde_json::to_value(&opportunity.buy_tokens).expect("Failed to serialize buy_tokens")) .execute(self) .instrument(info_span!("db_add_opportunity")) - .await - .map_err(|e| { - tracing::error!("DB: Failed to insert opportunity: {}", e); - RestError::TemporarilyUnavailable - }) { + .await { tracing::Span::current().record("result", "error"); - return Err(e); + tracing::error!("DB: Failed to insert opportunity: {}", e); + return Err(RestError::TemporarilyUnavailable); } Ok(()) } + #[instrument( + target = "metrics", + fields( + category = "db_queries", + result = "success", + name = "get_opportunities" + ), + skip_all + )] async fn get_opportunities( &self, chain_id: ChainId, @@ -238,7 +244,7 @@ impl Database for DB { } query.push(" ORDER BY creation_time ASC LIMIT "); query.push_bind(super::OPPORTUNITY_PAGE_SIZE_CAP as i64); - let opps: Vec::ModelMetadata>> = query + let result = query .build_query_as() .fetch_all(self) .instrument(info_span!("db_get_opportunities")) @@ -252,7 +258,16 @@ impl Database for DB { from_time, ); RestError::TemporarilyUnavailable - })?; + }); + + if let Err(e) = result { + tracing::Span::current().record("result", "error"); + return Err(e); + } + + let opps: Vec< + models::Opportunity<::ModelMetadata>, + > = result?; opps.into_iter().map(|opp| opp.clone().try_into().map_err( |_| { From ffd0ae694458166a9aa8965fcdf1cd5a89e1ff51 Mon Sep 17 00:00:00 2001 From: --systemdf Date: Fri, 21 Mar 2025 19:15:47 +0900 Subject: [PATCH 4/9] forgot about remove opportunity --- auction-server/src/opportunity/repository/models.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/auction-server/src/opportunity/repository/models.rs b/auction-server/src/opportunity/repository/models.rs index de2c0028..8ec46fd2 100644 --- a/auction-server/src/opportunity/repository/models.rs +++ b/auction-server/src/opportunity/repository/models.rs @@ -314,6 +314,15 @@ impl Database for DB { Ok(()) } + #[instrument( + target = "metrics", + fields( + category = "db_queries", + result = "success", + name = "remove_opportunity" + ), + skip_all + )] async fn remove_opportunity( &self, opportunity: &T::Opportunity, From 43be94937eccc10346c9c34565ab77232b57f76b Mon Sep 17 00:00:00 2001 From: --systemdf Date: Fri, 21 Mar 2025 19:53:03 +0900 Subject: [PATCH 5/9] unify tracing & metrics in macro --- .../src/auction/repository/models.rs | 97 +++++++++++-------- auction-server/src/main.rs | 8 +- .../src/opportunity/repository/models.rs | 26 ++--- auction-server/src/per_metrics.rs | 11 ++- 4 files changed, 84 insertions(+), 58 deletions(-) diff --git a/auction-server/src/auction/repository/models.rs b/auction-server/src/auction/repository/models.rs index b3a84f0e..82e7c014 100644 --- a/auction-server/src/auction/repository/models.rs +++ b/auction-server/src/auction/repository/models.rs @@ -59,11 +59,7 @@ use { PrimitiveDateTime, UtcOffset, }, - tracing::{ - info_span, - instrument, - Instrument, - }, + tracing::instrument, }; #[derive(Clone, Debug, PartialEq, PartialOrd, sqlx::Type)] @@ -666,7 +662,12 @@ pub trait Database: Debug + Send + Sync + 'static { impl Database for DB { #[instrument( target = "metrics", - fields(category = "db_queries", result = "success", name = "add_auction"), + fields( + category = "db_queries", + result = "success", + name = "add_auction", + tracing_enabled + ), skip_all )] async fn add_auction(&self, auction: &entities::Auction) -> anyhow::Result<()> { @@ -681,7 +682,6 @@ impl Database for DB { auction.tx_hash.clone().map(|tx_hash| T::BidStatusType::convert_tx_hash(&tx_hash)), ) .execute(self) - .instrument(info_span!("db_add_auction")) .await { tracing::Span::current().record("result", "error"); return Err(e.into()); @@ -691,7 +691,12 @@ impl Database for DB { #[instrument( target = "metrics", - fields(category = "db_queries", result = "success", name = "add_bid"), + fields( + category = "db_queries", + result = "success", + name = "add_bid", + tracing_enabled + ), skip_all )] async fn add_bid(&self, bid: &Bid) -> Result<(), RestError> { @@ -707,18 +712,22 @@ impl Database for DB { bid.profile_id, serde_json::to_value(bid.metadata.clone()).expect("Failed to serialize metadata"), ).execute(self) - .instrument(info_span!("db_add_bid")) - .await { - tracing::Span::current().record("result", "error"); - tracing::error!(error = e.to_string(), bid = ?bid, "DB: Failed to insert bid"); - return Err(RestError::TemporarilyUnavailable); - }; + .await { + tracing::Span::current().record("result", "error"); + tracing::error!(error = e.to_string(), bid = ?bid, "DB: Failed to insert bid"); + return Err(RestError::TemporarilyUnavailable); + }; Ok(()) } #[instrument( target = "metrics", - fields(category = "db_queries", result = "success", name = "conclude_auction"), + fields( + category = "db_queries", + result = "success", + name = "conclude_auction", + tracing_enabled + ), skip_all )] async fn conclude_auction(&self, auction_id: entities::AuctionId) -> anyhow::Result<()> { @@ -729,7 +738,6 @@ impl Database for DB { auction_id, ) .execute(self) - .instrument(info_span!("db_conclude_auction")) .await { tracing::Span::current().record("result", "error"); @@ -740,7 +748,12 @@ impl Database for DB { #[instrument( target = "metrics", - fields(category = "db_queries", result = "success", name = "get_bid"), + fields( + category = "db_queries", + result = "success", + name = "get_bid", + tracing_enabled + ), skip_all )] async fn get_bid( @@ -752,7 +765,6 @@ impl Database for DB { .bind(bid_id) .bind(&chain_id) .fetch_one(self) - .instrument(info_span!("db_get_bid")) .await .map_err(|e| match e { sqlx::Error::RowNotFound => RestError::BidNotFound, @@ -775,14 +787,18 @@ impl Database for DB { #[instrument( target = "metrics", - fields(category = "db_queries", result = "success", name = "get_auction"), + fields( + category = "db_queries", + result = "success", + name = "get_auction", + tracing_enabled + ), skip_all )] async fn get_auction(&self, auction_id: entities::AuctionId) -> Result { let result = sqlx::query_as("SELECT * FROM auction WHERE id = $1") .bind(auction_id) .fetch_one(self) - .instrument(info_span!("db_get_auction")) .await .map_err(|e| { tracing::error!( @@ -804,7 +820,8 @@ impl Database for DB { fields( category = "db_queries", result = "success", - name = "get_auctions_by_bids" + name = "get_auctions_by_bids", + tracing_enabled ), skip_all )] @@ -814,7 +831,6 @@ impl Database for DB { let result = sqlx::query_as("SELECT * FROM auction WHERE id = ANY($1)") .bind(auction_ids) .fetch_all(self) - .instrument(info_span!("db_get_auctions_by_bids")) .await .map_err(|e| { tracing::error!("DB: Failed to fetch auctions: {}", e); @@ -829,7 +845,12 @@ impl Database for DB { #[instrument( target = "metrics", - fields(category = "db_queries", result = "success", name = "get_bids"), + fields( + category = "db_queries", + result = "success", + name = "get_bids", + tracing_enabled + ), skip_all )] async fn get_bids( @@ -848,15 +869,10 @@ impl Database for DB { query.push_bind(from_time); } query.push(" ORDER BY initiation_time ASC LIMIT 20"); - let result = query - .build_query_as() - .fetch_all(self) - .instrument(info_span!("db_get_bids")) - .await - .map_err(|e| { - tracing::error!("DB: Failed to fetch bids: {}", e); - RestError::TemporarilyUnavailable - }); + let result = query.build_query_as().fetch_all(self).await.map_err(|e| { + tracing::error!("DB: Failed to fetch bids: {}", e); + RestError::TemporarilyUnavailable + }); if let Err(e) = result { tracing::Span::current().record("result", "error"); return Err(e); @@ -867,7 +883,12 @@ impl Database for DB { #[instrument( target = "metrics", - fields(category = "db_queries", result = "success", name = "submit_auction"), + fields( + category = "db_queries", + result = "success", + name = "submit_auction", + tracing_enabled + ), skip_all )] async fn submit_auction( @@ -883,7 +904,7 @@ impl Database for DB { PrimitiveDateTime::new(now.date(), now.time()), T::BidStatusType::convert_tx_hash(transaction_hash), auction.id, - ).execute(self).instrument(info_span!("db_update_auction")).await { + ).execute(self).await { tracing::Span::current().record("result", "error"); return Err(e.into()); }; @@ -895,7 +916,8 @@ impl Database for DB { fields( category = "db_queries", result = "success", - name = "update_bid_status" + name = "update_bid_status", + tracing_enabled ), skip_all )] @@ -905,10 +927,7 @@ impl Database for DB { new_status: &T::BidStatusType, ) -> anyhow::Result { let update_query = T::get_update_bid_query(bid, new_status.clone())?; - let result = update_query - .execute(self) - .instrument(info_span!("db_update_bid_status")) - .await; + let result = update_query.execute(self).await; if let Err(e) = result { tracing::Span::current().record("result", "error"); return Err(e.into()); diff --git a/auction-server/src/main.rs b/auction-server/src/main.rs index 5c0ea1e1..3170763e 100644 --- a/auction-server/src/main.rs +++ b/auction-server/src/main.rs @@ -74,9 +74,9 @@ async fn main() -> Result<()> { let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); let registry = tracing_subscriber::registry() - .with(MetricsLayer.with_filter(filter::filter_fn(is_metrics))) + .with(MetricsLayer.with_filter(filter::filter_fn(|metadata| is_metrics(metadata, false)))) .with(telemetry.with_filter(filter::filter_fn(|metadata| { - !is_metrics(metadata) && is_internal(metadata) + is_internal(metadata) || is_metrics(metadata, true) }))); if std::io::stderr().is_terminal() { @@ -86,7 +86,7 @@ async fn main() -> Result<()> { .compact() .with_filter(LevelFilter::INFO) .with_filter(filter::filter_fn(|metadata| { - !is_metrics(metadata) && is_internal(metadata) + is_internal(metadata) || is_metrics(metadata, true) })), ) .init(); @@ -97,7 +97,7 @@ async fn main() -> Result<()> { .json() .with_filter(LevelFilter::INFO) .with_filter(filter::filter_fn(|metadata| { - !is_metrics(metadata) && is_internal(metadata) + is_internal(metadata) || is_metrics(metadata, true) })), ) .init(); diff --git a/auction-server/src/opportunity/repository/models.rs b/auction-server/src/opportunity/repository/models.rs index 8ec46fd2..beca9436 100644 --- a/auction-server/src/opportunity/repository/models.rs +++ b/auction-server/src/opportunity/repository/models.rs @@ -55,11 +55,7 @@ use { OffsetDateTime, PrimitiveDateTime, }, - tracing::{ - info_span, - instrument, - Instrument, - }, + tracing::instrument, uuid::Uuid, }; @@ -181,7 +177,12 @@ pub trait Database: Debug + Send + Sync + 'static { impl Database for DB { #[instrument( target = "metrics", - fields(category = "db_queries", result = "success", name = "add_opportunity"), + fields( + category = "db_queries", + result = "success", + name = "add_opportunity", + tracing_enabled + ), skip_all )] async fn add_opportunity(&self, opportunity: &T::Opportunity) -> Result<(), RestError> { @@ -204,7 +205,6 @@ impl Database for DB { serde_json::to_value(&opportunity.sell_tokens).expect("Failed to serialize sell_tokens"), serde_json::to_value(&opportunity.buy_tokens).expect("Failed to serialize buy_tokens")) .execute(self) - .instrument(info_span!("db_add_opportunity")) .await { tracing::Span::current().record("result", "error"); tracing::error!("DB: Failed to insert opportunity: {}", e); @@ -218,7 +218,8 @@ impl Database for DB { fields( category = "db_queries", result = "success", - name = "get_opportunities" + name = "get_opportunities", + tracing_enabled ), skip_all )] @@ -247,7 +248,6 @@ impl Database for DB { let result = query .build_query_as() .fetch_all(self) - .instrument(info_span!("db_get_opportunities")) .await .map_err(|e| { tracing::error!( @@ -288,7 +288,8 @@ impl Database for DB { fields( category = "db_queries", result = "success", - name = "remove_opportunities" + name = "remove_opportunities", + tracing_enabled ), skip_all )] @@ -305,7 +306,6 @@ impl Database for DB { .bind(permission_key.as_ref()) .bind(&chain_id) .execute(self) - .instrument(info_span!("db_remove_opportunities")) .await { tracing::Span::current().record("result", "error"); return Err(e.into()); @@ -319,7 +319,8 @@ impl Database for DB { fields( category = "db_queries", result = "success", - name = "remove_opportunity" + name = "remove_opportunity", + tracing_enabled ), skip_all )] @@ -334,7 +335,6 @@ impl Database for DB { .bind(reason) .bind(opportunity.id) .execute(self) - .instrument(info_span!("db_remove_opportunity")) .await { tracing::Span::current().record("result", "error"); return Err(e.into()); diff --git a/auction-server/src/per_metrics.rs b/auction-server/src/per_metrics.rs index 297b4b71..61cba2f9 100644 --- a/auction-server/src/per_metrics.rs +++ b/auction-server/src/per_metrics.rs @@ -91,8 +91,15 @@ impl MetricsLayerData { } } -pub fn is_metrics(metadata: &Metadata) -> bool { - metadata.target().starts_with("metrics") +pub fn is_metrics(metadata: &Metadata, check_tracing_enabled: bool) -> bool { + let tracing_check = match check_tracing_enabled { + true => metadata + .fields() + .iter() + .any(|f| f.name() == "tracing_enabled"), + false => true, + }; + tracing_check && (metadata.target().starts_with("metrics")) } impl Layer for MetricsLayer From a4f5b525da206450fb7506586c42502cb51d0a4a Mon Sep 17 00:00:00 2001 From: --systemdf Date: Fri, 21 Mar 2025 20:08:02 +0900 Subject: [PATCH 6/9] dont need this no more --- auction-server/src/auction/repository/models.rs | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/auction-server/src/auction/repository/models.rs b/auction-server/src/auction/repository/models.rs index 82e7c014..dd402e9a 100644 --- a/auction-server/src/auction/repository/models.rs +++ b/auction-server/src/auction/repository/models.rs @@ -96,21 +96,6 @@ pub enum BidStatus { Cancelled, } -impl std::fmt::Display for BidStatus { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - BidStatus::Pending => write!(f, "pending"), - BidStatus::AwaitingSignature => write!(f, "awaiting_signature"), - BidStatus::Submitted => write!(f, "submitted"), - BidStatus::Lost => write!(f, "lost"), - BidStatus::Won => write!(f, "won"), - BidStatus::Failed => write!(f, "failed"), - BidStatus::Expired => write!(f, "expired"), - BidStatus::Cancelled => write!(f, "cancelled"), - } - } -} - #[derive(Clone, Debug, Serialize, Deserialize)] pub struct BundleIndex(pub Option); impl Deref for BundleIndex { From 410896bbfa25c808c97490f76a6488904fd1ca48 Mon Sep 17 00:00:00 2001 From: --systemdf Date: Fri, 21 Mar 2025 20:13:32 +0900 Subject: [PATCH 7/9] improvement --- auction-server/src/per_metrics.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/auction-server/src/per_metrics.rs b/auction-server/src/per_metrics.rs index 61cba2f9..e47145d4 100644 --- a/auction-server/src/per_metrics.rs +++ b/auction-server/src/per_metrics.rs @@ -92,13 +92,11 @@ impl MetricsLayerData { } pub fn is_metrics(metadata: &Metadata, check_tracing_enabled: bool) -> bool { - let tracing_check = match check_tracing_enabled { - true => metadata + let tracing_check = !check_tracing_enabled + || metadata .fields() .iter() - .any(|f| f.name() == "tracing_enabled"), - false => true, - }; + .any(|f| f.name() == "tracing_enabled"); tracing_check && (metadata.target().starts_with("metrics")) } From 042d60a7cd511504acd7c52654080af48848c4b2 Mon Sep 17 00:00:00 2001 From: --systemdf Date: Mon, 24 Mar 2025 15:36:54 +0900 Subject: [PATCH 8/9] address comments --- .../src/auction/repository/models.rs | 90 ++++++++----------- .../src/opportunity/repository/models.rs | 43 ++++----- 2 files changed, 58 insertions(+), 75 deletions(-) diff --git a/auction-server/src/auction/repository/models.rs b/auction-server/src/auction/repository/models.rs index dd402e9a..4fd754da 100644 --- a/auction-server/src/auction/repository/models.rs +++ b/auction-server/src/auction/repository/models.rs @@ -647,6 +647,7 @@ pub trait Database: Debug + Send + Sync + 'static { impl Database for DB { #[instrument( target = "metrics", + name = "db_add_auction", fields( category = "db_queries", result = "success", @@ -656,7 +657,7 @@ impl Database for DB { skip_all )] async fn add_auction(&self, auction: &entities::Auction) -> anyhow::Result<()> { - if let Err(e) = sqlx::query!( + sqlx::query!( "INSERT INTO auction (id, creation_time, permission_key, chain_id, chain_type, bid_collection_time, tx_hash) VALUES ($1, $2, $3, $4, $5, $6, $7)", auction.id, PrimitiveDateTime::new(auction.creation_time.date(), auction.creation_time.time()), @@ -667,15 +668,16 @@ impl Database for DB { auction.tx_hash.clone().map(|tx_hash| T::BidStatusType::convert_tx_hash(&tx_hash)), ) .execute(self) - .await { + .await + .inspect_err(|_| { tracing::Span::current().record("result", "error"); - return Err(e.into()); - }; + })?; Ok(()) } #[instrument( target = "metrics", + name = "db_add_bid", fields( category = "db_queries", result = "success", @@ -685,7 +687,7 @@ impl Database for DB { skip_all )] async fn add_bid(&self, bid: &Bid) -> Result<(), RestError> { - if let Err(e) = sqlx::query!("INSERT INTO bid (id, creation_time, permission_key, chain_id, chain_type, bid_amount, status, initiation_time, profile_id, metadata) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)", + sqlx::query!("INSERT INTO bid (id, creation_time, permission_key, chain_id, chain_type, bid_amount, status, initiation_time, profile_id, metadata) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)", bid.id, bid.creation_time, bid.permission_key, @@ -697,16 +699,18 @@ impl Database for DB { bid.profile_id, serde_json::to_value(bid.metadata.clone()).expect("Failed to serialize metadata"), ).execute(self) - .await { + .await + .map_err(|e| { tracing::Span::current().record("result", "error"); tracing::error!(error = e.to_string(), bid = ?bid, "DB: Failed to insert bid"); - return Err(RestError::TemporarilyUnavailable); - }; + RestError::TemporarilyUnavailable + })?; Ok(()) } #[instrument( target = "metrics", + name = "db_conclude_auction", fields( category = "db_queries", result = "success", @@ -717,22 +721,22 @@ impl Database for DB { )] async fn conclude_auction(&self, auction_id: entities::AuctionId) -> anyhow::Result<()> { let now = OffsetDateTime::now_utc(); - if let Err(e) = sqlx::query!( + sqlx::query!( "UPDATE auction SET conclusion_time = $1 WHERE id = $2 AND conclusion_time IS NULL", PrimitiveDateTime::new(now.date(), now.time()), auction_id, ) .execute(self) .await - { + .inspect_err(|_| { tracing::Span::current().record("result", "error"); - return Err(e.into()); - }; + })?; Ok(()) } #[instrument( target = "metrics", + name = "db_get_bid", fields( category = "db_queries", result = "success", @@ -746,7 +750,7 @@ impl Database for DB { bid_id: entities::BidId, chain_id: ChainId, ) -> Result, RestError> { - let result = sqlx::query_as("SELECT * FROM bid WHERE id = $1 AND chain_id = $2") + sqlx::query_as("SELECT * FROM bid WHERE id = $1 AND chain_id = $2") .bind(bid_id) .bind(&chain_id) .fetch_one(self) @@ -754,6 +758,7 @@ impl Database for DB { .map_err(|e| match e { sqlx::Error::RowNotFound => RestError::BidNotFound, _ => { + tracing::Span::current().record("result", "error"); tracing::error!( error = e.to_string(), bid_id = bid_id.to_string(), @@ -761,17 +766,12 @@ impl Database for DB { ); RestError::TemporarilyUnavailable } - }); - - if let Err(e) = result { - tracing::Span::current().record("result", "error"); - return Err(e); - }; - result + }) } #[instrument( target = "metrics", + name = "db_get_auction", fields( category = "db_queries", result = "success", @@ -781,27 +781,24 @@ impl Database for DB { skip_all )] async fn get_auction(&self, auction_id: entities::AuctionId) -> Result { - let result = sqlx::query_as("SELECT * FROM auction WHERE id = $1") + sqlx::query_as("SELECT * FROM auction WHERE id = $1") .bind(auction_id) .fetch_one(self) .await .map_err(|e| { + tracing::Span::current().record("result", "error"); tracing::error!( error = e.to_string(), auction_id = auction_id.to_string(), "Failed to get auction from db" ); RestError::TemporarilyUnavailable - }); - if let Err(e) = result { - tracing::Span::current().record("result", "error"); - return Err(e); - }; - result + }) } #[instrument( target = "metrics", + name = "db_get_auctions_by_bids", fields( category = "db_queries", result = "success", @@ -813,23 +810,20 @@ impl Database for DB { async fn get_auctions_by_bids(&self, bids: &[Bid]) -> Result, RestError> { let auction_ids: Vec = bids.iter().filter_map(|bid| bid.auction_id).collect(); - let result = sqlx::query_as("SELECT * FROM auction WHERE id = ANY($1)") + sqlx::query_as("SELECT * FROM auction WHERE id = ANY($1)") .bind(auction_ids) .fetch_all(self) .await .map_err(|e| { + tracing::Span::current().record("result", "error"); tracing::error!("DB: Failed to fetch auctions: {}", e); RestError::TemporarilyUnavailable - }); - if let Err(e) = result { - tracing::Span::current().record("result", "error"); - return Err(e); - } - result + }) } #[instrument( target = "metrics", + name = "db_get_bids", fields( category = "db_queries", result = "success", @@ -854,20 +848,16 @@ impl Database for DB { query.push_bind(from_time); } query.push(" ORDER BY initiation_time ASC LIMIT 20"); - let result = query.build_query_as().fetch_all(self).await.map_err(|e| { + query.build_query_as().fetch_all(self).await.map_err(|e| { + tracing::Span::current().record("result", "error"); tracing::error!("DB: Failed to fetch bids: {}", e); RestError::TemporarilyUnavailable - }); - if let Err(e) = result { - tracing::Span::current().record("result", "error"); - return Err(e); - }; - let bids: Vec> = result?; - Ok(bids) + }) } #[instrument( target = "metrics", + name = "db_submit_auction", fields( category = "db_queries", result = "success", @@ -885,19 +875,19 @@ impl Database for DB { let now = OffsetDateTime::now_utc(); auction.tx_hash = Some(transaction_hash.clone()); auction.submission_time = Some(now); - if let Err(e) = sqlx::query!("UPDATE auction SET submission_time = $1, tx_hash = $2 WHERE id = $3 AND submission_time IS NULL", + sqlx::query!("UPDATE auction SET submission_time = $1, tx_hash = $2 WHERE id = $3 AND submission_time IS NULL", PrimitiveDateTime::new(now.date(), now.time()), T::BidStatusType::convert_tx_hash(transaction_hash), auction.id, - ).execute(self).await { + ).execute(self).await.inspect_err(|_| { tracing::Span::current().record("result", "error"); - return Err(e.into()); - }; + })?; Ok(auction) } #[instrument( target = "metrics", + name = "db_update_bid_status", fields( category = "db_queries", result = "success", @@ -912,11 +902,9 @@ impl Database for DB { new_status: &T::BidStatusType, ) -> anyhow::Result { let update_query = T::get_update_bid_query(bid, new_status.clone())?; - let result = update_query.execute(self).await; - if let Err(e) = result { + let result = update_query.execute(self).await.inspect_err(|_| { tracing::Span::current().record("result", "error"); - return Err(e.into()); - } - Ok(result?.rows_affected() > 0) + })?; + Ok(result.rows_affected() > 0) } } diff --git a/auction-server/src/opportunity/repository/models.rs b/auction-server/src/opportunity/repository/models.rs index beca9436..f54f82dc 100644 --- a/auction-server/src/opportunity/repository/models.rs +++ b/auction-server/src/opportunity/repository/models.rs @@ -177,6 +177,7 @@ pub trait Database: Debug + Send + Sync + 'static { impl Database for DB { #[instrument( target = "metrics", + name = "db_add_opportunity", fields( category = "db_queries", result = "success", @@ -188,7 +189,7 @@ impl Database for DB { async fn add_opportunity(&self, opportunity: &T::Opportunity) -> Result<(), RestError> { let metadata = opportunity.get_models_metadata(); let chain_type = ::ModelMetadata::get_chain_type(); - if let Err(e) = sqlx::query!("INSERT INTO opportunity (id, + sqlx::query!("INSERT INTO opportunity (id, creation_time, permission_key, chain_id, @@ -205,16 +206,17 @@ impl Database for DB { serde_json::to_value(&opportunity.sell_tokens).expect("Failed to serialize sell_tokens"), serde_json::to_value(&opportunity.buy_tokens).expect("Failed to serialize buy_tokens")) .execute(self) - .await { + .await.map_err(|e| { tracing::Span::current().record("result", "error"); tracing::error!("DB: Failed to insert opportunity: {}", e); - return Err(RestError::TemporarilyUnavailable); - } + RestError::TemporarilyUnavailable + })?; Ok(()) } #[instrument( target = "metrics", + name = "db_get_opportunities", fields( category = "db_queries", result = "success", @@ -245,11 +247,12 @@ impl Database for DB { } query.push(" ORDER BY creation_time ASC LIMIT "); query.push_bind(super::OPPORTUNITY_PAGE_SIZE_CAP as i64); - let result = query + let opps: Vec::ModelMetadata>,> = query .build_query_as() .fetch_all(self) .await .map_err(|e| { + tracing::Span::current().record("result", "error"); tracing::error!( "DB: Failed to fetch opportunities: {} - chain_id: {:?} - permission_key: {:?} - from_time: {:?}", e, @@ -258,16 +261,7 @@ impl Database for DB { from_time, ); RestError::TemporarilyUnavailable - }); - - if let Err(e) = result { - tracing::Span::current().record("result", "error"); - return Err(e); - } - - let opps: Vec< - models::Opportunity<::ModelMetadata>, - > = result?; + })?; opps.into_iter().map(|opp| opp.clone().try_into().map_err( |_| { @@ -285,6 +279,7 @@ impl Database for DB { #[instrument( target = "metrics", + name = "db_remove_opportunities", fields( category = "db_queries", result = "success", @@ -300,22 +295,22 @@ impl Database for DB { reason: OpportunityRemovalReason, ) -> anyhow::Result<()> { let now = OffsetDateTime::now_utc(); - if let Err(e) = sqlx::query("UPDATE opportunity SET removal_time = $1, removal_reason = $2 WHERE permission_key = $3 AND chain_id = $4 and removal_time IS NULL") + sqlx::query("UPDATE opportunity SET removal_time = $1, removal_reason = $2 WHERE permission_key = $3 AND chain_id = $4 and removal_time IS NULL") .bind(PrimitiveDateTime::new(now.date(), now.time())) .bind(reason) .bind(permission_key.as_ref()) .bind(&chain_id) .execute(self) - .await { + .await + .inspect_err(|_| { tracing::Span::current().record("result", "error"); - return Err(e.into()); - }; - + })?; Ok(()) } #[instrument( target = "metrics", + name = "db_remove_opportunity", fields( category = "db_queries", result = "success", @@ -330,15 +325,15 @@ impl Database for DB { reason: OpportunityRemovalReason, ) -> anyhow::Result<()> { let now = OffsetDateTime::now_utc(); - if let Err(e) = sqlx::query("UPDATE opportunity SET removal_time = $1, removal_reason = $2 WHERE id = $3 AND removal_time IS NULL") + sqlx::query("UPDATE opportunity SET removal_time = $1, removal_reason = $2 WHERE id = $3 AND removal_time IS NULL") .bind(PrimitiveDateTime::new(now.date(), now.time())) .bind(reason) .bind(opportunity.id) .execute(self) - .await { + .await + .inspect_err(|_| { tracing::Span::current().record("result", "error"); - return Err(e.into()); - }; + })?; Ok(()) } } From 17b2106cd20c27a09698bf6bcd630e8bfaeb007e Mon Sep 17 00:00:00 2001 From: --systemdf Date: Mon, 24 Mar 2025 17:38:19 +0900 Subject: [PATCH 9/9] address comma :D --- auction-server/src/opportunity/repository/models.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/auction-server/src/opportunity/repository/models.rs b/auction-server/src/opportunity/repository/models.rs index f54f82dc..d0268e59 100644 --- a/auction-server/src/opportunity/repository/models.rs +++ b/auction-server/src/opportunity/repository/models.rs @@ -247,7 +247,7 @@ impl Database for DB { } query.push(" ORDER BY creation_time ASC LIMIT "); query.push_bind(super::OPPORTUNITY_PAGE_SIZE_CAP as i64); - let opps: Vec::ModelMetadata>,> = query + let opps: Vec::ModelMetadata>> = query .build_query_as() .fetch_all(self) .await