From 19b6c5420be4cf1a75f60f9410776301161816b5 Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Sat, 13 Jul 2024 17:40:11 +0530 Subject: [PATCH 1/3] feat: define store aggregates and implement service to create store --- .../adapters/output/db/postgres/errors.rs | 79 ++++++++ .../adapters/output/db/postgres/mod.rs | 27 +++ .../output/db/postgres/store_id_exists.rs | 79 ++++++++ .../adapters/output/db/postgres/store_view.rs | 175 ++++++++++++++++++ .../application/port/output/db/errors.rs | 15 ++ .../application/port/output/db/mod.rs | 7 + .../port/output/db/store_id_exists.rs | 54 ++++++ .../application/services/add_store_service.rs | 91 +++++++++ src/inventory/application/services/errors.rs | 30 +++ src/inventory/application/services/mod.rs | 30 +++ src/inventory/domain/add_store_command.rs | 79 ++++++++ src/inventory/domain/store_added_event.rs | 18 ++ src/inventory/domain/store_aggregate.rs | 74 ++++++++ 13 files changed, 758 insertions(+) create mode 100644 src/inventory/adapters/output/db/postgres/errors.rs create mode 100644 src/inventory/adapters/output/db/postgres/mod.rs create mode 100644 src/inventory/adapters/output/db/postgres/store_id_exists.rs create mode 100644 src/inventory/adapters/output/db/postgres/store_view.rs create mode 100644 src/inventory/application/port/output/db/errors.rs create mode 100644 src/inventory/application/port/output/db/mod.rs create mode 100644 src/inventory/application/port/output/db/store_id_exists.rs create mode 100644 src/inventory/application/services/add_store_service.rs create mode 100644 src/inventory/application/services/errors.rs create mode 100644 src/inventory/domain/add_store_command.rs create mode 100644 src/inventory/domain/store_added_event.rs create mode 100644 src/inventory/domain/store_aggregate.rs diff --git a/src/inventory/adapters/output/db/postgres/errors.rs b/src/inventory/adapters/output/db/postgres/errors.rs new file mode 100644 index 0000000..2676699 --- /dev/null +++ b/src/inventory/adapters/output/db/postgres/errors.rs @@ -0,0 +1,79 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use std::borrow::Cow; + +use cqrs_es::persist::PersistenceError; +use sqlx::Error as SqlxError; + +use crate::inventory::application::port::output::db::errors::InventoryDBError; + +impl From for InventoryDBError { + fn from(e: SqlxError) -> Self { + log::error!("[postgres] err: {}", e); + if let SqlxError::Database(err) = e { + if err.code() == Some(Cow::from("23505")) { + let msg = err.message(); + if msg.contains("cqrs_inventory_store_query_store_id_key") { + return Self::DuplicateStoreID; + } else { + println!("{msg}"); + } + } + } + Self::InternalError + } +} + +/// map custom row not found error to DB error +pub fn map_row_not_found_err(e: SqlxError, row_not_found: InventoryDBError) -> InventoryDBError { + if let SqlxError::RowNotFound = e { + row_not_found + } else { + e.into() + } +} + +#[derive(Debug)] +pub enum PostgresAggregateError { + OptimisticLock, + ConnectionError(Box), + DeserializationError(Box), + UnknownError(Box), +} + +impl From for PostgresAggregateError { + fn from(err: SqlxError) -> Self { + // TODO: improve error handling + match &err { + SqlxError::Database(database_error) => { + if let Some(code) = database_error.code() { + if code.as_ref() == "23505" { + return PostgresAggregateError::OptimisticLock; + } + } + PostgresAggregateError::UnknownError(Box::new(err)) + } + SqlxError::Io(_) | SqlxError::Tls(_) => { + PostgresAggregateError::ConnectionError(Box::new(err)) + } + _ => PostgresAggregateError::UnknownError(Box::new(err)), + } + } +} + +impl From for PersistenceError { + fn from(err: PostgresAggregateError) -> Self { + match err { + PostgresAggregateError::OptimisticLock => PersistenceError::OptimisticLockError, + PostgresAggregateError::ConnectionError(error) => { + PersistenceError::ConnectionError(error) + } + PostgresAggregateError::DeserializationError(error) => { + PersistenceError::UnknownError(error) + } + PostgresAggregateError::UnknownError(error) => PersistenceError::UnknownError(error), + } + } +} diff --git a/src/inventory/adapters/output/db/postgres/mod.rs b/src/inventory/adapters/output/db/postgres/mod.rs new file mode 100644 index 0000000..f743e32 --- /dev/null +++ b/src/inventory/adapters/output/db/postgres/mod.rs @@ -0,0 +1,27 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later +use std::sync::Arc; + +use sqlx::postgres::PgPool; + +use crate::db::{migrate::RunMigrations, sqlx_postgres::Postgres}; + +mod errors; +mod store_id_exists; +mod store_view; + +#[derive(Clone)] +pub struct InventoryDBPostgresAdapter { + pool: PgPool, +} + +impl InventoryDBPostgresAdapter { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } + + pub fn migratable(&self) -> Arc { + Arc::new(Postgres::new(self.pool.clone())) + } +} diff --git a/src/inventory/adapters/output/db/postgres/store_id_exists.rs b/src/inventory/adapters/output/db/postgres/store_id_exists.rs new file mode 100644 index 0000000..35f66ad --- /dev/null +++ b/src/inventory/adapters/output/db/postgres/store_id_exists.rs @@ -0,0 +1,79 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use super::InventoryDBPostgresAdapter; +use crate::inventory::application::port::output::db::{errors::*, store_id_exists::*}; +use crate::inventory::domain::store_aggregate::*; + +#[async_trait::async_trait] +impl StoreIDExistsDBPort for InventoryDBPostgresAdapter { + async fn store_id_exists(&self, s: &Store) -> InventoryDBResult { + let res = sqlx::query!( + "SELECT EXISTS ( + SELECT 1 + FROM cqrs_inventory_store_query + WHERE + store_id = $1 + );", + s.store_id(), + ) + .fetch_one(&self.pool) + .await?; + if let Some(x) = res.exists { + Ok(x) + } else { + Ok(false) + } + } +} + +#[cfg(test)] +mod tests { + use uuid::Uuid; + + use super::*; + + #[actix_rt::test] + async fn test_postgres_store_exists() { + let store_id = Uuid::new_v4(); + let settings = crate::settings::tests::get_settings().await; + settings.create_db().await; + let db = super::InventoryDBPostgresAdapter::new( + sqlx::postgres::PgPool::connect(&settings.database.url) + .await + .unwrap(), + ); + + let store = StoreBuilder::default() + .name("store_name".into()) + .owner("store_owner".into()) + .address(Some("store_address".into())) + .store_id(store_id) + .build() + .unwrap(); + + // state doesn't exist + assert!(!db.store_id_exists(&store).await.unwrap()); + + sqlx::query!( + "INSERT INTO cqrs_inventory_store_query + (view_id, version, name, address, store_id, owner) + VALUES ($1, $2, $3, $4, $5, $6);", + "1", + 1, + store.name(), + store.address().as_ref().unwrap(), + store.store_id(), + store.owner(), + ) + .execute(&db.pool) + .await + .unwrap(); + + // state exists + assert!(db.store_id_exists(&store).await.unwrap()); + + settings.drop_db().await; + } +} diff --git a/src/inventory/adapters/output/db/postgres/store_view.rs b/src/inventory/adapters/output/db/postgres/store_view.rs new file mode 100644 index 0000000..42fac61 --- /dev/null +++ b/src/inventory/adapters/output/db/postgres/store_view.rs @@ -0,0 +1,175 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use async_trait::async_trait; +use cqrs_es::persist::GenericQuery; +use cqrs_es::persist::{PersistenceError, ViewContext, ViewRepository}; +use cqrs_es::{EventEnvelope, Query, View}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use super::errors::*; +use super::InventoryDBPostgresAdapter; +use crate::inventory::domain::events::InventoryEvent; +use crate::inventory::domain::store_aggregate::Store; + +// The view for a Store query, for a standard http application this should +// be designed to reflect the response dto that will be returned to a user. +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct StoreView { + name: String, + address: Option, + store_id: Uuid, + owner: String, +} + +// This updates the view with events as they are committed. +// The logic should be minimal here, e.g., don't calculate the account balance, +// design the events to carry the balance information instead. +impl View for StoreView { + fn update(&mut self, event: &EventEnvelope) { + match &event.payload { + InventoryEvent::StoreAdded(val) => { + self.name = val.name().into(); + self.address = val.address().clone(); + self.store_id = val.store_id().clone(); + self.owner = val.owner().clone(); + } + _ => (), + } + } +} + +#[async_trait] +impl ViewRepository for InventoryDBPostgresAdapter { + async fn load(&self, view_id: &str) -> Result, PersistenceError> { + let res = sqlx::query_as!( + StoreView, + "SELECT + name, address, store_id, owner + FROM + cqrs_inventory_store_query + WHERE + view_id = $1;", + view_id + ) + .fetch_one(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + Ok(Some(res)) + } + + async fn load_with_context( + &self, + view_id: &str, + ) -> Result, PersistenceError> { + let res = sqlx::query_as!( + StoreView, + "SELECT + name, address, store_id, owner + FROM + cqrs_inventory_store_query + WHERE + view_id = $1;", + view_id + ) + .fetch_one(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + + struct Context { + version: i64, + view_id: String, + } + + let ctx = sqlx::query_as!( + Context, + "SELECT + view_id, version + FROM + cqrs_inventory_store_query + WHERE + view_id = $1;", + view_id + ) + .fetch_one(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + + let view_context = ViewContext::new(ctx.view_id, ctx.version); + Ok(Some((res, view_context))) + } + + async fn update_view( + &self, + view: StoreView, + context: ViewContext, + ) -> Result<(), PersistenceError> { + match context.version { + 0 => { + let version = context.version + 1; + sqlx::query!( + "INSERT INTO cqrs_inventory_store_query ( + view_id, version, name, address, store_id, owner + ) VALUES ( + $1, $2, $3, $4, $5, $6 + );", + context.view_instance_id, + version, + view.name, + view.address, + view.store_id, + view.owner, + ) + .execute(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + } + _ => { + let version = context.version + 1; + sqlx::query!( + "UPDATE + cqrs_inventory_store_query + SET + view_id = $1, + version = $2, + name = $3, + address = $4, + store_id = $5, + owner = $6;", + context.view_instance_id, + version, + view.name, + view.address, + view.store_id, + view.owner, + ) + .execute(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + } + } + + Ok(()) + } +} + +pub struct SimpleLoggingQuery {} + +// Our simplest query, this is great for debugging but absolutely useless in production. +// This query just pretty prints the events as they are processed. +#[async_trait] +impl Query for SimpleLoggingQuery { + async fn dispatch(&self, aggregate_id: &str, events: &[EventEnvelope]) { + for event in events { + let payload = serde_json::to_string_pretty(&event.payload).unwrap(); + println!("{}-{}\n{}", aggregate_id, event.sequence, payload); + } + } +} + +// Our second query, this one will be handled with Postgres `GenericQuery` +// which will serialize and persist our view after it is updated. It also +// provides a `load` method to deserialize the view on request. +pub type StoreQuery = GenericQuery; diff --git a/src/inventory/application/port/output/db/errors.rs b/src/inventory/application/port/output/db/errors.rs new file mode 100644 index 0000000..3d80dce --- /dev/null +++ b/src/inventory/application/port/output/db/errors.rs @@ -0,0 +1,15 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use derive_more::Display; +use serde::{Deserialize, Serialize}; + +pub type InventoryDBResult = Result; + +#[derive(Debug, Display, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] +pub enum InventoryDBError { + DuplicateCategoryName, + DuplicateStoreID, + InternalError, +} diff --git a/src/inventory/application/port/output/db/mod.rs b/src/inventory/application/port/output/db/mod.rs new file mode 100644 index 0000000..7e35ef6 --- /dev/null +++ b/src/inventory/application/port/output/db/mod.rs @@ -0,0 +1,7 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +pub mod category_exists; +pub mod errors; +pub mod store_id_exists; diff --git a/src/inventory/application/port/output/db/store_id_exists.rs b/src/inventory/application/port/output/db/store_id_exists.rs new file mode 100644 index 0000000..a1321af --- /dev/null +++ b/src/inventory/application/port/output/db/store_id_exists.rs @@ -0,0 +1,54 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use mockall::predicate::*; +use mockall::*; + +use crate::inventory::domain::store_aggregate::Store; + +use super::errors::*; +#[cfg(test)] +#[allow(unused_imports)] +pub use tests::*; + +#[automock] +#[async_trait::async_trait] +pub trait StoreIDExistsDBPort: Send + Sync { + async fn store_id_exists(&self, s: &Store) -> InventoryDBResult; +} + +pub type StoreIDExistsDBPortObj = std::sync::Arc; + +#[cfg(test)] +pub mod tests { + use super::*; + + use std::sync::Arc; + + pub fn mock_store_id_exists_db_port_false(times: Option) -> StoreIDExistsDBPortObj { + let mut m = MockStoreIDExistsDBPort::new(); + if let Some(times) = times { + m.expect_store_id_exists() + .times(times) + .returning(|_| Ok(false)); + } else { + m.expect_store_id_exists().returning(|_| Ok(false)); + } + + Arc::new(m) + } + + pub fn mock_store_id_exists_db_port_true(times: Option) -> StoreIDExistsDBPortObj { + let mut m = MockStoreIDExistsDBPort::new(); + if let Some(times) = times { + m.expect_store_id_exists() + .times(times) + .returning(|_| Ok(true)); + } else { + m.expect_store_id_exists().returning(|_| Ok(true)); + } + + Arc::new(m) + } +} diff --git a/src/inventory/application/services/add_store_service.rs b/src/inventory/application/services/add_store_service.rs new file mode 100644 index 0000000..bdb5c38 --- /dev/null +++ b/src/inventory/application/services/add_store_service.rs @@ -0,0 +1,91 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use derive_builder::Builder; +use uuid::Uuid; + +use super::errors::*; +use crate::inventory::{ + application::port::output::db::{errors::InventoryDBError, store_id_exists::*}, + domain::{ + add_store_command::AddStoreCommand, + store_added_event::{StoreAddedEvent, StoreAddedEventBuilder}, + store_aggregate::*, + }, +}; + +#[async_trait::async_trait] +pub trait AddStoreUseCase: Send + Sync { + async fn add_store(&self, cmd: AddStoreCommand) -> InventoryResult; +} + +pub type AddStoreServiceObj = std::sync::Arc; + +#[derive(Clone, Builder)] +pub struct AddStoreService { + db_store_id_exists: StoreIDExistsDBPortObj, +} + +#[async_trait::async_trait] +impl AddStoreUseCase for AddStoreService { + async fn add_store(&self, cmd: AddStoreCommand) -> InventoryResult { + let mut store_id = Uuid::new_v4(); + let mut store = StoreBuilder::default() + .name(cmd.name().into()) + .address(cmd.address().as_ref().map(|s| s.to_string())) + .owner(cmd.owner().into()) + .store_id(store_id.clone()) + .build() + .unwrap(); + loop { + if self.db_store_id_exists.store_id_exists(&store).await? { + store_id = Uuid::new_v4(); + store = StoreBuilder::default() + .name(cmd.name().into()) + .address(cmd.address().as_ref().map(|s| s.to_string())) + .store_id(store_id.clone()) + .build() + .unwrap(); + continue; + } else { + break; + } + } + + Ok(StoreAddedEventBuilder::default() + .name(store.name().into()) + .address(store.address().as_ref().map(|s| s.to_string())) + .owner(cmd.owner().into()) + .store_id(store_id.clone()) + .build() + .unwrap()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use crate::tests::bdd::*; + + #[actix_rt::test] + async fn test_service_store_id_doesnt_exist() { + let name = "foo"; + let address = "bar"; + let username = "baz"; + + // address = None + let cmd = AddStoreCommand::new(name.into(), Some(address.into()), username.into()).unwrap(); + + let s = AddStoreServiceBuilder::default() + .db_store_id_exists(mock_store_id_exists_db_port_false(IS_CALLED_ONLY_ONCE)) + .build() + .unwrap(); + + let res = s.add_store(cmd.clone()).await.unwrap(); + assert_eq!(res.name(), cmd.name()); + assert_eq!(res.address(), cmd.address()); + assert_eq!(res.owner(), cmd.owner()); + } +} diff --git a/src/inventory/application/services/errors.rs b/src/inventory/application/services/errors.rs new file mode 100644 index 0000000..3a091f3 --- /dev/null +++ b/src/inventory/application/services/errors.rs @@ -0,0 +1,30 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use derive_more::{Display, Error}; +use log::error; +use serde::{Deserialize, Serialize}; + +use crate::inventory::application::port::output::db::errors::InventoryDBError; + +pub type InventoryResult = Result; + +#[derive(Debug, Error, Display, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] +pub enum InventoryError { + DuplicateCategoryName, + InternalError, +} + +impl From for InventoryError { + fn from(value: InventoryDBError) -> Self { + match value { + InventoryDBError::DuplicateCategoryName => Self::DuplicateCategoryName, + InventoryDBError::DuplicateStoreID => { + error!("DuplicateStoreID"); + Self::InternalError + } + InventoryDBError::InternalError => Self::InternalError, + } + } +} diff --git a/src/inventory/application/services/mod.rs b/src/inventory/application/services/mod.rs index 56f60de..7f5e3e0 100644 --- a/src/inventory/application/services/mod.rs +++ b/src/inventory/application/services/mod.rs @@ -1,3 +1,33 @@ // SPDX-FileCopyrightText: 2024 Aravinth Manivannan // // SPDX-License-Identifier: AGPL-3.0-or-later +use derive_builder::Builder; +use mockall::predicate::*; +use mockall::*; + +pub mod errors; + +// services +pub mod add_category_service; +pub mod add_store_service; + +#[automock] +pub trait InventoryServicesInterface: Send + Sync { + fn add_store(&self) -> add_store_service::AddStoreServiceObj; + fn add_category(&self) -> add_category_service::AddCategoryServiceObj; +} + +#[derive(Clone, Builder)] +pub struct InventoryServices { + add_store: add_store_service::AddStoreServiceObj, + add_category: add_category_service::AddCategoryServiceObj, +} + +impl InventoryServicesInterface for InventoryServices { + fn add_store(&self) -> add_store_service::AddStoreServiceObj { + self.add_store.clone() + } + fn add_category(&self) -> add_category_service::AddCategoryServiceObj { + self.add_category.clone() + } +} diff --git a/src/inventory/domain/add_store_command.rs b/src/inventory/domain/add_store_command.rs new file mode 100644 index 0000000..1a57f60 --- /dev/null +++ b/src/inventory/domain/add_store_command.rs @@ -0,0 +1,79 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use derive_getters::Getters; +use derive_more::{Display, Error}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Error, Display, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] +pub enum AddStoreCommandError { + NameIsEmpty, +} + +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd, Getters)] +pub struct AddStoreCommand { + name: String, + address: Option, + owner: String, +} + +impl AddStoreCommand { + pub fn new( + name: String, + address: Option, + owner: String, + ) -> Result { + let address: Option = if let Some(address) = address { + let address = address.trim(); + if address.is_empty() { + None + } else { + Some(address.to_owned()) + } + } else { + None + }; + + let name = name.trim().to_owned(); + if name.is_empty() { + return Err(AddStoreCommandError::NameIsEmpty); + } + + Ok(Self { + name, + address, + owner, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_cmd() { + let name = "foo"; + let address = "bar"; + let username = "baz"; + + // address = None + let cmd = AddStoreCommand::new(name.into(), None, username.into()).unwrap(); + assert_eq!(cmd.name(), name); + assert_eq!(cmd.address(), &None); + assert_eq!(cmd.owner(), username); + + // address = Some + let cmd = AddStoreCommand::new(name.into(), Some(address.into()), username.into()).unwrap(); + assert_eq!(cmd.name(), name); + assert_eq!(cmd.address(), &Some(address.to_owned())); + assert_eq!(cmd.owner(), username); + + // AddStoreCommandError::NameIsEmpty + assert_eq!( + AddStoreCommand::new("".into(), Some(address.into()), username.into()), + Err(AddStoreCommandError::NameIsEmpty) + ) + } +} diff --git a/src/inventory/domain/store_added_event.rs b/src/inventory/domain/store_added_event.rs new file mode 100644 index 0000000..7a67bda --- /dev/null +++ b/src/inventory/domain/store_added_event.rs @@ -0,0 +1,18 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use derive_builder::Builder; +use derive_getters::Getters; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +#[derive( + Clone, Debug, Builder, Serialize, Deserialize, Getters, Eq, PartialEq, Ord, PartialOrd, +)] +pub struct StoreAddedEvent { + name: String, + address: Option, + owner: String, + store_id: Uuid, +} diff --git a/src/inventory/domain/store_aggregate.rs b/src/inventory/domain/store_aggregate.rs new file mode 100644 index 0000000..50b8f6b --- /dev/null +++ b/src/inventory/domain/store_aggregate.rs @@ -0,0 +1,74 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use async_trait::async_trait; +use cqrs_es::Aggregate; +use derive_builder::Builder; +use derive_getters::Getters; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use crate::inventory::application::services::errors::*; +use crate::inventory::application::services::InventoryServicesInterface; + +use super::{commands::InventoryCommand, events::InventoryEvent}; + +#[derive( + Clone, Default, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd, Builder, Getters, +)] +pub struct Store { + name: String, + address: Option, + owner: String, + store_id: Uuid, +} + +#[async_trait] +impl Aggregate for Store { + type Command = InventoryCommand; + type Event = InventoryEvent; + type Error = InventoryError; + type Services = std::sync::Arc; + + // This identifier should be unique to the system. + fn aggregate_type() -> String { + "inventory.store".to_string() + } + + // The aggregate logic goes here. Note that this will be the _bulk_ of a CQRS system + // so expect to use helper functions elsewhere to keep the code clean. + async fn handle( + &self, + command: Self::Command, + services: &Self::Services, + ) -> Result, Self::Error> { + match command { + InventoryCommand::AddStore(cmd) => { + let res = services.add_store().add_store(cmd).await?; + Ok(vec![InventoryEvent::StoreAdded(res)]) + } + _ => Ok(Vec::default()), + // InventoryCommand::AddCategory(cmd) => { + // let res = services.add_category().add_store(cmd).await?; + // Ok(vec![InventoryEvent::CategoryAdded(res)]) + // + // } + } + } + + fn apply(&mut self, event: Self::Event) { + match event { + InventoryEvent::StoreAdded(e) => { + *self = StoreBuilder::default() + .name(e.name().into()) + .address(e.address().as_ref().map(|s| s.to_string())) + .owner(e.owner().into()) + .store_id(e.store_id().clone()) + .build() + .unwrap(); + } + _ => (), + } + } +} From 126ab58aa0049a2a6f2571010e2123d370d62a5b Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Sat, 13 Jul 2024 19:34:52 +0530 Subject: [PATCH 2/3] fix: sqlx offline compilation --- ...e88525fba13a20c0587239e833616ff4b7887.json | 22 ++++++++++ ...6ba1f559293608196e540788947789cad244c.json | 40 +++++++++++++++++++ ...1def62da1715c65214a4d404a5ce551c501bd.json | 19 +++++++++ ...859bc6e2fb1a75da0ed248debf126ba39c1d1.json | 28 +++++++++++++ ...4a3e182ba5f4a303c142668956be3846c6d18.json | 19 +++++++++ 5 files changed, 128 insertions(+) create mode 100644 .sqlx/query-2877f56715e3768acad466dc46be88525fba13a20c0587239e833616ff4b7887.json create mode 100644 .sqlx/query-4d7b12d7ff5e008054bab7aa61d6ba1f559293608196e540788947789cad244c.json create mode 100644 .sqlx/query-bb92f2d0c882653e92f2837353c1def62da1715c65214a4d404a5ce551c501bd.json create mode 100644 .sqlx/query-c6e90aa1e4fc851ac37318ada15859bc6e2fb1a75da0ed248debf126ba39c1d1.json create mode 100644 .sqlx/query-eb97288edbbbe8c5f30454040584a3e182ba5f4a303c142668956be3846c6d18.json diff --git a/.sqlx/query-2877f56715e3768acad466dc46be88525fba13a20c0587239e833616ff4b7887.json b/.sqlx/query-2877f56715e3768acad466dc46be88525fba13a20c0587239e833616ff4b7887.json new file mode 100644 index 0000000..ee4ee15 --- /dev/null +++ b/.sqlx/query-2877f56715e3768acad466dc46be88525fba13a20c0587239e833616ff4b7887.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT EXISTS (\n SELECT 1\n FROM cqrs_inventory_store_query\n WHERE\n store_id = $1\n );", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "exists", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + null + ] + }, + "hash": "2877f56715e3768acad466dc46be88525fba13a20c0587239e833616ff4b7887" +} diff --git a/.sqlx/query-4d7b12d7ff5e008054bab7aa61d6ba1f559293608196e540788947789cad244c.json b/.sqlx/query-4d7b12d7ff5e008054bab7aa61d6ba1f559293608196e540788947789cad244c.json new file mode 100644 index 0000000..b15a991 --- /dev/null +++ b/.sqlx/query-4d7b12d7ff5e008054bab7aa61d6ba1f559293608196e540788947789cad244c.json @@ -0,0 +1,40 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT \n name, address, store_id, owner\n FROM\n cqrs_inventory_store_query\n WHERE\n view_id = $1;", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "name", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "address", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "store_id", + "type_info": "Uuid" + }, + { + "ordinal": 3, + "name": "owner", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + false, + true, + false, + false + ] + }, + "hash": "4d7b12d7ff5e008054bab7aa61d6ba1f559293608196e540788947789cad244c" +} diff --git a/.sqlx/query-bb92f2d0c882653e92f2837353c1def62da1715c65214a4d404a5ce551c501bd.json b/.sqlx/query-bb92f2d0c882653e92f2837353c1def62da1715c65214a4d404a5ce551c501bd.json new file mode 100644 index 0000000..e112a23 --- /dev/null +++ b/.sqlx/query-bb92f2d0c882653e92f2837353c1def62da1715c65214a4d404a5ce551c501bd.json @@ -0,0 +1,19 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE\n cqrs_inventory_store_query\n SET\n view_id = $1,\n version = $2,\n name = $3,\n address = $4,\n store_id = $5,\n owner = $6;", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Int8", + "Text", + "Text", + "Uuid", + "Text" + ] + }, + "nullable": [] + }, + "hash": "bb92f2d0c882653e92f2837353c1def62da1715c65214a4d404a5ce551c501bd" +} diff --git a/.sqlx/query-c6e90aa1e4fc851ac37318ada15859bc6e2fb1a75da0ed248debf126ba39c1d1.json b/.sqlx/query-c6e90aa1e4fc851ac37318ada15859bc6e2fb1a75da0ed248debf126ba39c1d1.json new file mode 100644 index 0000000..cb8771e --- /dev/null +++ b/.sqlx/query-c6e90aa1e4fc851ac37318ada15859bc6e2fb1a75da0ed248debf126ba39c1d1.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT \n view_id, version\n FROM\n cqrs_inventory_store_query\n WHERE\n view_id = $1;", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "view_id", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "version", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + false, + false + ] + }, + "hash": "c6e90aa1e4fc851ac37318ada15859bc6e2fb1a75da0ed248debf126ba39c1d1" +} diff --git a/.sqlx/query-eb97288edbbbe8c5f30454040584a3e182ba5f4a303c142668956be3846c6d18.json b/.sqlx/query-eb97288edbbbe8c5f30454040584a3e182ba5f4a303c142668956be3846c6d18.json new file mode 100644 index 0000000..9aed436 --- /dev/null +++ b/.sqlx/query-eb97288edbbbe8c5f30454040584a3e182ba5f4a303c142668956be3846c6d18.json @@ -0,0 +1,19 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO cqrs_inventory_store_query (\n view_id, version, name, address, store_id, owner\n ) VALUES (\n $1, $2, $3, $4, $5, $6\n );", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Int8", + "Text", + "Text", + "Uuid", + "Text" + ] + }, + "nullable": [] + }, + "hash": "eb97288edbbbe8c5f30454040584a3e182ba5f4a303c142668956be3846c6d18" +} From c56d13b196a4db08636d16b2656ad70e79b1a17a Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Sat, 13 Jul 2024 19:37:30 +0530 Subject: [PATCH 3/3] feat: cqrs_es scaffolding --- Cargo.lock | 10 ++++-- Cargo.toml | 3 +- ...40713080804_cqrs_inventory_store_query.sql | 32 +++++++++++++++++ src/inventory/adapters/output/db/mod.rs | 1 + src/inventory/adapters/output/mod.rs | 2 ++ src/inventory/application/mod.rs | 4 +-- src/inventory/application/port/mod.rs | 4 +-- .../application/port/output/db/mod.rs | 2 +- src/inventory/application/port/output/mod.rs | 2 ++ src/inventory/application/services/mod.rs | 12 +++---- src/inventory/domain/commands.rs | 17 ++++++++++ src/inventory/domain/events.rs | 34 +++++++++++++++++++ src/inventory/domain/mod.rs | 16 +++++++++ 13 files changed, 125 insertions(+), 14 deletions(-) create mode 100644 migrations/20240713080804_cqrs_inventory_store_query.sql create mode 100644 src/inventory/adapters/output/db/mod.rs create mode 100644 src/inventory/domain/commands.rs create mode 100644 src/inventory/domain/events.rs diff --git a/Cargo.lock b/Cargo.lock index 324e8cd..a393bec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3228,6 +3228,7 @@ dependencies = [ "tokio-stream", "tracing", "url", + "uuid", "webpki-roots 0.25.4", ] @@ -3310,6 +3311,7 @@ dependencies = [ "thiserror", "time", "tracing", + "uuid", "whoami", ] @@ -3349,6 +3351,7 @@ dependencies = [ "thiserror", "time", "tracing", + "uuid", "whoami", ] @@ -3374,6 +3377,7 @@ dependencies = [ "tracing", "url", "urlencoding", + "uuid", ] [[package]] @@ -3952,11 +3956,12 @@ checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" [[package]] name = "uuid" -version = "1.8.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" +checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" dependencies = [ "getrandom", + "serde", ] [[package]] @@ -4062,6 +4067,7 @@ dependencies = [ "tracing", "tracing-actix-web", "url", + "uuid", "validator 0.18.1", ] diff --git a/Cargo.toml b/Cargo.toml index 69d620c..bbab54e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,12 +28,13 @@ rand = "0.8.5" rust-embed = { version = "8.4.0", features = ["include-exclude"] } serde = { version = "1.0.201", features = ["derive"] } serde_json = "1.0.117" -sqlx = { version = "0.7.4", features = ["runtime-tokio-rustls", "postgres", "time"] } +sqlx = { version = "0.7.4", features = ["runtime-tokio-rustls", "postgres", "time", "uuid"] } tera = "1.19.1" time = { version = "0.3.36", features = ["serde"] } tracing = { version = "0.1.40", features = ["log"] } tracing-actix-web = "0.7.10" url = { version = "2.5.0", features = ["serde"] } +uuid = { version = "1.10.0", features = ["v4", "serde"] } validator = { version = "0.18.1", features = ["derive"] } [dev-dependencies] diff --git a/migrations/20240713080804_cqrs_inventory_store_query.sql b/migrations/20240713080804_cqrs_inventory_store_query.sql new file mode 100644 index 0000000..3d2d7b9 --- /dev/null +++ b/migrations/20240713080804_cqrs_inventory_store_query.sql @@ -0,0 +1,32 @@ +--- SPDX-FileCopyrightText: 2024 Aravinth Manivannan +-- +-- SPDX-License-Identifier: AGPL-3.0-or-later + +CREATE TABLE IF NOT EXISTS categoty_events +( + aggregate_type text NOT NULL, + aggregate_id text NOT NULL, + sequence bigint CHECK (sequence >= 0) NOT NULL, + event_type text NOT NULL, + event_version text NOT NULL, + payload json NOT NULL, + metadata json NOT NULL, + timestamp timestamp with time zone DEFAULT (CURRENT_TIMESTAMP), + PRIMARY KEY (aggregate_type, aggregate_id, sequence) +); + +CREATE TABLE IF NOT EXISTS cqrs_inventory_store_query +( + view_id text NOT NULL, + version bigint CHECK (version >= 0) NOT NULL, + + name TEXT NOT NULL, + address TEXT, + owner TEXT NOT NULL, + store_id UUID NOT NULL UNIQUE, + deleted BOOLEAN NOT NULL DEFAULT FALSE, + + PRIMARY KEY (view_id) +); + +CREATE UNIQUE INDEX IF NOT EXISTS store_store_id_index ON cqrs_inventory_store_query (store_id); diff --git a/src/inventory/adapters/output/db/mod.rs b/src/inventory/adapters/output/db/mod.rs new file mode 100644 index 0000000..26e9103 --- /dev/null +++ b/src/inventory/adapters/output/db/mod.rs @@ -0,0 +1 @@ +pub mod postgres; diff --git a/src/inventory/adapters/output/mod.rs b/src/inventory/adapters/output/mod.rs index 56f60de..4589484 100644 --- a/src/inventory/adapters/output/mod.rs +++ b/src/inventory/adapters/output/mod.rs @@ -1,3 +1,5 @@ // SPDX-FileCopyrightText: 2024 Aravinth Manivannan // // SPDX-License-Identifier: AGPL-3.0-or-later + +mod db; diff --git a/src/inventory/application/mod.rs b/src/inventory/application/mod.rs index 357da8f..2f75b72 100644 --- a/src/inventory/application/mod.rs +++ b/src/inventory/application/mod.rs @@ -2,5 +2,5 @@ // // SPDX-License-Identifier: AGPL-3.0-or-later -mod port; -mod services; +pub mod port; +pub mod services; diff --git a/src/inventory/application/port/mod.rs b/src/inventory/application/port/mod.rs index 9b25f58..f571c4e 100644 --- a/src/inventory/application/port/mod.rs +++ b/src/inventory/application/port/mod.rs @@ -2,5 +2,5 @@ // // SPDX-License-Identifier: AGPL-3.0-or-later -mod input; -mod output; +pub mod input; +pub mod output; diff --git a/src/inventory/application/port/output/db/mod.rs b/src/inventory/application/port/output/db/mod.rs index 7e35ef6..efa524e 100644 --- a/src/inventory/application/port/output/db/mod.rs +++ b/src/inventory/application/port/output/db/mod.rs @@ -2,6 +2,6 @@ // // SPDX-License-Identifier: AGPL-3.0-or-later -pub mod category_exists; +//pub mod category_exists; pub mod errors; pub mod store_id_exists; diff --git a/src/inventory/application/port/output/mod.rs b/src/inventory/application/port/output/mod.rs index 56f60de..1589173 100644 --- a/src/inventory/application/port/output/mod.rs +++ b/src/inventory/application/port/output/mod.rs @@ -1,3 +1,5 @@ // SPDX-FileCopyrightText: 2024 Aravinth Manivannan // // SPDX-License-Identifier: AGPL-3.0-or-later + +pub mod db; diff --git a/src/inventory/application/services/mod.rs b/src/inventory/application/services/mod.rs index 7f5e3e0..85d1758 100644 --- a/src/inventory/application/services/mod.rs +++ b/src/inventory/application/services/mod.rs @@ -8,26 +8,26 @@ use mockall::*; pub mod errors; // services -pub mod add_category_service; +//pub mod add_category_service; pub mod add_store_service; #[automock] pub trait InventoryServicesInterface: Send + Sync { fn add_store(&self) -> add_store_service::AddStoreServiceObj; - fn add_category(&self) -> add_category_service::AddCategoryServiceObj; + // fn add_category(&self) -> add_category_service::AddCategoryServiceObj; } #[derive(Clone, Builder)] pub struct InventoryServices { add_store: add_store_service::AddStoreServiceObj, - add_category: add_category_service::AddCategoryServiceObj, + // add_category: add_category_service::AddCategoryServiceObj, } impl InventoryServicesInterface for InventoryServices { fn add_store(&self) -> add_store_service::AddStoreServiceObj { self.add_store.clone() } - fn add_category(&self) -> add_category_service::AddCategoryServiceObj { - self.add_category.clone() - } + // fn add_category(&self) -> add_category_service::AddCategoryServiceObj { + // self.add_category.clone() + // } } diff --git a/src/inventory/domain/commands.rs b/src/inventory/domain/commands.rs new file mode 100644 index 0000000..8f6db77 --- /dev/null +++ b/src/inventory/domain/commands.rs @@ -0,0 +1,17 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use mockall::predicate::*; +use serde::{Deserialize, Serialize}; + +use super::{ + // add_category_command::AddCategoryCommand, + add_store_command::AddStoreCommand, +}; + +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)] +pub enum InventoryCommand { + //AddCategory(AddCategoryCommand), + AddStore(AddStoreCommand), +} diff --git a/src/inventory/domain/events.rs b/src/inventory/domain/events.rs new file mode 100644 index 0000000..4f468f0 --- /dev/null +++ b/src/inventory/domain/events.rs @@ -0,0 +1,34 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use cqrs_es::DomainEvent; +use serde::{Deserialize, Serialize}; + +use super::{ + // category_added_event::*, + store_added_event::StoreAddedEvent, +}; + +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)] +pub enum InventoryEvent { + // CategoryAdded(CategoryAddedEvent), + StoreAdded(StoreAddedEvent), +} + +//TODO: define password type that takes string and converts to hash + +impl DomainEvent for InventoryEvent { + fn event_version(&self) -> String { + "1.0".to_string() + } + + fn event_type(&self) -> String { + let e: &str = match self { + // InventoryEvent::CategoryAdded { .. } => "InventoryCategoryAdded", + InventoryEvent::StoreAdded { .. } => "InventoryStoredded", + }; + + e.to_string() + } +} diff --git a/src/inventory/domain/mod.rs b/src/inventory/domain/mod.rs index 56f60de..8d475b8 100644 --- a/src/inventory/domain/mod.rs +++ b/src/inventory/domain/mod.rs @@ -1,3 +1,19 @@ // SPDX-FileCopyrightText: 2024 Aravinth Manivannan // // SPDX-License-Identifier: AGPL-3.0-or-later + +// aggregates +//pub mod money_aggregate; +//pub mod product_aggregate; +//pub mod stock_aggregate; +pub mod store_aggregate; + +// commands +//pub mod add_category_command; +pub mod add_store_command; +pub mod commands; + +// events +//pub mod category_added_event; +pub mod events; +pub mod store_added_event;