feat: leaky-bucket algo implementing federated search counter

This commit is contained in:
Aravinth Manivannan 2023-03-04 16:41:48 +05:30
parent 0576a54af7
commit 5e18cad34c
Signed by: realaravinth
GPG key ID: AD9F0F08E855ED88
6 changed files with 596 additions and 3 deletions

84
Cargo.lock generated
View file

@ -2,6 +2,30 @@
# It is not intended for manual editing. # It is not intended for manual editing.
version = 3 version = 3
[[package]]
name = "actix"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f728064aca1c318585bf4bb04ffcfac9e75e508ab4e8b1bd9ba5dfe04e2cbed5"
dependencies = [
"actix-rt",
"actix_derive",
"bitflags",
"bytes",
"crossbeam-channel",
"futures-core",
"futures-sink",
"futures-task",
"futures-util",
"log",
"once_cell",
"parking_lot 0.12.0",
"pin-project-lite",
"smallvec",
"tokio",
"tokio-util 0.7.1",
]
[[package]] [[package]]
name = "actix-codec" name = "actix-codec"
version = "0.5.0" version = "0.5.0"
@ -231,6 +255,17 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "actix_derive"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d44b8fee1ced9671ba043476deddef739dd0959bf77030b26b738cc591737a7"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "adler" name = "adler"
version = "1.0.2" version = "1.0.2"
@ -492,7 +527,7 @@ version = "0.2.0"
source = "git+https://github.com/realaravinth/cache-buster#7ca4545722fb99be30698a5e72c7d982a70fa11f" source = "git+https://github.com/realaravinth/cache-buster#7ca4545722fb99be30698a5e72c7d982a70fa11f"
dependencies = [ dependencies = [
"data-encoding", "data-encoding",
"derive_builder", "derive_builder 0.11.2",
"mime", "mime",
"mime_guess", "mime_guess",
"serde", "serde",
@ -688,6 +723,16 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "crossbeam-channel"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf2b3e8478797446514c91ef04bafcb59faba183e621ad488df88983cc14128c"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]] [[package]]
name = "crossbeam-queue" name = "crossbeam-queue"
version = "0.3.5" version = "0.3.5"
@ -796,7 +841,16 @@ version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d07adf7be193b71cc36b193d0f5fe60b918a3a9db4dad0449f57bcfd519704a3" checksum = "d07adf7be193b71cc36b193d0f5fe60b918a3a9db4dad0449f57bcfd519704a3"
dependencies = [ dependencies = [
"derive_builder_macro", "derive_builder_macro 0.11.2",
]
[[package]]
name = "derive_builder"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d67778784b508018359cbc8696edb3db78160bab2c2a28ba7f56ef6932997f8"
dependencies = [
"derive_builder_macro 0.12.0",
] ]
[[package]] [[package]]
@ -811,13 +865,35 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "derive_builder_core"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c11bdc11a0c47bc7d37d582b5285da6849c96681023680b906673c5707af7b0f"
dependencies = [
"darling",
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "derive_builder_macro" name = "derive_builder_macro"
version = "0.11.2" version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f0314b72bed045f3a68671b3c86328386762c93f82d98c65c3cb5e5f573dd68" checksum = "8f0314b72bed045f3a68671b3c86328386762c93f82d98c65c3cb5e5f573dd68"
dependencies = [ dependencies = [
"derive_builder_core", "derive_builder_core 0.11.2",
"syn",
]
[[package]]
name = "derive_builder_macro"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebcda35c7a396850a55ffeac740804b40ffec779b98fffbb1738f4033f0ee79e"
dependencies = [
"derive_builder_core 0.12.0",
"syn", "syn",
] ]
@ -2829,6 +2905,7 @@ dependencies = [
name = "starchart" name = "starchart"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"actix",
"actix-files", "actix-files",
"actix-identity", "actix-identity",
"actix-rt", "actix-rt",
@ -2841,6 +2918,7 @@ dependencies = [
"config", "config",
"db-core", "db-core",
"db-sqlx-sqlite", "db-sqlx-sqlite",
"derive_builder 0.12.0",
"derive_more", "derive_more",
"federate-core", "federate-core",
"forge-core", "forge-core",

View file

@ -43,6 +43,8 @@ rust-embed = "6.3.0"
urlencoding = "2.1.0" urlencoding = "2.1.0"
clap = { version = "4.0.32", features = ["derive"] } clap = { version = "4.0.32", features = ["derive"] }
api_routes = { path ="./api_routes/"} api_routes = { path ="./api_routes/"}
actix = "0.13.0"
derive_builder = "0.12.0"
[dependencies.cache-buster] [dependencies.cache-buster]
git = "https://github.com/realaravinth/cache-buster" git = "https://github.com/realaravinth/cache-buster"

209
src/counter.rs Normal file
View file

@ -0,0 +1,209 @@
/*
* counter - A proof of work based DoS protection system
* Copyright © 2021 Aravinth Manivannan <realravinth@batsense.net>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* al
*/
use std::time::Duration;
use actix::clock::sleep;
use actix::dev::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct Count {
pub search_threshold: u32,
pub duration: u64,
}
impl Count {
/// increments the search count by one
#[inline]
pub fn add_search(&mut self) {
self.search_threshold += 1;
}
/// decrements the search count by specified count
#[inline]
pub fn decrement_search_by(&mut self, count: u32) {
if self.search_threshold > 0 {
if self.search_threshold >= count {
self.search_threshold -= count;
} else {
self.search_threshold = 0;
}
}
}
/// get [Counter]'s current search_threshold
#[inline]
pub fn get_searches(&self) -> u32 {
self.search_threshold
}
}
/// This struct represents the counter state and is used
/// to configure leaky-bucket lifetime and manage defense
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct Counter(Count);
impl From<Count> for Counter {
fn from(c: Count) -> Counter {
Counter(c)
}
}
impl Actor for Counter {
type Context = Context<Self>;
}
/// Message to decrement the search count
#[derive(Message)]
#[rtype(result = "()")]
struct DeleteSearch;
impl Handler<DeleteSearch> for Counter {
type Result = ();
fn handle(&mut self, _msg: DeleteSearch, _ctx: &mut Self::Context) -> Self::Result {
self.0.decrement_search_by(1);
}
}
/// Message to increment the search count
/// returns difficulty factor and lifetime
#[derive(Message)]
#[rtype(result = "u32")]
pub struct AddSearch;
impl Handler<AddSearch> for Counter {
type Result = MessageResult<AddSearch>;
fn handle(&mut self, _: AddSearch, ctx: &mut Self::Context) -> Self::Result {
self.0.add_search();
let addr = ctx.address();
let duration: Duration = Duration::new(self.0.duration, 0);
let wait_for = async move {
sleep(duration).await;
//delay_for(duration).await;
addr.send(DeleteSearch).await.unwrap();
}
.into_actor(self);
ctx.spawn(wait_for);
MessageResult(self.0.get_searches())
}
}
/// Message to get the search count
#[derive(Message)]
#[rtype(result = "u32")]
pub struct GetCurrentSearchCount;
impl Handler<GetCurrentSearchCount> for Counter {
type Result = MessageResult<GetCurrentSearchCount>;
fn handle(&mut self, _: GetCurrentSearchCount, _ctx: &mut Self::Context) -> Self::Result {
MessageResult(self.0.get_searches())
}
}
/// Message to stop [Counter]
#[derive(Message)]
#[rtype(result = "()")]
pub struct Stop;
impl Handler<Stop> for Counter {
type Result = ();
fn handle(&mut self, _: Stop, ctx: &mut Self::Context) -> Self::Result {
ctx.stop()
}
}
#[cfg(test)]
pub mod tests {
use super::*;
// constants for testing
// (search count, level)
pub const LEVEL_1: (u32, u32) = (50, 50);
pub const LEVEL_2: (u32, u32) = (500, 500);
pub const DURATION: u64 = 5;
type MyActor = Addr<Counter>;
async fn race(addr: Addr<Counter>, count: (u32, u32)) {
for _ in 0..count.0 as usize - 1 {
let _ = addr.send(AddSearch).await.unwrap();
}
}
pub fn get_counter() -> Counter {
Counter(Count {
duration: DURATION,
search_threshold: 0,
})
}
#[test]
fn counter_decrement_by_works() {
let mut m = get_counter();
for _ in 0..100 {
m.0.add_search();
}
assert_eq!(m.0.get_searches(), 100);
m.0.decrement_search_by(50);
assert_eq!(m.0.get_searches(), 50);
m.0.decrement_search_by(500);
assert_eq!(m.0.get_searches(), 0);
}
#[actix_rt::test]
async fn get_current_search_count_works() {
let addr: MyActor = get_counter().start();
addr.send(AddSearch).await.unwrap();
addr.send(AddSearch).await.unwrap();
addr.send(AddSearch).await.unwrap();
addr.send(AddSearch).await.unwrap();
let count = addr.send(GetCurrentSearchCount).await.unwrap();
assert_eq!(count, 4);
}
#[actix_rt::test]
async fn counter_defense_loosenup_works() {
let addr: MyActor = get_counter().start();
race(addr.clone(), LEVEL_2).await;
addr.send(AddSearch).await.unwrap();
assert_eq!(addr.send(GetCurrentSearchCount).await.unwrap(), LEVEL_2.1);
let duration = Duration::new(DURATION + 1, 0);
sleep(duration).await;
//delay_for(duration).await;
addr.send(AddSearch).await.unwrap();
let count = addr.send(GetCurrentSearchCount).await.unwrap();
assert_eq!(count, 1);
}
#[actix_rt::test]
#[should_panic]
async fn stop_works() {
let addr: MyActor = get_counter().start();
addr.send(Stop).await.unwrap();
addr.send(AddSearch).await.unwrap();
}
}

View file

@ -153,5 +153,13 @@ impl From<ParseError> for ServiceError {
} }
} }
#[cfg(not(tarpaulin_include))]
impl From<actix::MailboxError> for ServiceError {
fn from(e: actix::MailboxError) -> Self {
log::debug!("Actor mailbox error: {:?}", e);
Self::InternalServerError
}
}
#[cfg(not(tarpaulin_include))] #[cfg(not(tarpaulin_include))]
pub type ServiceResult<V> = std::result::Result<V, ServiceError>; pub type ServiceResult<V> = std::result::Result<V, ServiceError>;

View file

@ -23,12 +23,14 @@ use lazy_static::lazy_static;
use tokio::sync::oneshot; use tokio::sync::oneshot;
pub mod api; pub mod api;
pub mod counter;
pub mod ctx; pub mod ctx;
pub mod db; pub mod db;
pub mod dns; pub mod dns;
pub mod errors; pub mod errors;
pub mod federate; pub mod federate;
pub mod introduce; pub mod introduce;
pub mod master;
pub mod pages; pub mod pages;
pub mod routes; pub mod routes;
pub mod search; pub mod search;

294
src/master.rs Normal file
View file

@ -0,0 +1,294 @@
/*
* counter - A proof of work based DoS protection system
* Copyright © 2021 Aravinth Manivannan <realravinth@batsense.net>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
//! Embedded [Master] actor module that manages [Counter] actors
use std::collections::BTreeMap;
use std::time::Duration;
use actix::clock::sleep;
use actix::dev::*;
use derive_builder::Builder;
use log::info;
use tokio::sync::oneshot::channel;
use tokio::sync::oneshot::Receiver;
use crate::counter::{Counter, GetCurrentSearchCount, Stop};
use crate::errors::*;
/// Message to add search to an [Counter] actor
#[derive(Message, Clone)]
#[rtype(result = "Receiver<ServiceResult<Option<u32>>")]
pub struct AddSearchMaster(pub String);
/// Message to add an [Counter] actor to [Master]
#[derive(Message, Builder)]
#[rtype(result = "Receiver<ServiceResult<Option<u32>>")]
pub struct AddCounter {
pub id: String,
pub counter: Counter,
}
/// Message to rename an Counter actor
#[derive(Message, Builder)]
#[rtype(result = "Receiver<ServiceResult<()>>")]
pub struct Rename {
pub name: String,
pub rename_to: String,
}
/// Message to delete [Counter] actor
#[derive(Message)]
#[rtype(result = "Receiver<ServiceResult<()>>")]
pub struct RemoveCounter(pub String);
/// This Actor manages the [Counter] actors.
/// A service can have several [Counter] actors with
/// varying [Defense][crate::defense::Defense] configurations
/// so a "master" actor is needed to manage them all
#[derive(Clone, Default)]
pub struct Master {
sites: BTreeMap<String, (Option<()>, Addr<Counter>)>,
gc: u64,
}
impl Master {
/// add [Counter] actor to [Master]
pub fn add_site(&mut self, addr: Addr<Counter>, id: String) {
self.sites.insert(id, (None, addr));
}
/// create new master
/// accepts a `u64` to configure garbage collection period
pub fn new(gc: u64) -> Self {
Master {
sites: BTreeMap::new(),
gc,
}
}
/// get [Counter] actor from [Master]
pub fn get_site(&mut self, id: &str) -> Option<Addr<Counter>> {
let mut r = None;
if let Some((read_val, addr)) = self.sites.get_mut(id) {
r = Some(addr.clone());
*read_val = Some(());
};
r
}
/// remvoes [Counter] actor from [Master]
pub fn rm_site(&mut self, id: &str) -> Option<(Option<()>, Addr<Counter>)> {
self.sites.remove(id)
}
/// renames [Counter] actor
pub fn rename(&mut self, msg: Rename) {
// If actor isn't present, it's okay to not throw an error
// since actors are lazyily initialized and are cleaned up when inactive
if let Some((_, counter)) = self.sites.remove(&msg.name) {
self.add_site(counter, msg.rename_to);
}
}
}
impl Actor for Master {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
let addr = ctx.address();
let task = async move {
addr.send(CleanUp).await.unwrap();
}
.into_actor(self);
ctx.spawn(task);
}
}
impl Handler<AddSearchMaster> for Master {
type Result = MessageResult<AddSearchMaster>;
fn handle(&mut self, m: AddSearchMaster, ctx: &mut Self::Context) -> Self::Result {
let (tx, rx) = channel();
match self.get_site(&m.0) {
None => {
let _ = tx.send(Ok(None));
}
Some(addr) => {
let fut = async move {
match addr.send(crate::counter::AddSearch).await {
Ok(val) => {
let _ = tx.send(Ok(Some(val)));
}
Err(e) => {
let err: ServiceError = e.into();
let _ = tx.send(Err(err));
}
}
}
.into_actor(self);
ctx.spawn(fut);
}
};
//MessageResult(rx)
MessageResult(())
}
}
impl Handler<Rename> for Master {
type Result = MessageResult<Rename>;
fn handle(&mut self, m: Rename, _ctx: &mut Self::Context) -> Self::Result {
self.rename(m);
let (tx, rx) = channel();
let _ = tx.send(Ok(()));
MessageResult(rx)
}
}
/// Message to get an [Counter] actor from master
#[derive(Message)]
#[rtype(result = "Option<Addr<Counter>>")]
pub struct GetSite(pub String);
impl Handler<GetSite> for Master {
type Result = MessageResult<GetSite>;
fn handle(&mut self, m: GetSite, _ctx: &mut Self::Context) -> Self::Result {
let addr = self.get_site(&m.0);
match addr {
None => MessageResult(None),
Some(addr) => MessageResult(Some(addr)),
}
}
}
/// Message to clean up master of [Counter] actors with zero search count
#[derive(Message)]
#[rtype(result = "()")]
pub struct CleanUp;
impl Handler<CleanUp> for Master {
type Result = ();
fn handle(&mut self, _: CleanUp, ctx: &mut Self::Context) -> Self::Result {
let sites = self.sites.clone();
let gc = self.gc;
let master = ctx.address();
info!("init master actor cleanup up");
let task = async move {
for (id, (new, addr)) in sites.iter() {
let search_count = addr.send(GetCurrentSearchCount).await.unwrap();
println!("{}", search_count);
if search_count == 0 && new.is_some() {
addr.send(Stop).await.unwrap();
master.send(RemoveCounter(id.to_owned())).await.unwrap();
println!("cleaned up");
}
}
let duration = Duration::new(gc, 0);
sleep(duration).await;
//delay_for(duration).await;
master.send(CleanUp).await.unwrap();
}
.into_actor(self);
ctx.spawn(task);
}
}
impl Handler<RemoveCounter> for Master {
type Result = MessageResult<RemoveCounter>;
fn handle(&mut self, m: RemoveCounter, ctx: &mut Self::Context) -> Self::Result {
let (tx, rx) = channel();
if let Some((_, addr)) = self.rm_site(&m.0) {
let fut = async move {
//addr.send(Stop).await?;
let res: ServiceResult<()> = addr.send(Stop).await.map_err(|e| e.into());
let _ = tx.send(res);
}
.into_actor(self);
ctx.spawn(fut);
} else {
tx.send(Ok(())).unwrap();
}
MessageResult(rx)
}
}
impl Handler<AddCounter> for Master {
type Result = MessageResult<AddCounter>;
fn handle(&mut self, m: AddCounter, _ctx: &mut Self::Context) -> Self::Result {
// let (tx, rx) = channel();
let counter: Counter = m.counter.into();
let addr = counter.start();
self.add_site(addr, m.id);
// tx.send(Ok(())).unwrap();
//MessageResult(rx)
MessageResult(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::counter::tests::*;
#[actix_rt::test]
async fn master_actor_works() {
let addr = Master::new(1).start();
let get_add_site_msg = |id: String, counter: Counter| {
AddCounterBuilder::default()
.id(id)
.counter(counter)
.build()
.unwrap()
};
let id = "yo";
let msg = get_add_site_msg(id.into(), get_counter());
addr.send(msg).await.unwrap();
let counter_addr = addr.send(GetSite(id.into())).await.unwrap();
assert!(counter_addr.is_some());
let new_id = "yoyo";
let rename = RenameBuilder::default()
.name(id.into())
.rename_to(new_id.into())
.build()
.unwrap();
addr.send(rename).await.unwrap();
let counter_addr = addr.send(GetSite(new_id.into())).await.unwrap();
assert!(counter_addr.is_some());
let addr_doesnt_exist = addr.send(GetSite("a".into())).await.unwrap();
assert!(addr_doesnt_exist.is_none());
let timer_expire = Duration::new(DURATION, 0);
sleep(timer_expire).await;
sleep(timer_expire).await;
let counter_addr = addr.send(GetSite(new_id.into())).await.unwrap();
assert_eq!(counter_addr, None);
assert!(addr.send(RemoveCounter(new_id.into())).await.is_ok());
}
}