std::mpsc replaced with oneshot and custom builder for system
This commit is contained in:
parent
2ff85dfdbc
commit
1c6e9dd01d
8 changed files with 64 additions and 20 deletions
|
@ -34,8 +34,7 @@ async fn main() -> std::io::Result<()> {
|
||||||
.master(master)
|
.master(master)
|
||||||
.cache(cache)
|
.cache(cache)
|
||||||
.pow(pow.clone())
|
.pow(pow.clone())
|
||||||
.build()
|
.build();
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
// configure defense. This is a per site configuration. A site can have several levels
|
// configure defense. This is a per site configuration. A site can have several levels
|
||||||
// of defenses configured
|
// of defenses configured
|
||||||
|
|
|
@ -82,8 +82,7 @@
|
||||||
//! .master(master)
|
//! .master(master)
|
||||||
//! .cache(cache)
|
//! .cache(cache)
|
||||||
//! .pow(pow.clone())
|
//! .pow(pow.clone())
|
||||||
//! .build()
|
//! .build();
|
||||||
//! .unwrap();
|
|
||||||
//!
|
//!
|
||||||
//! // configure defense. This is a per site configuration. A site can have several levels
|
//! // configure defense. This is a per site configuration. A site can have several levels
|
||||||
//! // of defenses configured
|
//! // of defenses configured
|
||||||
|
@ -188,7 +187,7 @@ pub mod errors;
|
||||||
pub mod master;
|
pub mod master;
|
||||||
|
|
||||||
#[cfg(feature = "full")]
|
#[cfg(feature = "full")]
|
||||||
mod redis;
|
pub mod redis;
|
||||||
|
|
||||||
/// message datatypes to interact with [MCaptcha] actor
|
/// message datatypes to interact with [MCaptcha] actor
|
||||||
pub mod cache;
|
pub mod cache;
|
||||||
|
|
|
@ -17,13 +17,13 @@
|
||||||
*/
|
*/
|
||||||
//! Embedded [Master] actor module that manages [Counter] actors
|
//! Embedded [Master] actor module that manages [Counter] actors
|
||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
use std::sync::mpsc::channel;
|
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
//use actix::clock::sleep;
|
//use actix::clock::sleep;
|
||||||
use actix::clock::delay_for;
|
use actix::clock::delay_for;
|
||||||
use actix::dev::*;
|
use actix::dev::*;
|
||||||
use log::info;
|
use log::info;
|
||||||
|
use tokio::sync::oneshot::channel;
|
||||||
|
|
||||||
use super::counter::Counter;
|
use super::counter::Counter;
|
||||||
use crate::errors::*;
|
use crate::errors::*;
|
||||||
|
@ -181,12 +181,15 @@ impl Handler<RemoveSite> for Master {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Handler<AddSite> for Master {
|
impl Handler<AddSite> for Master {
|
||||||
type Result = ();
|
type Result = MessageResult<AddSite>;
|
||||||
|
|
||||||
fn handle(&mut self, m: AddSite, _ctx: &mut Self::Context) -> Self::Result {
|
fn handle(&mut self, m: AddSite, _ctx: &mut Self::Context) -> Self::Result {
|
||||||
|
let (tx, rx) = channel();
|
||||||
let counter: Counter = m.mcaptcha.into();
|
let counter: Counter = m.mcaptcha.into();
|
||||||
let addr = counter.start();
|
let addr = counter.start();
|
||||||
self.add_site(addr, m.id);
|
self.add_site(addr, m.id);
|
||||||
|
tx.send(Ok(())).unwrap();
|
||||||
|
MessageResult(rx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -62,10 +62,11 @@ impl AddVisitorResult {
|
||||||
#[cfg(feature = "full")]
|
#[cfg(feature = "full")]
|
||||||
pub mod messages {
|
pub mod messages {
|
||||||
//! Messages that a [super::Master] should respond to
|
//! Messages that a [super::Master] should respond to
|
||||||
use std::sync::mpsc::Receiver;
|
// use std::sync::mpsc::Receiver;
|
||||||
|
|
||||||
use actix::dev::*;
|
use actix::dev::*;
|
||||||
use derive_builder::Builder;
|
use derive_builder::Builder;
|
||||||
|
use tokio::sync::oneshot::Receiver;
|
||||||
|
|
||||||
use crate::errors::CaptchaResult;
|
use crate::errors::CaptchaResult;
|
||||||
use crate::mcaptcha::MCaptcha;
|
use crate::mcaptcha::MCaptcha;
|
||||||
|
@ -77,7 +78,7 @@ pub mod messages {
|
||||||
|
|
||||||
/// Message to add an [Counter] actor to [Master]
|
/// Message to add an [Counter] actor to [Master]
|
||||||
#[derive(Message, Builder)]
|
#[derive(Message, Builder)]
|
||||||
#[rtype(result = "()")]
|
#[rtype(result = "Receiver<CaptchaResult<()>>")]
|
||||||
pub struct AddSite {
|
pub struct AddSite {
|
||||||
pub id: String,
|
pub id: String,
|
||||||
pub mcaptcha: MCaptcha,
|
pub mcaptcha: MCaptcha,
|
||||||
|
|
|
@ -15,9 +15,8 @@
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
use std::sync::mpsc;
|
|
||||||
|
|
||||||
use actix::dev::*;
|
use actix::dev::*;
|
||||||
|
use tokio::sync::oneshot;
|
||||||
|
|
||||||
use crate::errors::*;
|
use crate::errors::*;
|
||||||
use crate::master::messages::{AddSite, AddVisitor};
|
use crate::master::messages::{AddSite, AddVisitor};
|
||||||
|
@ -25,6 +24,7 @@ use crate::master::Master as MasterTrait;
|
||||||
use crate::redis::mcaptcha_redis::MCaptchaRedis;
|
use crate::redis::mcaptcha_redis::MCaptchaRedis;
|
||||||
use crate::redis::RedisConfig;
|
use crate::redis::RedisConfig;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct Master {
|
pub struct Master {
|
||||||
pub redis: MCaptchaRedis,
|
pub redis: MCaptchaRedis,
|
||||||
}
|
}
|
||||||
|
@ -47,7 +47,7 @@ impl Handler<AddVisitor> for Master {
|
||||||
type Result = MessageResult<AddVisitor>;
|
type Result = MessageResult<AddVisitor>;
|
||||||
|
|
||||||
fn handle(&mut self, m: AddVisitor, ctx: &mut Self::Context) -> Self::Result {
|
fn handle(&mut self, m: AddVisitor, ctx: &mut Self::Context) -> Self::Result {
|
||||||
let (tx, rx) = mpsc::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
|
|
||||||
let con = self.redis.get_client();
|
let con = self.redis.get_client();
|
||||||
let fut = async move {
|
let fut = async move {
|
||||||
|
@ -61,17 +61,18 @@ impl Handler<AddVisitor> for Master {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Handler<AddSite> for Master {
|
impl Handler<AddSite> for Master {
|
||||||
type Result = ();
|
type Result = MessageResult<AddSite>;
|
||||||
|
|
||||||
fn handle(&mut self, m: AddSite, ctx: &mut Self::Context) -> Self::Result {
|
fn handle(&mut self, m: AddSite, ctx: &mut Self::Context) -> Self::Result {
|
||||||
//let (tx, rx) = mpsc::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
let con = self.redis.get_client();
|
let con = self.redis.get_client();
|
||||||
let fut = async move {
|
let fut = async move {
|
||||||
let _res = con.add_mcaptcha(m).await;
|
let res = con.add_mcaptcha(m).await;
|
||||||
//tx.send(res).unwrap();
|
let _ = tx.send(res);
|
||||||
}
|
}
|
||||||
.into_actor(self);
|
.into_actor(self);
|
||||||
ctx.wait(fut);
|
ctx.wait(fut);
|
||||||
|
MessageResult(rx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -29,6 +29,7 @@ use crate::redis::RedisConfig;
|
||||||
use crate::redis::RedisConnection;
|
use crate::redis::RedisConnection;
|
||||||
|
|
||||||
/// Redis instance with mCaptcha Redis module loaded
|
/// Redis instance with mCaptcha Redis module loaded
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct MCaptchaRedis(Redis);
|
pub struct MCaptchaRedis(Redis);
|
||||||
|
|
||||||
/// Connection to Redis instance with mCaptcha Redis module loaded
|
/// Connection to Redis instance with mCaptcha Redis module loaded
|
||||||
|
|
|
@ -53,6 +53,7 @@ impl RedisConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Redis connection - manages both single and clustered deployments
|
/// Redis connection - manages both single and clustered deployments
|
||||||
|
#[derive(Clone)]
|
||||||
pub enum RedisConnection {
|
pub enum RedisConnection {
|
||||||
Single(Rc<RefCell<Connection>>),
|
Single(Rc<RefCell<Connection>>),
|
||||||
Cluster(Rc<RefCell<ClusterConnection>>),
|
Cluster(Rc<RefCell<ClusterConnection>>),
|
||||||
|
@ -86,6 +87,7 @@ pub enum RedisClient {
|
||||||
|
|
||||||
/// A Redis Client Object that encapsulates [RedisClient] and [RedisConnection].
|
/// A Redis Client Object that encapsulates [RedisClient] and [RedisConnection].
|
||||||
/// Use this when you need a Redis Client
|
/// Use this when you need a Redis Client
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct Redis {
|
pub struct Redis {
|
||||||
_client: RedisClient,
|
_client: RedisClient,
|
||||||
connection: RedisConnection,
|
connection: RedisConnection,
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
*/
|
*/
|
||||||
//! module describing mCaptcha system
|
//! module describing mCaptcha system
|
||||||
use actix::dev::*;
|
use actix::dev::*;
|
||||||
use derive_builder::Builder;
|
|
||||||
use pow_sha256::Config;
|
use pow_sha256::Config;
|
||||||
|
|
||||||
use crate::cache::messages::*;
|
use crate::cache::messages::*;
|
||||||
|
@ -27,8 +26,48 @@ use crate::master::messages::*;
|
||||||
use crate::master::Master;
|
use crate::master::Master;
|
||||||
use crate::pow::*;
|
use crate::pow::*;
|
||||||
|
|
||||||
|
pub struct SystemBuilder<T: Save, X: Master> {
|
||||||
|
pub master: Option<Addr<X>>,
|
||||||
|
cache: Option<Addr<T>>,
|
||||||
|
pow: Option<Config>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Master, S: Save> Default for SystemBuilder<S, T> {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
pow: None,
|
||||||
|
cache: None,
|
||||||
|
master: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Master, S: Save> SystemBuilder<S, T> {
|
||||||
|
pub fn master(mut self, m: Addr<T>) -> Self {
|
||||||
|
self.master = Some(m);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn cache(mut self, c: Addr<S>) -> Self {
|
||||||
|
self.cache = Some(c);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn pow(mut self, p: Config) -> Self {
|
||||||
|
self.pow = Some(p);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn build(self) -> System<S, T> {
|
||||||
|
System {
|
||||||
|
master: self.master.unwrap(),
|
||||||
|
pow: self.pow.unwrap(),
|
||||||
|
cache: self.cache.unwrap(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// struct describing various bits of data required for an mCaptcha system
|
/// struct describing various bits of data required for an mCaptcha system
|
||||||
#[derive(Clone, Builder)]
|
|
||||||
pub struct System<T: Save, X: Master> {
|
pub struct System<T: Save, X: Master> {
|
||||||
pub master: Addr<X>,
|
pub master: Addr<X>,
|
||||||
cache: Addr<T>,
|
cache: Addr<T>,
|
||||||
|
@ -52,7 +91,7 @@ where
|
||||||
.send(AddVisitor(id.clone()))
|
.send(AddVisitor(id.clone()))
|
||||||
.await
|
.await
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.recv()
|
.await
|
||||||
.unwrap()
|
.unwrap()
|
||||||
{
|
{
|
||||||
Ok(Some(mcaptcha)) => {
|
Ok(Some(mcaptcha)) => {
|
||||||
|
@ -160,7 +199,6 @@ mod tests {
|
||||||
.cache(cache)
|
.cache(cache)
|
||||||
.pow(pow)
|
.pow(pow)
|
||||||
.build()
|
.build()
|
||||||
.unwrap()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_config() -> Config {
|
fn get_config() -> Config {
|
||||||
|
|
Loading…
Reference in a new issue