feat: define Customization aggregate, and impl Query and View

This commit is contained in:
Aravinth Manivannan 2024-07-16 15:43:07 +05:30
parent 0f29902b4b
commit f189ecbf38
Signed by: realaravinth
GPG key ID: F8F50389936984FF
11 changed files with 477 additions and 132 deletions

View file

@ -1,18 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO cqrs_inventory_product_customizations_query (\n version,\n name,\n customization_id,\n product_id,\n deleted\n ) VALUES (\n $1, $2, $3, $4, $5\n );",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8",
"Text",
"Uuid",
"Uuid",
"Bool"
]
},
"nullable": []
},
"hash": "0994f26a6ec76a33d2839cc290e6cfac98a2dee007a1388ffe95f7feecbc7442"
}

View file

@ -1,94 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "SELECT \n name,\n description,\n image,\n product_id,\n category_id,\n price_major,\n price_minor,\n price_currency,\n sku_able,\n quantity_unit,\n quantity_number,\n deleted,\n customizations_available\n FROM\n cqrs_inventory_product_query\n WHERE\n product_id = $1;",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "name",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "description",
"type_info": "Text"
},
{
"ordinal": 2,
"name": "image",
"type_info": "Text"
},
{
"ordinal": 3,
"name": "product_id",
"type_info": "Uuid"
},
{
"ordinal": 4,
"name": "category_id",
"type_info": "Uuid"
},
{
"ordinal": 5,
"name": "price_major",
"type_info": "Int4"
},
{
"ordinal": 6,
"name": "price_minor",
"type_info": "Int4"
},
{
"ordinal": 7,
"name": "price_currency",
"type_info": "Text"
},
{
"ordinal": 8,
"name": "sku_able",
"type_info": "Bool"
},
{
"ordinal": 9,
"name": "quantity_unit",
"type_info": "Text"
},
{
"ordinal": 10,
"name": "quantity_number",
"type_info": "Int4"
},
{
"ordinal": 11,
"name": "deleted",
"type_info": "Bool"
},
{
"ordinal": 12,
"name": "customizations_available",
"type_info": "Bool"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
false,
true,
true,
false,
false,
false,
false,
false,
false,
false,
false,
false,
false
]
},
"hash": "27212976fb97f84722c5bf522580e6d64dcded15e56e1a4064c66d9df57fcdda"
}

View file

@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "SELECT\n name,\n customization_id,\n deleted,\n product_id\n FROM\n cqrs_inventory_product_customizations_query\n WHERE\n product_id = $1;",
"query": "SELECT \n name,\n customization_id,\n product_id,\n deleted\n FROM\n cqrs_inventory_product_customizations_query\n WHERE\n customization_id = $1;",
"describe": {
"columns": [
{
@ -15,13 +15,13 @@
},
{
"ordinal": 2,
"name": "deleted",
"type_info": "Bool"
"name": "product_id",
"type_info": "Uuid"
},
{
"ordinal": 3,
"name": "product_id",
"type_info": "Uuid"
"name": "deleted",
"type_info": "Bool"
}
],
"parameters": {
@ -36,5 +36,5 @@
false
]
},
"hash": "674258549f4d79aeaf3ecd7d242df79e5f2ba063409fd81cb283bb2b75f10c32"
"hash": "2e0cec451addfee3a8f0ac1603205077814f6c8b64cabe1cf4687187c1a766ce"
}

View file

@ -0,0 +1,18 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO cqrs_inventory_product_customizations_query (\n version,\n name,\n customization_id,\n product_id,\n deleted\n ) VALUES (\n $1, $2, $3, $4, $5\n );",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8",
"Text",
"Uuid",
"Uuid",
"Bool"
]
},
"nullable": []
},
"hash": "3c21629e4125c2a26ed535b981cb42b86d2cffee477a4b9a69507615ef0b91c5"
}

View file

@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "SELECT \n name,\n description,\n image,\n product_id,\n category_id,\n price_major,\n price_minor,\n price_currency,\n sku_able,\n quantity_unit,\n quantity_number,\n customizations_available,\n deleted\n FROM\n cqrs_inventory_product_query\n WHERE\n product_id = $1;",
"query": "SELECT \n name,\n description,\n image,\n product_id,\n category_id,\n price_major,\n price_minor,\n price_currency,\n sku_able,\n quantity_unit,\n quantity_number,\n deleted\n FROM\n cqrs_inventory_product_query\n WHERE\n product_id = $1;",
"describe": {
"columns": [
{
@ -60,11 +60,6 @@
},
{
"ordinal": 11,
"name": "customizations_available",
"type_info": "Bool"
},
{
"ordinal": 12,
"name": "deleted",
"type_info": "Bool"
}
@ -86,9 +81,8 @@
false,
false,
false,
false,
false
]
},
"hash": "10c89e8b7be6fa4a1bf39ae2fbb922fad9db5e9d094497b9cc401e9301b5631f"
"hash": "3f83167782a2de1be7d87d35f63a3e530373595ce83cd36612ee48d3c36c81f3"
}

View file

@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO cqrs_inventory_product_query (\n version,\n name,\n description,\n image,\n product_id,\n category_id,\n price_major,\n price_minor,\n price_currency,\n sku_able,\n quantity_unit,\n quantity_number,\n deleted,\n customizations_available\n ) VALUES (\n $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14\n );",
"query": "INSERT INTO cqrs_inventory_product_query (\n version,\n name,\n description,\n image,\n product_id,\n category_id,\n price_major,\n price_minor,\n price_currency,\n sku_able,\n quantity_unit,\n quantity_number,\n deleted\n ) VALUES (\n $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13\n );",
"describe": {
"columns": [],
"parameters": {
@ -17,11 +17,10 @@
"Bool",
"Text",
"Int4",
"Bool",
"Bool"
]
},
"nullable": []
},
"hash": "1c048c8e1fc4739d20df983b5042e7ed9eadef4497648acff9d472a2d7fabf78"
"hash": "9390910c71001ef77313e594dea0e4f0682f740778e483a2db91d6c73ac6bc6d"
}

View file

@ -0,0 +1,28 @@
{
"db_name": "PostgreSQL",
"query": "SELECT \n customization_id, version\n FROM\n cqrs_inventory_product_customizations_query\n WHERE\n customization_id = $1;",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "customization_id",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "version",
"type_info": "Int8"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
false,
false
]
},
"hash": "bdaf5b691f50281bb7108d8153c5862d4fca3f565bb394ef4115bb94913a43c3"
}

View file

@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "UPDATE\n cqrs_inventory_product_query\n SET\n version = $1,\n name = $2,\n description = $3,\n image = $4,\n product_id = $5,\n category_id = $6,\n price_major = $7,\n price_minor = $8,\n price_currency = $9,\n sku_able = $10,\n quantity_unit = $11,\n quantity_number = $12,\n deleted = $13,\n customizations_available = $14;",
"query": "UPDATE\n cqrs_inventory_product_query\n SET\n version = $1,\n name = $2,\n description = $3,\n image = $4,\n product_id = $5,\n category_id = $6,\n price_major = $7,\n price_minor = $8,\n price_currency = $9,\n sku_able = $10,\n quantity_unit = $11,\n quantity_number = $12,\n deleted = $13;",
"describe": {
"columns": [],
"parameters": {
@ -17,11 +17,10 @@
"Bool",
"Text",
"Int4",
"Bool",
"Bool"
]
},
"nullable": []
},
"hash": "c0850c387878c368a89aa0f1f505fc337a3ece328f5eba95f11f987af88f8b23"
"hash": "f55fe530202bb369fdea3baaf91f86bbc866218e4146b6d608255c1b929073b4"
}

View file

@ -0,0 +1,271 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use std::str::FromStr;
use async_trait::async_trait;
use cqrs_es::persist::{PersistenceError, ViewContext, ViewRepository};
use cqrs_es::{EventEnvelope, Query, View};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use super::errors::*;
use super::InventoryDBPostgresAdapter;
use crate::inventory::domain::{customization_aggregate::*, events::InventoryEvent};
use crate::utils::parse_aggregate_id::parse_aggregate_id;
pub const NEW_PRODUCT_NON_UUID: &str = "new_product_non_uuid-asdfa";
//#[derive(Debug, Default, Serialize, Deserialize)]
//struct Customizations {
// customizations: Vec<CustomizationView>,
//}
#[derive(Debug, Default, Serialize, Deserialize)]
struct CustomizationView {
name: String,
product_id: Uuid,
customization_id: Uuid,
deleted: bool,
}
impl From<CustomizationView> for Customization {
fn from(v: CustomizationView) -> Self {
CustomizationBuilder::default()
.name(v.name)
.customization_id(v.customization_id)
.deleted(v.deleted)
.build()
.unwrap()
}
}
// This updates the view with events as they are committed.
// The logic should be minimal here, e.g., don't calculate the account balance,
// design the events to carry the balance information instead.
impl View<Customization> for CustomizationView {
fn update(&mut self, event: &EventEnvelope<Customization>) {
match &event.payload {
InventoryEvent::CustomizationAdded(val) => {
self.name = val.customization().name().into();
self.product_id = val.product_id().clone();
self.customization_id = val.customization().customization_id().clone();
self.deleted = false;
}
_ => (),
}
}
}
pub struct InnerCustomization {
name: String,
customization_id: Uuid,
product_id: Uuid,
deleted: bool,
}
impl From<InnerCustomization> for Customization {
fn from(value: InnerCustomization) -> Self {
CustomizationBuilder::default()
.name(value.name)
.customization_id(value.customization_id)
.deleted(value.deleted)
.build()
.unwrap()
}
}
//impl InnerCustomizationView {
// async fn get_customizations(
// &self,
// db: &InventoryDBPostgresAdapter,
// ) -> Result<Vec<Customization>, PersistenceError> {
// let customizations = if self.customizations_available {
// let mut inner_customizations = sqlx::query_as!(
// InnerCustomization,
// "SELECT
// name,
// customization_id,
// deleted,
// customization_id
// FROM
// cqrs_inventory_product_customizations_query
// WHERE
// customization_id = $1;",
// self.customization_id
// )
// .fetch_all(&db.pool)
// .await
// .map_err(PostgresAggregateError::from)?;
//
// let mut customizations = Vec::with_capacity(inner_customizations.len());
// for c in inner_customizations.drain(0..) {
// customizations.push(c.into());
// }
// customizations
// } else {
// Vec::default()
// };
// Ok(customizations)
// }
//}
#[async_trait]
impl ViewRepository<CustomizationView, Customization> for InventoryDBPostgresAdapter {
async fn load(
&self,
customization_id: &str,
) -> Result<Option<CustomizationView>, PersistenceError> {
let customization_id = match parse_aggregate_id(customization_id, NEW_PRODUCT_NON_UUID)? {
Some((val, _)) => return Ok(Some(val)),
None => Uuid::parse_str(customization_id).unwrap(),
};
let res = sqlx::query_as!(
CustomizationView,
"SELECT
name,
customization_id,
product_id,
deleted
FROM
cqrs_inventory_product_customizations_query
WHERE
customization_id = $1;",
customization_id
)
.fetch_one(&self.pool)
.await
.map_err(PostgresAggregateError::from)?;
// let customizations = res.get_customizations(&self).await?;
Ok(Some(res))
}
async fn load_with_context(
&self,
customization_id: &str,
) -> Result<Option<(CustomizationView, ViewContext)>, PersistenceError> {
let customization_id = match parse_aggregate_id(customization_id, NEW_PRODUCT_NON_UUID)? {
Some(val) => return Ok(Some(val)),
None => Uuid::parse_str(customization_id).unwrap(),
};
let res = sqlx::query_as!(
CustomizationView,
"SELECT
name,
customization_id,
product_id,
deleted
FROM
cqrs_inventory_product_customizations_query
WHERE
customization_id = $1;",
customization_id
)
.fetch_one(&self.pool)
.await
.map_err(PostgresAggregateError::from)?;
// let customizations = res.get_customizations(&self).await?;
struct Context {
version: i64,
customization_id: Uuid,
}
let ctx = sqlx::query_as!(
Context,
"SELECT
customization_id, version
FROM
cqrs_inventory_product_customizations_query
WHERE
customization_id = $1;",
customization_id
)
.fetch_one(&self.pool)
.await
.map_err(PostgresAggregateError::from)?;
let view_context = ViewContext::new(ctx.customization_id.to_string(), ctx.version);
Ok(Some((res, view_context)))
}
async fn update_view(
&self,
view: CustomizationView,
context: ViewContext,
) -> Result<(), PersistenceError> {
match context.version {
0 => {
let version = context.version + 1;
sqlx::query!(
"INSERT INTO cqrs_inventory_product_customizations_query (
version,
name,
customization_id,
product_id,
deleted
) VALUES (
$1, $2, $3, $4, $5
);",
version,
view.name,
view.customization_id,
view.product_id,
view.deleted,
)
.execute(&self.pool)
.await
.map_err(PostgresAggregateError::from)?;
}
_ => {
let version = context.version + 1;
sqlx::query!(
"UPDATE
cqrs_inventory_product_customizations_query
SET
version = $1,
name = $2,
customization_id = $3,
product_id = $4,
deleted = $5;",
version,
view.name,
view.customization_id,
view.product_id,
view.deleted,
)
.execute(&self.pool)
.await
.map_err(PostgresAggregateError::from)?;
}
}
Ok(())
}
}
#[async_trait]
impl Query<Customization> for InventoryDBPostgresAdapter {
async fn dispatch(&self, customization_id: &str, events: &[EventEnvelope<Customization>]) {
let res = self
.load_with_context(&customization_id)
.await
.unwrap_or_else(|_| {
Some((
CustomizationView::default(),
ViewContext::new(customization_id.into(), 0),
))
});
let (mut view, view_context): (CustomizationView, ViewContext) = res.unwrap();
for event in events {
view.update(event);
}
self.update_view(view, view_context).await.unwrap();
}
}

View file

@ -13,6 +13,7 @@ mod category_name_exists_for_store;
mod category_view;
mod customization_id_exists;
mod customization_name_exists_for_product;
mod customization_view;
mod errors;
mod product_id_exists;
mod product_name_exists_for_category;

View file

@ -0,0 +1,147 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use std::str::FromStr;
use async_trait::async_trait;
use config::builder;
use cqrs_es::Aggregate;
use derive_builder::Builder;
use derive_getters::Getters;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use super::{commands::InventoryCommand, events::InventoryEvent};
use crate::inventory::application::services::errors::*;
use crate::inventory::application::services::InventoryServicesInterface;
#[derive(
Clone, Default, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd, Getters, Builder,
)]
pub struct Customization {
name: String,
customization_id: Uuid,
#[builder(default = "false")]
deleted: bool,
}
#[async_trait]
impl Aggregate for Customization {
type Command = InventoryCommand;
type Event = InventoryEvent;
type Error = InventoryError;
type Services = std::sync::Arc<dyn InventoryServicesInterface>;
// This identifier should be unique to the system.
fn aggregate_type() -> String {
"inventory.product".to_string()
}
// The aggregate logic goes here. Note that this will be the _bulk_ of a CQRS system
// so expect to use helper functions elsewhere to keep the code clean.
async fn handle(
&self,
command: Self::Command,
services: &Self::Services,
) -> Result<Vec<Self::Event>, Self::Error> {
match command {
InventoryCommand::AddCustomization(cmd) => {
let res = services.add_customization().add_customization(cmd).await?;
Ok(vec![InventoryEvent::CustomizationAdded(res)])
}
_ => Ok(Vec::default()),
}
}
fn apply(&mut self, event: Self::Event) {
match event {
InventoryEvent::CustomizationAdded(e) => {
*self = e.customization().clone();
}
_ => (),
}
}
}
//#[cfg(test)]
//mod aggregate_tests {
// use std::sync::Arc;
//
// use cqrs_es::test::TestFramework;
//
// use super::*;
// use crate::inventory::{
// application::services::{add_product_service::tests::*, *},
// domain::{
// add_product_command::tests::get_command, commands::InventoryCommand,
// events::InventoryEvent, product_added_event::tests::get_event_from_command,
// },
// };
// use crate::tests::bdd::*;
//
// type ProductTestFramework = TestFramework<Product>;
//
// #[test]
// fn test_create_product() {
// let cmd = get_command();
// let expected = get_event_from_command(&cmd);
// let expected = InventoryEvent::ProductAdded(expected);
//
// let mut services = MockInventoryServicesInterface::new();
// services
// .expect_add_product()
// .times(IS_CALLED_ONLY_ONCE.unwrap())
// .return_const(mock_add_product_service(IS_CALLED_ONLY_ONCE, cmd.clone()));
//
// ProductTestFramework::with(Arc::new(services))
// .given_no_previous_events()
// .when(InventoryCommand::AddProduct(cmd))
// .then_expect_events(vec![expected]);
// }
//
// fn test_helper<T>(t: T, str_value: &str) -> bool
// where
// T: ToString + FromStr + std::fmt::Debug + PartialEq,
// <T as FromStr>::Err: std::fmt::Debug,
// {
// println!("Testing type: {:?} against value {str_value}", t);
// assert_eq!(t.to_string(), str_value.to_string());
//
// assert_eq!(T::from_str(str_value).unwrap(), t);
//
// assert_eq!(T::from_str(t.to_string().as_str()).unwrap(), t,);
//
// true
// }
//
// #[test]
// fn currency_to_string_from_str() {
// assert!(test_helper(Currency::INR, INR));
// }
//
// #[test]
// fn quantity_unit_kilogram() {
// assert!(test_helper(QuantityUnit::Kilogram, KILO_GRAM));
// }
//
// #[test]
// fn quantity_unit_gram() {
// assert!(test_helper(QuantityUnit::Gram, GRAM));
// }
//
// #[test]
// fn quantity_unit_discrete_number() {
// assert!(test_helper(QuantityUnit::DiscreteNumber, DISCRETE_NUMBER));
// }
//
// #[test]
// fn quantity_unit_milli_liter() {
// assert!(test_helper(QuantityUnit::MilliLiter, MILLI_LITER));
// }
//
// #[test]
// fn quantity_unit_liter() {
// assert!(test_helper(QuantityUnit::Liter, LITER));
// }
//}