diff --git a/.sqlx/query-1e7df92c508fac4c32c00621b099c673d8745b8d145b603807c771906a7af756.json b/.sqlx/query-1e7df92c508fac4c32c00621b099c673d8745b8d145b603807c771906a7af756.json new file mode 100644 index 0000000..18c1034 --- /dev/null +++ b/.sqlx/query-1e7df92c508fac4c32c00621b099c673d8745b8d145b603807c771906a7af756.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO cqrs_ordering_line_item_query (\n version,\n product_name,\n product_id,\n line_item_id,\n quantity_minor_unit,\n quantity_minor_number,\n quantity_major_unit,\n quantity_major_number,\n\n deleted\n ) VALUES (\n $1, $2, $3, $4, $5, $6, $7, $8, $9\n );", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Text", + "Uuid", + "Uuid", + "Text", + "Int4", + "Text", + "Int4", + "Bool" + ] + }, + "nullable": [] + }, + "hash": "1e7df92c508fac4c32c00621b099c673d8745b8d145b603807c771906a7af756" +} diff --git a/.sqlx/query-3f2b3da434c433067e0a68fa98ad0d7b1a00a836682d9ef20fd58c72bd5115f5.json b/.sqlx/query-3f2b3da434c433067e0a68fa98ad0d7b1a00a836682d9ef20fd58c72bd5115f5.json new file mode 100644 index 0000000..75532e1 --- /dev/null +++ b/.sqlx/query-3f2b3da434c433067e0a68fa98ad0d7b1a00a836682d9ef20fd58c72bd5115f5.json @@ -0,0 +1,64 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT \n product_name,\n product_id,\n line_item_id,\n quantity_minor_unit,\n quantity_minor_number,\n quantity_major_unit,\n quantity_major_number,\n deleted\n FROM\n cqrs_ordering_line_item_query\n WHERE\n line_item_id = $1;", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "product_name", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "product_id", + "type_info": "Uuid" + }, + { + "ordinal": 2, + "name": "line_item_id", + "type_info": "Uuid" + }, + { + "ordinal": 3, + "name": "quantity_minor_unit", + "type_info": "Text" + }, + { + "ordinal": 4, + "name": "quantity_minor_number", + "type_info": "Int4" + }, + { + "ordinal": 5, + "name": "quantity_major_unit", + "type_info": "Text" + }, + { + "ordinal": 6, + "name": "quantity_major_number", + "type_info": "Int4" + }, + { + "ordinal": 7, + "name": "deleted", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false, + false + ] + }, + "hash": "3f2b3da434c433067e0a68fa98ad0d7b1a00a836682d9ef20fd58c72bd5115f5" +} diff --git a/.sqlx/query-43c79c431bea6029ee2d2dc5997ab00ee6db214f683a1c877b08e27381148a91.json b/.sqlx/query-43c79c431bea6029ee2d2dc5997ab00ee6db214f683a1c877b08e27381148a91.json new file mode 100644 index 0000000..55decd6 --- /dev/null +++ b/.sqlx/query-43c79c431bea6029ee2d2dc5997ab00ee6db214f683a1c877b08e27381148a91.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT EXISTS (\n SELECT 1\n FROM cqrs_ordering_line_item_query\n WHERE\n line_item_id = $1\n );", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "exists", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + null + ] + }, + "hash": "43c79c431bea6029ee2d2dc5997ab00ee6db214f683a1c877b08e27381148a91" +} diff --git a/.sqlx/query-46fddc14a06f84a15fbcc04cfff9d3f41e03c73e2db7d7ee39a0f3e86cc38fe9.json b/.sqlx/query-46fddc14a06f84a15fbcc04cfff9d3f41e03c73e2db7d7ee39a0f3e86cc38fe9.json new file mode 100644 index 0000000..b89ea4b --- /dev/null +++ b/.sqlx/query-46fddc14a06f84a15fbcc04cfff9d3f41e03c73e2db7d7ee39a0f3e86cc38fe9.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE\n cqrs_ordering_line_item_query\n SET\n version = $1,\n product_name = $2,\n product_id = $3,\n line_item_id = $4,\n quantity_minor_unit = $5,\n quantity_minor_number = $6,\n quantity_major_unit = $7,\n quantity_major_number = $8,\n deleted = $9;", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Text", + "Uuid", + "Uuid", + "Text", + "Int4", + "Text", + "Int4", + "Bool" + ] + }, + "nullable": [] + }, + "hash": "46fddc14a06f84a15fbcc04cfff9d3f41e03c73e2db7d7ee39a0f3e86cc38fe9" +} diff --git a/.sqlx/query-da317770b9f4874f805c9edea8a7fed98c2186a30ac370864a4db2d64cc14b75.json b/.sqlx/query-da317770b9f4874f805c9edea8a7fed98c2186a30ac370864a4db2d64cc14b75.json new file mode 100644 index 0000000..cc0ef7b --- /dev/null +++ b/.sqlx/query-da317770b9f4874f805c9edea8a7fed98c2186a30ac370864a4db2d64cc14b75.json @@ -0,0 +1,46 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n name, description, store_id, category_id, deleted\n FROM\n cqrs_inventory_category_query\n WHERE\n category_id = $1;", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "name", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "description", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "store_id", + "type_info": "Uuid" + }, + { + "ordinal": 3, + "name": "category_id", + "type_info": "Uuid" + }, + { + "ordinal": 4, + "name": "deleted", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false, + true, + false, + false, + false + ] + }, + "hash": "da317770b9f4874f805c9edea8a7fed98c2186a30ac370864a4db2d64cc14b75" +} diff --git a/.sqlx/query-ea58498982a42d17c51828387df4b446f9cdc89986fd49c8f9d736eeeda12f48.json b/.sqlx/query-ea58498982a42d17c51828387df4b446f9cdc89986fd49c8f9d736eeeda12f48.json new file mode 100644 index 0000000..90c6ec3 --- /dev/null +++ b/.sqlx/query-ea58498982a42d17c51828387df4b446f9cdc89986fd49c8f9d736eeeda12f48.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT \n line_item_id, version\n FROM\n cqrs_ordering_line_item_query\n WHERE\n line_item_id = $1;", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "line_item_id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "version", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false, + false + ] + }, + "hash": "ea58498982a42d17c51828387df4b446f9cdc89986fd49c8f9d736eeeda12f48" +} diff --git a/migrations/20240723095912_cqrs_ordering_line_item_query.sql b/migrations/20240723095912_cqrs_ordering_line_item_query.sql new file mode 100644 index 0000000..232c34a --- /dev/null +++ b/migrations/20240723095912_cqrs_ordering_line_item_query.sql @@ -0,0 +1,23 @@ +-- SPDX-FileCopyrightText: 2024 Aravinth Manivannan +-- +-- SPDX-License-Identifier: AGPL-3.0-or-later + +CREATE TABLE IF NOT EXISTS cqrs_ordering_line_item_query +( + version bigint CHECK (version >= 0) NOT NULL, + + sale_time timestamp with time zone DEFAULT (CURRENT_TIMESTAMP), + line_item_id UUID NOT NULL UNIQUE, + + product_name TEXT NOT NULL, + product_id UUID NOT NULL, + + quantity_major_number INTEGER NOT NULL, + quantity_minor_number INTEGER NOT NULL, + quantity_major_unit TEXT NOT NULL, + quantity_minor_unit TEXT NOT NULL, + + deleted BOOLEAN NOT NULL DEFAULT FALSE, + + PRIMARY KEY (line_item_id) +); diff --git a/src/ordering/adapters/output/db/line_item_view.rs b/src/ordering/adapters/output/db/line_item_view.rs new file mode 100644 index 0000000..bb012ad --- /dev/null +++ b/src/ordering/adapters/output/db/line_item_view.rs @@ -0,0 +1,269 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use std::str::FromStr; + +use async_trait::async_trait; +use cqrs_es::persist::{PersistenceError, ViewContext, ViewRepository}; +use cqrs_es::{EventEnvelope, Query, View}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use super::errors::*; +use super::OrderingDBPostgresAdapter; +use crate::ordering::domain::events::OrderingEvent; +use crate::ordering::domain::line_item_aggregate::*; +use crate::types::quantity::*; +use crate::utils::parse_aggregate_id::parse_aggregate_id; + +pub const NEW_LINE_ITEM_NON_UUID: &str = "new_line_item_non_uuid-asdfa"; + +// The view for a LineItem query, for a standard http application this should +// be designed to reflect the response dto that will be returned to a user. +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct LineItemView { + product_name: String, + product_id: Uuid, + + line_item_id: Uuid, + + quantity_major_number: i32, + quantity_minor_number: i32, + quantity_major_unit: String, + quantity_minor_unit: String, + + deleted: bool, +} + +impl From for LineItem { + fn from(v: LineItemView) -> Self { + let quantity = QuantityBuilder::default() + .minor( + QuantityPartBuilder::default() + .number(v.quantity_minor_number as usize) + .unit(QuantityUnit::from_str(&v.quantity_minor_unit).unwrap()) + .build() + .unwrap(), + ) + .major( + QuantityPartBuilder::default() + .number(v.quantity_major_number as usize) + .unit(QuantityUnit::from_str(&v.quantity_major_unit).unwrap()) + .build() + .unwrap(), + ) + .build() + .unwrap(); + + LineItemBuilder::default() + .product_name(v.product_name) + .line_item_id(v.line_item_id) + .quantity(quantity) + .product_id(v.product_id) + .deleted(v.deleted) + .build() + .unwrap() + } +} + +// This updates the view with events as they are committed. +// The logic should be minimal here, e.g., don't calculate the account balance, +// design the events to carry the balance information instead. +impl View for LineItemView { + fn update(&mut self, event: &EventEnvelope) { + match &event.payload { + OrderingEvent::LineItemAdded(val) => { + self.product_name = val.line_item().product_name().into(); + self.product_id = *val.line_item().product_id(); + self.line_item_id = *val.line_item().line_item_id(); + + self.quantity_major_number = *val.line_item().quantity().major().number() as i32; + self.quantity_minor_number = *val.line_item().quantity().minor().number() as i32; + self.quantity_major_unit = val.line_item().quantity().major().unit().to_string(); + self.quantity_minor_unit = val.line_item().quantity().minor().unit().to_string(); + + self.deleted = false; + } + _ => (), + } + } +} + +#[async_trait] +impl ViewRepository for OrderingDBPostgresAdapter { + async fn load(&self, line_item_id: &str) -> Result, PersistenceError> { + let line_item_id = match parse_aggregate_id(line_item_id, NEW_LINE_ITEM_NON_UUID)? { + Some((val, _)) => return Ok(Some(val)), + None => Uuid::parse_str(line_item_id).unwrap(), + }; + + let res = sqlx::query_as!( + LineItemView, + "SELECT + product_name, + product_id, + line_item_id, + quantity_minor_unit, + quantity_minor_number, + quantity_major_unit, + quantity_major_number, + deleted + FROM + cqrs_ordering_line_item_query + WHERE + line_item_id = $1;", + line_item_id + ) + .fetch_one(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + + Ok(Some(res)) + } + + async fn load_with_context( + &self, + line_item_id: &str, + ) -> Result, PersistenceError> { + let line_item_id = match parse_aggregate_id(line_item_id, NEW_LINE_ITEM_NON_UUID)? { + Some(val) => return Ok(Some(val)), + None => Uuid::parse_str(line_item_id).unwrap(), + }; + + let res = sqlx::query_as!( + LineItemView, + "SELECT + product_name, + product_id, + line_item_id, + quantity_minor_unit, + quantity_minor_number, + quantity_major_unit, + quantity_major_number, + deleted + FROM + cqrs_ordering_line_item_query + WHERE + line_item_id = $1;", + line_item_id + ) + .fetch_one(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + + struct Context { + version: i64, + line_item_id: Uuid, + } + + let ctx = sqlx::query_as!( + Context, + "SELECT + line_item_id, version + FROM + cqrs_ordering_line_item_query + WHERE + line_item_id = $1;", + line_item_id + ) + .fetch_one(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + + let view_context = ViewContext::new(ctx.line_item_id.to_string(), ctx.version); + Ok(Some((res, view_context))) + } + + async fn update_view( + &self, + view: LineItemView, + context: ViewContext, + ) -> Result<(), PersistenceError> { + match context.version { + 0 => { + let version = context.version + 1; + sqlx::query!( + "INSERT INTO cqrs_ordering_line_item_query ( + version, + product_name, + product_id, + line_item_id, + quantity_minor_unit, + quantity_minor_number, + quantity_major_unit, + quantity_major_number, + + deleted + ) VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9 + );", + version, + view.product_name, + view.product_id, + view.line_item_id, + view.quantity_minor_unit, + view.quantity_minor_number, + view.quantity_major_unit, + view.quantity_major_number, + view.deleted, + ) + .execute(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + } + + _ => { + let version = context.version + 1; + sqlx::query!( + "UPDATE + cqrs_ordering_line_item_query + SET + version = $1, + product_name = $2, + product_id = $3, + line_item_id = $4, + quantity_minor_unit = $5, + quantity_minor_number = $6, + quantity_major_unit = $7, + quantity_major_number = $8, + deleted = $9;", + version, + view.product_name, + view.product_id, + view.line_item_id, + view.quantity_minor_unit, + view.quantity_minor_number, + view.quantity_major_unit, + view.quantity_major_number, + view.deleted, + ) + .execute(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + } + } + + Ok(()) + } +} + +#[async_trait] +impl Query for OrderingDBPostgresAdapter { + async fn dispatch(&self, line_item_id: &str, events: &[EventEnvelope]) { + let res = self + .load_with_context(line_item_id) + .await + .unwrap_or_else(|_| { + Some(( + LineItemView::default(), + ViewContext::new(line_item_id.into(), 0), + )) + }); + let (mut view, view_context): (LineItemView, ViewContext) = res.unwrap(); + for event in events { + view.update(event); + } + self.update_view(view, view_context).await.unwrap(); + } +}