From 420b8a7d8befcd294aaffa992f7b37c707094871 Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Sun, 19 May 2024 00:39:01 +0530 Subject: [PATCH] feat: Identifier: db pg adapter: UserView --- ...b665369d5062a42c2c405f43d56a614a1e0ea.json | 21 ++ ...ea68842eaf86ba312c2eb324854041beb9de7.json | 21 ++ ...4a5925c3bb7de87828095971cb19bc1fc7550.json | 52 +++++ ...80b615c9e0ffe9ceda82e0bce410c9aca39a0.json | 28 +++ migrations/20240516200802_cqrs_init.sql | 1 + .../adapters/output/db/postgres/errors.rs | 44 ++++ .../adapters/output/db/postgres/mod.rs | 1 + .../adapters/output/db/postgres/user_view.rs | 190 ++++++++++++++++++ 8 files changed, 358 insertions(+) create mode 100644 .sqlx/query-750f96296e6d2c0b6d79c6f21e5b665369d5062a42c2c405f43d56a614a1e0ea.json create mode 100644 .sqlx/query-7a401e98fff59eb8c0fe1744ccdea68842eaf86ba312c2eb324854041beb9de7.json create mode 100644 .sqlx/query-803f868ee4a79fefdd85c5d0a034a5925c3bb7de87828095971cb19bc1fc7550.json create mode 100644 .sqlx/query-fa1c65d51e3e0521d6a20998f4b80b615c9e0ffe9ceda82e0bce410c9aca39a0.json create mode 100644 src/identity/adapters/output/db/postgres/user_view.rs diff --git a/.sqlx/query-750f96296e6d2c0b6d79c6f21e5b665369d5062a42c2c405f43d56a614a1e0ea.json b/.sqlx/query-750f96296e6d2c0b6d79c6f21e5b665369d5062a42c2c405f43d56a614a1e0ea.json new file mode 100644 index 0000000..8ce1a68 --- /dev/null +++ b/.sqlx/query-750f96296e6d2c0b6d79c6f21e5b665369d5062a42c2c405f43d56a614a1e0ea.json @@ -0,0 +1,21 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE\n user_query\n SET\n view_id = $1, version = $2, username = $3, email = $4,\n hashed_password = $5, is_admin = $6, is_verified = $7, deleted = $8;", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Int8", + "Text", + "Text", + "Text", + "Bool", + "Bool", + "Bool" + ] + }, + "nullable": [] + }, + "hash": "750f96296e6d2c0b6d79c6f21e5b665369d5062a42c2c405f43d56a614a1e0ea" +} diff --git a/.sqlx/query-7a401e98fff59eb8c0fe1744ccdea68842eaf86ba312c2eb324854041beb9de7.json b/.sqlx/query-7a401e98fff59eb8c0fe1744ccdea68842eaf86ba312c2eb324854041beb9de7.json new file mode 100644 index 0000000..7990582 --- /dev/null +++ b/.sqlx/query-7a401e98fff59eb8c0fe1744ccdea68842eaf86ba312c2eb324854041beb9de7.json @@ -0,0 +1,21 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO user_query (\n view_id, version, username, email,\n hashed_password, is_admin, is_verified, deleted\n ) VALUES (\n $1, $2, $3, $4, $5, $6, $7, $8\n );", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Int8", + "Text", + "Text", + "Text", + "Bool", + "Bool", + "Bool" + ] + }, + "nullable": [] + }, + "hash": "7a401e98fff59eb8c0fe1744ccdea68842eaf86ba312c2eb324854041beb9de7" +} diff --git a/.sqlx/query-803f868ee4a79fefdd85c5d0a034a5925c3bb7de87828095971cb19bc1fc7550.json b/.sqlx/query-803f868ee4a79fefdd85c5d0a034a5925c3bb7de87828095971cb19bc1fc7550.json new file mode 100644 index 0000000..06c125a --- /dev/null +++ b/.sqlx/query-803f868ee4a79fefdd85c5d0a034a5925c3bb7de87828095971cb19bc1fc7550.json @@ -0,0 +1,52 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT \n username, email, hashed_password, is_admin, is_verified, deleted\n FROM\n user_query\n WHERE\n view_id = $1;", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "username", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "email", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "hashed_password", + "type_info": "Text" + }, + { + "ordinal": 3, + "name": "is_admin", + "type_info": "Bool" + }, + { + "ordinal": 4, + "name": "is_verified", + "type_info": "Bool" + }, + { + "ordinal": 5, + "name": "deleted", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + false + ] + }, + "hash": "803f868ee4a79fefdd85c5d0a034a5925c3bb7de87828095971cb19bc1fc7550" +} diff --git a/.sqlx/query-fa1c65d51e3e0521d6a20998f4b80b615c9e0ffe9ceda82e0bce410c9aca39a0.json b/.sqlx/query-fa1c65d51e3e0521d6a20998f4b80b615c9e0ffe9ceda82e0bce410c9aca39a0.json new file mode 100644 index 0000000..ca1f198 --- /dev/null +++ b/.sqlx/query-fa1c65d51e3e0521d6a20998f4b80b615c9e0ffe9ceda82e0bce410c9aca39a0.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT \n view_id, version\n FROM\n user_query\n WHERE\n view_id = $1;", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "view_id", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "version", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + false, + false + ] + }, + "hash": "fa1c65d51e3e0521d6a20998f4b80b615c9e0ffe9ceda82e0bce410c9aca39a0" +} diff --git a/migrations/20240516200802_cqrs_init.sql b/migrations/20240516200802_cqrs_init.sql index b115a85..e52ff9e 100644 --- a/migrations/20240516200802_cqrs_init.sql +++ b/migrations/20240516200802_cqrs_init.sql @@ -25,6 +25,7 @@ CREATE TABLE IF NOT EXISTS user_query hashed_password TEXT NOT NULL, is_admin BOOLEAN NOT NULL DEFAULT FALSE, is_verified BOOLEAN NOT NULL DEFAULT FALSE, + deleted BOOLEAN NOT NULL DEFAULT FALSE, PRIMARY KEY (view_id) ); diff --git a/src/identity/adapters/output/db/postgres/errors.rs b/src/identity/adapters/output/db/postgres/errors.rs index 449a278..bbde1ae 100644 --- a/src/identity/adapters/output/db/postgres/errors.rs +++ b/src/identity/adapters/output/db/postgres/errors.rs @@ -4,6 +4,7 @@ use std::borrow::Cow; +use cqrs_es::persist::PersistenceError; use sqlx::Error as SqlxError; use crate::identity::application::port::output::db::errors::OutDBPortError; @@ -37,3 +38,46 @@ pub fn map_row_not_found_err(e: SqlxError, row_not_found: OutDBPortError) -> Out e.into() } } + +#[derive(Debug)] +pub enum PostgresAggregateError { + OptimisticLock, + ConnectionError(Box), + DeserializationError(Box), + UnknownError(Box), +} + +impl From for PostgresAggregateError { + fn from(err: SqlxError) -> Self { + // TODO: improve error handling + match &err { + SqlxError::Database(database_error) => { + if let Some(code) = database_error.code() { + if code.as_ref() == "23505" { + return PostgresAggregateError::OptimisticLock; + } + } + PostgresAggregateError::UnknownError(Box::new(err)) + } + SqlxError::Io(_) | SqlxError::Tls(_) => { + PostgresAggregateError::ConnectionError(Box::new(err)) + } + _ => PostgresAggregateError::UnknownError(Box::new(err)), + } + } +} + +impl From for PersistenceError { + fn from(err: PostgresAggregateError) -> Self { + match err { + PostgresAggregateError::OptimisticLock => PersistenceError::OptimisticLockError, + PostgresAggregateError::ConnectionError(error) => { + PersistenceError::ConnectionError(error) + } + PostgresAggregateError::DeserializationError(error) => { + PersistenceError::UnknownError(error) + } + PostgresAggregateError::UnknownError(error) => PersistenceError::UnknownError(error), + } + } +} diff --git a/src/identity/adapters/output/db/postgres/mod.rs b/src/identity/adapters/output/db/postgres/mod.rs index 324c52f..d88ec8a 100644 --- a/src/identity/adapters/output/db/postgres/mod.rs +++ b/src/identity/adapters/output/db/postgres/mod.rs @@ -14,6 +14,7 @@ pub mod delete_verification_secret; pub mod email_exists; mod errors; pub mod get_verification_secret; +pub mod user_view; pub mod username_exists; pub mod verification_secret_exists; diff --git a/src/identity/adapters/output/db/postgres/user_view.rs b/src/identity/adapters/output/db/postgres/user_view.rs new file mode 100644 index 0000000..2a51b7f --- /dev/null +++ b/src/identity/adapters/output/db/postgres/user_view.rs @@ -0,0 +1,190 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later +use async_trait::async_trait; +use cqrs_es::persist::GenericQuery; +use cqrs_es::persist::{PersistenceError, ViewContext, ViewRepository}; +use cqrs_es::{EventEnvelope, Query, View}; + +use super::errors::*; +use super::DBOutPostgresAdapter; +use crate::identity::application::services::events::UserEvent; +use crate::identity::domain::aggregate::User; +use serde::{Deserialize, Serialize}; + +// The view for a User query, for a standard http application this should +// be designed to reflect the response dto that will be returned to a user. +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct UserView { + username: String, + email: String, + hashed_password: String, + is_admin: bool, + is_verified: bool, + deleted: bool, +} + +// This updates the view with events as they are committed. +// The logic should be minimal here, e.g., don't calculate the account balance, +// design the events to carry the balance information instead. +impl View for UserView { + fn update(&mut self, event: &EventEnvelope) { + match &event.payload { + UserEvent::UserRegistered(val) => { + self.username = val.username().into(); + self.email = val.email().into(); + self.hashed_password = val.hashed_password().into(); + self.is_admin = val.is_admin().to_owned(); + self.is_verified = val.is_verified().to_owned(); + self.deleted = false; + } + UserEvent::UserDeleted => self.deleted = true, + UserEvent::Loggedin(_) => (), + UserEvent::PasswordUpdated(_) => (), + UserEvent::EmailUpdated(val) => { + self.email = val.new_email().into(); + } + UserEvent::UserVerified => { + self.is_verified = true; + } + UserEvent::UserPromotedToAdmin(_) => { + self.is_admin = true; + } + UserEvent::VerificationEmailResent => (), + } + } +} + +#[async_trait] +impl ViewRepository for DBOutPostgresAdapter { + async fn load(&self, view_id: &str) -> Result, PersistenceError> { + let res = sqlx::query_as!( + UserView, + "SELECT + username, email, hashed_password, is_admin, is_verified, deleted + FROM + user_query + WHERE + view_id = $1;", + view_id + ) + .fetch_one(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + Ok(Some(res)) + } + + async fn load_with_context( + &self, + view_id: &str, + ) -> Result, PersistenceError> { + let res = sqlx::query_as!( + UserView, + "SELECT + username, email, hashed_password, is_admin, is_verified, deleted + FROM + user_query + WHERE + view_id = $1;", + view_id + ) + .fetch_one(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + + struct Context { + version: i64, + view_id: String, + } + + let ctx = sqlx::query_as!( + Context, + "SELECT + view_id, version + FROM + user_query + WHERE + view_id = $1;", + view_id + ) + .fetch_one(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + + let view_context = ViewContext::new(ctx.view_id, ctx.version); + Ok(Some((res, view_context))) + } + + async fn update_view( + &self, + view: UserView, + context: ViewContext, + ) -> Result<(), PersistenceError> { + match context.version { + 0 => { + let version = context.version + 1; + sqlx::query!( + "INSERT INTO user_query ( + view_id, version, username, email, + hashed_password, is_admin, is_verified, deleted + ) VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8 + );", + context.view_instance_id, + version, + view.username, + view.email, + view.hashed_password, + view.is_admin, + view.is_verified, + view.deleted, + ) + .execute(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + } + _ => { + let version = context.version + 1; + sqlx::query!( + "UPDATE + user_query + SET + view_id = $1, version = $2, username = $3, email = $4, + hashed_password = $5, is_admin = $6, is_verified = $7, deleted = $8;", + context.view_instance_id, + version, + view.username, + view.email, + view.hashed_password, + view.is_admin, + view.is_verified, + view.deleted, + ) + .execute(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + } + } + + Ok(()) + } +} + +pub struct SimpleLoggingQuery {} + +// Our simplest query, this is great for debugging but absolutely useless in production. +// This query just pretty prints the events as they are processed. +#[async_trait] +impl Query for SimpleLoggingQuery { + async fn dispatch(&self, aggregate_id: &str, events: &[EventEnvelope]) { + for event in events { + let payload = serde_json::to_string_pretty(&event.payload).unwrap(); + println!("{}-{}\n{}", aggregate_id, event.sequence, payload); + } + } +} + +// Our second query, this one will be handled with Postgres `GenericQuery` +// which will serialize and persist our view after it is updated. It also +// provides a `load` method to deserialize the view on request. +pub type UserQuery = GenericQuery;