diff --git a/Cargo.lock b/Cargo.lock index c244cd4..e0ed956 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -903,6 +903,7 @@ dependencies = [ "redis", "serde", "serde_json", + "tokio", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 7601447..6c6420b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,8 @@ pow_sha256 = { version = "0.2.1", git = "https://github.com/mcaptcha/pow_sha256" #redis = { version = "0.20.1", features = ["tokio-comp","aio", "cluster"], optional=true } redis = { version = "0.17.0", features = ["tokio-comp","aio", "cluster"], optional=true } +tokio = { version = "0.2.25", features = ["sync"]} + [dev-dependencies] actix-rt = "1" diff --git a/src/cache/hashcache.rs b/src/cache/hashcache.rs index 1d857ee..9055d5c 100644 --- a/src/cache/hashcache.rs +++ b/src/cache/hashcache.rs @@ -19,6 +19,7 @@ use std::collections::HashMap; use actix::prelude::*; +use tokio::sync::oneshot; use super::messages::*; use super::Save; @@ -46,8 +47,11 @@ impl HashCache { } // retrive [PoWConfig] from cache. Deletes config post retrival - fn retrive_pow_config(&mut self, string: String) -> CaptchaResult> { - if let Some(difficulty_factor) = self.remove_pow_config(&string) { + fn retrive_pow_config( + &mut self, + msg: VerifyCaptchaResult, + ) -> CaptchaResult> { + if let Some(difficulty_factor) = self.remove_pow_config(&msg.token) { Ok(Some(difficulty_factor)) } else { Ok(None) @@ -110,7 +114,9 @@ impl Handler for HashCache { .into_actor(self); ctx.spawn(wait_for); - MessageResult(self.save_pow_config(msg)) + let (tx, rx) = oneshot::channel(); + tx.send(self.save_pow_config(msg)).unwrap(); + MessageResult(rx) } } @@ -127,7 +133,9 @@ impl Handler for HashCache { impl Handler for HashCache { type Result = MessageResult; fn handle(&mut self, msg: RetrivePoW, _ctx: &mut Self::Context) -> Self::Result { - MessageResult(self.retrive_pow_config(msg.0)) + let (tx, rx) = oneshot::channel(); + tx.send(self.retrive_pow_config(msg.0)).unwrap(); + MessageResult(rx) } } @@ -153,7 +161,9 @@ impl Handler for HashCache { .into_actor(self); ctx.spawn(wait_for); - MessageResult(self.save_captcha_result(msg)) + let (tx, rx) = oneshot::channel(); + tx.send(self.save_captcha_result(msg)).unwrap(); + MessageResult(rx) } } @@ -171,7 +181,9 @@ impl Handler for HashCache { type Result = MessageResult; fn handle(&mut self, msg: VerifyCaptchaResult, _ctx: &mut Self::Context) -> Self::Result { // MessageResult(self.retrive(msg.0)) - MessageResult(self.verify_captcha_result(msg)) + let (tx, rx) = oneshot::channel(); + tx.send(self.verify_captcha_result(msg)).unwrap(); + MessageResult(rx) } } @@ -215,10 +227,16 @@ mod tests { .build() .unwrap(); - addr.send(msg).await.unwrap().unwrap(); + addr.send(msg).await.unwrap().await.unwrap().unwrap(); + let msg = VerifyCaptchaResult { + token: string.clone(), + key: KEY.into(), + }; let cache_difficulty_factor = addr - .send(RetrivePoW(string.clone())) + .send(RetrivePoW(msg.clone())) + .await + .unwrap() .await .unwrap() .unwrap(); @@ -231,7 +249,13 @@ mod tests { //sleep(DURATION + DURATION).await; delay_for(duration + duration).await; - let expired_string = addr.send(RetrivePoW(string)).await.unwrap().unwrap(); + let expired_string = addr + .send(RetrivePoW(msg)) + .await + .unwrap() + .await + .unwrap() + .unwrap(); assert_eq!(None, expired_string); } @@ -256,22 +280,28 @@ mod tests { duration: DURATION, }; - addr.send(add_cache).await.unwrap().unwrap(); + addr.send(add_cache).await.unwrap().await.unwrap().unwrap(); let verify_msg = VerifyCaptchaResult { key: KEY.into(), token: RES.into(), }; - assert!(addr.send(verify_msg.clone()).await.unwrap().unwrap()); + assert!(addr + .send(verify_msg.clone()) + .await + .unwrap() + .await + .unwrap() + .unwrap()); // duplicate - assert!(!addr.send(verify_msg).await.unwrap().unwrap()); + assert!(!addr.send(verify_msg).await.unwrap().await.unwrap().unwrap()); let verify_msg = VerifyCaptchaResult { key: "cz".into(), token: RES.into(), }; - assert!(!addr.send(verify_msg).await.unwrap().unwrap()); + assert!(!addr.send(verify_msg).await.unwrap().await.unwrap().unwrap()); let duration: Duration = Duration::new(5, 0); delay_for(duration + duration).await; @@ -280,6 +310,6 @@ mod tests { key: KEY.into(), token: RES.into(), }; - assert!(!addr.send(verify_msg).await.unwrap().unwrap()); + assert!(!addr.send(verify_msg).await.unwrap().await.unwrap().unwrap()); } } diff --git a/src/cache/mod.rs b/src/cache/mod.rs index 92612cf..64fdff3 100644 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -21,8 +21,8 @@ use serde::{Deserialize, Serialize}; #[cfg(feature = "full")] pub mod hashcache; -//#[cfg(feature = "full")] -//pub mod redis; +#[cfg(feature = "full")] +pub mod redis; #[derive(Serialize, Deserialize)] pub struct AddChallenge { @@ -50,12 +50,13 @@ pub mod messages { use actix::dev::*; use derive_builder::Builder; use serde::{Deserialize, Serialize}; + use tokio::sync::oneshot::Receiver; use crate::errors::*; /// Message to cache PoW difficulty factor and string #[derive(Message, Serialize, Deserialize, Builder, Clone)] - #[rtype(result = "CaptchaResult<()>")] + #[rtype(result = "Receiver>")] pub struct CachePoW { /// challenge string pub string: String, @@ -70,8 +71,8 @@ pub mod messages { /// Message to retrive the the difficulty factor for the specified /// string from the cache #[derive(Message)] - #[rtype(result = "CaptchaResult>")] - pub struct RetrivePoW(pub String); + #[rtype(result = "Receiver>>")] + pub struct RetrivePoW(pub VerifyCaptchaResult); #[derive(Clone, PartialEq, Debug, Default, Deserialize, Serialize)] pub struct CachedPoWConfig { @@ -90,7 +91,7 @@ pub mod messages { /// Message to cache captcha result and the captcha key for which /// it was generated #[derive(Message, Serialize, Deserialize, Builder)] - #[rtype(result = "CaptchaResult<()>")] + #[rtype(result = "Receiver>")] pub struct CacheResult { pub token: String, /// key is mCaptcha identifier @@ -114,7 +115,7 @@ pub mod messages { /// Message to verify captcha result against /// the stored captcha key #[derive(Message, Clone, Deserialize, Serialize)] - #[rtype(result = "CaptchaResult")] + #[rtype(result = "Receiver>")] pub struct VerifyCaptchaResult { pub token: String, pub key: String, diff --git a/src/cache/redis.rs b/src/cache/redis.rs index 5273550..7d40d0d 100644 --- a/src/cache/redis.rs +++ b/src/cache/redis.rs @@ -16,192 +16,140 @@ * along with this program. If not, see . */ //! Cache implementation that uses Redis -use crate::redis::mcaptcha_redis::MCaptchaRedis; -use crate::redis::RedisConfig; -use std::collections::HashMap; - use actix::prelude::*; +use tokio::sync::oneshot; use super::messages::*; use super::AddChallenge; use super::Save; use crate::errors::*; +use crate::redis::mcaptcha_redis::MCaptchaRedis; +use crate::redis::RedisConfig; pub struct RedisCache(MCaptchaRedis); -#[derive(Clone, Default)] -/// cache datastructure implementing [Save] -pub struct HashCache { - difficulty_map: HashMap, - result_map: HashMap, -} - impl RedisCache { - /// Get new [MCaptchaRedis]. Use this when executing commands that are - /// only supported by mCaptcha Redis module. Internally, when object - /// is created, checks are performed to check if the module is loaded and if - /// the required commands are available pub async fn new(redis: RedisConfig) -> CaptchaResult { - let m = MCaptchaRedis::new(redis).await?; - Ok(Self(m)) - } - - // save [PoWConfig] to cache - async fn save_pow_config(&mut self, config: CachePoW) -> CaptchaResult<()> { - let challenge = config.string; - let payload: AddChallenge = AddChallenge { - challenge: config.string, - difficulty: config.difficulty_factor as usize, - duration: config.duration, - }; - - let payload = serde_json::to_string(&payload).unwrap(); - - // {MCAPTCHA_NAME}:difficulty_map:challenge (difficulty_factor -> duration) EX duration - - // TODO use hashmap - - self.difficulty_map.insert(challenge, config); - Ok(()) - } - - // retrive [PoWConfig] from cache. Deletes config post retrival - fn retrive_pow_config(&mut self, string: String) -> CaptchaResult> { - if let Some(difficulty_factor) = self.remove_pow_config(&string) { - Ok(Some(difficulty_factor.to_owned())) - } else { - Ok(None) - } - } - - // delete [PoWConfig] from cache - fn remove_pow_config(&mut self, string: &str) -> Option { - self.difficulty_map.remove(string) - } - - // save captcha result - async fn save_captcha_result(&mut self, res: CacheResult) -> CaptchaResult<()> { - self.result_map.insert(res.token, res.key); - - // {MCAPTCHA_NAME}:result_map:token 0 EX duration - Ok(()) - } - - // verify captcha result - fn verify_captcha_result(&mut self, challenge: VerifyCaptchaResult) -> CaptchaResult { - if let Some(captcha_id) = self.remove_cache_result(&challenge.token) { - if captcha_id == challenge.key { - return Ok(true); - } else { - return Ok(false); - } - } else { - Ok(false) - } - } - - // delete cache result - fn remove_cache_result(&mut self, string: &str) -> Option { - self.result_map.remove(string) + let redis = MCaptchaRedis::new(redis).await?; + let master = Self(redis); + Ok(master) } } -impl Save for HashCache {} +impl Save for RedisCache {} -impl Actor for HashCache { +impl Actor for RedisCache { type Context = Context; } /// cache a PoWConfig -impl Handler for HashCache { +impl Handler for RedisCache { type Result = MessageResult; fn handle(&mut self, msg: CachePoW, ctx: &mut Self::Context) -> Self::Result { - //use actix::clock::sleep; - use actix::clock::delay_for; - use std::time::Duration; + let (tx, rx) = oneshot::channel(); - let addr = ctx.address(); - let del_msg = DeletePoW(msg.string.clone()); + let con = self.0.get_client(); + let fut = async move { + let payload: AddChallenge = AddChallenge { + challenge: msg.string, + difficulty: msg.difficulty_factor, + duration: msg.duration, + }; - let duration: Duration = Duration::new(msg.duration.clone(), 0); - let wait_for = async move { - //sleep(duration).await; - delay_for(duration).await; - addr.send(del_msg).await.unwrap().unwrap(); + let res = con.add_challenge(&msg.key, &payload).await; + tx.send(res).unwrap(); } .into_actor(self); - ctx.spawn(wait_for); + ctx.wait(fut); - MessageResult(self.save_pow_config(msg)) - } -} - -/// Delte a PoWConfig -impl Handler for HashCache { - type Result = MessageResult; - fn handle(&mut self, msg: DeletePoW, _ctx: &mut Self::Context) -> Self::Result { - self.remove_pow_config(&msg.0); - MessageResult(Ok(())) + MessageResult(rx) } } /// Retrive PoW difficulty_factor for a PoW string -impl Handler for HashCache { +impl Handler for RedisCache { type Result = MessageResult; - fn handle(&mut self, msg: RetrivePoW, _ctx: &mut Self::Context) -> Self::Result { - MessageResult(self.retrive_pow_config(msg.0)) + fn handle(&mut self, msg: RetrivePoW, ctx: &mut Self::Context) -> Self::Result { + let (tx, rx) = oneshot::channel(); + let con = self.0.get_client(); + + let fut = async move { + let r = match con.get_challenge(&msg.0).await { + Err(e) => Err(e), + Ok(val) => { + let res = CachedPoWConfig { + duration: val.duration, + difficulty_factor: val.difficulty_factor, + key: msg.0.key, + }; + Ok(Some(res)) + } + }; + + tx.send(r).unwrap(); + } + .into_actor(self); + ctx.wait(fut); + MessageResult(rx) } } /// cache PoW result -impl Handler for HashCache { +impl Handler for RedisCache { type Result = MessageResult; fn handle(&mut self, msg: CacheResult, ctx: &mut Self::Context) -> Self::Result { - //use actix::clock::sleep; - use actix::clock::delay_for; - use std::time::Duration; + let (tx, rx) = oneshot::channel(); - let addr = ctx.address(); - let del_msg = DeleteCaptchaResult { - token: msg.token.clone(), - }; - - let duration: Duration = Duration::new(msg.duration.clone(), 0); - let wait_for = async move { - //sleep(duration).await; - delay_for(duration).await; - addr.send(del_msg).await.unwrap().unwrap(); + let con = self.0.get_client(); + let fut = async move { + let r = con.add_token(&msg).await; + tx.send(r).unwrap(); } .into_actor(self); - ctx.spawn(wait_for); - - MessageResult(self.save_captcha_result(msg)) - } -} - -/// Delte a PoWConfig -impl Handler for HashCache { - type Result = MessageResult; - fn handle(&mut self, msg: DeleteCaptchaResult, _ctx: &mut Self::Context) -> Self::Result { - self.remove_cache_result(&msg.token); - MessageResult(Ok(())) + ctx.wait(fut); + MessageResult(rx) } } /// Retrive PoW difficulty_factor for a PoW string -impl Handler for HashCache { +impl Handler for RedisCache { type Result = MessageResult; - fn handle(&mut self, msg: VerifyCaptchaResult, _ctx: &mut Self::Context) -> Self::Result { - // MessageResult(self.retrive(msg.0)) - MessageResult(self.verify_captcha_result(msg)) + fn handle(&mut self, msg: VerifyCaptchaResult, ctx: &mut Self::Context) -> Self::Result { + let (tx, rx) = oneshot::channel(); + + let con = self.0.get_client(); + let fut = async move { + let r = con.get_token(&msg).await; + tx.send(r).unwrap(); + } + .into_actor(self); + ctx.wait(fut); + + MessageResult(rx) } } +/// Delte a PoWConfig +impl Handler for RedisCache { + type Result = MessageResult; + fn handle(&mut self, _msg: DeleteCaptchaResult, _ctx: &mut Self::Context) -> Self::Result { + MessageResult(Ok(())) + } +} + +/// Delte a PoWConfig +impl Handler for RedisCache { + type Result = MessageResult; + fn handle(&mut self, _msg: DeletePoW, _ctx: &mut Self::Context) -> Self::Result { + //self.remove_pow_config(&msg.0); + MessageResult(Ok(())) + } +} #[cfg(test)] mod tests { use super::*; - use crate::master::AddVisitorResult; - use crate::pow::PoWConfig; + //use crate::master::AddVisitorResult; + //use crate::pow::PoWConfig; // async fn sleep(time: u64) { // //use actix::clock::sleep; @@ -213,34 +161,47 @@ mod tests { // delay_for(duration).await; // } + const REDIS_URL: &str = "redis://127.0.1.1/"; + #[actix_rt::test] - async fn hashcache_pow_cache_works() { + async fn rediscache_pow_cache_works() { use actix::clock::delay_for; use actix::clock::Duration; const DIFFICULTY_FACTOR: u32 = 54; const DURATION: u64 = 5; const KEY: &str = "mcaptchakey"; - let addr = HashCache::default().start(); - let pow: PoWConfig = PoWConfig::new(DIFFICULTY_FACTOR, KEY.into()); //salt is dummy here - let visitor_result = AddVisitorResult { - difficulty_factor: DIFFICULTY_FACTOR, - duration: DURATION, - }; - let string = pow.string.clone(); + const CHALLENGE: &str = "redischallenge1"; + + let addr = RedisCache::new(RedisConfig::Single(REDIS_URL.into())) + .await + .unwrap() + .start(); let msg = CachePoWBuilder::default() - .string(pow.string.clone()) + .string(CHALLENGE.into()) .difficulty_factor(DIFFICULTY_FACTOR) - .duration(visitor_result.duration) + .duration(DURATION) .key(KEY.into()) .build() .unwrap(); - addr.send(msg).await.unwrap().unwrap(); + addr.send(msg.clone()) + .await + .unwrap() + .await + .unwrap() + .unwrap(); + + let msg = VerifyCaptchaResult { + token: CHALLENGE.into(), + key: KEY.into(), + }; let cache_difficulty_factor = addr - .send(RetrivePoW(string.clone())) + .send(RetrivePoW(msg.clone())) + .await + .unwrap() .await .unwrap() .unwrap(); @@ -253,19 +214,23 @@ mod tests { //sleep(DURATION + DURATION).await; delay_for(duration + duration).await; - let expired_string = addr.send(RetrivePoW(string)).await.unwrap().unwrap(); - assert_eq!(None, expired_string); + let expired_string = addr.send(RetrivePoW(msg)).await.unwrap().await.unwrap(); + assert!(expired_string.is_err()); } #[actix_rt::test] - async fn hashcache_result_cache_works() { + async fn redishashcache_result_cache_works() { use actix::clock::delay_for; use actix::clock::Duration; const DURATION: u64 = 5; const KEY: &str = "a"; const RES: &str = "b"; - let addr = HashCache::default().start(); + let addr = RedisCache::new(RedisConfig::Single(REDIS_URL.into())) + .await + .unwrap() + .start(); + // send value to cache // send another value to cache for auto delete // verify_captcha_result @@ -278,22 +243,28 @@ mod tests { duration: DURATION, }; - addr.send(add_cache).await.unwrap().unwrap(); + addr.send(add_cache).await.unwrap().await.unwrap().unwrap(); let verify_msg = VerifyCaptchaResult { key: KEY.into(), token: RES.into(), }; - assert!(addr.send(verify_msg.clone()).await.unwrap().unwrap()); + assert!(addr + .send(verify_msg.clone()) + .await + .unwrap() + .await + .unwrap() + .unwrap()); // duplicate - assert!(!addr.send(verify_msg).await.unwrap().unwrap()); + assert!(!addr.send(verify_msg).await.unwrap().await.unwrap().unwrap()); let verify_msg = VerifyCaptchaResult { key: "cz".into(), token: RES.into(), }; - assert!(!addr.send(verify_msg).await.unwrap().unwrap()); + assert!(!addr.send(verify_msg).await.unwrap().await.unwrap().unwrap()); let duration: Duration = Duration::new(5, 0); delay_for(duration + duration).await; @@ -302,6 +273,6 @@ mod tests { key: KEY.into(), token: RES.into(), }; - assert!(!addr.send(verify_msg).await.unwrap().unwrap()); + assert!(!addr.send(verify_msg).await.unwrap().await.unwrap().unwrap()); } } diff --git a/src/redis/mcaptcha_redis.rs b/src/redis/mcaptcha_redis.rs index 142fd67..5794c38 100644 --- a/src/redis/mcaptcha_redis.rs +++ b/src/redis/mcaptcha_redis.rs @@ -180,8 +180,7 @@ impl MCaptchaRedisConnection { /// Delete PoW Challenge object from Redis pub async fn delete_challenge(&self, msg: &VerifyCaptchaResult) -> CaptchaResult<()> { - let _: () = self - .0 + self.0 .exec(redis::cmd(DELETE_CHALLENGE).arg(&[&msg.key, &msg.token])) .await?; Ok(()) diff --git a/src/system.rs b/src/system.rs index 720e21c..383f640 100644 --- a/src/system.rs +++ b/src/system.rs @@ -66,7 +66,13 @@ where .build() .unwrap(); - self.cache.send(cache_msg).await.unwrap().unwrap(); + self.cache + .send(cache_msg) + .await + .unwrap() + .await + .unwrap() + .unwrap(); Some(pow_config) } _ => None, @@ -76,9 +82,13 @@ where /// utility function to verify [Work] pub async fn verify_pow(&self, work: Work) -> CaptchaResult { let string = work.string.clone(); - let msg = RetrivePoW(string.clone()); + let msg = VerifyCaptchaResult { + token: string.clone(), + key: work.key.clone(), + }; + let msg = RetrivePoW(msg); - let cached_config = self.cache.send(msg).await.unwrap()?; + let cached_config = self.cache.send(msg).await.unwrap().await.unwrap()?; if cached_config.is_none() { return Err(CaptchaError::StringNotFound); @@ -105,7 +115,7 @@ where let msg: CacheResult = cached_config.into(); let res = msg.token.clone(); - self.cache.send(msg).await.unwrap()?; + self.cache.send(msg).await.unwrap().await.unwrap()?; Ok(res) } @@ -114,7 +124,7 @@ where &self, msg: VerifyCaptchaResult, ) -> CaptchaResult { - self.cache.send(msg).await.unwrap() + self.cache.send(msg).await.unwrap().await.unwrap() } }