From 309e02fe8ff96059e5eba30b09521723490dfbfc Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Thu, 9 Jan 2025 01:17:30 +0530 Subject: [PATCH] feat: define store addapters for identity and impl Query for Store aggregate --- ...23105314_cqrs_identity_store_query.sql.sql | 16 + .../adapters/output/db/postgres/mod.rs | 4 +- .../output/db/postgres/store_id_exists.rs | 87 ++++ .../output/db/postgres/store_name_exists.rs | 81 ++++ .../adapters/output/db/postgres/store_view.rs | 371 ++++++++++++++++++ 5 files changed, 558 insertions(+), 1 deletion(-) create mode 100644 migrations/20241223105314_cqrs_identity_store_query.sql.sql create mode 100644 src/identity/adapters/output/db/postgres/store_id_exists.rs create mode 100644 src/identity/adapters/output/db/postgres/store_name_exists.rs create mode 100644 src/identity/adapters/output/db/postgres/store_view.rs diff --git a/migrations/20241223105314_cqrs_identity_store_query.sql.sql b/migrations/20241223105314_cqrs_identity_store_query.sql.sql new file mode 100644 index 0000000..25da194 --- /dev/null +++ b/migrations/20241223105314_cqrs_identity_store_query.sql.sql @@ -0,0 +1,16 @@ +--- SPDX-FileCopyrightText: 2024 Aravinth Manivannan +-- +-- SPDX-License-Identifier: AGPL-3.0-or-later + +CREATE TABLE IF NOT EXISTS cqrs_identity_store_query +( + version bigint CHECK (version >= 0) NOT NULL, + + name TEXT NOT NULL, + address TEXT, + owner UUID NOT NULL, + store_id UUID NOT NULL UNIQUE, + deleted BOOLEAN NOT NULL DEFAULT FALSE, + + PRIMARY KEY (store_id) +); diff --git a/src/identity/adapters/output/db/postgres/mod.rs b/src/identity/adapters/output/db/postgres/mod.rs index 5092ebd..9738fc7 100644 --- a/src/identity/adapters/output/db/postgres/mod.rs +++ b/src/identity/adapters/output/db/postgres/mod.rs @@ -15,6 +15,7 @@ pub mod email_exists; pub mod employee_view; mod errors; pub mod get_verification_secret; +pub mod store_view; pub mod user_id_exists; pub mod user_view; pub mod verification_secret_exists; @@ -31,7 +32,8 @@ pub mod phone_exists; //pub mod get_invite; //pub mod invite_id_exists; -//pub mod store_id_exists; +pub mod store_id_exists; +pub mod store_name_exists; #[derive(Clone)] pub struct DBOutPostgresAdapter { diff --git a/src/identity/adapters/output/db/postgres/store_id_exists.rs b/src/identity/adapters/output/db/postgres/store_id_exists.rs new file mode 100644 index 0000000..286e47b --- /dev/null +++ b/src/identity/adapters/output/db/postgres/store_id_exists.rs @@ -0,0 +1,87 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use uuid::Uuid; + +use super::DBOutPostgresAdapter; +use crate::identity::application::port::output::db::{errors::*, store_id_exists::*}; +use crate::identity::domain::store_aggregate::*; + +#[async_trait::async_trait] +impl StoreIDExistsDBPort for DBOutPostgresAdapter { + async fn store_id_exists(&self, store_id: &Uuid) -> OutDBPortResult { + let res = sqlx::query!( + "SELECT EXISTS ( + SELECT 1 + FROM cqrs_identity_store_query + WHERE + store_id = $1 + );", + store_id + ) + .fetch_one(&self.pool) + .await?; + if let Some(x) = res.exists { + Ok(x) + } else { + Ok(false) + } + } +} + +#[cfg(test)] +pub mod tests { + use uuid::Uuid; + + use crate::utils::uuid::tests::UUID; + + use super::*; + + pub async fn create_dummy_store_record(s: &Store, db: &DBOutPostgresAdapter) { + sqlx::query!( + "INSERT INTO cqrs_identity_store_query + (version, name, address, store_id, owner, deleted) + VALUES ($1, $2, $3, $4, $5 ,$6);", + 1, + s.name(), + s.address().as_ref().map(|s| s.as_str()), + s.store_id(), + s.owner(), + false + ) + .execute(&db.pool) + .await + .unwrap(); + } + + #[actix_rt::test] + async fn test_postgres_store_exists() { + let store_id = Uuid::new_v4(); + let settings = crate::settings::tests::get_settings().await; + settings.create_db().await; + let db = super::DBOutPostgresAdapter::new( + sqlx::postgres::PgPool::connect(&settings.database.url) + .await + .unwrap(), + ); + + let store = StoreBuilder::default() + .name("store_name".into()) + .owner(UUID) + .address(Some("store_address".into())) + .store_id(store_id) + .build() + .unwrap(); + + // state doesn't exist + assert!(!db.store_id_exists(store.store_id()).await.unwrap()); + + create_dummy_store_record(&store, &db).await; + + // state exists + assert!(db.store_id_exists(store.store_id()).await.unwrap()); + + settings.drop_db().await; + } +} diff --git a/src/identity/adapters/output/db/postgres/store_name_exists.rs b/src/identity/adapters/output/db/postgres/store_name_exists.rs new file mode 100644 index 0000000..ccb4d56 --- /dev/null +++ b/src/identity/adapters/output/db/postgres/store_name_exists.rs @@ -0,0 +1,81 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use super::DBOutPostgresAdapter; +use crate::identity::application::port::output::db::{errors::*, store_name_exists::*}; +use crate::identity::domain::store_aggregate::*; + +#[async_trait::async_trait] +impl StoreNameExistsDBPort for DBOutPostgresAdapter { + async fn store_name_exists(&self, s: &Store) -> OutDBPortResult { + let res = sqlx::query!( + "SELECT EXISTS ( + SELECT 1 + FROM cqrs_identity_store_query + WHERE + name = $1 + AND + deleted = false + );", + s.name(), + ) + .fetch_one(&self.pool) + .await?; + if let Some(x) = res.exists { + Ok(x) + } else { + Ok(false) + } + } +} + +#[cfg(test)] +mod tests { + use uuid::Uuid; + + use crate::utils::uuid::tests::UUID; + + use super::*; + use crate::identity::adapters::output::db::postgres::store_id_exists::tests::create_dummy_store_record; + + #[actix_rt::test] + async fn test_postgres_store_exists() { + let store_id = Uuid::new_v4(); + let settings = crate::settings::tests::get_settings().await; + settings.create_db().await; + let db = super::DBOutPostgresAdapter::new( + sqlx::postgres::PgPool::connect(&settings.database.url) + .await + .unwrap(), + ); + + let store = StoreBuilder::default() + .name("store_name".into()) + .owner(UUID) + .address(Some("store_address".into())) + .store_id(store_id) + .build() + .unwrap(); + + // state doesn't exist + assert!(!db.store_name_exists(&store).await.unwrap()); + + create_dummy_store_record(&store, &db).await; + + // state exists + assert!(db.store_name_exists(&store).await.unwrap()); + + // Set store.deleted = true; now db.store_name_exists must return false + sqlx::query!( + "UPDATE cqrs_identity_store_query SET deleted = true WHERE store_id = $1;", + store.store_id() + ) + .execute(&db.pool) + .await + .unwrap(); + assert!(!db.store_name_exists(&store).await.unwrap()); + + settings.drop_db().await; + } +} diff --git a/src/identity/adapters/output/db/postgres/store_view.rs b/src/identity/adapters/output/db/postgres/store_view.rs new file mode 100644 index 0000000..82c0b45 --- /dev/null +++ b/src/identity/adapters/output/db/postgres/store_view.rs @@ -0,0 +1,371 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later +use std::sync::Arc; + +use async_trait::async_trait; +use cqrs_es::persist::{PersistenceError, ViewContext, ViewRepository}; +use cqrs_es::{EventEnvelope, Query, View}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use super::errors::*; +use super::DBOutPostgresAdapter; +use crate::identity::adapters::types::{IdentityStoreCqrsExec, IdentityStoreCqrsView}; +use crate::identity::application::services::{ + events::IdentityEvent, IdentityCommand, IdentityServicesObj, +}; +use crate::identity::domain::store_aggregate::*; +use crate::utils::parse_aggregate_id::parse_aggregate_id; + +pub const NEW_STORE_NON_UUID: &str = "identity_new_store_non_uuid-asdfa"; + +// The view for a Store query, for a standard http application this should +// be designed to reflect the response dto that will be returned to a user. +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct StoreView { + name: String, + address: Option, + store_id: Uuid, + owner: Uuid, + deleted: bool, +} + +impl From for Store { + fn from(value: StoreView) -> Self { + StoreBuilder::default() + .name(value.name) + .address(value.address) + .store_id(value.store_id) + .owner(value.owner) + .deleted(value.deleted) + .build() + .unwrap() + } +} + +// This updates the view with events as they are committed. +// The logic should be minimal here, e.g., don't calculate the account balance, +// design the events to carry the balance information instead. +impl View for StoreView { + fn update(&mut self, event: &EventEnvelope) { + match &event.payload { + IdentityEvent::StoreAdded(val) => { + self.name = val.name().into(); + self.address = val.address().clone(); + self.store_id = *val.store_id(); + self.owner = *val.owner(); + self.deleted = false; + } + IdentityEvent::StoreUpdated(e) => { + let val = e.new_store(); + self.name = val.name().into(); + self.address = val.address().clone(); + self.store_id = *val.store_id(); + self.owner = *val.owner(); + } + _ => (), + } + } +} + +#[async_trait] +impl ViewRepository for DBOutPostgresAdapter { + async fn load(&self, store_id: &str) -> Result, PersistenceError> { + let store_id = match parse_aggregate_id(store_id, NEW_STORE_NON_UUID)? { + Some((val, _)) => return Ok(Some(val)), + None => Uuid::parse_str(store_id).unwrap(), + }; + + let res = sqlx::query_as!( + StoreView, + "SELECT + name, address, store_id, owner, deleted + FROM + cqrs_identity_store_query + WHERE + store_id = $1;", + store_id + ) + .fetch_one(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + Ok(Some(res)) + } + + async fn load_with_context( + &self, + store_id: &str, + ) -> Result, PersistenceError> { + let store_id = match parse_aggregate_id(store_id, NEW_STORE_NON_UUID)? { + Some(val) => return Ok(Some(val)), + None => Uuid::parse_str(store_id).unwrap(), + }; + + let res = sqlx::query_as!( + StoreView, + "SELECT + name, address, store_id, owner, deleted + FROM + cqrs_identity_store_query + WHERE + store_id = $1;", + &store_id, + ) + .fetch_one(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + + struct Context { + version: i64, + store_id: Uuid, + } + + let ctx = sqlx::query_as!( + Context, + "SELECT + store_id, version + FROM + cqrs_identity_store_query + WHERE + store_id = $1;", + store_id + ) + .fetch_one(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + + let view_context = ViewContext::new(ctx.store_id.to_string(), ctx.version); + Ok(Some((res, view_context))) + } + + async fn update_view( + &self, + view: StoreView, + context: ViewContext, + ) -> Result<(), PersistenceError> { + match context.version { + 0 => { + let version = context.version + 1; + sqlx::query!( + "INSERT INTO cqrs_identity_store_query ( + version, name, address, store_id, owner, deleted + ) VALUES ( + $1, $2, $3, $4, $5, $6 + );", + version, + view.name, + view.address, + view.store_id, + view.owner, + view.deleted, + ) + .execute(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + } + _ => { + let version = context.version + 1; + sqlx::query!( + "UPDATE + cqrs_identity_store_query + SET + version = $1, + name = $2, + address = $3, + owner = $4, + deleted = $5;", + version, + view.name, + view.address, + view.owner, + view.deleted, + ) + .execute(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + } + } + + Ok(()) + } +} + +pub struct SimpleLoggingQuery {} + +// Our simplest query, this is great for debugging but absolutely useless in production. +// This query just pretty prints the events as they are processed. +#[async_trait] +impl Query for SimpleLoggingQuery { + async fn dispatch(&self, aggregate_id: &str, events: &[EventEnvelope]) { + for event in events { + let payload = serde_json::to_string_pretty(&event.payload).unwrap(); + println!("{}-{}\n{}", aggregate_id, event.sequence, payload); + } + } +} + +#[async_trait] +impl Query for DBOutPostgresAdapter { + async fn dispatch(&self, store_id: &str, events: &[EventEnvelope]) { + let res = self + .load_with_context(store_id) + .await + .unwrap_or_else(|_| Some((StoreView::default(), ViewContext::new(store_id.into(), 0)))); + let (mut view, view_context): (StoreView, ViewContext) = res.unwrap(); + for event in events { + view.update(event); + } + self.update_view(view, view_context).await.unwrap(); + } +} + +pub fn init_cqrs( + db: DBOutPostgresAdapter, + services: IdentityServicesObj, +) -> (IdentityStoreCqrsExec, IdentityStoreCqrsView) { + let queries: Vec>> = vec![Box::new(db.clone())]; + + let pool = db.pool.clone(); + + ( + Arc::new(postgres_es::postgres_cqrs(pool.clone(), queries, services)), + Arc::new(db.clone()), + ) +} + +#[cfg(test)] +mod tests { + use super::*; + + use postgres_es::PostgresCqrs; + + use crate::{ + db::migrate::*, + identity::{ + application::services::{ + add_store_service::AddStoreServiceBuilder, update_store_service::*, + MockIdentityServicesInterface, + }, + domain::add_store_command::*, + // domain::commands::IdentityCommand, + domain::update_store_command::*, + }, + tests::bdd::*, + utils::{random_string::GenerateRandomStringInterface, uuid::tests::UUID}, + }; + use std::sync::Arc; + + #[actix_rt::test] + async fn pg_query_identity_store_view() { + let settings = crate::settings::tests::get_settings().await; + //let settings = crate::settings::Settings::new().unwrap(); + settings.create_db().await; + + let db = crate::db::sqlx_postgres::Postgres::init(&settings.database.url).await; + db.migrate().await; + let db = DBOutPostgresAdapter::new(db.pool.clone()); + + let simple_query = SimpleLoggingQuery {}; + + let queries: Vec>> = + vec![Box::new(simple_query), Box::new(db.clone())]; + + let mut mock_services = MockIdentityServicesInterface::new(); + + let db2 = db.clone(); + mock_services + .expect_add_store() + .times(IS_CALLED_ONLY_ONCE.unwrap()) + .returning(move || { + Arc::new( + AddStoreServiceBuilder::default() + .db_store_id_exists(Arc::new(db2.clone())) + .db_store_name_exists(Arc::new(db2.clone())) + .build() + .unwrap(), + ) + }); + + let db2 = db.clone(); + mock_services + .expect_update_store() + .times(IS_CALLED_ONLY_ONCE.unwrap()) + .returning(move || { + Arc::new( + UpdateStoreServiceBuilder::default() + .db_store_id_exists(Arc::new(db2.clone())) + .db_store_name_exists(Arc::new(db2.clone())) + .build() + .unwrap(), + ) + }); + + let (cqrs, store_query): ( + Arc>, + Arc>, + ) = ( + Arc::new(postgres_es::postgres_cqrs( + db.pool.clone(), + queries, + Arc::new(mock_services), + )), + Arc::new(db.clone()), + ); + + let rand = crate::utils::random_string::GenerateRandomString {}; + let cmd = AddStoreCommandBuilder::default() + .name(rand.get_random(10)) + .address(None) + .owner(UUID) + .store_id(UUID) + .build() + .unwrap(); + cqrs.execute( + &cmd.store_id().to_string(), + IdentityCommand::AddStore(cmd.clone()), + ) + .await + .unwrap(); + + let store = store_query + .load(&(*cmd.store_id()).to_string()) + .await + .unwrap() + .unwrap(); + let store: Store = store.into(); + assert_eq!(store.name(), cmd.name()); + assert_eq!(store.address(), cmd.address()); + assert_eq!(store.owner(), cmd.owner()); + assert_eq!(store.store_id(), cmd.store_id()); + assert!(!store.deleted()); + + let update_store_cmd = UpdateStoreCommand::new( + rand.get_random(10), + Some(rand.get_random(10)), + UUID, + store, + UUID, + ) + .unwrap(); + cqrs.execute( + &cmd.store_id().to_string(), + IdentityCommand::UpdateStore(update_store_cmd.clone()), + ) + .await + .unwrap(); + let store = store_query + .load(&(*cmd.store_id()).to_string()) + .await + .unwrap() + .unwrap(); + let store: Store = store.into(); + assert_eq!(store.name(), update_store_cmd.name()); + assert_eq!(store.address(), update_store_cmd.address()); + assert_eq!(store.owner(), update_store_cmd.owner()); + assert_eq!(store.store_id(), update_store_cmd.old_store().store_id()); + assert!(!store.deleted()); + + settings.drop_db().await; + } +}