feat: identity: implement cqrs::Aggregate for User

This commit is contained in:
Aravinth Manivannan 2024-05-19 00:38:10 +05:30
parent 86a66a420c
commit d7ec43d4b2
Signed by: realaravinth
GPG key ID: F8F50389936984FF
5 changed files with 366 additions and 2 deletions

44
Cargo.lock generated
View file

@ -1251,6 +1251,21 @@ dependencies = [
"new_debug_unreachable",
]
[[package]]
name = "futures"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.30"
@ -1295,6 +1310,17 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
[[package]]
name = "futures-macro"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.63",
]
[[package]]
name = "futures-sink"
version = "0.3.30"
@ -1313,8 +1339,10 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
@ -2433,6 +2461,21 @@ dependencies = [
"universal-hash",
]
[[package]]
name = "postgres-es"
version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05fcd2f85b76ea7edb35f7d421c42b85f2a4964ce7a2f139851eb9a751929cc4"
dependencies = [
"async-trait",
"cqrs-es",
"futures",
"serde",
"serde_json",
"sqlx",
"tokio",
]
[[package]]
name = "powerfmt"
version = "0.2.0"
@ -3979,6 +4022,7 @@ dependencies = [
"lettre",
"log",
"mockall",
"postgres-es",
"pretty_env_logger",
"rand",
"reqwest",

View file

@ -22,10 +22,12 @@ derive_more = "0.99.17"
lettre = { version = "0.11.7", features = ["tokio1-rustls-tls", "tracing", "dkim", "tokio1-native-tls", "smtp-transport", "pool", "builder"], default-features = false }
log = "0.4.21"
mockall = { version = "0.12.1", features = ["nightly"] }
postgres-es = "0.4.11"
pretty_env_logger = "0.5.0"
rand = "0.8.5"
rust-embed = { version = "8.4.0", features = ["include-exclude"] }
serde = { version = "1.0.201", features = ["derive"] }
serde_json = "1.0.117"
sqlx = { version = "0.7.4", features = ["runtime-tokio-rustls", "postgres", "time"] }
tera = "1.19.1"
time = { version = "0.3.36", features = ["serde"] }
@ -36,4 +38,3 @@ validator = { version = "0.18.1", features = ["derive"] }
[dev-dependencies]
reqwest = { version = "0.12.4", features = ["json"] }
serde_json = "1.0.117"

View file

@ -3,4 +3,4 @@
// SPDX-License-Identifier: AGPL-3.0-or-later
mod input;
mod output;
pub mod output;

View file

@ -0,0 +1,318 @@
use async_trait::async_trait;
use cqrs_es::Aggregate;
use crate::identity::application::services::errors::*;
use crate::identity::application::services::events::UserEvent;
use crate::identity::application::services::UserCommand;
use crate::identity::application::services::UserServicesInterface;
use crate::identity::domain::aggregate::User;
use crate::identity::domain::aggregate::UserBuilder;
#[async_trait]
impl Aggregate for User {
type Command = UserCommand;
type Event = UserEvent;
type Error = IdentityError;
type Services = std::sync::Arc<dyn UserServicesInterface>;
// This identifier should be unique to the system.
fn aggregate_type() -> String {
"account".to_string()
}
// The aggregate logic goes here. Note that this will be the _bulk_ of a CQRS system
// so expect to use helper functions elsewhere to keep the code clean.
async fn handle(
&self,
command: Self::Command,
services: &Self::Services,
) -> Result<Vec<Self::Event>, Self::Error> {
match command {
UserCommand::RegisterUser(cmd) => {
let res = services.register_user().register_user(cmd).await?;
Ok(vec![UserEvent::UserRegistered(res)])
}
UserCommand::DeleteUser(cmd) => {
services.delete_user().delete_user(cmd).await;
Ok(vec![UserEvent::UserDeleted])
}
UserCommand::Login(cmd) => {
let res = services.login().login(cmd).await;
Ok(vec![UserEvent::Loggedin(res)])
}
UserCommand::UpdatePassword(cmd) => {
let res = services.update_password().update_password(cmd).await;
Ok(vec![UserEvent::PasswordUpdated(res)])
}
UserCommand::UpdateEmail(cmd) => {
let res = services.update_email().update_email(cmd).await?;
Ok(vec![UserEvent::EmailUpdated(res)])
}
UserCommand::MarkUserVerified(cmd) => {
services
.mark_user_verified()
.mark_user_verified(cmd)
.await?;
Ok(vec![UserEvent::UserVerified])
}
UserCommand::SetAdmin(cmd) => {
let res = services.set_user_admin().set_user_admin(cmd).await;
Ok(vec![UserEvent::UserPromotedToAdmin(res)])
}
UserCommand::ResendVerificationEmail(cmd) => {
services
.resend_verification_email()
.resend_verification_email(cmd)
.await?;
Ok(vec![UserEvent::VerificationEmailResent])
}
}
}
fn apply(&mut self, event: Self::Event) {
match event {
UserEvent::UserRegistered(e) => {
UserBuilder::default()
.username(e.username().into())
.email(e.email().into())
.hashed_password(e.hashed_password().into())
.is_admin(e.is_admin().to_owned())
.is_verified(e.is_verified().to_owned())
.deleted(false)
.build()
.unwrap();
}
UserEvent::UserDeleted => {
self.set_deleted(true);
}
UserEvent::Loggedin(_) => (),
UserEvent::PasswordUpdated(_) => (),
UserEvent::EmailUpdated(e) => {
self.set_email(e.new_email().into());
}
UserEvent::UserVerified => {
self.set_is_verified(true);
}
UserEvent::UserPromotedToAdmin(_) => {
self.set_is_admin(true);
}
UserEvent::VerificationEmailResent => (),
}
}
}
//// The aggregate tests are the most important part of a CQRS system.
//// The simplicity and flexibility of these tests are a good part of what
//// makes an event sourced system so friendly to changing business requirements.
//#[cfg(test)]
//mod aggregate_tests {
// use async_trait::async_trait;
// use std::sync::Mutex;
//
// use cqrs_es::test::TestFramework;
//
// use crate::domain::aggregate::User;
// use crate::domain::commands::UserCommand;
// use crate::domain::events::UserEvent;
// use crate::services::{AtmError, UserApi, UserServices, CheckingError};
//
// // A test framework that will apply our events and command
// // and verify that the logic works as expected.
// type AccountTestFramework = TestFramework<User>;
//
// #[test]
// fn test_deposit_money() {
// let expected = UserEvent::CustomerDepositedMoney {
// amount: 200.0,
// balance: 200.0,
// };
// let command = UserCommand::DepositMoney { amount: 200.0 };
// let services = UserServices::new(Box::new(MockUserServices::default()));
// // Obtain a new test framework
// AccountTestFramework::with(services)
// // In a test case with no previous events
// .given_no_previous_events()
// // Wnen we fire this command
// .when(command)
// // then we expect these results
// .then_expect_events(vec![expected]);
// }
//
// #[test]
// fn test_deposit_money_with_balance() {
// let previous = UserEvent::CustomerDepositedMoney {
// amount: 200.0,
// balance: 200.0,
// };
// let expected = UserEvent::CustomerDepositedMoney {
// amount: 200.0,
// balance: 400.0,
// };
// let command = UserCommand::DepositMoney { amount: 200.0 };
// let services = UserServices::new(Box::new(MockUserServices::default()));
//
// AccountTestFramework::with(services)
// // Given this previously applied event
// .given(vec![previous])
// // When we fire this command
// .when(command)
// // Then we expect this resultant event
// .then_expect_events(vec![expected]);
// }
//
// #[test]
// fn test_withdraw_money() {
// let previous = UserEvent::CustomerDepositedMoney {
// amount: 200.0,
// balance: 200.0,
// };
// let expected = UserEvent::CustomerWithdrewCash {
// amount: 100.0,
// balance: 100.0,
// };
// let services = MockUserServices::default();
// services.set_atm_withdrawal_response(Ok(()));
// let command = UserCommand::WithdrawMoney {
// amount: 100.0,
// atm_id: "ATM34f1ba3c".to_string(),
// };
//
// AccountTestFramework::with(UserServices::new(Box::new(services)))
// .given(vec![previous])
// .when(command)
// .then_expect_events(vec![expected]);
// }
//
// #[test]
// fn test_withdraw_money_client_error() {
// let previous = UserEvent::CustomerDepositedMoney {
// amount: 200.0,
// balance: 200.0,
// };
// let services = MockUserServices::default();
// services.set_atm_withdrawal_response(Err(AtmError));
// let command = UserCommand::WithdrawMoney {
// amount: 100.0,
// atm_id: "ATM34f1ba3c".to_string(),
// };
//
// let services = UserServices::new(Box::new(services));
// AccountTestFramework::with(services)
// .given(vec![previous])
// .when(command)
// .then_expect_error_message("atm rule violation");
// }
//
// #[test]
// fn test_withdraw_money_funds_not_available() {
// let command = UserCommand::WithdrawMoney {
// amount: 200.0,
// atm_id: "ATM34f1ba3c".to_string(),
// };
//
// let services = UserServices::new(Box::new(MockUserServices::default()));
// AccountTestFramework::with(services)
// .given_no_previous_events()
// .when(command)
// // Here we expect an error rather than any events
// .then_expect_error_message("funds not available")
// }
//
// #[test]
// fn test_wrote_check() {
// let previous = UserEvent::CustomerDepositedMoney {
// amount: 200.0,
// balance: 200.0,
// };
// let expected = UserEvent::CustomerWroteCheck {
// check_number: "1170".to_string(),
// amount: 100.0,
// balance: 100.0,
// };
// let services = MockUserServices::default();
// services.set_validate_check_response(Ok(()));
// let services = UserServices::new(Box::new(services));
// let command = UserCommand::WriteCheck {
// check_number: "1170".to_string(),
// amount: 100.0,
// };
//
// AccountTestFramework::with(services)
// .given(vec![previous])
// .when(command)
// .then_expect_events(vec![expected]);
// }
//
// #[test]
// fn test_wrote_check_bad_check() {
// let previous = UserEvent::CustomerDepositedMoney {
// amount: 200.0,
// balance: 200.0,
// };
// let services = MockUserServices::default();
// services.set_validate_check_response(Err(CheckingError));
// let services = UserServices::new(Box::new(services));
// let command = UserCommand::WriteCheck {
// check_number: "1170".to_string(),
// amount: 100.0,
// };
//
// AccountTestFramework::with(services)
// .given(vec![previous])
// .when(command)
// .then_expect_error_message("check invalid");
// }
//
// #[test]
// fn test_wrote_check_funds_not_available() {
// let command = UserCommand::WriteCheck {
// check_number: "1170".to_string(),
// amount: 100.0,
// };
//
// let services = UserServices::new(Box::new(MockUserServices::default()));
// AccountTestFramework::with(services)
// .given_no_previous_events()
// .when(command)
// .then_expect_error_message("funds not available")
// }
//
// pub struct MockUserServices {
// atm_withdrawal_response: Mutex<Option<Result<(), AtmError>>>,
// validate_check_response: Mutex<Option<Result<(), CheckingError>>>,
// }
//
// impl Default for MockUserServices {
// fn default() -> Self {
// Self {
// atm_withdrawal_response: Mutex::new(None),
// validate_check_response: Mutex::new(None),
// }
// }
// }
//
// impl MockUserServices {
// fn set_atm_withdrawal_response(&self, response: Result<(), AtmError>) {
// *self.atm_withdrawal_response.lock().unwrap() = Some(response);
// }
// fn set_validate_check_response(&self, response: Result<(), CheckingError>) {
// *self.validate_check_response.lock().unwrap() = Some(response);
// }
// }
//
// #[async_trait]
// impl UserApi for MockUserServices {
// async fn atm_withdrawal(&self, _atm_id: &str, _amount: f64) -> Result<(), AtmError> {
// self.atm_withdrawal_response.lock().unwrap().take().unwrap()
// }
//
// async fn validate_check(
// &self,
// _account_id: &str,
// _check_number: &str,
// ) -> Result<(), CheckingError> {
// self.validate_check_response.lock().unwrap().take().unwrap()
// }
// }
//}
//

View file

@ -2,5 +2,6 @@
//
// SPDX-License-Identifier: AGPL-3.0-or-later
pub mod aggregate;
pub mod port;
pub mod services;