From 732f486aef5a3e20868dbff0f1b7271e0c7ecc01 Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Mon, 15 Jul 2024 17:55:09 +0530 Subject: [PATCH] feat: define product aggregate table and implement cqrs_es::View --- ...c072c816f5ea94eac454406f5eaa3129dd180.json | 23 ++ ...fc527776e7a753b12bf7035904dedde904aae.json | 70 +++++ ...f6c6942d6fc4057b65327f8c6d79ef21cc064.json | 23 ++ ...80b9d14c9864875177af5320c926d68cfb4ae.json | 23 ++ ...2cea0739c072a7d83ce29ded99342d203766f.json | 22 ++ ...5c02cfe11dbee292fc0cb80a01228a3cceb93.json | 28 ++ ...715113708_cqrs_inventory_product_query.sql | 27 ++ .../output/db/postgres/product_view.rs | 265 ++++++++++++++++++ 8 files changed, 481 insertions(+) create mode 100644 .sqlx/query-3c82b22884858afc80ab013abddc072c816f5ea94eac454406f5eaa3129dd180.json create mode 100644 .sqlx/query-4389b997a21aa5184102aaf7c90fc527776e7a753b12bf7035904dedde904aae.json create mode 100644 .sqlx/query-4d3524bb1742ba55267ab53322cf6c6942d6fc4057b65327f8c6d79ef21cc064.json create mode 100644 .sqlx/query-53a5e7e87387ffc14013067a24c80b9d14c9864875177af5320c926d68cfb4ae.json create mode 100644 .sqlx/query-e20284d3953f5cba62c63b3a9482cea0739c072a7d83ce29ded99342d203766f.json create mode 100644 .sqlx/query-fbd8fa5a39f6c5351e7e366ad385c02cfe11dbee292fc0cb80a01228a3cceb93.json create mode 100644 migrations/20240715113708_cqrs_inventory_product_query.sql create mode 100644 src/inventory/adapters/output/db/postgres/product_view.rs diff --git a/.sqlx/query-3c82b22884858afc80ab013abddc072c816f5ea94eac454406f5eaa3129dd180.json b/.sqlx/query-3c82b22884858afc80ab013abddc072c816f5ea94eac454406f5eaa3129dd180.json new file mode 100644 index 0000000..ed65490 --- /dev/null +++ b/.sqlx/query-3c82b22884858afc80ab013abddc072c816f5ea94eac454406f5eaa3129dd180.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO cqrs_inventory_product_query (\n version,\n name,\n description,\n image,\n product_id,\n category_id,\n price_major,\n price_minor,\n price_currency,\n sku_able\n ) VALUES (\n $1, $2, $3, $4, $5, $6, $7, $8, $9, $10\n );", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Text", + "Text", + "Text", + "Uuid", + "Uuid", + "Int4", + "Int4", + "Text", + "Bool" + ] + }, + "nullable": [] + }, + "hash": "3c82b22884858afc80ab013abddc072c816f5ea94eac454406f5eaa3129dd180" +} diff --git a/.sqlx/query-4389b997a21aa5184102aaf7c90fc527776e7a753b12bf7035904dedde904aae.json b/.sqlx/query-4389b997a21aa5184102aaf7c90fc527776e7a753b12bf7035904dedde904aae.json new file mode 100644 index 0000000..923f7f5 --- /dev/null +++ b/.sqlx/query-4389b997a21aa5184102aaf7c90fc527776e7a753b12bf7035904dedde904aae.json @@ -0,0 +1,70 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT \n name,\n description,\n image,\n product_id,\n category_id,\n price_major,\n price_minor,\n price_currency,\n sku_able\n FROM\n cqrs_inventory_product_query\n WHERE\n product_id = $1;", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "name", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "description", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "image", + "type_info": "Text" + }, + { + "ordinal": 3, + "name": "product_id", + "type_info": "Uuid" + }, + { + "ordinal": 4, + "name": "category_id", + "type_info": "Uuid" + }, + { + "ordinal": 5, + "name": "price_major", + "type_info": "Int4" + }, + { + "ordinal": 6, + "name": "price_minor", + "type_info": "Int4" + }, + { + "ordinal": 7, + "name": "price_currency", + "type_info": "Text" + }, + { + "ordinal": 8, + "name": "sku_able", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false, + true, + true, + false, + false, + false, + false, + false, + false + ] + }, + "hash": "4389b997a21aa5184102aaf7c90fc527776e7a753b12bf7035904dedde904aae" +} diff --git a/.sqlx/query-4d3524bb1742ba55267ab53322cf6c6942d6fc4057b65327f8c6d79ef21cc064.json b/.sqlx/query-4d3524bb1742ba55267ab53322cf6c6942d6fc4057b65327f8c6d79ef21cc064.json new file mode 100644 index 0000000..cc910f8 --- /dev/null +++ b/.sqlx/query-4d3524bb1742ba55267ab53322cf6c6942d6fc4057b65327f8c6d79ef21cc064.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE\n cqrs_inventory_product_query\n SET\n version = $1,\n name = $2,\n description = $3,\n image = $4,\n product_id = $5,\n category_id = $6,\n price_major = $7,\n price_minor = $8,\n price_currency = $9,\n sku_able = $10;", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Text", + "Text", + "Text", + "Uuid", + "Uuid", + "Int4", + "Int4", + "Text", + "Bool" + ] + }, + "nullable": [] + }, + "hash": "4d3524bb1742ba55267ab53322cf6c6942d6fc4057b65327f8c6d79ef21cc064" +} diff --git a/.sqlx/query-53a5e7e87387ffc14013067a24c80b9d14c9864875177af5320c926d68cfb4ae.json b/.sqlx/query-53a5e7e87387ffc14013067a24c80b9d14c9864875177af5320c926d68cfb4ae.json new file mode 100644 index 0000000..5050200 --- /dev/null +++ b/.sqlx/query-53a5e7e87387ffc14013067a24c80b9d14c9864875177af5320c926d68cfb4ae.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT EXISTS (\n SELECT 1\n FROM cqrs_inventory_product_query\n WHERE\n name = $1\n AND\n category_id = $2\n );", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "exists", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Text", + "Uuid" + ] + }, + "nullable": [ + null + ] + }, + "hash": "53a5e7e87387ffc14013067a24c80b9d14c9864875177af5320c926d68cfb4ae" +} diff --git a/.sqlx/query-e20284d3953f5cba62c63b3a9482cea0739c072a7d83ce29ded99342d203766f.json b/.sqlx/query-e20284d3953f5cba62c63b3a9482cea0739c072a7d83ce29ded99342d203766f.json new file mode 100644 index 0000000..7ad7bd5 --- /dev/null +++ b/.sqlx/query-e20284d3953f5cba62c63b3a9482cea0739c072a7d83ce29ded99342d203766f.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT EXISTS (\n SELECT 1\n FROM cqrs_inventory_product_query\n WHERE\n product_id = $1\n );", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "exists", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + null + ] + }, + "hash": "e20284d3953f5cba62c63b3a9482cea0739c072a7d83ce29ded99342d203766f" +} diff --git a/.sqlx/query-fbd8fa5a39f6c5351e7e366ad385c02cfe11dbee292fc0cb80a01228a3cceb93.json b/.sqlx/query-fbd8fa5a39f6c5351e7e366ad385c02cfe11dbee292fc0cb80a01228a3cceb93.json new file mode 100644 index 0000000..cf43695 --- /dev/null +++ b/.sqlx/query-fbd8fa5a39f6c5351e7e366ad385c02cfe11dbee292fc0cb80a01228a3cceb93.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT \n product_id, version\n FROM\n cqrs_inventory_product_query\n WHERE\n product_id = $1;", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "product_id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "version", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false, + false + ] + }, + "hash": "fbd8fa5a39f6c5351e7e366ad385c02cfe11dbee292fc0cb80a01228a3cceb93" +} diff --git a/migrations/20240715113708_cqrs_inventory_product_query.sql b/migrations/20240715113708_cqrs_inventory_product_query.sql new file mode 100644 index 0000000..0036ba6 --- /dev/null +++ b/migrations/20240715113708_cqrs_inventory_product_query.sql @@ -0,0 +1,27 @@ +-- SPDX-FileCopyrightText: 2024 Aravinth Manivannan +-- +-- SPDX-License-Identifier: AGPL-3.0-or-later + +CREATE TABLE IF NOT EXISTS cqrs_inventory_product_query +( + version bigint CHECK (version >= 0) NOT NULL, + + name TEXT NOT NULL, + description TEXT, + image TEXT, + sku_able BOOLEAN NOT NULL DEFAULT FALSE, + product_id UUID NOT NULL UNIQUE, + + + price_minor INTEGER NOT NULL, + price_major INTEGER NOT NULL, + price_currency TEXT NOT NULL, + + + category_id UUID NOT NULL, + + deleted BOOLEAN NOT NULL DEFAULT FALSE, + UNIQUE(category_id, name), + + PRIMARY KEY (product_id) +); diff --git a/src/inventory/adapters/output/db/postgres/product_view.rs b/src/inventory/adapters/output/db/postgres/product_view.rs new file mode 100644 index 0000000..be06416 --- /dev/null +++ b/src/inventory/adapters/output/db/postgres/product_view.rs @@ -0,0 +1,265 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use std::str::FromStr; + +use async_trait::async_trait; +use cqrs_es::persist::{PersistenceError, ViewContext, ViewRepository}; +use cqrs_es::{EventEnvelope, Query, View}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use super::errors::*; +use super::InventoryDBPostgresAdapter; +use crate::inventory::domain::events::InventoryEvent; +use crate::inventory::domain::product_aggregate::{ + Currency, PriceBuilder, Product, ProductBuilder, +}; +use crate::utils::parse_aggregate_id::parse_aggregate_id; + +pub const NEW_PRODUCT_NON_UUID: &str = "new_product_non_uuid-asdfa"; + +// The view for a Product query, for a standard http application this should +// be designed to reflect the response dto that will be returned to a user. +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct ProductView { + name: String, + description: Option, + image: Option, // string = filename + product_id: Uuid, + sku_able: bool, + + price_minor: i32, + price_major: i32, + price_currency: String, + + category_id: Uuid, +} + +impl From for Product { + fn from(v: ProductView) -> Self { + let price = PriceBuilder::default() + .minor(v.price_minor as usize) + .major(v.price_major as usize) + .currency(Currency::from_str(&v.price_currency).unwrap()) + .build() + .unwrap(); + + ProductBuilder::default() + .name(v.name) + .description(v.description) + .image(v.image) + .sku_able(v.sku_able) + .price(price) + .category_id(v.category_id) + .product_id(v.product_id) + .build() + .unwrap() + } +} + +// This updates the view with events as they are committed. +// The logic should be minimal here, e.g., don't calculate the account balance, +// design the events to carry the balance information instead. +impl View for ProductView { + fn update(&mut self, event: &EventEnvelope) { + match &event.payload { + InventoryEvent::ProductAdded(val) => { + self.name = val.name().into(); + self.description = val.description().clone(); + self.image = val.image().clone(); + self.product_id = val.product_id().clone(); + self.category_id = val.category_id().clone(); + + self.sku_able = val.sku_able().clone(); + + self.price_minor = val.price().minor().clone() as i32; + self.price_major = val.price().major().clone() as i32; + self.price_currency = val.price().currency().to_string(); + } + _ => (), + } + } +} + +#[async_trait] +impl ViewRepository for InventoryDBPostgresAdapter { + async fn load(&self, product_id: &str) -> Result, PersistenceError> { + let product_id = match parse_aggregate_id(product_id, NEW_PRODUCT_NON_UUID)? { + Some((val, _)) => return Ok(Some(val)), + None => Uuid::parse_str(product_id).unwrap(), + }; + + let res = sqlx::query_as!( + ProductView, + "SELECT + name, + description, + image, + product_id, + category_id, + price_major, + price_minor, + price_currency, + sku_able + FROM + cqrs_inventory_product_query + WHERE + product_id = $1;", + product_id + ) + .fetch_one(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + Ok(Some(res)) + } + + async fn load_with_context( + &self, + product_id: &str, + ) -> Result, PersistenceError> { + let product_id = match parse_aggregate_id(product_id, NEW_PRODUCT_NON_UUID)? { + Some(val) => return Ok(Some(val)), + None => Uuid::parse_str(product_id).unwrap(), + }; + + let res = sqlx::query_as!( + ProductView, + "SELECT + name, + description, + image, + product_id, + category_id, + price_major, + price_minor, + price_currency, + sku_able + FROM + cqrs_inventory_product_query + WHERE + product_id = $1;", + product_id + ) + .fetch_one(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + + struct Context { + version: i64, + product_id: Uuid, + } + + let ctx = sqlx::query_as!( + Context, + "SELECT + product_id, version + FROM + cqrs_inventory_product_query + WHERE + product_id = $1;", + product_id + ) + .fetch_one(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + + let view_context = ViewContext::new(ctx.product_id.to_string(), ctx.version); + Ok(Some((res, view_context))) + } + + async fn update_view( + &self, + view: ProductView, + context: ViewContext, + ) -> Result<(), PersistenceError> { + match context.version { + 0 => { + let version = context.version + 1; + sqlx::query!( + "INSERT INTO cqrs_inventory_product_query ( + version, + name, + description, + image, + product_id, + category_id, + price_major, + price_minor, + price_currency, + sku_able + ) VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9, $10 + );", + version, + view.name, + view.description, + view.image, + view.product_id, + view.category_id, + view.price_major, + view.price_minor, + view.price_currency, + view.sku_able + ) + .execute(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + } + _ => { + let version = context.version + 1; + sqlx::query!( + "UPDATE + cqrs_inventory_product_query + SET + version = $1, + name = $2, + description = $3, + image = $4, + product_id = $5, + category_id = $6, + price_major = $7, + price_minor = $8, + price_currency = $9, + sku_able = $10;", + version, + view.name, + view.description, + view.image, + view.product_id, + view.category_id, + view.price_major, + view.price_minor, + view.price_currency, + view.sku_able + ) + .execute(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + } + } + + Ok(()) + } +} + +#[async_trait] +impl Query for InventoryDBPostgresAdapter { + async fn dispatch(&self, product_id: &str, events: &[EventEnvelope]) { + let res = self + .load_with_context(&product_id) + .await + .unwrap_or_else(|_| { + Some(( + ProductView::default(), + ViewContext::new(product_id.into(), 0), + )) + }); + let (mut view, view_context): (ProductView, ViewContext) = res.unwrap(); + for event in events { + view.update(event); + } + self.update_view(view, view_context).await.unwrap(); + } +}