|
|
@@ -4,20 +4,18 @@ use std::sync::{Arc, Condvar, Mutex, MutexGuard};
|
|
|
use std::time::Duration;
|
|
|
|
|
|
pub struct Carrier<T> {
|
|
|
- template: CarrierRef<T>,
|
|
|
+ template: Arc<CarrierTarget<T>>,
|
|
|
shutdown: AtomicBool,
|
|
|
}
|
|
|
|
|
|
impl<T> Carrier<T> {
|
|
|
- pub fn new(inner: T) -> Self {
|
|
|
+ pub fn new(target: T) -> Self {
|
|
|
Self {
|
|
|
- template: CarrierRef {
|
|
|
- inner: Arc::new(CarrierTarget {
|
|
|
- target: inner,
|
|
|
- condvar: Default::default(),
|
|
|
- count: Mutex::new(0),
|
|
|
- }),
|
|
|
- },
|
|
|
+ template: Arc::new(CarrierTarget {
|
|
|
+ target,
|
|
|
+ condvar: Default::default(),
|
|
|
+ count: Mutex::new(0),
|
|
|
+ }),
|
|
|
shutdown: AtomicBool::new(false),
|
|
|
}
|
|
|
}
|
|
|
@@ -28,7 +26,7 @@ impl<T> Carrier<T> {
|
|
|
|
|
|
pub fn create_ref(&self) -> Option<CarrierRef<T>> {
|
|
|
if !self.shutdown.load(Ordering::Acquire) {
|
|
|
- Some(self.template.dup())
|
|
|
+ Some(CarrierRef::new(&self.template))
|
|
|
} else {
|
|
|
None
|
|
|
}
|
|
|
@@ -38,34 +36,56 @@ impl<T> Carrier<T> {
|
|
|
self.shutdown.store(true, Ordering::Release);
|
|
|
}
|
|
|
|
|
|
- pub fn wait(&self) {
|
|
|
- let count = self.template.lock_count();
|
|
|
- let _count = self
|
|
|
- .template
|
|
|
- .inner
|
|
|
- .condvar
|
|
|
- .wait_while(count, |count| *count != 0)
|
|
|
- .expect("The carrier lock should not be poisoned");
|
|
|
- assert_eq!(Arc::strong_count(&self.template.inner), 1);
|
|
|
+ fn unwrap_or_panic(self) -> T {
|
|
|
+ let arc = self.template;
|
|
|
+ assert_eq!(Arc::strong_count(&arc), 1);
|
|
|
+
|
|
|
+ match Arc::try_unwrap(arc) {
|
|
|
+ Ok(t) => t.target,
|
|
|
+ Err(_arc) => {
|
|
|
+ panic!("The carrier should not have any outstanding references")
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- pub fn wait_timeout(&self, timeout: Duration) -> bool {
|
|
|
- let count = self.template.lock_count();
|
|
|
- let (count, _result) = self
|
|
|
- .template
|
|
|
- .inner
|
|
|
- .condvar
|
|
|
- .wait_timeout_while(count, timeout, |count| *count != 0)
|
|
|
- .expect("The carrier lock should not be poisoned");
|
|
|
- return *count == 0;
|
|
|
+ pub fn wait(self) -> T {
|
|
|
+ {
|
|
|
+ let count = self.template.lock_count();
|
|
|
+ let count = self
|
|
|
+ .template
|
|
|
+ .condvar
|
|
|
+ .wait_while(count, |count| *count != 0)
|
|
|
+ .expect("The carrier lock should not be poisoned");
|
|
|
+
|
|
|
+ assert_eq!(*count, 0);
|
|
|
+ }
|
|
|
+ self.unwrap_or_panic()
|
|
|
}
|
|
|
|
|
|
- pub fn shutdown(&self) {
|
|
|
+ pub fn wait_timeout(self, timeout: Duration) -> Result<T, Self> {
|
|
|
+ let count = {
|
|
|
+ let count = self.template.lock_count();
|
|
|
+ let (count, _result) = self
|
|
|
+ .template
|
|
|
+ .condvar
|
|
|
+ .wait_timeout_while(count, timeout, |count| *count != 0)
|
|
|
+ .expect("The carrier lock should not be poisoned");
|
|
|
+ *count
|
|
|
+ };
|
|
|
+
|
|
|
+ if count == 0 {
|
|
|
+ Ok(self.unwrap_or_panic())
|
|
|
+ } else {
|
|
|
+ Err(self)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ pub fn shutdown(self) -> T {
|
|
|
self.close();
|
|
|
self.wait()
|
|
|
}
|
|
|
|
|
|
- pub fn shutdown_timeout(&self, timeout: Duration) -> bool {
|
|
|
+ pub fn shutdown_timeout(self, timeout: Duration) -> Result<T, Self> {
|
|
|
self.close();
|
|
|
self.wait_timeout(timeout)
|
|
|
}
|
|
|
@@ -79,29 +99,31 @@ struct CarrierTarget<T> {
|
|
|
count: Mutex<usize>,
|
|
|
}
|
|
|
|
|
|
-pub struct CarrierRef<T> {
|
|
|
- inner: Arc<CarrierTarget<T>>,
|
|
|
-}
|
|
|
-
|
|
|
-impl<T> CarrierRef<T> {
|
|
|
+impl<T> CarrierTarget<T> {
|
|
|
fn lock_count(&self) -> MutexGuard<usize> {
|
|
|
- self.inner
|
|
|
- .count
|
|
|
+ self.count
|
|
|
.lock()
|
|
|
.expect("The carrier lock should not be poisoned")
|
|
|
}
|
|
|
+}
|
|
|
+
|
|
|
+#[derive(Default)]
|
|
|
+pub struct CarrierRef<T> {
|
|
|
+ inner: Arc<CarrierTarget<T>>,
|
|
|
+}
|
|
|
|
|
|
- fn dup(&self) -> Self {
|
|
|
- let mut count = self.lock_count();
|
|
|
+impl<T> CarrierRef<T> {
|
|
|
+ fn new(inner: &Arc<CarrierTarget<T>>) -> Self {
|
|
|
+ let mut count = inner.lock_count();
|
|
|
*count += 1;
|
|
|
|
|
|
CarrierRef {
|
|
|
- inner: self.inner.clone(),
|
|
|
+ inner: inner.clone(),
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- fn dedup(&self) {
|
|
|
- let mut count = self.lock_count();
|
|
|
+ fn delete(&self) {
|
|
|
+ let mut count = self.inner.lock_count();
|
|
|
*count -= 1;
|
|
|
|
|
|
if *count == 0 {
|
|
|
@@ -116,17 +138,6 @@ impl<T> AsRef<T> for CarrierRef<T> {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-impl<T> Default for CarrierRef<T>
|
|
|
-where
|
|
|
- T: Default,
|
|
|
-{
|
|
|
- fn default() -> Self {
|
|
|
- Self {
|
|
|
- inner: Default::default(),
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
impl<T> Deref for CarrierRef<T> {
|
|
|
type Target = T;
|
|
|
|
|
|
@@ -137,6 +148,6 @@ impl<T> Deref for CarrierRef<T> {
|
|
|
|
|
|
impl<T> Drop for CarrierRef<T> {
|
|
|
fn drop(&mut self) {
|
|
|
- self.dedup()
|
|
|
+ self.delete()
|
|
|
}
|
|
|
}
|