diff --git a/src/ordering/adapters/output/db/kot_view.rs b/src/ordering/adapters/output/db/kot_view.rs new file mode 100644 index 0000000..5bf5621 --- /dev/null +++ b/src/ordering/adapters/output/db/kot_view.rs @@ -0,0 +1,225 @@ +// SPDX-FileCopyrightText: 2024 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use std::str::FromStr; + +use async_trait::async_trait; +use cqrs_es::persist::{PersistenceError, ViewContext, ViewRepository}; +use cqrs_es::{EventEnvelope, Query, View}; +use serde::{Deserialize, Serialize}; +use time::OffsetDateTime; +use uuid::Uuid; + +use super::errors::*; +use super::OrderingDBPostgresAdapter; +use crate::ordering::domain::events::OrderingEvent; +use crate::ordering::domain::kot_aggregate::*; +use crate::types::quantity::*; +use crate::utils::parse_aggregate_id::parse_aggregate_id; + +pub const NEW_KOT_NON_UUID: &str = "new_kot_non_uuid-asdfa"; + +// The view for a Kot query, for a standard http application this should +// be designed to reflect the response dto that will be returned to a user. +#[derive(Debug, Serialize, Deserialize)] +pub struct KotView { + order_id: Uuid, + created_time: OffsetDateTime, + + kot_id: Uuid, + + deleted: bool, +} + +impl Default for KotView { + fn default() -> Self { + Self { + created_time: OffsetDateTime::now_utc(), + order_id: Default::default(), + + kot_id: Default::default(), + + deleted: false, + } + } +} + +impl From for Kot { + fn from(v: KotView) -> Self { + KotBuilder::default() + .kot_id(v.kot_id) + .created_time(v.created_time) + .order_id(v.order_id) + .deleted(v.deleted) + .build() + .unwrap() + } +} + +// This updates the view with events as they are committed. +// The logic should be minimal here, e.g., don't calculate the account balance, +// design the events to carry the balance information instead. +impl View for KotView { + fn update(&mut self, event: &EventEnvelope) { + match &event.payload { + OrderingEvent::KotAdded(val) => { + self.order_id = *val.kot().order_id(); + self.kot_id = *val.kot().kot_id(); + + self.created_time = val.kot().created_time().clone(); + + self.deleted = false; + } + _ => (), + } + } +} + +#[async_trait] +impl ViewRepository for OrderingDBPostgresAdapter { + async fn load(&self, kot_id: &str) -> Result, PersistenceError> { + let kot_id = match parse_aggregate_id(kot_id, NEW_KOT_NON_UUID)? { + Some((val, _)) => return Ok(Some(val)), + None => Uuid::parse_str(kot_id).unwrap(), + }; + + let res = sqlx::query_as!( + KotView, + "SELECT + order_id, + kot_id, + created_time, + deleted + FROM + cqrs_ordering_kot_query + WHERE + kot_id = $1;", + kot_id + ) + .fetch_one(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + + Ok(Some(res)) + } + + async fn load_with_context( + &self, + kot_id: &str, + ) -> Result, PersistenceError> { + let kot_id = match parse_aggregate_id(kot_id, NEW_KOT_NON_UUID)? { + Some(val) => return Ok(Some(val)), + None => Uuid::parse_str(kot_id).unwrap(), + }; + + let res = sqlx::query_as!( + KotView, + "SELECT + order_id, + kot_id, + created_time, + deleted + FROM + cqrs_ordering_kot_query + WHERE + kot_id = $1;", + kot_id + ) + .fetch_one(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + + struct Context { + version: i64, + kot_id: Uuid, + } + + let ctx = sqlx::query_as!( + Context, + "SELECT + kot_id, version + FROM + cqrs_ordering_kot_query + WHERE + kot_id = $1;", + kot_id + ) + .fetch_one(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + + let view_context = ViewContext::new(ctx.kot_id.to_string(), ctx.version); + Ok(Some((res, view_context))) + } + + async fn update_view( + &self, + view: KotView, + context: ViewContext, + ) -> Result<(), PersistenceError> { + match context.version { + 0 => { + let version = context.version + 1; + sqlx::query!( + "INSERT INTO cqrs_ordering_kot_query ( + version, + order_id, + kot_id, + created_time, + deleted + ) VALUES ( + $1, $2, $3, $4, $5 + );", + version, + view.order_id, + view.kot_id, + view.created_time, + view.deleted, + ) + .execute(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + } + + _ => { + let version = context.version + 1; + sqlx::query!( + "UPDATE + cqrs_ordering_kot_query + SET + version = $1, + order_id = $2, + kot_id = $3, + created_time = $4, + deleted = $5;", + version, + view.order_id, + view.kot_id, + view.created_time, + view.deleted, + ) + .execute(&self.pool) + .await + .map_err(PostgresAggregateError::from)?; + } + } + + Ok(()) + } +} + +#[async_trait] +impl Query for OrderingDBPostgresAdapter { + async fn dispatch(&self, kot_id: &str, events: &[EventEnvelope]) { + let res = self + .load_with_context(kot_id) + .await + .unwrap_or_else(|_| Some((KotView::default(), ViewContext::new(kot_id.into(), 0)))); + let (mut view, view_context): (KotView, ViewContext) = res.unwrap(); + for event in events { + view.update(event); + } + self.update_view(view, view_context).await.unwrap(); + } +} diff --git a/src/ordering/adapters/output/db/mod.rs b/src/ordering/adapters/output/db/mod.rs index dca55ef..2b87819 100644 --- a/src/ordering/adapters/output/db/mod.rs +++ b/src/ordering/adapters/output/db/mod.rs @@ -9,6 +9,8 @@ use sqlx::postgres::PgPool; use crate::db::{migrate::RunMigrations, sqlx_postgres::Postgres}; mod errors; +mod kot_id_exists; +mod kot_view; mod line_item_id_exists; mod line_item_view; mod order_id_exists;