feat: implement View for LineItem aggregate

This commit is contained in:
Aravinth Manivannan 2024-07-23 17:41:10 +05:30
parent ac1964d21a
commit 66101f0aeb
Signed by: realaravinth
GPG key ID: F8F50389936984FF
8 changed files with 496 additions and 0 deletions

View file

@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO cqrs_ordering_line_item_query (\n version,\n product_name,\n product_id,\n line_item_id,\n quantity_minor_unit,\n quantity_minor_number,\n quantity_major_unit,\n quantity_major_number,\n\n deleted\n ) VALUES (\n $1, $2, $3, $4, $5, $6, $7, $8, $9\n );",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8",
"Text",
"Uuid",
"Uuid",
"Text",
"Int4",
"Text",
"Int4",
"Bool"
]
},
"nullable": []
},
"hash": "1e7df92c508fac4c32c00621b099c673d8745b8d145b603807c771906a7af756"
}

View file

@ -0,0 +1,64 @@
{
"db_name": "PostgreSQL",
"query": "SELECT \n product_name,\n product_id,\n line_item_id,\n quantity_minor_unit,\n quantity_minor_number,\n quantity_major_unit,\n quantity_major_number,\n deleted\n FROM\n cqrs_ordering_line_item_query\n WHERE\n line_item_id = $1;",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "product_name",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "product_id",
"type_info": "Uuid"
},
{
"ordinal": 2,
"name": "line_item_id",
"type_info": "Uuid"
},
{
"ordinal": 3,
"name": "quantity_minor_unit",
"type_info": "Text"
},
{
"ordinal": 4,
"name": "quantity_minor_number",
"type_info": "Int4"
},
{
"ordinal": 5,
"name": "quantity_major_unit",
"type_info": "Text"
},
{
"ordinal": 6,
"name": "quantity_major_number",
"type_info": "Int4"
},
{
"ordinal": 7,
"name": "deleted",
"type_info": "Bool"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
false,
false,
false,
false,
false,
false,
false,
false
]
},
"hash": "3f2b3da434c433067e0a68fa98ad0d7b1a00a836682d9ef20fd58c72bd5115f5"
}

View file

@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "SELECT EXISTS (\n SELECT 1\n FROM cqrs_ordering_line_item_query\n WHERE\n line_item_id = $1\n );",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "exists",
"type_info": "Bool"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
null
]
},
"hash": "43c79c431bea6029ee2d2dc5997ab00ee6db214f683a1c877b08e27381148a91"
}

View file

@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "UPDATE\n cqrs_ordering_line_item_query\n SET\n version = $1,\n product_name = $2,\n product_id = $3,\n line_item_id = $4,\n quantity_minor_unit = $5,\n quantity_minor_number = $6,\n quantity_major_unit = $7,\n quantity_major_number = $8,\n deleted = $9;",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8",
"Text",
"Uuid",
"Uuid",
"Text",
"Int4",
"Text",
"Int4",
"Bool"
]
},
"nullable": []
},
"hash": "46fddc14a06f84a15fbcc04cfff9d3f41e03c73e2db7d7ee39a0f3e86cc38fe9"
}

View file

@ -0,0 +1,46 @@
{
"db_name": "PostgreSQL",
"query": "SELECT\n name, description, store_id, category_id, deleted\n FROM\n cqrs_inventory_category_query\n WHERE\n category_id = $1;",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "name",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "description",
"type_info": "Text"
},
{
"ordinal": 2,
"name": "store_id",
"type_info": "Uuid"
},
{
"ordinal": 3,
"name": "category_id",
"type_info": "Uuid"
},
{
"ordinal": 4,
"name": "deleted",
"type_info": "Bool"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
false,
true,
false,
false,
false
]
},
"hash": "da317770b9f4874f805c9edea8a7fed98c2186a30ac370864a4db2d64cc14b75"
}

View file

@ -0,0 +1,28 @@
{
"db_name": "PostgreSQL",
"query": "SELECT \n line_item_id, version\n FROM\n cqrs_ordering_line_item_query\n WHERE\n line_item_id = $1;",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "line_item_id",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "version",
"type_info": "Int8"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
false,
false
]
},
"hash": "ea58498982a42d17c51828387df4b446f9cdc89986fd49c8f9d736eeeda12f48"
}

View file

@ -0,0 +1,23 @@
-- SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
--
-- SPDX-License-Identifier: AGPL-3.0-or-later
CREATE TABLE IF NOT EXISTS cqrs_ordering_line_item_query
(
version bigint CHECK (version >= 0) NOT NULL,
sale_time timestamp with time zone DEFAULT (CURRENT_TIMESTAMP),
line_item_id UUID NOT NULL UNIQUE,
product_name TEXT NOT NULL,
product_id UUID NOT NULL,
quantity_major_number INTEGER NOT NULL,
quantity_minor_number INTEGER NOT NULL,
quantity_major_unit TEXT NOT NULL,
quantity_minor_unit TEXT NOT NULL,
deleted BOOLEAN NOT NULL DEFAULT FALSE,
PRIMARY KEY (line_item_id)
);

View file

@ -0,0 +1,269 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use std::str::FromStr;
use async_trait::async_trait;
use cqrs_es::persist::{PersistenceError, ViewContext, ViewRepository};
use cqrs_es::{EventEnvelope, Query, View};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use super::errors::*;
use super::OrderingDBPostgresAdapter;
use crate::ordering::domain::events::OrderingEvent;
use crate::ordering::domain::line_item_aggregate::*;
use crate::types::quantity::*;
use crate::utils::parse_aggregate_id::parse_aggregate_id;
pub const NEW_LINE_ITEM_NON_UUID: &str = "new_line_item_non_uuid-asdfa";
// The view for a LineItem query, for a standard http application this should
// be designed to reflect the response dto that will be returned to a user.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LineItemView {
product_name: String,
product_id: Uuid,
line_item_id: Uuid,
quantity_major_number: i32,
quantity_minor_number: i32,
quantity_major_unit: String,
quantity_minor_unit: String,
deleted: bool,
}
impl From<LineItemView> for LineItem {
fn from(v: LineItemView) -> Self {
let quantity = QuantityBuilder::default()
.minor(
QuantityPartBuilder::default()
.number(v.quantity_minor_number as usize)
.unit(QuantityUnit::from_str(&v.quantity_minor_unit).unwrap())
.build()
.unwrap(),
)
.major(
QuantityPartBuilder::default()
.number(v.quantity_major_number as usize)
.unit(QuantityUnit::from_str(&v.quantity_major_unit).unwrap())
.build()
.unwrap(),
)
.build()
.unwrap();
LineItemBuilder::default()
.product_name(v.product_name)
.line_item_id(v.line_item_id)
.quantity(quantity)
.product_id(v.product_id)
.deleted(v.deleted)
.build()
.unwrap()
}
}
// This updates the view with events as they are committed.
// The logic should be minimal here, e.g., don't calculate the account balance,
// design the events to carry the balance information instead.
impl View<LineItem> for LineItemView {
fn update(&mut self, event: &EventEnvelope<LineItem>) {
match &event.payload {
OrderingEvent::LineItemAdded(val) => {
self.product_name = val.line_item().product_name().into();
self.product_id = *val.line_item().product_id();
self.line_item_id = *val.line_item().line_item_id();
self.quantity_major_number = *val.line_item().quantity().major().number() as i32;
self.quantity_minor_number = *val.line_item().quantity().minor().number() as i32;
self.quantity_major_unit = val.line_item().quantity().major().unit().to_string();
self.quantity_minor_unit = val.line_item().quantity().minor().unit().to_string();
self.deleted = false;
}
_ => (),
}
}
}
#[async_trait]
impl ViewRepository<LineItemView, LineItem> for OrderingDBPostgresAdapter {
async fn load(&self, line_item_id: &str) -> Result<Option<LineItemView>, PersistenceError> {
let line_item_id = match parse_aggregate_id(line_item_id, NEW_LINE_ITEM_NON_UUID)? {
Some((val, _)) => return Ok(Some(val)),
None => Uuid::parse_str(line_item_id).unwrap(),
};
let res = sqlx::query_as!(
LineItemView,
"SELECT
product_name,
product_id,
line_item_id,
quantity_minor_unit,
quantity_minor_number,
quantity_major_unit,
quantity_major_number,
deleted
FROM
cqrs_ordering_line_item_query
WHERE
line_item_id = $1;",
line_item_id
)
.fetch_one(&self.pool)
.await
.map_err(PostgresAggregateError::from)?;
Ok(Some(res))
}
async fn load_with_context(
&self,
line_item_id: &str,
) -> Result<Option<(LineItemView, ViewContext)>, PersistenceError> {
let line_item_id = match parse_aggregate_id(line_item_id, NEW_LINE_ITEM_NON_UUID)? {
Some(val) => return Ok(Some(val)),
None => Uuid::parse_str(line_item_id).unwrap(),
};
let res = sqlx::query_as!(
LineItemView,
"SELECT
product_name,
product_id,
line_item_id,
quantity_minor_unit,
quantity_minor_number,
quantity_major_unit,
quantity_major_number,
deleted
FROM
cqrs_ordering_line_item_query
WHERE
line_item_id = $1;",
line_item_id
)
.fetch_one(&self.pool)
.await
.map_err(PostgresAggregateError::from)?;
struct Context {
version: i64,
line_item_id: Uuid,
}
let ctx = sqlx::query_as!(
Context,
"SELECT
line_item_id, version
FROM
cqrs_ordering_line_item_query
WHERE
line_item_id = $1;",
line_item_id
)
.fetch_one(&self.pool)
.await
.map_err(PostgresAggregateError::from)?;
let view_context = ViewContext::new(ctx.line_item_id.to_string(), ctx.version);
Ok(Some((res, view_context)))
}
async fn update_view(
&self,
view: LineItemView,
context: ViewContext,
) -> Result<(), PersistenceError> {
match context.version {
0 => {
let version = context.version + 1;
sqlx::query!(
"INSERT INTO cqrs_ordering_line_item_query (
version,
product_name,
product_id,
line_item_id,
quantity_minor_unit,
quantity_minor_number,
quantity_major_unit,
quantity_major_number,
deleted
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9
);",
version,
view.product_name,
view.product_id,
view.line_item_id,
view.quantity_minor_unit,
view.quantity_minor_number,
view.quantity_major_unit,
view.quantity_major_number,
view.deleted,
)
.execute(&self.pool)
.await
.map_err(PostgresAggregateError::from)?;
}
_ => {
let version = context.version + 1;
sqlx::query!(
"UPDATE
cqrs_ordering_line_item_query
SET
version = $1,
product_name = $2,
product_id = $3,
line_item_id = $4,
quantity_minor_unit = $5,
quantity_minor_number = $6,
quantity_major_unit = $7,
quantity_major_number = $8,
deleted = $9;",
version,
view.product_name,
view.product_id,
view.line_item_id,
view.quantity_minor_unit,
view.quantity_minor_number,
view.quantity_major_unit,
view.quantity_major_number,
view.deleted,
)
.execute(&self.pool)
.await
.map_err(PostgresAggregateError::from)?;
}
}
Ok(())
}
}
#[async_trait]
impl Query<LineItem> for OrderingDBPostgresAdapter {
async fn dispatch(&self, line_item_id: &str, events: &[EventEnvelope<LineItem>]) {
let res = self
.load_with_context(line_item_id)
.await
.unwrap_or_else(|_| {
Some((
LineItemView::default(),
ViewContext::new(line_item_id.into(), 0),
))
});
let (mut view, view_context): (LineItemView, ViewContext) = res.unwrap();
for event in events {
view.update(event);
}
self.update_view(view, view_context).await.unwrap();
}
}