// SPDX-FileCopyrightText: 2024 Aravinth Manivannan // // SPDX-License-Identifier: AGPL-3.0-or-later use async_trait::async_trait; use cqrs_es::persist::GenericQuery; use cqrs_es::persist::{PersistenceError, ViewContext, ViewRepository}; use cqrs_es::{EventEnvelope, Query, View}; use serde::{Deserialize, Serialize}; use uuid::Uuid; use super::errors::*; use super::DBOutPostgresAdapter; use crate::identity::application::services::events::UserEvent; use crate::identity::domain::aggregate::User; use crate::utils::parse_aggregate_id::parse_aggregate_id; pub const NEW_USER_NON_UUID: &str = "new_user_non_uuid-asdfa"; // The view for a User query, for a standard http application this should // be designed to reflect the response dto that will be returned to a user. #[derive(Debug, Default, Serialize, Deserialize)] pub struct UserView { first_name: String, last_name: String, user_id: Uuid, email: String, hashed_password: String, is_admin: bool, is_verified: bool, deleted: bool, } // This updates the view with events as they are committed. // The logic should be minimal here, e.g., don't calculate the account balance, // design the events to carry the balance information instead. impl View for UserView { fn update(&mut self, event: &EventEnvelope) { match &event.payload { UserEvent::UserRegistered(val) => { self.email = val.email().into(); self.hashed_password = val.hashed_password().into(); self.is_admin = val.is_admin().to_owned(); self.is_verified = val.is_verified().to_owned(); self.deleted = false; self.first_name = val.first_name().into(); self.last_name = val.last_name().into(); self.user_id = *val.user_id(); } UserEvent::UserDeleted => self.deleted = true, UserEvent::Loggedin(_) => (), UserEvent::PasswordUpdated(_) => (), UserEvent::EmailUpdated(val) => { self.email = val.new_email().into(); } UserEvent::UserVerified => { self.is_verified = true; } UserEvent::UserPromotedToAdmin(_) => { self.is_admin = true; } UserEvent::VerificationEmailResent => (), } } } #[async_trait] impl ViewRepository for DBOutPostgresAdapter { async fn load(&self, user_id: &str) -> Result, PersistenceError> { let user_id = match parse_aggregate_id(user_id, NEW_USER_NON_UUID)? { Some((val, _)) => return Ok(Some(val)), None => Uuid::parse_str(user_id).unwrap(), }; let res = sqlx::query_as!( UserView, "SELECT first_name, last_name, user_id, email, hashed_password, is_admin, is_verified, deleted FROM user_query WHERE user_id = $1;", user_id ) .fetch_one(&self.pool) .await .map_err(PostgresAggregateError::from)?; Ok(Some(res)) } async fn load_with_context( &self, user_id: &str, ) -> Result, PersistenceError> { let user_id = match parse_aggregate_id(user_id, NEW_USER_NON_UUID)? { Some(val) => return Ok(Some(val)), None => Uuid::parse_str(user_id).unwrap(), }; let res = sqlx::query_as!( UserView, "SELECT first_name, last_name, user_id, email, hashed_password, is_admin, is_verified, deleted FROM user_query WHERE user_id = $1;", user_id ) .fetch_one(&self.pool) .await .map_err(PostgresAggregateError::from)?; struct Context { version: i64, user_id: String, } let ctx = sqlx::query_as!( Context, "SELECT user_id, version FROM user_query WHERE user_id = $1;", user_id ) .fetch_one(&self.pool) .await .map_err(PostgresAggregateError::from)?; let view_context = ViewContext::new(ctx.user_id.to_string(), ctx.version); Ok(Some((res, view_context))) } async fn update_view( &self, view: UserView, context: ViewContext, ) -> Result<(), PersistenceError> { match context.version { 0 => { let version = context.version + 1; sqlx::query!( "INSERT INTO user_query ( version, first_name, last_name, email, hashed_password, is_admin, is_verified, deleted, user_id ) VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9 );", version, view.first_name, view.last_name, view.email, view.hashed_password, view.is_admin, view.is_verified, view.deleted, view.user_id, ) .execute(&self.pool) .await .map_err(PostgresAggregateError::from)?; } _ => { let version = context.version + 1; sqlx::query!( "UPDATE user_query SET user_id = $1, version = $2, first_name = $3, email = $4, hashed_password = $5, is_admin = $6, is_verified = $7, deleted = $8, last_name=$9;", view.user_id, version, view.first_name, view.email, view.hashed_password, view.is_admin, view.is_verified, view.deleted, view.last_name, ) .execute(&self.pool) .await .map_err(PostgresAggregateError::from)?; } } Ok(()) } } pub struct SimpleLoggingQuery {} // Our simplest query, this is great for debugging but absolutely useless in production. // This query just pretty prints the events as they are processed. #[async_trait] impl Query for SimpleLoggingQuery { async fn dispatch(&self, aggregate_id: &str, events: &[EventEnvelope]) { for event in events { let payload = serde_json::to_string_pretty(&event.payload).unwrap(); println!("{}-{}\n{}", aggregate_id, event.sequence, payload); } } } // Our second query, this one will be handled with Postgres `GenericQuery` // which will serialize and persist our view after it is updated. It also // provides a `load` method to deserialize the view on request. pub type UserQuery = GenericQuery;