From 3a3fcdc7b73b547ad636ab0ca8b7c65052909af9 Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Thu, 9 Jan 2025 01:14:09 +0530 Subject: [PATCH] feat: cqrs init utils for identity Employee & User --- .../output/db/postgres/employee_view.rs | 18 +++++++++- .../adapters/output/db/postgres/user_view.rs | 33 ++++++++++++++++++- 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/src/identity/adapters/output/db/postgres/employee_view.rs b/src/identity/adapters/output/db/postgres/employee_view.rs index 8037ee3..f742dcf 100644 --- a/src/identity/adapters/output/db/postgres/employee_view.rs +++ b/src/identity/adapters/output/db/postgres/employee_view.rs @@ -3,6 +3,7 @@ // SPDX-License-Identifier: AGPL-3.0-or-later use std::str::FromStr; +use std::sync::Arc; use async_trait::async_trait; use cqrs_es::persist::{PersistenceError, ViewContext, ViewRepository}; @@ -13,7 +14,8 @@ use uuid::Uuid; use super::errors::*; use super::DBOutPostgresAdapter; -use crate::identity::application::services::events::IdentityEvent; +use crate::identity::adapters::types::{IdentityEmployeeCqrsExec, IdentityEmployeeCqrsView}; +use crate::identity::application::services::{events::IdentityEvent, IdentityServicesObj}; use crate::identity::domain::employee_aggregate::*; use crate::types::currency::{self, Currency, PriceBuilder}; use crate::utils::parse_aggregate_id::parse_aggregate_id; @@ -287,6 +289,20 @@ impl Query for DBOutPostgresAdapter { } } +pub fn init_cqrs( + db: DBOutPostgresAdapter, + services: IdentityServicesObj, +) -> (IdentityEmployeeCqrsExec, IdentityEmployeeCqrsView) { + let queries: Vec>> = vec![Box::new(db.clone())]; + + let pool = db.pool.clone(); + + ( + Arc::new(postgres_es::postgres_cqrs(pool.clone(), queries, services)), + Arc::new(db.clone()), + ) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/identity/adapters/output/db/postgres/user_view.rs b/src/identity/adapters/output/db/postgres/user_view.rs index c5429a1..8105a1c 100644 --- a/src/identity/adapters/output/db/postgres/user_view.rs +++ b/src/identity/adapters/output/db/postgres/user_view.rs @@ -1,6 +1,7 @@ // SPDX-FileCopyrightText: 2024 Aravinth Manivannan // // SPDX-License-Identifier: AGPL-3.0-or-later +use std::sync::Arc; use async_trait::async_trait; use cqrs_es::persist::GenericQuery; @@ -11,7 +12,8 @@ use uuid::Uuid; use super::errors::*; use super::DBOutPostgresAdapter; -use crate::identity::application::services::events::IdentityEvent; +use crate::identity::adapters::types::{IdentityUserCqrsExec, IdentityUserCqrsView}; +use crate::identity::application::services::{events::IdentityEvent, IdentityServicesObj}; use crate::identity::domain::aggregate::User; use crate::utils::parse_aggregate_id::parse_aggregate_id; @@ -206,7 +208,36 @@ impl Query for SimpleLoggingQuery { } } +#[async_trait] +impl Query for DBOutPostgresAdapter { + async fn dispatch(&self, user_id: &str, events: &[EventEnvelope]) { + let res = self + .load_with_context(user_id) + .await + .unwrap_or_else(|_| Some((UserView::default(), ViewContext::new(user_id.into(), 0)))); + let (mut view, view_context): (UserView, ViewContext) = res.unwrap(); + for event in events { + view.update(event); + } + self.update_view(view, view_context).await.unwrap(); + } +} + // Our second query, this one will be handled with Postgres `GenericQuery` // which will serialize and persist our view after it is updated. It also // provides a `load` method to deserialize the view on request. pub type UserQuery = GenericQuery; + +pub fn init_cqrs( + db: DBOutPostgresAdapter, + services: IdentityServicesObj, +) -> (IdentityUserCqrsExec, IdentityUserCqrsView) { + let queries: Vec>> = vec![Box::new(db.clone())]; + + let pool = db.pool.clone(); + + ( + Arc::new(postgres_es::postgres_cqrs(pool.clone(), queries, services)), + Arc::new(db.clone()), + ) +}