// SPDX-FileCopyrightText: 2024 Aravinth Manivannan // // SPDX-License-Identifier: AGPL-3.0-or-later use std::str::FromStr; use async_trait::async_trait; use cqrs_es::persist::{PersistenceError, ViewContext, ViewRepository}; use cqrs_es::{EventEnvelope, Query, View}; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; use uuid::Uuid; use super::errors::*; use super::BillingDBPostgresAdapter; use crate::billing::domain::bill_aggregate::{Bill, BillBuilder}; use crate::billing::domain::events::BillingEvent; use crate::types::currency::{self, Currency, PriceBuilder}; use crate::utils::parse_aggregate_id::parse_aggregate_id; pub const NEW_BILL_NON_UUID: &str = "billing_new_bill_non_uuid-asdfa"; // The view for a Bill query, for a standard http application this should // be designed to reflect the response dto that will be returned to a user. #[derive(Debug, Serialize, Deserialize)] pub struct BillView { created_time: OffsetDateTime, store_id: Uuid, bill_id: Uuid, token_number: i32, total_price_minor: Option, total_price_major: Option, total_price_currency: Option, deleted: bool, } impl From for Bill { fn from(v: BillView) -> Self { let price = match ( v.total_price_minor, v.total_price_major, v.total_price_currency, ) { (Some(minor), Some(major), Some(currency)) => Some( PriceBuilder::default() .major(major as usize) .minor(minor as usize) .currency(Currency::from_str(¤cy).unwrap()) .build() .unwrap(), ), _ => None, }; BillBuilder::default() .created_time(v.created_time) .store_id(v.store_id) .bill_id(v.bill_id) .token_number(v.token_number as usize) .total_price(price) .deleted(v.deleted) .build() .unwrap() } } impl Default for BillView { fn default() -> Self { Self { created_time: OffsetDateTime::now_utc(), store_id: Default::default(), bill_id: Default::default(), token_number: Default::default(), total_price_minor: Default::default(), total_price_major: Default::default(), total_price_currency: Default::default(), deleted: false, } } } impl BillView { fn merge(&mut self, bill: &Bill) { self.created_time = bill.created_time().clone(); self.store_id = *bill.store_id(); self.bill_id = *bill.bill_id(); self.token_number = *bill.token_number() as i32; self.total_price_minor = bill.total_price().as_ref().map(|t| *t.minor() as i32); self.total_price_major = bill.total_price().as_ref().map(|t| *t.major() as i32); self.total_price_currency = bill .total_price() .as_ref() .map(|t| t.currency().to_string()); } } // This updates the view with events as they are committed. // The logic should be minimal here, e.g., don't calculate the account balance, // design the events to carry the balance information instead. impl View for BillView { fn update(&mut self, event: &EventEnvelope) { match &event.payload { BillingEvent::BillAdded(val) => { self.merge(val.bill()); self.deleted = false; } BillingEvent::BillUpdated(e) => self.merge(e.new_bill()), BillingEvent::BillTotalPriceComputed(e) => { let total_price = e.total_price().clone(); self.total_price_minor = Some(*total_price.minor() as i32); self.total_price_major = Some(*total_price.major() as i32); self.total_price_currency = Some(total_price.currency().to_string()); } BillingEvent::BillDeleted(e) => self.deleted = true, _ => (), } } } #[async_trait] impl ViewRepository for BillingDBPostgresAdapter { async fn load(&self, bill_id: &str) -> Result, PersistenceError> { let bill_id = match parse_aggregate_id(bill_id, NEW_BILL_NON_UUID)? { Some((val, _)) => return Ok(Some(val)), None => Uuid::parse_str(bill_id).unwrap(), }; let res = sqlx::query_as!( BillView, "SELECT created_time, store_id, bill_id, token_number, total_price_major, total_price_minor, total_price_currency, deleted FROM cqrs_billing_bill_query WHERE bill_id = $1;", bill_id ) .fetch_one(&self.pool) .await .map_err(PostgresAggregateError::from)?; Ok(Some(res)) } async fn load_with_context( &self, bill_id: &str, ) -> Result, PersistenceError> { let bill_id = match parse_aggregate_id(bill_id, NEW_BILL_NON_UUID)? { Some(val) => return Ok(Some(val)), None => Uuid::parse_str(bill_id).unwrap(), }; let res = sqlx::query_as!( BillView, "SELECT created_time, store_id, bill_id, token_number, total_price_major, total_price_minor, total_price_currency, deleted FROM cqrs_billing_bill_query WHERE bill_id = $1;", &bill_id, ) .fetch_one(&self.pool) .await .map_err(PostgresAggregateError::from)?; struct Context { version: i64, bill_id: Uuid, } let ctx = sqlx::query_as!( Context, "SELECT bill_id, version FROM cqrs_billing_bill_query WHERE bill_id = $1;", bill_id ) .fetch_one(&self.pool) .await .map_err(PostgresAggregateError::from)?; let view_context = ViewContext::new(ctx.bill_id.to_string(), ctx.version); Ok(Some((res, view_context))) } async fn update_view( &self, view: BillView, context: ViewContext, ) -> Result<(), PersistenceError> { match context.version { 0 => { let version = context.version + 1; sqlx::query!( "INSERT INTO cqrs_billing_bill_query ( version, created_time, store_id, bill_id, token_number, total_price_major, total_price_minor, total_price_currency, deleted ) VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9 );", version, view.created_time, view.store_id, view.bill_id, view.token_number, view.total_price_major, view.total_price_minor, view.total_price_currency, view.deleted, ) .execute(&self.pool) .await .map_err(PostgresAggregateError::from)?; } _ => { let version = context.version + 1; sqlx::query!( "UPDATE cqrs_billing_bill_query SET version = $1, created_time = $2, store_id = $3, token_number = $4, total_price_major = $5, total_price_minor = $6, total_price_currency = $7, deleted = $8;", version, view.created_time, view.store_id, view.token_number, view.total_price_major, view.total_price_minor, view.total_price_currency, view.deleted, ) .execute(&self.pool) .await .map_err(PostgresAggregateError::from)?; } } Ok(()) } } pub struct SimpleLoggingQuery {} // Our simplest query, this is great for debugging but absolutely useless in production. // This query just pretty prints the events as they are processed. #[async_trait] impl Query for SimpleLoggingQuery { async fn dispatch(&self, aggregate_id: &str, events: &[EventEnvelope]) { for event in events { let payload = serde_json::to_string_pretty(&event.payload).unwrap(); println!("{}-{}\n{}", aggregate_id, event.sequence, payload); } } } #[async_trait] impl Query for BillingDBPostgresAdapter { async fn dispatch(&self, bill_id: &str, events: &[EventEnvelope]) { let res = self .load_with_context(bill_id) .await .unwrap_or_else(|_| Some((BillView::default(), ViewContext::new(bill_id.into(), 0)))); let (mut view, view_context): (BillView, ViewContext) = res.unwrap(); for event in events { view.update(event); } self.update_view(view, view_context).await.unwrap(); } } #[cfg(test)] mod tests { use super::*; use postgres_es::PostgresCqrs; use crate::{ billing::{ application::services::{ add_bill_service::AddBillServiceBuilder, update_bill_service::*, MockBillingServicesInterface, }, domain::{ add_bill_command::*, commands::BillingCommand, store_aggregate::Store, update_bill_command::*, }, }, db::migrate::*, tests::bdd::*, utils::uuid::tests::*, }; use std::sync::Arc; #[actix_rt::test] async fn pg_query_billing_bill_view() { let settings = crate::settings::tests::get_settings().await; //let settings = crate::settings::Settings::new().unwrap(); settings.create_db().await; let db = crate::db::sqlx_postgres::Postgres::init(&settings.database.url).await; db.migrate().await; let db = BillingDBPostgresAdapter::new(db.pool.clone()); let simple_query = SimpleLoggingQuery {}; let queries: Vec>> = vec![Box::new(simple_query), Box::new(db.clone())]; let mut mock_services = MockBillingServicesInterface::new(); let store = Store::default(); crate::billing::adapters::output::db::postgres::store_id_exists::tests::create_dummy_store_record(&store, &db).await; let db2 = db.clone(); mock_services .expect_add_bill() .times(IS_CALLED_ONLY_ONCE.unwrap()) .returning(move || { Arc::new( AddBillServiceBuilder::default() .db_bill_id_exists(Arc::new(db2.clone())) .db_next_token_id(Arc::new(db2.clone())) .build() .unwrap(), ) }); let db2 = db.clone(); mock_services .expect_update_bill() .times(IS_CALLED_ONLY_ONCE.unwrap()) .returning(move || { Arc::new( UpdateBillServiceBuilder::default() .db_bill_id_exists(Arc::new(db2.clone())) .build() .unwrap(), ) }); let (cqrs, bill_query): ( Arc>, Arc>, ) = ( Arc::new(postgres_es::postgres_cqrs( db.pool.clone(), queries, Arc::new(mock_services), )), Arc::new(db.clone()), ); let cmd = AddBillCommandBuilder::default() .adding_by(UUID) .bill_id(UUID) .store_id(*store.store_id()) .build() .unwrap(); cqrs.execute( &cmd.bill_id().to_string(), BillingCommand::AddBill(cmd.clone()), ) .await .unwrap(); let bill = bill_query .load(&(*cmd.bill_id()).to_string()) .await .unwrap() .unwrap(); let bill: Bill = bill.into(); assert_eq!(bill.store_id(), cmd.store_id()); assert_eq!(bill.bill_id(), cmd.bill_id()); assert!(!bill.deleted()); let update_bill_cmd = UpdateBillCommandBuilder::default() .adding_by(UUID) .store_id(*store.store_id()) .total_price(None) .old_bill(bill.clone()) .build() .unwrap(); cqrs.execute( &cmd.bill_id().to_string(), BillingCommand::UpdateBill(update_bill_cmd.clone()), ) .await .unwrap(); let bill = bill_query .load(&(*cmd.bill_id()).to_string()) .await .unwrap() .unwrap(); let bill: Bill = bill.into(); assert_eq!(bill.store_id(), cmd.store_id()); assert_eq!(bill.bill_id(), update_bill_cmd.old_bill().bill_id()); assert_eq!(bill.total_price(), update_bill_cmd.total_price()); assert!(!bill.deleted()); settings.drop_db().await; } }