use std::collections::BTreeMap; use std::collections::HashMap; use std::fmt::Debug; use std::io::Cursor; use std::ops::RangeBounds; use std::sync::Arc; use std::sync::Mutex; use libmcaptcha::AddVisitorResult; use libmcaptcha::MCaptcha; use openraft::async_trait::async_trait; use openraft::storage::LogState; use openraft::storage::Snapshot; use openraft::AnyError; use openraft::BasicNode; use openraft::Entry; use openraft::EntryPayload; use openraft::ErrorSubject; use openraft::ErrorVerb; use openraft::LogId; use openraft::RaftLogReader; use openraft::RaftSnapshotBuilder; use openraft::RaftStorage; use openraft::SnapshotMeta; use openraft::StorageError; use openraft::StorageIOError; use openraft::StoredMembership; use openraft::Vote; use serde::Deserialize; use serde::Serialize; use sqlx::Statement; use tokio::sync::RwLock; use url::quirks::set_pathname; use crate::ExampleNodeId; use crate::ExampleTypeConfig; use actix::prelude::*; use libmcaptcha::master::messages::{ AddSite as AddCaptcha, AddVisitor, GetInternalData, RemoveCaptcha, Rename as RenameCaptcha, SetInternalData, }; use libmcaptcha::{master::embedded::master::Master as EmbeddedMaster, system::System, HashCache}; pub mod system; /** * Here you will set the types of request that will interact with the raft nodes. * For example the `Set` will be used to write data (key and value) to the raft database. * The `AddNode` will append a new node to the current existing shared list of nodes. * You will want to add any request that can write data in all nodes here. */ #[derive(Serialize, Deserialize, Debug, Clone)] pub enum ExampleRequest { //Set { key: String, value: String }, AddVisitor(AddVisitor), AddCaptcha(AddCaptcha), RenameCaptcha(RenameCaptcha), RemoveCaptcha(RemoveCaptcha), } /** * Here you will defined what type of answer you expect from reading the data of a node. * In this example it will return a optional value from a given key in * the `ExampleRequest.Set`. * * TODO: Should we explain how to create multiple `AppDataResponse`? * */ #[derive(Serialize, Deserialize, Debug, Clone)] pub enum ExampleResponse { AddVisitorResult(Option), Empty, // AddCaptchaResult, All returns () // RenameCaptchaResult, // RemoveCaptchaResult, } #[derive(Debug)] pub struct ExampleSnapshot { pub meta: SnapshotMeta, /// The data of the state machine at the time of this snapshot. pub data: Vec, } /** * Here defines a state machine of the raft, this state represents a copy of the data * between each node. Note that we are using `serde` to serialize the `data`, which has * a implementation to be serialized. Note that for this test we set both the key and * value as String, but you could set any type of value that has the serialization impl. */ pub struct ExampleStateMachine { pub last_applied_log: Option>, pub last_membership: StoredMembership, /// Application data. pub data: Arc>, } #[derive(Serialize, Deserialize, Clone)] struct PersistableStateMachine { last_applied_log: Option>, last_membership: StoredMembership, /// Application data. data: HashMap, } impl PersistableStateMachine { async fn from_statemachine(m: &ExampleStateMachine) -> Self { let internal_data = m .data .master .send(GetInternalData) .await .unwrap() .await .unwrap() .unwrap(); Self { last_applied_log: m.last_applied_log.clone(), last_membership: m.last_membership.clone(), data: internal_data, } } async fn to_statemachine( self, data: Arc>, ) -> ExampleStateMachine { data.master .send(SetInternalData { mcaptcha: self.data, }) .await .unwrap(); ExampleStateMachine { last_applied_log: self.last_applied_log, last_membership: self.last_membership, data, } } } pub struct ExampleStore { last_purged_log_id: RwLock>>, /// The Raft log. log: RwLock>>, /// The Raft state machine. pub state_machine: RwLock, /// The current granted vote. vote: RwLock>>, snapshot_idx: Arc>, current_snapshot: RwLock>, } impl ExampleStore { pub fn new(salt: String) -> Self { let state_machine = RwLock::new(ExampleStateMachine { last_applied_log: Default::default(), last_membership: Default::default(), data: system::init_system(salt), }); Self { last_purged_log_id: Default::default(), log: Default::default(), state_machine, vote: Default::default(), snapshot_idx: Default::default(), current_snapshot: Default::default(), } } } #[async_trait] impl RaftLogReader for Arc { async fn get_log_state( &mut self, ) -> Result, StorageError> { let log = self.log.read().await; let last = log.iter().rev().next().map(|(_, ent)| ent.log_id); let last_purged = *self.last_purged_log_id.read().await; let last = match last { None => last_purged, Some(x) => Some(x), }; Ok(LogState { last_purged_log_id: last_purged, last_log_id: last, }) } async fn try_get_log_entries + Clone + Debug + Send + Sync>( &mut self, range: RB, ) -> Result>, StorageError> { let log = self.log.read().await; let response = log .range(range.clone()) .map(|(_, val)| val.clone()) .collect::>(); Ok(response) } } #[async_trait] impl RaftSnapshotBuilder>> for Arc { #[tracing::instrument(level = "trace", skip(self))] async fn build_snapshot( &mut self, ) -> Result>>, StorageError> { let data; let last_applied_log; let last_membership; { // Serialize the data of the state machine. let state_machine = self.state_machine.read().await; let persistable_state_machine = PersistableStateMachine::from_statemachine(&state_machine).await; data = serde_json::to_vec(&persistable_state_machine).map_err(|e| { StorageIOError::new( ErrorSubject::StateMachine, ErrorVerb::Read, AnyError::new(&e), ) })?; last_applied_log = state_machine.last_applied_log; last_membership = state_machine.last_membership.clone(); } let snapshot_idx = { let mut l = self.snapshot_idx.lock().unwrap(); *l += 1; *l }; let snapshot_id = if let Some(last) = last_applied_log { format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx) } else { format!("--{}", snapshot_idx) }; let meta = SnapshotMeta { last_log_id: last_applied_log, last_membership, snapshot_id, }; let snapshot = ExampleSnapshot { meta: meta.clone(), data: data.clone(), }; { let mut current_snapshot = self.current_snapshot.write().await; *current_snapshot = Some(snapshot); } Ok(Snapshot { meta, snapshot: Box::new(Cursor::new(data)), }) } } #[async_trait] impl RaftStorage for Arc { type SnapshotData = Cursor>; type LogReader = Self; type SnapshotBuilder = Self; #[tracing::instrument(level = "trace", skip(self))] async fn save_vote( &mut self, vote: &Vote, ) -> Result<(), StorageError> { let mut v = self.vote.write().await; *v = Some(*vote); Ok(()) } async fn read_vote( &mut self, ) -> Result>, StorageError> { Ok(*self.vote.read().await) } #[tracing::instrument(level = "trace", skip(self, entries))] async fn append_to_log( &mut self, entries: &[&Entry], ) -> Result<(), StorageError> { let mut log = self.log.write().await; for entry in entries { log.insert(entry.log_id.index, (*entry).clone()); } Ok(()) } #[tracing::instrument(level = "debug", skip(self))] async fn delete_conflict_logs_since( &mut self, log_id: LogId, ) -> Result<(), StorageError> { tracing::debug!("delete_log: [{:?}, +oo)", log_id); let mut log = self.log.write().await; let keys = log .range(log_id.index..) .map(|(k, _v)| *k) .collect::>(); for key in keys { log.remove(&key); } Ok(()) } #[tracing::instrument(level = "debug", skip(self))] async fn purge_logs_upto( &mut self, log_id: LogId, ) -> Result<(), StorageError> { tracing::debug!("delete_log: [{:?}, +oo)", log_id); { let mut ld = self.last_purged_log_id.write().await; assert!(*ld <= Some(log_id)); *ld = Some(log_id); } { let mut log = self.log.write().await; let keys = log .range(..=log_id.index) .map(|(k, _v)| *k) .collect::>(); for key in keys { log.remove(&key); } } Ok(()) } async fn last_applied_state( &mut self, ) -> Result< ( Option>, StoredMembership, ), StorageError, > { let state_machine = self.state_machine.read().await; Ok(( state_machine.last_applied_log, state_machine.last_membership.clone(), )) } #[tracing::instrument(level = "trace", skip(self, entries))] async fn apply_to_state_machine( &mut self, entries: &[&Entry], ) -> Result, StorageError> { let mut res = Vec::with_capacity(entries.len()); let mut sm = self.state_machine.write().await; for entry in entries { tracing::debug!(%entry.log_id, "replicate to sm"); sm.last_applied_log = Some(entry.log_id); match entry.payload { EntryPayload::Blank => res.push(ExampleResponse::Empty), EntryPayload::Normal(ref req) => match req { ExampleRequest::AddVisitor(msg) => { let sm = self.state_machine.read().await; res.push(ExampleResponse::AddVisitorResult( sm.data .master .send(msg.clone()) .await .unwrap() .await .unwrap() .unwrap(), )); } ExampleRequest::AddCaptcha(msg) => { let sm = self.state_machine.read().await; sm.data .master .send(msg.clone()) .await .unwrap() .await .unwrap() .unwrap(); res.push(ExampleResponse::Empty); } ExampleRequest::RenameCaptcha(msg) => { let sm = self.state_machine.read().await; sm.data .master .send(msg.clone()) .await .unwrap() .await .unwrap() .unwrap(); res.push(ExampleResponse::Empty); } ExampleRequest::RemoveCaptcha(msg) => { let sm = self.state_machine.read().await; sm.data .master .send(msg.clone()) .await .unwrap() .await .unwrap() .unwrap(); res.push(ExampleResponse::Empty); } }, EntryPayload::Membership(ref mem) => { sm.last_membership = StoredMembership::new(Some(entry.log_id), mem.clone()); res.push(ExampleResponse::Empty) } }; } Ok(res) } #[tracing::instrument(level = "trace", skip(self))] async fn begin_receiving_snapshot( &mut self, ) -> Result, StorageError> { Ok(Box::new(Cursor::new(Vec::new()))) } #[tracing::instrument(level = "trace", skip(self, snapshot))] async fn install_snapshot( &mut self, meta: &SnapshotMeta, snapshot: Box, ) -> Result<(), StorageError> { tracing::info!( { snapshot_size = snapshot.get_ref().len() }, "decoding snapshot for installation" ); let new_snapshot = ExampleSnapshot { meta: meta.clone(), data: snapshot.into_inner(), }; // Update the state machine. { let updated_persistable_state_machine: PersistableStateMachine = serde_json::from_slice(&new_snapshot.data).map_err(|e| { StorageIOError::new( ErrorSubject::Snapshot(new_snapshot.meta.signature()), ErrorVerb::Read, AnyError::new(&e), ) })?; let mut state_machine = self.state_machine.write().await; let updated_state_machine = updated_persistable_state_machine .to_statemachine(state_machine.data.clone()) .await; *state_machine = updated_state_machine; } // Update current snapshot. let mut current_snapshot = self.current_snapshot.write().await; *current_snapshot = Some(new_snapshot); Ok(()) } #[tracing::instrument(level = "trace", skip(self))] async fn get_current_snapshot( &mut self, ) -> Result< Option>, StorageError, > { match &*self.current_snapshot.read().await { Some(snapshot) => { let data = snapshot.data.clone(); Ok(Some(Snapshot { meta: snapshot.meta.clone(), snapshot: Box::new(Cursor::new(data)), })) } None => Ok(None), } } async fn get_log_reader(&mut self) -> Self::LogReader { self.clone() } async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder { self.clone() } }