feat: define product aggregate table and implement cqrs_es::View
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
ci/woodpecker/pr/woodpecker Pipeline failed

This commit is contained in:
Aravinth Manivannan 2024-07-15 17:55:09 +05:30
parent 1fe54c5c38
commit 732f486aef
Signed by: realaravinth
GPG key ID: F8F50389936984FF
8 changed files with 481 additions and 0 deletions

View file

@ -0,0 +1,23 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO cqrs_inventory_product_query (\n version,\n name,\n description,\n image,\n product_id,\n category_id,\n price_major,\n price_minor,\n price_currency,\n sku_able\n ) VALUES (\n $1, $2, $3, $4, $5, $6, $7, $8, $9, $10\n );",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8",
"Text",
"Text",
"Text",
"Uuid",
"Uuid",
"Int4",
"Int4",
"Text",
"Bool"
]
},
"nullable": []
},
"hash": "3c82b22884858afc80ab013abddc072c816f5ea94eac454406f5eaa3129dd180"
}

View file

@ -0,0 +1,70 @@
{
"db_name": "PostgreSQL",
"query": "SELECT \n name,\n description,\n image,\n product_id,\n category_id,\n price_major,\n price_minor,\n price_currency,\n sku_able\n FROM\n cqrs_inventory_product_query\n WHERE\n product_id = $1;",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "name",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "description",
"type_info": "Text"
},
{
"ordinal": 2,
"name": "image",
"type_info": "Text"
},
{
"ordinal": 3,
"name": "product_id",
"type_info": "Uuid"
},
{
"ordinal": 4,
"name": "category_id",
"type_info": "Uuid"
},
{
"ordinal": 5,
"name": "price_major",
"type_info": "Int4"
},
{
"ordinal": 6,
"name": "price_minor",
"type_info": "Int4"
},
{
"ordinal": 7,
"name": "price_currency",
"type_info": "Text"
},
{
"ordinal": 8,
"name": "sku_able",
"type_info": "Bool"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
false,
true,
true,
false,
false,
false,
false,
false,
false
]
},
"hash": "4389b997a21aa5184102aaf7c90fc527776e7a753b12bf7035904dedde904aae"
}

View file

@ -0,0 +1,23 @@
{
"db_name": "PostgreSQL",
"query": "UPDATE\n cqrs_inventory_product_query\n SET\n version = $1,\n name = $2,\n description = $3,\n image = $4,\n product_id = $5,\n category_id = $6,\n price_major = $7,\n price_minor = $8,\n price_currency = $9,\n sku_able = $10;",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8",
"Text",
"Text",
"Text",
"Uuid",
"Uuid",
"Int4",
"Int4",
"Text",
"Bool"
]
},
"nullable": []
},
"hash": "4d3524bb1742ba55267ab53322cf6c6942d6fc4057b65327f8c6d79ef21cc064"
}

View file

@ -0,0 +1,23 @@
{
"db_name": "PostgreSQL",
"query": "SELECT EXISTS (\n SELECT 1\n FROM cqrs_inventory_product_query\n WHERE\n name = $1\n AND\n category_id = $2\n );",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "exists",
"type_info": "Bool"
}
],
"parameters": {
"Left": [
"Text",
"Uuid"
]
},
"nullable": [
null
]
},
"hash": "53a5e7e87387ffc14013067a24c80b9d14c9864875177af5320c926d68cfb4ae"
}

View file

@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "SELECT EXISTS (\n SELECT 1\n FROM cqrs_inventory_product_query\n WHERE\n product_id = $1\n );",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "exists",
"type_info": "Bool"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
null
]
},
"hash": "e20284d3953f5cba62c63b3a9482cea0739c072a7d83ce29ded99342d203766f"
}

View file

@ -0,0 +1,28 @@
{
"db_name": "PostgreSQL",
"query": "SELECT \n product_id, version\n FROM\n cqrs_inventory_product_query\n WHERE\n product_id = $1;",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "product_id",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "version",
"type_info": "Int8"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
false,
false
]
},
"hash": "fbd8fa5a39f6c5351e7e366ad385c02cfe11dbee292fc0cb80a01228a3cceb93"
}

View file

@ -0,0 +1,27 @@
-- SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
--
-- SPDX-License-Identifier: AGPL-3.0-or-later
CREATE TABLE IF NOT EXISTS cqrs_inventory_product_query
(
version bigint CHECK (version >= 0) NOT NULL,
name TEXT NOT NULL,
description TEXT,
image TEXT,
sku_able BOOLEAN NOT NULL DEFAULT FALSE,
product_id UUID NOT NULL UNIQUE,
price_minor INTEGER NOT NULL,
price_major INTEGER NOT NULL,
price_currency TEXT NOT NULL,
category_id UUID NOT NULL,
deleted BOOLEAN NOT NULL DEFAULT FALSE,
UNIQUE(category_id, name),
PRIMARY KEY (product_id)
);

View file

@ -0,0 +1,265 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use std::str::FromStr;
use async_trait::async_trait;
use cqrs_es::persist::{PersistenceError, ViewContext, ViewRepository};
use cqrs_es::{EventEnvelope, Query, View};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use super::errors::*;
use super::InventoryDBPostgresAdapter;
use crate::inventory::domain::events::InventoryEvent;
use crate::inventory::domain::product_aggregate::{
Currency, PriceBuilder, Product, ProductBuilder,
};
use crate::utils::parse_aggregate_id::parse_aggregate_id;
pub const NEW_PRODUCT_NON_UUID: &str = "new_product_non_uuid-asdfa";
// The view for a Product query, for a standard http application this should
// be designed to reflect the response dto that will be returned to a user.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct ProductView {
name: String,
description: Option<String>,
image: Option<String>, // string = filename
product_id: Uuid,
sku_able: bool,
price_minor: i32,
price_major: i32,
price_currency: String,
category_id: Uuid,
}
impl From<ProductView> for Product {
fn from(v: ProductView) -> Self {
let price = PriceBuilder::default()
.minor(v.price_minor as usize)
.major(v.price_major as usize)
.currency(Currency::from_str(&v.price_currency).unwrap())
.build()
.unwrap();
ProductBuilder::default()
.name(v.name)
.description(v.description)
.image(v.image)
.sku_able(v.sku_able)
.price(price)
.category_id(v.category_id)
.product_id(v.product_id)
.build()
.unwrap()
}
}
// This updates the view with events as they are committed.
// The logic should be minimal here, e.g., don't calculate the account balance,
// design the events to carry the balance information instead.
impl View<Product> for ProductView {
fn update(&mut self, event: &EventEnvelope<Product>) {
match &event.payload {
InventoryEvent::ProductAdded(val) => {
self.name = val.name().into();
self.description = val.description().clone();
self.image = val.image().clone();
self.product_id = val.product_id().clone();
self.category_id = val.category_id().clone();
self.sku_able = val.sku_able().clone();
self.price_minor = val.price().minor().clone() as i32;
self.price_major = val.price().major().clone() as i32;
self.price_currency = val.price().currency().to_string();
}
_ => (),
}
}
}
#[async_trait]
impl ViewRepository<ProductView, Product> for InventoryDBPostgresAdapter {
async fn load(&self, product_id: &str) -> Result<Option<ProductView>, PersistenceError> {
let product_id = match parse_aggregate_id(product_id, NEW_PRODUCT_NON_UUID)? {
Some((val, _)) => return Ok(Some(val)),
None => Uuid::parse_str(product_id).unwrap(),
};
let res = sqlx::query_as!(
ProductView,
"SELECT
name,
description,
image,
product_id,
category_id,
price_major,
price_minor,
price_currency,
sku_able
FROM
cqrs_inventory_product_query
WHERE
product_id = $1;",
product_id
)
.fetch_one(&self.pool)
.await
.map_err(PostgresAggregateError::from)?;
Ok(Some(res))
}
async fn load_with_context(
&self,
product_id: &str,
) -> Result<Option<(ProductView, ViewContext)>, PersistenceError> {
let product_id = match parse_aggregate_id(product_id, NEW_PRODUCT_NON_UUID)? {
Some(val) => return Ok(Some(val)),
None => Uuid::parse_str(product_id).unwrap(),
};
let res = sqlx::query_as!(
ProductView,
"SELECT
name,
description,
image,
product_id,
category_id,
price_major,
price_minor,
price_currency,
sku_able
FROM
cqrs_inventory_product_query
WHERE
product_id = $1;",
product_id
)
.fetch_one(&self.pool)
.await
.map_err(PostgresAggregateError::from)?;
struct Context {
version: i64,
product_id: Uuid,
}
let ctx = sqlx::query_as!(
Context,
"SELECT
product_id, version
FROM
cqrs_inventory_product_query
WHERE
product_id = $1;",
product_id
)
.fetch_one(&self.pool)
.await
.map_err(PostgresAggregateError::from)?;
let view_context = ViewContext::new(ctx.product_id.to_string(), ctx.version);
Ok(Some((res, view_context)))
}
async fn update_view(
&self,
view: ProductView,
context: ViewContext,
) -> Result<(), PersistenceError> {
match context.version {
0 => {
let version = context.version + 1;
sqlx::query!(
"INSERT INTO cqrs_inventory_product_query (
version,
name,
description,
image,
product_id,
category_id,
price_major,
price_minor,
price_currency,
sku_able
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10
);",
version,
view.name,
view.description,
view.image,
view.product_id,
view.category_id,
view.price_major,
view.price_minor,
view.price_currency,
view.sku_able
)
.execute(&self.pool)
.await
.map_err(PostgresAggregateError::from)?;
}
_ => {
let version = context.version + 1;
sqlx::query!(
"UPDATE
cqrs_inventory_product_query
SET
version = $1,
name = $2,
description = $3,
image = $4,
product_id = $5,
category_id = $6,
price_major = $7,
price_minor = $8,
price_currency = $9,
sku_able = $10;",
version,
view.name,
view.description,
view.image,
view.product_id,
view.category_id,
view.price_major,
view.price_minor,
view.price_currency,
view.sku_able
)
.execute(&self.pool)
.await
.map_err(PostgresAggregateError::from)?;
}
}
Ok(())
}
}
#[async_trait]
impl Query<Product> for InventoryDBPostgresAdapter {
async fn dispatch(&self, product_id: &str, events: &[EventEnvelope<Product>]) {
let res = self
.load_with_context(&product_id)
.await
.unwrap_or_else(|_| {
Some((
ProductView::default(),
ViewContext::new(product_id.into(), 0),
))
});
let (mut view, view_context): (ProductView, ViewContext) = res.unwrap();
for event in events {
view.update(event);
}
self.update_view(view, view_context).await.unwrap();
}
}