fix: rename vars
This commit is contained in:
parent
d6b6c5266e
commit
9c0a85915c
9 changed files with 153 additions and 153 deletions
14
src/app.rs
14
src/app.rs
|
@ -19,16 +19,16 @@ use std::sync::Arc;
|
||||||
|
|
||||||
use openraft::Config;
|
use openraft::Config;
|
||||||
|
|
||||||
use crate::ExampleNodeId;
|
use crate::DcacheNodeId;
|
||||||
use crate::ExampleRaft;
|
use crate::DcacheRaft;
|
||||||
use crate::ExampleStore;
|
use crate::DcacheStore;
|
||||||
|
|
||||||
// Representation of an application state. This struct can be shared around to share
|
// Representation of an application state. This struct can be shared around to share
|
||||||
// instances of raft, store and more.
|
// instances of raft, store and more.
|
||||||
pub struct ExampleApp {
|
pub struct DcacheApp {
|
||||||
pub id: ExampleNodeId,
|
pub id: DcacheNodeId,
|
||||||
pub addr: String,
|
pub addr: String,
|
||||||
pub raft: ExampleRaft,
|
pub raft: DcacheRaft,
|
||||||
pub store: Arc<ExampleStore>,
|
pub store: Arc<DcacheStore>,
|
||||||
pub config: Arc<Config>,
|
pub config: Arc<Config>,
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,14 +16,14 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use dcache::network::raft_network_impl::ExampleNetwork;
|
use dcache::network::raft_network_impl::DcacheNetwork;
|
||||||
use dcache::start_example_raft_node;
|
use dcache::start_example_raft_node;
|
||||||
use dcache::store::ExampleStore;
|
use dcache::store::DcacheStore;
|
||||||
use dcache::ExampleTypeConfig;
|
use dcache::DcacheTypeConfig;
|
||||||
use openraft::Raft;
|
use openraft::Raft;
|
||||||
use tracing_subscriber::EnvFilter;
|
use tracing_subscriber::EnvFilter;
|
||||||
|
|
||||||
pub type ExampleRaft = Raft<ExampleTypeConfig, ExampleNetwork, ExampleStore>;
|
pub type DcacheRaft = Raft<DcacheTypeConfig, DcacheNetwork, DcacheStore>;
|
||||||
|
|
||||||
#[derive(Parser, Clone, Debug)]
|
#[derive(Parser, Clone, Debug)]
|
||||||
#[clap(author, version, about, long_about = None)]
|
#[clap(author, version, about, long_about = None)]
|
||||||
|
|
|
@ -34,24 +34,24 @@ use serde::Serialize;
|
||||||
use tokio::time::timeout;
|
use tokio::time::timeout;
|
||||||
|
|
||||||
use crate::typ;
|
use crate::typ;
|
||||||
use crate::ExampleNodeId;
|
use crate::DcacheNodeId;
|
||||||
use crate::ExampleRequest;
|
use crate::DcacheRequest;
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub struct Empty {}
|
pub struct Empty {}
|
||||||
|
|
||||||
pub struct ExampleClient {
|
pub struct DcacheClient {
|
||||||
/// The leader node to send request to.
|
/// The leader node to send request to.
|
||||||
///
|
///
|
||||||
/// All traffic should be sent to the leader in a cluster.
|
/// All traffic should be sent to the leader in a cluster.
|
||||||
pub leader: Arc<Mutex<(ExampleNodeId, String)>>,
|
pub leader: Arc<Mutex<(DcacheNodeId, String)>>,
|
||||||
|
|
||||||
pub inner: Client,
|
pub inner: Client,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ExampleClient {
|
impl DcacheClient {
|
||||||
/// Create a client with a leader node id and a node manager to get node address by node id.
|
/// Create a client with a leader node id and a node manager to get node address by node id.
|
||||||
pub fn new(leader_id: ExampleNodeId, leader_addr: String) -> Self {
|
pub fn new(leader_id: DcacheNodeId, leader_addr: String) -> Self {
|
||||||
Self {
|
Self {
|
||||||
leader: Arc::new(Mutex::new((leader_id, leader_addr))),
|
leader: Arc::new(Mutex::new((leader_id, leader_addr))),
|
||||||
inner: reqwest::Client::new(),
|
inner: reqwest::Client::new(),
|
||||||
|
@ -68,7 +68,7 @@ impl ExampleClient {
|
||||||
/// The result of applying the request will be returned.
|
/// The result of applying the request will be returned.
|
||||||
pub async fn write(
|
pub async fn write(
|
||||||
&self,
|
&self,
|
||||||
req: &ExampleRequest,
|
req: &DcacheRequest,
|
||||||
) -> Result<typ::ClientWriteResponse, typ::RPCError<typ::ClientWriteError>> {
|
) -> Result<typ::ClientWriteResponse, typ::RPCError<typ::ClientWriteError>> {
|
||||||
self.send_rpc_to_leader("write", Some(req)).await
|
self.send_rpc_to_leader("write", Some(req)).await
|
||||||
}
|
}
|
||||||
|
@ -105,10 +105,10 @@ impl ExampleClient {
|
||||||
|
|
||||||
/// Add a node as learner.
|
/// Add a node as learner.
|
||||||
///
|
///
|
||||||
/// The node to add has to exist, i.e., being added with `write(ExampleRequest::AddNode{})`
|
/// The node to add has to exist, i.e., being added with `write(DcacheRequest::AddNode{})`
|
||||||
pub async fn add_learner(
|
pub async fn add_learner(
|
||||||
&self,
|
&self,
|
||||||
req: (ExampleNodeId, String),
|
req: (DcacheNodeId, String),
|
||||||
) -> Result<typ::ClientWriteResponse, typ::RPCError<typ::ClientWriteError>> {
|
) -> Result<typ::ClientWriteResponse, typ::RPCError<typ::ClientWriteError>> {
|
||||||
self.send_rpc_to_leader("add-learner", Some(&req)).await
|
self.send_rpc_to_leader("add-learner", Some(&req)).await
|
||||||
}
|
}
|
||||||
|
@ -119,7 +119,7 @@ impl ExampleClient {
|
||||||
/// or an error [`LearnerNotFound`] will be returned.
|
/// or an error [`LearnerNotFound`] will be returned.
|
||||||
pub async fn change_membership(
|
pub async fn change_membership(
|
||||||
&self,
|
&self,
|
||||||
req: &BTreeSet<ExampleNodeId>,
|
req: &BTreeSet<DcacheNodeId>,
|
||||||
) -> Result<typ::ClientWriteResponse, typ::RPCError<typ::ClientWriteError>> {
|
) -> Result<typ::ClientWriteResponse, typ::RPCError<typ::ClientWriteError>> {
|
||||||
self.send_rpc_to_leader("change-membership", Some(req))
|
self.send_rpc_to_leader("change-membership", Some(req))
|
||||||
.await
|
.await
|
||||||
|
@ -130,7 +130,7 @@ impl ExampleClient {
|
||||||
/// Metrics contains various information about the cluster, such as current leader,
|
/// Metrics contains various information about the cluster, such as current leader,
|
||||||
/// membership config, replication status etc.
|
/// membership config, replication status etc.
|
||||||
/// See [`RaftMetrics`].
|
/// See [`RaftMetrics`].
|
||||||
pub async fn metrics(&self) -> Result<RaftMetrics<ExampleNodeId, BasicNode>, typ::RPCError> {
|
pub async fn metrics(&self) -> Result<RaftMetrics<DcacheNodeId, BasicNode>, typ::RPCError> {
|
||||||
self.do_send_rpc_to_leader("metrics", None::<&()>).await
|
self.do_send_rpc_to_leader("metrics", None::<&()>).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
42
src/lib.rs
42
src/lib.rs
|
@ -28,50 +28,50 @@ use openraft::BasicNode;
|
||||||
use openraft::Config;
|
use openraft::Config;
|
||||||
use openraft::Raft;
|
use openraft::Raft;
|
||||||
|
|
||||||
use crate::app::ExampleApp;
|
use crate::app::DcacheApp;
|
||||||
use crate::network::api;
|
use crate::network::api;
|
||||||
use crate::network::management;
|
use crate::network::management;
|
||||||
use crate::network::raft;
|
use crate::network::raft;
|
||||||
use crate::network::raft_network_impl::ExampleNetwork;
|
use crate::network::raft_network_impl::DcacheNetwork;
|
||||||
use crate::store::ExampleRequest;
|
use crate::store::DcacheRequest;
|
||||||
use crate::store::ExampleResponse;
|
use crate::store::DcacheResponse;
|
||||||
use crate::store::ExampleStore;
|
use crate::store::DcacheStore;
|
||||||
|
|
||||||
pub mod app;
|
pub mod app;
|
||||||
pub mod client;
|
pub mod client;
|
||||||
pub mod network;
|
pub mod network;
|
||||||
pub mod store;
|
pub mod store;
|
||||||
|
|
||||||
pub type ExampleNodeId = u64;
|
pub type DcacheNodeId = u64;
|
||||||
|
|
||||||
openraft::declare_raft_types!(
|
openraft::declare_raft_types!(
|
||||||
/// Declare the type configuration for example K/V store.
|
/// Declare the type configuration for example K/V store.
|
||||||
pub ExampleTypeConfig: D = ExampleRequest, R = ExampleResponse, NodeId = ExampleNodeId, Node = BasicNode
|
pub DcacheTypeConfig: D = DcacheRequest, R = DcacheResponse, NodeId = DcacheNodeId, Node = BasicNode
|
||||||
);
|
);
|
||||||
|
|
||||||
pub type ExampleRaft = Raft<ExampleTypeConfig, ExampleNetwork, Arc<ExampleStore>>;
|
pub type DcacheRaft = Raft<DcacheTypeConfig, DcacheNetwork, Arc<DcacheStore>>;
|
||||||
|
|
||||||
pub mod typ {
|
pub mod typ {
|
||||||
use openraft::BasicNode;
|
use openraft::BasicNode;
|
||||||
|
|
||||||
use crate::ExampleNodeId;
|
use crate::DcacheNodeId;
|
||||||
use crate::ExampleTypeConfig;
|
use crate::DcacheTypeConfig;
|
||||||
|
|
||||||
pub type RaftError<E = openraft::error::Infallible> =
|
pub type RaftError<E = openraft::error::Infallible> =
|
||||||
openraft::error::RaftError<ExampleNodeId, E>;
|
openraft::error::RaftError<DcacheNodeId, E>;
|
||||||
pub type RPCError<E = openraft::error::Infallible> =
|
pub type RPCError<E = openraft::error::Infallible> =
|
||||||
openraft::error::RPCError<ExampleNodeId, BasicNode, RaftError<E>>;
|
openraft::error::RPCError<DcacheNodeId, BasicNode, RaftError<E>>;
|
||||||
|
|
||||||
pub type ClientWriteError = openraft::error::ClientWriteError<ExampleNodeId, BasicNode>;
|
pub type ClientWriteError = openraft::error::ClientWriteError<DcacheNodeId, BasicNode>;
|
||||||
pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError<ExampleNodeId, BasicNode>;
|
pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError<DcacheNodeId, BasicNode>;
|
||||||
pub type ForwardToLeader = openraft::error::ForwardToLeader<ExampleNodeId, BasicNode>;
|
pub type ForwardToLeader = openraft::error::ForwardToLeader<DcacheNodeId, BasicNode>;
|
||||||
pub type InitializeError = openraft::error::InitializeError<ExampleNodeId, BasicNode>;
|
pub type InitializeError = openraft::error::InitializeError<DcacheNodeId, BasicNode>;
|
||||||
|
|
||||||
pub type ClientWriteResponse = openraft::raft::ClientWriteResponse<ExampleTypeConfig>;
|
pub type ClientWriteResponse = openraft::raft::ClientWriteResponse<DcacheTypeConfig>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn start_example_raft_node(
|
pub async fn start_example_raft_node(
|
||||||
node_id: ExampleNodeId,
|
node_id: DcacheNodeId,
|
||||||
http_addr: String,
|
http_addr: String,
|
||||||
) -> std::io::Result<()> {
|
) -> std::io::Result<()> {
|
||||||
// Create a configuration for the raft instance.
|
// Create a configuration for the raft instance.
|
||||||
|
@ -89,11 +89,11 @@ pub async fn start_example_raft_node(
|
||||||
let config = Arc::new(config.validate().unwrap());
|
let config = Arc::new(config.validate().unwrap());
|
||||||
|
|
||||||
// Create a instance of where the Raft data will be stored.
|
// Create a instance of where the Raft data will be stored.
|
||||||
let store = Arc::new(ExampleStore::new(salt));
|
let store = Arc::new(DcacheStore::new(salt));
|
||||||
|
|
||||||
// Create the network layer that will connect and communicate the raft instances and
|
// Create the network layer that will connect and communicate the raft instances and
|
||||||
// will be used in conjunction with the store created above.
|
// will be used in conjunction with the store created above.
|
||||||
let network = ExampleNetwork {};
|
let network = DcacheNetwork {};
|
||||||
|
|
||||||
// Create a local raft instance.
|
// Create a local raft instance.
|
||||||
let raft = Raft::new(node_id, config.clone(), network, store.clone())
|
let raft = Raft::new(node_id, config.clone(), network, store.clone())
|
||||||
|
@ -102,7 +102,7 @@ pub async fn start_example_raft_node(
|
||||||
|
|
||||||
// Create an application that will store all the instances created above, this will
|
// Create an application that will store all the instances created above, this will
|
||||||
// be later used on the actix-web services.
|
// be later used on the actix-web services.
|
||||||
let app = Data::new(ExampleApp {
|
let app = Data::new(DcacheApp {
|
||||||
id: node_id,
|
id: node_id,
|
||||||
addr: http_addr.clone(),
|
addr: http_addr.clone(),
|
||||||
raft,
|
raft,
|
||||||
|
|
|
@ -25,9 +25,9 @@ use openraft::error::RaftError;
|
||||||
use openraft::BasicNode;
|
use openraft::BasicNode;
|
||||||
use web::Json;
|
use web::Json;
|
||||||
|
|
||||||
use crate::app::ExampleApp;
|
use crate::app::DcacheApp;
|
||||||
use crate::store::ExampleRequest;
|
use crate::store::DcacheRequest;
|
||||||
use crate::ExampleNodeId;
|
use crate::DcacheNodeId;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Application API
|
* Application API
|
||||||
|
@ -40,8 +40,8 @@ use crate::ExampleNodeId;
|
||||||
*/
|
*/
|
||||||
#[post("/write")]
|
#[post("/write")]
|
||||||
pub async fn write(
|
pub async fn write(
|
||||||
app: Data<ExampleApp>,
|
app: Data<DcacheApp>,
|
||||||
req: Json<ExampleRequest>,
|
req: Json<DcacheRequest>,
|
||||||
) -> actix_web::Result<impl Responder> {
|
) -> actix_web::Result<impl Responder> {
|
||||||
let response = app.raft.client_write(req.0).await;
|
let response = app.raft.client_write(req.0).await;
|
||||||
Ok(Json(response))
|
Ok(Json(response))
|
||||||
|
@ -51,7 +51,7 @@ pub async fn write(
|
||||||
// RenameCaptcha(RenameCaptcha),
|
// RenameCaptcha(RenameCaptcha),
|
||||||
// RemoveCaptcha(RemoveCaptcha),
|
// RemoveCaptcha(RemoveCaptcha),
|
||||||
//#[post("/post")]
|
//#[post("/post")]
|
||||||
//pub async fn read(app: Data<ExampleApp>, req: Json<String>) -> actix_web::Result<impl Responder> {
|
//pub async fn read(app: Data<DcacheApp>, req: Json<String>) -> actix_web::Result<impl Responder> {
|
||||||
// let state_machine = app.store.state_machine.read().await;
|
// let state_machine = app.store.state_machine.read().await;
|
||||||
// let key = req.0;
|
// let key = req.0;
|
||||||
// let value = state_machine.data.get(&key).cloned();
|
// let value = state_machine.data.get(&key).cloned();
|
||||||
|
@ -61,7 +61,7 @@ pub async fn write(
|
||||||
//}
|
//}
|
||||||
//
|
//
|
||||||
//#[post("/visitor/add")]
|
//#[post("/visitor/add")]
|
||||||
//pub async fn add_visitor(app: Data<ExampleApp>, req: Json<String>) -> actix_web::Result<impl Responder> {
|
//pub async fn add_visitor(app: Data<DcacheApp>, req: Json<String>) -> actix_web::Result<impl Responder> {
|
||||||
// let state_machine = app.store.state_machine.read().await;
|
// let state_machine = app.store.state_machine.read().await;
|
||||||
// let key = req.0;
|
// let key = req.0;
|
||||||
// let value = state_machine.data.get(&key).cloned();
|
// let value = state_machine.data.get(&key).cloned();
|
||||||
|
|
|
@ -28,8 +28,8 @@ use openraft::BasicNode;
|
||||||
use openraft::RaftMetrics;
|
use openraft::RaftMetrics;
|
||||||
use web::Json;
|
use web::Json;
|
||||||
|
|
||||||
use crate::app::ExampleApp;
|
use crate::app::DcacheApp;
|
||||||
use crate::ExampleNodeId;
|
use crate::DcacheNodeId;
|
||||||
|
|
||||||
// --- Cluster management
|
// --- Cluster management
|
||||||
|
|
||||||
|
@ -40,8 +40,8 @@ use crate::ExampleNodeId;
|
||||||
/// (by calling `change-membership`)
|
/// (by calling `change-membership`)
|
||||||
#[post("/add-learner")]
|
#[post("/add-learner")]
|
||||||
pub async fn add_learner(
|
pub async fn add_learner(
|
||||||
app: Data<ExampleApp>,
|
app: Data<DcacheApp>,
|
||||||
req: Json<(ExampleNodeId, String)>,
|
req: Json<(DcacheNodeId, String)>,
|
||||||
) -> actix_web::Result<impl Responder> {
|
) -> actix_web::Result<impl Responder> {
|
||||||
let node_id = req.0 .0;
|
let node_id = req.0 .0;
|
||||||
let node = BasicNode {
|
let node = BasicNode {
|
||||||
|
@ -54,8 +54,8 @@ pub async fn add_learner(
|
||||||
/// Changes specified learners to members, or remove members.
|
/// Changes specified learners to members, or remove members.
|
||||||
#[post("/change-membership")]
|
#[post("/change-membership")]
|
||||||
pub async fn change_membership(
|
pub async fn change_membership(
|
||||||
app: Data<ExampleApp>,
|
app: Data<DcacheApp>,
|
||||||
req: Json<BTreeSet<ExampleNodeId>>,
|
req: Json<BTreeSet<DcacheNodeId>>,
|
||||||
) -> actix_web::Result<impl Responder> {
|
) -> actix_web::Result<impl Responder> {
|
||||||
let res = app.raft.change_membership(req.0, false).await;
|
let res = app.raft.change_membership(req.0, false).await;
|
||||||
Ok(Json(res))
|
Ok(Json(res))
|
||||||
|
@ -63,7 +63,7 @@ pub async fn change_membership(
|
||||||
|
|
||||||
/// Initialize a single-node cluster.
|
/// Initialize a single-node cluster.
|
||||||
#[post("/init")]
|
#[post("/init")]
|
||||||
pub async fn init(app: Data<ExampleApp>) -> actix_web::Result<impl Responder> {
|
pub async fn init(app: Data<DcacheApp>) -> actix_web::Result<impl Responder> {
|
||||||
let mut nodes = BTreeMap::new();
|
let mut nodes = BTreeMap::new();
|
||||||
nodes.insert(
|
nodes.insert(
|
||||||
app.id,
|
app.id,
|
||||||
|
@ -77,9 +77,9 @@ pub async fn init(app: Data<ExampleApp>) -> actix_web::Result<impl Responder> {
|
||||||
|
|
||||||
/// Get the latest metrics of the cluster
|
/// Get the latest metrics of the cluster
|
||||||
#[get("/metrics")]
|
#[get("/metrics")]
|
||||||
pub async fn metrics(app: Data<ExampleApp>) -> actix_web::Result<impl Responder> {
|
pub async fn metrics(app: Data<DcacheApp>) -> actix_web::Result<impl Responder> {
|
||||||
let metrics = app.raft.metrics().borrow().clone();
|
let metrics = app.raft.metrics().borrow().clone();
|
||||||
|
|
||||||
let res: Result<RaftMetrics<ExampleNodeId, BasicNode>, Infallible> = Ok(metrics);
|
let res: Result<RaftMetrics<DcacheNodeId, BasicNode>, Infallible> = Ok(metrics);
|
||||||
Ok(Json(res))
|
Ok(Json(res))
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,16 +24,16 @@ use openraft::raft::InstallSnapshotRequest;
|
||||||
use openraft::raft::VoteRequest;
|
use openraft::raft::VoteRequest;
|
||||||
use web::Json;
|
use web::Json;
|
||||||
|
|
||||||
use crate::app::ExampleApp;
|
use crate::app::DcacheApp;
|
||||||
use crate::ExampleNodeId;
|
use crate::DcacheNodeId;
|
||||||
use crate::ExampleTypeConfig;
|
use crate::DcacheTypeConfig;
|
||||||
|
|
||||||
// --- Raft communication
|
// --- Raft communication
|
||||||
|
|
||||||
#[post("/raft-vote")]
|
#[post("/raft-vote")]
|
||||||
pub async fn vote(
|
pub async fn vote(
|
||||||
app: Data<ExampleApp>,
|
app: Data<DcacheApp>,
|
||||||
req: Json<VoteRequest<ExampleNodeId>>,
|
req: Json<VoteRequest<DcacheNodeId>>,
|
||||||
) -> actix_web::Result<impl Responder> {
|
) -> actix_web::Result<impl Responder> {
|
||||||
let res = app.raft.vote(req.0).await;
|
let res = app.raft.vote(req.0).await;
|
||||||
Ok(Json(res))
|
Ok(Json(res))
|
||||||
|
@ -41,8 +41,8 @@ pub async fn vote(
|
||||||
|
|
||||||
#[post("/raft-append")]
|
#[post("/raft-append")]
|
||||||
pub async fn append(
|
pub async fn append(
|
||||||
app: Data<ExampleApp>,
|
app: Data<DcacheApp>,
|
||||||
req: Json<AppendEntriesRequest<ExampleTypeConfig>>,
|
req: Json<AppendEntriesRequest<DcacheTypeConfig>>,
|
||||||
) -> actix_web::Result<impl Responder> {
|
) -> actix_web::Result<impl Responder> {
|
||||||
let res = app.raft.append_entries(req.0).await;
|
let res = app.raft.append_entries(req.0).await;
|
||||||
Ok(Json(res))
|
Ok(Json(res))
|
||||||
|
@ -50,8 +50,8 @@ pub async fn append(
|
||||||
|
|
||||||
#[post("/raft-snapshot")]
|
#[post("/raft-snapshot")]
|
||||||
pub async fn snapshot(
|
pub async fn snapshot(
|
||||||
app: Data<ExampleApp>,
|
app: Data<DcacheApp>,
|
||||||
req: Json<InstallSnapshotRequest<ExampleTypeConfig>>,
|
req: Json<InstallSnapshotRequest<DcacheTypeConfig>>,
|
||||||
) -> actix_web::Result<impl Responder> {
|
) -> actix_web::Result<impl Responder> {
|
||||||
let res = app.raft.install_snapshot(req.0).await;
|
let res = app.raft.install_snapshot(req.0).await;
|
||||||
Ok(Json(res))
|
Ok(Json(res))
|
||||||
|
|
|
@ -33,19 +33,19 @@ use openraft::RaftNetworkFactory;
|
||||||
use serde::de::DeserializeOwned;
|
use serde::de::DeserializeOwned;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
|
|
||||||
use crate::ExampleNodeId;
|
use crate::DcacheNodeId;
|
||||||
use crate::ExampleTypeConfig;
|
use crate::DcacheTypeConfig;
|
||||||
|
|
||||||
pub struct ExampleNetwork {}
|
pub struct DcacheNetwork {}
|
||||||
|
|
||||||
impl ExampleNetwork {
|
impl DcacheNetwork {
|
||||||
pub async fn send_rpc<Req, Resp, Err>(
|
pub async fn send_rpc<Req, Resp, Err>(
|
||||||
&self,
|
&self,
|
||||||
target: ExampleNodeId,
|
target: DcacheNodeId,
|
||||||
target_node: &BasicNode,
|
target_node: &BasicNode,
|
||||||
uri: &str,
|
uri: &str,
|
||||||
req: Req,
|
req: Req,
|
||||||
) -> Result<Resp, RPCError<ExampleNodeId, BasicNode, Err>>
|
) -> Result<Resp, RPCError<DcacheNodeId, BasicNode, Err>>
|
||||||
where
|
where
|
||||||
Req: Serialize,
|
Req: Serialize,
|
||||||
Err: std::error::Error + DeserializeOwned,
|
Err: std::error::Error + DeserializeOwned,
|
||||||
|
@ -79,35 +79,35 @@ impl ExampleNetwork {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NOTE: This could be implemented also on `Arc<ExampleNetwork>`, but since it's empty, implemented
|
// NOTE: This could be implemented also on `Arc<DcacheNetwork>`, but since it's empty, implemented
|
||||||
// directly.
|
// directly.
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl RaftNetworkFactory<ExampleTypeConfig> for ExampleNetwork {
|
impl RaftNetworkFactory<DcacheTypeConfig> for DcacheNetwork {
|
||||||
type Network = ExampleNetworkConnection;
|
type Network = DcacheNetworkConnection;
|
||||||
|
|
||||||
async fn new_client(&mut self, target: ExampleNodeId, node: &BasicNode) -> Self::Network {
|
async fn new_client(&mut self, target: DcacheNodeId, node: &BasicNode) -> Self::Network {
|
||||||
ExampleNetworkConnection {
|
DcacheNetworkConnection {
|
||||||
owner: ExampleNetwork {},
|
owner: DcacheNetwork {},
|
||||||
target,
|
target,
|
||||||
target_node: node.clone(),
|
target_node: node.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct ExampleNetworkConnection {
|
pub struct DcacheNetworkConnection {
|
||||||
owner: ExampleNetwork,
|
owner: DcacheNetwork,
|
||||||
target: ExampleNodeId,
|
target: DcacheNodeId,
|
||||||
target_node: BasicNode,
|
target_node: BasicNode,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl RaftNetwork<ExampleTypeConfig> for ExampleNetworkConnection {
|
impl RaftNetwork<DcacheTypeConfig> for DcacheNetworkConnection {
|
||||||
async fn send_append_entries(
|
async fn send_append_entries(
|
||||||
&mut self,
|
&mut self,
|
||||||
req: AppendEntriesRequest<ExampleTypeConfig>,
|
req: AppendEntriesRequest<DcacheTypeConfig>,
|
||||||
) -> Result<
|
) -> Result<
|
||||||
AppendEntriesResponse<ExampleNodeId>,
|
AppendEntriesResponse<DcacheNodeId>,
|
||||||
RPCError<ExampleNodeId, BasicNode, RaftError<ExampleNodeId>>,
|
RPCError<DcacheNodeId, BasicNode, RaftError<DcacheNodeId>>,
|
||||||
> {
|
> {
|
||||||
self.owner
|
self.owner
|
||||||
.send_rpc(self.target, &self.target_node, "raft-append", req)
|
.send_rpc(self.target, &self.target_node, "raft-append", req)
|
||||||
|
@ -116,10 +116,10 @@ impl RaftNetwork<ExampleTypeConfig> for ExampleNetworkConnection {
|
||||||
|
|
||||||
async fn send_install_snapshot(
|
async fn send_install_snapshot(
|
||||||
&mut self,
|
&mut self,
|
||||||
req: InstallSnapshotRequest<ExampleTypeConfig>,
|
req: InstallSnapshotRequest<DcacheTypeConfig>,
|
||||||
) -> Result<
|
) -> Result<
|
||||||
InstallSnapshotResponse<ExampleNodeId>,
|
InstallSnapshotResponse<DcacheNodeId>,
|
||||||
RPCError<ExampleNodeId, BasicNode, RaftError<ExampleNodeId, InstallSnapshotError>>,
|
RPCError<DcacheNodeId, BasicNode, RaftError<DcacheNodeId, InstallSnapshotError>>,
|
||||||
> {
|
> {
|
||||||
self.owner
|
self.owner
|
||||||
.send_rpc(self.target, &self.target_node, "raft-snapshot", req)
|
.send_rpc(self.target, &self.target_node, "raft-snapshot", req)
|
||||||
|
@ -128,10 +128,10 @@ impl RaftNetwork<ExampleTypeConfig> for ExampleNetworkConnection {
|
||||||
|
|
||||||
async fn send_vote(
|
async fn send_vote(
|
||||||
&mut self,
|
&mut self,
|
||||||
req: VoteRequest<ExampleNodeId>,
|
req: VoteRequest<DcacheNodeId>,
|
||||||
) -> Result<
|
) -> Result<
|
||||||
VoteResponse<ExampleNodeId>,
|
VoteResponse<DcacheNodeId>,
|
||||||
RPCError<ExampleNodeId, BasicNode, RaftError<ExampleNodeId>>,
|
RPCError<DcacheNodeId, BasicNode, RaftError<DcacheNodeId>>,
|
||||||
> {
|
> {
|
||||||
self.owner
|
self.owner
|
||||||
.send_rpc(self.target, &self.target_node, "raft-vote", req)
|
.send_rpc(self.target, &self.target_node, "raft-vote", req)
|
||||||
|
|
120
src/store/mod.rs
120
src/store/mod.rs
|
@ -49,8 +49,8 @@ use sqlx::Statement;
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
use url::quirks::set_pathname;
|
use url::quirks::set_pathname;
|
||||||
|
|
||||||
use crate::ExampleNodeId;
|
use crate::DcacheNodeId;
|
||||||
use crate::ExampleTypeConfig;
|
use crate::DcacheTypeConfig;
|
||||||
|
|
||||||
use actix::prelude::*;
|
use actix::prelude::*;
|
||||||
use libmcaptcha::master::messages::{
|
use libmcaptcha::master::messages::{
|
||||||
|
@ -68,7 +68,7 @@ pub mod system;
|
||||||
* You will want to add any request that can write data in all nodes here.
|
* You will want to add any request that can write data in all nodes here.
|
||||||
*/
|
*/
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||||
pub enum ExampleRequest {
|
pub enum DcacheRequest {
|
||||||
//Set { key: String, value: String },
|
//Set { key: String, value: String },
|
||||||
AddVisitor(AddVisitor),
|
AddVisitor(AddVisitor),
|
||||||
AddCaptcha(AddCaptcha),
|
AddCaptcha(AddCaptcha),
|
||||||
|
@ -79,13 +79,13 @@ pub enum ExampleRequest {
|
||||||
/**
|
/**
|
||||||
* Here you will defined what type of answer you expect from reading the data of a node.
|
* Here you will defined what type of answer you expect from reading the data of a node.
|
||||||
* In this example it will return a optional value from a given key in
|
* In this example it will return a optional value from a given key in
|
||||||
* the `ExampleRequest.Set`.
|
* the `DcacheRequest.Set`.
|
||||||
*
|
*
|
||||||
* TODO: Should we explain how to create multiple `AppDataResponse`?
|
* TODO: Should we explain how to create multiple `AppDataResponse`?
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||||
pub enum ExampleResponse {
|
pub enum DcacheResponse {
|
||||||
AddVisitorResult(Option<AddVisitorResult>),
|
AddVisitorResult(Option<AddVisitorResult>),
|
||||||
Empty,
|
Empty,
|
||||||
// AddCaptchaResult, All returns ()
|
// AddCaptchaResult, All returns ()
|
||||||
|
@ -94,8 +94,8 @@ pub enum ExampleResponse {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct ExampleSnapshot {
|
pub struct DcacheSnapshot {
|
||||||
pub meta: SnapshotMeta<ExampleNodeId, BasicNode>,
|
pub meta: SnapshotMeta<DcacheNodeId, BasicNode>,
|
||||||
|
|
||||||
/// The data of the state machine at the time of this snapshot.
|
/// The data of the state machine at the time of this snapshot.
|
||||||
pub data: Vec<u8>,
|
pub data: Vec<u8>,
|
||||||
|
@ -107,10 +107,10 @@ pub struct ExampleSnapshot {
|
||||||
* a implementation to be serialized. Note that for this test we set both the key and
|
* a implementation to be serialized. Note that for this test we set both the key and
|
||||||
* value as String, but you could set any type of value that has the serialization impl.
|
* value as String, but you could set any type of value that has the serialization impl.
|
||||||
*/
|
*/
|
||||||
pub struct ExampleStateMachine {
|
pub struct DcacheStateMachine {
|
||||||
pub last_applied_log: Option<LogId<ExampleNodeId>>,
|
pub last_applied_log: Option<LogId<DcacheNodeId>>,
|
||||||
|
|
||||||
pub last_membership: StoredMembership<ExampleNodeId, BasicNode>,
|
pub last_membership: StoredMembership<DcacheNodeId, BasicNode>,
|
||||||
|
|
||||||
/// Application data.
|
/// Application data.
|
||||||
pub data: Arc<System<HashCache, EmbeddedMaster>>,
|
pub data: Arc<System<HashCache, EmbeddedMaster>>,
|
||||||
|
@ -118,16 +118,16 @@ pub struct ExampleStateMachine {
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Clone)]
|
#[derive(Serialize, Deserialize, Clone)]
|
||||||
struct PersistableStateMachine {
|
struct PersistableStateMachine {
|
||||||
last_applied_log: Option<LogId<ExampleNodeId>>,
|
last_applied_log: Option<LogId<DcacheNodeId>>,
|
||||||
|
|
||||||
last_membership: StoredMembership<ExampleNodeId, BasicNode>,
|
last_membership: StoredMembership<DcacheNodeId, BasicNode>,
|
||||||
|
|
||||||
/// Application data.
|
/// Application data.
|
||||||
data: HashMap<String, MCaptcha>,
|
data: HashMap<String, MCaptcha>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PersistableStateMachine {
|
impl PersistableStateMachine {
|
||||||
async fn from_statemachine(m: &ExampleStateMachine) -> Self {
|
async fn from_statemachine(m: &DcacheStateMachine) -> Self {
|
||||||
let internal_data = m
|
let internal_data = m
|
||||||
.data
|
.data
|
||||||
.master
|
.master
|
||||||
|
@ -147,14 +147,14 @@ impl PersistableStateMachine {
|
||||||
async fn to_statemachine(
|
async fn to_statemachine(
|
||||||
self,
|
self,
|
||||||
data: Arc<System<HashCache, EmbeddedMaster>>,
|
data: Arc<System<HashCache, EmbeddedMaster>>,
|
||||||
) -> ExampleStateMachine {
|
) -> DcacheStateMachine {
|
||||||
data.master
|
data.master
|
||||||
.send(SetInternalData {
|
.send(SetInternalData {
|
||||||
mcaptcha: self.data,
|
mcaptcha: self.data,
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
ExampleStateMachine {
|
DcacheStateMachine {
|
||||||
last_applied_log: self.last_applied_log,
|
last_applied_log: self.last_applied_log,
|
||||||
last_membership: self.last_membership,
|
last_membership: self.last_membership,
|
||||||
data,
|
data,
|
||||||
|
@ -162,26 +162,26 @@ impl PersistableStateMachine {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct ExampleStore {
|
pub struct DcacheStore {
|
||||||
last_purged_log_id: RwLock<Option<LogId<ExampleNodeId>>>,
|
last_purged_log_id: RwLock<Option<LogId<DcacheNodeId>>>,
|
||||||
|
|
||||||
/// The Raft log.
|
/// The Raft log.
|
||||||
log: RwLock<BTreeMap<u64, Entry<ExampleTypeConfig>>>,
|
log: RwLock<BTreeMap<u64, Entry<DcacheTypeConfig>>>,
|
||||||
|
|
||||||
/// The Raft state machine.
|
/// The Raft state machine.
|
||||||
pub state_machine: RwLock<ExampleStateMachine>,
|
pub state_machine: RwLock<DcacheStateMachine>,
|
||||||
|
|
||||||
/// The current granted vote.
|
/// The current granted vote.
|
||||||
vote: RwLock<Option<Vote<ExampleNodeId>>>,
|
vote: RwLock<Option<Vote<DcacheNodeId>>>,
|
||||||
|
|
||||||
snapshot_idx: Arc<Mutex<u64>>,
|
snapshot_idx: Arc<Mutex<u64>>,
|
||||||
|
|
||||||
current_snapshot: RwLock<Option<ExampleSnapshot>>,
|
current_snapshot: RwLock<Option<DcacheSnapshot>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ExampleStore {
|
impl DcacheStore {
|
||||||
pub fn new(salt: String) -> Self {
|
pub fn new(salt: String) -> Self {
|
||||||
let state_machine = RwLock::new(ExampleStateMachine {
|
let state_machine = RwLock::new(DcacheStateMachine {
|
||||||
last_applied_log: Default::default(),
|
last_applied_log: Default::default(),
|
||||||
last_membership: Default::default(),
|
last_membership: Default::default(),
|
||||||
data: system::init_system(salt),
|
data: system::init_system(salt),
|
||||||
|
@ -199,10 +199,10 @@ impl ExampleStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl RaftLogReader<ExampleTypeConfig> for Arc<ExampleStore> {
|
impl RaftLogReader<DcacheTypeConfig> for Arc<DcacheStore> {
|
||||||
async fn get_log_state(
|
async fn get_log_state(
|
||||||
&mut self,
|
&mut self,
|
||||||
) -> Result<LogState<ExampleTypeConfig>, StorageError<ExampleNodeId>> {
|
) -> Result<LogState<DcacheTypeConfig>, StorageError<DcacheNodeId>> {
|
||||||
let log = self.log.read().await;
|
let log = self.log.read().await;
|
||||||
let last = log.iter().rev().next().map(|(_, ent)| ent.log_id);
|
let last = log.iter().rev().next().map(|(_, ent)| ent.log_id);
|
||||||
|
|
||||||
|
@ -222,7 +222,7 @@ impl RaftLogReader<ExampleTypeConfig> for Arc<ExampleStore> {
|
||||||
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
|
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
|
||||||
&mut self,
|
&mut self,
|
||||||
range: RB,
|
range: RB,
|
||||||
) -> Result<Vec<Entry<ExampleTypeConfig>>, StorageError<ExampleNodeId>> {
|
) -> Result<Vec<Entry<DcacheTypeConfig>>, StorageError<DcacheNodeId>> {
|
||||||
let log = self.log.read().await;
|
let log = self.log.read().await;
|
||||||
let response = log
|
let response = log
|
||||||
.range(range.clone())
|
.range(range.clone())
|
||||||
|
@ -233,11 +233,11 @@ impl RaftLogReader<ExampleTypeConfig> for Arc<ExampleStore> {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl RaftSnapshotBuilder<ExampleTypeConfig, Cursor<Vec<u8>>> for Arc<ExampleStore> {
|
impl RaftSnapshotBuilder<DcacheTypeConfig, Cursor<Vec<u8>>> for Arc<DcacheStore> {
|
||||||
#[tracing::instrument(level = "trace", skip(self))]
|
#[tracing::instrument(level = "trace", skip(self))]
|
||||||
async fn build_snapshot(
|
async fn build_snapshot(
|
||||||
&mut self,
|
&mut self,
|
||||||
) -> Result<Snapshot<ExampleNodeId, BasicNode, Cursor<Vec<u8>>>, StorageError<ExampleNodeId>>
|
) -> Result<Snapshot<DcacheNodeId, BasicNode, Cursor<Vec<u8>>>, StorageError<DcacheNodeId>>
|
||||||
{
|
{
|
||||||
let data;
|
let data;
|
||||||
let last_applied_log;
|
let last_applied_log;
|
||||||
|
@ -279,7 +279,7 @@ impl RaftSnapshotBuilder<ExampleTypeConfig, Cursor<Vec<u8>>> for Arc<ExampleStor
|
||||||
snapshot_id,
|
snapshot_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let snapshot = ExampleSnapshot {
|
let snapshot = DcacheSnapshot {
|
||||||
meta: meta.clone(),
|
meta: meta.clone(),
|
||||||
data: data.clone(),
|
data: data.clone(),
|
||||||
};
|
};
|
||||||
|
@ -297,7 +297,7 @@ impl RaftSnapshotBuilder<ExampleTypeConfig, Cursor<Vec<u8>>> for Arc<ExampleStor
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
|
impl RaftStorage<DcacheTypeConfig> for Arc<DcacheStore> {
|
||||||
type SnapshotData = Cursor<Vec<u8>>;
|
type SnapshotData = Cursor<Vec<u8>>;
|
||||||
type LogReader = Self;
|
type LogReader = Self;
|
||||||
type SnapshotBuilder = Self;
|
type SnapshotBuilder = Self;
|
||||||
|
@ -305,8 +305,8 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
|
||||||
#[tracing::instrument(level = "trace", skip(self))]
|
#[tracing::instrument(level = "trace", skip(self))]
|
||||||
async fn save_vote(
|
async fn save_vote(
|
||||||
&mut self,
|
&mut self,
|
||||||
vote: &Vote<ExampleNodeId>,
|
vote: &Vote<DcacheNodeId>,
|
||||||
) -> Result<(), StorageError<ExampleNodeId>> {
|
) -> Result<(), StorageError<DcacheNodeId>> {
|
||||||
let mut v = self.vote.write().await;
|
let mut v = self.vote.write().await;
|
||||||
*v = Some(*vote);
|
*v = Some(*vote);
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -314,15 +314,15 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
|
||||||
|
|
||||||
async fn read_vote(
|
async fn read_vote(
|
||||||
&mut self,
|
&mut self,
|
||||||
) -> Result<Option<Vote<ExampleNodeId>>, StorageError<ExampleNodeId>> {
|
) -> Result<Option<Vote<DcacheNodeId>>, StorageError<DcacheNodeId>> {
|
||||||
Ok(*self.vote.read().await)
|
Ok(*self.vote.read().await)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(level = "trace", skip(self, entries))]
|
#[tracing::instrument(level = "trace", skip(self, entries))]
|
||||||
async fn append_to_log(
|
async fn append_to_log(
|
||||||
&mut self,
|
&mut self,
|
||||||
entries: &[&Entry<ExampleTypeConfig>],
|
entries: &[&Entry<DcacheTypeConfig>],
|
||||||
) -> Result<(), StorageError<ExampleNodeId>> {
|
) -> Result<(), StorageError<DcacheNodeId>> {
|
||||||
let mut log = self.log.write().await;
|
let mut log = self.log.write().await;
|
||||||
for entry in entries {
|
for entry in entries {
|
||||||
log.insert(entry.log_id.index, (*entry).clone());
|
log.insert(entry.log_id.index, (*entry).clone());
|
||||||
|
@ -333,8 +333,8 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
|
||||||
#[tracing::instrument(level = "debug", skip(self))]
|
#[tracing::instrument(level = "debug", skip(self))]
|
||||||
async fn delete_conflict_logs_since(
|
async fn delete_conflict_logs_since(
|
||||||
&mut self,
|
&mut self,
|
||||||
log_id: LogId<ExampleNodeId>,
|
log_id: LogId<DcacheNodeId>,
|
||||||
) -> Result<(), StorageError<ExampleNodeId>> {
|
) -> Result<(), StorageError<DcacheNodeId>> {
|
||||||
tracing::debug!("delete_log: [{:?}, +oo)", log_id);
|
tracing::debug!("delete_log: [{:?}, +oo)", log_id);
|
||||||
|
|
||||||
let mut log = self.log.write().await;
|
let mut log = self.log.write().await;
|
||||||
|
@ -352,8 +352,8 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
|
||||||
#[tracing::instrument(level = "debug", skip(self))]
|
#[tracing::instrument(level = "debug", skip(self))]
|
||||||
async fn purge_logs_upto(
|
async fn purge_logs_upto(
|
||||||
&mut self,
|
&mut self,
|
||||||
log_id: LogId<ExampleNodeId>,
|
log_id: LogId<DcacheNodeId>,
|
||||||
) -> Result<(), StorageError<ExampleNodeId>> {
|
) -> Result<(), StorageError<DcacheNodeId>> {
|
||||||
tracing::debug!("delete_log: [{:?}, +oo)", log_id);
|
tracing::debug!("delete_log: [{:?}, +oo)", log_id);
|
||||||
|
|
||||||
{
|
{
|
||||||
|
@ -381,10 +381,10 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
|
||||||
&mut self,
|
&mut self,
|
||||||
) -> Result<
|
) -> Result<
|
||||||
(
|
(
|
||||||
Option<LogId<ExampleNodeId>>,
|
Option<LogId<DcacheNodeId>>,
|
||||||
StoredMembership<ExampleNodeId, BasicNode>,
|
StoredMembership<DcacheNodeId, BasicNode>,
|
||||||
),
|
),
|
||||||
StorageError<ExampleNodeId>,
|
StorageError<DcacheNodeId>,
|
||||||
> {
|
> {
|
||||||
let state_machine = self.state_machine.read().await;
|
let state_machine = self.state_machine.read().await;
|
||||||
Ok((
|
Ok((
|
||||||
|
@ -396,8 +396,8 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
|
||||||
#[tracing::instrument(level = "trace", skip(self, entries))]
|
#[tracing::instrument(level = "trace", skip(self, entries))]
|
||||||
async fn apply_to_state_machine(
|
async fn apply_to_state_machine(
|
||||||
&mut self,
|
&mut self,
|
||||||
entries: &[&Entry<ExampleTypeConfig>],
|
entries: &[&Entry<DcacheTypeConfig>],
|
||||||
) -> Result<Vec<ExampleResponse>, StorageError<ExampleNodeId>> {
|
) -> Result<Vec<DcacheResponse>, StorageError<DcacheNodeId>> {
|
||||||
let mut res = Vec::with_capacity(entries.len());
|
let mut res = Vec::with_capacity(entries.len());
|
||||||
|
|
||||||
let mut sm = self.state_machine.write().await;
|
let mut sm = self.state_machine.write().await;
|
||||||
|
@ -408,9 +408,9 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
|
||||||
sm.last_applied_log = Some(entry.log_id);
|
sm.last_applied_log = Some(entry.log_id);
|
||||||
|
|
||||||
match entry.payload {
|
match entry.payload {
|
||||||
EntryPayload::Blank => res.push(ExampleResponse::Empty),
|
EntryPayload::Blank => res.push(DcacheResponse::Empty),
|
||||||
EntryPayload::Normal(ref req) => match req {
|
EntryPayload::Normal(ref req) => match req {
|
||||||
ExampleRequest::AddVisitor(msg) => {
|
DcacheRequest::AddVisitor(msg) => {
|
||||||
let r = sm
|
let r = sm
|
||||||
.data
|
.data
|
||||||
.master
|
.master
|
||||||
|
@ -421,9 +421,9 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
res.push(ExampleResponse::AddVisitorResult(r));
|
res.push(DcacheResponse::AddVisitorResult(r));
|
||||||
}
|
}
|
||||||
ExampleRequest::AddCaptcha(msg) => {
|
DcacheRequest::AddCaptcha(msg) => {
|
||||||
sm.data
|
sm.data
|
||||||
.master
|
.master
|
||||||
.send(msg.clone())
|
.send(msg.clone())
|
||||||
|
@ -432,9 +432,9 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
|
||||||
.await
|
.await
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
res.push(ExampleResponse::Empty);
|
res.push(DcacheResponse::Empty);
|
||||||
}
|
}
|
||||||
ExampleRequest::RenameCaptcha(msg) => {
|
DcacheRequest::RenameCaptcha(msg) => {
|
||||||
sm.data
|
sm.data
|
||||||
.master
|
.master
|
||||||
.send(msg.clone())
|
.send(msg.clone())
|
||||||
|
@ -443,9 +443,9 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
|
||||||
.await
|
.await
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
res.push(ExampleResponse::Empty);
|
res.push(DcacheResponse::Empty);
|
||||||
}
|
}
|
||||||
ExampleRequest::RemoveCaptcha(msg) => {
|
DcacheRequest::RemoveCaptcha(msg) => {
|
||||||
sm.data
|
sm.data
|
||||||
.master
|
.master
|
||||||
.send(msg.clone())
|
.send(msg.clone())
|
||||||
|
@ -454,12 +454,12 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
|
||||||
.await
|
.await
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
res.push(ExampleResponse::Empty);
|
res.push(DcacheResponse::Empty);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
EntryPayload::Membership(ref mem) => {
|
EntryPayload::Membership(ref mem) => {
|
||||||
sm.last_membership = StoredMembership::new(Some(entry.log_id), mem.clone());
|
sm.last_membership = StoredMembership::new(Some(entry.log_id), mem.clone());
|
||||||
res.push(ExampleResponse::Empty)
|
res.push(DcacheResponse::Empty)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -469,22 +469,22 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
|
||||||
#[tracing::instrument(level = "trace", skip(self))]
|
#[tracing::instrument(level = "trace", skip(self))]
|
||||||
async fn begin_receiving_snapshot(
|
async fn begin_receiving_snapshot(
|
||||||
&mut self,
|
&mut self,
|
||||||
) -> Result<Box<Self::SnapshotData>, StorageError<ExampleNodeId>> {
|
) -> Result<Box<Self::SnapshotData>, StorageError<DcacheNodeId>> {
|
||||||
Ok(Box::new(Cursor::new(Vec::new())))
|
Ok(Box::new(Cursor::new(Vec::new())))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(level = "trace", skip(self, snapshot))]
|
#[tracing::instrument(level = "trace", skip(self, snapshot))]
|
||||||
async fn install_snapshot(
|
async fn install_snapshot(
|
||||||
&mut self,
|
&mut self,
|
||||||
meta: &SnapshotMeta<ExampleNodeId, BasicNode>,
|
meta: &SnapshotMeta<DcacheNodeId, BasicNode>,
|
||||||
snapshot: Box<Self::SnapshotData>,
|
snapshot: Box<Self::SnapshotData>,
|
||||||
) -> Result<(), StorageError<ExampleNodeId>> {
|
) -> Result<(), StorageError<DcacheNodeId>> {
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
{ snapshot_size = snapshot.get_ref().len() },
|
{ snapshot_size = snapshot.get_ref().len() },
|
||||||
"decoding snapshot for installation"
|
"decoding snapshot for installation"
|
||||||
);
|
);
|
||||||
|
|
||||||
let new_snapshot = ExampleSnapshot {
|
let new_snapshot = DcacheSnapshot {
|
||||||
meta: meta.clone(),
|
meta: meta.clone(),
|
||||||
data: snapshot.into_inner(),
|
data: snapshot.into_inner(),
|
||||||
};
|
};
|
||||||
|
@ -517,8 +517,8 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
|
||||||
async fn get_current_snapshot(
|
async fn get_current_snapshot(
|
||||||
&mut self,
|
&mut self,
|
||||||
) -> Result<
|
) -> Result<
|
||||||
Option<Snapshot<ExampleNodeId, BasicNode, Self::SnapshotData>>,
|
Option<Snapshot<DcacheNodeId, BasicNode, Self::SnapshotData>>,
|
||||||
StorageError<ExampleNodeId>,
|
StorageError<DcacheNodeId>,
|
||||||
> {
|
> {
|
||||||
match &*self.current_snapshot.read().await {
|
match &*self.current_snapshot.read().await {
|
||||||
Some(snapshot) => {
|
Some(snapshot) => {
|
||||||
|
|
Loading…
Reference in a new issue