feat: use unique non-UUIDs to signal new aggregate creation, and check
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
ci/woodpecker/pr/woodpecker Pipeline failed
ci/woodpecker/pull_request_closed/woodpecker Pipeline was successful

for the same in cqrs framework
This commit is contained in:
Aravinth Manivannan 2024-07-14 18:26:39 +05:30
parent 4c6aa0782e
commit 55c881adc8
Signed by: realaravinth
GPG key ID: F8F50389936984FF
8 changed files with 191 additions and 53 deletions

View file

@ -59,9 +59,8 @@ mod tests {
sqlx::query!(
"INSERT INTO cqrs_inventory_category_query
(view_id, version, name, description, category_id, store_id)
VALUES ($1, $2, $3, $4, $5, $6);",
"1",
(version, name, description, category_id, store_id)
VALUES ($1, $2, $3, $4, $5);",
1,
category.name(),
category.description().as_ref().unwrap(),

View file

@ -64,9 +64,8 @@ mod tests {
sqlx::query!(
"INSERT INTO cqrs_inventory_category_query
(view_id, version, name, description, category_id, store_id)
VALUES ($1, $2, $3, $4, $5, $6);",
"1",
(version, name, description, category_id, store_id)
VALUES ($1, $2, $3, $4, $5);",
1,
category.name(),
category.description().as_ref().unwrap(),

View file

@ -14,6 +14,8 @@ use crate::inventory::domain::category_aggregate::Category;
use crate::inventory::domain::events::InventoryEvent;
use serde::{Deserialize, Serialize};
pub const NEW_CATEGORY_NON_UUID: &str = "new_category_non_uuid-asdfa";
// The view for a Category query, for a standard http application this should
// be designed to reflect the response dto that will be returned to a user.
#[derive(Debug, Default, Serialize, Deserialize)]
@ -43,7 +45,13 @@ impl View<Category> for CategoryView {
#[async_trait]
impl ViewRepository<CategoryView, Category> for InventoryDBPostgresAdapter {
async fn load(&self, view_id: &str) -> Result<Option<CategoryView>, PersistenceError> {
async fn load(&self, category_id: &str) -> Result<Option<CategoryView>, PersistenceError> {
let category_id =
match super::utils::parse_aggregate_id(category_id, NEW_CATEGORY_NON_UUID)? {
Some((val, _)) => return Ok(Some(val)),
None => Uuid::parse_str(category_id).unwrap(),
};
let res = sqlx::query_as!(
CategoryView,
"SELECT
@ -51,8 +59,8 @@ impl ViewRepository<CategoryView, Category> for InventoryDBPostgresAdapter {
FROM
cqrs_inventory_category_query
WHERE
view_id = $1;",
view_id
category_id = $1;",
category_id
)
.fetch_one(&self.pool)
.await
@ -62,8 +70,14 @@ impl ViewRepository<CategoryView, Category> for InventoryDBPostgresAdapter {
async fn load_with_context(
&self,
view_id: &str,
category_id: &str,
) -> Result<Option<(CategoryView, ViewContext)>, PersistenceError> {
let category_id =
match super::utils::parse_aggregate_id(category_id, NEW_CATEGORY_NON_UUID)? {
Some(val) => return Ok(Some(val)),
None => Uuid::parse_str(category_id).unwrap(),
};
let res = sqlx::query_as!(
CategoryView,
"SELECT
@ -71,8 +85,8 @@ impl ViewRepository<CategoryView, Category> for InventoryDBPostgresAdapter {
FROM
cqrs_inventory_category_query
WHERE
view_id = $1;",
view_id
category_id = $1;",
category_id
)
.fetch_one(&self.pool)
.await
@ -80,24 +94,24 @@ impl ViewRepository<CategoryView, Category> for InventoryDBPostgresAdapter {
struct Context {
version: i64,
view_id: String,
category_id: Uuid,
}
let ctx = sqlx::query_as!(
Context,
"SELECT
view_id, version
category_id, version
FROM
cqrs_inventory_category_query
WHERE
view_id = $1;",
view_id
category_id = $1;",
category_id
)
.fetch_one(&self.pool)
.await
.map_err(PostgresAggregateError::from)?;
let view_context = ViewContext::new(ctx.view_id, ctx.version);
let view_context = ViewContext::new(ctx.category_id.to_string(), ctx.version);
Ok(Some((res, view_context)))
}
@ -111,11 +125,10 @@ impl ViewRepository<CategoryView, Category> for InventoryDBPostgresAdapter {
let version = context.version + 1;
sqlx::query!(
"INSERT INTO cqrs_inventory_category_query (
view_id, version, name, description, category_id, store_id
version, name, description, category_id, store_id
) VALUES (
$1, $2, $3, $4, $5, $6
$1, $2, $3, $4, $5
);",
context.view_instance_id,
version,
view.name,
view.description,
@ -132,13 +145,11 @@ impl ViewRepository<CategoryView, Category> for InventoryDBPostgresAdapter {
"UPDATE
cqrs_inventory_category_query
SET
view_id = $1,
version = $2,
name = $3,
description = $4,
category_id = $5,
store_id = $6;",
context.view_instance_id,
version = $1,
name = $2,
description = $3,
category_id = $4,
store_id = $5;",
version,
view.name,
view.description,

View file

@ -13,7 +13,9 @@ mod category_name_exists_for_store;
mod category_view;
mod errors;
mod store_id_exists;
mod store_name_exists;
mod store_view;
mod utils;
#[derive(Clone)]
pub struct InventoryDBPostgresAdapter {

View file

@ -58,9 +58,8 @@ mod tests {
sqlx::query!(
"INSERT INTO cqrs_inventory_store_query
(view_id, version, name, address, store_id, owner)
VALUES ($1, $2, $3, $4, $5, $6);",
"1",
(version, name, address, store_id, owner)
VALUES ($1, $2, $3, $4, $5);",
1,
store.name(),
store.address().as_ref().unwrap(),

View file

@ -3,7 +3,6 @@
// SPDX-License-Identifier: AGPL-3.0-or-later
use async_trait::async_trait;
use cqrs_es::persist::GenericQuery;
use cqrs_es::persist::{PersistenceError, ViewContext, ViewRepository};
use cqrs_es::{EventEnvelope, Query, View};
use serde::{Deserialize, Serialize};
@ -14,6 +13,8 @@ use super::InventoryDBPostgresAdapter;
use crate::inventory::domain::events::InventoryEvent;
use crate::inventory::domain::store_aggregate::Store;
pub const NEW_STORE_NON_UUID: &str = "new_store_non_uuid-asdfa";
// The view for a Store query, for a standard http application this should
// be designed to reflect the response dto that will be returned to a user.
#[derive(Debug, Default, Serialize, Deserialize)]
@ -43,7 +44,12 @@ impl View<Store> for StoreView {
#[async_trait]
impl ViewRepository<StoreView, Store> for InventoryDBPostgresAdapter {
async fn load(&self, view_id: &str) -> Result<Option<StoreView>, PersistenceError> {
async fn load(&self, store_id: &str) -> Result<Option<StoreView>, PersistenceError> {
let store_id = match super::utils::parse_aggregate_id(store_id, NEW_STORE_NON_UUID)? {
Some((val, _)) => return Ok(Some(val)),
None => Uuid::parse_str(store_id).unwrap(),
};
let res = sqlx::query_as!(
StoreView,
"SELECT
@ -51,8 +57,8 @@ impl ViewRepository<StoreView, Store> for InventoryDBPostgresAdapter {
FROM
cqrs_inventory_store_query
WHERE
view_id = $1;",
view_id
store_id = $1;",
store_id
)
.fetch_one(&self.pool)
.await
@ -62,8 +68,13 @@ impl ViewRepository<StoreView, Store> for InventoryDBPostgresAdapter {
async fn load_with_context(
&self,
view_id: &str,
store_id: &str,
) -> Result<Option<(StoreView, ViewContext)>, PersistenceError> {
let store_id = match super::utils::parse_aggregate_id(store_id, NEW_STORE_NON_UUID)? {
Some(val) => return Ok(Some(val)),
None => Uuid::parse_str(store_id).unwrap(),
};
let res = sqlx::query_as!(
StoreView,
"SELECT
@ -71,8 +82,8 @@ impl ViewRepository<StoreView, Store> for InventoryDBPostgresAdapter {
FROM
cqrs_inventory_store_query
WHERE
view_id = $1;",
view_id
store_id = $1;",
&store_id,
)
.fetch_one(&self.pool)
.await
@ -80,24 +91,24 @@ impl ViewRepository<StoreView, Store> for InventoryDBPostgresAdapter {
struct Context {
version: i64,
view_id: String,
store_id: Uuid,
}
let ctx = sqlx::query_as!(
Context,
"SELECT
view_id, version
store_id, version
FROM
cqrs_inventory_store_query
WHERE
view_id = $1;",
view_id
store_id = $1;",
store_id
)
.fetch_one(&self.pool)
.await
.map_err(PostgresAggregateError::from)?;
let view_context = ViewContext::new(ctx.view_id, ctx.version);
let view_context = ViewContext::new(ctx.store_id.to_string(), ctx.version);
Ok(Some((res, view_context)))
}
@ -111,11 +122,10 @@ impl ViewRepository<StoreView, Store> for InventoryDBPostgresAdapter {
let version = context.version + 1;
sqlx::query!(
"INSERT INTO cqrs_inventory_store_query (
view_id, version, name, address, store_id, owner
version, name, address, store_id, owner
) VALUES (
$1, $2, $3, $4, $5, $6
$1, $2, $3, $4, $5
);",
context.view_instance_id,
version,
view.name,
view.address,
@ -132,13 +142,11 @@ impl ViewRepository<StoreView, Store> for InventoryDBPostgresAdapter {
"UPDATE
cqrs_inventory_store_query
SET
view_id = $1,
version = $2,
name = $3,
address = $4,
store_id = $5,
owner = $6;",
context.view_instance_id,
version = $1,
name = $2,
address = $3,
store_id = $4,
owner = $5;",
version,
view.name,
view.address,
@ -169,7 +177,99 @@ impl Query<Store> for SimpleLoggingQuery {
}
}
#[async_trait]
impl Query<Store> for InventoryDBPostgresAdapter {
async fn dispatch(&self, store_id: &str, events: &[EventEnvelope<Store>]) {
let res = self
.load_with_context(&store_id)
.await
.unwrap_or_else(|_| Some((StoreView::default(), ViewContext::new(store_id.into(), 0))));
let (mut view, view_context): (StoreView, ViewContext) = res.unwrap();
for event in events {
view.update(event);
}
self.update_view(view, view_context).await.unwrap();
}
}
// Our second query, this one will be handled with Postgres `GenericQuery`
// which will serialize and persist our view after it is updated. It also
// provides a `load` method to deserialize the view on request.
pub type StoreQuery = GenericQuery<InventoryDBPostgresAdapter, StoreView, Store>;
//pub type StoreQuery = GenericQuery<InventoryDBPostgresAdapter, StoreView, Store>;
//pub type StoreQuery = Query<dyn InventoryDBPostgresAdapter, StoreView, Store>;
#[cfg(test)]
mod tests {
use super::*;
use postgres_es::PostgresCqrs;
use crate::{
db::migrate::*,
inventory::{
application::services::{
add_category_service::tests::mock_add_category_service,
add_store_service::AddStoreServiceBuilder, InventoryServicesBuilder,
},
domain::{
add_category_command::AddCategoryCommand, add_store_command::AddStoreCommand,
commands::InventoryCommand,
},
},
tests::bdd::IS_NEVER_CALLED,
utils::{random_string::GenerateRandomStringInterface, uuid::tests::UUID},
};
use std::sync::Arc;
#[actix_rt::test]
async fn pg_query() {
let settings = crate::settings::tests::get_settings().await;
//let settings = crate::settings::Settings::new().unwrap();
settings.create_db().await;
let db = crate::db::sqlx_postgres::Postgres::init(&settings.database.url).await;
db.migrate().await;
let db = InventoryDBPostgresAdapter::new(db.pool.clone());
let simple_query = SimpleLoggingQuery {};
let queries: Vec<Box<dyn Query<Store>>> =
vec![Box::new(simple_query), Box::new(db.clone())];
let services = InventoryServicesBuilder::default()
.add_store(Arc::new(
AddStoreServiceBuilder::default()
.db_store_id_exists(Arc::new(db.clone()))
.db_store_name_exists(Arc::new(db.clone()))
.get_uuid(Arc::new(crate::utils::uuid::GenerateUUID {}))
.build()
.unwrap(),
))
.add_category(mock_add_category_service(
IS_NEVER_CALLED,
AddCategoryCommand::new("foo".into(), None, UUID.clone(), "bar".into()).unwrap(),
))
.build()
.unwrap();
let (cqrs, _store_query): (
Arc<PostgresCqrs<Store>>,
Arc<dyn ViewRepository<StoreView, Store>>,
) = (
Arc::new(postgres_es::postgres_cqrs(
db.pool.clone(),
queries,
Arc::new(services),
)),
Arc::new(db.clone()),
);
let rand = crate::utils::random_string::GenerateRandomString {};
let cmd = AddStoreCommand::new(rand.get_random(10), None, "me".into()).unwrap();
cqrs.execute("", InventoryCommand::AddStore(cmd.clone()))
.await
.unwrap();
settings.drop_db().await;
}
}

View file

@ -0,0 +1,27 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use cqrs_es::persist::{PersistenceError, ViewContext};
use uuid::Uuid;
pub fn parse_aggregate_id<T: Default>(
aggregate_id: &str,
non_id: &str,
) -> Result<Option<(T, ViewContext)>, PersistenceError> {
match Uuid::parse_str(aggregate_id) {
Ok(_) => return Ok(None),
Err(e) => {
if aggregate_id == non_id {
// if store_id is unbearable, then store isn't created yet. Use cleaner, robust method
// later.
return Ok(Some((
T::default(),
ViewContext::new(aggregate_id.into(), 0),
)));
} else {
return Err(PersistenceError::UnknownError(Box::new(e)));
}
}
};
}

View file

@ -1,6 +1,7 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use std::sync::Arc;
use derive_builder::Builder;