blob: 86c6be3866a145a99ab521613567961828a492d2 [file] [log] [blame] [edit]
//! High-level interface for a [V4L2 video
//! encoder](https://www.kernel.org/doc/html/latest/userspace-api/media/v4l/dev-encoder.html).
use crate::{
device::{
poller::{DeviceEvent, PollError, PollEvent, Poller, Waker},
queue::{
direction::{Capture, Output},
dqbuf::DqBuffer,
handles_provider::HandlesProvider,
qbuf::{
get_free::{GetFreeBufferError, GetFreeCaptureBuffer, GetFreeOutputBuffer},
get_indexed::GetCaptureBufferByIndex,
CaptureQueueable, OutputQueueableProvider,
},
BuffersAllocated, CanceledBuffer, CreateQueueError, FormatBuilder, Queue, QueueInit,
RequestBuffersError,
},
AllocatedQueue, Device, DeviceConfig, DeviceOpenError, Stream, TryDequeue,
},
ioctl::{
self, DqBufError, DqBufIoctlError, EncoderCommand, FormatFlags, GFmtError,
V4l2BufferFromError,
},
memory::{BufferHandles, PrimitiveBufferHandles},
Format,
};
use log::warn;
use std::{
any::Any,
io,
path::Path,
sync::{atomic::AtomicUsize, Arc},
task::Wake,
thread::JoinHandle,
};
use thiserror::Error;
/// Trait implemented by all states of the encoder.
pub trait EncoderState {}
pub struct Encoder<S: EncoderState> {
// Make sure to keep the device alive as long as we are.
device: Arc<Device>,
state: S,
}
pub struct AwaitingCaptureFormat {
output_queue: Queue<Output, QueueInit>,
capture_queue: Queue<Capture, QueueInit>,
}
impl EncoderState for AwaitingCaptureFormat {}
#[derive(Debug, Error)]
pub enum EncoderOpenError {
#[error("error while opening device")]
DeviceOpenError(#[from] DeviceOpenError),
#[error("error while creating queue")]
CreateQueueError(#[from] CreateQueueError),
#[error("specified device is not an encoder")]
NotAnEncoder,
}
impl Encoder<AwaitingCaptureFormat> {
pub fn open(path: &Path) -> Result<Self, EncoderOpenError> {
let config = DeviceConfig::new().non_blocking_dqbuf();
let device = Arc::new(Device::open(path, config)?);
// Check that the device is indeed an encoder.
let capture_queue = Queue::get_capture_mplane_queue(device.clone())?;
let output_queue = Queue::get_output_mplane_queue(device.clone())?;
// On an encoder, the OUTPUT formats are not compressed, but the CAPTURE ones are.
// Return an error if our device does not satisfy these conditions.
output_queue
.format_iter()
.find(|fmt| !fmt.flags.contains(FormatFlags::COMPRESSED))
.and(
capture_queue
.format_iter()
.find(|fmt| fmt.flags.contains(FormatFlags::COMPRESSED)),
)
.ok_or(EncoderOpenError::NotAnEncoder)
.map(|_| ())?;
Ok(Encoder {
device,
state: AwaitingCaptureFormat {
output_queue,
capture_queue,
},
})
}
pub fn set_capture_format<F>(mut self, f: F) -> anyhow::Result<Encoder<AwaitingOutputFormat>>
where
F: FnOnce(FormatBuilder) -> anyhow::Result<()>,
{
let builder = self.state.capture_queue.change_format()?;
f(builder)?;
Ok(Encoder {
device: self.device,
state: AwaitingOutputFormat {
output_queue: self.state.output_queue,
capture_queue: self.state.capture_queue,
},
})
}
}
pub struct AwaitingOutputFormat {
output_queue: Queue<Output, QueueInit>,
capture_queue: Queue<Capture, QueueInit>,
}
impl EncoderState for AwaitingOutputFormat {}
impl Encoder<AwaitingOutputFormat> {
pub fn set_output_format<F>(mut self, f: F) -> anyhow::Result<Encoder<AwaitingOutputBuffers>>
where
F: FnOnce(FormatBuilder) -> anyhow::Result<()>,
{
let builder = self.state.output_queue.change_format()?;
f(builder)?;
Ok(Encoder {
device: self.device,
state: AwaitingOutputBuffers {
output_queue: self.state.output_queue,
capture_queue: self.state.capture_queue,
},
})
}
}
pub struct AwaitingOutputBuffers {
output_queue: Queue<Output, QueueInit>,
capture_queue: Queue<Capture, QueueInit>,
}
impl EncoderState for AwaitingOutputBuffers {}
impl Encoder<AwaitingOutputBuffers> {
pub fn allocate_output_buffers_generic<OP: BufferHandles>(
self,
memory_type: OP::SupportedMemoryType,
num_output: usize,
) -> Result<Encoder<AwaitingCaptureBuffers<OP>>, RequestBuffersError> {
Ok(Encoder {
device: self.device,
state: AwaitingCaptureBuffers {
output_queue: self
.state
.output_queue
.request_buffers_generic::<OP>(memory_type, num_output as u32)?,
capture_queue: self.state.capture_queue,
},
})
}
pub fn allocate_output_buffers<OP: PrimitiveBufferHandles>(
self,
num_output: usize,
) -> Result<Encoder<AwaitingCaptureBuffers<OP>>, RequestBuffersError> {
self.allocate_output_buffers_generic(OP::MEMORY_TYPE, num_output)
}
pub fn get_output_format(&self) -> Result<Format, GFmtError> {
self.state.output_queue.get_format()
}
pub fn get_capture_format(&self) -> Result<Format, GFmtError> {
self.state.capture_queue.get_format()
}
}
pub struct AwaitingCaptureBuffers<OP: BufferHandles> {
output_queue: Queue<Output, BuffersAllocated<OP>>,
capture_queue: Queue<Capture, QueueInit>,
}
impl<OP: BufferHandles> EncoderState for AwaitingCaptureBuffers<OP> {}
impl<OP: BufferHandles> Encoder<AwaitingCaptureBuffers<OP>> {
pub fn allocate_capture_buffers_generic<P: HandlesProvider>(
self,
memory_type: <P::HandleType as BufferHandles>::SupportedMemoryType,
num_capture: usize,
capture_memory_provider: P,
) -> Result<Encoder<ReadyToEncode<OP, P>>, RequestBuffersError>
where
for<'a> Queue<Capture, BuffersAllocated<P::HandleType>>:
GetFreeCaptureBuffer<'a, P::HandleType>,
{
Ok(Encoder {
device: self.device,
state: ReadyToEncode {
output_queue: self.state.output_queue,
capture_queue: self
.state
.capture_queue
.request_buffers_generic::<P::HandleType>(memory_type, num_capture as u32)?,
capture_memory_provider,
poll_wakeups_counter: None,
},
})
}
pub fn allocate_capture_buffers<P: HandlesProvider>(
self,
num_capture: usize,
capture_memory_provider: P,
) -> Result<Encoder<ReadyToEncode<OP, P>>, RequestBuffersError>
where
P::HandleType: PrimitiveBufferHandles,
for<'a> Queue<Capture, BuffersAllocated<P::HandleType>>:
GetFreeCaptureBuffer<'a, P::HandleType>,
{
self.allocate_capture_buffers_generic(
P::HandleType::MEMORY_TYPE,
num_capture,
capture_memory_provider,
)
}
}
pub struct ReadyToEncode<OP: BufferHandles, P: HandlesProvider> {
output_queue: Queue<Output, BuffersAllocated<OP>>,
capture_queue: Queue<Capture, BuffersAllocated<P::HandleType>>,
capture_memory_provider: P,
poll_wakeups_counter: Option<Arc<AtomicUsize>>,
}
impl<OP: BufferHandles, P: HandlesProvider> EncoderState for ReadyToEncode<OP, P> {}
impl<OP: BufferHandles, P: HandlesProvider> Encoder<ReadyToEncode<OP, P>>
where
for<'a> Queue<Capture, BuffersAllocated<P::HandleType>>:
GetFreeCaptureBuffer<'a, P::HandleType> + GetCaptureBufferByIndex<'a, P::HandleType>,
{
pub fn set_poll_counter(mut self, poll_wakeups_counter: Arc<AtomicUsize>) -> Self {
self.state.poll_wakeups_counter = Some(poll_wakeups_counter);
self
}
pub fn start<InputDoneCb, OutputReadyCb>(
self,
input_done_cb: InputDoneCb,
output_ready_cb: OutputReadyCb,
) -> io::Result<Encoder<Encoding<OP, P, InputDoneCb, OutputReadyCb>>>
where
InputDoneCb: Fn(CompletedOutputBuffer<OP>),
OutputReadyCb: FnMut(DqBuffer<Capture, P::HandleType>) + Send + 'static,
{
self.state.output_queue.stream_on().unwrap();
self.state.capture_queue.stream_on().unwrap();
let mut output_poller = Poller::new(Arc::clone(&self.device))?;
output_poller.enable_event(DeviceEvent::OutputReady)?;
let mut encoder_thread = EncoderThread::new(
&self.device,
self.state.capture_queue,
self.state.capture_memory_provider,
output_ready_cb,
)?;
if let Some(counter) = &self.state.poll_wakeups_counter {
output_poller.set_poll_counter(Arc::clone(counter));
encoder_thread.set_poll_counter(Arc::clone(counter));
}
let handle = std::thread::Builder::new()
.name("V4L2 Encoder".into())
.spawn(move || encoder_thread.run())?;
Ok(Encoder {
device: self.device,
state: Encoding {
output_queue: self.state.output_queue,
input_done_cb,
output_poller,
handle,
},
})
}
}
pub struct Encoding<OP: BufferHandles, P, InputDoneCb, OutputReadyCb>
where
P: HandlesProvider,
InputDoneCb: Fn(CompletedOutputBuffer<OP>),
OutputReadyCb: FnMut(DqBuffer<Capture, P::HandleType>) + Send,
{
output_queue: Queue<Output, BuffersAllocated<OP>>,
input_done_cb: InputDoneCb,
output_poller: Poller,
handle: JoinHandle<EncoderThread<P, OutputReadyCb>>,
}
impl<OP, P, InputDoneCb, OutputReadyCb> EncoderState for Encoding<OP, P, InputDoneCb, OutputReadyCb>
where
OP: BufferHandles,
P: HandlesProvider,
InputDoneCb: Fn(CompletedOutputBuffer<OP>),
OutputReadyCb: FnMut(DqBuffer<Capture, P::HandleType>) + Send,
{
}
// Safe because all Rcs are internal and never leaked outside of the struct.
unsafe impl<S: EncoderState> Send for Encoder<S> {}
pub enum CompletedOutputBuffer<OP: BufferHandles> {
Dequeued(DqBuffer<Output, OP>),
Canceled(CanceledBuffer<OP>),
}
#[derive(Debug, Error)]
pub enum GetBufferError {
#[error("error while dequeueing buffer")]
DequeueError(#[from] DqBufError<V4l2BufferFromError>),
#[error("error during poll")]
PollError(#[from] PollError),
#[error("error while obtaining buffer")]
GetFreeBufferError(#[from] GetFreeBufferError),
}
#[derive(Debug, Error)]
pub enum EncoderStopError {
#[error("error while sending STOP command")]
EncoderCmdError(#[from] ioctl::EncoderCmdError),
#[error("thread has panicked")]
ThreadPanickedError(Box<dyn Any + Send + 'static>),
#[error("cannot streamoff capture queue")]
CaptureQueueStreamoffError(ioctl::StreamOffError),
#[error("cannot streamoff output queue")]
OutputQueueStreamoffError(ioctl::StreamOffError),
}
impl<OP, P, InputDoneCb, OutputReadyCb> Encoder<Encoding<OP, P, InputDoneCb, OutputReadyCb>>
where
OP: BufferHandles,
P: HandlesProvider,
InputDoneCb: Fn(CompletedOutputBuffer<OP>),
OutputReadyCb: FnMut(DqBuffer<Capture, P::HandleType>) + Send,
{
/// Stop the encoder, and returns the encoder ready to be started again.
pub fn stop(self) -> Result<Encoder<ReadyToEncode<OP, P>>, EncoderStopError> {
ioctl::encoder_cmd::<_, ()>(&*self.device, &EncoderCommand::Stop(false))?;
// The encoder thread should receive the LAST buffer and exit on its own.
let encoding_thread = self
.state
.handle
.join()
.map_err(EncoderStopError::ThreadPanickedError)?;
encoding_thread
.capture_queue
.stream_off()
.map_err(EncoderStopError::CaptureQueueStreamoffError)?;
/* Return all canceled buffers to the client */
let canceled_buffers = self
.state
.output_queue
.stream_off()
.map_err(EncoderStopError::OutputQueueStreamoffError)?;
for buffer in canceled_buffers {
(self.state.input_done_cb)(CompletedOutputBuffer::Canceled(buffer));
}
Ok(Encoder {
device: self.device,
state: ReadyToEncode {
output_queue: self.state.output_queue,
capture_queue: encoding_thread.capture_queue,
capture_memory_provider: encoding_thread.capture_memory_provider,
poll_wakeups_counter: None,
},
})
}
/// Attempts to dequeue and release output buffers that the driver is done with.
fn dequeue_output_buffers(&self) -> Result<(), DqBufError<V4l2BufferFromError>> {
let output_queue = &self.state.output_queue;
while output_queue.num_queued_buffers() > 0 {
match output_queue.try_dequeue() {
Ok(buf) => {
(self.state.input_done_cb)(CompletedOutputBuffer::Dequeued(buf));
}
Err(DqBufError::IoctlError(DqBufIoctlError::NotReady)) => break,
// TODO buffers with the error flag set should not result in
// a fatal error!
Err(e) => return Err(e),
}
}
Ok(())
}
// Make this thread sleep until at least one OUTPUT buffer is ready to be
// obtained through `try_get_buffer()`, dequeuing buffers if necessary.
fn wait_for_output_buffer(&mut self) -> Result<(), GetBufferError> {
for event in self.state.output_poller.poll(None)? {
match event {
PollEvent::Device(DeviceEvent::OutputReady) => {
self.dequeue_output_buffers()?;
}
_ => panic!("Unexpected return from OUTPUT queue poll!"),
}
}
Ok(())
}
}
impl<'a, OP, P, InputDoneCb, OutputReadyCb> OutputQueueableProvider<'a, OP>
for Encoder<Encoding<OP, P, InputDoneCb, OutputReadyCb>>
where
Queue<Output, BuffersAllocated<OP>>: OutputQueueableProvider<'a, OP>,
OP: BufferHandles,
P: HandlesProvider,
InputDoneCb: Fn(CompletedOutputBuffer<OP>),
OutputReadyCb: FnMut(DqBuffer<Capture, P::HandleType>) + Send,
{
type Queueable =
<Queue<Output, BuffersAllocated<OP>> as OutputQueueableProvider<'a, OP>>::Queueable;
}
/// Let the encoder provide the buffers from the OUTPUT queue.
impl<'a, OP, P, InputDoneCb, OutputReadyCb> GetFreeOutputBuffer<'a, OP, GetBufferError>
for Encoder<Encoding<OP, P, InputDoneCb, OutputReadyCb>>
where
Queue<Output, BuffersAllocated<OP>>: GetFreeOutputBuffer<'a, OP>,
OP: BufferHandles,
P: HandlesProvider,
InputDoneCb: Fn(CompletedOutputBuffer<OP>),
OutputReadyCb: FnMut(DqBuffer<Capture, P::HandleType>) + Send,
{
/// Returns a V4L2 buffer to be filled with a frame to encode if one
/// is available.
///
/// This method will return None immediately if all the allocated buffers
/// are currently queued.
fn try_get_free_buffer(&'a self) -> Result<Self::Queueable, GetBufferError> {
self.dequeue_output_buffers()?;
Ok(self.state.output_queue.try_get_free_buffer()?)
}
}
// If `GetFreeBuffer` is implemented, we can also provide a blocking `get_buffer`
// method.
impl<'a, OP, P, InputDoneCb, OutputReadyCb> Encoder<Encoding<OP, P, InputDoneCb, OutputReadyCb>>
where
Self: GetFreeOutputBuffer<'a, OP, GetBufferError>,
OP: BufferHandles,
P: HandlesProvider,
InputDoneCb: Fn(CompletedOutputBuffer<OP>),
OutputReadyCb: FnMut(DqBuffer<Capture, P::HandleType>) + Send,
{
/// Returns a V4L2 buffer to be filled with a frame to encode, waiting for
/// one to be available if needed.
///
/// Contrary to `try_get_free_buffer(), this method will wait for a buffer
/// to be available if needed.
pub fn get_buffer(
&'a mut self,
) -> Result<<Self as OutputQueueableProvider<'a, OP>>::Queueable, GetBufferError> {
let output_queue = &self.state.output_queue;
// If all our buffers are queued, wait until we can dequeue some.
if output_queue.num_queued_buffers() == output_queue.num_buffers() {
self.wait_for_output_buffer()?;
}
self.try_get_free_buffer()
}
}
struct EncoderThread<P, OutputReadyCb>
where
P: HandlesProvider,
OutputReadyCb: FnMut(DqBuffer<Capture, P::HandleType>) + Send,
{
capture_queue: Queue<Capture, BuffersAllocated<P::HandleType>>,
capture_memory_provider: P,
poller: Poller,
waker: Arc<Waker>,
output_ready_cb: OutputReadyCb,
}
impl<P, OutputReadyCb> EncoderThread<P, OutputReadyCb>
where
P: HandlesProvider,
OutputReadyCb: FnMut(DqBuffer<Capture, P::HandleType>) + Send,
for<'a> Queue<Capture, BuffersAllocated<P::HandleType>>:
GetFreeCaptureBuffer<'a, P::HandleType> + GetCaptureBufferByIndex<'a, P::HandleType>,
{
fn new(
device: &Arc<Device>,
capture_queue: Queue<Capture, BuffersAllocated<P::HandleType>>,
capture_memory_provider: P,
output_ready_cb: OutputReadyCb,
) -> io::Result<Self> {
let mut poller = Poller::new(Arc::clone(device))?;
poller.enable_event(DeviceEvent::CaptureReady)?;
let waker = poller.add_waker(0)?;
Ok(EncoderThread {
capture_queue,
capture_memory_provider,
poller,
waker,
output_ready_cb,
})
}
fn set_poll_counter(&mut self, poll_wakeups_counter: Arc<AtomicUsize>) {
self.poller.set_poll_counter(poll_wakeups_counter);
}
fn run(mut self) -> Self {
self.enqueue_capture_buffers();
'polling: loop {
match self.capture_queue.num_queued_buffers() {
// If there are no buffers on the CAPTURE queue, poll() will return
// immediately with EPOLLERR and we would loop indefinitely.
// Prevent this by temporarily disabling polling the device in such
// cases.
0 => {
self.poller
.disable_event(DeviceEvent::CaptureReady)
.unwrap();
}
// If device polling was disabled and we have buffers queued, we
// can reenable it as poll will now wait for a CAPTURE buffer to
// be ready for dequeue.
_ => {
self.poller.enable_event(DeviceEvent::CaptureReady).unwrap();
}
}
// TODO handle errors - this system call can be interrupted and we
// should leave in this case.
for event in self.poller.poll(None).unwrap() {
match event {
// A CAPTURE buffer has been released by the client.
PollEvent::Waker(0) => {
// Requeue all available CAPTURE buffers.
self.enqueue_capture_buffers();
}
// A CAPTURE buffer is ready to be dequeued.
PollEvent::Device(DeviceEvent::CaptureReady) => {
// Get the encoded buffer
// TODO Manage errors here, including corrupted buffers!
if let Ok(mut cap_buf) = self.capture_queue.try_dequeue() {
let is_last = cap_buf.data.is_last();
let is_empty = *cap_buf.data.get_first_plane().bytesused == 0;
// Add a drop callback to the dequeued buffer so we
// re-queue it as soon as it is dropped.
let cap_waker = Arc::clone(&self.waker);
cap_buf.add_drop_callback(move |_dqbuf| {
cap_waker.wake();
});
// Empty buffers do not need to be passed to the client.
if !is_empty {
(self.output_ready_cb)(cap_buf);
}
// Last buffer of the stream? Time for us to terminate.
if is_last {
break 'polling;
}
} else {
// TODO we should not crash here.
panic!("Expected a CAPTURE buffer but none available!");
}
}
_ => panic!("Unexpected return from CAPTURE queue poll!"),
}
}
}
self
}
fn enqueue_capture_buffers(&mut self) {
'enqueue: while let Some(handles) = self.capture_memory_provider.get_handles(&self.waker) {
if let Ok(buffer) = self
.capture_memory_provider
.get_suitable_buffer_for(&handles, &self.capture_queue)
{
buffer.queue_with_handles(handles).unwrap();
} else {
warn!("Handles potentially lost due to no V4L2 buffer being available");
break 'enqueue;
}
}
}
}