From 6e0364c1ee4008d88b4661197e4b08016aff071e Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Tue, 23 Jul 2024 20:32:54 +0530 Subject: [PATCH 1/5] feat: add Kot query table --- ...bc00c8291b5ca0624f6a8fcb1d701d6b383c5.json | 18 +++++++++ ...3e038af635766f0be61a86ca7084b34a2109d.json | 18 +++++++++ ...5fac6c716e1f575eb59d7a3a149d06760741b.json | 28 +++++++++++++ ...2a93b1c75304f8fad8922a6d62b5e06bd2490.json | 40 +++++++++++++++++++ ...e1e4d3a9b9a14c070846c8adb1ca0eaee1f37.json | 22 ++++++++++ ...20240723143757_cqrs_ordering_kot_query.sql | 16 ++++++++ 6 files changed, 142 insertions(+) create mode 100644 .sqlx/query-1804fe2c946337fb2d10683b273bc00c8291b5ca0624f6a8fcb1d701d6b383c5.json create mode 100644 .sqlx/query-4b7100e5c7442066dbb4ea7c9733e038af635766f0be61a86ca7084b34a2109d.json create mode 100644 .sqlx/query-5db3c3d1ff503d25d3f8fc529a95fac6c716e1f575eb59d7a3a149d06760741b.json create mode 100644 .sqlx/query-ad00213731142dc3853945e02662a93b1c75304f8fad8922a6d62b5e06bd2490.json create mode 100644 .sqlx/query-ff85b5343820fb3c30eb5e1efbfe1e4d3a9b9a14c070846c8adb1ca0eaee1f37.json create mode 100644 migrations/20240723143757_cqrs_ordering_kot_query.sql diff --git a/.sqlx/query-1804fe2c946337fb2d10683b273bc00c8291b5ca0624f6a8fcb1d701d6b383c5.json b/.sqlx/query-1804fe2c946337fb2d10683b273bc00c8291b5ca0624f6a8fcb1d701d6b383c5.json new file mode 100644 index 0000000..03b2769 --- /dev/null +++ b/.sqlx/query-1804fe2c946337fb2d10683b273bc00c8291b5ca0624f6a8fcb1d701d6b383c5.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO cqrs_ordering_kot_query (\n version,\n order_id,\n kot_id,\n created_time,\n deleted\n ) VALUES (\n $1, $2, $3, $4, $5\n );", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Uuid", + "Uuid", + "Timestamptz", + "Bool" + ] + }, + "nullable": [] + }, + "hash": "1804fe2c946337fb2d10683b273bc00c8291b5ca0624f6a8fcb1d701d6b383c5" +} diff --git a/.sqlx/query-4b7100e5c7442066dbb4ea7c9733e038af635766f0be61a86ca7084b34a2109d.json b/.sqlx/query-4b7100e5c7442066dbb4ea7c9733e038af635766f0be61a86ca7084b34a2109d.json new file mode 100644 index 0000000..2aeaf77 --- /dev/null +++ b/.sqlx/query-4b7100e5c7442066dbb4ea7c9733e038af635766f0be61a86ca7084b34a2109d.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE\n cqrs_ordering_kot_query\n SET\n version = $1,\n order_id = $2,\n kot_id = $3,\n created_time = $4,\n deleted = $5;", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Uuid", + "Uuid", + "Timestamptz", + "Bool" + ] + }, + "nullable": [] + }, + "hash": "4b7100e5c7442066dbb4ea7c9733e038af635766f0be61a86ca7084b34a2109d" +} diff --git a/.sqlx/query-5db3c3d1ff503d25d3f8fc529a95fac6c716e1f575eb59d7a3a149d06760741b.json b/.sqlx/query-5db3c3d1ff503d25d3f8fc529a95fac6c716e1f575eb59d7a3a149d06760741b.json new file mode 100644 index 0000000..2329729 --- /dev/null +++ b/.sqlx/query-5db3c3d1ff503d25d3f8fc529a95fac6c716e1f575eb59d7a3a149d06760741b.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT \n kot_id, version\n FROM\n cqrs_ordering_kot_query\n WHERE\n kot_id = $1;", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "kot_id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "version", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false, + false + ] + }, + "hash": "5db3c3d1ff503d25d3f8fc529a95fac6c716e1f575eb59d7a3a149d06760741b" +} diff --git a/.sqlx/query-ad00213731142dc3853945e02662a93b1c75304f8fad8922a6d62b5e06bd2490.json b/.sqlx/query-ad00213731142dc3853945e02662a93b1c75304f8fad8922a6d62b5e06bd2490.json new file mode 100644 index 0000000..667e3a7 --- /dev/null +++ b/.sqlx/query-ad00213731142dc3853945e02662a93b1c75304f8fad8922a6d62b5e06bd2490.json @@ -0,0 +1,40 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT \n order_id,\n kot_id,\n created_time,\n deleted\n FROM\n cqrs_ordering_kot_query\n WHERE\n kot_id = $1;", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "order_id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "kot_id", + "type_info": "Uuid" + }, + { + "ordinal": 2, + "name": "created_time", + "type_info": "Timestamptz" + }, + { + "ordinal": 3, + "name": "deleted", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false, + false, + false, + false + ] + }, + "hash": "ad00213731142dc3853945e02662a93b1c75304f8fad8922a6d62b5e06bd2490" +} diff --git a/.sqlx/query-ff85b5343820fb3c30eb5e1efbfe1e4d3a9b9a14c070846c8adb1ca0eaee1f37.json b/.sqlx/query-ff85b5343820fb3c30eb5e1efbfe1e4d3a9b9a14c070846c8adb1ca0eaee1f37.json new file mode 100644 index 0000000..2143758 --- /dev/null +++ b/.sqlx/query-ff85b5343820fb3c30eb5e1efbfe1e4d3a9b9a14c070846c8adb1ca0eaee1f37.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT EXISTS (\n SELECT 1\n FROM cqrs_ordering_kot_query\n WHERE\n kot_id = $1\n );", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "exists", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + null + ] + }, + "hash": "ff85b5343820fb3c30eb5e1efbfe1e4d3a9b9a14c070846c8adb1ca0eaee1f37" +} diff --git a/migrations/20240723143757_cqrs_ordering_kot_query.sql b/migrations/20240723143757_cqrs_ordering_kot_query.sql new file mode 100644 index 0000000..31186c1 --- /dev/null +++ b/migrations/20240723143757_cqrs_ordering_kot_query.sql @@ -0,0 +1,16 @@ +-- SPDX-FileCopyrightText: 2024 Aravinth Manivannan +-- +-- SPDX-License-Identifier: AGPL-3.0-or-later + +CREATE TABLE IF NOT EXISTS cqrs_ordering_kot_query +( + version bigint CHECK (version >= 0) NOT NULL, + + created_time timestamp with time zone DEFAULT (CURRENT_TIMESTAMP) NOT NULL, + order_id UUID NOT NULL, + kot_id UUID NOT NULL UNIQUE, + + deleted BOOLEAN NOT NULL DEFAULT FALSE, + + PRIMARY KEY (kot_id) +); From 13414d34fc204a98300aaca4187a22232d9bf83d Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Tue, 23 Jul 2024 20:33:52 +0530 Subject: [PATCH 2/5] feat: db port to check duplicate Kot ID --- src/ordering/adapters/output/db/errors.rs | 4 +- .../adapters/output/db/kot_id_exists.rs | 83 +++++++++++++++++++ .../application/port/output/db/errors.rs | 2 + .../port/output/db/kot_id_exists.rs | 53 ++++++++++++ .../application/port/output/db/mod.rs | 1 + src/ordering/application/services/errors.rs | 7 ++ 6 files changed, 149 insertions(+), 1 deletion(-) create mode 100644 src/ordering/adapters/output/db/kot_id_exists.rs create mode 100644 src/ordering/application/port/output/db/kot_id_exists.rs diff --git a/src/ordering/adapters/output/db/errors.rs b/src/ordering/adapters/output/db/errors.rs index 0c45084..f03a439 100644 --- a/src/ordering/adapters/output/db/errors.rs +++ b/src/ordering/adapters/output/db/errors.rs @@ -22,8 +22,10 @@ impl From for OrderingDBError { if msg.contains("cqrs_ordering_store_query_line_item_id_key") { return Self::DuplicateLineItemID; - } else if msg.contains("cqrs_ordering_store_query_order_id_key") { + } else if msg.contains("cqrs_ordering_order_order_id_query") { return Self::DuplicateOrderID; + } else if msg.contains("cqrs_ordering_kot_kot_id_query") { + return Self::DuplicateKotID; } else { println!("{msg}"); } diff --git a/src/ordering/adapters/output/db/kot_id_exists.rs b/src/ordering/adapters/output/db/kot_id_exists.rs new file mode 100644 index 0000000..5d3486f --- /dev/null +++ b/src/ordering/adapters/output/db/kot_id_exists.rs @@ -0,0 +1,83 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use uuid::Uuid; + +use super::OrderingDBPostgresAdapter; +use crate::ordering::application::port::output::db::{errors::*, kot_id_exists::*}; + +#[async_trait::async_trait] +impl KotIDExistsDBPort for OrderingDBPostgresAdapter { + async fn kot_id_exists(&self, kot_id: &Uuid) -> OrderingDBResult { + let res = sqlx::query!( + "SELECT EXISTS ( + SELECT 1 + FROM cqrs_ordering_kot_query + WHERE + kot_id = $1 + );", + kot_id + ) + .fetch_one(&self.pool) + .await?; + if let Some(x) = res.exists { + Ok(x) + } else { + Ok(false) + } + } +} + +#[cfg(test)] +pub mod tests { + + use super::*; + // use crate::ordering::domain::add_product_command::tests::get_customizations; + use crate::ordering::domain::kot_aggregate::*; + + async fn create_dummy_kot(kot: &Kot, db: &OrderingDBPostgresAdapter) { + sqlx::query!( + "INSERT INTO cqrs_ordering_kot_query ( + version, + order_id, + kot_id, + created_time, + deleted + ) VALUES ( + $1, $2, $3, $4, $5 + );", + 1, + kot.order_id(), + kot.kot_id(), + kot.created_time(), + kot.deleted().clone(), + ) + .execute(&db.pool) + .await + .unwrap(); + } + + #[actix_rt::test] + async fn test_postgres_product_exists() { + let settings = crate::settings::tests::get_settings().await; + settings.create_db().await; + let db = super::OrderingDBPostgresAdapter::new( + sqlx::postgres::PgPool::connect(&settings.database.url) + .await + .unwrap(), + ); + + let kot = Kot::default(); + + // state doesn't exist + assert!(!db.kot_id_exists(kot.kot_id()).await.unwrap()); + + create_dummy_kot(&kot, &db).await; + + // state exists + assert!(db.kot_id_exists(kot.kot_id()).await.unwrap()); + + settings.drop_db().await; + } +} diff --git a/src/ordering/application/port/output/db/errors.rs b/src/ordering/application/port/output/db/errors.rs index 43f5eb1..3588c7c 100644 --- a/src/ordering/application/port/output/db/errors.rs +++ b/src/ordering/application/port/output/db/errors.rs @@ -14,4 +14,6 @@ pub enum OrderingDBError { InternalError, DuplicateOrderID, OrderIDNotFound, + DuplicateKotID, + KotIDNotFound, } diff --git a/src/ordering/application/port/output/db/kot_id_exists.rs b/src/ordering/application/port/output/db/kot_id_exists.rs new file mode 100644 index 0000000..628dabb --- /dev/null +++ b/src/ordering/application/port/output/db/kot_id_exists.rs @@ -0,0 +1,53 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use mockall::predicate::*; +use mockall::*; +use uuid::Uuid; + +use super::errors::*; +#[cfg(test)] +#[allow(unused_imports)] +pub use tests::*; + +#[automock] +#[async_trait::async_trait] +pub trait KotIDExistsDBPort: Send + Sync { + async fn kot_id_exists(&self, kot_id: &Uuid) -> OrderingDBResult; +} + +pub type KotIDExistsDBPortObj = std::sync::Arc; + +#[cfg(test)] +pub mod tests { + use super::*; + + use std::sync::Arc; + + pub fn mock_kot_id_exists_db_port_false(times: Option) -> KotIDExistsDBPortObj { + let mut m = MockKotIDExistsDBPort::new(); + if let Some(times) = times { + m.expect_kot_id_exists() + .times(times) + .returning(|_| Ok(false)); + } else { + m.expect_kot_id_exists().returning(|_| Ok(false)); + } + + Arc::new(m) + } + + pub fn mock_kot_id_exists_db_port_true(times: Option) -> KotIDExistsDBPortObj { + let mut m = MockKotIDExistsDBPort::new(); + if let Some(times) = times { + m.expect_kot_id_exists() + .times(times) + .returning(|_| Ok(true)); + } else { + m.expect_kot_id_exists().returning(|_| Ok(true)); + } + + Arc::new(m) + } +} diff --git a/src/ordering/application/port/output/db/mod.rs b/src/ordering/application/port/output/db/mod.rs index 2db807b..14fe72e 100644 --- a/src/ordering/application/port/output/db/mod.rs +++ b/src/ordering/application/port/output/db/mod.rs @@ -3,5 +3,6 @@ // SPDX-License-Identifier: AGPL-3.0-or-later pub mod errors; +pub mod kot_id_exists; pub mod line_item_id_exists; pub mod order_id_exists; diff --git a/src/ordering/application/services/errors.rs b/src/ordering/application/services/errors.rs index de37b9e..f27978a 100644 --- a/src/ordering/application/services/errors.rs +++ b/src/ordering/application/services/errors.rs @@ -15,6 +15,7 @@ pub enum OrderingError { LineItemIDNotFound, InternalError, OrderIDNotFound, + KotIDNotFound, } // impl From for OrderingError { @@ -30,6 +31,12 @@ impl From for OrderingError { Self::InternalError } OrderingDBError::OrderIDNotFound => OrderingError::OrderIDNotFound, + + OrderingDBError::DuplicateKotID => { + error!("DuplicateKotID"); + Self::InternalError + } + OrderingDBError::KotIDNotFound => OrderingError::KotIDNotFound, OrderingDBError::InternalError => Self::InternalError, } } From ed66a074edd669f79bb9281a0ae9d846d2b71cec Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Tue, 23 Jul 2024 20:34:04 +0530 Subject: [PATCH 3/5] feat: define Kot aggregate --- src/ordering/domain/kot_aggregate.rs | 168 ++++++++++++++++++++++++++- 1 file changed, 166 insertions(+), 2 deletions(-) diff --git a/src/ordering/domain/kot_aggregate.rs b/src/ordering/domain/kot_aggregate.rs index 4276087..16004cf 100644 --- a/src/ordering/domain/kot_aggregate.rs +++ b/src/ordering/domain/kot_aggregate.rs @@ -10,7 +10,10 @@ use serde::{Deserialize, Serialize}; use time::OffsetDateTime; use uuid::Uuid; -use super::line_item_aggregate::*; +use crate::ordering::{ + application::services::{errors::*, *}, + domain::{commands::*, events::*}, +}; #[derive( Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd, Builder, Getters, @@ -18,9 +21,170 @@ use super::line_item_aggregate::*; pub struct Kot { #[builder(default = "OffsetDateTime::now_utc()")] created_time: OffsetDateTime, - line_items: Vec, + // kots: Vec, kot_id: Uuid, order_id: Uuid, #[builder(default = "false")] deleted: bool, } + +impl Default for Kot { + fn default() -> Self { + Self { + created_time: OffsetDateTime::now_utc(), + order_id: Default::default(), + kot_id: Default::default(), + deleted: false, + } + } +} + +#[cfg(test)] +mod tests { + use crate::{ordering::domain::add_kot_command::AddKotCommand, utils::uuid::tests::UUID}; + + use super::*; + + impl Kot { + pub fn get_kot() -> Self { + let cmd = AddKotCommand::get_cmd(); + + KotBuilder::default() + .created_time(cmd.created_time().clone()) + .order_id(*cmd.order_id()) + .kot_id(UUID) + .build() + .unwrap() + } + } +} + +#[async_trait] +impl Aggregate for Kot { + type Command = OrderingCommand; + type Event = OrderingEvent; + type Error = OrderingError; + type Services = std::sync::Arc; + + // This identifier should be unique to the system. + fn aggregate_type() -> String { + "ordering.kot".to_string() + } + + // The aggregate logic goes here. Note that this will be the _bulk_ of a CQRS system + // so expect to use helper functions elsewhere to keep the code clean. + async fn handle( + &self, + command: Self::Command, + services: &Self::Services, + ) -> Result, Self::Error> { + match command { + OrderingCommand::AddKot(cmd) => { + let res = services.add_kot().add_kot(cmd).await?; + Ok(vec![OrderingEvent::KotAdded(res)]) + } + //OrderingCommand::UpdateKot(cmd) => { + // let res = services.update_kot().update_kot(cmd).await?; + // Ok(vec![OrderingEvent::KotUpdated(res)]) + //} + //OrderingCommand::DeleteKot(cmd) => { + // let res = services.delete_kot().delete_kot(cmd).await?; + // Ok(vec![OrderingEvent::KotDeleted(res)]) + //} + _ => Ok(Vec::default()), + } + } + + fn apply(&mut self, event: Self::Event) { + match event { + OrderingEvent::KotAdded(e) => *self = e.kot().clone(), + //OrderingEvent::KotUpdated(e) => *self = e.new_kot().clone(), + //OrderingEvent::KotDeleted(e) => *self = e.kot().clone(), + _ => (), + } + } +} + +#[cfg(test)] +mod aggregate_tests { + use std::sync::Arc; + + use add_kot_service::tests::mock_add_kot_service; + use cqrs_es::test::TestFramework; + // use delete_kot_service::tests::mock_delete_kot_service; + // use update_kot_service::tests::mock_update_kot_service; + + use super::*; + + // use crate::ordering::domain::delete_kot_command::DeleteKotCommand; + // use crate::ordering::domain::kot_deleted_event::tests::get_deleted_kot_event_from_command; + // use crate::ordering::domain::kot_updated_event::tests::get_updated_kot_event_from_command; + // use crate::ordering::domain::update_kot_command::UpdateKotCommand; + use crate::tests::bdd::*; + + use crate::ordering::domain::{ + add_kot_command::*, kot_added_event::tests::get_added_kot_event_from_command, + }; + + type KotTestFramework = TestFramework; + + #[test] + fn test_add_kot() { + let cmd = AddKotCommand::get_cmd(); + let expected = get_added_kot_event_from_command(&cmd); + let expected = OrderingEvent::KotAdded(expected); + + let mut services = MockOrderingServicesInterface::new(); + services + .expect_add_kot() + .times(IS_CALLED_ONLY_ONCE.unwrap()) + .return_const(mock_add_kot_service(IS_CALLED_ONLY_ONCE, cmd.clone())); + + KotTestFramework::with(Arc::new(services)) + .given_no_previous_events() + .when(OrderingCommand::AddKot(cmd)) + .then_expect_events(vec![expected]); + } + + // #[test] + // fn test_update_kot() { + // let cmd = UpdateKotCommand::get_cmd(); + // let expected = get_updated_kot_event_from_command(&cmd); + // let expected = OrderingEvent::KotUpdated(expected); + // + // let mut services = MockOrderingServicesInterface::new(); + // services + // .expect_update_kot() + // .times(IS_CALLED_ONLY_ONCE.unwrap()) + // .return_const(mock_update_kot_service( + // IS_CALLED_ONLY_ONCE, + // cmd.clone(), + // )); + // + // KotTestFramework::with(Arc::new(services)) + // .given_no_previous_events() + // .when(OrderingCommand::UpdateKot(cmd)) + // .then_expect_events(vec![expected]); + // } + // + // #[test] + // fn test_delete_kot() { + // let cmd = DeleteKotCommand::get_cmd(); + // let expected = get_deleted_kot_event_from_command(&cmd); + // let expected = OrderingEvent::KotDeleted(expected); + // + // let mut services = MockOrderingServicesInterface::new(); + // services + // .expect_delete_kot() + // .times(IS_CALLED_ONLY_ONCE.unwrap()) + // .return_const(mock_delete_kot_service( + // IS_CALLED_ONLY_ONCE, + // cmd.clone(), + // )); + // + // KotTestFramework::with(Arc::new(services)) + // .given_no_previous_events() + // .when(OrderingCommand::DeleteKot(cmd)) + // .then_expect_events(vec![expected]); + // } +} From ddd0716380729d6c16281317e2fe5d8e45d1c2c9 Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Tue, 23 Jul 2024 20:34:27 +0530 Subject: [PATCH 4/5] feat: impl View for Kot --- src/ordering/adapters/output/db/kot_view.rs | 225 ++++++++++++++++++++ src/ordering/adapters/output/db/mod.rs | 2 + 2 files changed, 227 insertions(+) create mode 100644 src/ordering/adapters/output/db/kot_view.rs diff --git a/src/ordering/adapters/output/db/kot_view.rs b/src/ordering/adapters/output/db/kot_view.rs new file mode 100644 index 0000000..5bf5621 --- /dev/null +++ b/src/ordering/adapters/output/db/kot_view.rs @@ -0,0 +1,225 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use std::str::FromStr; + +use async_trait::async_trait; +use cqrs_es::persist::{PersistenceError, ViewContext, ViewRepository}; +use cqrs_es::{EventEnvelope, Query, View}; +use serde::{Deserialize, Serialize}; +use time::OffsetDateTime; +use uuid::Uuid; + +use super::errors::*; +use super::OrderingDBPostgresAdapter; +use crate::ordering::domain::events::OrderingEvent; +use crate::ordering::domain::kot_aggregate::*; +use crate::types::quantity::*; +use crate::utils::parse_aggregate_id::parse_aggregate_id; + +pub const NEW_KOT_NON_UUID: &str = "new_kot_non_uuid-asdfa"; + +// The view for a Kot query, for a standard http application this should +// be designed to reflect the response dto that will be returned to a user. +#[derive(Debug, Serialize, Deserialize)] +pub struct KotView { + order_id: Uuid, + created_time: OffsetDateTime, + + kot_id: Uuid, + + deleted: bool, +} + +impl Default for KotView { + fn default() -> Self { + Self { + created_time: OffsetDateTime::now_utc(), + order_id: Default::default(), + + kot_id: Default::default(), + + deleted: false, + } + } +} + +impl From for Kot { + fn from(v: KotView) -> Self { + KotBuilder::default() + .kot_id(v.kot_id) + .created_time(v.created_time) + .order_id(v.order_id) + .deleted(v.deleted) + .build() + .unwrap() + } +} + +// This updates the view with events as they are committed. +// The logic should be minimal here, e.g., don't calculate the account balance, +// design the events to carry the balance information instead. +impl View for KotView { + fn update(&mut self, event: &EventEnvelope) { + match &event.payload { + OrderingEvent::KotAdded(val) => { + self.order_id = *val.kot().order_id(); + self.kot_id = *val.kot().kot_id(); + + self.created_time = val.kot().created_time().clone(); + + self.deleted = false; + } + _ => (), + } + } +} + +#[async_trait] +impl ViewRepository for OrderingDBPostgresAdapter { + async fn load(&self, kot_id: &str) -> Result, PersistenceError> { + let kot_id = match parse_aggregate_id(kot_id, NEW_KOT_NON_UUID)? { + Some((val, _)) => return Ok(Some(val)), + None => Uuid::parse_str(kot_id).unwrap(), + }; + + let res = sqlx::query_as!( + KotView, + "SELECT + order_id, + kot_id, + created_time, + deleted + FROM + cqrs_ordering_kot_query + WHERE + kot_id = $1;", + kot_id + ) + .fetch_one(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + + Ok(Some(res)) + } + + async fn load_with_context( + &self, + kot_id: &str, + ) -> Result, PersistenceError> { + let kot_id = match parse_aggregate_id(kot_id, NEW_KOT_NON_UUID)? { + Some(val) => return Ok(Some(val)), + None => Uuid::parse_str(kot_id).unwrap(), + }; + + let res = sqlx::query_as!( + KotView, + "SELECT + order_id, + kot_id, + created_time, + deleted + FROM + cqrs_ordering_kot_query + WHERE + kot_id = $1;", + kot_id + ) + .fetch_one(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + + struct Context { + version: i64, + kot_id: Uuid, + } + + let ctx = sqlx::query_as!( + Context, + "SELECT + kot_id, version + FROM + cqrs_ordering_kot_query + WHERE + kot_id = $1;", + kot_id + ) + .fetch_one(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + + let view_context = ViewContext::new(ctx.kot_id.to_string(), ctx.version); + Ok(Some((res, view_context))) + } + + async fn update_view( + &self, + view: KotView, + context: ViewContext, + ) -> Result<(), PersistenceError> { + match context.version { + 0 => { + let version = context.version + 1; + sqlx::query!( + "INSERT INTO cqrs_ordering_kot_query ( + version, + order_id, + kot_id, + created_time, + deleted + ) VALUES ( + $1, $2, $3, $4, $5 + );", + version, + view.order_id, + view.kot_id, + view.created_time, + view.deleted, + ) + .execute(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + } + + _ => { + let version = context.version + 1; + sqlx::query!( + "UPDATE + cqrs_ordering_kot_query + SET + version = $1, + order_id = $2, + kot_id = $3, + created_time = $4, + deleted = $5;", + version, + view.order_id, + view.kot_id, + view.created_time, + view.deleted, + ) + .execute(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + } + } + + Ok(()) + } +} + +#[async_trait] +impl Query for OrderingDBPostgresAdapter { + async fn dispatch(&self, kot_id: &str, events: &[EventEnvelope]) { + let res = self + .load_with_context(kot_id) + .await + .unwrap_or_else(|_| Some((KotView::default(), ViewContext::new(kot_id.into(), 0)))); + let (mut view, view_context): (KotView, ViewContext) = res.unwrap(); + for event in events { + view.update(event); + } + self.update_view(view, view_context).await.unwrap(); + } +} diff --git a/src/ordering/adapters/output/db/mod.rs b/src/ordering/adapters/output/db/mod.rs index dca55ef..2b87819 100644 --- a/src/ordering/adapters/output/db/mod.rs +++ b/src/ordering/adapters/output/db/mod.rs @@ -9,6 +9,8 @@ use sqlx::postgres::PgPool; use crate::db::{migrate::RunMigrations, sqlx_postgres::Postgres}; mod errors; +mod kot_id_exists; +mod kot_view; mod line_item_id_exists; mod line_item_view; mod order_id_exists; From 6115a9adde60ac23df7c18ca624c2d44e4c1ea03 Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Tue, 23 Jul 2024 20:34:41 +0530 Subject: [PATCH 5/5] feat: kot add service --- .../application/services/add_kot_service.rs | 129 ++++++++++++++++++ src/ordering/application/services/mod.rs | 7 + src/ordering/domain/add_kot_command.rs | 58 ++++++++ src/ordering/domain/commands.rs | 8 +- src/ordering/domain/events.rs | 9 +- src/ordering/domain/kot_added_event.rs | 41 ++++++ src/ordering/domain/mod.rs | 2 + 7 files changed, 248 insertions(+), 6 deletions(-) create mode 100644 src/ordering/application/services/add_kot_service.rs create mode 100644 src/ordering/domain/add_kot_command.rs create mode 100644 src/ordering/domain/kot_added_event.rs diff --git a/src/ordering/application/services/add_kot_service.rs b/src/ordering/application/services/add_kot_service.rs new file mode 100644 index 0000000..f5bb481 --- /dev/null +++ b/src/ordering/application/services/add_kot_service.rs @@ -0,0 +1,129 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use std::sync::Arc; + +use derive_builder::Builder; +use mockall::predicate::*; +use mockall::*; +use time::OffsetDateTime; + +use super::errors::*; +use crate::ordering::{ + application::port::output::db::kot_id_exists::*, + application::port::output::db::order_id_exists::*, + domain::{add_kot_command::*, kot_added_event::*, kot_aggregate::*}, +}; +use crate::utils::uuid::*; + +#[automock] +#[async_trait::async_trait] +pub trait AddKotUseCase: Send + Sync { + async fn add_kot(&self, cmd: AddKotCommand) -> OrderingResult; +} + +pub type AddKotServiceObj = Arc; + +#[derive(Clone, Builder)] +pub struct AddKotService { + db_kot_id_exists: KotIDExistsDBPortObj, + db_order_id_exists: OrderIDExistsDBPortObj, + get_uuid: GetUUIDInterfaceObj, +} + +#[async_trait::async_trait] +impl AddKotUseCase for AddKotService { + async fn add_kot(&self, cmd: AddKotCommand) -> OrderingResult { + if !self + .db_order_id_exists + .order_id_exists(cmd.order_id()) + .await? + { + return Err(OrderingError::OrderIDNotFound); + } + + let mut kot_id = self.get_uuid.get_uuid(); + + loop { + if self.db_kot_id_exists.kot_id_exists(&kot_id).await? { + kot_id = self.get_uuid.get_uuid(); + continue; + } else { + break; + } + } + + let kot = KotBuilder::default() + .created_time(OffsetDateTime::now_utc()) + .order_id(*cmd.order_id()) + .kot_id(kot_id) + .deleted(false) + .build() + .unwrap(); + + Ok(KotAddedEventBuilder::default() + .added_by_user(*cmd.adding_by()) + .kot(kot) + .build() + .unwrap()) + } +} + +#[cfg(test)] +pub mod tests { + use super::*; + + use crate::ordering::domain::kot_added_event::tests::get_added_kot_event_from_command; + use crate::utils::uuid::tests::UUID; + use crate::{tests::bdd::*, utils::uuid::tests::mock_get_uuid}; + + pub fn mock_add_kot_service(times: Option, cmd: AddKotCommand) -> AddKotServiceObj { + let mut m = MockAddKotUseCase::new(); + + let res = get_added_kot_event_from_command(&cmd); + if let Some(times) = times { + m.expect_add_kot() + .times(times) + .returning(move |_| Ok(res.clone())); + } else { + m.expect_add_kot().returning(move |_| Ok(res.clone())); + } + + Arc::new(m) + } + + #[actix_rt::test] + async fn test_service() { + let cmd = AddKotCommand::get_cmd(); + + let s = AddKotServiceBuilder::default() + .db_kot_id_exists(mock_kot_id_exists_db_port_false(IS_CALLED_ONLY_ONCE)) + .db_order_id_exists(mock_order_id_exists_db_port_true(IS_CALLED_ONLY_ONCE)) + .get_uuid(mock_get_uuid(IS_CALLED_ONLY_ONCE)) + .build() + .unwrap(); + + let res = s.add_kot(cmd.clone()).await.unwrap(); + assert_eq!(res.kot().order_id(), cmd.order_id()); + assert!(!res.kot().deleted()); + assert_eq!(res.added_by_user(), cmd.adding_by()); + } + + #[actix_rt::test] + async fn test_service_order_id_doesnt_exist() { + let cmd = AddKotCommand::get_cmd(); + + let s = AddKotServiceBuilder::default() + .db_kot_id_exists(mock_kot_id_exists_db_port_false(IS_NEVER_CALLED)) + .db_order_id_exists(mock_order_id_exists_db_port_false(IS_CALLED_ONLY_ONCE)) + .get_uuid(mock_get_uuid(IS_NEVER_CALLED)) + .build() + .unwrap(); + + assert_eq!( + s.add_kot(cmd.clone()).await, + Err(OrderingError::OrderIDNotFound) + ); + } +} diff --git a/src/ordering/application/services/mod.rs b/src/ordering/application/services/mod.rs index ce866b3..8765f66 100644 --- a/src/ordering/application/services/mod.rs +++ b/src/ordering/application/services/mod.rs @@ -9,6 +9,7 @@ use mockall::*; pub mod errors; //services +pub mod add_kot_service; pub mod add_line_item_service; pub mod add_order_service; pub mod delete_line_item_service; @@ -24,6 +25,7 @@ pub trait OrderingServicesInterface: Send + Sync { fn add_order(&self) -> add_order_service::AddOrderServiceObj; fn update_order(&self) -> update_order_service::UpdateOrderServiceObj; fn delete_order(&self) -> delete_order_service::DeleteOrderServiceObj; + fn add_kot(&self) -> add_kot_service::AddKotServiceObj; } #[derive(Clone, Builder)] @@ -34,6 +36,7 @@ pub struct OrderingServices { add_order: add_order_service::AddOrderServiceObj, update_order: update_order_service::UpdateOrderServiceObj, delete_order: delete_order_service::DeleteOrderServiceObj, + add_kot: add_kot_service::AddKotServiceObj, } impl OrderingServicesInterface for OrderingServices { @@ -59,4 +62,8 @@ impl OrderingServicesInterface for OrderingServices { fn delete_order(&self) -> delete_order_service::DeleteOrderServiceObj { self.delete_order.clone() } + + fn add_kot(&self) -> add_kot_service::AddKotServiceObj { + self.add_kot.clone() + } } diff --git a/src/ordering/domain/add_kot_command.rs b/src/ordering/domain/add_kot_command.rs new file mode 100644 index 0000000..597330a --- /dev/null +++ b/src/ordering/domain/add_kot_command.rs @@ -0,0 +1,58 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use derive_builder::Builder; +use derive_getters::Getters; +use serde::{Deserialize, Serialize}; +use time::OffsetDateTime; +use uuid::Uuid; + +#[derive( + Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd, Getters, Builder, +)] +pub struct AddKotCommand { + adding_by: Uuid, + + #[builder(default = "OffsetDateTime::now_utc()")] + created_time: OffsetDateTime, + order_id: Uuid, +} + +#[cfg(test)] +mod tests { + use time::macros::datetime; + + use crate::utils::uuid::tests::UUID; + + use super::*; + + impl AddKotCommand { + pub fn get_cmd() -> Self { + let order_id = UUID; + let adding_by = UUID; + + AddKotCommandBuilder::default() + .adding_by(adding_by) + .created_time(datetime!(1970-01-01 0:00 UTC)) + .order_id(order_id) + .build() + .unwrap() + } + } + + #[test] + fn test_cmd() { + let order_id = UUID; + let adding_by = UUID; + + let cmd = AddKotCommandBuilder::default() + .adding_by(adding_by) + .order_id(order_id) + .build() + .unwrap(); + + assert_eq!(*cmd.order_id(), order_id); + assert_eq!(*cmd.adding_by(), adding_by); + } +} diff --git a/src/ordering/domain/commands.rs b/src/ordering/domain/commands.rs index 98d5738..409be83 100644 --- a/src/ordering/domain/commands.rs +++ b/src/ordering/domain/commands.rs @@ -6,9 +6,10 @@ use mockall::predicate::*; use serde::{Deserialize, Serialize}; use super::{ - add_line_item_command::AddLineItemCommand, add_order_command::AddOrderCommand, - delete_line_item_command::DeleteLineItemCommand, delete_order_command::DeleteOrderCommand, - update_line_item_command::UpdateLineItemCommand, update_order_command::UpdateOrderCommand, + add_kot_command::AddKotCommand, add_line_item_command::AddLineItemCommand, + add_order_command::AddOrderCommand, delete_line_item_command::DeleteLineItemCommand, + delete_order_command::DeleteOrderCommand, update_line_item_command::UpdateLineItemCommand, + update_order_command::UpdateOrderCommand, }; #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)] @@ -19,4 +20,5 @@ pub enum OrderingCommand { AddOrder(AddOrderCommand), UpdateOrder(UpdateOrderCommand), DeleteOrder(DeleteOrderCommand), + AddKot(AddKotCommand), } diff --git a/src/ordering/domain/events.rs b/src/ordering/domain/events.rs index 4b497a8..4c22eab 100644 --- a/src/ordering/domain/events.rs +++ b/src/ordering/domain/events.rs @@ -6,9 +6,10 @@ use cqrs_es::DomainEvent; use serde::{Deserialize, Serialize}; use super::{ - line_item_added_event::LineItemAddedEvent, line_item_deleted_event::LineItemDeletedEvent, - line_item_updated_event::LineItemUpdatedEvent, order_added_event::OrderAddedEvent, - order_deleted_event::OrderDeletedEvent, order_updated_event::OrderUpdatedEvent, + kot_added_event::KotAddedEvent, line_item_added_event::LineItemAddedEvent, + line_item_deleted_event::LineItemDeletedEvent, line_item_updated_event::LineItemUpdatedEvent, + order_added_event::OrderAddedEvent, order_deleted_event::OrderDeletedEvent, + order_updated_event::OrderUpdatedEvent, }; #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)] @@ -19,6 +20,7 @@ pub enum OrderingEvent { OrderAdded(OrderAddedEvent), OrderUpdated(OrderUpdatedEvent), OrderDeleted(OrderDeletedEvent), + KotAdded(KotAddedEvent), } impl DomainEvent for OrderingEvent { @@ -34,6 +36,7 @@ impl DomainEvent for OrderingEvent { OrderingEvent::OrderAdded { .. } => "OrderingOrderAdded", OrderingEvent::OrderUpdated { .. } => "OrderingOrderUpdated", OrderingEvent::OrderDeleted { .. } => "OrderingOrderDeleted", + OrderingEvent::KotAdded { .. } => "OrderingKotAdded", }; e.to_string() diff --git a/src/ordering/domain/kot_added_event.rs b/src/ordering/domain/kot_added_event.rs new file mode 100644 index 0000000..26fd5f6 --- /dev/null +++ b/src/ordering/domain/kot_added_event.rs @@ -0,0 +1,41 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use derive_builder::Builder; +use derive_getters::Getters; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use super::kot_aggregate::Kot; + +#[derive( + Clone, Debug, Builder, Serialize, Deserialize, Getters, Eq, PartialEq, Ord, PartialOrd, +)] +pub struct KotAddedEvent { + added_by_user: Uuid, + + kot: Kot, +} + +#[cfg(test)] +pub mod tests { + use crate::ordering::domain::add_kot_command::AddKotCommand; + + use super::*; + + pub fn get_added_kot_event_from_command(cmd: &AddKotCommand) -> KotAddedEvent { + let kot = Kot::get_kot(); + + KotAddedEventBuilder::default() + .added_by_user(cmd.adding_by().clone()) + .kot(kot) + .build() + .unwrap() + } + + #[test] + fn test_event() { + get_added_kot_event_from_command(&AddKotCommand::get_cmd()); + } +} diff --git a/src/ordering/domain/mod.rs b/src/ordering/domain/mod.rs index 747c89d..a5a7520 100644 --- a/src/ordering/domain/mod.rs +++ b/src/ordering/domain/mod.rs @@ -8,6 +8,7 @@ pub mod line_item_aggregate; pub mod order_aggregate; // commands +pub mod add_kot_command; pub mod add_line_item_command; pub mod add_order_command; pub mod commands; @@ -18,6 +19,7 @@ pub mod update_order_command; // events pub mod events; +pub mod kot_added_event; pub mod line_item_added_event; pub mod line_item_deleted_event; pub mod line_item_updated_event;