|
@@ -1,8 +1,8 @@
|
|
|
-use crossbeam_utils::sync::WaitGroup;
|
|
|
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
|
|
use std::sync::Arc;
|
|
use std::sync::Arc;
|
|
|
use std::time::Duration;
|
|
use std::time::Duration;
|
|
|
|
|
|
|
|
|
|
+use crossbeam_utils::sync::WaitGroup;
|
|
|
use parking_lot::{Condvar, Mutex};
|
|
use parking_lot::{Condvar, Mutex};
|
|
|
use serde_derive::{Deserialize, Serialize};
|
|
use serde_derive::{Deserialize, Serialize};
|
|
|
|
|
|
|
@@ -60,7 +60,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
|
|
|
pub fn new(
|
|
pub fn new(
|
|
|
peers: Vec<impl RemoteRaft<Command> + 'static>,
|
|
peers: Vec<impl RemoteRaft<Command> + 'static>,
|
|
|
me: usize,
|
|
me: usize,
|
|
|
- persister: Arc<dyn Persister>,
|
|
|
|
|
|
|
+ persister: impl Persister + 'static,
|
|
|
apply_command: impl ApplyCommandFnMut<Command>,
|
|
apply_command: impl ApplyCommandFnMut<Command>,
|
|
|
max_state_size_bytes: Option<usize>,
|
|
max_state_size_bytes: Option<usize>,
|
|
|
request_snapshot: impl RequestSnapshotFnMut,
|
|
request_snapshot: impl RequestSnapshotFnMut,
|
|
@@ -95,6 +95,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
|
|
|
let election = Arc::new(ElectionState::create());
|
|
let election = Arc::new(ElectionState::create());
|
|
|
election.reset_election_timer();
|
|
election.reset_election_timer();
|
|
|
|
|
|
|
|
|
|
+ let persister = Arc::new(persister);
|
|
|
let term_marker = TermMarker::create(
|
|
let term_marker = TermMarker::create(
|
|
|
inner_state.clone(),
|
|
inner_state.clone(),
|
|
|
election.clone(),
|
|
election.clone(),
|
|
@@ -133,7 +134,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
|
|
|
})
|
|
})
|
|
|
.build()
|
|
.build()
|
|
|
.expect("Creating thread pool should not fail");
|
|
.expect("Creating thread pool should not fail");
|
|
|
- let peers = (0..peer_size).map(Peer).collect();
|
|
|
|
|
|
|
+ let peers = (0..peer_size).filter(|p| *p != me).map(Peer).collect();
|
|
|
let (sync_log_entries_comms, sync_log_entries_daemon) =
|
|
let (sync_log_entries_comms, sync_log_entries_daemon) =
|
|
|
crate::sync_log_entries::create(peer_size);
|
|
crate::sync_log_entries::create(peer_size);
|
|
|
|
|
|
|
@@ -285,6 +286,11 @@ impl RaftJoinHandle {
|
|
|
|
|
|
|
|
#[cfg(test)]
|
|
#[cfg(test)]
|
|
|
mod tests {
|
|
mod tests {
|
|
|
|
|
+ use crate::utils::do_nothing::{DoNothingPersister, DoNothingRemoteRaft};
|
|
|
|
|
+ use crate::ApplyCommandMessage;
|
|
|
|
|
+
|
|
|
|
|
+ use super::*;
|
|
|
|
|
+
|
|
|
#[test]
|
|
#[test]
|
|
|
fn test_raft_must_sync() {
|
|
fn test_raft_must_sync() {
|
|
|
let optional_raft: Option<super::Raft<i32>> = None;
|
|
let optional_raft: Option<super::Raft<i32>> = None;
|
|
@@ -296,4 +302,24 @@ mod tests {
|
|
|
// The following raft is not Sync.
|
|
// The following raft is not Sync.
|
|
|
// let optional_raft: Option<super::Raft<std::rc::Rc<i32>>> = None;
|
|
// let optional_raft: Option<super::Raft<std::rc::Rc<i32>>> = None;
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ #[test]
|
|
|
|
|
+ fn test_no_me_in_peers() {
|
|
|
|
|
+ let peer_size = 5;
|
|
|
|
|
+ let me = 2;
|
|
|
|
|
+
|
|
|
|
|
+ let raft = Raft::new(
|
|
|
|
|
+ vec![DoNothingRemoteRaft {}; peer_size],
|
|
|
|
|
+ me,
|
|
|
|
|
+ DoNothingPersister {},
|
|
|
|
|
+ |_: ApplyCommandMessage<i32>| {},
|
|
|
|
|
+ None,
|
|
|
|
|
+ |_| {},
|
|
|
|
|
+ );
|
|
|
|
|
+
|
|
|
|
|
+ assert_eq!(4, raft.peers.len());
|
|
|
|
|
+ for peer in &raft.peers {
|
|
|
|
|
+ assert_ne!(peer.0, me);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|