remote_context.rs 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. use std::any::Any;
  2. use std::cell::RefCell;
  3. use crate::remote_peer::RemotePeer;
  4. use crate::term_marker::TermMarker;
  5. use crate::verify_authority::DaemonBeatTicker;
  6. use crate::{Peer, RemoteRaft};
  7. /// A struct that contains useful utilities when contacting peers.
  8. ///
  9. /// The motivation of RemoteContext is to avoid cloning the `Arc<>` to utilities
  10. /// multiple times for each request. Instead, we embed shared utilities in the
  11. /// thread local space of each thread in a thread pool. Those references will be
  12. /// released when the thread pool is destroyed.
  13. ///
  14. /// This class provides a static reference to those utilities, which also helps
  15. /// simplifying lifetimes in futures.
  16. ///
  17. /// A big hack in this class is that we need to store generic structs in a
  18. /// thread local environment. This is because that the RPC interface is generic
  19. /// over the data to store in Raft. Thus `RemoteContext` must be generic, too.
  20. ///
  21. /// Generic thread local variables is not supported in Rust, due to the fact
  22. /// that one could potentially store multiple instances of the same class with
  23. /// different generic parameters. The instances must be somehow identified by
  24. /// their concrete types, which greatly increases implementation complexity.
  25. ///
  26. /// Luckily in our case, the "multiple instance" issue does not apply. A thread
  27. /// pool belongs to a known Raft instance, which has one set of known generic
  28. /// parameters. We would only ever need to store one instance of RemoteContext
  29. /// in one thread. Thus the ambiguity is casted away in `fetch_context()`.
  30. #[derive(Clone)]
  31. pub(crate) struct RemoteContext<Command> {
  32. term_marker: TermMarker<Command>,
  33. remote_peers: Vec<RemotePeer<Command, Peer>>,
  34. }
  35. impl<Command: 'static> RemoteContext<Command> {
  36. pub fn create(
  37. term_marker: TermMarker<Command>,
  38. remote_peers: Vec<RemotePeer<Command, Peer>>,
  39. ) -> Self {
  40. Self {
  41. term_marker,
  42. remote_peers,
  43. }
  44. }
  45. pub fn term_marker() -> &'static TermMarker<Command> {
  46. &Self::fetch_context().term_marker
  47. }
  48. pub fn remote_peer(peer: Peer) -> &'static RemotePeer<Command, Peer> {
  49. &Self::fetch_context().remote_peers[peer.0]
  50. }
  51. pub fn rpc_client(peer: Peer) -> &'static dyn RemoteRaft<Command> {
  52. Self::remote_peer(peer).rpc_client.as_ref()
  53. }
  54. pub fn beat_ticker(peer: Peer) -> &'static DaemonBeatTicker {
  55. &Self::remote_peer(peer).beat_ticker
  56. }
  57. thread_local! {
  58. // Using Any to mask the fact that we are storing a generic struct.
  59. static REMOTE_CONTEXT: RefCell<Option<&'static dyn Any>> = RefCell::new(None);
  60. }
  61. pub fn attach(self) {
  62. Self::set_context(Box::new(self))
  63. }
  64. pub fn detach() -> Box<Self> {
  65. let static_context = Self::fetch_context();
  66. Self::REMOTE_CONTEXT.with(|context| context.borrow_mut().take());
  67. unsafe { Box::from_raw((static_context as *const Self) as *mut Self) }
  68. }
  69. fn set_context(context: Box<Self>) {
  70. let context_ref = Box::leak(context);
  71. let any_ref: &'static mut dyn Any = context_ref;
  72. Self::REMOTE_CONTEXT
  73. .with(|context| *context.borrow_mut() = Some(any_ref));
  74. }
  75. fn fetch_context() -> &'static Self {
  76. let any_ref = Self::REMOTE_CONTEXT.with(|context| *context.borrow());
  77. let Some(any_ref) = any_ref else {
  78. panic!("Context is not set");
  79. };
  80. any_ref
  81. .downcast_ref::<Self>()
  82. .expect("Context is set to the wrong type.")
  83. }
  84. }
  #[cfg(test)]
  mod tests {
      use std::panic::catch_unwind;
      use std::sync::Arc;

      use parking_lot::Mutex;

      use crate::election::ElectionState;
      use crate::remote_peer::RemotePeer;
      use crate::term_marker::TermMarker;
      use crate::utils::do_nothing::{DoNothingPersister, DoNothingRemoteRaft};
      use crate::verify_authority::VerifyAuthorityDaemon;
      use crate::{Peer, RaftState};

      use super::RemoteContext;

      /// Round-trips a context through set, fetch and detach on the current
      /// thread, checking that the very same heap allocation is observed at
      /// every step and that the slot is empty afterwards.
      #[test]
      fn test_context_api() {
          // Assemble a single-peer context out of do-nothing components.
          let raft_state =
              Arc::new(Mutex::new(RaftState::<i32>::create(1, Peer(0))));
          let election_state = Arc::new(ElectionState::create());
          let authority_daemon = VerifyAuthorityDaemon::create(1);
          let marker = TermMarker::create(
              raft_state,
              election_state,
              Arc::new(DoNothingPersister),
          );
          let only_peer = RemotePeer::create(
              Peer(0),
              DoNothingRemoteRaft,
              authority_daemon.beat_ticker(0),
          );

          let boxed = Box::new(RemoteContext::create(marker, vec![only_peer]));
          // Remember the heap address before ownership moves into the slot.
          let expected_ptr = &*boxed as *const RemoteContext<i32>;
          RemoteContext::set_context(boxed);

          // Fetching must yield the allocation that was just installed.
          let fetched: &RemoteContext<i32> =
              RemoteContext::<i32>::fetch_context();
          assert!(std::ptr::eq(expected_ptr, fetched));

          // Detaching must hand that same allocation back.
          let detached: Box<RemoteContext<i32>> =
              RemoteContext::<i32>::detach();
          assert!(std::ptr::eq(expected_ptr, &*detached));

          // With the slot cleared, another fetch has to panic.
          let fetch_after_detach = catch_unwind(|| {
              RemoteContext::<i32>::fetch_context();
          });
          fetch_after_detach.expect_err("Expecting error 'Context is not set'");
      }
  }
  125. }