瀏覽代碼

Make HEARTBEAT_INTERVAL a Duration.

Jing Yang 3 年之前
父節點
當前提交
66cccb5847
共有 5 個文件被更改,包括 10 次插入23 次删除
  1. 2 6
      src/apply_command.rs
  2. 1 1
      src/heartbeats.rs
  3. 3 8
      src/raft.rs
  4. 2 6
      src/sync_log_entries.rs
  5. 2 2
      src/verify_authority.rs

+ 2 - 6
src/apply_command.rs

@@ -1,8 +1,7 @@
 use std::sync::atomic::Ordering;
-use std::time::Duration;
 
 use crate::daemon_env::Daemon;
-use crate::heartbeats::HEARTBEAT_INTERVAL_MILLIS;
+use crate::heartbeats::HEARTBEAT_INTERVAL;
 use crate::{Index, Raft, Snapshot};
 
 pub enum ApplyCommandMessage<Command> {
@@ -71,10 +70,7 @@ where
                     if rf.last_applied >= rf.commit_index {
                         // We have applied all committed log entries, wait until
                         // new log entries are committed.
-                        condvar.wait_for(
-                            &mut rf,
-                            Duration::from_millis(HEARTBEAT_INTERVAL_MILLIS),
-                        );
+                        condvar.wait_for(&mut rf, HEARTBEAT_INTERVAL);
                     }
                     // Note that between those two nested `if`s, log start is
                     // always smaller than or equal to commit index, as

+ 1 - 1
src/heartbeats.rs

@@ -9,7 +9,7 @@ use crate::utils::{retry_rpc, RPC_DEADLINE};
 use crate::verify_authority::DaemonBeatTicker;
 use crate::{AppendEntriesArgs, Raft, RaftState, RemoteRaft};
 
-pub(crate) const HEARTBEAT_INTERVAL_MILLIS: u64 = 150;
+pub(crate) const HEARTBEAT_INTERVAL: Duration = Duration::from_millis(150);
 
 #[derive(Clone)]
 pub(crate) struct HeartbeatsDaemon {

+ 3 - 8
src/raft.rs

@@ -1,6 +1,5 @@
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
-use std::time::Duration;
 
 use crossbeam_utils::sync::WaitGroup;
 use parking_lot::{Condvar, Mutex};
@@ -9,7 +8,7 @@ use serde_derive::{Deserialize, Serialize};
 use crate::apply_command::ApplyCommandFnMut;
 use crate::daemon_env::{DaemonEnv, ThreadEnv};
 use crate::election::ElectionState;
-use crate::heartbeats::{HeartbeatsDaemon, HEARTBEAT_INTERVAL_MILLIS};
+use crate::heartbeats::{HeartbeatsDaemon, HEARTBEAT_INTERVAL};
 use crate::persister::PersistedRaftState;
 use crate::snapshot::{RequestSnapshotFnMut, SnapshotDaemon};
 use crate::verify_authority::VerifyAuthorityDaemon;
@@ -142,9 +141,7 @@ where
         this.run_apply_command_daemon(apply_command);
         // One off function that schedules many little tasks, running on the
         // internal thread pool.
-        this.schedule_heartbeats(Duration::from_millis(
-            HEARTBEAT_INTERVAL_MILLIS,
-        ));
+        this.schedule_heartbeats(HEARTBEAT_INTERVAL);
         // The last step is to start running election timer.
         this.run_election_timer();
         this
@@ -203,9 +200,7 @@ where
             .expect(
                 "All references to the thread pool should have been dropped.",
             )
-            .shutdown_timeout(Duration::from_millis(
-                HEARTBEAT_INTERVAL_MILLIS * 2,
-            ));
+            .shutdown_timeout(HEARTBEAT_INTERVAL * 2);
         // DaemonEnv must be shutdown after the thread pool, since there might
         // be tasks logging errors in the pool.
         self.daemon_env.shutdown();

+ 2 - 6
src/sync_log_entries.rs

@@ -1,12 +1,11 @@
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
-use std::time::Duration;
 
 use parking_lot::{Condvar, Mutex};
 
 use crate::check_or_record;
 use crate::daemon_env::{Daemon, ErrorKind};
-use crate::heartbeats::HEARTBEAT_INTERVAL_MILLIS;
+use crate::heartbeats::HEARTBEAT_INTERVAL;
 use crate::term_marker::TermMarker;
 use crate::utils::{retry_rpc, SharedSender, RPC_DEADLINE};
 use crate::verify_authority::DaemonBeatTicker;
@@ -346,10 +345,7 @@ where
                 term_marker.mark(term);
             }
             Err(_) => {
-                tokio::time::sleep(Duration::from_millis(
-                    HEARTBEAT_INTERVAL_MILLIS,
-                ))
-                .await;
+                tokio::time::sleep(HEARTBEAT_INTERVAL).await;
                 // Ignore the error. The log syncing thread must have died.
                 let _ = rerun.send(Some(Peer(peer_index)));
             }

+ 2 - 2
src/verify_authority.rs

@@ -8,7 +8,7 @@ use parking_lot::{Condvar, Mutex};
 
 use crate::beat_ticker::{Beat, SharedBeatTicker};
 use crate::daemon_env::Daemon;
-use crate::heartbeats::HEARTBEAT_INTERVAL_MILLIS;
+use crate::heartbeats::HEARTBEAT_INTERVAL;
 use crate::{Index, Raft, Term};
 
 /// The result returned to a verify authority request.
@@ -313,7 +313,7 @@ impl VerifyAuthorityDaemon {
     }
 
     const VERIFY_AUTHORITY_REQUEST_EXPIRATION: Duration =
-        Duration::from_millis(HEARTBEAT_INTERVAL_MILLIS * 2);
+        Duration::from_millis(HEARTBEAT_INTERVAL.as_millis() as u64 * 2);
 
     /// Remove expired requests if we are no longer the leader.
     /// If we have lost leadership, we are unlikely to receive confirmations