diff options
Diffstat (limited to 'src/audio/audio_task.cpp')
| -rw-r--r-- | src/audio/audio_task.cpp | 146 |
1 files changed, 78 insertions, 68 deletions
diff --git a/src/audio/audio_task.cpp b/src/audio/audio_task.cpp index e3641973..f0a01b6c 100644 --- a/src/audio/audio_task.cpp +++ b/src/audio/audio_task.cpp @@ -3,6 +3,7 @@ #include <stdlib.h> #include <cstdint> +#include <deque> #include <memory> #include "audio_element_handle.hpp" @@ -10,19 +11,21 @@ #include "esp_heap_caps.h" #include "esp_log.h" #include "freertos/portmacro.h" +#include "freertos/projdefs.h" #include "freertos/queue.h" -#include "freertos/stream_buffer.h" #include "span.hpp" #include "audio_element.hpp" #include "chunk.hpp" +#include "stream_event.hpp" #include "stream_info.hpp" #include "stream_message.hpp" +#include "sys/_stdint.h" #include "tasks.hpp" namespace audio { - static const char *kTag = "task"; +static const char* kTag = "task"; auto StartAudioTask(const std::string& name, std::shared_ptr<IAudioElement> element) @@ -47,84 +50,91 @@ void AudioTaskMain(void* args) { std::shared_ptr<IAudioElement> element = std::move(real_args->element); delete real_args; - ChunkReader chunk_reader = ChunkReader(element->InputBuffer()); + // Queue of events that we have received on our input queue, but not yet + // processed. + std::deque<std::unique_ptr<StreamEvent>> pending_events; while (element->ElementState() != STATE_QUIT) { - if (element->ElementState() == STATE_PAUSE) { - // TODO: park with a condition variable or something? - vTaskDelay(1000); + // First, we pull events from our input queue into pending_events. This + // keeps us responsive to any events that need to be handled immediately. + // Then we check if there's any events to flush downstream. + // Then we pass anything requiring processing to the element. + + bool has_work_to_do = + (!pending_events.empty() || element->HasUnflushedOutput() || + element->HasUnprocessedInput()) && + !element->IsOverBuffered(); + + // If we have no new events to process and the element has nothing left to + // do, then just delay forever waiting for a new event. + TickType_t ticks_to_wait = has_work_to_do ? 0 : portMAX_DELAY; + + StreamEvent* event_ptr = nullptr; + bool has_event = + xQueueReceive(element->InputEventQueue(), &event_ptr, ticks_to_wait); + + if (has_event && event_ptr != nullptr) { + std::unique_ptr<StreamEvent> event(event_ptr); + if (event->tag == StreamEvent::CHUNK_NOTIFICATION) { + element->OnChunkProcessed(); + } else { + // This isn't an event that needs to be actioned immediately. Add it + // to our work queue. + pending_events.push_back(std::move(event)); + } + // Loop again, so that we service all incoming events before doing our + // possibly expensive processing. continue; } - cpp::result<size_t, AudioProcessingError> process_res; - - // If this element has an input stream, then our top priority is - // processing any chunks from it. Try doing this first, then fall back to - // the other cases. - bool has_received_message = false; - ChunkReadResult chunk_res = chunk_reader.ReadChunkFromStream( - [&](cpp::span<std::byte> data) -> std::optional<size_t> { - process_res = element->ProcessChunk(data); - if (process_res.has_value()) { - return process_res.value(); - } else { - return {}; - } - }, - 0); - - if (chunk_res == CHUNK_PROCESSING_ERROR || - chunk_res == CHUNK_DECODING_ERROR) { - ESP_LOGE(kTag, "failed to process chunk"); - break; // TODO. - } else if (chunk_res == CHUNK_STREAM_ENDED) { - has_received_message = true; + // We have no new events. Next, see if there's anything that needs to be + // flushed. + if (element->HasUnflushedOutput() && !element->FlushBufferedOutput()) { + // We had things to flush, but couldn't send it all. This probably + // implies that the downstream element is having issues servicing its + // input queue, so hold off for a moment before retrying. + ESP_LOGW(kTag, "failed to flush buffered output"); + vTaskDelay(pdMS_TO_TICKS(100)); + continue; } - if (has_received_message) { - auto message = chunk_reader.GetLastMessage(); - MessageType type = ReadMessageType(message); - if (type == TYPE_STREAM_INFO) { - auto parse_res = ReadMessage<StreamInfo>(&StreamInfo::Parse, message); - if (parse_res.has_error()) { - ESP_LOGE(kTag, "failed to parse stream info"); - break; // TODO. - } - auto info_res = element->ProcessStreamInfo(parse_res.value()); - if (info_res.has_error()) { - ESP_LOGE(kTag, "failed to process stream info"); - break; // TODO. - } + if (element->HasUnprocessedInput()) { + auto process_res = element->Process(); + if (!process_res.has_error() || process_res.error() != OUT_OF_DATA) { + // TODO: log! } + continue; } - // Chunk reading must have timed out, or we don't have an input stream. - ElementState state = element->ElementState(); - if (state == STATE_PAUSE) { - element->PrepareForPause(); - - vTaskSuspend(NULL); + // The element ran out of data, so now it's time to let it process more + // input. + while (!pending_events.empty()) { + auto event = std::move(pending_events.front()); + pending_events.pop_front(); - // Zzzzzz... - - // When we wake up, skip straight to the start of the loop again. - continue; - } else if (state == STATE_QUIT) { - break; - } + if (event->tag == StreamEvent::STREAM_INFO) { + auto process_res = element->ProcessStreamInfo(*event->stream_info); + if (process_res.has_error()) { + // TODO(jacqueline) + ESP_LOGE(kTag, "failed to process stream info"); + } + } else if (event->tag == StreamEvent::CHUNK_DATA) { + StreamEvent* callback = new StreamEvent(); + callback->source = element->InputEventQueue(); + callback->tag = StreamEvent::CHUNK_NOTIFICATION; + if (!xQueueSend(event->source, callback, 0)) { + // TODO: log? crash? hmm. + pending_events.push_front(std::move(event)); + continue; + } - // Signal the element to do any of its idle tasks. - auto process_error = element->ProcessIdle(); - if (process_error.has_error()) { - auto err = process_error.error(); - if (err == OUT_OF_DATA) { - // If we ran out of data, then place ourselves into the pause state. - // We will be woken up when there's something to do. - element->ElementState(STATE_PAUSE); - continue; - } else { - ESP_LOGE(kTag, "failed to process idle"); - break; // TODO. + auto process_chunk_res = + element->ProcessChunk(event->chunk_data.bytes); + if (process_chunk_res.has_error()) { + // TODO(jacqueline) + ESP_LOGE(kTag, "failed to process chunk"); + continue; + } } } } |
