feat: access CQRS executors from a trait obj

This commit is contained in:
Aravinth Manivannan 2025-01-10 15:47:43 +05:30
parent 8a75214017
commit 17a7e58f44
Signed by: realaravinth
GPG key ID: F8F50389936984FF
4 changed files with 57 additions and 20 deletions

View file

@ -53,12 +53,14 @@ pub fn load_adapters(pool: PgPool, settings: Settings) -> impl FnOnce(&mut web::
let (line_item_cqrs_exec, line_item_cqrs_query) = let (line_item_cqrs_exec, line_item_cqrs_query) =
line_item_view::init_cqrs(db.clone(), services.clone()); line_item_view::init_cqrs(db.clone(), services.clone());
let billing_cqrs_exec = types::BillingCqrsExecBuilder::default() let billing_cqrs_exec = types::WebBillingCqrsExec::new(Arc::new(
.bill(bill_cqrs_exec) types::BillingCqrsExecBuilder::default()
.line_item(line_item_cqrs_exec) .bill(bill_cqrs_exec)
.store(store_cqrs_exec) .line_item(line_item_cqrs_exec)
.build() .store(store_cqrs_exec)
.unwrap(); .build()
.unwrap(),
));
let f = move |cfg: &mut web::ServiceConfig| { let f = move |cfg: &mut web::ServiceConfig| {
cfg.configure(input::web::load_ctx()); cfg.configure(input::web::load_ctx());
@ -67,7 +69,7 @@ pub fn load_adapters(pool: PgPool, settings: Settings) -> impl FnOnce(&mut web::
cfg.app_data(Data::new(store_cqrs_query.clone())); cfg.app_data(Data::new(store_cqrs_query.clone()));
cfg.app_data(Data::new(line_item_cqrs_query.clone())); cfg.app_data(Data::new(line_item_cqrs_query.clone()));
cfg.app_data(Data::new(Arc::new(billing_cqrs_exec))); cfg.app_data(billing_cqrs_exec);
}; };
Box::new(f) Box::new(f)

View file

@ -6,8 +6,11 @@
use std::sync::Arc; use std::sync::Arc;
use actix_web::web::Data; use actix_web::web::Data;
use async_trait::async_trait;
use cqrs_es::{persist::ViewRepository, AggregateError}; use cqrs_es::{persist::ViewRepository, AggregateError};
use derive_builder::Builder; use derive_builder::Builder;
use mockall::predicate::*;
use mockall::*;
use postgres_es::PostgresCqrs; use postgres_es::PostgresCqrs;
use crate::billing::{ use crate::billing::{
@ -41,7 +44,17 @@ pub type BillingStoreCqrsExec = Arc<PostgresCqrs<Store>>;
pub type BillingStoreCqrsView = Arc<dyn ViewRepository<StoreView, Store>>; pub type BillingStoreCqrsView = Arc<dyn ViewRepository<StoreView, Store>>;
pub type WebBillingStoreCqrsView = Data<BillingStoreCqrsView>; pub type WebBillingStoreCqrsView = Data<BillingStoreCqrsView>;
pub type WebBillingCqrsExec = Data<Arc<BillingCqrsExec>>; pub type WebBillingCqrsExec = Data<Arc<dyn BillingCqrsExecutor>>;
#[automock]
#[async_trait]
pub trait BillingCqrsExecutor {
async fn execute(
&self,
aggregate_id: &str,
command: BillingCommand,
) -> Result<(), AggregateError<BillingError>>;
}
#[derive(Clone, Builder)] #[derive(Clone, Builder)]
pub struct BillingCqrsExec { pub struct BillingCqrsExec {
@ -50,8 +63,9 @@ pub struct BillingCqrsExec {
store: BillingStoreCqrsExec, store: BillingStoreCqrsExec,
} }
impl BillingCqrsExec { #[async_trait]
pub async fn execute( impl BillingCqrsExecutor for BillingCqrsExec {
async fn execute(
&self, &self,
aggregate_id: &str, aggregate_id: &str,
command: BillingCommand, command: BillingCommand,

View file

@ -61,17 +61,22 @@ pub fn load_adapters(pool: PgPool, settings: Settings) -> impl FnOnce(&mut web::
let (employee_cqrs_exec, employee_cqrs_query) = let (employee_cqrs_exec, employee_cqrs_query) =
employee_view::init_cqrs(db.clone(), services.clone()); employee_view::init_cqrs(db.clone(), services.clone());
let identity_cqrs_exec = types::WebIdentityCqrsExec::new(Arc::new(
types::IdentityCqrsExecBuilder::default()
.user(user_cqrs_exec)
.employee(employee_cqrs_exec)
.store(store_cqrs_exec)
.build()
.unwrap(),
));
let f = move |cfg: &mut web::ServiceConfig| { let f = move |cfg: &mut web::ServiceConfig| {
cfg.configure(input::web::load_ctx()); cfg.configure(input::web::load_ctx());
cfg.app_data(Data::new(user_cqrs_exec.clone()));
cfg.app_data(Data::new(user_cqrs_query.clone())); cfg.app_data(Data::new(user_cqrs_query.clone()));
cfg.app_data(Data::new(store_cqrs_exec.clone()));
cfg.app_data(Data::new(store_cqrs_query.clone())); cfg.app_data(Data::new(store_cqrs_query.clone()));
cfg.app_data(Data::new(employee_cqrs_exec.clone()));
cfg.app_data(Data::new(employee_cqrs_query.clone())); cfg.app_data(Data::new(employee_cqrs_query.clone()));
cfg.app_data(identity_cqrs_exec.clone());
}; };
Box::new(f) Box::new(f)

View file

@ -6,8 +6,11 @@
use std::sync::Arc; use std::sync::Arc;
use actix_web::web::Data; use actix_web::web::Data;
use async_trait::async_trait;
use cqrs_es::{persist::ViewRepository, AggregateError}; use cqrs_es::{persist::ViewRepository, AggregateError};
use derive_builder::Builder; use derive_builder::Builder;
use mockall::predicate::*;
use mockall::*;
use postgres_es::PostgresCqrs; use postgres_es::PostgresCqrs;
use crate::identity::{ use crate::identity::{
@ -36,22 +39,35 @@ pub type IdentityEmployeeCqrsExec = Arc<PostgresCqrs<Employee>>;
pub type IdentityEmployeeCqrsView = Arc<dyn ViewRepository<StoreView, Store>>; pub type IdentityEmployeeCqrsView = Arc<dyn ViewRepository<StoreView, Store>>;
pub type WebIdentityEmployeeCqrsView = Data<IdentityStoreCqrsView>; pub type WebIdentityEmployeeCqrsView = Data<IdentityStoreCqrsView>;
pub type WebIdentityCqrsExec = Data<Arc<IdentityCqrsExec>>; pub type WebIdentityCqrsExec = Data<Arc<dyn IdentityCqrsExecutor>>;
//
#[automock]
#[async_trait]
pub trait IdentityCqrsExecutor {
async fn execute(
&self,
aggregate_id: &str,
command: IdentityCommand,
) -> Result<(), AggregateError<IdentityError>>;
}
#[derive(Clone, Builder)] #[derive(Clone, Builder)]
pub struct IdentityCqrsExec { pub struct IdentityCqrsExec {
user: IdentityUserCqrsExec, user: IdentityUserCqrsExec,
store: IdentityStoreCqrsExec, store: IdentityStoreCqrsExec,
employee: IdentityEmployeeCqrsExec,
} }
impl IdentityCqrsExec { #[async_trait]
pub async fn execute( impl IdentityCqrsExecutor for IdentityCqrsExec {
async fn execute(
&self, &self,
aggregate_id: &str, aggregate_id: &str,
command: IdentityCommand, command: IdentityCommand,
) -> Result<(), AggregateError<IdentityError>> { ) -> Result<(), AggregateError<IdentityError>> {
self.user.execute(aggregate_id, command.clone()).await?; self.user.execute(aggregate_id, command.clone()).await?;
self.store.execute(aggregate_id, command).await?; self.store.execute(aggregate_id, command.clone()).await?;
self.employee.execute(aggregate_id, command).await?;
Ok(()) Ok(())
} }