diff options
| author | jacqueline <me@jacqueline.id.au> | 2023-01-26 15:02:57 +1100 |
|---|---|---|
| committer | jacqueline <me@jacqueline.id.au> | 2023-01-26 15:02:57 +1100 |
| commit | f6dcd845fc80da4e3043146e4362258dd8e0c0a1 (patch) | |
| tree | 91410899a83fcc2dfb1eb3ab4837a11e401366fc /src/audio/audio_element.cpp | |
| parent | e7f926e2c376ccd4f4a4d6f4b104f3c23b0059dc (diff) | |
| download | tangara-fw-f6dcd845fc80da4e3043146e4362258dd8e0c0a1.tar.gz | |
Switch from MessageBuffer to Queue for pipeline comms
Diffstat (limited to 'src/audio/audio_element.cpp')
| -rw-r--r-- | src/audio/audio_element.cpp | 59 |
1 files changed, 59 insertions, 0 deletions
diff --git a/src/audio/audio_element.cpp b/src/audio/audio_element.cpp new file mode 100644 index 00000000..e1623c36 --- /dev/null +++ b/src/audio/audio_element.cpp @@ -0,0 +1,59 @@ +#include "audio_element.hpp" + +namespace audio { + +IAudioElement::IAudioElement() + : input_events_(xQueueCreate(kEventQueueSize, sizeof(StreamEvent))), + output_events_(nullptr), + unprocessed_output_chunks_(0), + buffered_output_(), + current_state_(STATE_RUN) {} + +IAudioElement::~IAudioElement() { + // Ensure we don't leak any memory from events leftover in the queue. + while (uxQueueSpacesAvailable(input_events_) < kEventQueueSize) { + StreamEvent* event; + if (xQueueReceive(input_events_, &event, 0)) { + free(event); + } else { + break; + } + } + // Technically there's a race here if someone is still adding to the queue, + // but hopefully the whole pipeline is stopped if an element is being + // destroyed. + vQueueDelete(input_events_); +} + +auto IAudioElement::SendOrBufferEvent(std::unique_ptr<StreamEvent> event) + -> bool { + if (event->tag == StreamEvent::CHUNK_DATA) { + unprocessed_output_chunks_++; + } + if (!buffered_output_.empty()) { + // To ensure we send data in order, don't try to send if we've already + // failed to send something. + buffered_output_.push_back(std::move(event)); + return false; + } + StreamEvent* raw_event = event.release(); + if (!xQueueSend(output_events_, raw_event, 0)) { + buffered_output_.emplace_front(raw_event); + return false; + } + return true; +} + +auto IAudioElement::FlushBufferedOutput() -> bool { + while (!buffered_output_.empty()) { + StreamEvent* raw_event = buffered_output_.front().release(); + buffered_output_.pop_front(); + if (!xQueueSend(output_events_, raw_event, 0)) { + buffered_output_.emplace_front(raw_event); + return false; + } + } + return true; +} + +} // namespace audio |
