diff --git a/Cargo.lock b/Cargo.lock index 729c43f..16e0436 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1251,6 +1251,21 @@ dependencies = [ "new_debug_unreachable", ] +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.30" @@ -1295,6 +1310,17 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.63", +] + [[package]] name = "futures-sink" version = "0.3.30" @@ -1313,8 +1339,10 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", "futures-io", + "futures-macro", "futures-sink", "futures-task", "memchr", @@ -2433,6 +2461,21 @@ dependencies = [ "universal-hash", ] +[[package]] +name = "postgres-es" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05fcd2f85b76ea7edb35f7d421c42b85f2a4964ce7a2f139851eb9a751929cc4" +dependencies = [ + "async-trait", + "cqrs-es", + "futures", + "serde", + "serde_json", + "sqlx", + "tokio", +] + [[package]] name = "powerfmt" version = "0.2.0" @@ -3979,6 +4022,7 @@ dependencies = [ "lettre", "log", "mockall", + "postgres-es", "pretty_env_logger", "rand", "reqwest", diff --git a/Cargo.toml b/Cargo.toml index 3272e81..69d620c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,10 +22,12 @@ derive_more = "0.99.17" lettre = { version = "0.11.7", features = ["tokio1-rustls-tls", "tracing", "dkim", "tokio1-native-tls", "smtp-transport", "pool", "builder"], default-features = false } log = "0.4.21" mockall = { version = "0.12.1", features = ["nightly"] } +postgres-es = "0.4.11" pretty_env_logger = "0.5.0" rand = "0.8.5" rust-embed = { version = "8.4.0", features = ["include-exclude"] } serde = { version = "1.0.201", features = ["derive"] } +serde_json = "1.0.117" sqlx = { version = "0.7.4", features = ["runtime-tokio-rustls", "postgres", "time"] } tera = "1.19.1" time = { version = "0.3.36", features = ["serde"] } @@ -36,4 +38,3 @@ validator = { version = "0.18.1", features = ["derive"] } [dev-dependencies] reqwest = { version = "0.12.4", features = ["json"] } -serde_json = "1.0.117" diff --git a/src/identity/adapters/mod.rs b/src/identity/adapters/mod.rs index 9b25f58..c325d4d 100644 --- a/src/identity/adapters/mod.rs +++ b/src/identity/adapters/mod.rs @@ -3,4 +3,4 @@ // SPDX-License-Identifier: AGPL-3.0-or-later mod input; -mod output; +pub mod output; diff --git a/src/identity/application/aggregate.rs b/src/identity/application/aggregate.rs new file mode 100644 index 0000000..412ef14 --- /dev/null +++ b/src/identity/application/aggregate.rs @@ -0,0 +1,318 @@ +use async_trait::async_trait; +use cqrs_es::Aggregate; + +use crate::identity::application::services::errors::*; +use crate::identity::application::services::events::UserEvent; +use crate::identity::application::services::UserCommand; +use crate::identity::application::services::UserServicesInterface; +use crate::identity::domain::aggregate::User; +use crate::identity::domain::aggregate::UserBuilder; + +#[async_trait] +impl Aggregate for User { + type Command = UserCommand; + type Event = UserEvent; + type Error = IdentityError; + type Services = std::sync::Arc; + + // This identifier should be unique to the system. + fn aggregate_type() -> String { + "account".to_string() + } + + // The aggregate logic goes here. Note that this will be the _bulk_ of a CQRS system + // so expect to use helper functions elsewhere to keep the code clean. + async fn handle( + &self, + command: Self::Command, + services: &Self::Services, + ) -> Result, Self::Error> { + match command { + UserCommand::RegisterUser(cmd) => { + let res = services.register_user().register_user(cmd).await?; + Ok(vec![UserEvent::UserRegistered(res)]) + } + UserCommand::DeleteUser(cmd) => { + services.delete_user().delete_user(cmd).await; + Ok(vec![UserEvent::UserDeleted]) + } + UserCommand::Login(cmd) => { + let res = services.login().login(cmd).await; + Ok(vec![UserEvent::Loggedin(res)]) + } + UserCommand::UpdatePassword(cmd) => { + let res = services.update_password().update_password(cmd).await; + Ok(vec![UserEvent::PasswordUpdated(res)]) + } + UserCommand::UpdateEmail(cmd) => { + let res = services.update_email().update_email(cmd).await?; + Ok(vec![UserEvent::EmailUpdated(res)]) + } + UserCommand::MarkUserVerified(cmd) => { + services + .mark_user_verified() + .mark_user_verified(cmd) + .await?; + Ok(vec![UserEvent::UserVerified]) + } + UserCommand::SetAdmin(cmd) => { + let res = services.set_user_admin().set_user_admin(cmd).await; + Ok(vec![UserEvent::UserPromotedToAdmin(res)]) + } + UserCommand::ResendVerificationEmail(cmd) => { + services + .resend_verification_email() + .resend_verification_email(cmd) + .await?; + Ok(vec![UserEvent::VerificationEmailResent]) + } + } + } + + fn apply(&mut self, event: Self::Event) { + match event { + UserEvent::UserRegistered(e) => { + UserBuilder::default() + .username(e.username().into()) + .email(e.email().into()) + .hashed_password(e.hashed_password().into()) + .is_admin(e.is_admin().to_owned()) + .is_verified(e.is_verified().to_owned()) + .deleted(false) + .build() + .unwrap(); + } + UserEvent::UserDeleted => { + self.set_deleted(true); + } + UserEvent::Loggedin(_) => (), + UserEvent::PasswordUpdated(_) => (), + UserEvent::EmailUpdated(e) => { + self.set_email(e.new_email().into()); + } + UserEvent::UserVerified => { + self.set_is_verified(true); + } + UserEvent::UserPromotedToAdmin(_) => { + self.set_is_admin(true); + } + UserEvent::VerificationEmailResent => (), + } + } +} + +//// The aggregate tests are the most important part of a CQRS system. +//// The simplicity and flexibility of these tests are a good part of what +//// makes an event sourced system so friendly to changing business requirements. +//#[cfg(test)] +//mod aggregate_tests { +// use async_trait::async_trait; +// use std::sync::Mutex; +// +// use cqrs_es::test::TestFramework; +// +// use crate::domain::aggregate::User; +// use crate::domain::commands::UserCommand; +// use crate::domain::events::UserEvent; +// use crate::services::{AtmError, UserApi, UserServices, CheckingError}; +// +// // A test framework that will apply our events and command +// // and verify that the logic works as expected. +// type AccountTestFramework = TestFramework; +// +// #[test] +// fn test_deposit_money() { +// let expected = UserEvent::CustomerDepositedMoney { +// amount: 200.0, +// balance: 200.0, +// }; +// let command = UserCommand::DepositMoney { amount: 200.0 }; +// let services = UserServices::new(Box::new(MockUserServices::default())); +// // Obtain a new test framework +// AccountTestFramework::with(services) +// // In a test case with no previous events +// .given_no_previous_events() +// // Wnen we fire this command +// .when(command) +// // then we expect these results +// .then_expect_events(vec![expected]); +// } +// +// #[test] +// fn test_deposit_money_with_balance() { +// let previous = UserEvent::CustomerDepositedMoney { +// amount: 200.0, +// balance: 200.0, +// }; +// let expected = UserEvent::CustomerDepositedMoney { +// amount: 200.0, +// balance: 400.0, +// }; +// let command = UserCommand::DepositMoney { amount: 200.0 }; +// let services = UserServices::new(Box::new(MockUserServices::default())); +// +// AccountTestFramework::with(services) +// // Given this previously applied event +// .given(vec![previous]) +// // When we fire this command +// .when(command) +// // Then we expect this resultant event +// .then_expect_events(vec![expected]); +// } +// +// #[test] +// fn test_withdraw_money() { +// let previous = UserEvent::CustomerDepositedMoney { +// amount: 200.0, +// balance: 200.0, +// }; +// let expected = UserEvent::CustomerWithdrewCash { +// amount: 100.0, +// balance: 100.0, +// }; +// let services = MockUserServices::default(); +// services.set_atm_withdrawal_response(Ok(())); +// let command = UserCommand::WithdrawMoney { +// amount: 100.0, +// atm_id: "ATM34f1ba3c".to_string(), +// }; +// +// AccountTestFramework::with(UserServices::new(Box::new(services))) +// .given(vec![previous]) +// .when(command) +// .then_expect_events(vec![expected]); +// } +// +// #[test] +// fn test_withdraw_money_client_error() { +// let previous = UserEvent::CustomerDepositedMoney { +// amount: 200.0, +// balance: 200.0, +// }; +// let services = MockUserServices::default(); +// services.set_atm_withdrawal_response(Err(AtmError)); +// let command = UserCommand::WithdrawMoney { +// amount: 100.0, +// atm_id: "ATM34f1ba3c".to_string(), +// }; +// +// let services = UserServices::new(Box::new(services)); +// AccountTestFramework::with(services) +// .given(vec![previous]) +// .when(command) +// .then_expect_error_message("atm rule violation"); +// } +// +// #[test] +// fn test_withdraw_money_funds_not_available() { +// let command = UserCommand::WithdrawMoney { +// amount: 200.0, +// atm_id: "ATM34f1ba3c".to_string(), +// }; +// +// let services = UserServices::new(Box::new(MockUserServices::default())); +// AccountTestFramework::with(services) +// .given_no_previous_events() +// .when(command) +// // Here we expect an error rather than any events +// .then_expect_error_message("funds not available") +// } +// +// #[test] +// fn test_wrote_check() { +// let previous = UserEvent::CustomerDepositedMoney { +// amount: 200.0, +// balance: 200.0, +// }; +// let expected = UserEvent::CustomerWroteCheck { +// check_number: "1170".to_string(), +// amount: 100.0, +// balance: 100.0, +// }; +// let services = MockUserServices::default(); +// services.set_validate_check_response(Ok(())); +// let services = UserServices::new(Box::new(services)); +// let command = UserCommand::WriteCheck { +// check_number: "1170".to_string(), +// amount: 100.0, +// }; +// +// AccountTestFramework::with(services) +// .given(vec![previous]) +// .when(command) +// .then_expect_events(vec![expected]); +// } +// +// #[test] +// fn test_wrote_check_bad_check() { +// let previous = UserEvent::CustomerDepositedMoney { +// amount: 200.0, +// balance: 200.0, +// }; +// let services = MockUserServices::default(); +// services.set_validate_check_response(Err(CheckingError)); +// let services = UserServices::new(Box::new(services)); +// let command = UserCommand::WriteCheck { +// check_number: "1170".to_string(), +// amount: 100.0, +// }; +// +// AccountTestFramework::with(services) +// .given(vec![previous]) +// .when(command) +// .then_expect_error_message("check invalid"); +// } +// +// #[test] +// fn test_wrote_check_funds_not_available() { +// let command = UserCommand::WriteCheck { +// check_number: "1170".to_string(), +// amount: 100.0, +// }; +// +// let services = UserServices::new(Box::new(MockUserServices::default())); +// AccountTestFramework::with(services) +// .given_no_previous_events() +// .when(command) +// .then_expect_error_message("funds not available") +// } +// +// pub struct MockUserServices { +// atm_withdrawal_response: Mutex>>, +// validate_check_response: Mutex>>, +// } +// +// impl Default for MockUserServices { +// fn default() -> Self { +// Self { +// atm_withdrawal_response: Mutex::new(None), +// validate_check_response: Mutex::new(None), +// } +// } +// } +// +// impl MockUserServices { +// fn set_atm_withdrawal_response(&self, response: Result<(), AtmError>) { +// *self.atm_withdrawal_response.lock().unwrap() = Some(response); +// } +// fn set_validate_check_response(&self, response: Result<(), CheckingError>) { +// *self.validate_check_response.lock().unwrap() = Some(response); +// } +// } +// +// #[async_trait] +// impl UserApi for MockUserServices { +// async fn atm_withdrawal(&self, _atm_id: &str, _amount: f64) -> Result<(), AtmError> { +// self.atm_withdrawal_response.lock().unwrap().take().unwrap() +// } +// +// async fn validate_check( +// &self, +// _account_id: &str, +// _check_number: &str, +// ) -> Result<(), CheckingError> { +// self.validate_check_response.lock().unwrap().take().unwrap() +// } +// } +//} +// diff --git a/src/identity/application/mod.rs b/src/identity/application/mod.rs index 2f75b72..9d8c62d 100644 --- a/src/identity/application/mod.rs +++ b/src/identity/application/mod.rs @@ -2,5 +2,6 @@ // // SPDX-License-Identifier: AGPL-3.0-or-later +pub mod aggregate; pub mod port; pub mod services;