feat: implement cache actor traits

This commit is contained in:
Aravinth Manivannan 2023-12-17 19:24:11 +05:30
parent b84e3ef275
commit adadaff463
Signed by: realaravinth
GPG key ID: F8F50389936984FF

View file

@ -23,6 +23,7 @@ use std::ops::RangeBounds;
use std::sync::Arc; use std::sync::Arc;
use std::sync::Mutex; use std::sync::Mutex;
use libmcaptcha::cache::messages::CachedPoWConfig;
use libmcaptcha::AddVisitorResult; use libmcaptcha::AddVisitorResult;
use libmcaptcha::MCaptcha; use libmcaptcha::MCaptcha;
use openraft::async_trait::async_trait; use openraft::async_trait::async_trait;
@ -38,6 +39,7 @@ use openraft::LogId;
use openraft::RaftLogReader; use openraft::RaftLogReader;
use openraft::RaftSnapshotBuilder; use openraft::RaftSnapshotBuilder;
use openraft::RaftStorage; use openraft::RaftStorage;
use openraft::RaftTypeConfig;
use openraft::SnapshotMeta; use openraft::SnapshotMeta;
use openraft::StorageError; use openraft::StorageError;
use openraft::StorageIOError; use openraft::StorageIOError;
@ -45,7 +47,6 @@ use openraft::StoredMembership;
use openraft::Vote; use openraft::Vote;
use serde::Deserialize; use serde::Deserialize;
use serde::Serialize; use serde::Serialize;
use sqlx::Statement;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use url::quirks::set_pathname; use url::quirks::set_pathname;
@ -53,6 +54,9 @@ use crate::DcacheNodeId;
use crate::DcacheTypeConfig; use crate::DcacheTypeConfig;
use actix::prelude::*; use actix::prelude::*;
use libmcaptcha::cache::messages::{
CachePoW, CacheResult, DeleteCaptchaResult, DeletePoW, RetrivePoW, VerifyCaptchaResult,
};
use libmcaptcha::master::messages::{ use libmcaptcha::master::messages::{
AddSite as AddCaptcha, AddVisitor, GetInternalData, RemoveCaptcha, Rename as RenameCaptcha, AddSite as AddCaptcha, AddVisitor, GetInternalData, RemoveCaptcha, Rename as RenameCaptcha,
SetInternalData, SetInternalData,
@ -61,22 +65,25 @@ use libmcaptcha::{master::embedded::master::Master as EmbeddedMaster, system::Sy
pub mod system; pub mod system;
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Clone, Debug)]
pub enum DcacheRequest { pub enum DcacheRequest {
//Set { key: String, value: String }, // master
AddVisitor(AddVisitor), AddVisitor(AddVisitor),
AddCaptcha(AddCaptcha), AddCaptcha(AddCaptcha),
RenameCaptcha(RenameCaptcha), RenameCaptcha(RenameCaptcha),
RemoveCaptcha(RemoveCaptcha), RemoveCaptcha(RemoveCaptcha),
//cache
CachePoW(CachePoW),
DeletePoW(DeletePoW),
CacheResult(CacheResult),
DeleteCaptchaResult(DeleteCaptchaResult),
} }
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub enum DcacheResponse { pub enum DcacheResponse {
AddVisitorResult(Option<AddVisitorResult>), AddVisitorResult(Option<AddVisitorResult>),
Empty, Empty, // AddCaptcha, RenameCaptcha, RemoveCaptcha, Cachepow, CacheResult,
// AddCaptchaResult, All returns () // DeletePoW, DeleteCaptchaResult
// RenameCaptchaResult,
// RemoveCaptchaResult,
} }
#[derive(Debug)] #[derive(Debug)]
@ -212,12 +219,11 @@ impl RaftLogReader<DcacheTypeConfig> for Arc<DcacheStore> {
} }
#[async_trait] #[async_trait]
impl RaftSnapshotBuilder<DcacheTypeConfig, Cursor<Vec<u8>>> for Arc<DcacheStore> { impl RaftSnapshotBuilder<DcacheTypeConfig> for Arc<DcacheStore> {
#[tracing::instrument(level = "trace", skip(self))] #[tracing::instrument(level = "trace", skip(self))]
async fn build_snapshot( async fn build_snapshot(
&mut self, &mut self,
) -> Result<Snapshot<DcacheNodeId, BasicNode, Cursor<Vec<u8>>>, StorageError<DcacheNodeId>> ) -> Result<Snapshot<DcacheTypeConfig>, StorageError<DcacheNodeId>> {
{
let data; let data;
let last_applied_log; let last_applied_log;
let last_membership; let last_membership;
@ -277,7 +283,6 @@ impl RaftSnapshotBuilder<DcacheTypeConfig, Cursor<Vec<u8>>> for Arc<DcacheStore>
#[async_trait] #[async_trait]
impl RaftStorage<DcacheTypeConfig> for Arc<DcacheStore> { impl RaftStorage<DcacheTypeConfig> for Arc<DcacheStore> {
type SnapshotData = Cursor<Vec<u8>>;
type LogReader = Self; type LogReader = Self;
type SnapshotBuilder = Self; type SnapshotBuilder = Self;
@ -298,13 +303,13 @@ impl RaftStorage<DcacheTypeConfig> for Arc<DcacheStore> {
} }
#[tracing::instrument(level = "trace", skip(self, entries))] #[tracing::instrument(level = "trace", skip(self, entries))]
async fn append_to_log( async fn append_to_log<I>(&mut self, entries: I) -> Result<(), StorageError<DcacheNodeId>>
&mut self, where
entries: &[&Entry<DcacheTypeConfig>], I: IntoIterator<Item = Entry<DcacheTypeConfig>> + Send,
) -> Result<(), StorageError<DcacheNodeId>> { {
let mut log = self.log.write().await; let mut log = self.log.write().await;
for entry in entries { for entry in entries {
log.insert(entry.log_id.index, (*entry).clone()); log.insert(entry.log_id.index, entry);
} }
Ok(()) Ok(())
} }
@ -375,7 +380,8 @@ impl RaftStorage<DcacheTypeConfig> for Arc<DcacheStore> {
#[tracing::instrument(level = "trace", skip(self, entries))] #[tracing::instrument(level = "trace", skip(self, entries))]
async fn apply_to_state_machine( async fn apply_to_state_machine(
&mut self, &mut self,
entries: &[&Entry<DcacheTypeConfig>], // entries: &[&Entry<DcacheTypeConfig>],
entries: &[Entry<DcacheTypeConfig>],
) -> Result<Vec<DcacheResponse>, StorageError<DcacheNodeId>> { ) -> Result<Vec<DcacheResponse>, StorageError<DcacheNodeId>> {
let mut res = Vec::with_capacity(entries.len()); let mut res = Vec::with_capacity(entries.len());
@ -435,7 +441,42 @@ impl RaftStorage<DcacheTypeConfig> for Arc<DcacheStore> {
.unwrap(); .unwrap();
res.push(DcacheResponse::Empty); res.push(DcacheResponse::Empty);
} }
// cache
DcacheRequest::CachePoW(msg) => {
sm.data
.cache
.send(msg.clone())
.await
.unwrap()
.await
.unwrap()
.unwrap();
res.push(DcacheResponse::Empty);
}
DcacheRequest::DeletePoW(msg) => {
sm.data.cache.send(msg.clone()).await.unwrap().unwrap();
res.push(DcacheResponse::Empty);
}
DcacheRequest::CacheResult(msg) => {
sm.data
.cache
.send(msg.clone())
.await
.unwrap()
.await
.unwrap()
.unwrap();
res.push(DcacheResponse::Empty);
}
DcacheRequest::DeleteCaptchaResult(msg) => {
sm.data.cache.send(msg.clone()).await.unwrap().unwrap();
res.push(DcacheResponse::Empty);
}
}, },
EntryPayload::Membership(ref mem) => { EntryPayload::Membership(ref mem) => {
sm.last_membership = StoredMembership::new(Some(entry.log_id), mem.clone()); sm.last_membership = StoredMembership::new(Some(entry.log_id), mem.clone());
res.push(DcacheResponse::Empty) res.push(DcacheResponse::Empty)
@ -448,7 +489,9 @@ impl RaftStorage<DcacheTypeConfig> for Arc<DcacheStore> {
#[tracing::instrument(level = "trace", skip(self))] #[tracing::instrument(level = "trace", skip(self))]
async fn begin_receiving_snapshot( async fn begin_receiving_snapshot(
&mut self, &mut self,
) -> Result<Box<Self::SnapshotData>, StorageError<DcacheNodeId>> { // ) -> Result<Box<DcacheTypeConfig as RaftTypeConfig>::SnapshotData>, StorageError<DcacheNodeId>> {
) -> Result<Box<<DcacheTypeConfig as RaftTypeConfig>::SnapshotData>, StorageError<DcacheNodeId>>
{
Ok(Box::new(Cursor::new(Vec::new()))) Ok(Box::new(Cursor::new(Vec::new())))
} }
@ -456,7 +499,7 @@ impl RaftStorage<DcacheTypeConfig> for Arc<DcacheStore> {
async fn install_snapshot( async fn install_snapshot(
&mut self, &mut self,
meta: &SnapshotMeta<DcacheNodeId, BasicNode>, meta: &SnapshotMeta<DcacheNodeId, BasicNode>,
snapshot: Box<Self::SnapshotData>, snapshot: Box<<DcacheTypeConfig as RaftTypeConfig>::SnapshotData>,
) -> Result<(), StorageError<DcacheNodeId>> { ) -> Result<(), StorageError<DcacheNodeId>> {
tracing::info!( tracing::info!(
{ snapshot_size = snapshot.get_ref().len() }, { snapshot_size = snapshot.get_ref().len() },
@ -470,17 +513,12 @@ impl RaftStorage<DcacheTypeConfig> for Arc<DcacheStore> {
// Update the state machine. // Update the state machine.
{ {
let updated_persistable_state_machine: PersistableStateMachine = let updated_state_machine: PersistableStateMachine =
serde_json::from_slice(&new_snapshot.data).map_err(|e| { serde_json::from_slice(&new_snapshot.data).map_err(|e| {
StorageIOError::new( StorageIOError::read_snapshot(Some(new_snapshot.meta.signature()), &e)
ErrorSubject::Snapshot(new_snapshot.meta.signature()),
ErrorVerb::Read,
AnyError::new(&e),
)
})?; })?;
let mut state_machine = self.state_machine.write().await; let mut state_machine = self.state_machine.write().await;
let updated_state_machine = updated_persistable_state_machine let updated_state_machine = updated_state_machine
.to_statemachine(state_machine.data.clone()) .to_statemachine(state_machine.data.clone())
.await; .await;
*state_machine = updated_state_machine; *state_machine = updated_state_machine;
@ -495,10 +533,7 @@ impl RaftStorage<DcacheTypeConfig> for Arc<DcacheStore> {
#[tracing::instrument(level = "trace", skip(self))] #[tracing::instrument(level = "trace", skip(self))]
async fn get_current_snapshot( async fn get_current_snapshot(
&mut self, &mut self,
) -> Result< ) -> Result<Option<Snapshot<DcacheTypeConfig>>, StorageError<DcacheNodeId>> {
Option<Snapshot<DcacheNodeId, BasicNode, Self::SnapshotData>>,
StorageError<DcacheNodeId>,
> {
match &*self.current_snapshot.read().await { match &*self.current_snapshot.read().await {
Some(snapshot) => { Some(snapshot) => {
let data = snapshot.data.clone(); let data = snapshot.data.clone();