feat: impl idnetity DB adapters and View for employee aggregate
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful

This commit is contained in:
Aravinth Manivannan 2024-10-08 16:39:10 +05:30
parent edd84dd537
commit b4c9e025bc
Signed by: realaravinth
GPG key ID: F8F50389936984FF
35 changed files with 1648 additions and 6 deletions

View file

@ -0,0 +1,28 @@
{
"db_name": "PostgreSQL",
"query": "SELECT \n emp_id, version\n FROM\n cqrs_identity_employee_query\n WHERE\n emp_id = $1;",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "emp_id",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "version",
"type_info": "Int8"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
false,
false
]
},
"hash": "0bbafa839181369d932fb1deae71e2255c30d24779d37a703dcd3521fbac35cf"
}

View file

@ -0,0 +1,16 @@
{
"db_name": "PostgreSQL",
"query": "DELETE FROM\n emp_verification_otp\n WHERE\n otp = $1\n AND\n emp_id = (\n SELECT emp_id\n FROM cqrs_identity_employee_query\n WHERE\n phone_number_number = $2\n AND\n phone_number_country_code = $3\n );",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int4",
"Int8",
"Int4"
]
},
"nullable": []
},
"hash": "0ce99350a114d2bc5876ca038f079aac489d27d89d164d07ded62c71e5237b10"
}

View file

@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "UPDATE\n cqrs_identity_employee_query\n SET\n version = $1,\n\n created_time = $2,\n store_id = $3,\n first_name = $4,\n last_name = $5,\n phone_number_number = $6,\n phone_number_country_code = $7,\n phone_verified = $8,\n\n\n deleted = $9;",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8",
"Timestamptz",
"Uuid",
"Text",
"Text",
"Int8",
"Int4",
"Bool",
"Bool"
]
},
"nullable": []
},
"hash": "4b8bf25b161a8337bc1ee7bcfba5a065417280cbae1527d3363f6f7561cb50c3"
}

View file

@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "SELECT EXISTS (\n SELECT 1\n FROM cqrs_identity_employee_query\n WHERE\n emp_id = $1\n );",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "exists",
"type_info": "Bool"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
null
]
},
"hash": "5ed9222039eabd6e564c035aae83227f5a8398d4892d62185fbf911f16bde10d"
}

View file

@ -0,0 +1,23 @@
{
"db_name": "PostgreSQL",
"query": "SELECT EXISTS (\n SELECT 1\n FROM cqrs_identity_employee_query\n WHERE\n phone_number_number = $1\n AND \n phone_number_country_code = $2\n );",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "exists",
"type_info": "Bool"
}
],
"parameters": {
"Left": [
"Int8",
"Int4"
]
},
"nullable": [
null
]
},
"hash": "6059e6de1cb1aaca0df97b3f6f95b567f0786bacecd4658776aefcaae41ca679"
}

View file

@ -0,0 +1,17 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO emp_verification_otp (otp, created_at, purpose, emp_id)\n VALUES ($1, $2, $3, $4);",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int4",
"Timestamptz",
"Text",
"Uuid"
]
},
"nullable": []
},
"hash": "76b69da704bce81a71c5591ac3990a3dcbab3e9d8d67e71126a1ab1de99b839a"
}

View file

@ -0,0 +1,23 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO cqrs_identity_employee_query (\n version,\n created_time,\n store_id,\n emp_id,\n first_name,\n last_name,\n phone_number_number,\n phone_number_country_code,\n phone_verified,\n deleted\n\n\n ) VALUES (\n $1, $2, $3, $4, $5, $6, $7, $8, $9, $10\n );",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8",
"Timestamptz",
"Uuid",
"Uuid",
"Text",
"Text",
"Int8",
"Int4",
"Bool",
"Bool"
]
},
"nullable": []
},
"hash": "796be4344e585654ea27252b02239158ed4691448b33d4427bf70717aad41263"
}

View file

@ -0,0 +1,70 @@
{
"db_name": "PostgreSQL",
"query": "SELECT \n created_time,\n store_id,\n emp_id,\n first_name,\n last_name,\n phone_number_number,\n phone_number_country_code,\n phone_verified,\n deleted\n FROM\n cqrs_identity_employee_query\n WHERE\n emp_id = $1;",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "created_time",
"type_info": "Timestamptz"
},
{
"ordinal": 1,
"name": "store_id",
"type_info": "Uuid"
},
{
"ordinal": 2,
"name": "emp_id",
"type_info": "Uuid"
},
{
"ordinal": 3,
"name": "first_name",
"type_info": "Text"
},
{
"ordinal": 4,
"name": "last_name",
"type_info": "Text"
},
{
"ordinal": 5,
"name": "phone_number_number",
"type_info": "Int8"
},
{
"ordinal": 6,
"name": "phone_number_country_code",
"type_info": "Int4"
},
{
"ordinal": 7,
"name": "phone_verified",
"type_info": "Bool"
},
{
"ordinal": 8,
"name": "deleted",
"type_info": "Bool"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
false,
true,
false,
false,
false,
false,
false,
false,
false
]
},
"hash": "7c2fd6e897bf18b1f2229eec5fd12932a86d4e88f2fd4ab8ac32246a55303b03"
}

View file

@ -0,0 +1,70 @@
{
"db_name": "PostgreSQL",
"query": "SELECT \n created_time,\n store_id,\n emp_id,\n first_name,\n last_name,\n phone_number_number,\n phone_number_country_code,\n phone_verified,\n deleted\n\n FROM\n cqrs_identity_employee_query\n WHERE\n emp_id = $1;",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "created_time",
"type_info": "Timestamptz"
},
{
"ordinal": 1,
"name": "store_id",
"type_info": "Uuid"
},
{
"ordinal": 2,
"name": "emp_id",
"type_info": "Uuid"
},
{
"ordinal": 3,
"name": "first_name",
"type_info": "Text"
},
{
"ordinal": 4,
"name": "last_name",
"type_info": "Text"
},
{
"ordinal": 5,
"name": "phone_number_number",
"type_info": "Int8"
},
{
"ordinal": 6,
"name": "phone_number_country_code",
"type_info": "Int4"
},
{
"ordinal": 7,
"name": "phone_verified",
"type_info": "Bool"
},
{
"ordinal": 8,
"name": "deleted",
"type_info": "Bool"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
false,
true,
false,
false,
false,
false,
false,
false,
false
]
},
"hash": "848f7c8250f7aba08fcf11491ee1a80c9fd0bfb8e37ca1051604bc2bb25d5356"
}

View file

@ -1,11 +1,10 @@
{
"db_name": "PostgreSQL",
"query": "UPDATE\n user_query\n SET\n user_id = $1, version = $2, first_name = $3, email = $4,\n hashed_password = $5, is_admin = $6, is_verified = $7, deleted = $8,\n last_name=$9;",
"query": "UPDATE\n user_query\n SET\n version = $1, first_name = $2, email = $3,\n hashed_password = $4, is_admin = $5, is_verified = $6, deleted = $7,\n last_name=$8;",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Int8",
"Text",
"Text",
@ -18,5 +17,5 @@
},
"nullable": []
},
"hash": "14e8d7a1c8f80701b76b2bac69b1ecd99f7694d620f1945ad5c4ae474a17be1b"
"hash": "8ae34e8c47972ce113f235888fd52ca6f48a6a6130538256827f03ea11b8cc78"
}

View file

@ -0,0 +1,23 @@
{
"db_name": "PostgreSQL",
"query": "SELECT emp_id\n FROM cqrs_identity_employee_query\n WHERE\n phone_number_number = $1\n AND\n phone_number_country_code = $2;",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "emp_id",
"type_info": "Uuid"
}
],
"parameters": {
"Left": [
"Int8",
"Int4"
]
},
"nullable": [
false
]
},
"hash": "901bd28d7ec82aae1b5b7a2add1f0b0bd8625c365a9de30b176d023333c3d266"
}

View file

@ -0,0 +1,23 @@
{
"db_name": "PostgreSQL",
"query": "SELECT\n otp\n FROM\n emp_verification_otp\n WHERE\n emp_id = (\n SELECT emp_id\n FROM cqrs_identity_employee_query\n WHERE\n phone_number_number = $1\n AND\n phone_number_country_code = $2\n );",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "otp",
"type_info": "Int4"
}
],
"parameters": {
"Left": [
"Int8",
"Int4"
]
},
"nullable": [
false
]
},
"hash": "cee6873f40b9a442ec4764e22a6885994c5fb3cd1f9367a073d6333eadbdb8e8"
}

View file

@ -0,0 +1,23 @@
{
"db_name": "PostgreSQL",
"query": "SELECT\n otp\n FROM\n emp_login_otp\n WHERE\n emp_id = (\n SELECT emp_id\n FROM cqrs_identity_employee_query\n WHERE\n phone_number_number = $1\n AND\n phone_number_country_code = $2\n );",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "otp",
"type_info": "Int4"
}
],
"parameters": {
"Left": [
"Int8",
"Int4"
]
},
"nullable": [
false
]
},
"hash": "d7ca0856c700be37f5ec475b8bbc2d4a443923c8afdd37742d75d381a8831873"
}

View file

@ -0,0 +1,17 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO emp_login_otp (otp, created_at, purpose, emp_id)\n VALUES ($1, $2, $3, $4);",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int4",
"Timestamptz",
"Text",
"Uuid"
]
},
"nullable": []
},
"hash": "e56fe1dadd3536ea549a30c36005982be07e3d34a1782399bee188dddd2c85a8"
}

View file

@ -0,0 +1,16 @@
{
"db_name": "PostgreSQL",
"query": "DELETE FROM\n emp_login_otp\n WHERE\n otp = $1\n AND\n emp_id = (\n SELECT emp_id\n FROM cqrs_identity_employee_query\n WHERE\n phone_number_number = $2\n AND\n phone_number_country_code = $3\n );",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int4",
"Int8",
"Int4"
]
},
"nullable": []
},
"hash": "e665b7147456bc27158a24a8bd705bcae867b257d0eb1f963c029a665e2b9f38"
}

View file

@ -0,0 +1,11 @@
-- SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
--
-- SPDX-License-Identifier: AGPL-3.0-or-later
CREATE TABLE IF NOT EXISTS emp_login_otp (
otp INTEGER NOT NULL UNIQUE,
created_at timestamp with time zone DEFAULT (CURRENT_TIMESTAMP),
purpose TEXT NOT NULL,
emp_id UUID NOT NULL,
ID SERIAL PRIMARY KEY NOT NULL
);

View file

@ -0,0 +1,11 @@
-- SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
--
-- SPDX-License-Identifier: AGPL-3.0-or-later
CREATE TABLE IF NOT EXISTS emp_verification_otp (
otp INTEGER NOT NULL UNIQUE,
created_at timestamp with time zone DEFAULT (CURRENT_TIMESTAMP),
purpose TEXT NOT NULL,
emp_id UUID NOT NULL,
ID SERIAL PRIMARY KEY NOT NULL
);

View file

@ -0,0 +1,28 @@
-- SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
--
-- SPDX-License-Identifier: AGPL-3.0-or-later
CREATE TABLE IF NOT EXISTS cqrs_identity_employee_query
(
version bigint CHECK (version >= 0) NOT NULL,
created_time timestamp with time zone DEFAULT (CURRENT_TIMESTAMP) NOT NULL,
first_name TEXT NOT NULL,
last_name TEXT NOT NULL,
emp_id UUID NOT NULL UNIQUE,
phone_number_country_code INTEGER NOT NULL,
phone_number_number BIGINT NOT NULL,
phone_verified BOOLEAN NOT NULL DEFAULT FALSE,
store_id UUID DEFAULT NULL,
deleted BOOLEAN NOT NULL DEFAULT FALSE,
PRIMARY KEY (emp_id)
);

View file

@ -0,0 +1,59 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use sqlx::types::time::OffsetDateTime;
use super::DBOutPostgresAdapter;
use crate::identity::application::port::output::db::{create_login_otp::*, errors::*};
#[async_trait::async_trait]
impl CreateLoginOTPOutDBPort for DBOutPostgresAdapter {
async fn create_login_otp(&self, msg: CreateOTPMsg) -> OutDBPortResult<()> {
sqlx::query!(
"INSERT INTO emp_login_otp (otp, created_at, purpose, emp_id)
VALUES ($1, $2, $3, $4);",
msg.otp as i32,
OffsetDateTime::now_utc(),
REGISTRATION_OTP_PURPOSE,
&msg.emp_id,
)
.execute(&self.pool)
.await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use crate::utils::uuid::tests::UUID;
use super::*;
#[actix_rt::test]
async fn test_postgres_create_login_otp() {
let settings = crate::settings::tests::get_settings().await;
settings.create_db().await;
let db = super::DBOutPostgresAdapter::new(
sqlx::postgres::PgPool::connect(&settings.database.url)
.await
.unwrap(),
);
let msg = CreateOTPMsgBuilder::default()
.otp(1010)
.emp_id(UUID)
.build()
.unwrap();
db.create_login_otp(msg.clone()).await.unwrap();
// duplicate: secret exists
assert_eq!(
db.create_login_otp(msg).await.err(),
Some(OutDBPortError::DuplicateEmpLoginOTP)
);
settings.drop_db().await;
}
}

View file

@ -0,0 +1,59 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use sqlx::types::time::OffsetDateTime;
use super::DBOutPostgresAdapter;
use crate::identity::application::port::output::db::{create_verification_otp::*, errors::*};
#[async_trait::async_trait]
impl CreateVerificationOTPOutDBPort for DBOutPostgresAdapter {
async fn create_verification_otp(&self, msg: CreateOTPMsg) -> OutDBPortResult<()> {
sqlx::query!(
"INSERT INTO emp_verification_otp (otp, created_at, purpose, emp_id)
VALUES ($1, $2, $3, $4);",
msg.otp as i32,
OffsetDateTime::now_utc(),
REGISTRATION_OTP_PURPOSE,
&msg.emp_id,
)
.execute(&self.pool)
.await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use crate::utils::uuid::tests::UUID;
use super::*;
#[actix_rt::test]
async fn test_postgres_create_verification_otp() {
let settings = crate::settings::tests::get_settings().await;
settings.create_db().await;
let db = super::DBOutPostgresAdapter::new(
sqlx::postgres::PgPool::connect(&settings.database.url)
.await
.unwrap(),
);
let msg = CreateOTPMsgBuilder::default()
.otp(1010)
.emp_id(UUID)
.build()
.unwrap();
db.create_verification_otp(msg.clone()).await.unwrap();
// duplicate: secret exists
assert_eq!(
db.create_verification_otp(msg).await.err(),
Some(OutDBPortError::DuplicateEmpVerificationOTP)
);
settings.drop_db().await;
}
}

View file

@ -0,0 +1,84 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use super::DBOutPostgresAdapter;
use crate::identity::application::port::output::db::{delete_login_otp::*, errors::*};
use crate::identity::domain::employee_aggregate::*;
#[async_trait::async_trait]
impl DeleteLoginOTPOutDBPort for DBOutPostgresAdapter {
async fn delete_login_otp(
&self,
otp: usize,
phone_number: &PhoneNumber,
) -> OutDBPortResult<()> {
sqlx::query!(
"DELETE FROM
emp_login_otp
WHERE
otp = $1
AND
emp_id = (
SELECT emp_id
FROM cqrs_identity_employee_query
WHERE
phone_number_number = $2
AND
phone_number_country_code = $3
);",
otp as i32,
*phone_number.number() as i64,
*phone_number.country_code() as i32,
)
.execute(&self.pool)
.await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
identity::{
adapters::output::db::postgres::emp_id_exists::tests::create_dummy_employee,
application::port::output::db::{
create_login_otp::*, create_login_otp::*, get_login_otp::*,
},
},
utils::uuid::tests::UUID,
};
#[actix_rt::test]
async fn test_postgres_delete_login_otp() {
let otp = 9999;
let e = Employee::default();
let settings = crate::settings::tests::get_settings().await;
settings.create_db().await;
let db = super::DBOutPostgresAdapter::new(
sqlx::postgres::PgPool::connect(&settings.database.url)
.await
.unwrap(),
);
create_dummy_employee(&e, &db).await;
db.create_login_otp(
CreateOTPMsgBuilder::default()
.otp(otp)
.emp_id(*e.emp_id())
.build()
.unwrap(),
)
.await
.unwrap();
assert!(db.get_login_otp(e.phone_number()).await.unwrap().is_some());
db.delete_login_otp(otp, e.phone_number()).await.unwrap();
assert!(db.get_login_otp(e.phone_number()).await.unwrap().is_none());
settings.drop_db().await;
}
}

View file

@ -0,0 +1,94 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use super::DBOutPostgresAdapter;
use crate::identity::application::port::output::db::{delete_verification_otp::*, errors::*};
use crate::identity::domain::employee_aggregate::*;
#[async_trait::async_trait]
impl DeleteVerificationOTPOutDBPort for DBOutPostgresAdapter {
async fn delete_verification_otp(
&self,
otp: usize,
phone_number: &PhoneNumber,
) -> OutDBPortResult<()> {
sqlx::query!(
"DELETE FROM
emp_verification_otp
WHERE
otp = $1
AND
emp_id = (
SELECT emp_id
FROM cqrs_identity_employee_query
WHERE
phone_number_number = $2
AND
phone_number_country_code = $3
);",
otp as i32,
*phone_number.number() as i64,
*phone_number.country_code() as i32,
)
.execute(&self.pool)
.await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
identity::{
adapters::output::db::postgres::emp_id_exists::tests::create_dummy_employee,
application::port::output::db::{
create_verification_otp::*, create_verification_otp::*, get_verification_otp::*,
},
},
utils::uuid::tests::UUID,
};
#[actix_rt::test]
async fn test_postgres_delete_verification_otp() {
let otp = 9999;
let e = Employee::default();
let settings = crate::settings::tests::get_settings().await;
settings.create_db().await;
let db = super::DBOutPostgresAdapter::new(
sqlx::postgres::PgPool::connect(&settings.database.url)
.await
.unwrap(),
);
create_dummy_employee(&e, &db).await;
db.create_verification_otp(
CreateOTPMsgBuilder::default()
.otp(otp)
.emp_id(*e.emp_id())
.build()
.unwrap(),
)
.await
.unwrap();
assert!(db
.get_verification_otp(e.phone_number())
.await
.unwrap()
.is_some());
db.delete_verification_otp(otp, e.phone_number())
.await
.unwrap();
assert!(db
.get_verification_otp(e.phone_number())
.await
.unwrap()
.is_none());
settings.drop_db().await;
}
}

View file

@ -0,0 +1,93 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use uuid::Uuid;
use super::DBOutPostgresAdapter;
use crate::identity::application::port::output::db::{emp_id_exists::*, errors::*};
#[async_trait::async_trait]
impl EmpIDExistsOutDBPort for DBOutPostgresAdapter {
async fn emp_id_exists(&self, emp_id: &Uuid) -> OutDBPortResult<bool> {
let res = sqlx::query!(
"SELECT EXISTS (
SELECT 1
FROM cqrs_identity_employee_query
WHERE
emp_id = $1
);",
emp_id,
)
.fetch_one(&self.pool)
.await?;
if let Some(x) = res.exists {
Ok(x)
} else {
Ok(false)
}
}
}
#[cfg(test)]
pub mod tests {
use super::*;
use crate::utils::uuid::tests::UUID;
use crate::identity::adapters::output::db::postgres::employee_view::EmployeeView;
use crate::identity::domain::aggregate::*;
use crate::identity::domain::employee_aggregate::*;
pub async fn create_dummy_employee(e: &Employee, db: &DBOutPostgresAdapter) {
sqlx::query!(
"INSERT INTO cqrs_identity_employee_query (
version,
emp_id,
first_name,
last_name,
phone_number_number,
phone_number_country_code,
phone_verified,
deleted
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8
);",
0,
e.emp_id(),
e.first_name(),
e.last_name(),
*e.phone_number().number() as i64,
*e.phone_number().country_code() as i32,
*e.phone_verified(),
*e.deleted(),
)
.execute(&db.pool)
.await
.unwrap();
}
#[actix_rt::test]
async fn test_postgres_emp_id_exists() {
let settings = crate::settings::tests::get_settings().await;
settings.create_db().await;
let db = super::DBOutPostgresAdapter::new(
sqlx::postgres::PgPool::connect(&settings.database.url)
.await
.unwrap(),
);
let e = Employee::default();
// state doesn't exist
assert!(!db.emp_id_exists(e.emp_id()).await.unwrap());
create_dummy_employee(&e, &db).await;
// state exists
assert!(db.emp_id_exists(e.emp_id()).await.unwrap());
settings.drop_db().await;
}
}

View file

@ -0,0 +1,430 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use std::str::FromStr;
use async_trait::async_trait;
use cqrs_es::persist::{PersistenceError, ViewContext, ViewRepository};
use cqrs_es::{EventEnvelope, Query, View};
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use uuid::Uuid;
use super::errors::*;
use super::DBOutPostgresAdapter;
use crate::identity::application::services::events::IdentityEvent;
use crate::identity::domain::employee_aggregate::*;
use crate::types::currency::{self, Currency, PriceBuilder};
use crate::utils::parse_aggregate_id::parse_aggregate_id;
pub const NEW_BILL_NON_UUID: &str = "identity_new_bill_non_uuid-asdfa";
// The view for a Employee query, for a standard http application this should
// be designed to reflect the response dto that will be returned to a user.
#[derive(Debug, Serialize, Deserialize)]
pub struct EmployeeView {
created_time: OffsetDateTime,
first_name: String,
last_name: String,
emp_id: Uuid,
phone_number_country_code: i32,
phone_number_number: i64,
phone_verified: bool,
store_id: Option<Uuid>,
deleted: bool,
}
impl From<EmployeeView> for Employee {
fn from(v: EmployeeView) -> Self {
EmployeeBuilder::default()
.first_name(v.first_name)
.last_name(v.last_name)
.phone_number(
PhoneNumberBuilder::default()
.number(v.phone_number_number as u64)
.country_code(v.phone_number_country_code as usize)
.build()
.unwrap(),
)
.emp_id(v.emp_id)
.phone_verified(v.phone_verified)
.store_id(v.store_id)
.deleted(v.deleted)
.build()
.unwrap()
}
}
impl Default for EmployeeView {
fn default() -> Self {
let e = Employee::default();
Self {
created_time: OffsetDateTime::now_utc(),
first_name: e.first_name().clone(),
last_name: e.last_name().clone(),
phone_number_number: *e.phone_number().number() as i64,
phone_number_country_code: *e.phone_number().country_code() as i32,
phone_verified: *e.phone_verified(),
emp_id: *e.emp_id(),
store_id: e.store_id().clone(),
deleted: false,
}
}
}
impl EmployeeView {
fn merge(&mut self, e: &Employee) {
self.first_name = e.first_name().clone();
self.last_name = e.last_name().clone();
self.phone_number_number = *e.phone_number().number() as i64;
self.phone_number_country_code = *e.phone_number().country_code() as i32;
self.phone_verified = *e.phone_verified();
self.emp_id = *e.emp_id();
self.store_id = e.store_id().clone();
self.deleted = *e.deleted();
}
}
// This updates the view with events as they are committed.
// The logic should be minimal here, e.g., don't calculate the account balance,
// design the events to carry the balance information instead.
impl View<Employee> for EmployeeView {
fn update(&mut self, event: &EventEnvelope<Employee>) {
match &event.payload {
IdentityEvent::EmployeeRegistered(e) => self.merge(e.employee()),
IdentityEvent::EmployeeLoggedIn(e) => (),
IdentityEvent::EmployeeInitLoggedIn(e) => (),
IdentityEvent::ResentLoginOTP(e) => (),
IdentityEvent::PhoneNumberVerified(e) => self.phone_verified = true,
IdentityEvent::VerificationOTPResent(e) => (),
IdentityEvent::PhoneNumberChanged(e) => unimplemented!(),
IdentityEvent::InviteAccepted(e) => self.store_id = Some(*e.store_id()),
IdentityEvent::OrganizationExited(e) => self.store_id = None,
_ => (),
}
}
}
#[async_trait]
impl ViewRepository<EmployeeView, Employee> for DBOutPostgresAdapter {
async fn load(&self, emp_id: &str) -> Result<Option<EmployeeView>, PersistenceError> {
let emp_id = match parse_aggregate_id(emp_id, NEW_BILL_NON_UUID)? {
Some((val, _)) => return Ok(Some(val)),
None => Uuid::parse_str(emp_id).unwrap(),
};
let res = sqlx::query_as!(
EmployeeView,
"SELECT
created_time,
store_id,
emp_id,
first_name,
last_name,
phone_number_number,
phone_number_country_code,
phone_verified,
deleted
FROM
cqrs_identity_employee_query
WHERE
emp_id = $1;",
emp_id
)
.fetch_one(&self.pool)
.await
.map_err(PostgresAggregateError::from)?;
Ok(Some(res))
}
async fn load_with_context(
&self,
emp_id: &str,
) -> Result<Option<(EmployeeView, ViewContext)>, PersistenceError> {
let emp_id = match parse_aggregate_id(emp_id, NEW_BILL_NON_UUID)? {
Some(val) => return Ok(Some(val)),
None => Uuid::parse_str(emp_id).unwrap(),
};
let res = sqlx::query_as!(
EmployeeView,
"SELECT
created_time,
store_id,
emp_id,
first_name,
last_name,
phone_number_number,
phone_number_country_code,
phone_verified,
deleted
FROM
cqrs_identity_employee_query
WHERE
emp_id = $1;",
&emp_id,
)
.fetch_one(&self.pool)
.await
.map_err(PostgresAggregateError::from)?;
struct Context {
version: i64,
emp_id: Uuid,
}
let ctx = sqlx::query_as!(
Context,
"SELECT
emp_id, version
FROM
cqrs_identity_employee_query
WHERE
emp_id = $1;",
emp_id
)
.fetch_one(&self.pool)
.await
.map_err(PostgresAggregateError::from)?;
let view_context = ViewContext::new(ctx.emp_id.to_string(), ctx.version);
Ok(Some((res, view_context)))
}
async fn update_view(
&self,
view: EmployeeView,
context: ViewContext,
) -> Result<(), PersistenceError> {
match context.version {
0 => {
let version = context.version + 1;
sqlx::query!(
"INSERT INTO cqrs_identity_employee_query (
version,
created_time,
store_id,
emp_id,
first_name,
last_name,
phone_number_number,
phone_number_country_code,
phone_verified,
deleted
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10
);",
version,
view.created_time,
view.store_id,
view.emp_id,
view.first_name,
view.last_name,
view.phone_number_number,
view.phone_number_country_code,
view.phone_verified,
view.deleted,
)
.execute(&self.pool)
.await
.map_err(PostgresAggregateError::from)?;
}
_ => {
let version = context.version + 1;
sqlx::query!(
"UPDATE
cqrs_identity_employee_query
SET
version = $1,
created_time = $2,
store_id = $3,
first_name = $4,
last_name = $5,
phone_number_number = $6,
phone_number_country_code = $7,
phone_verified = $8,
deleted = $9;",
version,
view.created_time,
view.store_id,
view.first_name,
view.last_name,
view.phone_number_number,
view.phone_number_country_code,
view.phone_verified,
view.deleted,
)
.execute(&self.pool)
.await
.map_err(PostgresAggregateError::from)?;
}
}
Ok(())
}
}
#[async_trait]
impl Query<Employee> for DBOutPostgresAdapter {
async fn dispatch(&self, emp_id: &str, events: &[EventEnvelope<Employee>]) {
let res = self.load_with_context(emp_id).await.unwrap_or_else(|_| {
Some((EmployeeView::default(), ViewContext::new(emp_id.into(), 0)))
});
let (mut view, view_context): (EmployeeView, ViewContext) = res.unwrap();
for event in events {
view.update(event);
}
self.update_view(view, view_context).await.unwrap();
}
}
#[cfg(test)]
mod tests {
use super::*;
use postgres_es::PostgresCqrs;
use crate::{
db::migrate::*,
identity::{
application::{
port::output::{
db::get_verification_otp::GetVerificationOTPOutDBPort,
phone::account_validation_otp::mock_account_validation_otp_phone_port,
},
services::{
employee_register_service::*, employee_verify_phone_number_service::*,
IdentityCommand, MockIdentityServicesInterface,
},
},
domain::{
employee_aggregate::*, employee_register_command::*, verify_phone_number_command::*,
},
},
tests::bdd::*,
utils::{random_number::tests::mock_generate_random_number, uuid::tests::*},
};
use std::sync::Arc;
#[actix_rt::test]
async fn pg_query_identity_employee_view() {
let settings = crate::settings::tests::get_settings().await;
//let settings = crate::settings::Settings::new().unwrap();
settings.create_db().await;
let db = crate::db::sqlx_postgres::Postgres::init(&settings.database.url).await;
db.migrate().await;
let db = DBOutPostgresAdapter::new(db.pool.clone());
let queries: Vec<Box<dyn Query<Employee>>> = vec![Box::new(db.clone())];
let mut mock_services = MockIdentityServicesInterface::new();
//let store = Store::default();
//crate::identity::adapters::output::db::postgres::store_id_exists::tests::create_dummy_store_record(&store, &db).await;
let db2 = Arc::new(db.clone());
mock_services
.expect_employee_register_service()
.times(IS_CALLED_ONLY_ONCE.unwrap())
.returning(move || {
Arc::new(
EmployeeRegisterUserServiceBuilder::default()
.db_emp_id_exists_adapter(db2.clone())
.db_create_verification_otp_adapter(db2.clone())
.db_phone_exists_adapter(db2.clone())
.random_number_adapter(mock_generate_random_number(
IS_CALLED_ONLY_ONCE,
999,
))
.phone_account_validation_otp_adapter(
mock_account_validation_otp_phone_port(IS_CALLED_ONLY_ONCE),
)
.build()
.unwrap(),
)
});
let db2 = Arc::new(db.clone());
mock_services
.expect_employee_verify_phone_number_service()
.times(IS_CALLED_ONLY_ONCE.unwrap())
.returning(move || {
Arc::new(
EmployeeVerifyPhoneNumberServiceBuilder::default()
.db_get_emp_id_from_phone_number_adapter(db2.clone())
.db_delete_verification_otp(db2.clone())
.db_get_verification_otp(db2.clone())
.build()
.unwrap(),
)
});
let (cqrs, employee_query): (
Arc<PostgresCqrs<Employee>>,
Arc<dyn ViewRepository<EmployeeView, Employee>>,
) = (
Arc::new(postgres_es::postgres_cqrs(
db.pool.clone(),
queries,
Arc::new(mock_services),
)),
Arc::new(db.clone()),
);
let cmd = EmployeeRegisterCommandBuilder::default()
.first_name("foooint".into())
.last_name("foooint".into())
.phone_number(PhoneNumber::default())
.emp_id(Uuid::new_v4())
.build()
.unwrap();
let emp_id_str = cmd.emp_id().to_string();
cqrs.execute(&emp_id_str, IdentityCommand::EmployeeRegister(cmd.clone()))
.await
.unwrap();
let emp = employee_query.load(&emp_id_str).await.unwrap().unwrap();
let emp: Employee = emp.into();
assert_eq!(emp.first_name(), cmd.first_name());
assert_eq!(emp.last_name(), cmd.last_name());
assert_eq!(emp.emp_id(), cmd.emp_id());
assert_eq!(emp.phone_number(), cmd.phone_number());
assert!(!*emp.phone_verified());
assert!(!*emp.deleted());
assert!(emp.store_id().is_none());
let otp = db
.get_verification_otp(emp.phone_number())
.await
.unwrap()
.unwrap();
cqrs.execute(
&emp_id_str,
IdentityCommand::EmployeeVerifyPhoneNumber(
VerifyPhoneNumberCommandBuilder::default()
.otp(otp.into())
.phone_number(emp.phone_number().clone())
.build()
.unwrap(),
),
)
.await
.unwrap();
let emp = employee_query.load(&emp_id_str).await.unwrap().unwrap();
assert!(emp.phone_verified);
settings.drop_db().await;
}
}

View file

@ -21,6 +21,10 @@ impl From<SqlxError> for OutDBPortError {
return Self::InternalError;
} else if msg.contains("verification_otp_secret_key") {
return Self::DuplicateVerificationOTPSecret;
} else if msg.contains("emp_login_otp_otp_key") {
return Self::DuplicateEmpLoginOTP;
} else if msg.contains("emp_verification_otp_otp_key") {
return Self::DuplicateEmpVerificationOTP;
} else {
println!("{msg}");
}

View file

@ -0,0 +1,81 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use uuid::Uuid;
use super::errors::map_row_not_found_err;
use super::DBOutPostgresAdapter;
use crate::identity::application::port::output::db::{errors::*, get_emp_id_from_phone_number::*};
use crate::identity::domain::employee_aggregate::*;
struct EmpID {
emp_id: Uuid,
}
#[async_trait::async_trait]
impl GetEmpIDFromPhoneNumberOutDBPort for DBOutPostgresAdapter {
async fn get_emp_id_from_phone_number(
&self,
phone_number: &PhoneNumber,
) -> OutDBPortResult<Uuid> {
let res = sqlx::query_as!(
EmpID,
"SELECT emp_id
FROM cqrs_identity_employee_query
WHERE
phone_number_number = $1
AND
phone_number_country_code = $2;",
*phone_number.number() as i64,
*phone_number.country_code() as i32,
)
.fetch_one(&self.pool)
.await
.map_err(|e| map_row_not_found_err(e, OutDBPortError::PhoneNumberNotFound))?;
Ok(res.emp_id)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::identity::adapters::output::db::postgres::emp_id_exists::tests::create_dummy_employee;
use crate::utils::uuid::tests::UUID;
use crate::identity::adapters::output::db::postgres::employee_view::EmployeeView;
use crate::identity::domain::aggregate::*;
use crate::identity::domain::employee_aggregate::*;
#[actix_rt::test]
async fn test_postgres_get_emp_id_from_phone() {
let settings = crate::settings::tests::get_settings().await;
settings.create_db().await;
let db = super::DBOutPostgresAdapter::new(
sqlx::postgres::PgPool::connect(&settings.database.url)
.await
.unwrap(),
);
let e = Employee::default();
// state doesn't exist
assert_eq!(
db.get_emp_id_from_phone_number(e.phone_number()).await,
Err(OutDBPortError::PhoneNumberNotFound)
);
create_dummy_employee(&e, &db).await;
// state exists
assert_eq!(
db.get_emp_id_from_phone_number(e.phone_number())
.await
.unwrap(),
*e.emp_id()
);
settings.drop_db().await;
}
}

View file

@ -0,0 +1,97 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use uuid::Uuid;
use super::errors::*;
use super::DBOutPostgresAdapter;
use crate::identity::application::port::output::db::{errors::*, get_login_otp::*};
use crate::identity::domain::employee_aggregate::*;
#[async_trait::async_trait]
impl GetLoginOTPOutDBPort for DBOutPostgresAdapter {
async fn get_login_otp(&self, phone_number: &PhoneNumber) -> OutDBPortResult<Option<usize>> {
struct Secret {
otp: i32,
}
let res = sqlx::query_as!(
Secret,
"SELECT
otp
FROM
emp_login_otp
WHERE
emp_id = (
SELECT emp_id
FROM cqrs_identity_employee_query
WHERE
phone_number_number = $1
AND
phone_number_country_code = $2
);",
*phone_number.number() as i64,
*phone_number.country_code() as i32,
)
.fetch_one(&self.pool)
.await
.map_err(|e| map_row_not_found_err(e, OutDBPortError::EmpLoginOTPNotFound));
let res = match res {
Ok(res) => Some(res.otp as usize),
Err(OutDBPortError::EmpLoginOTPNotFound) => None,
Err(e) => return Err(e),
};
Ok(res)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
identity::{
adapters::output::db::postgres::{
create_login_otp::*, emp_id_exists::tests::create_dummy_employee,
},
application::port::output::db::create_login_otp::*,
},
utils::uuid::tests::UUID,
};
#[actix_rt::test]
async fn test_postgres_get_verification_secret() {
let otp = 999;
let e = Employee::default();
let settings = crate::settings::tests::get_settings().await;
settings.create_db().await;
let db = super::DBOutPostgresAdapter::new(
sqlx::postgres::PgPool::connect(&settings.database.url)
.await
.unwrap(),
);
create_dummy_employee(&e, &db).await;
assert!(db.get_login_otp(e.phone_number()).await.unwrap().is_none(),);
db.create_login_otp(
CreateOTPMsgBuilder::default()
.otp(otp)
.emp_id(*e.emp_id())
.build()
.unwrap(),
)
.await
.unwrap();
assert_eq!(
db.get_login_otp(e.phone_number()).await.unwrap(),
Some(otp.into())
);
settings.drop_db().await;
}
}

View file

@ -0,0 +1,104 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use uuid::Uuid;
use super::errors::*;
use super::DBOutPostgresAdapter;
use crate::identity::application::port::output::db::{errors::*, get_verification_otp::*};
use crate::identity::domain::employee_aggregate::*;
#[async_trait::async_trait]
impl GetVerificationOTPOutDBPort for DBOutPostgresAdapter {
async fn get_verification_otp(
&self,
phone_number: &PhoneNumber,
) -> OutDBPortResult<Option<usize>> {
struct Secret {
otp: i32,
}
let res = sqlx::query_as!(
Secret,
"SELECT
otp
FROM
emp_verification_otp
WHERE
emp_id = (
SELECT emp_id
FROM cqrs_identity_employee_query
WHERE
phone_number_number = $1
AND
phone_number_country_code = $2
);",
*phone_number.number() as i64,
*phone_number.country_code() as i32,
)
.fetch_one(&self.pool)
.await
.map_err(|e| map_row_not_found_err(e, OutDBPortError::EmpVerificationOTPNotFound));
let res = match res {
Ok(res) => Some(res.otp as usize),
Err(OutDBPortError::EmpVerificationOTPNotFound) => None,
Err(e) => return Err(e),
};
Ok(res)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
identity::{
adapters::output::db::postgres::{
create_verification_otp::*, emp_id_exists::tests::create_dummy_employee,
},
application::port::output::db::create_verification_otp::*,
},
utils::uuid::tests::UUID,
};
#[actix_rt::test]
async fn test_postgres_get_verification_secret() {
let otp = 999;
let e = Employee::default();
let settings = crate::settings::tests::get_settings().await;
settings.create_db().await;
let db = super::DBOutPostgresAdapter::new(
sqlx::postgres::PgPool::connect(&settings.database.url)
.await
.unwrap(),
);
create_dummy_employee(&e, &db).await;
assert!(db
.get_verification_otp(e.phone_number())
.await
.unwrap()
.is_none(),);
db.create_verification_otp(
CreateOTPMsgBuilder::default()
.otp(otp)
.emp_id(*e.emp_id())
.build()
.unwrap(),
)
.await
.unwrap();
assert_eq!(
db.get_verification_otp(e.phone_number()).await.unwrap(),
Some(otp.into())
);
settings.drop_db().await;
}
}

View file

@ -12,12 +12,27 @@ use crate::db::{migrate::RunMigrations, sqlx_postgres::Postgres};
pub mod create_verification_secret;
pub mod delete_verification_secret;
pub mod email_exists;
pub mod employee_view;
mod errors;
pub mod get_verification_secret;
pub mod user_id_exists;
pub mod user_view;
pub mod verification_secret_exists;
pub mod create_login_otp;
pub mod create_verification_otp;
pub mod delete_login_otp;
pub mod delete_verification_otp;
pub mod emp_id_exists;
pub mod get_emp_id_from_phone_number;
pub mod get_login_otp;
pub mod get_verification_otp;
pub mod phone_exists;
//pub mod get_invite;
//pub mod invite_id_exists;
//pub mod store_id_exists;
#[derive(Clone)]
pub struct DBOutPostgresAdapter {
pool: PgPool,

View file

@ -0,0 +1,69 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use uuid::Uuid;
use super::DBOutPostgresAdapter;
use crate::identity::application::port::output::db::{errors::*, phone_exists::*};
use crate::identity::domain::employee_aggregate::*;
#[async_trait::async_trait]
impl PhoneNumberExistsOutDBPort for DBOutPostgresAdapter {
async fn phone_exists(&self, phone: &PhoneNumber) -> OutDBPortResult<bool> {
let res = sqlx::query!(
"SELECT EXISTS (
SELECT 1
FROM cqrs_identity_employee_query
WHERE
phone_number_number = $1
AND
phone_number_country_code = $2
);",
*phone.number() as i64,
*phone.country_code() as i32,
)
.fetch_one(&self.pool)
.await?;
if let Some(x) = res.exists {
Ok(x)
} else {
Ok(false)
}
}
}
#[cfg(test)]
pub mod tests {
use super::*;
use crate::identity::adapters::output::db::postgres::emp_id_exists::tests::create_dummy_employee;
use crate::utils::uuid::tests::UUID;
use crate::identity::adapters::output::db::postgres::employee_view::EmployeeView;
use crate::identity::domain::aggregate::*;
use crate::identity::domain::employee_aggregate::*;
#[actix_rt::test]
async fn test_postgres_phone_exists() {
let settings = crate::settings::tests::get_settings().await;
settings.create_db().await;
let db = super::DBOutPostgresAdapter::new(
sqlx::postgres::PgPool::connect(&settings.database.url)
.await
.unwrap(),
);
let e = Employee::default();
// state doesn't exist
assert!(!db.phone_exists(e.phone_number()).await.unwrap());
create_dummy_employee(&e, &db).await;
// state exists
assert!(db.phone_exists(e.phone_number()).await.unwrap());
settings.drop_db().await;
}
}

View file

@ -19,7 +19,7 @@ pub use tests::*;
#[automock]
#[async_trait::async_trait]
pub trait DeleteLoginOTPOutDBPort: Send + Sync {
async fn delete_login_otp(&self, otp: usize, phone_exists: &PhoneNumber)
async fn delete_login_otp(&self, otp: usize, phone_number: &PhoneNumber)
-> OutDBPortResult<()>;
}

View file

@ -9,8 +9,8 @@ use serde::{Deserialize, Serialize};
use uuid::Uuid;
use super::errors::*;
use super::verification_otp_exists::*;
use crate::identity::domain::employee_aggregate::*;
#[cfg(test)]
#[allow(unused_imports)]
pub use tests::*;

View file

@ -10,7 +10,12 @@ pub type OutDBPortResult<V> = Result<V, OutDBPortError>;
#[derive(Debug, Display, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum OutDBPortError {
InternalError,
DuplicateEmpLoginOTP,
DuplicateEmpVerificationOTP,
DuplicateVerificationOTPSecret,
VerificationSecretNotFound,
VerificationOTPNotFound,
PhoneNumberNotFound,
EmpLoginOTPNotFound,
EmpVerificationOTPNotFound,
}

View file

@ -20,5 +20,5 @@ pub mod invite_id_exists;
pub mod phone_exists;
pub mod store_id_exists;
pub mod user_id_exists;
pub mod verification_otp_exists;
//pub mod verification_otp_exists;
pub mod verification_secret_exists;

View file

@ -68,6 +68,12 @@ impl From<OutDBPortError> for IdentityError {
OutDBPortError::DuplicateVerificationOTPSecret => Self::DuplicateVerificationOTP,
OutDBPortError::VerificationSecretNotFound => Self::VerificationSecretNotFound,
OutDBPortError::VerificationOTPNotFound => Self::VerificationOTPNotFound,
OutDBPortError::DuplicateEmpLoginOTP | OutDBPortError::DuplicateEmpVerificationOTP => {
Self::InternalError
}
OutDBPortError::PhoneNumberNotFound => Self::PhoneNumberNotFound,
OutDBPortError::EmpLoginOTPNotFound => Self::LoginOTPNotFound,
OutDBPortError::EmpVerificationOTPNotFound => Self::VerificationOTPNotFound,
}
}
}