fix: use UUID based IDs to identify records instead of view_id in queries #29
24 changed files with 318 additions and 190 deletions
|
@ -1,11 +1,10 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "INSERT INTO cqrs_inventory_category_query (\n view_id, version, name, description, category_id, store_id\n ) VALUES (\n $1, $2, $3, $4, $5, $6\n );",
|
||||
"query": "INSERT INTO cqrs_inventory_category_query (\n version, name, description, category_id, store_id\n ) VALUES (\n $1, $2, $3, $4, $5\n );",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Text",
|
||||
"Int8",
|
||||
"Text",
|
||||
"Text",
|
||||
|
@ -15,5 +14,5 @@
|
|||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "e66276ae53a3155b2a682a451bf4800f2ec7f777cc822cbe0866105def3e11af"
|
||||
"hash": "2ca5f8ca1b3ac04175346c1d7e57a37ac4ccf493ebced9cf690f4030e78bd439"
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "SELECT \n category_id, version\n FROM\n cqrs_inventory_category_query\n WHERE\n category_id = $1;",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "category_id",
|
||||
"type_info": "Uuid"
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "version",
|
||||
"type_info": "Int8"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Uuid"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
false,
|
||||
false
|
||||
]
|
||||
},
|
||||
"hash": "2dbbf0eb4388333b57ad5bda586ada6797292064485b1f3ce342d7eb0cb3ccf4"
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "UPDATE\n cqrs_inventory_store_query\n SET\n version = $1,\n name = $2,\n address = $3,\n store_id = $4,\n owner = $5;",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Int8",
|
||||
"Text",
|
||||
"Text",
|
||||
"Uuid",
|
||||
"Text"
|
||||
]
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "48aa5f3eacaaec4a74ba5d2908875fcac797644d104d3d580715587ecb2e2119"
|
||||
}
|
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "SELECT \n name, address, store_id, owner\n FROM\n cqrs_inventory_store_query\n WHERE\n view_id = $1;",
|
||||
"query": "SELECT \n name, address, store_id, owner\n FROM\n cqrs_inventory_store_query\n WHERE\n store_id = $1;",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
|
@ -26,7 +26,7 @@
|
|||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Text"
|
||||
"Uuid"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
|
@ -36,5 +36,5 @@
|
|||
false
|
||||
]
|
||||
},
|
||||
"hash": "4d7b12d7ff5e008054bab7aa61d6ba1f559293608196e540788947789cad244c"
|
||||
"hash": "5385ba9531992b91670e52b5332af1f09069e1d6d168f9e93b729cf964e97c35"
|
||||
}
|
|
@ -1,28 +0,0 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "SELECT \n view_id, version\n FROM\n cqrs_inventory_category_query\n WHERE\n view_id = $1;",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "view_id",
|
||||
"type_info": "Text"
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "version",
|
||||
"type_info": "Int8"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Text"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
false,
|
||||
false
|
||||
]
|
||||
},
|
||||
"hash": "74313c3fbf8a5985b6deae21b56469fb66ddb078d016fd6f05cb5e62ef0b23d5"
|
||||
}
|
|
@ -1,11 +1,10 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "INSERT INTO cqrs_inventory_store_query (\n view_id, version, name, address, store_id, owner\n ) VALUES (\n $1, $2, $3, $4, $5, $6\n );",
|
||||
"query": "INSERT INTO cqrs_inventory_store_query (\n version, name, address, store_id, owner\n ) VALUES (\n $1, $2, $3, $4, $5\n );",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Text",
|
||||
"Int8",
|
||||
"Text",
|
||||
"Text",
|
||||
|
@ -15,5 +14,5 @@
|
|||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "eb97288edbbbe8c5f30454040584a3e182ba5f4a303c142668956be3846c6d18"
|
||||
"hash": "89ebb8b413db2c4da15adff8a09d04d5849c3990f0965de479b76630317f8a6e"
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "SELECT \n store_id, version\n FROM\n cqrs_inventory_store_query\n WHERE\n store_id = $1;",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "store_id",
|
||||
"type_info": "Uuid"
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "version",
|
||||
"type_info": "Int8"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Uuid"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
false,
|
||||
false
|
||||
]
|
||||
},
|
||||
"hash": "a392b3e1146beac19a37da465bf567aa41b8a7ef1fbdba42116d90aa000584b8"
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "SELECT EXISTS (\n SELECT 1\n FROM cqrs_inventory_store_query\n WHERE\n name = $1\n );",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "exists",
|
||||
"type_info": "Bool"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Text"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
null
|
||||
]
|
||||
},
|
||||
"hash": "b84fb940a2aa673d3b19b2317e1e3436d3983e8a32400bbcefd91efb945190cd"
|
||||
}
|
|
@ -1,19 +0,0 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "UPDATE\n cqrs_inventory_store_query\n SET\n view_id = $1,\n version = $2,\n name = $3,\n address = $4,\n store_id = $5,\n owner = $6;",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Text",
|
||||
"Int8",
|
||||
"Text",
|
||||
"Text",
|
||||
"Uuid",
|
||||
"Text"
|
||||
]
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "bb92f2d0c882653e92f2837353c1def62da1715c65214a4d404a5ce551c501bd"
|
||||
}
|
|
@ -1,19 +0,0 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "UPDATE\n cqrs_inventory_category_query\n SET\n view_id = $1,\n version = $2,\n name = $3,\n description = $4,\n category_id = $5,\n store_id = $6;",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Text",
|
||||
"Int8",
|
||||
"Text",
|
||||
"Text",
|
||||
"Uuid",
|
||||
"Uuid"
|
||||
]
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "c524c7fc3281e8ffe5d3d6f237d49cae596a9f621e6a64c31270d744406271ec"
|
||||
}
|
|
@ -1,28 +0,0 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "SELECT \n view_id, version\n FROM\n cqrs_inventory_store_query\n WHERE\n view_id = $1;",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "view_id",
|
||||
"type_info": "Text"
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "version",
|
||||
"type_info": "Int8"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Text"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
false,
|
||||
false
|
||||
]
|
||||
},
|
||||
"hash": "c6e90aa1e4fc851ac37318ada15859bc6e2fb1a75da0ed248debf126ba39c1d1"
|
||||
}
|
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "SELECT \n name, description, category_id, store_id\n FROM\n cqrs_inventory_category_query\n WHERE\n view_id = $1;",
|
||||
"query": "SELECT \n name, description, category_id, store_id\n FROM\n cqrs_inventory_category_query\n WHERE\n category_id = $1;",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
|
@ -26,7 +26,7 @@
|
|||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Text"
|
||||
"Uuid"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
|
@ -36,5 +36,5 @@
|
|||
false
|
||||
]
|
||||
},
|
||||
"hash": "1e9dcba9a2b5a7e0bbb2b8c0d87166c24567420c5f7579aaa12b9d3d60c4e24d"
|
||||
"hash": "d396a3ccbe58a02acc0710700274eb3d28b2a1fe005190cd3599c2772dd01f7d"
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "UPDATE\n cqrs_inventory_category_query\n SET\n version = $1,\n name = $2,\n description = $3,\n category_id = $4,\n store_id = $5;",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Int8",
|
||||
"Text",
|
||||
"Text",
|
||||
"Uuid",
|
||||
"Uuid"
|
||||
]
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "fed365b25499b05fbeb73416310b4c5ee1d3beb6dda94290c80c5ae038894116"
|
||||
}
|
|
@ -2,7 +2,7 @@
|
|||
--
|
||||
-- SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
|
||||
CREATE TABLE IF NOT EXISTS user_events
|
||||
CREATE TABLE IF NOT EXISTS events
|
||||
(
|
||||
aggregate_type text NOT NULL,
|
||||
aggregate_id text NOT NULL,
|
||||
|
|
|
@ -2,22 +2,8 @@
|
|||
--
|
||||
-- SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
|
||||
CREATE TABLE IF NOT EXISTS categoty_events
|
||||
(
|
||||
aggregate_type text NOT NULL,
|
||||
aggregate_id text NOT NULL,
|
||||
sequence bigint CHECK (sequence >= 0) NOT NULL,
|
||||
event_type text NOT NULL,
|
||||
event_version text NOT NULL,
|
||||
payload json NOT NULL,
|
||||
metadata json NOT NULL,
|
||||
timestamp timestamp with time zone DEFAULT (CURRENT_TIMESTAMP),
|
||||
PRIMARY KEY (aggregate_type, aggregate_id, sequence)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS cqrs_inventory_category_query
|
||||
(
|
||||
view_id text NOT NULL,
|
||||
version bigint CHECK (version >= 0) NOT NULL,
|
||||
|
||||
name TEXT NOT NULL,
|
||||
|
@ -27,7 +13,7 @@ CREATE TABLE IF NOT EXISTS cqrs_inventory_category_query
|
|||
category_id UUID NOT NULL UNIQUE,
|
||||
UNIQUE(store_id, name),
|
||||
|
||||
PRIMARY KEY (view_id)
|
||||
PRIMARY KEY (category_id)
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS
|
||||
|
|
|
@ -2,22 +2,8 @@
|
|||
--
|
||||
-- SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
|
||||
CREATE TABLE IF NOT EXISTS categoty_events
|
||||
(
|
||||
aggregate_type text NOT NULL,
|
||||
aggregate_id text NOT NULL,
|
||||
sequence bigint CHECK (sequence >= 0) NOT NULL,
|
||||
event_type text NOT NULL,
|
||||
event_version text NOT NULL,
|
||||
payload json NOT NULL,
|
||||
metadata json NOT NULL,
|
||||
timestamp timestamp with time zone DEFAULT (CURRENT_TIMESTAMP),
|
||||
PRIMARY KEY (aggregate_type, aggregate_id, sequence)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS cqrs_inventory_store_query
|
||||
(
|
||||
view_id text NOT NULL,
|
||||
version bigint CHECK (version >= 0) NOT NULL,
|
||||
|
||||
name TEXT NOT NULL,
|
||||
|
@ -26,7 +12,7 @@ CREATE TABLE IF NOT EXISTS cqrs_inventory_store_query
|
|||
store_id UUID NOT NULL UNIQUE,
|
||||
deleted BOOLEAN NOT NULL DEFAULT FALSE,
|
||||
|
||||
PRIMARY KEY (view_id)
|
||||
PRIMARY KEY (store_id)
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS store_store_id_index ON cqrs_inventory_store_query (store_id);
|
||||
|
|
|
@ -59,9 +59,8 @@ mod tests {
|
|||
|
||||
sqlx::query!(
|
||||
"INSERT INTO cqrs_inventory_category_query
|
||||
(view_id, version, name, description, category_id, store_id)
|
||||
VALUES ($1, $2, $3, $4, $5, $6);",
|
||||
"1",
|
||||
(version, name, description, category_id, store_id)
|
||||
VALUES ($1, $2, $3, $4, $5);",
|
||||
1,
|
||||
category.name(),
|
||||
category.description().as_ref().unwrap(),
|
||||
|
|
|
@ -64,9 +64,8 @@ mod tests {
|
|||
|
||||
sqlx::query!(
|
||||
"INSERT INTO cqrs_inventory_category_query
|
||||
(view_id, version, name, description, category_id, store_id)
|
||||
VALUES ($1, $2, $3, $4, $5, $6);",
|
||||
"1",
|
||||
(version, name, description, category_id, store_id)
|
||||
VALUES ($1, $2, $3, $4, $5);",
|
||||
1,
|
||||
category.name(),
|
||||
category.description().as_ref().unwrap(),
|
||||
|
|
|
@ -14,6 +14,8 @@ use crate::inventory::domain::category_aggregate::Category;
|
|||
use crate::inventory::domain::events::InventoryEvent;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub const NEW_CATEGORY_NON_UUID: &str = "new_category_non_uuid-asdfa";
|
||||
|
||||
// The view for a Category query, for a standard http application this should
|
||||
// be designed to reflect the response dto that will be returned to a user.
|
||||
#[derive(Debug, Default, Serialize, Deserialize)]
|
||||
|
@ -43,7 +45,13 @@ impl View<Category> for CategoryView {
|
|||
|
||||
#[async_trait]
|
||||
impl ViewRepository<CategoryView, Category> for InventoryDBPostgresAdapter {
|
||||
async fn load(&self, view_id: &str) -> Result<Option<CategoryView>, PersistenceError> {
|
||||
async fn load(&self, category_id: &str) -> Result<Option<CategoryView>, PersistenceError> {
|
||||
let category_id =
|
||||
match super::utils::parse_aggregate_id(category_id, NEW_CATEGORY_NON_UUID)? {
|
||||
Some((val, _)) => return Ok(Some(val)),
|
||||
None => Uuid::parse_str(category_id).unwrap(),
|
||||
};
|
||||
|
||||
let res = sqlx::query_as!(
|
||||
CategoryView,
|
||||
"SELECT
|
||||
|
@ -51,8 +59,8 @@ impl ViewRepository<CategoryView, Category> for InventoryDBPostgresAdapter {
|
|||
FROM
|
||||
cqrs_inventory_category_query
|
||||
WHERE
|
||||
view_id = $1;",
|
||||
view_id
|
||||
category_id = $1;",
|
||||
category_id
|
||||
)
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
|
@ -62,8 +70,14 @@ impl ViewRepository<CategoryView, Category> for InventoryDBPostgresAdapter {
|
|||
|
||||
async fn load_with_context(
|
||||
&self,
|
||||
view_id: &str,
|
||||
category_id: &str,
|
||||
) -> Result<Option<(CategoryView, ViewContext)>, PersistenceError> {
|
||||
let category_id =
|
||||
match super::utils::parse_aggregate_id(category_id, NEW_CATEGORY_NON_UUID)? {
|
||||
Some(val) => return Ok(Some(val)),
|
||||
None => Uuid::parse_str(category_id).unwrap(),
|
||||
};
|
||||
|
||||
let res = sqlx::query_as!(
|
||||
CategoryView,
|
||||
"SELECT
|
||||
|
@ -71,8 +85,8 @@ impl ViewRepository<CategoryView, Category> for InventoryDBPostgresAdapter {
|
|||
FROM
|
||||
cqrs_inventory_category_query
|
||||
WHERE
|
||||
view_id = $1;",
|
||||
view_id
|
||||
category_id = $1;",
|
||||
category_id
|
||||
)
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
|
@ -80,24 +94,24 @@ impl ViewRepository<CategoryView, Category> for InventoryDBPostgresAdapter {
|
|||
|
||||
struct Context {
|
||||
version: i64,
|
||||
view_id: String,
|
||||
category_id: Uuid,
|
||||
}
|
||||
|
||||
let ctx = sqlx::query_as!(
|
||||
Context,
|
||||
"SELECT
|
||||
view_id, version
|
||||
category_id, version
|
||||
FROM
|
||||
cqrs_inventory_category_query
|
||||
WHERE
|
||||
view_id = $1;",
|
||||
view_id
|
||||
category_id = $1;",
|
||||
category_id
|
||||
)
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
.map_err(PostgresAggregateError::from)?;
|
||||
|
||||
let view_context = ViewContext::new(ctx.view_id, ctx.version);
|
||||
let view_context = ViewContext::new(ctx.category_id.to_string(), ctx.version);
|
||||
Ok(Some((res, view_context)))
|
||||
}
|
||||
|
||||
|
@ -111,11 +125,10 @@ impl ViewRepository<CategoryView, Category> for InventoryDBPostgresAdapter {
|
|||
let version = context.version + 1;
|
||||
sqlx::query!(
|
||||
"INSERT INTO cqrs_inventory_category_query (
|
||||
view_id, version, name, description, category_id, store_id
|
||||
version, name, description, category_id, store_id
|
||||
) VALUES (
|
||||
$1, $2, $3, $4, $5, $6
|
||||
$1, $2, $3, $4, $5
|
||||
);",
|
||||
context.view_instance_id,
|
||||
version,
|
||||
view.name,
|
||||
view.description,
|
||||
|
@ -132,13 +145,11 @@ impl ViewRepository<CategoryView, Category> for InventoryDBPostgresAdapter {
|
|||
"UPDATE
|
||||
cqrs_inventory_category_query
|
||||
SET
|
||||
view_id = $1,
|
||||
version = $2,
|
||||
name = $3,
|
||||
description = $4,
|
||||
category_id = $5,
|
||||
store_id = $6;",
|
||||
context.view_instance_id,
|
||||
version = $1,
|
||||
name = $2,
|
||||
description = $3,
|
||||
category_id = $4,
|
||||
store_id = $5;",
|
||||
version,
|
||||
view.name,
|
||||
view.description,
|
||||
|
|
|
@ -13,7 +13,9 @@ mod category_name_exists_for_store;
|
|||
mod category_view;
|
||||
mod errors;
|
||||
mod store_id_exists;
|
||||
mod store_name_exists;
|
||||
mod store_view;
|
||||
mod utils;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct InventoryDBPostgresAdapter {
|
||||
|
|
|
@ -58,9 +58,8 @@ mod tests {
|
|||
|
||||
sqlx::query!(
|
||||
"INSERT INTO cqrs_inventory_store_query
|
||||
(view_id, version, name, address, store_id, owner)
|
||||
VALUES ($1, $2, $3, $4, $5, $6);",
|
||||
"1",
|
||||
(version, name, address, store_id, owner)
|
||||
VALUES ($1, $2, $3, $4, $5);",
|
||||
1,
|
||||
store.name(),
|
||||
store.address().as_ref().unwrap(),
|
||||
|
|
|
@ -3,7 +3,6 @@
|
|||
// SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
|
||||
use async_trait::async_trait;
|
||||
use cqrs_es::persist::GenericQuery;
|
||||
use cqrs_es::persist::{PersistenceError, ViewContext, ViewRepository};
|
||||
use cqrs_es::{EventEnvelope, Query, View};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
@ -14,6 +13,8 @@ use super::InventoryDBPostgresAdapter;
|
|||
use crate::inventory::domain::events::InventoryEvent;
|
||||
use crate::inventory::domain::store_aggregate::Store;
|
||||
|
||||
pub const NEW_STORE_NON_UUID: &str = "new_store_non_uuid-asdfa";
|
||||
|
||||
// The view for a Store query, for a standard http application this should
|
||||
// be designed to reflect the response dto that will be returned to a user.
|
||||
#[derive(Debug, Default, Serialize, Deserialize)]
|
||||
|
@ -43,7 +44,12 @@ impl View<Store> for StoreView {
|
|||
|
||||
#[async_trait]
|
||||
impl ViewRepository<StoreView, Store> for InventoryDBPostgresAdapter {
|
||||
async fn load(&self, view_id: &str) -> Result<Option<StoreView>, PersistenceError> {
|
||||
async fn load(&self, store_id: &str) -> Result<Option<StoreView>, PersistenceError> {
|
||||
let store_id = match super::utils::parse_aggregate_id(store_id, NEW_STORE_NON_UUID)? {
|
||||
Some((val, _)) => return Ok(Some(val)),
|
||||
None => Uuid::parse_str(store_id).unwrap(),
|
||||
};
|
||||
|
||||
let res = sqlx::query_as!(
|
||||
StoreView,
|
||||
"SELECT
|
||||
|
@ -51,8 +57,8 @@ impl ViewRepository<StoreView, Store> for InventoryDBPostgresAdapter {
|
|||
FROM
|
||||
cqrs_inventory_store_query
|
||||
WHERE
|
||||
view_id = $1;",
|
||||
view_id
|
||||
store_id = $1;",
|
||||
store_id
|
||||
)
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
|
@ -62,8 +68,13 @@ impl ViewRepository<StoreView, Store> for InventoryDBPostgresAdapter {
|
|||
|
||||
async fn load_with_context(
|
||||
&self,
|
||||
view_id: &str,
|
||||
store_id: &str,
|
||||
) -> Result<Option<(StoreView, ViewContext)>, PersistenceError> {
|
||||
let store_id = match super::utils::parse_aggregate_id(store_id, NEW_STORE_NON_UUID)? {
|
||||
Some(val) => return Ok(Some(val)),
|
||||
None => Uuid::parse_str(store_id).unwrap(),
|
||||
};
|
||||
|
||||
let res = sqlx::query_as!(
|
||||
StoreView,
|
||||
"SELECT
|
||||
|
@ -71,8 +82,8 @@ impl ViewRepository<StoreView, Store> for InventoryDBPostgresAdapter {
|
|||
FROM
|
||||
cqrs_inventory_store_query
|
||||
WHERE
|
||||
view_id = $1;",
|
||||
view_id
|
||||
store_id = $1;",
|
||||
&store_id,
|
||||
)
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
|
@ -80,24 +91,24 @@ impl ViewRepository<StoreView, Store> for InventoryDBPostgresAdapter {
|
|||
|
||||
struct Context {
|
||||
version: i64,
|
||||
view_id: String,
|
||||
store_id: Uuid,
|
||||
}
|
||||
|
||||
let ctx = sqlx::query_as!(
|
||||
Context,
|
||||
"SELECT
|
||||
view_id, version
|
||||
store_id, version
|
||||
FROM
|
||||
cqrs_inventory_store_query
|
||||
WHERE
|
||||
view_id = $1;",
|
||||
view_id
|
||||
store_id = $1;",
|
||||
store_id
|
||||
)
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
.map_err(PostgresAggregateError::from)?;
|
||||
|
||||
let view_context = ViewContext::new(ctx.view_id, ctx.version);
|
||||
let view_context = ViewContext::new(ctx.store_id.to_string(), ctx.version);
|
||||
Ok(Some((res, view_context)))
|
||||
}
|
||||
|
||||
|
@ -111,11 +122,10 @@ impl ViewRepository<StoreView, Store> for InventoryDBPostgresAdapter {
|
|||
let version = context.version + 1;
|
||||
sqlx::query!(
|
||||
"INSERT INTO cqrs_inventory_store_query (
|
||||
view_id, version, name, address, store_id, owner
|
||||
version, name, address, store_id, owner
|
||||
) VALUES (
|
||||
$1, $2, $3, $4, $5, $6
|
||||
$1, $2, $3, $4, $5
|
||||
);",
|
||||
context.view_instance_id,
|
||||
version,
|
||||
view.name,
|
||||
view.address,
|
||||
|
@ -132,13 +142,11 @@ impl ViewRepository<StoreView, Store> for InventoryDBPostgresAdapter {
|
|||
"UPDATE
|
||||
cqrs_inventory_store_query
|
||||
SET
|
||||
view_id = $1,
|
||||
version = $2,
|
||||
name = $3,
|
||||
address = $4,
|
||||
store_id = $5,
|
||||
owner = $6;",
|
||||
context.view_instance_id,
|
||||
version = $1,
|
||||
name = $2,
|
||||
address = $3,
|
||||
store_id = $4,
|
||||
owner = $5;",
|
||||
version,
|
||||
view.name,
|
||||
view.address,
|
||||
|
@ -169,7 +177,99 @@ impl Query<Store> for SimpleLoggingQuery {
|
|||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Query<Store> for InventoryDBPostgresAdapter {
|
||||
async fn dispatch(&self, store_id: &str, events: &[EventEnvelope<Store>]) {
|
||||
let res = self
|
||||
.load_with_context(&store_id)
|
||||
.await
|
||||
.unwrap_or_else(|_| Some((StoreView::default(), ViewContext::new(store_id.into(), 0))));
|
||||
let (mut view, view_context): (StoreView, ViewContext) = res.unwrap();
|
||||
for event in events {
|
||||
view.update(event);
|
||||
}
|
||||
self.update_view(view, view_context).await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
// Our second query, this one will be handled with Postgres `GenericQuery`
|
||||
// which will serialize and persist our view after it is updated. It also
|
||||
// provides a `load` method to deserialize the view on request.
|
||||
pub type StoreQuery = GenericQuery<InventoryDBPostgresAdapter, StoreView, Store>;
|
||||
//pub type StoreQuery = GenericQuery<InventoryDBPostgresAdapter, StoreView, Store>;
|
||||
//pub type StoreQuery = Query<dyn InventoryDBPostgresAdapter, StoreView, Store>;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
use postgres_es::PostgresCqrs;
|
||||
|
||||
use crate::{
|
||||
db::migrate::*,
|
||||
inventory::{
|
||||
application::services::{
|
||||
add_category_service::tests::mock_add_category_service,
|
||||
add_store_service::AddStoreServiceBuilder, InventoryServicesBuilder,
|
||||
},
|
||||
domain::{
|
||||
add_category_command::AddCategoryCommand, add_store_command::AddStoreCommand,
|
||||
commands::InventoryCommand,
|
||||
},
|
||||
},
|
||||
tests::bdd::IS_NEVER_CALLED,
|
||||
utils::{random_string::GenerateRandomStringInterface, uuid::tests::UUID},
|
||||
};
|
||||
use std::sync::Arc;
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn pg_query() {
|
||||
let settings = crate::settings::tests::get_settings().await;
|
||||
//let settings = crate::settings::Settings::new().unwrap();
|
||||
settings.create_db().await;
|
||||
|
||||
let db = crate::db::sqlx_postgres::Postgres::init(&settings.database.url).await;
|
||||
db.migrate().await;
|
||||
let db = InventoryDBPostgresAdapter::new(db.pool.clone());
|
||||
|
||||
let simple_query = SimpleLoggingQuery {};
|
||||
|
||||
let queries: Vec<Box<dyn Query<Store>>> =
|
||||
vec![Box::new(simple_query), Box::new(db.clone())];
|
||||
|
||||
let services = InventoryServicesBuilder::default()
|
||||
.add_store(Arc::new(
|
||||
AddStoreServiceBuilder::default()
|
||||
.db_store_id_exists(Arc::new(db.clone()))
|
||||
.db_store_name_exists(Arc::new(db.clone()))
|
||||
.get_uuid(Arc::new(crate::utils::uuid::GenerateUUID {}))
|
||||
.build()
|
||||
.unwrap(),
|
||||
))
|
||||
.add_category(mock_add_category_service(
|
||||
IS_NEVER_CALLED,
|
||||
AddCategoryCommand::new("foo".into(), None, UUID.clone(), "bar".into()).unwrap(),
|
||||
))
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let (cqrs, _store_query): (
|
||||
Arc<PostgresCqrs<Store>>,
|
||||
Arc<dyn ViewRepository<StoreView, Store>>,
|
||||
) = (
|
||||
Arc::new(postgres_es::postgres_cqrs(
|
||||
db.pool.clone(),
|
||||
queries,
|
||||
Arc::new(services),
|
||||
)),
|
||||
Arc::new(db.clone()),
|
||||
);
|
||||
|
||||
let rand = crate::utils::random_string::GenerateRandomString {};
|
||||
let cmd = AddStoreCommand::new(rand.get_random(10), None, "me".into()).unwrap();
|
||||
cqrs.execute("", InventoryCommand::AddStore(cmd.clone()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
settings.drop_db().await;
|
||||
}
|
||||
}
|
||||
|
|
27
src/inventory/adapters/output/db/postgres/utils.rs
Normal file
27
src/inventory/adapters/output/db/postgres/utils.rs
Normal file
|
@ -0,0 +1,27 @@
|
|||
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
|
||||
//
|
||||
// SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
|
||||
use cqrs_es::persist::{PersistenceError, ViewContext};
|
||||
use uuid::Uuid;
|
||||
|
||||
pub fn parse_aggregate_id<T: Default>(
|
||||
aggregate_id: &str,
|
||||
non_id: &str,
|
||||
) -> Result<Option<(T, ViewContext)>, PersistenceError> {
|
||||
match Uuid::parse_str(aggregate_id) {
|
||||
Ok(_) => return Ok(None),
|
||||
Err(e) => {
|
||||
if aggregate_id == non_id {
|
||||
// if store_id is unbearable, then store isn't created yet. Use cleaner, robust method
|
||||
// later.
|
||||
return Ok(Some((
|
||||
T::default(),
|
||||
ViewContext::new(aggregate_id.into(), 0),
|
||||
)));
|
||||
} else {
|
||||
return Err(PersistenceError::UnknownError(Box::new(e)));
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
|
@ -1,6 +1,7 @@
|
|||
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
|
||||
//
|
||||
// SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use derive_builder::Builder;
|
||||
|
|
Loading…
Reference in a new issue