// SPDX-FileCopyrightText: 2024 Aravinth Manivannan // // SPDX-License-Identifier: AGPL-3.0-or-later use async_trait::async_trait; use cqrs_es::persist::{PersistenceError, ViewContext, ViewRepository}; use cqrs_es::{EventEnvelope, Query, View}; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; use uuid::Uuid; use super::errors::*; use super::BillingDBPostgresAdapter; use crate::billing::domain::bill_aggregate::Bill; use crate::billing::domain::events::BillingEvent; use crate::utils::parse_aggregate_id::parse_aggregate_id; pub const NEW_BILL_NON_UUID: &str = "billing_new_bill_non_uuid-asdfa"; // The view for a Bill query, for a standard http application this should // be designed to reflect the response dto that will be returned to a user. #[derive(Debug, Serialize, Deserialize)] pub struct BillView { created_time: OffsetDateTime, store_id: Uuid, bill_id: Uuid, token_number: i32, total_price_minor: Option, total_price_major: Option, total_price_currency: Option, deleted: bool, } impl Default for BillView { fn default() -> Self { Self { created_time: OffsetDateTime::now_utc(), store_id: Default::default(), bill_id: Default::default(), token_number: Default::default(), total_price_minor: Default::default(), total_price_major: Default::default(), total_price_currency: Default::default(), deleted: false, } } } // This updates the view with events as they are committed. // The logic should be minimal here, e.g., don't calculate the account balance, // design the events to carry the balance information instead. impl View for BillView { fn update(&mut self, event: &EventEnvelope) { if let BillingEvent::BillAdded(val) = &event.payload { self.created_time = val.bill().created_time().clone(); self.store_id = *val.bill().store_id(); self.bill_id = *val.bill().bill_id(); self.token_number = *val.bill().token_number() as i32; self.total_price_minor = val.bill().total_price().as_ref().map(|t| *t.minor() as i32); self.total_price_major = val.bill().total_price().as_ref().map(|t| *t.major() as i32); self.total_price_currency = val .bill() .total_price() .as_ref() .map(|t| t.currency().to_string()); self.deleted = false; } } } #[async_trait] impl ViewRepository for BillingDBPostgresAdapter { async fn load(&self, bill_id: &str) -> Result, PersistenceError> { let bill_id = match parse_aggregate_id(bill_id, NEW_BILL_NON_UUID)? { Some((val, _)) => return Ok(Some(val)), None => Uuid::parse_str(bill_id).unwrap(), }; let res = sqlx::query_as!( BillView, "SELECT created_time, store_id, bill_id, token_number, total_price_major, total_price_minor, total_price_currency, deleted FROM cqrs_billing_bill_query WHERE bill_id = $1;", bill_id ) .fetch_one(&self.pool) .await .map_err(PostgresAggregateError::from)?; Ok(Some(res)) } async fn load_with_context( &self, bill_id: &str, ) -> Result, PersistenceError> { let bill_id = match parse_aggregate_id(bill_id, NEW_BILL_NON_UUID)? { Some(val) => return Ok(Some(val)), None => Uuid::parse_str(bill_id).unwrap(), }; let res = sqlx::query_as!( BillView, "SELECT created_time, store_id, bill_id, token_number, total_price_major, total_price_minor, total_price_currency, deleted FROM cqrs_billing_bill_query WHERE bill_id = $1;", &bill_id, ) .fetch_one(&self.pool) .await .map_err(PostgresAggregateError::from)?; struct Context { version: i64, bill_id: Uuid, } let ctx = sqlx::query_as!( Context, "SELECT bill_id, version FROM cqrs_billing_bill_query WHERE bill_id = $1;", bill_id ) .fetch_one(&self.pool) .await .map_err(PostgresAggregateError::from)?; let view_context = ViewContext::new(ctx.bill_id.to_string(), ctx.version); Ok(Some((res, view_context))) } async fn update_view( &self, view: BillView, context: ViewContext, ) -> Result<(), PersistenceError> { match context.version { 0 => { let version = context.version + 1; sqlx::query!( "INSERT INTO cqrs_billing_bill_query ( version, created_time, store_id, bill_id, token_number, total_price_major, total_price_minor, total_price_currency, deleted ) VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9 );", version, view.created_time, view.store_id, view.bill_id, view.token_number, view.total_price_major, view.total_price_minor, view.total_price_currency, view.deleted, ) .execute(&self.pool) .await .map_err(PostgresAggregateError::from)?; } _ => { let version = context.version + 1; sqlx::query!( "UPDATE cqrs_billing_bill_query SET version = $1, created_time = $2, store_id = $3, bill_id = $4, token_number = $5, total_price_major = $6, total_price_minor = $7, total_price_currency = $8, deleted = $9;", version, view.created_time, view.store_id, view.bill_id, view.token_number, view.total_price_major, view.total_price_minor, view.total_price_currency, view.deleted, ) .execute(&self.pool) .await .map_err(PostgresAggregateError::from)?; } } Ok(()) } } pub struct SimpleLoggingQuery {} // Our simplest query, this is great for debugging but absolutely useless in production. // This query just pretty prints the events as they are processed. #[async_trait] impl Query for SimpleLoggingQuery { async fn dispatch(&self, aggregate_id: &str, events: &[EventEnvelope]) { for event in events { let payload = serde_json::to_string_pretty(&event.payload).unwrap(); println!("{}-{}\n{}", aggregate_id, event.sequence, payload); } } } #[async_trait] impl Query for BillingDBPostgresAdapter { async fn dispatch(&self, bill_id: &str, events: &[EventEnvelope]) { let res = self .load_with_context(bill_id) .await .unwrap_or_else(|_| Some((BillView::default(), ViewContext::new(bill_id.into(), 0)))); let (mut view, view_context): (BillView, ViewContext) = res.unwrap(); for event in events { view.update(event); } self.update_view(view, view_context).await.unwrap(); } } // Our second query, this one will be handled with Postgres `GenericQuery` // which will serialize and persist our view after it is updated. It also // provides a `load` method to deserialize the view on request. //pub type BillQuery = GenericQuery; //pub type BillQuery = Query; //#[cfg(test)] //mod tests { // use super::*; // // use postgres_es::PostgresCqrs; // // use crate::{ // db::migrate::*, // billing::{ // application::services::{ // add_category_service::tests::mock_add_category_service, add_customization_service::tests::mock_add_customization_service, add_line_item_service::tests::mock_add_line_item_service, add_product_service::tests::mock_add_product_service, add_bill_service::AddBillServiceBuilder, update_category_service::tests::mock_update_category_service, update_customization_service::tests::mock_update_customization_service, update_product_service::tests::mock_update_product_service, update_bill_service::tests::mock_update_bill_service, BillingServicesBuilder // }, // domain::{ // add_category_command::AddCategoryCommand, add_customization_command, // add_product_command::tests::get_command, add_bill_command::AddBillCommand, // commands::BillingCommand, // update_category_command::tests::get_update_category_command, // update_customization_command::tests::get_update_customization_command, // update_product_command, update_bill_command::tests::get_update_bill_cmd, // }, // }, // tests::bdd::IS_NEVER_CALLED, // utils::{random_string::GenerateRandomStringInterface, uuid::tests::UUID}, // }; // use std::sync::Arc; // // #[actix_rt::test] // async fn pg_query() { // let settings = crate::settings::tests::get_settings().await; // //let settings = crate::settings::Settings::new().unwrap(); // settings.create_db().await; // // let db = crate::db::sqlx_postgres::Postgres::init(&settings.database.url).await; // db.migrate().await; // let db = BillingDBPostgresAdapter::new(db.pool.clone()); // // let simple_query = SimpleLoggingQuery {}; // // let queries: Vec>> = // vec![Box::new(simple_query), Box::new(db.clone())]; // // let services = BillingServicesBuilder::default() // .add_bill(Arc::new( // AddBillServiceBuilder::default() // .db_bill_id_exists(Arc::new(db.clone())) // .db_bill_name_exists(Arc::new(db.clone())) // .get_uuid(Arc::new(crate::utils::uuid::GenerateUUID {})) // .build() // .unwrap(), // )) // .add_category(mock_add_category_service( // IS_NEVER_CALLED, // AddCategoryCommand::new("foo".into(), None, UUID, UUID).unwrap(), // )) // .add_product(mock_add_product_service(IS_NEVER_CALLED, get_command())) // .add_customization(mock_add_customization_service( // IS_NEVER_CALLED, // add_customization_command::tests::get_command(), // )) // .update_product(mock_update_product_service( // IS_NEVER_CALLED, // update_product_command::tests::get_command(), // )) // .update_customization(mock_update_customization_service( // IS_NEVER_CALLED, // get_update_customization_command(), // )) // .update_category(mock_update_category_service( // IS_NEVER_CALLED, // get_update_category_command(), // )) // .update_bill(mock_update_bill_service( // IS_NEVER_CALLED, // get_update_bill_cmd(), // )) // .build() // .unwrap(); // // let (cqrs, _bill_query): ( // Arc>, // Arc>, // ) = ( // Arc::new(postgres_es::postgres_cqrs( // db.pool.clone(), // queries, // Arc::new(services), // )), // Arc::new(db.clone()), // ); // // let rand = crate::utils::random_string::GenerateRandomString {}; // let cmd = AddBillCommand::new(rand.get_random(10), None, UUID).unwrap(); // cqrs.execute("", BillingCommand::AddBill(cmd.clone())) // .await // .unwrap(); // // settings.drop_db().await; // } //}