From 55c881adc886b6b88c5cb501c63b2ca910008653 Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Sun, 14 Jul 2024 18:26:39 +0530 Subject: [PATCH] feat: use unique non-UUIDs to signal new aggregate creation, and check for the same in cqrs framework --- .../output/db/postgres/category_id_exists.rs | 5 +- .../category_name_exists_for_store.rs | 5 +- .../output/db/postgres/category_view.rs | 53 ++++--- .../adapters/output/db/postgres/mod.rs | 2 + .../output/db/postgres/store_id_exists.rs | 5 +- .../adapters/output/db/postgres/store_view.rs | 146 +++++++++++++++--- .../adapters/output/db/postgres/utils.rs | 27 ++++ .../services/add_category_service.rs | 1 + 8 files changed, 191 insertions(+), 53 deletions(-) create mode 100644 src/inventory/adapters/output/db/postgres/utils.rs diff --git a/src/inventory/adapters/output/db/postgres/category_id_exists.rs b/src/inventory/adapters/output/db/postgres/category_id_exists.rs index a385c9d..6152c65 100644 --- a/src/inventory/adapters/output/db/postgres/category_id_exists.rs +++ b/src/inventory/adapters/output/db/postgres/category_id_exists.rs @@ -59,9 +59,8 @@ mod tests { sqlx::query!( "INSERT INTO cqrs_inventory_category_query - (view_id, version, name, description, category_id, store_id) - VALUES ($1, $2, $3, $4, $5, $6);", - "1", + (version, name, description, category_id, store_id) + VALUES ($1, $2, $3, $4, $5);", 1, category.name(), category.description().as_ref().unwrap(), diff --git a/src/inventory/adapters/output/db/postgres/category_name_exists_for_store.rs b/src/inventory/adapters/output/db/postgres/category_name_exists_for_store.rs index 05c87ca..ef3ed59 100644 --- a/src/inventory/adapters/output/db/postgres/category_name_exists_for_store.rs +++ b/src/inventory/adapters/output/db/postgres/category_name_exists_for_store.rs @@ -64,9 +64,8 @@ mod tests { sqlx::query!( "INSERT INTO cqrs_inventory_category_query - (view_id, version, name, description, category_id, store_id) - VALUES ($1, $2, $3, $4, $5, $6);", - "1", + (version, name, description, category_id, store_id) + VALUES ($1, $2, $3, $4, $5);", 1, category.name(), category.description().as_ref().unwrap(), diff --git a/src/inventory/adapters/output/db/postgres/category_view.rs b/src/inventory/adapters/output/db/postgres/category_view.rs index 939a7a2..37f61ed 100644 --- a/src/inventory/adapters/output/db/postgres/category_view.rs +++ b/src/inventory/adapters/output/db/postgres/category_view.rs @@ -14,6 +14,8 @@ use crate::inventory::domain::category_aggregate::Category; use crate::inventory::domain::events::InventoryEvent; use serde::{Deserialize, Serialize}; +pub const NEW_CATEGORY_NON_UUID: &str = "new_category_non_uuid-asdfa"; + // The view for a Category query, for a standard http application this should // be designed to reflect the response dto that will be returned to a user. #[derive(Debug, Default, Serialize, Deserialize)] @@ -43,7 +45,13 @@ impl View for CategoryView { #[async_trait] impl ViewRepository for InventoryDBPostgresAdapter { - async fn load(&self, view_id: &str) -> Result, PersistenceError> { + async fn load(&self, category_id: &str) -> Result, PersistenceError> { + let category_id = + match super::utils::parse_aggregate_id(category_id, NEW_CATEGORY_NON_UUID)? { + Some((val, _)) => return Ok(Some(val)), + None => Uuid::parse_str(category_id).unwrap(), + }; + let res = sqlx::query_as!( CategoryView, "SELECT @@ -51,8 +59,8 @@ impl ViewRepository for InventoryDBPostgresAdapter { FROM cqrs_inventory_category_query WHERE - view_id = $1;", - view_id + category_id = $1;", + category_id ) .fetch_one(&self.pool) .await @@ -62,8 +70,14 @@ impl ViewRepository for InventoryDBPostgresAdapter { async fn load_with_context( &self, - view_id: &str, + category_id: &str, ) -> Result, PersistenceError> { + let category_id = + match super::utils::parse_aggregate_id(category_id, NEW_CATEGORY_NON_UUID)? { + Some(val) => return Ok(Some(val)), + None => Uuid::parse_str(category_id).unwrap(), + }; + let res = sqlx::query_as!( CategoryView, "SELECT @@ -71,8 +85,8 @@ impl ViewRepository for InventoryDBPostgresAdapter { FROM cqrs_inventory_category_query WHERE - view_id = $1;", - view_id + category_id = $1;", + category_id ) .fetch_one(&self.pool) .await @@ -80,24 +94,24 @@ impl ViewRepository for InventoryDBPostgresAdapter { struct Context { version: i64, - view_id: String, + category_id: Uuid, } let ctx = sqlx::query_as!( Context, "SELECT - view_id, version + category_id, version FROM cqrs_inventory_category_query WHERE - view_id = $1;", - view_id + category_id = $1;", + category_id ) .fetch_one(&self.pool) .await .map_err(PostgresAggregateError::from)?; - let view_context = ViewContext::new(ctx.view_id, ctx.version); + let view_context = ViewContext::new(ctx.category_id.to_string(), ctx.version); Ok(Some((res, view_context))) } @@ -111,11 +125,10 @@ impl ViewRepository for InventoryDBPostgresAdapter { let version = context.version + 1; sqlx::query!( "INSERT INTO cqrs_inventory_category_query ( - view_id, version, name, description, category_id, store_id + version, name, description, category_id, store_id ) VALUES ( - $1, $2, $3, $4, $5, $6 + $1, $2, $3, $4, $5 );", - context.view_instance_id, version, view.name, view.description, @@ -132,13 +145,11 @@ impl ViewRepository for InventoryDBPostgresAdapter { "UPDATE cqrs_inventory_category_query SET - view_id = $1, - version = $2, - name = $3, - description = $4, - category_id = $5, - store_id = $6;", - context.view_instance_id, + version = $1, + name = $2, + description = $3, + category_id = $4, + store_id = $5;", version, view.name, view.description, diff --git a/src/inventory/adapters/output/db/postgres/mod.rs b/src/inventory/adapters/output/db/postgres/mod.rs index d9385e4..0edfba8 100644 --- a/src/inventory/adapters/output/db/postgres/mod.rs +++ b/src/inventory/adapters/output/db/postgres/mod.rs @@ -13,7 +13,9 @@ mod category_name_exists_for_store; mod category_view; mod errors; mod store_id_exists; +mod store_name_exists; mod store_view; +mod utils; #[derive(Clone)] pub struct InventoryDBPostgresAdapter { diff --git a/src/inventory/adapters/output/db/postgres/store_id_exists.rs b/src/inventory/adapters/output/db/postgres/store_id_exists.rs index 35f66ad..62f2034 100644 --- a/src/inventory/adapters/output/db/postgres/store_id_exists.rs +++ b/src/inventory/adapters/output/db/postgres/store_id_exists.rs @@ -58,9 +58,8 @@ mod tests { sqlx::query!( "INSERT INTO cqrs_inventory_store_query - (view_id, version, name, address, store_id, owner) - VALUES ($1, $2, $3, $4, $5, $6);", - "1", + (version, name, address, store_id, owner) + VALUES ($1, $2, $3, $4, $5);", 1, store.name(), store.address().as_ref().unwrap(), diff --git a/src/inventory/adapters/output/db/postgres/store_view.rs b/src/inventory/adapters/output/db/postgres/store_view.rs index 42fac61..3a8f030 100644 --- a/src/inventory/adapters/output/db/postgres/store_view.rs +++ b/src/inventory/adapters/output/db/postgres/store_view.rs @@ -3,7 +3,6 @@ // SPDX-License-Identifier: AGPL-3.0-or-later use async_trait::async_trait; -use cqrs_es::persist::GenericQuery; use cqrs_es::persist::{PersistenceError, ViewContext, ViewRepository}; use cqrs_es::{EventEnvelope, Query, View}; use serde::{Deserialize, Serialize}; @@ -14,6 +13,8 @@ use super::InventoryDBPostgresAdapter; use crate::inventory::domain::events::InventoryEvent; use crate::inventory::domain::store_aggregate::Store; +pub const NEW_STORE_NON_UUID: &str = "new_store_non_uuid-asdfa"; + // The view for a Store query, for a standard http application this should // be designed to reflect the response dto that will be returned to a user. #[derive(Debug, Default, Serialize, Deserialize)] @@ -43,7 +44,12 @@ impl View for StoreView { #[async_trait] impl ViewRepository for InventoryDBPostgresAdapter { - async fn load(&self, view_id: &str) -> Result, PersistenceError> { + async fn load(&self, store_id: &str) -> Result, PersistenceError> { + let store_id = match super::utils::parse_aggregate_id(store_id, NEW_STORE_NON_UUID)? { + Some((val, _)) => return Ok(Some(val)), + None => Uuid::parse_str(store_id).unwrap(), + }; + let res = sqlx::query_as!( StoreView, "SELECT @@ -51,8 +57,8 @@ impl ViewRepository for InventoryDBPostgresAdapter { FROM cqrs_inventory_store_query WHERE - view_id = $1;", - view_id + store_id = $1;", + store_id ) .fetch_one(&self.pool) .await @@ -62,8 +68,13 @@ impl ViewRepository for InventoryDBPostgresAdapter { async fn load_with_context( &self, - view_id: &str, + store_id: &str, ) -> Result, PersistenceError> { + let store_id = match super::utils::parse_aggregate_id(store_id, NEW_STORE_NON_UUID)? { + Some(val) => return Ok(Some(val)), + None => Uuid::parse_str(store_id).unwrap(), + }; + let res = sqlx::query_as!( StoreView, "SELECT @@ -71,8 +82,8 @@ impl ViewRepository for InventoryDBPostgresAdapter { FROM cqrs_inventory_store_query WHERE - view_id = $1;", - view_id + store_id = $1;", + &store_id, ) .fetch_one(&self.pool) .await @@ -80,24 +91,24 @@ impl ViewRepository for InventoryDBPostgresAdapter { struct Context { version: i64, - view_id: String, + store_id: Uuid, } let ctx = sqlx::query_as!( Context, "SELECT - view_id, version + store_id, version FROM cqrs_inventory_store_query WHERE - view_id = $1;", - view_id + store_id = $1;", + store_id ) .fetch_one(&self.pool) .await .map_err(PostgresAggregateError::from)?; - let view_context = ViewContext::new(ctx.view_id, ctx.version); + let view_context = ViewContext::new(ctx.store_id.to_string(), ctx.version); Ok(Some((res, view_context))) } @@ -111,11 +122,10 @@ impl ViewRepository for InventoryDBPostgresAdapter { let version = context.version + 1; sqlx::query!( "INSERT INTO cqrs_inventory_store_query ( - view_id, version, name, address, store_id, owner + version, name, address, store_id, owner ) VALUES ( - $1, $2, $3, $4, $5, $6 + $1, $2, $3, $4, $5 );", - context.view_instance_id, version, view.name, view.address, @@ -132,13 +142,11 @@ impl ViewRepository for InventoryDBPostgresAdapter { "UPDATE cqrs_inventory_store_query SET - view_id = $1, - version = $2, - name = $3, - address = $4, - store_id = $5, - owner = $6;", - context.view_instance_id, + version = $1, + name = $2, + address = $3, + store_id = $4, + owner = $5;", version, view.name, view.address, @@ -169,7 +177,99 @@ impl Query for SimpleLoggingQuery { } } +#[async_trait] +impl Query for InventoryDBPostgresAdapter { + async fn dispatch(&self, store_id: &str, events: &[EventEnvelope]) { + let res = self + .load_with_context(&store_id) + .await + .unwrap_or_else(|_| Some((StoreView::default(), ViewContext::new(store_id.into(), 0)))); + let (mut view, view_context): (StoreView, ViewContext) = res.unwrap(); + for event in events { + view.update(event); + } + self.update_view(view, view_context).await.unwrap(); + } +} + // Our second query, this one will be handled with Postgres `GenericQuery` // which will serialize and persist our view after it is updated. It also // provides a `load` method to deserialize the view on request. -pub type StoreQuery = GenericQuery; +//pub type StoreQuery = GenericQuery; +//pub type StoreQuery = Query; + +#[cfg(test)] +mod tests { + use super::*; + + use postgres_es::PostgresCqrs; + + use crate::{ + db::migrate::*, + inventory::{ + application::services::{ + add_category_service::tests::mock_add_category_service, + add_store_service::AddStoreServiceBuilder, InventoryServicesBuilder, + }, + domain::{ + add_category_command::AddCategoryCommand, add_store_command::AddStoreCommand, + commands::InventoryCommand, + }, + }, + tests::bdd::IS_NEVER_CALLED, + utils::{random_string::GenerateRandomStringInterface, uuid::tests::UUID}, + }; + use std::sync::Arc; + + #[actix_rt::test] + async fn pg_query() { + let settings = crate::settings::tests::get_settings().await; + //let settings = crate::settings::Settings::new().unwrap(); + settings.create_db().await; + + let db = crate::db::sqlx_postgres::Postgres::init(&settings.database.url).await; + db.migrate().await; + let db = InventoryDBPostgresAdapter::new(db.pool.clone()); + + let simple_query = SimpleLoggingQuery {}; + + let queries: Vec>> = + vec![Box::new(simple_query), Box::new(db.clone())]; + + let services = InventoryServicesBuilder::default() + .add_store(Arc::new( + AddStoreServiceBuilder::default() + .db_store_id_exists(Arc::new(db.clone())) + .db_store_name_exists(Arc::new(db.clone())) + .get_uuid(Arc::new(crate::utils::uuid::GenerateUUID {})) + .build() + .unwrap(), + )) + .add_category(mock_add_category_service( + IS_NEVER_CALLED, + AddCategoryCommand::new("foo".into(), None, UUID.clone(), "bar".into()).unwrap(), + )) + .build() + .unwrap(); + + let (cqrs, _store_query): ( + Arc>, + Arc>, + ) = ( + Arc::new(postgres_es::postgres_cqrs( + db.pool.clone(), + queries, + Arc::new(services), + )), + Arc::new(db.clone()), + ); + + let rand = crate::utils::random_string::GenerateRandomString {}; + let cmd = AddStoreCommand::new(rand.get_random(10), None, "me".into()).unwrap(); + cqrs.execute("", InventoryCommand::AddStore(cmd.clone())) + .await + .unwrap(); + + settings.drop_db().await; + } +} diff --git a/src/inventory/adapters/output/db/postgres/utils.rs b/src/inventory/adapters/output/db/postgres/utils.rs new file mode 100644 index 0000000..85c64dd --- /dev/null +++ b/src/inventory/adapters/output/db/postgres/utils.rs @@ -0,0 +1,27 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use cqrs_es::persist::{PersistenceError, ViewContext}; +use uuid::Uuid; + +pub fn parse_aggregate_id( + aggregate_id: &str, + non_id: &str, +) -> Result, PersistenceError> { + match Uuid::parse_str(aggregate_id) { + Ok(_) => return Ok(None), + Err(e) => { + if aggregate_id == non_id { + // if store_id is unbearable, then store isn't created yet. Use cleaner, robust method + // later. + return Ok(Some(( + T::default(), + ViewContext::new(aggregate_id.into(), 0), + ))); + } else { + return Err(PersistenceError::UnknownError(Box::new(e))); + } + } + }; +} diff --git a/src/inventory/application/services/add_category_service.rs b/src/inventory/application/services/add_category_service.rs index 11537ec..d9ccde0 100644 --- a/src/inventory/application/services/add_category_service.rs +++ b/src/inventory/application/services/add_category_service.rs @@ -1,6 +1,7 @@ // SPDX-FileCopyrightText: 2024 Aravinth Manivannan // // SPDX-License-Identifier: AGPL-3.0-or-later + use std::sync::Arc; use derive_builder::Builder;