//! A restricted channel to pass data from a signal handler.
//!
//! When trying to communicate data from a signal handler to the outside world, one can use an
//! atomic variable (it doesn't lock, so it can be made async-signal-safe). But that doesn't work
//! for anything larger than what fits into a single atomic value.
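//!
//! A minimal sketch of that atomic-variable approach (a plain flag, nothing specific to this
//! crate):
//!
//! ```
//! use std::sync::atomic::{AtomicBool, Ordering};
//!
//! static GOT_SIGNAL: AtomicBool = AtomicBool::new(false);
//!
//! // Inside the handler: a single store, which is async-signal-safe.
//! GOT_SIGNAL.store(true, Ordering::SeqCst);
//!
//! // Outside the handler: poll (and reset) the flag.
//! assert!(GOT_SIGNAL.swap(false, Ordering::SeqCst));
//! ```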
//!
//! This module provides a channel that can be used for that purpose. It is used by certain
//! [exfiltrators][crate::iterator::exfiltrator], but can also serve as a building block for custom
//! actions. In general, this is not a ready-made end-user API.
//!
//! # How does it work
//!
//! Each channel has a fixed number of slots and two queues (one for empty slots, one for full
//! slots). A signal handler takes a slot out of the empty queue, fills it and passes it into the
//! full queue. Outside of the signal handler, the reader takes the value out of the full queue and
//! returns the slot to the empty queue.
//!
//! The queues are implemented as bit-encoded indexes of the slots in the storage. The bits are
//! stored in an atomic variable.
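//!
//! For illustration (a sketch of the current layout, where each queue position takes `BITS = 3`
//! bits and slot indexes are 1-based so that `0` can mean "no slot"; `get` is the private helper
//! defined below):
//!
//! ```ignore
//! // The queue 0b000_011_001 holds slot 1 at its head and slot 3 right behind it.
//! let q: u16 = 0b000_011_001;
//! assert_eq!(get(q, 0), 1); // head of the queue
//! assert_eq!(get(q, 1), 3); // second position
//! assert_eq!(get(q, 2), 0); // empty position
//! ```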
//!
//! Note that the algorithm allows for a slot to be in neither queue (when it is being emptied or
//! filled).
//!
//! # Fallible allocation of a slot
//!
//! It is apparent that allocation of a new slot can fail (the empty queue may have nothing in
//! it). In such a case, there's no way to send the new value out of the handler (and no way to
//! safely wait for a slot to appear, because the handler may be blocking the very thread that is
//! responsible for emptying them). But that's considered acceptable ‒ even the kernel collates
//! the same kinds of signals together if the application doesn't consume them fast enough, and
//! there can be no free slots only because some are being filled, being emptied or are full ‒ so
//! the system as a whole still yields at least one signal.
//!
//! This assumes that separate signals don't share the same buffer and that there's only one
//! reader (using multiple readers is still safe, but then all the slots could be held by readers
//! that have already emptied them, so the above argument would not hold).
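//!
//! A minimal sketch of that silent-drop behaviour (the 5-slot size and the
//! `signal_hook::low_level::channel` path are assumptions here):
//!
//! ```ignore
//! use signal_hook::low_level::channel::Channel;
//!
//! let channel = Channel::new();
//! for i in 0..10 {
//!     channel.send(i); // The sixth and later sends find no free slot and drop the value.
//! }
//! for i in 0..5 {
//!     assert_eq!(Some(i), channel.recv());
//! }
//! assert_eq!(None, channel.recv());
//! ```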
// TODO: Other sizes? Does anyone need more than 5 slots?
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicU16, Ordering};

/// The number of slots in the channel.
const SLOTS: usize = 5;
/// How many bits of the atomic each queue position takes.
const BITS: u16 = 3;
/// Mask of a single queue position.
const MASK: u16 = 0b111;

/// Extracts the value at position `idx` of the bit-encoded queue `n`.
fn get(n: u16, idx: u16) -> u16 {
    (n >> (BITS * idx)) & MASK
}

/// Returns `n` with position `idx` of the bit-encoded queue replaced by `v`.
fn set(n: u16, idx: u16, v: u16) -> u16 {
    let v = v << (BITS * idx);
    let mask = MASK << (BITS * idx);
    (n & !mask) | v
}

/// Appends a slot index to the end of the bit-encoded queue `q`.
///
/// The `Release` ordering makes all previous writes to the corresponding slot visible to whoever
/// later dequeues the index.
fn enqueue(q: &AtomicU16, val: u16) {
    let mut current = q.load(Ordering::Relaxed);
    loop {
        // Find the first free position in the queue and place the value there.
        let empty = (0..SLOTS as u16)
            .find(|i| get(current, *i) == 0)
            .expect("No empty slot available");
        let modified = set(current, empty, val);
        match q.compare_exchange_weak(current, modified, Ordering::Release, Ordering::Relaxed) {
            Ok(_) => break,
            Err(changed) => current = changed, // And retry with the changed value
        }
    }
}

/// Takes the slot index from the head of the bit-encoded queue `q`, or `None` if the queue is
/// empty.
///
/// The `Acquire` ordering pairs with the `Release` in `enqueue`, making the slot's content visible
/// to the caller.
fn dequeue(q: &AtomicU16) -> Option<u16> {
    let mut current = q.load(Ordering::Relaxed);
    loop {
        let val = current & MASK;
        // It's completely empty
        if val == 0 {
            break None;
        }
        // Drop the head and shift the rest of the queue one position forward.
        let modified = current >> BITS;
        match q.compare_exchange_weak(current, modified, Ordering::Acquire, Ordering::Relaxed) {
            Ok(_) => break Some(val),
            Err(changed) => current = changed,
        }
    }
}

/// A restricted async-signal-safe channel
///
/// This is a bit like the usual channel used for inter-thread communication, but with several
/// restrictions:
///
/// * There's a limited number of slots (currently 5).
/// * There's no way to wait for a place in it or for a value. If a value is not available, `None`
///   is returned. If there's no space for a new value, the value is silently dropped.
///
/// In exchange for that, all the operations on the channel are async-signal-safe. That means it
/// is possible to use it to communicate between a signal handler and the rest of the world
/// (specifically, it's designed to send information from the handler to the rest of the
/// application). Throwing values away when the channel is full is in line with the kernel
/// collating signals of the same kind (you should not use the same channel for multiple different
/// signals).
///
/// Technically, this is an MPMC queue which preserves order, but it is expected to be used mostly
/// in MPSC mode (in theory, multiple threads can be executing a signal handler for the same
/// signal at the same time). The channel is not responsible for wakeups.
///
/// While the channel is async-signal-safe, you still need to make sure *creating* the values is
/// too (they should not contain anything that allocates, for example ‒ so no `String`s inside,
/// etc).
///
/// The code was *not* tuned for performance (signals are not expected to happen often).
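///
/// # Examples
///
/// A minimal sketch of the intended use (the `signal_hook::low_level::channel` path is an
/// assumption and may differ; real use would call `send` from inside a signal handler):
///
/// ```ignore
/// use signal_hook::low_level::channel::Channel;
///
/// let channel: Channel<u8> = Channel::new();
///
/// // In the handler: push a small, allocation-free value. Never blocks.
/// channel.send(42);
///
/// // In the application: drain whatever arrived. Never blocks.
/// assert_eq!(Some(42), channel.recv());
/// assert_eq!(None, channel.recv());
/// ```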
pub struct Channel<T> {
    /// The storage for the values themselves.
    storage: [UnsafeCell<Option<T>>; SLOTS],
    /// Bit-encoded queue of slot indexes that are currently free.
    empty: AtomicU16,
    /// Bit-encoded queue of slot indexes that currently hold a value.
    full: AtomicU16,
}

impl<T> Channel<T> {
    /// Creates a new channel with nothing in it.
    pub fn new() -> Self {
        let storage = Default::default();
        let me = Self {
            storage,
            empty: AtomicU16::new(0),
            full: AtomicU16::new(0),
        };
        // Slot indexes are 1-based (0 marks a free position in a queue), so all of 1..=SLOTS
        // starts out in the empty queue.
        for i in 1..=SLOTS {
            enqueue(&me.empty, i as u16);
        }
        me
    }

    /// Inserts a value into the channel.
    ///
    /// If the value doesn't fit (no free slot is available), it is silently dropped. Never
    /// blocks.
    pub fn send(&self, val: T) {
        if let Some(empty_idx) = dequeue(&self.empty) {
            // SAFETY: the slot was just removed from the empty queue and is not yet in the full
            // one, so nobody else can be accessing it right now.
            unsafe { *self.storage[empty_idx as usize - 1].get() = Some(val) };
            enqueue(&self.full, empty_idx);
        }
    }

    /// Takes a value from the channel.
    ///
    /// Returns `None` if the channel is empty. Never blocks.
    pub fn recv(&self) -> Option<T> {
        dequeue(&self.full).map(|idx| {
            // SAFETY: the slot was just removed from the full queue and is not yet back in the
            // empty one, so nobody else can be accessing it right now.
            let result = unsafe { &mut *self.storage[idx as usize - 1].get() }
                .take()
                .expect("Full slot with nothing in it");
            enqueue(&self.empty, idx);
            result
        })
    }
}

impl<T> Default for Channel<T> {
    fn default() -> Self {
        Self::new()
    }
}

unsafe impl<T: Send> Send for Channel<T> {}

// Yes, really `Send` is enough for `Sync` here. Having a reference to the Channel allows sending
// `T`s through it, but never hands out references to them.
unsafe impl<T: Send> Sync for Channel<T> {}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::thread;

    use super::*;

    #[test]
    fn new_empty() {
        let channel = Channel::<usize>::new();
        assert!(channel.recv().is_none());
        assert!(channel.recv().is_none());
    }

    #[test]
    fn pass_value() {
        let channel = Channel::new();
        channel.send(42);
        assert_eq!(42, channel.recv().unwrap());
        assert!(channel.recv().is_none());
    }

    #[test]
    fn multiple() {
        let channel = Channel::new();
        for i in 0..1000 {
            channel.send(i);
            assert_eq!(i, channel.recv().unwrap());
            assert!(channel.recv().is_none());
        }
    }

    #[test]
    fn overflow() {
        let channel = Channel::new();
        for i in 0..10 {
            channel.send(i);
        }
        for i in 0..5 {
            assert_eq!(i, channel.recv().unwrap());
        }
        assert!(channel.recv().is_none());
    }

    #[test]
    fn multi_thread() {
        let channel = Arc::new(Channel::<usize>::new());
        let sender = thread::spawn({
            let channel = Arc::clone(&channel);
            move || {
                for i in 0..4 {
                    channel.send(i);
                }
            }
        });
        let mut results = Vec::new();
        while results.len() < 4 {
            results.extend(channel.recv());
        }
        assert_eq!(vec![0, 1, 2, 3], results);
        sender.join().unwrap();
    }
}