Эх сурвалжийг харах

Remove Sync from Command type requirement.

Jing Yang 5 жил өмнө
parent
commit
bff43d8c12
1 өөрчлөгдсөн 28 нэмэгдсэн , 13 устгасан
  1. 28 13
      src/lib.rs

+ 28 - 13
src/lib.rs

@@ -110,8 +110,7 @@ struct Opening(Arc<AtomicUsize>);
 // 2. serializable: they are sent over RPCs and persisted.
 // 3. deserializable: they are restored from storage.
 // 4. send: they are referenced in futures.
-// 5. sync: they are shared to other threads.
-// 6. default, because we need an element for the first entry.
+// 5. default, because we need an element for the first entry.
 impl<Command> Raft<Command>
 where
     Command: 'static
@@ -119,7 +118,6 @@ where
         + serde::Serialize
         + serde::de::DeserializeOwned
         + Send
-        + Sync
         + Default,
 {
     /// Create a new raft instance.
@@ -325,11 +323,10 @@ where
 // 0. 'static: Raft<Command> must be 'static, it is moved to another thread.
 // 1. clone: they are copied to the persister.
 // 2. send: Arc<Mutex<Vec<LogEntry<Command>>>> must be send, it is moved to another thread.
-// 3. sync: AppendEntries<Command> are used across await points for RPC retries.
-// 4. serialize: they are converted to bytes to persist.
+// 3. serialize: they are converted to bytes to persist.
 impl<Command> Raft<Command>
 where
-    Command: 'static + Clone + Send + Sync + serde::Serialize,
+    Command: 'static + Clone + Send + serde::Serialize,
 {
     fn run_election_timer(&self) -> std::thread::JoinHandle<()> {
         let this = self.clone();
@@ -475,8 +472,10 @@ where
         args: RequestVoteArgs,
     ) -> Option<bool> {
         let term = args.term;
+        // See the comment in send_heartbeat() for this override.
+        let rpc_client = rpc_client.as_ref();
         let reply =
-            retry_rpc(Self::REQUEST_VOTE_RETRY, RPC_DEADLINE, |_round| {
+            retry_rpc(Self::REQUEST_VOTE_RETRY, RPC_DEADLINE, move |_round| {
                 rpc_client.call_request_vote(args.clone())
             })
             .await;
@@ -603,7 +602,22 @@ where
         rpc_client: Arc<RpcClient>,
         args: AppendEntriesArgs<Command>,
     ) -> std::io::Result<()> {
-        retry_rpc(Self::HEARTBEAT_RETRY, RPC_DEADLINE, |_round| {
+        // Passing a reference that is moved to the following closure.
+        //
+        // It won't work if the rpc_client of type Arc is moved into the closure
+        // directly. To clone the Arc, the function must own a mutable reference
+        // to it. At the same time, rpc_client.call_append_entries() returns a
+        // future that must own a reference, too. That caused a compiling error
+        // of FnMut allowing "references to captured variables to escape".
+        //
+        // By passing-in a reference instead of an Arc, the closure becomes a Fn
+        // (no Mut), which can allow references to escape.
+        //
+        // Another option is to use non-move closures, in which case rpc_client
+        // of type Arc can be passed-in directly. However that requires args to
+        // be sync because they can be shared by more than one futures.
+        let rpc_client = rpc_client.as_ref();
+        retry_rpc(Self::HEARTBEAT_RETRY, RPC_DEADLINE, move |_round| {
             rpc_client.call_append_entries(args.clone())
         })
         .await?;
@@ -759,11 +773,12 @@ where
         args: AppendEntriesArgs<Command>,
     ) -> std::io::Result<Option<bool>> {
         let term = args.term;
-        let reply =
-            retry_rpc(Self::APPEND_ENTRIES_RETRY, RPC_DEADLINE, |_round| {
-                rpc_client.call_append_entries(args.clone())
-            })
-            .await?;
+        let reply = retry_rpc(
+            Self::APPEND_ENTRIES_RETRY,
+            RPC_DEADLINE,
+            move |_round| rpc_client.call_append_entries(args.clone()),
+        )
+        .await?;
         Ok(if reply.term == term {
             Some(reply.success)
         } else {