diff --git a/src/store/mod.rs b/src/store/mod.rs index e799f79..6737a4e 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -23,6 +23,7 @@ use std::ops::RangeBounds; use std::sync::Arc; use std::sync::Mutex; +use libmcaptcha::cache::messages::CachedPoWConfig; use libmcaptcha::AddVisitorResult; use libmcaptcha::MCaptcha; use openraft::async_trait::async_trait; @@ -38,6 +39,7 @@ use openraft::LogId; use openraft::RaftLogReader; use openraft::RaftSnapshotBuilder; use openraft::RaftStorage; +use openraft::RaftTypeConfig; use openraft::SnapshotMeta; use openraft::StorageError; use openraft::StorageIOError; @@ -45,7 +47,6 @@ use openraft::StoredMembership; use openraft::Vote; use serde::Deserialize; use serde::Serialize; -use sqlx::Statement; use tokio::sync::RwLock; use url::quirks::set_pathname; @@ -53,6 +54,9 @@ use crate::DcacheNodeId; use crate::DcacheTypeConfig; use actix::prelude::*; +use libmcaptcha::cache::messages::{ + CachePoW, CacheResult, DeleteCaptchaResult, DeletePoW, RetrivePoW, VerifyCaptchaResult, +}; use libmcaptcha::master::messages::{ AddSite as AddCaptcha, AddVisitor, GetInternalData, RemoveCaptcha, Rename as RenameCaptcha, SetInternalData, @@ -61,22 +65,25 @@ use libmcaptcha::{master::embedded::master::Master as EmbeddedMaster, system::Sy pub mod system; -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub enum DcacheRequest { - //Set { key: String, value: String }, + // master AddVisitor(AddVisitor), AddCaptcha(AddCaptcha), RenameCaptcha(RenameCaptcha), RemoveCaptcha(RemoveCaptcha), + //cache + CachePoW(CachePoW), + DeletePoW(DeletePoW), + CacheResult(CacheResult), + DeleteCaptchaResult(DeleteCaptchaResult), } #[derive(Serialize, Deserialize, Debug, Clone)] pub enum DcacheResponse { AddVisitorResult(Option), - Empty, - // AddCaptchaResult, All returns () - // RenameCaptchaResult, - // RemoveCaptchaResult, + Empty, // AddCaptcha, RenameCaptcha, RemoveCaptcha, Cachepow, CacheResult, + // DeletePoW, DeleteCaptchaResult } #[derive(Debug)] @@ -212,12 +219,11 @@ impl RaftLogReader for Arc { } #[async_trait] -impl RaftSnapshotBuilder>> for Arc { +impl RaftSnapshotBuilder for Arc { #[tracing::instrument(level = "trace", skip(self))] async fn build_snapshot( &mut self, - ) -> Result>>, StorageError> - { + ) -> Result, StorageError> { let data; let last_applied_log; let last_membership; @@ -277,7 +283,6 @@ impl RaftSnapshotBuilder>> for Arc #[async_trait] impl RaftStorage for Arc { - type SnapshotData = Cursor>; type LogReader = Self; type SnapshotBuilder = Self; @@ -298,13 +303,13 @@ impl RaftStorage for Arc { } #[tracing::instrument(level = "trace", skip(self, entries))] - async fn append_to_log( - &mut self, - entries: &[&Entry], - ) -> Result<(), StorageError> { + async fn append_to_log(&mut self, entries: I) -> Result<(), StorageError> + where + I: IntoIterator> + Send, + { let mut log = self.log.write().await; for entry in entries { - log.insert(entry.log_id.index, (*entry).clone()); + log.insert(entry.log_id.index, entry); } Ok(()) } @@ -375,7 +380,8 @@ impl RaftStorage for Arc { #[tracing::instrument(level = "trace", skip(self, entries))] async fn apply_to_state_machine( &mut self, - entries: &[&Entry], + // entries: &[&Entry], + entries: &[Entry], ) -> Result, StorageError> { let mut res = Vec::with_capacity(entries.len()); @@ -435,7 +441,42 @@ impl RaftStorage for Arc { .unwrap(); res.push(DcacheResponse::Empty); } + + // cache + DcacheRequest::CachePoW(msg) => { + sm.data + .cache + .send(msg.clone()) + .await + .unwrap() + .await + .unwrap() + .unwrap(); + res.push(DcacheResponse::Empty); + } + DcacheRequest::DeletePoW(msg) => { + sm.data.cache.send(msg.clone()).await.unwrap().unwrap(); + + res.push(DcacheResponse::Empty); + } + + DcacheRequest::CacheResult(msg) => { + sm.data + .cache + .send(msg.clone()) + .await + .unwrap() + .await + .unwrap() + .unwrap(); + res.push(DcacheResponse::Empty); + } + DcacheRequest::DeleteCaptchaResult(msg) => { + sm.data.cache.send(msg.clone()).await.unwrap().unwrap(); + res.push(DcacheResponse::Empty); + } }, + EntryPayload::Membership(ref mem) => { sm.last_membership = StoredMembership::new(Some(entry.log_id), mem.clone()); res.push(DcacheResponse::Empty) @@ -448,7 +489,9 @@ impl RaftStorage for Arc { #[tracing::instrument(level = "trace", skip(self))] async fn begin_receiving_snapshot( &mut self, - ) -> Result, StorageError> { + // ) -> Result::SnapshotData>, StorageError> { + ) -> Result::SnapshotData>, StorageError> + { Ok(Box::new(Cursor::new(Vec::new()))) } @@ -456,7 +499,7 @@ impl RaftStorage for Arc { async fn install_snapshot( &mut self, meta: &SnapshotMeta, - snapshot: Box, + snapshot: Box<::SnapshotData>, ) -> Result<(), StorageError> { tracing::info!( { snapshot_size = snapshot.get_ref().len() }, @@ -470,17 +513,12 @@ impl RaftStorage for Arc { // Update the state machine. { - let updated_persistable_state_machine: PersistableStateMachine = + let updated_state_machine: PersistableStateMachine = serde_json::from_slice(&new_snapshot.data).map_err(|e| { - StorageIOError::new( - ErrorSubject::Snapshot(new_snapshot.meta.signature()), - ErrorVerb::Read, - AnyError::new(&e), - ) + StorageIOError::read_snapshot(Some(new_snapshot.meta.signature()), &e) })?; - let mut state_machine = self.state_machine.write().await; - let updated_state_machine = updated_persistable_state_machine + let updated_state_machine = updated_state_machine .to_statemachine(state_machine.data.clone()) .await; *state_machine = updated_state_machine; @@ -495,10 +533,7 @@ impl RaftStorage for Arc { #[tracing::instrument(level = "trace", skip(self))] async fn get_current_snapshot( &mut self, - ) -> Result< - Option>, - StorageError, - > { + ) -> Result>, StorageError> { match &*self.current_snapshot.read().await { Some(snapshot) => { let data = snapshot.data.clone();