diff --git a/src/inventory/adapters/output/db/postgres/errors.rs b/src/inventory/adapters/output/db/postgres/errors.rs new file mode 100644 index 0000000..2676699 --- /dev/null +++ b/src/inventory/adapters/output/db/postgres/errors.rs @@ -0,0 +1,79 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use std::borrow::Cow; + +use cqrs_es::persist::PersistenceError; +use sqlx::Error as SqlxError; + +use crate::inventory::application::port::output::db::errors::InventoryDBError; + +impl From for InventoryDBError { + fn from(e: SqlxError) -> Self { + log::error!("[postgres] err: {}", e); + if let SqlxError::Database(err) = e { + if err.code() == Some(Cow::from("23505")) { + let msg = err.message(); + if msg.contains("cqrs_inventory_store_query_store_id_key") { + return Self::DuplicateStoreID; + } else { + println!("{msg}"); + } + } + } + Self::InternalError + } +} + +/// map custom row not found error to DB error +pub fn map_row_not_found_err(e: SqlxError, row_not_found: InventoryDBError) -> InventoryDBError { + if let SqlxError::RowNotFound = e { + row_not_found + } else { + e.into() + } +} + +#[derive(Debug)] +pub enum PostgresAggregateError { + OptimisticLock, + ConnectionError(Box), + DeserializationError(Box), + UnknownError(Box), +} + +impl From for PostgresAggregateError { + fn from(err: SqlxError) -> Self { + // TODO: improve error handling + match &err { + SqlxError::Database(database_error) => { + if let Some(code) = database_error.code() { + if code.as_ref() == "23505" { + return PostgresAggregateError::OptimisticLock; + } + } + PostgresAggregateError::UnknownError(Box::new(err)) + } + SqlxError::Io(_) | SqlxError::Tls(_) => { + PostgresAggregateError::ConnectionError(Box::new(err)) + } + _ => PostgresAggregateError::UnknownError(Box::new(err)), + } + } +} + +impl From for PersistenceError { + fn from(err: PostgresAggregateError) -> Self { + match err { + PostgresAggregateError::OptimisticLock => PersistenceError::OptimisticLockError, + PostgresAggregateError::ConnectionError(error) => { + PersistenceError::ConnectionError(error) + } + PostgresAggregateError::DeserializationError(error) => { + PersistenceError::UnknownError(error) + } + PostgresAggregateError::UnknownError(error) => PersistenceError::UnknownError(error), + } + } +} diff --git a/src/inventory/adapters/output/db/postgres/mod.rs b/src/inventory/adapters/output/db/postgres/mod.rs new file mode 100644 index 0000000..f743e32 --- /dev/null +++ b/src/inventory/adapters/output/db/postgres/mod.rs @@ -0,0 +1,27 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later +use std::sync::Arc; + +use sqlx::postgres::PgPool; + +use crate::db::{migrate::RunMigrations, sqlx_postgres::Postgres}; + +mod errors; +mod store_id_exists; +mod store_view; + +#[derive(Clone)] +pub struct InventoryDBPostgresAdapter { + pool: PgPool, +} + +impl InventoryDBPostgresAdapter { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } + + pub fn migratable(&self) -> Arc { + Arc::new(Postgres::new(self.pool.clone())) + } +} diff --git a/src/inventory/adapters/output/db/postgres/store_id_exists.rs b/src/inventory/adapters/output/db/postgres/store_id_exists.rs new file mode 100644 index 0000000..35f66ad --- /dev/null +++ b/src/inventory/adapters/output/db/postgres/store_id_exists.rs @@ -0,0 +1,79 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use super::InventoryDBPostgresAdapter; +use crate::inventory::application::port::output::db::{errors::*, store_id_exists::*}; +use crate::inventory::domain::store_aggregate::*; + +#[async_trait::async_trait] +impl StoreIDExistsDBPort for InventoryDBPostgresAdapter { + async fn store_id_exists(&self, s: &Store) -> InventoryDBResult { + let res = sqlx::query!( + "SELECT EXISTS ( + SELECT 1 + FROM cqrs_inventory_store_query + WHERE + store_id = $1 + );", + s.store_id(), + ) + .fetch_one(&self.pool) + .await?; + if let Some(x) = res.exists { + Ok(x) + } else { + Ok(false) + } + } +} + +#[cfg(test)] +mod tests { + use uuid::Uuid; + + use super::*; + + #[actix_rt::test] + async fn test_postgres_store_exists() { + let store_id = Uuid::new_v4(); + let settings = crate::settings::tests::get_settings().await; + settings.create_db().await; + let db = super::InventoryDBPostgresAdapter::new( + sqlx::postgres::PgPool::connect(&settings.database.url) + .await + .unwrap(), + ); + + let store = StoreBuilder::default() + .name("store_name".into()) + .owner("store_owner".into()) + .address(Some("store_address".into())) + .store_id(store_id) + .build() + .unwrap(); + + // state doesn't exist + assert!(!db.store_id_exists(&store).await.unwrap()); + + sqlx::query!( + "INSERT INTO cqrs_inventory_store_query + (view_id, version, name, address, store_id, owner) + VALUES ($1, $2, $3, $4, $5, $6);", + "1", + 1, + store.name(), + store.address().as_ref().unwrap(), + store.store_id(), + store.owner(), + ) + .execute(&db.pool) + .await + .unwrap(); + + // state exists + assert!(db.store_id_exists(&store).await.unwrap()); + + settings.drop_db().await; + } +} diff --git a/src/inventory/adapters/output/db/postgres/store_view.rs b/src/inventory/adapters/output/db/postgres/store_view.rs new file mode 100644 index 0000000..42fac61 --- /dev/null +++ b/src/inventory/adapters/output/db/postgres/store_view.rs @@ -0,0 +1,175 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use async_trait::async_trait; +use cqrs_es::persist::GenericQuery; +use cqrs_es::persist::{PersistenceError, ViewContext, ViewRepository}; +use cqrs_es::{EventEnvelope, Query, View}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use super::errors::*; +use super::InventoryDBPostgresAdapter; +use crate::inventory::domain::events::InventoryEvent; +use crate::inventory::domain::store_aggregate::Store; + +// The view for a Store query, for a standard http application this should +// be designed to reflect the response dto that will be returned to a user. +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct StoreView { + name: String, + address: Option, + store_id: Uuid, + owner: String, +} + +// This updates the view with events as they are committed. +// The logic should be minimal here, e.g., don't calculate the account balance, +// design the events to carry the balance information instead. +impl View for StoreView { + fn update(&mut self, event: &EventEnvelope) { + match &event.payload { + InventoryEvent::StoreAdded(val) => { + self.name = val.name().into(); + self.address = val.address().clone(); + self.store_id = val.store_id().clone(); + self.owner = val.owner().clone(); + } + _ => (), + } + } +} + +#[async_trait] +impl ViewRepository for InventoryDBPostgresAdapter { + async fn load(&self, view_id: &str) -> Result, PersistenceError> { + let res = sqlx::query_as!( + StoreView, + "SELECT + name, address, store_id, owner + FROM + cqrs_inventory_store_query + WHERE + view_id = $1;", + view_id + ) + .fetch_one(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + Ok(Some(res)) + } + + async fn load_with_context( + &self, + view_id: &str, + ) -> Result, PersistenceError> { + let res = sqlx::query_as!( + StoreView, + "SELECT + name, address, store_id, owner + FROM + cqrs_inventory_store_query + WHERE + view_id = $1;", + view_id + ) + .fetch_one(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + + struct Context { + version: i64, + view_id: String, + } + + let ctx = sqlx::query_as!( + Context, + "SELECT + view_id, version + FROM + cqrs_inventory_store_query + WHERE + view_id = $1;", + view_id + ) + .fetch_one(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + + let view_context = ViewContext::new(ctx.view_id, ctx.version); + Ok(Some((res, view_context))) + } + + async fn update_view( + &self, + view: StoreView, + context: ViewContext, + ) -> Result<(), PersistenceError> { + match context.version { + 0 => { + let version = context.version + 1; + sqlx::query!( + "INSERT INTO cqrs_inventory_store_query ( + view_id, version, name, address, store_id, owner + ) VALUES ( + $1, $2, $3, $4, $5, $6 + );", + context.view_instance_id, + version, + view.name, + view.address, + view.store_id, + view.owner, + ) + .execute(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + } + _ => { + let version = context.version + 1; + sqlx::query!( + "UPDATE + cqrs_inventory_store_query + SET + view_id = $1, + version = $2, + name = $3, + address = $4, + store_id = $5, + owner = $6;", + context.view_instance_id, + version, + view.name, + view.address, + view.store_id, + view.owner, + ) + .execute(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + } + } + + Ok(()) + } +} + +pub struct SimpleLoggingQuery {} + +// Our simplest query, this is great for debugging but absolutely useless in production. +// This query just pretty prints the events as they are processed. +#[async_trait] +impl Query for SimpleLoggingQuery { + async fn dispatch(&self, aggregate_id: &str, events: &[EventEnvelope]) { + for event in events { + let payload = serde_json::to_string_pretty(&event.payload).unwrap(); + println!("{}-{}\n{}", aggregate_id, event.sequence, payload); + } + } +} + +// Our second query, this one will be handled with Postgres `GenericQuery` +// which will serialize and persist our view after it is updated. It also +// provides a `load` method to deserialize the view on request. +pub type StoreQuery = GenericQuery; diff --git a/src/inventory/application/port/output/db/errors.rs b/src/inventory/application/port/output/db/errors.rs new file mode 100644 index 0000000..3d80dce --- /dev/null +++ b/src/inventory/application/port/output/db/errors.rs @@ -0,0 +1,15 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use derive_more::Display; +use serde::{Deserialize, Serialize}; + +pub type InventoryDBResult = Result; + +#[derive(Debug, Display, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] +pub enum InventoryDBError { + DuplicateCategoryName, + DuplicateStoreID, + InternalError, +} diff --git a/src/inventory/application/port/output/db/mod.rs b/src/inventory/application/port/output/db/mod.rs new file mode 100644 index 0000000..7e35ef6 --- /dev/null +++ b/src/inventory/application/port/output/db/mod.rs @@ -0,0 +1,7 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +pub mod category_exists; +pub mod errors; +pub mod store_id_exists; diff --git a/src/inventory/application/port/output/db/store_id_exists.rs b/src/inventory/application/port/output/db/store_id_exists.rs new file mode 100644 index 0000000..a1321af --- /dev/null +++ b/src/inventory/application/port/output/db/store_id_exists.rs @@ -0,0 +1,54 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use mockall::predicate::*; +use mockall::*; + +use crate::inventory::domain::store_aggregate::Store; + +use super::errors::*; +#[cfg(test)] +#[allow(unused_imports)] +pub use tests::*; + +#[automock] +#[async_trait::async_trait] +pub trait StoreIDExistsDBPort: Send + Sync { + async fn store_id_exists(&self, s: &Store) -> InventoryDBResult; +} + +pub type StoreIDExistsDBPortObj = std::sync::Arc; + +#[cfg(test)] +pub mod tests { + use super::*; + + use std::sync::Arc; + + pub fn mock_store_id_exists_db_port_false(times: Option) -> StoreIDExistsDBPortObj { + let mut m = MockStoreIDExistsDBPort::new(); + if let Some(times) = times { + m.expect_store_id_exists() + .times(times) + .returning(|_| Ok(false)); + } else { + m.expect_store_id_exists().returning(|_| Ok(false)); + } + + Arc::new(m) + } + + pub fn mock_store_id_exists_db_port_true(times: Option) -> StoreIDExistsDBPortObj { + let mut m = MockStoreIDExistsDBPort::new(); + if let Some(times) = times { + m.expect_store_id_exists() + .times(times) + .returning(|_| Ok(true)); + } else { + m.expect_store_id_exists().returning(|_| Ok(true)); + } + + Arc::new(m) + } +} diff --git a/src/inventory/application/services/add_store_service.rs b/src/inventory/application/services/add_store_service.rs new file mode 100644 index 0000000..bdb5c38 --- /dev/null +++ b/src/inventory/application/services/add_store_service.rs @@ -0,0 +1,91 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use derive_builder::Builder; +use uuid::Uuid; + +use super::errors::*; +use crate::inventory::{ + application::port::output::db::{errors::InventoryDBError, store_id_exists::*}, + domain::{ + add_store_command::AddStoreCommand, + store_added_event::{StoreAddedEvent, StoreAddedEventBuilder}, + store_aggregate::*, + }, +}; + +#[async_trait::async_trait] +pub trait AddStoreUseCase: Send + Sync { + async fn add_store(&self, cmd: AddStoreCommand) -> InventoryResult; +} + +pub type AddStoreServiceObj = std::sync::Arc; + +#[derive(Clone, Builder)] +pub struct AddStoreService { + db_store_id_exists: StoreIDExistsDBPortObj, +} + +#[async_trait::async_trait] +impl AddStoreUseCase for AddStoreService { + async fn add_store(&self, cmd: AddStoreCommand) -> InventoryResult { + let mut store_id = Uuid::new_v4(); + let mut store = StoreBuilder::default() + .name(cmd.name().into()) + .address(cmd.address().as_ref().map(|s| s.to_string())) + .owner(cmd.owner().into()) + .store_id(store_id.clone()) + .build() + .unwrap(); + loop { + if self.db_store_id_exists.store_id_exists(&store).await? { + store_id = Uuid::new_v4(); + store = StoreBuilder::default() + .name(cmd.name().into()) + .address(cmd.address().as_ref().map(|s| s.to_string())) + .store_id(store_id.clone()) + .build() + .unwrap(); + continue; + } else { + break; + } + } + + Ok(StoreAddedEventBuilder::default() + .name(store.name().into()) + .address(store.address().as_ref().map(|s| s.to_string())) + .owner(cmd.owner().into()) + .store_id(store_id.clone()) + .build() + .unwrap()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use crate::tests::bdd::*; + + #[actix_rt::test] + async fn test_service_store_id_doesnt_exist() { + let name = "foo"; + let address = "bar"; + let username = "baz"; + + // address = None + let cmd = AddStoreCommand::new(name.into(), Some(address.into()), username.into()).unwrap(); + + let s = AddStoreServiceBuilder::default() + .db_store_id_exists(mock_store_id_exists_db_port_false(IS_CALLED_ONLY_ONCE)) + .build() + .unwrap(); + + let res = s.add_store(cmd.clone()).await.unwrap(); + assert_eq!(res.name(), cmd.name()); + assert_eq!(res.address(), cmd.address()); + assert_eq!(res.owner(), cmd.owner()); + } +} diff --git a/src/inventory/application/services/errors.rs b/src/inventory/application/services/errors.rs new file mode 100644 index 0000000..3a091f3 --- /dev/null +++ b/src/inventory/application/services/errors.rs @@ -0,0 +1,30 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use derive_more::{Display, Error}; +use log::error; +use serde::{Deserialize, Serialize}; + +use crate::inventory::application::port::output::db::errors::InventoryDBError; + +pub type InventoryResult = Result; + +#[derive(Debug, Error, Display, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] +pub enum InventoryError { + DuplicateCategoryName, + InternalError, +} + +impl From for InventoryError { + fn from(value: InventoryDBError) -> Self { + match value { + InventoryDBError::DuplicateCategoryName => Self::DuplicateCategoryName, + InventoryDBError::DuplicateStoreID => { + error!("DuplicateStoreID"); + Self::InternalError + } + InventoryDBError::InternalError => Self::InternalError, + } + } +} diff --git a/src/inventory/application/services/mod.rs b/src/inventory/application/services/mod.rs index 56f60de..7f5e3e0 100644 --- a/src/inventory/application/services/mod.rs +++ b/src/inventory/application/services/mod.rs @@ -1,3 +1,33 @@ // SPDX-FileCopyrightText: 2024 Aravinth Manivannan // // SPDX-License-Identifier: AGPL-3.0-or-later +use derive_builder::Builder; +use mockall::predicate::*; +use mockall::*; + +pub mod errors; + +// services +pub mod add_category_service; +pub mod add_store_service; + +#[automock] +pub trait InventoryServicesInterface: Send + Sync { + fn add_store(&self) -> add_store_service::AddStoreServiceObj; + fn add_category(&self) -> add_category_service::AddCategoryServiceObj; +} + +#[derive(Clone, Builder)] +pub struct InventoryServices { + add_store: add_store_service::AddStoreServiceObj, + add_category: add_category_service::AddCategoryServiceObj, +} + +impl InventoryServicesInterface for InventoryServices { + fn add_store(&self) -> add_store_service::AddStoreServiceObj { + self.add_store.clone() + } + fn add_category(&self) -> add_category_service::AddCategoryServiceObj { + self.add_category.clone() + } +} diff --git a/src/inventory/domain/add_store_command.rs b/src/inventory/domain/add_store_command.rs new file mode 100644 index 0000000..1a57f60 --- /dev/null +++ b/src/inventory/domain/add_store_command.rs @@ -0,0 +1,79 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use derive_getters::Getters; +use derive_more::{Display, Error}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Error, Display, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] +pub enum AddStoreCommandError { + NameIsEmpty, +} + +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd, Getters)] +pub struct AddStoreCommand { + name: String, + address: Option, + owner: String, +} + +impl AddStoreCommand { + pub fn new( + name: String, + address: Option, + owner: String, + ) -> Result { + let address: Option = if let Some(address) = address { + let address = address.trim(); + if address.is_empty() { + None + } else { + Some(address.to_owned()) + } + } else { + None + }; + + let name = name.trim().to_owned(); + if name.is_empty() { + return Err(AddStoreCommandError::NameIsEmpty); + } + + Ok(Self { + name, + address, + owner, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_cmd() { + let name = "foo"; + let address = "bar"; + let username = "baz"; + + // address = None + let cmd = AddStoreCommand::new(name.into(), None, username.into()).unwrap(); + assert_eq!(cmd.name(), name); + assert_eq!(cmd.address(), &None); + assert_eq!(cmd.owner(), username); + + // address = Some + let cmd = AddStoreCommand::new(name.into(), Some(address.into()), username.into()).unwrap(); + assert_eq!(cmd.name(), name); + assert_eq!(cmd.address(), &Some(address.to_owned())); + assert_eq!(cmd.owner(), username); + + // AddStoreCommandError::NameIsEmpty + assert_eq!( + AddStoreCommand::new("".into(), Some(address.into()), username.into()), + Err(AddStoreCommandError::NameIsEmpty) + ) + } +} diff --git a/src/inventory/domain/store_added_event.rs b/src/inventory/domain/store_added_event.rs new file mode 100644 index 0000000..7a67bda --- /dev/null +++ b/src/inventory/domain/store_added_event.rs @@ -0,0 +1,18 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use derive_builder::Builder; +use derive_getters::Getters; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +#[derive( + Clone, Debug, Builder, Serialize, Deserialize, Getters, Eq, PartialEq, Ord, PartialOrd, +)] +pub struct StoreAddedEvent { + name: String, + address: Option, + owner: String, + store_id: Uuid, +} diff --git a/src/inventory/domain/store_aggregate.rs b/src/inventory/domain/store_aggregate.rs new file mode 100644 index 0000000..50b8f6b --- /dev/null +++ b/src/inventory/domain/store_aggregate.rs @@ -0,0 +1,74 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use async_trait::async_trait; +use cqrs_es::Aggregate; +use derive_builder::Builder; +use derive_getters::Getters; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use crate::inventory::application::services::errors::*; +use crate::inventory::application::services::InventoryServicesInterface; + +use super::{commands::InventoryCommand, events::InventoryEvent}; + +#[derive( + Clone, Default, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd, Builder, Getters, +)] +pub struct Store { + name: String, + address: Option, + owner: String, + store_id: Uuid, +} + +#[async_trait] +impl Aggregate for Store { + type Command = InventoryCommand; + type Event = InventoryEvent; + type Error = InventoryError; + type Services = std::sync::Arc; + + // This identifier should be unique to the system. + fn aggregate_type() -> String { + "inventory.store".to_string() + } + + // The aggregate logic goes here. Note that this will be the _bulk_ of a CQRS system + // so expect to use helper functions elsewhere to keep the code clean. + async fn handle( + &self, + command: Self::Command, + services: &Self::Services, + ) -> Result, Self::Error> { + match command { + InventoryCommand::AddStore(cmd) => { + let res = services.add_store().add_store(cmd).await?; + Ok(vec![InventoryEvent::StoreAdded(res)]) + } + _ => Ok(Vec::default()), + // InventoryCommand::AddCategory(cmd) => { + // let res = services.add_category().add_store(cmd).await?; + // Ok(vec![InventoryEvent::CategoryAdded(res)]) + // + // } + } + } + + fn apply(&mut self, event: Self::Event) { + match event { + InventoryEvent::StoreAdded(e) => { + *self = StoreBuilder::default() + .name(e.name().into()) + .address(e.address().as_ref().map(|s| s.to_string())) + .owner(e.owner().into()) + .store_id(e.store_id().clone()) + .build() + .unwrap(); + } + _ => (), + } + } +}