use std::{ cell::{Cell, UnsafeCell}, fmt, sync::{ atomic::{AtomicUsize, Ordering}, Arc, }, }; #[derive(Debug)] #[repr(align(64))] struct ProducerCacheline { /// The bounded size as specified by the user. capacity: usize, /// Index position of current tail tail: AtomicUsize, shadow_head: Cell, /// Id == 0 : never connected /// Id == usize::MAX: disconnected consumer_id: AtomicUsize, } #[derive(Debug)] #[repr(align(64))] struct ConsumerCacheline { /// The bounded size as specified by the user. capacity: usize, /// Index position of the current head head: AtomicUsize, shadow_tail: Cell, /// Id == 0 : never connected /// Id == usize::MAX: disconnected producer_id: AtomicUsize, } /// The internal memory buffer used by the queue. /// /// Buffer holds a pointer to allocated memory which represents the bounded /// ring buffer, as well as a head and tail atomicUsize which the producer and /// consumer use to track location in the ring. #[repr(C)] pub(crate) struct Buffer { buffer_storage: Vec>, pcache: ProducerCacheline, ccache: ConsumerCacheline, } impl fmt::Debug for Buffer { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let head = self.ccache.head.load(Ordering::Relaxed); let tail = self.pcache.tail.load(Ordering::Relaxed); let shead = self.pcache.shadow_head.get(); let stail = self.ccache.shadow_tail.get(); let id_to_str = |id| match id { 0 => "not connected".into(), usize::MAX => "disconnected".into(), x => format!("{}", x), }; let consumer_id = id_to_str(self.pcache.consumer_id.load(Ordering::Relaxed)); let producer_id = id_to_str(self.ccache.producer_id.load(Ordering::Relaxed)); f.debug_struct("SPSC Buffer") .field("capacity:", &self.ccache.capacity) .field("consumer_head:", &head) .field("shadow_head:", &shead) .field("producer_tail:", &tail) .field("shadow_tail:", &stail) .field("consumer_id:", &consumer_id) .field("producer_id:", &producer_id) .finish() } } unsafe impl Sync for Buffer {} /// A handle to the queue which allows consuming values from the buffer pub(crate) struct Consumer { pub(crate) buffer: Arc>, } impl Clone for Consumer { fn clone(&self) -> Self { Consumer { buffer: self.buffer.clone(), } } } /// A handle to the queue which allows adding values onto the buffer pub(crate) struct Producer { pub(crate) buffer: Arc>, } impl Clone for Producer { fn clone(&self) -> Self { Producer { buffer: self.buffer.clone(), } } } impl fmt::Debug for Consumer { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Consumer {:?}", self.buffer) } } impl fmt::Debug for Producer { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Producer {:?}", self.buffer) } } unsafe impl Send for Consumer {} unsafe impl Send for Producer {} impl Buffer { /// Attempt to pop a value off the buffer. /// /// If the buffer is empty, this method will not block. Instead, it will /// return `None` signifying the buffer was empty. The caller may then /// decide what to do next (e.g. spin-wait, sleep, process something /// else, etc) fn try_pop(&self) -> Option { let current_head = self.ccache.head.load(Ordering::Relaxed); if current_head == self.ccache.shadow_tail.get() { self.ccache .shadow_tail .set(self.pcache.tail.load(Ordering::Acquire)); if current_head == self.ccache.shadow_tail.get() { return None; } } let resp = unsafe { self.buffer_storage[current_head % self.ccache.capacity] .get() .read() }; self.ccache.head.store(current_head + 1, Ordering::Release); Some(resp) } /// Attempt to push a value onto the buffer. /// /// If the buffer is full, this method will not block. Instead, it will /// return `Some(v)`, where `v` was the value attempting to be pushed /// onto the buffer. If the value was successfully pushed onto the /// buffer, `None` will be returned signifying success. fn try_push(&self, v: T) -> Option { if self.consumer_disconnected() { return Some(v); } let current_tail = self.pcache.tail.load(Ordering::Relaxed); if self.pcache.shadow_head.get() + self.pcache.capacity <= current_tail { self.pcache .shadow_head .set(self.ccache.head.load(Ordering::Acquire)); if self.pcache.shadow_head.get() + self.pcache.capacity <= current_tail { return Some(v); } } unsafe { self.buffer_storage[current_tail % self.pcache.capacity] .get() .write(v); } self.pcache.tail.store(current_tail + 1, Ordering::Release); None } /// Disconnects the consumer, and returns whether or not it was already /// disconnected pub(crate) fn disconnect_consumer(&self) -> bool { self.pcache.consumer_id.swap(usize::MAX, Ordering::Release) == usize::MAX } /// Disconnects the consumer, and returns whether or not it was already /// disconnected pub(crate) fn disconnect_producer(&self) -> bool { self.ccache.producer_id.swap(usize::MAX, Ordering::Release) == usize::MAX } /// Disconnects the consumer, and returns whether or not it was already /// disconnected pub(crate) fn producer_disconnected(&self) -> bool { self.ccache.producer_id.load(Ordering::Acquire) == usize::MAX } /// Disconnects the consumer, and returns whether or not it was already /// disconnected pub(crate) fn consumer_disconnected(&self) -> bool { self.pcache.consumer_id.load(Ordering::Acquire) == usize::MAX } /// Returns the current size of the queue /// /// This value represents the current size of the queue. This value can be /// from 0-`capacity` inclusive. pub(crate) fn size(&self) -> usize { self.pcache.tail.load(Ordering::Acquire) - self.ccache.head.load(Ordering::Acquire) } } /// Handles deallocation of heap memory when the buffer is dropped impl Drop for Buffer { fn drop(&mut self) { // Pop the rest of the values off the queue. By moving them into this scope, // we implicitly call their destructor while self.try_pop().is_some() {} // We don't want to run any destructors here, because we didn't run // any of the constructors through the vector. And whatever object was // in fact still alive we popped above. unsafe { self.buffer_storage.set_len(0); } } } pub(crate) fn make(capacity: usize) -> (Producer, Consumer) { inner_make(capacity, 0) } fn inner_make(capacity: usize, initial_value: usize) -> (Producer, Consumer) { let buffer_storage = allocate_buffer(capacity); let arc = Arc::new(Buffer { buffer_storage, ccache: ConsumerCacheline { capacity, head: AtomicUsize::new(initial_value), shadow_tail: Cell::new(initial_value), producer_id: AtomicUsize::new(0), }, pcache: ProducerCacheline { capacity, tail: AtomicUsize::new(initial_value), shadow_head: Cell::new(initial_value), consumer_id: AtomicUsize::new(0), }, }); ( Producer { buffer: arc.clone(), }, Consumer { buffer: arc }, ) } fn allocate_buffer(capacity: usize) -> Vec> { let size = capacity.next_power_of_two(); let mut vec = Vec::with_capacity(size); unsafe { vec.set_len(size); } vec } pub(crate) trait BufferHalf { type Item; fn buffer(&self) -> &Buffer; fn connect(&self, id: usize); fn peer_id(&self) -> usize; /// Returns the total capacity of this queue /// /// This value represents the total capacity of the queue when it is full. /// It does not represent the current usage. For that, call `size()`. fn capacity(&self) -> usize; /// Returns the current size of the queue /// /// This value represents the current size of the queue. This value can be /// from 0-`capacity` inclusive. fn size(&self) -> usize { self.buffer().size() } } impl BufferHalf for Producer { type Item = T; fn buffer(&self) -> &Buffer { &*self.buffer } fn capacity(&self) -> usize { (*self.buffer).pcache.capacity } fn connect(&self, id: usize) { assert_ne!(id, 0); assert_ne!(id, usize::MAX); (*self.buffer) .ccache .producer_id .store(id, Ordering::Release); } fn peer_id(&self) -> usize { (*self.buffer).pcache.consumer_id.load(Ordering::Acquire) } } impl Producer { /// Attempt to push a value onto the buffer. /// /// This method does not block. If the queue is not full, the value will be /// added to the queue and the method will return `None`, signifying /// success. If the queue is full, this method will return `Some(v)``, /// where `v` is your original value. pub(crate) fn try_push(&self, v: T) -> Option { (*self.buffer).try_push(v) } /// Disconnects the producer, signaling to the consumer that no new values /// are going to be produced. /// /// Returns the buffer status before the disconnect pub(crate) fn disconnect(&self) -> bool { (*self.buffer).disconnect_producer() } pub(crate) fn consumer_disconnected(&self) -> bool { (*self.buffer).consumer_disconnected() } /// Returns the available space in the queue /// /// This value represents the number of items that can be pushed onto the /// queue before it becomes full. pub(crate) fn free_space(&self) -> usize { self.capacity() - self.size() } } impl BufferHalf for Consumer { type Item = T; fn buffer(&self) -> &Buffer { &(*self.buffer) } fn connect(&self, id: usize) { assert_ne!(id, usize::MAX); assert_ne!(id, 0); (*self.buffer) .pcache .consumer_id .store(id, Ordering::Release); } fn peer_id(&self) -> usize { (*self.buffer).ccache.producer_id.load(Ordering::Acquire) } fn capacity(&self) -> usize { (*self.buffer).ccache.capacity } } impl Consumer { /// Disconnects the consumer, signaling to the producer that no new values /// are going to be consumed. After this is done, any attempt on the /// producer to try_push should fail /// /// Returns the buffer status before the disconnect pub(crate) fn disconnect(&self) -> bool { (*self.buffer).disconnect_consumer() } pub(crate) fn producer_disconnected(&self) -> bool { (*self.buffer).producer_disconnected() } /// Attempt to pop a value off the queue. /// /// This method does not block. If the queue is empty, the method will /// return `None`. If there is a value available, the method will /// return `Some(v)`, where `v` is the value being popped off the queue. pub(crate) fn try_pop(&self) -> Option { (*self.buffer).try_pop() } } #[cfg(test)] mod tests { use super::*; use std::thread; #[test] fn test_try_push() { let (p, _) = super::make(10); for i in 0..10 { p.try_push(i); assert!(p.capacity() == 10); assert!(p.size() == i + 1); } match p.try_push(10) { Some(v) => { assert!(v == 10); } None => assert!(false, "Queue should not have accepted another write!"), } } #[test] fn test_try_poll() { let (p, c) = super::make(10); match c.try_pop() { Some(_) => assert!(false, "Queue was empty but a value was read!"), None => {} } p.try_push(123); match c.try_pop() { Some(v) => assert!(v == 123), None => assert!(false, "Queue was not empty but poll() returned nothing!"), } match c.try_pop() { Some(_) => assert!(false, "Queue was empty but a value was read!"), None => {} } } #[test] fn test_threaded() { let (p, c) = super::make(500); thread::spawn(move || { for i in 0..100000 { loop { if let None = p.try_push(i) { break; } } } }); for i in 0..100000 { loop { if let Some(t) = c.try_pop() { assert!(t == i); break; } } } } #[should_panic] #[test] fn test_wrap() { let (p, c) = super::inner_make(10, usize::MAX - 1); for i in 0..10 { assert_eq!(p.try_push(i).is_none(), true); } for i in 0..10 { assert_eq!(c.try_pop(), Some(i)); } } }