From 1f4d3574ab8ee62f46bf3f81c48f5fd4e5178c31 Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Sat, 30 Dec 2023 18:00:09 +0530 Subject: [PATCH] feat: use DashMap to speedup hashcash implementation --- Cargo.toml | 4 +- src/mcaptcha/cache.rs | 260 +++++++++++++++++++++++++++++++++++++ src/mcaptcha/mcaptcha.rs | 268 ++++----------------------------------- src/mcaptcha/mod.rs | 1 + src/store/mod.rs | 88 +++++++------ 5 files changed, 331 insertions(+), 290 deletions(-) create mode 100644 src/mcaptcha/cache.rs diff --git a/Cargo.toml b/Cargo.toml index 03c8755..230b782 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,5 +48,5 @@ base64 = "0.13.0" anyhow = "1.0.63" maplit = "1.0.2" -[profile.release] -debug = true +#[profile.release] +#debug = true diff --git a/src/mcaptcha/cache.rs b/src/mcaptcha/cache.rs new file mode 100644 index 0000000..00dd063 --- /dev/null +++ b/src/mcaptcha/cache.rs @@ -0,0 +1,260 @@ +/* + * mCaptcha - A proof of work based DoS protection system + * Copyright © 2021 Aravinth Manivannan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +//! In-memory cache implementation that uses [HashMap] +use std::sync::Arc; +use std::time::Duration; + +use dashmap::DashMap; +use serde::{Deserialize, Serialize}; + +use libmcaptcha::cache::messages::*; +use libmcaptcha::errors::*; + +#[derive(Clone, Default, Serialize, Deserialize)] +/// cache datastructure implementing [Save] +pub struct HashCache { + difficulty_map: Arc>, + result_map: Arc>, +} + +impl HashCache { + // save [PoWConfig] to cache + fn save_pow_config(&self, config: CachePoW) -> CaptchaResult<()> { + let challenge = config.string; + let config: CachedPoWConfig = CachedPoWConfig { + key: config.key, + difficulty_factor: config.difficulty_factor, + duration: config.duration, + }; + + self.difficulty_map.insert(challenge, config); + Ok(()) + } + + pub async fn clean_all_after_cold_start(&self, updated: HashCache) { + updated.difficulty_map.iter().map(|x| { + self.difficulty_map + .insert(x.key().to_owned(), x.value().to_owned()) + }); + updated.result_map.iter().map(|x| { + self.result_map + .insert(x.key().to_owned(), x.value().to_owned()) + }); + let cache = self.clone(); + let fut = async move { + for values in cache.result_map.iter() { + let inner_cache = cache.clone(); + let duration = values.value().1; + let key = values.key().to_owned(); + let inner_fut = async move { + tokio::time::sleep(Duration::new(duration, 0)).await; + inner_cache.remove_cache_result(&key); + }; + tokio::spawn(inner_fut); + } + + for values in cache.difficulty_map.iter() { + let inner_cache = cache.clone(); + let duration = values.value().duration; + let key = values.key().to_owned(); + let inner_fut = async move { + tokio::time::sleep(Duration::new(duration, 0)).await; + inner_cache.remove_pow_config(&key); + }; + tokio::spawn(inner_fut); + } + }; + + tokio::spawn(fut); + } + + // retrive [PoWConfig] from cache. Deletes config post retrival + pub fn retrive_pow_config( + &self, + msg: VerifyCaptchaResult, + ) -> CaptchaResult> { + if let Some(difficulty_factor) = self.remove_pow_config(&msg.token) { + Ok(Some(difficulty_factor)) + } else { + Ok(None) + } + } + + // delete [PoWConfig] from cache + pub fn remove_pow_config(&self, string: &str) -> Option { + self.difficulty_map.remove(string).map(|x| x.1) + } + + // save captcha result + fn save_captcha_result(&self, res: CacheResult) { + self.result_map.insert(res.token, (res.key, res.duration)); + } + + // verify captcha result + pub fn verify_captcha_result(&self, challenge: VerifyCaptchaResult) -> CaptchaResult { + if let Some(captcha_id) = self.remove_cache_result(&challenge.token) { + if captcha_id == challenge.key { + Ok(true) + } else { + Ok(false) + } + } else { + Ok(false) + } + } + + // delete cache result + pub fn remove_cache_result(&self, string: &str) -> Option { + self.result_map.remove(string).map(|x| x.1 .0) + } + + pub fn cache_pow(&self, msg: CachePoW) { + use std::time::Duration; + use tokio::time::sleep; + + let duration: Duration = Duration::new(msg.duration, 0); + let string = msg.string.clone(); + let cache = self.clone(); + let wait_for = async move { + sleep(duration).await; + //delay_for(duration).await; + cache.remove_pow_config(&string); + }; + let _ = self.save_pow_config(msg); + tokio::spawn(wait_for); + } + + /// cache PoW result + pub fn cache_result(&self, msg: CacheResult) { + use std::time::Duration; + use tokio::time::sleep; + + let token = msg.token.clone(); + msg.token.clone(); + msg.token.clone(); + msg.token.clone(); + + let duration: Duration = Duration::new(msg.duration, 0); + let cache = self.clone(); + let wait_for = async move { + sleep(duration).await; + //delay_for(duration).await; + cache.remove_cache_result(&token); + }; + tokio::spawn(wait_for); + + let _ = self.save_captcha_result(msg); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use libmcaptcha::master::AddVisitorResult; + use libmcaptcha::pow::PoWConfig; + + use std::time::Duration; + + #[actix_rt::test] + async fn hashcache_pow_cache_works() { + const DIFFICULTY_FACTOR: u32 = 54; + const DURATION: u64 = 5; + const KEY: &str = "mcaptchakey"; + let cache = HashCache::default(); + let pow: PoWConfig = PoWConfig::new(DIFFICULTY_FACTOR, KEY.into()); //salt is dummy here + let visitor_result = AddVisitorResult { + difficulty_factor: DIFFICULTY_FACTOR, + duration: DURATION, + }; + let string = pow.string.clone(); + + let msg = CachePoWBuilder::default() + .string(pow.string.clone()) + .difficulty_factor(DIFFICULTY_FACTOR) + .duration(visitor_result.duration) + .key(KEY.into()) + .build() + .unwrap(); + + cache.cache_pow(msg); + + let msg = VerifyCaptchaResult { + token: string.clone(), + key: KEY.into(), + }; + let cache_difficulty_factor = cache.retrive_pow_config(msg.clone()).unwrap(); + + assert_eq!( + DIFFICULTY_FACTOR, + cache_difficulty_factor.unwrap().difficulty_factor + ); + + let duration: Duration = Duration::new(5, 0); + //sleep(DURATION + DURATION).await; + tokio::time::sleep(duration + duration).await; + + let expired_string = cache.retrive_pow_config(msg).unwrap(); + assert_eq!(None, expired_string); + } + + #[actix_rt::test] + async fn hashcache_result_cache_works() { + const DURATION: u64 = 5; + const KEY: &str = "a"; + const RES: &str = "b"; + let cache = HashCache::default(); + // send value to cache + // send another value to cache for auto delete + // verify_captcha_result + // delete + // wait for timeout and verify_captcha_result against second value + + let add_cache = CacheResult { + key: KEY.into(), + token: RES.into(), + duration: DURATION, + }; + + cache.cache_result(add_cache); + + let verify_msg = VerifyCaptchaResult { + key: KEY.into(), + token: RES.into(), + }; + + assert!(cache.verify_captcha_result(verify_msg.clone()).unwrap()); + + // duplicate + assert!(!cache.verify_captcha_result(verify_msg).unwrap()); + + let verify_msg = VerifyCaptchaResult { + key: "cz".into(), + token: RES.into(), + }; + assert!(!cache.verify_captcha_result(verify_msg).unwrap()); + + let duration: Duration = Duration::new(5, 0); + tokio::time::sleep(duration + duration).await; + + let verify_msg = VerifyCaptchaResult { + key: KEY.into(), + token: RES.into(), + }; + assert!(!cache.verify_captcha_result(verify_msg).unwrap()); + } +} diff --git a/src/mcaptcha/mcaptcha.rs b/src/mcaptcha/mcaptcha.rs index 0309d18..67e7263 100644 --- a/src/mcaptcha/mcaptcha.rs +++ b/src/mcaptcha/mcaptcha.rs @@ -21,7 +21,6 @@ use std::time::Duration; use dashmap::DashMap; use serde::{Deserialize, Serialize}; -use tokio::sync::mpsc; use super::defense::Defense; use libmcaptcha::errors::*; @@ -84,15 +83,6 @@ pub struct MCaptcha { duration: u64, } -//impl From for crate::master::CreateMCaptcha { -// fn from(m: MCaptcha) -> Self { -// Self { -// levels: m.defense.into(), -// duration: m.duration, -// } -// } -//} - impl MCaptcha { /// increments the visitor count by one #[inline] @@ -101,12 +91,6 @@ impl MCaptcha { let current_visitor_level = self.visitor_threshold.fetch_add(1, Ordering::SeqCst) + 1; let current_level = self.defense.current_level(current_visitor_level); current_level.difficulty_factor - - //if current_visitor_level > current_level.visitor_threshold { - // self.defense.tighten_up(); - //} else { - // self.defense.loosen_up(); - //} } /// decrements the visitor count by specified count @@ -139,35 +123,11 @@ impl MCaptcha { } }); } - - // /// get current difficulty factor - // #[inline] - // pub fn get_difficulty(&self) -> u32 { - // self.defense.get_difficulty() - // } - // - // /// get [Counter]'s lifetime - // #[inline] - // pub fn get_duration(&self) -> u64 { - // self.duration - // } - // - // /// get [Counter]'s current visitor_threshold - // #[inline] - // pub fn get_visitors(&self) -> u32 { - // self.visitor_threshold.load(Ordering::Relaxed) - // } - // - // /// get mCaptcha's defense configuration - // #[inline] - // pub fn get_defense(&self) -> Defense { - // self.defense.clone() - // } } #[derive(Clone, Serialize, Deserialize)] pub struct Manager { - pub captchas: DashMap>, + pub captchas: Arc>>, pub gc: u64, } @@ -181,7 +141,7 @@ impl Manager { /// accepts a `u64` to configure garbage collection period pub fn new(gc: u64) -> Self { Manager { - captchas: DashMap::new(), + captchas: Arc::new(DashMap::new()), gc, } } @@ -209,6 +169,28 @@ impl Manager { } } + pub async fn clean_all_after_cold_start(&self, updated: Manager) { + updated.captchas.iter().map(|x| { + self.captchas + .insert(x.key().to_owned(), x.value().to_owned()) + }); + let captchas = self.clone(); + let keys: Vec = captchas + .captchas + .clone() + .iter() + .map(|x| x.key().to_owned()) + .collect(); + let fut = async move { + tokio::time::sleep(Duration::new(captchas.gc, 0)).await; + for key in keys.iter() { + captchas.rm_captcha(key); + } + }; + + tokio::spawn(fut); + } + pub fn add_visitor( &self, msg: &MasterMessages::AddVisitor, @@ -233,7 +215,7 @@ impl Manager { } } - pub fn get_internal_data(&self) -> DashMap> { + pub fn get_internal_data(&self) -> Arc>> { self.captchas.clone() } @@ -262,203 +244,3 @@ impl From<&libmcaptcha::mcaptcha::MCaptcha> for MCaptcha { new_captcha } } - -//impl Actor for Master { -// type Context = Context; -// -// fn started(&mut self, ctx: &mut Self::Context) { -// let addr = ctx.address(); -// let task = async move { -// addr.send(CleanUp).await.unwrap(); -// } -// .into_actor(self); -// ctx.spawn(task); -// } -//} -// -//impl Handler for Master { -// type Result = MessageResult; -// -// fn handle(&mut self, m: AddVisitor, ctx: &mut Self::Context) -> Self::Result { -// let (tx, rx) = channel(); -// match self.get_site(&m.0) { -// None => { -// let _ = tx.send(Ok(None)); -// } -// Some(addr) => { -// let fut = async move { -// match addr.send(super::counter::AddVisitor).await { -// Ok(val) => { -// let _ = tx.send(Ok(Some(val))); -// } -// Err(e) => { -// let err: CaptchaError = e.into(); -// let _ = tx.send(Err(err)); -// } -// } -// } -// .into_actor(self); -// ctx.spawn(fut); -// } -// } -// MessageResult(rx) -// } -//} -// -//impl Handler for Master { -// type Result = MessageResult; -// -// fn handle(&mut self, m: Rename, _ctx: &mut Self::Context) -> Self::Result { -// self.rename(m); -// let (tx, rx) = channel(); -// let _ = tx.send(Ok(())); -// MessageResult(rx) -// } -//} -// -///// Message to get an [Counter] actor from master -//#[derive(Message)] -//#[rtype(result = "Option>")] -//pub struct GetSite(pub String); -// -//impl Handler for Master { -// type Result = MessageResult; -// -// fn handle(&mut self, m: GetSite, _ctx: &mut Self::Context) -> Self::Result { -// let addr = self.get_site(&m.0); -// match addr { -// None => MessageResult(None), -// Some(addr) => MessageResult(Some(addr)), -// } -// } -//} -// -///// Message to clean up master of [Counter] actors with zero visitor count -//#[derive(Message)] -//#[rtype(result = "()")] -//pub struct CleanUp; -// -//impl Handler for Master { -// type Result = (); -// -// fn handle(&mut self, _: CleanUp, ctx: &mut Self::Context) -> Self::Result { -// let sites = self.sites.clone(); -// let gc = self.gc; -// let master = ctx.address(); -// info!("init master actor cleanup up"); -// let task = async move { -// for (id, (new, addr)) in sites.iter() { -// let visitor_count = addr.send(GetCurrentVisitorCount).await.unwrap(); -// if visitor_count == 0 && new.is_some() { -// addr.send(Stop).await.unwrap(); -// master.send(RemoveCaptcha(id.to_owned())).await.unwrap(); -// } -// } -// -// let duration = Duration::new(gc, 0); -// sleep(duration).await; -// //delay_for(duration).await; -// master.send(CleanUp).await.unwrap(); -// } -// .into_actor(self); -// ctx.spawn(task); -// } -//} -// -//impl Handler for Master { -// type Result = MessageResult; -// -// fn handle(&mut self, m: RemoveCaptcha, ctx: &mut Self::Context) -> Self::Result { -// let (tx, rx) = channel(); -// if let Some((_, addr)) = self.rm_site(&m.0) { -// let fut = async move { -// //addr.send(Stop).await?; -// let res: CaptchaResult<()> = addr.send(Stop).await.map_err(|e| e.into()); -// let _ = tx.send(res); -// } -// .into_actor(self); -// ctx.spawn(fut); -// } else { -// tx.send(Ok(())).unwrap(); -// } -// MessageResult(rx) -// } -//} -// -//impl Handler for Master { -// type Result = MessageResult; -// -// fn handle(&mut self, m: AddSite, _ctx: &mut Self::Context) -> Self::Result { -// let (tx, rx) = channel(); -// let counter: Counter = m.mcaptcha.into(); -// let addr = counter.start(); -// self.add_site(addr, m.id); -// tx.send(Ok(())).unwrap(); -// MessageResult(rx) -// } -//} - -//impl Handler for Master { -// type Result = MessageResult; -// -// fn handle(&mut self, _m: GetInternalData, ctx: &mut Self::Context) -> Self::Result { -// let (tx, rx) = channel(); -// let mut data = HashMap::with_capacity(self.sites.len()); -// -// let sites = self.sites.clone(); -// let fut = async move { -// for (name, (_read_val, addr)) in sites.iter() { -// match addr.send(super::counter::GetInternalData).await { -// Ok(val) => { -// data.insert(name.to_owned(), val); -// } -// Err(_e) => { -// println!("Trying to get data {name}. Failed"); -// continue; -// // best-effort basis persistence -// // let err: CaptchaError = e.into(); -// // let _ = tx.send(Err(err)); -// // break; -// } -// } -// -// } -// tx.send(Ok(data)); -// } -// .into_actor(self); -// ctx.spawn(fut); -// -// MessageResult(rx) -// } -//} -// -//impl Handler for Master { -// type Result = MessageResult; -// -// fn handle(&mut self, mut m: SetInternalData, ctx: &mut Self::Context) -> Self::Result { -// let (tx, rx) = channel(); -// for (name, mcaptcha) in m.mcaptcha.drain() { -// let addr = self.get_site(&name); -// let master = ctx.address(); -// let fut = async move { -// match addr { -// None => { -// master.send(AddSite { id: name, mcaptcha }).await.unwrap(); -// } -// Some(addr) => { -// let _ = addr.send(super::counter::SetInternalData(mcaptcha)).await; -// // best effort basis -// //let err: CaptchaError = e.into(); -// //let _ = tx.send(Err(err)); -// } -// } -// } -// .into_actor(self); -// ctx.spawn(fut); -// } -// -// let _ = tx.send(Ok(())); -// MessageResult(rx) -// } -//} -// diff --git a/src/mcaptcha/mod.rs b/src/mcaptcha/mod.rs index a21f891..8ec1c74 100644 --- a/src/mcaptcha/mod.rs +++ b/src/mcaptcha/mod.rs @@ -1,2 +1,3 @@ +pub mod cache; mod defense; pub mod mcaptcha; diff --git a/src/store/mod.rs b/src/store/mod.rs index df4ee33..67d974d 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -94,8 +94,9 @@ pub struct DcacheStateMachine { pub last_membership: StoredMembership, /// Application data. - pub data: Arc>, - pub new_data: crate::mcaptcha::mcaptcha::Manager, + // pub data: Arc>, + pub counter: crate::mcaptcha::mcaptcha::Manager, + pub results: crate::mcaptcha::cache::HashCache, } #[derive(Serialize, Deserialize, Clone)] @@ -104,39 +105,34 @@ struct PersistableStateMachine { last_membership: StoredMembership, - /// Application data. - data: HashMap, + counter: crate::mcaptcha::mcaptcha::Manager, + results: crate::mcaptcha::cache::HashCache, } impl PersistableStateMachine { async fn from_statemachine(m: &DcacheStateMachine) -> Self { - let internal_data = m - .data - .master - .send(GetInternalData) - .await - .unwrap() - .await - .unwrap() - .unwrap(); + let counter = m.counter.clone(); + let results = m.results.clone(); Self { last_applied_log: m.last_applied_log, last_membership: m.last_membership.clone(), - data: internal_data, + counter, + results, } } async fn to_statemachine( self, - data: Arc>, + counter: crate::mcaptcha::mcaptcha::Manager, + results: crate::mcaptcha::cache::HashCache, ) -> DcacheStateMachine { - let new_data = crate::mcaptcha::mcaptcha::Manager::new(30); - new_data.set_internal_data(self.data.clone()); + self.counter.clean_all_after_cold_start(counter).await; + self.results.clean_all_after_cold_start(results).await; DcacheStateMachine { last_applied_log: self.last_applied_log, last_membership: self.last_membership, - data, - new_data, + results: self.results, + counter: self.counter, } } } @@ -163,8 +159,8 @@ impl DcacheStore { let state_machine = RwLock::new(DcacheStateMachine { last_applied_log: Default::default(), last_membership: Default::default(), - data: system::init_system(salt), - new_data: crate::mcaptcha::mcaptcha::Manager::new(30), + counter: crate::mcaptcha::mcaptcha::Manager::new(30), + results: crate::mcaptcha::cache::HashCache::default(), }); Self { @@ -390,56 +386,42 @@ impl RaftStorage for Arc { EntryPayload::Blank => res.push(DcacheResponse::Empty), EntryPayload::Normal(ref req) => match req { DcacheRequest::AddVisitor(msg) => { - let r = sm.new_data.add_visitor(msg); - + let r = sm.counter.add_visitor(msg); res.push(DcacheResponse::AddVisitorResult(r)); } DcacheRequest::AddCaptcha(msg) => { - sm.new_data + sm.counter .add_captcha(Arc::new((&msg.mcaptcha).into()), msg.id.clone()); res.push(DcacheResponse::Empty); } DcacheRequest::RenameCaptcha(msg) => { - sm.new_data.rename(&msg.name, msg.rename_to.clone()); + sm.counter.rename(&msg.name, msg.rename_to.clone()); res.push(DcacheResponse::Empty); } DcacheRequest::RemoveCaptcha(msg) => { - sm.new_data.rm_captcha(&msg.0); + sm.counter.rm_captcha(&msg.0); res.push(DcacheResponse::Empty); } // cache DcacheRequest::CachePoW(msg) => { - sm.data - .cache - .send(msg.clone()) - .await - .unwrap() - .await - .unwrap() - .unwrap(); + sm.results.cache_pow(msg.clone()); res.push(DcacheResponse::Empty); } DcacheRequest::DeletePoW(msg) => { - sm.data.cache.send(msg.clone()).await.unwrap().unwrap(); + sm.results.remove_pow_config(&msg.0); + // sm.data.cache.send(msg.clone()).await.unwrap().unwrap(); res.push(DcacheResponse::Empty); } DcacheRequest::CacheResult(msg) => { - sm.data - .cache - .send(msg.clone()) - .await - .unwrap() - .await - .unwrap() - .unwrap(); + sm.results.cache_result(msg.clone()); res.push(DcacheResponse::Empty); } DcacheRequest::DeleteCaptchaResult(msg) => { - sm.data.cache.send(msg.clone()).await.unwrap().unwrap(); + sm.results.remove_cache_result(&msg.token); res.push(DcacheResponse::Empty); } }, @@ -486,7 +468,7 @@ impl RaftStorage for Arc { })?; let mut state_machine = self.state_machine.write().await; let updated_state_machine = updated_state_machine - .to_statemachine(state_machine.data.clone()) + .to_statemachine(state_machine.counter.clone(), state_machine.results.clone()) .await; *state_machine = updated_state_machine; } @@ -521,3 +503,19 @@ impl RaftStorage for Arc { self.clone() } } + +#[cfg(test)] +mod tests { + use super::*; + + async fn provision_dcache_store() -> Arc { + Arc::new(DcacheStore::new( + "adsfasdfasdfadsfadfadfadfadsfasdfasdfasdfasdf".into(), + )) + } + + #[test] + pub fn test_dcche_store() { + openraft::testing::Suite::test_all(provision_dcache_store).unwrap() + } +}