diff --git a/Cargo.lock b/Cargo.lock index 9ab6877..b445178 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,30 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "actix" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f728064aca1c318585bf4bb04ffcfac9e75e508ab4e8b1bd9ba5dfe04e2cbed5" +dependencies = [ + "actix-rt", + "actix_derive", + "bitflags", + "bytes", + "crossbeam-channel", + "futures-core", + "futures-sink", + "futures-task", + "futures-util", + "log", + "once_cell", + "parking_lot 0.12.0", + "pin-project-lite", + "smallvec", + "tokio", + "tokio-util 0.7.1", +] + [[package]] name = "actix-codec" version = "0.5.0" @@ -231,6 +255,17 @@ dependencies = [ "syn", ] +[[package]] +name = "actix_derive" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d44b8fee1ced9671ba043476deddef739dd0959bf77030b26b738cc591737a7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "adler" version = "1.0.2" @@ -492,7 +527,7 @@ version = "0.2.0" source = "git+https://github.com/realaravinth/cache-buster#7ca4545722fb99be30698a5e72c7d982a70fa11f" dependencies = [ "data-encoding", - "derive_builder", + "derive_builder 0.11.2", "mime", "mime_guess", "serde", @@ -688,6 +723,16 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf2b3e8478797446514c91ef04bafcb59faba183e621ad488df88983cc14128c" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-queue" version = "0.3.5" @@ -796,7 +841,16 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d07adf7be193b71cc36b193d0f5fe60b918a3a9db4dad0449f57bcfd519704a3" dependencies = [ - "derive_builder_macro", + "derive_builder_macro 0.11.2", +] + +[[package]] +name = "derive_builder" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d67778784b508018359cbc8696edb3db78160bab2c2a28ba7f56ef6932997f8" +dependencies = [ + "derive_builder_macro 0.12.0", ] [[package]] @@ -811,13 +865,35 @@ dependencies = [ "syn", ] +[[package]] +name = "derive_builder_core" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c11bdc11a0c47bc7d37d582b5285da6849c96681023680b906673c5707af7b0f" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "derive_builder_macro" version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f0314b72bed045f3a68671b3c86328386762c93f82d98c65c3cb5e5f573dd68" dependencies = [ - "derive_builder_core", + "derive_builder_core 0.11.2", + "syn", +] + +[[package]] +name = "derive_builder_macro" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebcda35c7a396850a55ffeac740804b40ffec779b98fffbb1738f4033f0ee79e" +dependencies = [ + "derive_builder_core 0.12.0", "syn", ] @@ -2829,6 +2905,7 @@ dependencies = [ name = "starchart" version = "0.1.0" dependencies = [ + "actix", "actix-files", "actix-identity", "actix-rt", @@ -2841,6 +2918,7 @@ dependencies = [ "config", "db-core", "db-sqlx-sqlite", + "derive_builder 0.12.0", "derive_more", "federate-core", "forge-core", diff --git a/Cargo.toml b/Cargo.toml index 34dc55c..169f25a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,8 @@ rust-embed = "6.3.0" urlencoding = "2.1.0" clap = { version = "4.0.32", features = ["derive"] } api_routes = { path ="./api_routes/"} +actix = "0.13.0" +derive_builder = "0.12.0" [dependencies.cache-buster] git = "https://github.com/realaravinth/cache-buster" diff --git a/src/counter.rs b/src/counter.rs new file mode 100644 index 0000000..9952a53 --- /dev/null +++ b/src/counter.rs @@ -0,0 +1,209 @@ +/* + * counter - A proof of work based DoS protection system + * Copyright © 2021 Aravinth Manivannan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * al + */ +use std::time::Duration; + +use actix::clock::sleep; +use actix::dev::*; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Serialize, Deserialize, Debug)] +pub struct Count { + pub search_threshold: u32, + pub duration: u64, +} + +impl Count { + /// increments the search count by one + #[inline] + pub fn add_search(&mut self) { + self.search_threshold += 1; + } + + /// decrements the search count by specified count + #[inline] + pub fn decrement_search_by(&mut self, count: u32) { + if self.search_threshold > 0 { + if self.search_threshold >= count { + self.search_threshold -= count; + } else { + self.search_threshold = 0; + } + } + } + + /// get [Counter]'s current search_threshold + #[inline] + pub fn get_searches(&self) -> u32 { + self.search_threshold + } +} + +/// This struct represents the counter state and is used +/// to configure leaky-bucket lifetime and manage defense +#[derive(Clone, Serialize, Deserialize, Debug)] +pub struct Counter(Count); + +impl From for Counter { + fn from(c: Count) -> Counter { + Counter(c) + } +} +impl Actor for Counter { + type Context = Context; +} + +/// Message to decrement the search count +#[derive(Message)] +#[rtype(result = "()")] +struct DeleteSearch; + +impl Handler for Counter { + type Result = (); + fn handle(&mut self, _msg: DeleteSearch, _ctx: &mut Self::Context) -> Self::Result { + self.0.decrement_search_by(1); + } +} + +/// Message to increment the search count +/// returns difficulty factor and lifetime +#[derive(Message)] +#[rtype(result = "u32")] +pub struct AddSearch; + +impl Handler for Counter { + type Result = MessageResult; + + fn handle(&mut self, _: AddSearch, ctx: &mut Self::Context) -> Self::Result { + self.0.add_search(); + let addr = ctx.address(); + + let duration: Duration = Duration::new(self.0.duration, 0); + let wait_for = async move { + sleep(duration).await; + //delay_for(duration).await; + addr.send(DeleteSearch).await.unwrap(); + } + .into_actor(self); + ctx.spawn(wait_for); + + MessageResult(self.0.get_searches()) + } +} + +/// Message to get the search count +#[derive(Message)] +#[rtype(result = "u32")] +pub struct GetCurrentSearchCount; + +impl Handler for Counter { + type Result = MessageResult; + + fn handle(&mut self, _: GetCurrentSearchCount, _ctx: &mut Self::Context) -> Self::Result { + MessageResult(self.0.get_searches()) + } +} + +/// Message to stop [Counter] +#[derive(Message)] +#[rtype(result = "()")] +pub struct Stop; + +impl Handler for Counter { + type Result = (); + + fn handle(&mut self, _: Stop, ctx: &mut Self::Context) -> Self::Result { + ctx.stop() + } +} + +#[cfg(test)] +pub mod tests { + use super::*; + + // constants for testing + // (search count, level) + pub const LEVEL_1: (u32, u32) = (50, 50); + pub const LEVEL_2: (u32, u32) = (500, 500); + pub const DURATION: u64 = 5; + + type MyActor = Addr; + + async fn race(addr: Addr, count: (u32, u32)) { + for _ in 0..count.0 as usize - 1 { + let _ = addr.send(AddSearch).await.unwrap(); + } + } + + pub fn get_counter() -> Counter { + Counter(Count { + duration: DURATION, + search_threshold: 0, + }) + } + + #[test] + fn counter_decrement_by_works() { + let mut m = get_counter(); + for _ in 0..100 { + m.0.add_search(); + } + assert_eq!(m.0.get_searches(), 100); + m.0.decrement_search_by(50); + assert_eq!(m.0.get_searches(), 50); + m.0.decrement_search_by(500); + assert_eq!(m.0.get_searches(), 0); + } + + #[actix_rt::test] + async fn get_current_search_count_works() { + let addr: MyActor = get_counter().start(); + + addr.send(AddSearch).await.unwrap(); + addr.send(AddSearch).await.unwrap(); + addr.send(AddSearch).await.unwrap(); + addr.send(AddSearch).await.unwrap(); + let count = addr.send(GetCurrentSearchCount).await.unwrap(); + + assert_eq!(count, 4); + } + + #[actix_rt::test] + async fn counter_defense_loosenup_works() { + let addr: MyActor = get_counter().start(); + + race(addr.clone(), LEVEL_2).await; + addr.send(AddSearch).await.unwrap(); + assert_eq!(addr.send(GetCurrentSearchCount).await.unwrap(), LEVEL_2.1); + + let duration = Duration::new(DURATION + 1, 0); + sleep(duration).await; + //delay_for(duration).await; + + addr.send(AddSearch).await.unwrap(); + let count = addr.send(GetCurrentSearchCount).await.unwrap(); + assert_eq!(count, 1); + } + + #[actix_rt::test] + #[should_panic] + async fn stop_works() { + let addr: MyActor = get_counter().start(); + addr.send(Stop).await.unwrap(); + addr.send(AddSearch).await.unwrap(); + } +} diff --git a/src/errors.rs b/src/errors.rs index fd0ada7..e85e084 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -153,5 +153,13 @@ impl From for ServiceError { } } +#[cfg(not(tarpaulin_include))] +impl From for ServiceError { + fn from(e: actix::MailboxError) -> Self { + log::debug!("Actor mailbox error: {:?}", e); + Self::InternalServerError + } +} + #[cfg(not(tarpaulin_include))] pub type ServiceResult = std::result::Result; diff --git a/src/main.rs b/src/main.rs index d2e0003..4f58473 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,12 +23,14 @@ use lazy_static::lazy_static; use tokio::sync::oneshot; pub mod api; +pub mod counter; pub mod ctx; pub mod db; pub mod dns; pub mod errors; pub mod federate; pub mod introduce; +pub mod master; pub mod pages; pub mod routes; pub mod search; diff --git a/src/master.rs b/src/master.rs new file mode 100644 index 0000000..05081bd --- /dev/null +++ b/src/master.rs @@ -0,0 +1,294 @@ +/* + * counter - A proof of work based DoS protection system + * Copyright © 2021 Aravinth Manivannan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +//! Embedded [Master] actor module that manages [Counter] actors +use std::collections::BTreeMap; +use std::time::Duration; + +use actix::clock::sleep; +use actix::dev::*; +use derive_builder::Builder; +use log::info; +use tokio::sync::oneshot::channel; +use tokio::sync::oneshot::Receiver; + +use crate::counter::{Counter, GetCurrentSearchCount, Stop}; +use crate::errors::*; + +/// Message to add search to an [Counter] actor +#[derive(Message, Clone)] +#[rtype(result = "Receiver>")] +pub struct AddSearchMaster(pub String); + +/// Message to add an [Counter] actor to [Master] +#[derive(Message, Builder)] +#[rtype(result = "Receiver>")] +pub struct AddCounter { + pub id: String, + pub counter: Counter, +} + +/// Message to rename an Counter actor +#[derive(Message, Builder)] +#[rtype(result = "Receiver>")] +pub struct Rename { + pub name: String, + pub rename_to: String, +} + +/// Message to delete [Counter] actor +#[derive(Message)] +#[rtype(result = "Receiver>")] +pub struct RemoveCounter(pub String); + +/// This Actor manages the [Counter] actors. +/// A service can have several [Counter] actors with +/// varying [Defense][crate::defense::Defense] configurations +/// so a "master" actor is needed to manage them all +#[derive(Clone, Default)] +pub struct Master { + sites: BTreeMap, Addr)>, + gc: u64, +} + +impl Master { + /// add [Counter] actor to [Master] + pub fn add_site(&mut self, addr: Addr, id: String) { + self.sites.insert(id, (None, addr)); + } + + /// create new master + /// accepts a `u64` to configure garbage collection period + pub fn new(gc: u64) -> Self { + Master { + sites: BTreeMap::new(), + gc, + } + } + + /// get [Counter] actor from [Master] + pub fn get_site(&mut self, id: &str) -> Option> { + let mut r = None; + if let Some((read_val, addr)) = self.sites.get_mut(id) { + r = Some(addr.clone()); + *read_val = Some(()); + }; + r + } + + /// remvoes [Counter] actor from [Master] + pub fn rm_site(&mut self, id: &str) -> Option<(Option<()>, Addr)> { + self.sites.remove(id) + } + + /// renames [Counter] actor + pub fn rename(&mut self, msg: Rename) { + // If actor isn't present, it's okay to not throw an error + // since actors are lazyily initialized and are cleaned up when inactive + if let Some((_, counter)) = self.sites.remove(&msg.name) { + self.add_site(counter, msg.rename_to); + } + } +} + +impl Actor for Master { + type Context = Context; + + fn started(&mut self, ctx: &mut Self::Context) { + let addr = ctx.address(); + let task = async move { + addr.send(CleanUp).await.unwrap(); + } + .into_actor(self); + ctx.spawn(task); + } +} + +impl Handler for Master { + type Result = MessageResult; + + fn handle(&mut self, m: AddSearchMaster, ctx: &mut Self::Context) -> Self::Result { + let (tx, rx) = channel(); + match self.get_site(&m.0) { + None => { + let _ = tx.send(Ok(None)); + } + Some(addr) => { + let fut = async move { + match addr.send(crate::counter::AddSearch).await { + Ok(val) => { + let _ = tx.send(Ok(Some(val))); + } + Err(e) => { + let err: ServiceError = e.into(); + let _ = tx.send(Err(err)); + } + } + } + .into_actor(self); + ctx.spawn(fut); + } + }; + //MessageResult(rx) + MessageResult(()) + } +} + +impl Handler for Master { + type Result = MessageResult; + + fn handle(&mut self, m: Rename, _ctx: &mut Self::Context) -> Self::Result { + self.rename(m); + let (tx, rx) = channel(); + let _ = tx.send(Ok(())); + MessageResult(rx) + } +} + +/// Message to get an [Counter] actor from master +#[derive(Message)] +#[rtype(result = "Option>")] +pub struct GetSite(pub String); + +impl Handler for Master { + type Result = MessageResult; + + fn handle(&mut self, m: GetSite, _ctx: &mut Self::Context) -> Self::Result { + let addr = self.get_site(&m.0); + match addr { + None => MessageResult(None), + Some(addr) => MessageResult(Some(addr)), + } + } +} + +/// Message to clean up master of [Counter] actors with zero search count +#[derive(Message)] +#[rtype(result = "()")] +pub struct CleanUp; + +impl Handler for Master { + type Result = (); + + fn handle(&mut self, _: CleanUp, ctx: &mut Self::Context) -> Self::Result { + let sites = self.sites.clone(); + let gc = self.gc; + let master = ctx.address(); + info!("init master actor cleanup up"); + let task = async move { + for (id, (new, addr)) in sites.iter() { + let search_count = addr.send(GetCurrentSearchCount).await.unwrap(); + println!("{}", search_count); + if search_count == 0 && new.is_some() { + addr.send(Stop).await.unwrap(); + master.send(RemoveCounter(id.to_owned())).await.unwrap(); + println!("cleaned up"); + } + } + + let duration = Duration::new(gc, 0); + sleep(duration).await; + //delay_for(duration).await; + master.send(CleanUp).await.unwrap(); + } + .into_actor(self); + ctx.spawn(task); + } +} + +impl Handler for Master { + type Result = MessageResult; + + fn handle(&mut self, m: RemoveCounter, ctx: &mut Self::Context) -> Self::Result { + let (tx, rx) = channel(); + if let Some((_, addr)) = self.rm_site(&m.0) { + let fut = async move { + //addr.send(Stop).await?; + let res: ServiceResult<()> = addr.send(Stop).await.map_err(|e| e.into()); + let _ = tx.send(res); + } + .into_actor(self); + ctx.spawn(fut); + } else { + tx.send(Ok(())).unwrap(); + } + MessageResult(rx) + } +} + +impl Handler for Master { + type Result = MessageResult; + + fn handle(&mut self, m: AddCounter, _ctx: &mut Self::Context) -> Self::Result { + // let (tx, rx) = channel(); + let counter: Counter = m.counter.into(); + let addr = counter.start(); + self.add_site(addr, m.id); + // tx.send(Ok(())).unwrap(); + //MessageResult(rx) + MessageResult(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::counter::tests::*; + + #[actix_rt::test] + async fn master_actor_works() { + let addr = Master::new(1).start(); + + let get_add_site_msg = |id: String, counter: Counter| { + AddCounterBuilder::default() + .id(id) + .counter(counter) + .build() + .unwrap() + }; + + let id = "yo"; + let msg = get_add_site_msg(id.into(), get_counter()); + + addr.send(msg).await.unwrap(); + + let counter_addr = addr.send(GetSite(id.into())).await.unwrap(); + assert!(counter_addr.is_some()); + + let new_id = "yoyo"; + let rename = RenameBuilder::default() + .name(id.into()) + .rename_to(new_id.into()) + .build() + .unwrap(); + addr.send(rename).await.unwrap(); + let counter_addr = addr.send(GetSite(new_id.into())).await.unwrap(); + assert!(counter_addr.is_some()); + + let addr_doesnt_exist = addr.send(GetSite("a".into())).await.unwrap(); + assert!(addr_doesnt_exist.is_none()); + + let timer_expire = Duration::new(DURATION, 0); + sleep(timer_expire).await; + sleep(timer_expire).await; + + let counter_addr = addr.send(GetSite(new_id.into())).await.unwrap(); + assert_eq!(counter_addr, None); + + assert!(addr.send(RemoveCounter(new_id.into())).await.is_ok()); + } +}