From f6dcd845fc80da4e3043146e4362258dd8e0c0a1 Mon Sep 17 00:00:00 2001 From: jacqueline Date: Thu, 26 Jan 2023 15:02:57 +1100 Subject: Switch from MessageBuffer to Queue for pipeline comms --- src/audio/include/audio_element.hpp | 63 +++++++++++++++++++++++-------------- 1 file changed, 40 insertions(+), 23 deletions(-) (limited to 'src/audio/include/audio_element.hpp') diff --git a/src/audio/include/audio_element.hpp b/src/audio/include/audio_element.hpp index eb700180..8827a0c3 100644 --- a/src/audio/include/audio_element.hpp +++ b/src/audio/include/audio_element.hpp @@ -2,6 +2,8 @@ #include #include +#include +#include #include "freertos/FreeRTOS.h" @@ -12,6 +14,7 @@ #include "span.hpp" #include "stream_buffer.hpp" +#include "stream_event.hpp" #include "stream_info.hpp" #include "types.hpp" @@ -36,6 +39,8 @@ enum AudioProcessingError { OUT_OF_DATA, }; +static const size_t kEventQueueSize = 8; + /* * One indepedentent part of an audio pipeline. Each element has an input and * output message stream, and is responsible for taking data from the input @@ -51,38 +56,38 @@ enum AudioProcessingError { */ class IAudioElement { public: - IAudioElement() - : input_buffer_(nullptr), output_buffer_(nullptr), state_(STATE_RUN) {} - virtual ~IAudioElement() {} + IAudioElement(); + virtual ~IAudioElement(); /* * Returns the stack size in bytes that this element requires. This should * be tuned according to the observed stack size of each element, as different - * elements have fairly different stack requirements. + * elements have fairly different stack requirements (particular decoders). */ virtual auto StackSizeBytes() const -> std::size_t { return 2048; }; - /* - * How long to wait for new data on the input stream before triggering a call - * to ProcessIdle(). If this is portMAX_DELAY (the default), then ProcessIdle - * will never be called. - */ - virtual auto IdleTimeout() const -> TickType_t { return 10; } - virtual auto InputMinChunkSize() const -> std::size_t { return 0; } /* Returns this element's input buffer. */ - auto InputBuffer() const -> StreamBuffer* { return input_buffer_; } + auto InputEventQueue() const -> QueueHandle_t { return input_events_; } /* Returns this element's output buffer. */ - auto OutputBuffer() const -> StreamBuffer* { return output_buffer_; } + auto OutputEventQueue() const -> QueueHandle_t { return output_events_; } + + auto OutputEventQueue(const QueueHandle_t q) -> void { output_events_ = q; } - auto InputBuffer(StreamBuffer* b) -> void { input_buffer_ = b; } + auto HasUnflushedOutput() -> bool { return !buffered_output_.empty(); } - auto OutputBuffer(StreamBuffer* b) -> void { output_buffer_ = b; } + virtual auto HasUnprocessedInput() -> bool = 0; - auto ElementState() const -> ElementState { return state_; } - auto ElementState(enum ElementState e) -> void { state_ = e; } + auto IsOverBuffered() -> bool { return unprocessed_output_chunks_ > 4; } + + auto FlushBufferedOutput() -> bool; + + auto ElementState() const -> ElementState { return current_state_; } + auto ElementState(enum ElementState e) -> void { current_state_ = e; } + + virtual auto OnChunkProcessed() -> void { unprocessed_output_chunks_--; } /* * Called when a StreamInfo message is received. Used to configure this @@ -105,14 +110,26 @@ class IAudioElement { * time. This could be used to synthesize output, or to save memory by * releasing unused resources. */ - virtual auto ProcessIdle() -> cpp::result = 0; - - virtual auto PrepareForPause() -> void{}; + virtual auto Process() -> cpp::result = 0; protected: - StreamBuffer* input_buffer_; - StreamBuffer* output_buffer_; - std::atomic state_; + auto SendOrBufferEvent(std::unique_ptr event) -> bool; + + // Queue for events coming into this element. Owned by us. + QueueHandle_t input_events_; + // Queue for events going into the next element. Not owned by us, may be null + // if we're not yet in a pipeline. + // FIXME: it would be nicer if this was non-nullable. + QueueHandle_t output_events_; + + // The number of output chunks that we have generated, but have not yet been + // processed by the next element in the pipeline. This includes any chunks + // that are currently help in buffered_output_. + int unprocessed_output_chunks_; + // Output events that have been generated, but are yet to be sent downstream. + std::deque> buffered_output_; + + enum ElementState current_state_; }; } // namespace audio -- cgit v1.2.3