From de81f09af8d7eacc03c57f8e6dc036ecf7360944 Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Tue, 23 Jul 2024 19:18:09 +0530 Subject: [PATCH 1/7] feat: define order query table --- ...de29a1a2b346b39c8d656c15c472dde68876e.json | 28 +++++++++++++ ...1d114407079fbe5723664004bd52bba0426ab.json | 22 ++++++++++ ...8e664c96299edfd9a11d08487e4db10a10e8b.json | 18 +++++++++ ...16a6e72bb0007a298cdba17f45f4b8bed5f56.json | 18 +++++++++ ...201ec52340d09d45041addd54eb349af82488.json | 40 +++++++++++++++++++ ...240723132531_cqrs_ordering_order_query.sql | 17 ++++++++ 6 files changed, 143 insertions(+) create mode 100644 .sqlx/query-79b4ef2c22faea05f053c04da49de29a1a2b346b39c8d656c15c472dde68876e.json create mode 100644 .sqlx/query-8e7d8433e8454e9a4edfb96852d1d114407079fbe5723664004bd52bba0426ab.json create mode 100644 .sqlx/query-a342003149f2991ef0280d008f18e664c96299edfd9a11d08487e4db10a10e8b.json create mode 100644 .sqlx/query-bc7d17aab113d0519c53e5f612116a6e72bb0007a298cdba17f45f4b8bed5f56.json create mode 100644 .sqlx/query-dbe1e41f04a81b2a504b9179911201ec52340d09d45041addd54eb349af82488.json create mode 100644 migrations/20240723132531_cqrs_ordering_order_query.sql diff --git a/.sqlx/query-79b4ef2c22faea05f053c04da49de29a1a2b346b39c8d656c15c472dde68876e.json b/.sqlx/query-79b4ef2c22faea05f053c04da49de29a1a2b346b39c8d656c15c472dde68876e.json new file mode 100644 index 0000000..134f882 --- /dev/null +++ b/.sqlx/query-79b4ef2c22faea05f053c04da49de29a1a2b346b39c8d656c15c472dde68876e.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT \n order_id, version\n FROM\n cqrs_ordering_order_query\n WHERE\n order_id = $1;", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "order_id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "version", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false, + false + ] + }, + "hash": "79b4ef2c22faea05f053c04da49de29a1a2b346b39c8d656c15c472dde68876e" +} diff --git a/.sqlx/query-8e7d8433e8454e9a4edfb96852d1d114407079fbe5723664004bd52bba0426ab.json b/.sqlx/query-8e7d8433e8454e9a4edfb96852d1d114407079fbe5723664004bd52bba0426ab.json new file mode 100644 index 0000000..822c5a8 --- /dev/null +++ b/.sqlx/query-8e7d8433e8454e9a4edfb96852d1d114407079fbe5723664004bd52bba0426ab.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT EXISTS (\n SELECT 1\n FROM cqrs_ordering_order_query\n WHERE\n order_id = $1\n );", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "exists", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + null + ] + }, + "hash": "8e7d8433e8454e9a4edfb96852d1d114407079fbe5723664004bd52bba0426ab" +} diff --git a/.sqlx/query-a342003149f2991ef0280d008f18e664c96299edfd9a11d08487e4db10a10e8b.json b/.sqlx/query-a342003149f2991ef0280d008f18e664c96299edfd9a11d08487e4db10a10e8b.json new file mode 100644 index 0000000..8fed07b --- /dev/null +++ b/.sqlx/query-a342003149f2991ef0280d008f18e664c96299edfd9a11d08487e4db10a10e8b.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO cqrs_ordering_order_query (\n version,\n customer_name,\n order_id,\n created_time,\n deleted\n\n ) VALUES (\n $1, $2, $3, $4, $5\n );", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Text", + "Uuid", + "Timestamptz", + "Bool" + ] + }, + "nullable": [] + }, + "hash": "a342003149f2991ef0280d008f18e664c96299edfd9a11d08487e4db10a10e8b" +} diff --git a/.sqlx/query-bc7d17aab113d0519c53e5f612116a6e72bb0007a298cdba17f45f4b8bed5f56.json b/.sqlx/query-bc7d17aab113d0519c53e5f612116a6e72bb0007a298cdba17f45f4b8bed5f56.json new file mode 100644 index 0000000..bcbb2e8 --- /dev/null +++ b/.sqlx/query-bc7d17aab113d0519c53e5f612116a6e72bb0007a298cdba17f45f4b8bed5f56.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE\n cqrs_ordering_order_query\n SET\n version = $1,\n customer_name = $2,\n order_id = $3,\n created_time = $4,\n deleted = $5;", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Text", + "Uuid", + "Timestamptz", + "Bool" + ] + }, + "nullable": [] + }, + "hash": "bc7d17aab113d0519c53e5f612116a6e72bb0007a298cdba17f45f4b8bed5f56" +} diff --git a/.sqlx/query-dbe1e41f04a81b2a504b9179911201ec52340d09d45041addd54eb349af82488.json b/.sqlx/query-dbe1e41f04a81b2a504b9179911201ec52340d09d45041addd54eb349af82488.json new file mode 100644 index 0000000..804ef95 --- /dev/null +++ b/.sqlx/query-dbe1e41f04a81b2a504b9179911201ec52340d09d45041addd54eb349af82488.json @@ -0,0 +1,40 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT \n customer_name,\n order_id,\n created_time,\n deleted\n FROM\n cqrs_ordering_order_query\n WHERE\n order_id = $1;", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "customer_name", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "order_id", + "type_info": "Uuid" + }, + { + "ordinal": 2, + "name": "created_time", + "type_info": "Timestamptz" + }, + { + "ordinal": 3, + "name": "deleted", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false, + false, + false, + false + ] + }, + "hash": "dbe1e41f04a81b2a504b9179911201ec52340d09d45041addd54eb349af82488" +} diff --git a/migrations/20240723132531_cqrs_ordering_order_query.sql b/migrations/20240723132531_cqrs_ordering_order_query.sql new file mode 100644 index 0000000..3b4b094 --- /dev/null +++ b/migrations/20240723132531_cqrs_ordering_order_query.sql @@ -0,0 +1,17 @@ +-- SPDX-FileCopyrightText: 2024 Aravinth Manivannan +-- +-- SPDX-License-Identifier: AGPL-3.0-or-later + +CREATE TABLE IF NOT EXISTS cqrs_ordering_order_query +( + version bigint CHECK (version >= 0) NOT NULL, + + created_time timestamp with time zone DEFAULT (CURRENT_TIMESTAMP) NOT NULL, + order_id UUID NOT NULL UNIQUE, + + customer_name TEXT NOT NULL, + + deleted BOOLEAN NOT NULL DEFAULT FALSE, + + PRIMARY KEY (order_id) +); -- 2.39.2 From 95dcf0bae1c0160fb340f72217997930776495e2 Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Tue, 23 Jul 2024 19:18:58 +0530 Subject: [PATCH 2/7] feat: db port to check for duplicate order IDs --- .../adapters/output/db/order_id_exists.rs | 89 +++++++++++++++++++ .../application/port/output/db/errors.rs | 3 + .../application/port/output/db/mod.rs | 1 + .../port/output/db/order_id_exists.rs | 57 ++++++++++++ 4 files changed, 150 insertions(+) create mode 100644 src/ordering/adapters/output/db/order_id_exists.rs create mode 100644 src/ordering/application/port/output/db/order_id_exists.rs diff --git a/src/ordering/adapters/output/db/order_id_exists.rs b/src/ordering/adapters/output/db/order_id_exists.rs new file mode 100644 index 0000000..9510030 --- /dev/null +++ b/src/ordering/adapters/output/db/order_id_exists.rs @@ -0,0 +1,89 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use uuid::Uuid; + +use super::OrderingDBPostgresAdapter; +use crate::ordering::application::port::output::db::{errors::*, order_id_exists::*}; + +#[async_trait::async_trait] +impl OrderIDExistsDBPort for OrderingDBPostgresAdapter { + async fn order_id_exists(&self, order_id: &Uuid) -> OrderingDBResult { + let res = sqlx::query!( + "SELECT EXISTS ( + SELECT 1 + FROM cqrs_ordering_order_query + WHERE + order_id = $1 + );", + order_id + ) + .fetch_one(&self.pool) + .await?; + if let Some(x) = res.exists { + Ok(x) + } else { + Ok(false) + } + } +} + +#[cfg(test)] +pub mod tests { + + use super::*; + // use crate::ordering::domain::add_order_command::tests::get_customizations; + use crate::ordering::domain::order_aggregate::*; + + async fn create_dummy_order(order: &Order, db: &OrderingDBPostgresAdapter) { + sqlx::query!( + "INSERT INTO cqrs_ordering_order_query ( + version, + order_id, + customer_name, + created_time, + deleted + ) VALUES ( + $1, $2, $3, $4, $5 + );", + 1, + order.order_id(), + order.customer_name(), + order.created_time(), + order.deleted().clone(), + ) + .execute(&db.pool) + .await + .unwrap(); + } + + #[actix_rt::test] + async fn test_postgres_order_exists() { + let settings = crate::settings::tests::get_settings().await; + settings.create_db().await; + let db = super::OrderingDBPostgresAdapter::new( + sqlx::postgres::PgPool::connect(&settings.database.url) + .await + .unwrap(), + ); + + let order = Order::default(); + + // state doesn't exist + assert!(!db + .order_id_exists(order.order_id()) + .await + .unwrap()); + + create_dummy_order(&order, &db).await; + + // state exists + assert!(db + .order_id_exists(order.order_id()) + .await + .unwrap()); + + settings.drop_db().await; + } +} diff --git a/src/ordering/application/port/output/db/errors.rs b/src/ordering/application/port/output/db/errors.rs index db03945..b383b4c 100644 --- a/src/ordering/application/port/output/db/errors.rs +++ b/src/ordering/application/port/output/db/errors.rs @@ -12,4 +12,7 @@ pub enum OrderingDBError { DuplicateLineItemID, LineItemIDNotFound, InternalError, + DuplicateOrderID, + OrderIDNotFound, + } diff --git a/src/ordering/application/port/output/db/mod.rs b/src/ordering/application/port/output/db/mod.rs index 7c397a6..2db807b 100644 --- a/src/ordering/application/port/output/db/mod.rs +++ b/src/ordering/application/port/output/db/mod.rs @@ -4,3 +4,4 @@ pub mod errors; pub mod line_item_id_exists; +pub mod order_id_exists; diff --git a/src/ordering/application/port/output/db/order_id_exists.rs b/src/ordering/application/port/output/db/order_id_exists.rs new file mode 100644 index 0000000..97e7951 --- /dev/null +++ b/src/ordering/application/port/output/db/order_id_exists.rs @@ -0,0 +1,57 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use mockall::predicate::*; +use mockall::*; +use uuid::Uuid; + +use super::errors::*; +#[cfg(test)] +#[allow(unused_imports)] +pub use tests::*; + +#[automock] +#[async_trait::async_trait] +pub trait OrderIDExistsDBPort: Send + Sync { + async fn order_id_exists(&self, order_id: &Uuid) -> OrderingDBResult; +} + +pub type OrderIDExistsDBPortObj = std::sync::Arc; + +#[cfg(test)] +pub mod tests { + use super::*; + + use std::sync::Arc; + + pub fn mock_order_id_exists_db_port_false( + times: Option, + ) -> OrderIDExistsDBPortObj { + let mut m = MockOrderIDExistsDBPort::new(); + if let Some(times) = times { + m.expect_order_id_exists() + .times(times) + .returning(|_| Ok(false)); + } else { + m.expect_order_id_exists().returning(|_| Ok(false)); + } + + Arc::new(m) + } + + pub fn mock_order_id_exists_db_port_true( + times: Option, + ) -> OrderIDExistsDBPortObj { + let mut m = MockOrderIDExistsDBPort::new(); + if let Some(times) = times { + m.expect_order_id_exists() + .times(times) + .returning(|_| Ok(true)); + } else { + m.expect_order_id_exists().returning(|_| Ok(true)); + } + + Arc::new(m) + } +} -- 2.39.2 From a47810e4e7c74e49adf3af7b936d966c9daa53fe Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Tue, 23 Jul 2024 19:19:18 +0530 Subject: [PATCH 3/7] feat: define order aggregate --- src/ordering/domain/order_aggregate.rs | 185 +++++++++++++++++++++++++ 1 file changed, 185 insertions(+) create mode 100644 src/ordering/domain/order_aggregate.rs diff --git a/src/ordering/domain/order_aggregate.rs b/src/ordering/domain/order_aggregate.rs new file mode 100644 index 0000000..2768f7c --- /dev/null +++ b/src/ordering/domain/order_aggregate.rs @@ -0,0 +1,185 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use async_trait::async_trait; +use cqrs_es::Aggregate; +use derive_builder::Builder; +use derive_getters::Getters; +use serde::{Deserialize, Serialize}; +use time::OffsetDateTime; +use uuid::Uuid; + +use crate::ordering::{ + application::services::{errors::*, *}, + domain::{commands::*, events::*}, +}; + + +#[derive( + Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd, Builder, Getters, +)] +pub struct Order { + #[builder(default = "OffsetDateTime::now_utc()")] + created_time: OffsetDateTime, + // kot_ids: Vec, + order_id: Uuid, + #[builder(default = "false")] + deleted: bool, + customer_name: String, +} + +impl Default for Order { + fn default() -> Self { + Self { + created_time: OffsetDateTime::now_utc(), + order_id: Default::default(), + deleted: false, + customer_name: Default::default(), + } + } +} + +#[cfg(test)] +mod tests { + use crate::{ordering::domain::add_order_command::AddOrderCommand, utils::uuid::tests::UUID}; + + use super::*; + + impl Order { + pub fn get_order() -> Self { + let cmd = AddOrderCommand::get_cmd(); + + OrderBuilder::default() + .created_time(cmd.created_time().clone()) + .customer_name("test_product".into()) + .order_id(UUID) + .build() + .unwrap() + } + } +} + + +#[async_trait] +impl Aggregate for Order { + type Command = OrderingCommand; + type Event = OrderingEvent; + type Error = OrderingError; + type Services = std::sync::Arc; + + // This identifier should be unique to the system. + fn aggregate_type() -> String { + "ordering.order".to_string() + } + + // The aggregate logic goes here. Note that this will be the _bulk_ of a CQRS system + // so expect to use helper functions elsewhere to keep the code clean. + async fn handle( + &self, + command: Self::Command, + services: &Self::Services, + ) -> Result, Self::Error> { + match command { + OrderingCommand::AddOrder(cmd) => { + let res = services.add_order().add_order(cmd).await?; + Ok(vec![OrderingEvent::OrderAdded(res)]) + } + _ => Ok(Vec::default()), + } + } + + fn apply(&mut self, event: Self::Event) { + match event { + OrderingEvent::OrderAdded(e) => *self = e.order().clone(), + // OrderingEvent::OrderUpdated(e) => *self = e.new_order().clone(), + // OrderingEvent::OrderDeleted(e) => *self = e.order().clone(), + _ => (), + } + } +} + +#[cfg(test)] +mod aggregate_tests { + use std::sync::Arc; + + use add_order_service::tests::mock_add_order_service; + use cqrs_es::test::TestFramework; + // use delete_order_service::tests::mock_delete_order_service; + // use update_order_service::tests::mock_update_order_service; + + use super::*; + +// use crate::ordering::domain::delete_order_command::DeleteOrderCommand; +// use crate::ordering::domain::order_deleted_event::tests::get_deleted_order_event_from_command; +// use crate::ordering::domain::order_updated_event::tests::get_updated_order_event_from_command; +// use crate::ordering::domain::update_order_command::UpdateOrderCommand; + use crate::tests::bdd::*; + + use crate::ordering::domain::{ + add_order_command::*, + order_added_event::tests::get_added_order_event_from_command, + }; + + type OrderTestFramework = TestFramework; + + #[test] + fn test_add_order() { + let cmd = AddOrderCommand::get_cmd(); + let expected = get_added_order_event_from_command(&cmd); + let expected = OrderingEvent::OrderAdded(expected); + + let mut services = MockOrderingServicesInterface::new(); + services + .expect_add_order() + .times(IS_CALLED_ONLY_ONCE.unwrap()) + .return_const(mock_add_order_service(IS_CALLED_ONLY_ONCE, cmd.clone())); + + OrderTestFramework::with(Arc::new(services)) + .given_no_previous_events() + .when(OrderingCommand::AddOrder(cmd)) + .then_expect_events(vec![expected]); + } +// +// #[test] +// fn test_update_order() { +// let cmd = UpdateOrderCommand::get_cmd(); +// let expected = get_updated_order_event_from_command(&cmd); +// let expected = OrderingEvent::OrderUpdated(expected); +// +// let mut services = MockOrderingServicesInterface::new(); +// services +// .expect_update_order() +// .times(IS_CALLED_ONLY_ONCE.unwrap()) +// .return_const(mock_update_order_service( +// IS_CALLED_ONLY_ONCE, +// cmd.clone(), +// )); +// +// OrderTestFramework::with(Arc::new(services)) +// .given_no_previous_events() +// .when(OrderingCommand::UpdateOrder(cmd)) +// .then_expect_events(vec![expected]); +// } +// +// #[test] +// fn test_delete_order() { +// let cmd = DeleteOrderCommand::get_cmd(); +// let expected = get_deleted_order_event_from_command(&cmd); +// let expected = OrderingEvent::OrderDeleted(expected); +// +// let mut services = MockOrderingServicesInterface::new(); +// services +// .expect_delete_order() +// .times(IS_CALLED_ONLY_ONCE.unwrap()) +// .return_const(mock_delete_order_service( +// IS_CALLED_ONLY_ONCE, +// cmd.clone(), +// )); +// +// OrderTestFramework::with(Arc::new(services)) +// .given_no_previous_events() +// .when(OrderingCommand::DeleteOrder(cmd)) +// .then_expect_events(vec![expected]); +// } +} -- 2.39.2 From 3bb854f2eadb392e1f82d318dd2dca3d4fc35d2e Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Tue, 23 Jul 2024 19:19:53 +0530 Subject: [PATCH 4/7] feat: add order command and reslutant event --- src/ordering/domain/add_order_command.rs | 108 +++++++++++++++++++++++ src/ordering/domain/commands.rs | 4 +- src/ordering/domain/events.rs | 4 +- src/ordering/domain/mod.rs | 3 + src/ordering/domain/order_added_event.rs | 41 +++++++++ 5 files changed, 158 insertions(+), 2 deletions(-) create mode 100644 src/ordering/domain/add_order_command.rs create mode 100644 src/ordering/domain/order_added_event.rs diff --git a/src/ordering/domain/add_order_command.rs b/src/ordering/domain/add_order_command.rs new file mode 100644 index 0000000..72a3526 --- /dev/null +++ b/src/ordering/domain/add_order_command.rs @@ -0,0 +1,108 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use derive_builder::Builder; +use derive_getters::Getters; +use derive_more::{Display, Error}; +use serde::{Deserialize, Serialize}; +use time::OffsetDateTime; +use uuid::Uuid; + +use crate::utils::string::empty_string_err; + +#[derive(Debug, Error, Display, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] +pub enum AddOrderCommandError { + CustomerNameIsEmpty, +} + +#[derive( + Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd, Getters, Builder, +)] +pub struct UnvalidatedAddOrderCommand { + adding_by: Uuid, + + #[builder(default = "OffsetDateTime::now_utc()")] + created_time: OffsetDateTime, + customer_name: String, +} + +impl UnvalidatedAddOrderCommand { + pub fn validate(self) -> Result { + let customer_name = empty_string_err( + self.customer_name, + AddOrderCommandError::CustomerNameIsEmpty, + )?; + + Ok(AddOrderCommand { + created_time: self.created_time, + customer_name, + adding_by: self.adding_by, + }) + } +} + +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd, Getters)] +pub struct AddOrderCommand { + created_time: OffsetDateTime, + customer_name: String, + adding_by: Uuid, +} + +#[cfg(test)] +mod tests { + use time::macros::datetime; + + use crate::utils::uuid::tests::UUID; + + use super::*; + + impl AddOrderCommand { + pub fn get_cmd() -> Self { + let customer_name = "foo"; + let adding_by = UUID; + + UnvalidatedAddOrderCommandBuilder::default() + .customer_name(customer_name.into()) + .adding_by(adding_by) + .created_time(datetime!(1970-01-01 0:00 UTC)) + .build() + .unwrap() + .validate() + .unwrap() + } + } + + #[test] + fn test_cmd() { + let customer_name = "foo"; + let adding_by = UUID; + + let cmd = UnvalidatedAddOrderCommandBuilder::default() + .customer_name(customer_name.into()) + .adding_by(adding_by) + .build() + .unwrap() + .validate() + .unwrap(); + + assert_eq!(*cmd.adding_by(), adding_by); + assert_eq!(cmd.customer_name(), customer_name); + } + + #[test] + fn test_cmd_customer_name_empty() { + let customer_name = ""; + let adding_by = UUID; + + assert_eq!( + UnvalidatedAddOrderCommandBuilder::default() + .customer_name(customer_name.into()) + .adding_by(adding_by) + .build() + .unwrap() + .validate(), + Err(AddOrderCommandError::CustomerNameIsEmpty) + ); + } +} diff --git a/src/ordering/domain/commands.rs b/src/ordering/domain/commands.rs index 6a74d43..d68ec89 100644 --- a/src/ordering/domain/commands.rs +++ b/src/ordering/domain/commands.rs @@ -6,7 +6,8 @@ use mockall::predicate::*; use serde::{Deserialize, Serialize}; use super::{ - add_line_item_command::AddLineItemCommand, delete_line_item_command::DeleteLineItemCommand, + add_line_item_command::AddLineItemCommand, add_order_command::AddOrderCommand, + delete_line_item_command::DeleteLineItemCommand, update_line_item_command::UpdateLineItemCommand, }; @@ -15,4 +16,5 @@ pub enum OrderingCommand { AddLineItem(AddLineItemCommand), UpdateLineItem(UpdateLineItemCommand), DeleteLineItem(DeleteLineItemCommand), + AddOrder(AddOrderCommand), } diff --git a/src/ordering/domain/events.rs b/src/ordering/domain/events.rs index c82ab04..0d6a6ef 100644 --- a/src/ordering/domain/events.rs +++ b/src/ordering/domain/events.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; use super::{ line_item_added_event::LineItemAddedEvent, line_item_deleted_event::LineItemDeletedEvent, - line_item_updated_event::LineItemUpdatedEvent, + line_item_updated_event::LineItemUpdatedEvent, order_added_event::OrderAddedEvent, }; #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)] @@ -15,6 +15,7 @@ pub enum OrderingEvent { LineItemAdded(LineItemAddedEvent), LineItemUpdated(LineItemUpdatedEvent), LineItemDeleted(LineItemDeletedEvent), + OrderAdded(OrderAddedEvent), } impl DomainEvent for OrderingEvent { @@ -27,6 +28,7 @@ impl DomainEvent for OrderingEvent { OrderingEvent::LineItemAdded { .. } => "OrderingLineItemAdded", OrderingEvent::LineItemUpdated { .. } => "OrderingLineItemUpdated", OrderingEvent::LineItemDeleted { .. } => "OrderingLineItemDeleted", + OrderingEvent::OrderAdded { .. } => "OrderingOrderAdded", }; e.to_string() diff --git a/src/ordering/domain/mod.rs b/src/ordering/domain/mod.rs index 4758903..1a806b2 100644 --- a/src/ordering/domain/mod.rs +++ b/src/ordering/domain/mod.rs @@ -5,9 +5,11 @@ // aggregates pub mod kot_aggregate; pub mod line_item_aggregate; +pub mod order_aggregate; // commands pub mod add_line_item_command; +pub mod add_order_command; pub mod commands; pub mod delete_line_item_command; pub mod update_line_item_command; @@ -17,3 +19,4 @@ pub mod events; pub mod line_item_added_event; pub mod line_item_deleted_event; pub mod line_item_updated_event; +pub mod order_added_event; diff --git a/src/ordering/domain/order_added_event.rs b/src/ordering/domain/order_added_event.rs new file mode 100644 index 0000000..8d2be36 --- /dev/null +++ b/src/ordering/domain/order_added_event.rs @@ -0,0 +1,41 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use derive_builder::Builder; +use derive_getters::Getters; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use super::order_aggregate::Order; + +#[derive( + Clone, Debug, Builder, Serialize, Deserialize, Getters, Eq, PartialEq, Ord, PartialOrd, +)] +pub struct OrderAddedEvent { + added_by_user: Uuid, + + order: Order, +} + +#[cfg(test)] +pub mod tests { + use crate::ordering::domain::add_order_command::AddOrderCommand; + + use super::*; + + pub fn get_added_order_event_from_command(cmd: &AddOrderCommand) -> OrderAddedEvent { + let order = Order::get_order(); + + OrderAddedEventBuilder::default() + .added_by_user(cmd.adding_by().clone()) + .order(order) + .build() + .unwrap() + } + + #[test] + fn test_event() { + get_added_order_event_from_command(&AddOrderCommand::get_cmd()); + } +} -- 2.39.2 From a99ba07d19dcf5af8cf4a65cae80a5abc395cac5 Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Tue, 23 Jul 2024 19:20:12 +0530 Subject: [PATCH 5/7] feat: impl View for Order aggregate --- src/ordering/adapters/output/db/errors.rs | 21 +- src/ordering/adapters/output/db/mod.rs | 4 +- src/ordering/adapters/output/db/order_view.rs | 227 ++++++++++++++++++ 3 files changed, 242 insertions(+), 10 deletions(-) create mode 100644 src/ordering/adapters/output/db/order_view.rs diff --git a/src/ordering/adapters/output/db/errors.rs b/src/ordering/adapters/output/db/errors.rs index 81f70cc..5a6789a 100644 --- a/src/ordering/adapters/output/db/errors.rs +++ b/src/ordering/adapters/output/db/errors.rs @@ -20,8 +20,11 @@ impl From for OrderingDBError { if err.code() == Some(Cow::from("23505")) { let msg = err.message(); - if msg.contains("cqrs_ordering_store_query_product_id_key") { + if msg.contains("cqrs_ordering_store_query_line_item_id_key") { return Self::DuplicateLineItemID; + } else if msg.contains("cqrs_ordering_store_query_order_id_key") { + return Self::DuplicateOrderID; + } else { println!("{msg}"); } @@ -31,14 +34,14 @@ impl From for OrderingDBError { } } -///// map custom row not found error to DB error -//pub fn map_row_not_found_err(e: SqlxError, row_not_found: OrderingDBError) -> OrderingDBError { -// if let SqlxError::RowNotFound = e { -// row_not_found -// } else { -// e.into() -// } -//} +/// map custom row not found error to DB error +pub fn map_row_not_found_err(e: SqlxError, row_not_found: OrderingDBError) -> OrderingDBError { + if let SqlxError::RowNotFound = e { + row_not_found + } else { + e.into() + } +} #[derive(Debug)] pub enum PostgresAggregateError { diff --git a/src/ordering/adapters/output/db/mod.rs b/src/ordering/adapters/output/db/mod.rs index bb9cbd9..0f1a601 100644 --- a/src/ordering/adapters/output/db/mod.rs +++ b/src/ordering/adapters/output/db/mod.rs @@ -9,8 +9,10 @@ use sqlx::postgres::PgPool; use crate::db::{migrate::RunMigrations, sqlx_postgres::Postgres}; mod errors; -pub mod line_item_id_exists; +mod line_item_id_exists; +mod order_id_exists; mod line_item_view; +mod order_view; #[derive(Clone)] pub struct OrderingDBPostgresAdapter { diff --git a/src/ordering/adapters/output/db/order_view.rs b/src/ordering/adapters/output/db/order_view.rs new file mode 100644 index 0000000..242713c --- /dev/null +++ b/src/ordering/adapters/output/db/order_view.rs @@ -0,0 +1,227 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use std::str::FromStr; + +use async_trait::async_trait; +use cqrs_es::persist::{PersistenceError, ViewContext, ViewRepository}; +use cqrs_es::{EventEnvelope, Query, View}; +use serde::{Deserialize, Serialize}; +use time::OffsetDateTime; +use uuid::Uuid; + +use super::errors::*; +use super::OrderingDBPostgresAdapter; +use crate::ordering::domain::events::OrderingEvent; +use crate::ordering::domain::order_aggregate::*; +use crate::utils::parse_aggregate_id::parse_aggregate_id; + +pub const NEW_ORDER_NON_UUID: &str = "new_order_non_uuid-asdfa"; + +// The view for a Order query, for a standard http application this should +// be designed to reflect the response dto that will be returned to a user. +#[derive(Debug, Serialize, Deserialize)] +pub struct OrderView { + customer_name: String, + order_id: Uuid, + + created_time: OffsetDateTime, + deleted: bool, +} + +impl Default for OrderView { + fn default() -> Self { + Self { + customer_name: Default::default(), + order_id: Default::default(), + created_time: OffsetDateTime::now_utc(), + deleted: false, + } + } +} + +impl From for Order { + fn from(v: OrderView) -> Self { + + OrderBuilder::default() + .customer_name(v.customer_name) + .order_id(v.order_id) + .created_time(v.created_time) + .deleted(v.deleted) + .build() + .unwrap() + } +} + +// This updates the view with events as they are committed. +// The logic should be minimal here, e.g., don't calculate the account balance, +// design the events to carry the balance information instead. +impl View for OrderView { + fn update(&mut self, event: &EventEnvelope) { + match &event.payload { + OrderingEvent::OrderAdded(val) => { + self.customer_name = val.order().customer_name().into(); + self.order_id = *val.order().order_id(); + self.created_time = val.order().created_time().clone(); + + self.deleted = false; + } + _ => (), + } + } +} + +#[async_trait] +impl ViewRepository for OrderingDBPostgresAdapter { + async fn load(&self, order_id: &str) -> Result, PersistenceError> { + let order_id = match parse_aggregate_id(order_id, NEW_ORDER_NON_UUID)? { + Some((val, _)) => return Ok(Some(val)), + None => Uuid::parse_str(order_id).unwrap(), + }; + + let res = sqlx::query_as!( + OrderView, + "SELECT + customer_name, + order_id, + created_time, + deleted + FROM + cqrs_ordering_order_query + WHERE + order_id = $1;", + order_id + ) + .fetch_one(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + + Ok(Some(res)) + } + + async fn load_with_context( + &self, + order_id: &str, + ) -> Result, PersistenceError> { + let order_id = match parse_aggregate_id(order_id, NEW_ORDER_NON_UUID)? { + Some(val) => return Ok(Some(val)), + None => Uuid::parse_str(order_id).unwrap(), + }; + + let res = sqlx::query_as!( + OrderView, + "SELECT + customer_name, + order_id, + created_time, + deleted + FROM + cqrs_ordering_order_query + WHERE + order_id = $1;", + order_id + ) + .fetch_one(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + + struct Context { + version: i64, + order_id: Uuid, + } + + let ctx = sqlx::query_as!( + Context, + "SELECT + order_id, version + FROM + cqrs_ordering_order_query + WHERE + order_id = $1;", + order_id + ) + .fetch_one(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + + let view_context = ViewContext::new(ctx.order_id.to_string(), ctx.version); + Ok(Some((res, view_context))) + } + + async fn update_view( + &self, + view: OrderView, + context: ViewContext, + ) -> Result<(), PersistenceError> { + match context.version { + 0 => { + let version = context.version + 1; + sqlx::query!( + "INSERT INTO cqrs_ordering_order_query ( + version, + customer_name, + order_id, + created_time, + deleted + + ) VALUES ( + $1, $2, $3, $4, $5 + );", + version, + view.customer_name, + view.order_id, + view.created_time, + view.deleted, + ) + .execute(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + } + + _ => { + let version = context.version + 1; + sqlx::query!( + "UPDATE + cqrs_ordering_order_query + SET + version = $1, + customer_name = $2, + order_id = $3, + created_time = $4, + deleted = $5;", + version, + view.customer_name, + view.order_id, + view.created_time, + view.deleted, + ) + .execute(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + } + } + + Ok(()) + } +} + +#[async_trait] +impl Query for OrderingDBPostgresAdapter { + async fn dispatch(&self, order_id: &str, events: &[EventEnvelope]) { + let res = self + .load_with_context(order_id) + .await + .unwrap_or_else(|_| { + Some(( + OrderView::default(), + ViewContext::new(order_id.into(), 0), + )) + }); + let (mut view, view_context): (OrderView, ViewContext) = res.unwrap(); + for event in events { + view.update(event); + } + self.update_view(view, view_context).await.unwrap(); + } +} -- 2.39.2 From 3a19d7099d444c6a8bf21d9e9b6ae322d177c89b Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Tue, 23 Jul 2024 19:20:31 +0530 Subject: [PATCH 6/7] feat: service to add order aggregate --- .../application/services/add_order_service.rs | 108 ++++++++++++++++++ src/ordering/application/services/errors.rs | 6 + src/ordering/application/services/mod.rs | 7 ++ 3 files changed, 121 insertions(+) create mode 100644 src/ordering/application/services/add_order_service.rs diff --git a/src/ordering/application/services/add_order_service.rs b/src/ordering/application/services/add_order_service.rs new file mode 100644 index 0000000..d66b45e --- /dev/null +++ b/src/ordering/application/services/add_order_service.rs @@ -0,0 +1,108 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use std::sync::Arc; + +use derive_builder::Builder; +use mockall::predicate::*; +use mockall::*; +use time::OffsetDateTime; + +use super::errors::*; +use crate::ordering::{ + application::port::output::db::order_id_exists::*, + domain::{add_order_command::*, order_added_event::*, order_aggregate::*}, +}; +use crate::utils::uuid::*; + +#[automock] +#[async_trait::async_trait] +pub trait AddOrderUseCase: Send + Sync { + async fn add_order(&self, cmd: AddOrderCommand) -> OrderingResult; +} + +pub type AddOrderServiceObj = Arc; + +#[derive(Clone, Builder)] +pub struct AddOrderService { + db_order_id_exists: OrderIDExistsDBPortObj, + get_uuid: GetUUIDInterfaceObj, +} + +#[async_trait::async_trait] +impl AddOrderUseCase for AddOrderService { + async fn add_order(&self, cmd: AddOrderCommand) -> OrderingResult { + let mut order_id = self.get_uuid.get_uuid(); + + loop { + if self + .db_order_id_exists + .order_id_exists(&order_id) + .await? + { + order_id = self.get_uuid.get_uuid(); + continue; + } else { + break; + } + } + + let order = OrderBuilder::default() + .created_time(OffsetDateTime::now_utc()) + .customer_name(cmd.customer_name().into()) + .order_id(order_id) + .deleted(false) + .build() + .unwrap(); + + Ok(OrderAddedEventBuilder::default() + .added_by_user(*cmd.adding_by()) + .order(order) + .build() + .unwrap()) + } +} + +#[cfg(test)] +pub mod tests { + use super::*; + + use crate::ordering::domain::order_added_event::tests::get_added_order_event_from_command; + use crate::utils::uuid::tests::UUID; + use crate::{tests::bdd::*, utils::uuid::tests::mock_get_uuid}; + + pub fn mock_add_order_service( + times: Option, + cmd: AddOrderCommand, + ) -> AddOrderServiceObj { + let mut m = MockAddOrderUseCase::new(); + + let res = get_added_order_event_from_command(&cmd); + if let Some(times) = times { + m.expect_add_order() + .times(times) + .returning(move |_| Ok(res.clone())); + } else { + m.expect_add_order().returning(move |_| Ok(res.clone())); + } + + Arc::new(m) + } + + #[actix_rt::test] + async fn test_service() { + let cmd = AddOrderCommand::get_cmd(); + + let s = AddOrderServiceBuilder::default() + .db_order_id_exists(mock_order_id_exists_db_port_false(IS_CALLED_ONLY_ONCE)) + .get_uuid(mock_get_uuid(IS_CALLED_ONLY_ONCE)) + .build() + .unwrap(); + + let res = s.add_order(cmd.clone()).await.unwrap(); + assert_eq!(res.order().customer_name(), cmd.customer_name()); + assert!(!res.order().deleted()); + assert_eq!(res.added_by_user(), cmd.adding_by()); + } +} diff --git a/src/ordering/application/services/errors.rs b/src/ordering/application/services/errors.rs index 175d921..de37b9e 100644 --- a/src/ordering/application/services/errors.rs +++ b/src/ordering/application/services/errors.rs @@ -14,6 +14,7 @@ pub type OrderingResult = Result; pub enum OrderingError { LineItemIDNotFound, InternalError, + OrderIDNotFound, } // impl From for OrderingError { @@ -24,6 +25,11 @@ impl From for OrderingError { Self::InternalError } OrderingDBError::LineItemIDNotFound => OrderingError::LineItemIDNotFound, + OrderingDBError::DuplicateOrderID => { + error!("DuplicateOrderID"); + Self::InternalError + } + OrderingDBError::OrderIDNotFound => OrderingError::OrderIDNotFound, OrderingDBError::InternalError => Self::InternalError, } } diff --git a/src/ordering/application/services/mod.rs b/src/ordering/application/services/mod.rs index 705fc2f..170139c 100644 --- a/src/ordering/application/services/mod.rs +++ b/src/ordering/application/services/mod.rs @@ -12,12 +12,14 @@ pub mod errors; pub mod add_line_item_service; pub mod delete_line_item_service; pub mod update_line_item_service; +pub mod add_order_service; #[automock] pub trait OrderingServicesInterface: Send + Sync { fn add_line_item(&self) -> add_line_item_service::AddLineItemServiceObj; fn update_line_item(&self) -> update_line_item_service::UpdateLineItemServiceObj; fn delete_line_item(&self) -> delete_line_item_service::DeleteLineItemServiceObj; + fn add_order(&self) -> add_order_service::AddOrderServiceObj; } #[derive(Clone, Builder)] @@ -25,6 +27,7 @@ pub struct OrderingServices { add_line_item: add_line_item_service::AddLineItemServiceObj, update_line_item: update_line_item_service::UpdateLineItemServiceObj, delete_line_item: delete_line_item_service::DeleteLineItemServiceObj, + add_order: add_order_service::AddOrderServiceObj, } impl OrderingServicesInterface for OrderingServices { @@ -38,4 +41,8 @@ impl OrderingServicesInterface for OrderingServices { fn delete_line_item(&self) -> delete_line_item_service::DeleteLineItemServiceObj { self.delete_line_item.clone() } + + fn add_order(&self) -> add_order_service::AddOrderServiceObj { + self.add_order.clone() + } } -- 2.39.2 From b1fabadaad56da4a9dfe78924c9b5a6d9614e148 Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Tue, 23 Jul 2024 19:37:54 +0530 Subject: [PATCH 7/7] fix: catchall event and cmd handler for LineItem --- src/ordering/domain/line_item_aggregate.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/ordering/domain/line_item_aggregate.rs b/src/ordering/domain/line_item_aggregate.rs index 68aeab6..0483d33 100644 --- a/src/ordering/domain/line_item_aggregate.rs +++ b/src/ordering/domain/line_item_aggregate.rs @@ -98,7 +98,8 @@ impl Aggregate for LineItem { OrderingCommand::DeleteLineItem(cmd) => { let res = services.delete_line_item().delete_line_item(cmd).await?; Ok(vec![OrderingEvent::LineItemDeleted(res)]) - } // _ => Ok(Vec::default()), + } + _ => Ok(Vec::default()), } } @@ -107,7 +108,7 @@ impl Aggregate for LineItem { OrderingEvent::LineItemAdded(e) => *self = e.line_item().clone(), OrderingEvent::LineItemUpdated(e) => *self = e.new_line_item().clone(), OrderingEvent::LineItemDeleted(e) => *self = e.line_item().clone(), - // _ => (), + _ => (), } } } -- 2.39.2