feat: cqrs init utils for identity Employee & User

This commit is contained in:
Aravinth Manivannan 2025-01-09 01:14:09 +05:30
parent ae9cbe953d
commit 3a3fcdc7b7
Signed by: realaravinth
GPG key ID: F8F50389936984FF
2 changed files with 49 additions and 2 deletions

View file

@ -3,6 +3,7 @@
// SPDX-License-Identifier: AGPL-3.0-or-later
use std::str::FromStr;
use std::sync::Arc;
use async_trait::async_trait;
use cqrs_es::persist::{PersistenceError, ViewContext, ViewRepository};
@ -13,7 +14,8 @@ use uuid::Uuid;
use super::errors::*;
use super::DBOutPostgresAdapter;
use crate::identity::application::services::events::IdentityEvent;
use crate::identity::adapters::types::{IdentityEmployeeCqrsExec, IdentityEmployeeCqrsView};
use crate::identity::application::services::{events::IdentityEvent, IdentityServicesObj};
use crate::identity::domain::employee_aggregate::*;
use crate::types::currency::{self, Currency, PriceBuilder};
use crate::utils::parse_aggregate_id::parse_aggregate_id;
@ -287,6 +289,20 @@ impl Query<Employee> for DBOutPostgresAdapter {
}
}
pub fn init_cqrs(
db: DBOutPostgresAdapter,
services: IdentityServicesObj,
) -> (IdentityEmployeeCqrsExec, IdentityEmployeeCqrsView) {
let queries: Vec<Box<dyn Query<Employee>>> = vec![Box::new(db.clone())];
let pool = db.pool.clone();
(
Arc::new(postgres_es::postgres_cqrs(pool.clone(), queries, services)),
Arc::new(db.clone()),
)
}
#[cfg(test)]
mod tests {
use super::*;

View file

@ -1,6 +1,7 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use std::sync::Arc;
use async_trait::async_trait;
use cqrs_es::persist::GenericQuery;
@ -11,7 +12,8 @@ use uuid::Uuid;
use super::errors::*;
use super::DBOutPostgresAdapter;
use crate::identity::application::services::events::IdentityEvent;
use crate::identity::adapters::types::{IdentityUserCqrsExec, IdentityUserCqrsView};
use crate::identity::application::services::{events::IdentityEvent, IdentityServicesObj};
use crate::identity::domain::aggregate::User;
use crate::utils::parse_aggregate_id::parse_aggregate_id;
@ -206,7 +208,36 @@ impl Query<User> for SimpleLoggingQuery {
}
}
#[async_trait]
impl Query<User> for DBOutPostgresAdapter {
async fn dispatch(&self, user_id: &str, events: &[EventEnvelope<User>]) {
let res = self
.load_with_context(user_id)
.await
.unwrap_or_else(|_| Some((UserView::default(), ViewContext::new(user_id.into(), 0))));
let (mut view, view_context): (UserView, ViewContext) = res.unwrap();
for event in events {
view.update(event);
}
self.update_view(view, view_context).await.unwrap();
}
}
// Our second query, this one will be handled with Postgres `GenericQuery`
// which will serialize and persist our view after it is updated. It also
// provides a `load` method to deserialize the view on request.
pub type UserQuery = GenericQuery<DBOutPostgresAdapter, UserView, User>;
pub fn init_cqrs(
db: DBOutPostgresAdapter,
services: IdentityServicesObj,
) -> (IdentityUserCqrsExec, IdentityUserCqrsView) {
let queries: Vec<Box<dyn Query<User>>> = vec![Box::new(db.clone())];
let pool = db.pool.clone();
(
Arc::new(postgres_es::postgres_cqrs(pool.clone(), queries, services)),
Arc::new(db.clone()),
)
}