feat: define store aggregates and implement service to create store
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
ci/woodpecker/pr/woodpecker Pipeline failed

This commit is contained in:
Aravinth Manivannan 2024-07-13 17:40:11 +05:30
parent 6609e52632
commit 19b6c5420b
Signed by: realaravinth
GPG key ID: F8F50389936984FF
13 changed files with 758 additions and 0 deletions

View file

@ -0,0 +1,79 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use std::borrow::Cow;
use cqrs_es::persist::PersistenceError;
use sqlx::Error as SqlxError;
use crate::inventory::application::port::output::db::errors::InventoryDBError;
impl From<SqlxError> for InventoryDBError {
fn from(e: SqlxError) -> Self {
log::error!("[postgres] err: {}", e);
if let SqlxError::Database(err) = e {
if err.code() == Some(Cow::from("23505")) {
let msg = err.message();
if msg.contains("cqrs_inventory_store_query_store_id_key") {
return Self::DuplicateStoreID;
} else {
println!("{msg}");
}
}
}
Self::InternalError
}
}
/// map custom row not found error to DB error
pub fn map_row_not_found_err(e: SqlxError, row_not_found: InventoryDBError) -> InventoryDBError {
if let SqlxError::RowNotFound = e {
row_not_found
} else {
e.into()
}
}
#[derive(Debug)]
pub enum PostgresAggregateError {
OptimisticLock,
ConnectionError(Box<dyn std::error::Error + Send + Sync + 'static>),
DeserializationError(Box<dyn std::error::Error + Send + Sync + 'static>),
UnknownError(Box<dyn std::error::Error + Send + Sync + 'static>),
}
impl From<SqlxError> for PostgresAggregateError {
fn from(err: SqlxError) -> Self {
// TODO: improve error handling
match &err {
SqlxError::Database(database_error) => {
if let Some(code) = database_error.code() {
if code.as_ref() == "23505" {
return PostgresAggregateError::OptimisticLock;
}
}
PostgresAggregateError::UnknownError(Box::new(err))
}
SqlxError::Io(_) | SqlxError::Tls(_) => {
PostgresAggregateError::ConnectionError(Box::new(err))
}
_ => PostgresAggregateError::UnknownError(Box::new(err)),
}
}
}
impl From<PostgresAggregateError> for PersistenceError {
fn from(err: PostgresAggregateError) -> Self {
match err {
PostgresAggregateError::OptimisticLock => PersistenceError::OptimisticLockError,
PostgresAggregateError::ConnectionError(error) => {
PersistenceError::ConnectionError(error)
}
PostgresAggregateError::DeserializationError(error) => {
PersistenceError::UnknownError(error)
}
PostgresAggregateError::UnknownError(error) => PersistenceError::UnknownError(error),
}
}
}

View file

@ -0,0 +1,27 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use std::sync::Arc;
use sqlx::postgres::PgPool;
use crate::db::{migrate::RunMigrations, sqlx_postgres::Postgres};
mod errors;
mod store_id_exists;
mod store_view;
#[derive(Clone)]
pub struct InventoryDBPostgresAdapter {
pool: PgPool,
}
impl InventoryDBPostgresAdapter {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
pub fn migratable(&self) -> Arc<dyn RunMigrations> {
Arc::new(Postgres::new(self.pool.clone()))
}
}

View file

@ -0,0 +1,79 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use super::InventoryDBPostgresAdapter;
use crate::inventory::application::port::output::db::{errors::*, store_id_exists::*};
use crate::inventory::domain::store_aggregate::*;
#[async_trait::async_trait]
impl StoreIDExistsDBPort for InventoryDBPostgresAdapter {
async fn store_id_exists(&self, s: &Store) -> InventoryDBResult<bool> {
let res = sqlx::query!(
"SELECT EXISTS (
SELECT 1
FROM cqrs_inventory_store_query
WHERE
store_id = $1
);",
s.store_id(),
)
.fetch_one(&self.pool)
.await?;
if let Some(x) = res.exists {
Ok(x)
} else {
Ok(false)
}
}
}
#[cfg(test)]
mod tests {
use uuid::Uuid;
use super::*;
#[actix_rt::test]
async fn test_postgres_store_exists() {
let store_id = Uuid::new_v4();
let settings = crate::settings::tests::get_settings().await;
settings.create_db().await;
let db = super::InventoryDBPostgresAdapter::new(
sqlx::postgres::PgPool::connect(&settings.database.url)
.await
.unwrap(),
);
let store = StoreBuilder::default()
.name("store_name".into())
.owner("store_owner".into())
.address(Some("store_address".into()))
.store_id(store_id)
.build()
.unwrap();
// state doesn't exist
assert!(!db.store_id_exists(&store).await.unwrap());
sqlx::query!(
"INSERT INTO cqrs_inventory_store_query
(view_id, version, name, address, store_id, owner)
VALUES ($1, $2, $3, $4, $5, $6);",
"1",
1,
store.name(),
store.address().as_ref().unwrap(),
store.store_id(),
store.owner(),
)
.execute(&db.pool)
.await
.unwrap();
// state exists
assert!(db.store_id_exists(&store).await.unwrap());
settings.drop_db().await;
}
}

View file

@ -0,0 +1,175 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use async_trait::async_trait;
use cqrs_es::persist::GenericQuery;
use cqrs_es::persist::{PersistenceError, ViewContext, ViewRepository};
use cqrs_es::{EventEnvelope, Query, View};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use super::errors::*;
use super::InventoryDBPostgresAdapter;
use crate::inventory::domain::events::InventoryEvent;
use crate::inventory::domain::store_aggregate::Store;
// The view for a Store query, for a standard http application this should
// be designed to reflect the response dto that will be returned to a user.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct StoreView {
name: String,
address: Option<String>,
store_id: Uuid,
owner: String,
}
// This updates the view with events as they are committed.
// The logic should be minimal here, e.g., don't calculate the account balance,
// design the events to carry the balance information instead.
impl View<Store> for StoreView {
fn update(&mut self, event: &EventEnvelope<Store>) {
match &event.payload {
InventoryEvent::StoreAdded(val) => {
self.name = val.name().into();
self.address = val.address().clone();
self.store_id = val.store_id().clone();
self.owner = val.owner().clone();
}
_ => (),
}
}
}
#[async_trait]
impl ViewRepository<StoreView, Store> for InventoryDBPostgresAdapter {
async fn load(&self, view_id: &str) -> Result<Option<StoreView>, PersistenceError> {
let res = sqlx::query_as!(
StoreView,
"SELECT
name, address, store_id, owner
FROM
cqrs_inventory_store_query
WHERE
view_id = $1;",
view_id
)
.fetch_one(&self.pool)
.await
.map_err(PostgresAggregateError::from)?;
Ok(Some(res))
}
async fn load_with_context(
&self,
view_id: &str,
) -> Result<Option<(StoreView, ViewContext)>, PersistenceError> {
let res = sqlx::query_as!(
StoreView,
"SELECT
name, address, store_id, owner
FROM
cqrs_inventory_store_query
WHERE
view_id = $1;",
view_id
)
.fetch_one(&self.pool)
.await
.map_err(PostgresAggregateError::from)?;
struct Context {
version: i64,
view_id: String,
}
let ctx = sqlx::query_as!(
Context,
"SELECT
view_id, version
FROM
cqrs_inventory_store_query
WHERE
view_id = $1;",
view_id
)
.fetch_one(&self.pool)
.await
.map_err(PostgresAggregateError::from)?;
let view_context = ViewContext::new(ctx.view_id, ctx.version);
Ok(Some((res, view_context)))
}
async fn update_view(
&self,
view: StoreView,
context: ViewContext,
) -> Result<(), PersistenceError> {
match context.version {
0 => {
let version = context.version + 1;
sqlx::query!(
"INSERT INTO cqrs_inventory_store_query (
view_id, version, name, address, store_id, owner
) VALUES (
$1, $2, $3, $4, $5, $6
);",
context.view_instance_id,
version,
view.name,
view.address,
view.store_id,
view.owner,
)
.execute(&self.pool)
.await
.map_err(PostgresAggregateError::from)?;
}
_ => {
let version = context.version + 1;
sqlx::query!(
"UPDATE
cqrs_inventory_store_query
SET
view_id = $1,
version = $2,
name = $3,
address = $4,
store_id = $5,
owner = $6;",
context.view_instance_id,
version,
view.name,
view.address,
view.store_id,
view.owner,
)
.execute(&self.pool)
.await
.map_err(PostgresAggregateError::from)?;
}
}
Ok(())
}
}
pub struct SimpleLoggingQuery {}
// Our simplest query, this is great for debugging but absolutely useless in production.
// This query just pretty prints the events as they are processed.
#[async_trait]
impl Query<Store> for SimpleLoggingQuery {
async fn dispatch(&self, aggregate_id: &str, events: &[EventEnvelope<Store>]) {
for event in events {
let payload = serde_json::to_string_pretty(&event.payload).unwrap();
println!("{}-{}\n{}", aggregate_id, event.sequence, payload);
}
}
}
// Our second query, this one will be handled with Postgres `GenericQuery`
// which will serialize and persist our view after it is updated. It also
// provides a `load` method to deserialize the view on request.
pub type StoreQuery = GenericQuery<InventoryDBPostgresAdapter, StoreView, Store>;

View file

@ -0,0 +1,15 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use derive_more::Display;
use serde::{Deserialize, Serialize};
pub type InventoryDBResult<V> = Result<V, InventoryDBError>;
#[derive(Debug, Display, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum InventoryDBError {
DuplicateCategoryName,
DuplicateStoreID,
InternalError,
}

View file

@ -0,0 +1,7 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
pub mod category_exists;
pub mod errors;
pub mod store_id_exists;

View file

@ -0,0 +1,54 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use mockall::predicate::*;
use mockall::*;
use crate::inventory::domain::store_aggregate::Store;
use super::errors::*;
#[cfg(test)]
#[allow(unused_imports)]
pub use tests::*;
#[automock]
#[async_trait::async_trait]
pub trait StoreIDExistsDBPort: Send + Sync {
async fn store_id_exists(&self, s: &Store) -> InventoryDBResult<bool>;
}
pub type StoreIDExistsDBPortObj = std::sync::Arc<dyn StoreIDExistsDBPort>;
#[cfg(test)]
pub mod tests {
use super::*;
use std::sync::Arc;
pub fn mock_store_id_exists_db_port_false(times: Option<usize>) -> StoreIDExistsDBPortObj {
let mut m = MockStoreIDExistsDBPort::new();
if let Some(times) = times {
m.expect_store_id_exists()
.times(times)
.returning(|_| Ok(false));
} else {
m.expect_store_id_exists().returning(|_| Ok(false));
}
Arc::new(m)
}
pub fn mock_store_id_exists_db_port_true(times: Option<usize>) -> StoreIDExistsDBPortObj {
let mut m = MockStoreIDExistsDBPort::new();
if let Some(times) = times {
m.expect_store_id_exists()
.times(times)
.returning(|_| Ok(true));
} else {
m.expect_store_id_exists().returning(|_| Ok(true));
}
Arc::new(m)
}
}

View file

@ -0,0 +1,91 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use derive_builder::Builder;
use uuid::Uuid;
use super::errors::*;
use crate::inventory::{
application::port::output::db::{errors::InventoryDBError, store_id_exists::*},
domain::{
add_store_command::AddStoreCommand,
store_added_event::{StoreAddedEvent, StoreAddedEventBuilder},
store_aggregate::*,
},
};
#[async_trait::async_trait]
pub trait AddStoreUseCase: Send + Sync {
async fn add_store(&self, cmd: AddStoreCommand) -> InventoryResult<StoreAddedEvent>;
}
pub type AddStoreServiceObj = std::sync::Arc<dyn AddStoreUseCase>;
#[derive(Clone, Builder)]
pub struct AddStoreService {
db_store_id_exists: StoreIDExistsDBPortObj,
}
#[async_trait::async_trait]
impl AddStoreUseCase for AddStoreService {
async fn add_store(&self, cmd: AddStoreCommand) -> InventoryResult<StoreAddedEvent> {
let mut store_id = Uuid::new_v4();
let mut store = StoreBuilder::default()
.name(cmd.name().into())
.address(cmd.address().as_ref().map(|s| s.to_string()))
.owner(cmd.owner().into())
.store_id(store_id.clone())
.build()
.unwrap();
loop {
if self.db_store_id_exists.store_id_exists(&store).await? {
store_id = Uuid::new_v4();
store = StoreBuilder::default()
.name(cmd.name().into())
.address(cmd.address().as_ref().map(|s| s.to_string()))
.store_id(store_id.clone())
.build()
.unwrap();
continue;
} else {
break;
}
}
Ok(StoreAddedEventBuilder::default()
.name(store.name().into())
.address(store.address().as_ref().map(|s| s.to_string()))
.owner(cmd.owner().into())
.store_id(store_id.clone())
.build()
.unwrap())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::tests::bdd::*;
#[actix_rt::test]
async fn test_service_store_id_doesnt_exist() {
let name = "foo";
let address = "bar";
let username = "baz";
// address = None
let cmd = AddStoreCommand::new(name.into(), Some(address.into()), username.into()).unwrap();
let s = AddStoreServiceBuilder::default()
.db_store_id_exists(mock_store_id_exists_db_port_false(IS_CALLED_ONLY_ONCE))
.build()
.unwrap();
let res = s.add_store(cmd.clone()).await.unwrap();
assert_eq!(res.name(), cmd.name());
assert_eq!(res.address(), cmd.address());
assert_eq!(res.owner(), cmd.owner());
}
}

View file

@ -0,0 +1,30 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use derive_more::{Display, Error};
use log::error;
use serde::{Deserialize, Serialize};
use crate::inventory::application::port::output::db::errors::InventoryDBError;
pub type InventoryResult<V> = Result<V, InventoryError>;
#[derive(Debug, Error, Display, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum InventoryError {
DuplicateCategoryName,
InternalError,
}
impl From<InventoryDBError> for InventoryError {
fn from(value: InventoryDBError) -> Self {
match value {
InventoryDBError::DuplicateCategoryName => Self::DuplicateCategoryName,
InventoryDBError::DuplicateStoreID => {
error!("DuplicateStoreID");
Self::InternalError
}
InventoryDBError::InternalError => Self::InternalError,
}
}
}

View file

@ -1,3 +1,33 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use derive_builder::Builder;
use mockall::predicate::*;
use mockall::*;
pub mod errors;
// services
pub mod add_category_service;
pub mod add_store_service;
#[automock]
pub trait InventoryServicesInterface: Send + Sync {
fn add_store(&self) -> add_store_service::AddStoreServiceObj;
fn add_category(&self) -> add_category_service::AddCategoryServiceObj;
}
#[derive(Clone, Builder)]
pub struct InventoryServices {
add_store: add_store_service::AddStoreServiceObj,
add_category: add_category_service::AddCategoryServiceObj,
}
impl InventoryServicesInterface for InventoryServices {
fn add_store(&self) -> add_store_service::AddStoreServiceObj {
self.add_store.clone()
}
fn add_category(&self) -> add_category_service::AddCategoryServiceObj {
self.add_category.clone()
}
}

View file

@ -0,0 +1,79 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use derive_getters::Getters;
use derive_more::{Display, Error};
use serde::{Deserialize, Serialize};
#[derive(Debug, Error, Display, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum AddStoreCommandError {
NameIsEmpty,
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd, Getters)]
pub struct AddStoreCommand {
name: String,
address: Option<String>,
owner: String,
}
impl AddStoreCommand {
pub fn new(
name: String,
address: Option<String>,
owner: String,
) -> Result<Self, AddStoreCommandError> {
let address: Option<String> = if let Some(address) = address {
let address = address.trim();
if address.is_empty() {
None
} else {
Some(address.to_owned())
}
} else {
None
};
let name = name.trim().to_owned();
if name.is_empty() {
return Err(AddStoreCommandError::NameIsEmpty);
}
Ok(Self {
name,
address,
owner,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_cmd() {
let name = "foo";
let address = "bar";
let username = "baz";
// address = None
let cmd = AddStoreCommand::new(name.into(), None, username.into()).unwrap();
assert_eq!(cmd.name(), name);
assert_eq!(cmd.address(), &None);
assert_eq!(cmd.owner(), username);
// address = Some
let cmd = AddStoreCommand::new(name.into(), Some(address.into()), username.into()).unwrap();
assert_eq!(cmd.name(), name);
assert_eq!(cmd.address(), &Some(address.to_owned()));
assert_eq!(cmd.owner(), username);
// AddStoreCommandError::NameIsEmpty
assert_eq!(
AddStoreCommand::new("".into(), Some(address.into()), username.into()),
Err(AddStoreCommandError::NameIsEmpty)
)
}
}

View file

@ -0,0 +1,18 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use derive_builder::Builder;
use derive_getters::Getters;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(
Clone, Debug, Builder, Serialize, Deserialize, Getters, Eq, PartialEq, Ord, PartialOrd,
)]
pub struct StoreAddedEvent {
name: String,
address: Option<String>,
owner: String,
store_id: Uuid,
}

View file

@ -0,0 +1,74 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use async_trait::async_trait;
use cqrs_es::Aggregate;
use derive_builder::Builder;
use derive_getters::Getters;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::inventory::application::services::errors::*;
use crate::inventory::application::services::InventoryServicesInterface;
use super::{commands::InventoryCommand, events::InventoryEvent};
#[derive(
Clone, Default, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd, Builder, Getters,
)]
pub struct Store {
name: String,
address: Option<String>,
owner: String,
store_id: Uuid,
}
#[async_trait]
impl Aggregate for Store {
type Command = InventoryCommand;
type Event = InventoryEvent;
type Error = InventoryError;
type Services = std::sync::Arc<dyn InventoryServicesInterface>;
// This identifier should be unique to the system.
fn aggregate_type() -> String {
"inventory.store".to_string()
}
// The aggregate logic goes here. Note that this will be the _bulk_ of a CQRS system
// so expect to use helper functions elsewhere to keep the code clean.
async fn handle(
&self,
command: Self::Command,
services: &Self::Services,
) -> Result<Vec<Self::Event>, Self::Error> {
match command {
InventoryCommand::AddStore(cmd) => {
let res = services.add_store().add_store(cmd).await?;
Ok(vec![InventoryEvent::StoreAdded(res)])
}
_ => Ok(Vec::default()),
// InventoryCommand::AddCategory(cmd) => {
// let res = services.add_category().add_store(cmd).await?;
// Ok(vec![InventoryEvent::CategoryAdded(res)])
//
// }
}
}
fn apply(&mut self, event: Self::Event) {
match event {
InventoryEvent::StoreAdded(e) => {
*self = StoreBuilder::default()
.name(e.name().into())
.address(e.address().as_ref().map(|s| s.to_string()))
.owner(e.owner().into())
.store_id(e.store_id().clone())
.build()
.unwrap();
}
_ => (),
}
}
}