summaryrefslogtreecommitdiff
path: root/src/audio/audio_task.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/audio/audio_task.cpp')
-rw-r--r--src/audio/audio_task.cpp146
1 files changed, 78 insertions, 68 deletions
diff --git a/src/audio/audio_task.cpp b/src/audio/audio_task.cpp
index e3641973..f0a01b6c 100644
--- a/src/audio/audio_task.cpp
+++ b/src/audio/audio_task.cpp
@@ -3,6 +3,7 @@
#include <stdlib.h>
#include <cstdint>
+#include <deque>
#include <memory>
#include "audio_element_handle.hpp"
@@ -10,19 +11,21 @@
#include "esp_heap_caps.h"
#include "esp_log.h"
#include "freertos/portmacro.h"
+#include "freertos/projdefs.h"
#include "freertos/queue.h"
-#include "freertos/stream_buffer.h"
#include "span.hpp"
#include "audio_element.hpp"
#include "chunk.hpp"
+#include "stream_event.hpp"
#include "stream_info.hpp"
#include "stream_message.hpp"
+#include "sys/_stdint.h"
#include "tasks.hpp"
namespace audio {
- static const char *kTag = "task";
+static const char* kTag = "task";
auto StartAudioTask(const std::string& name,
std::shared_ptr<IAudioElement> element)
@@ -47,84 +50,91 @@ void AudioTaskMain(void* args) {
std::shared_ptr<IAudioElement> element = std::move(real_args->element);
delete real_args;
- ChunkReader chunk_reader = ChunkReader(element->InputBuffer());
+ // Queue of events that we have received on our input queue, but not yet
+ // processed.
+ std::deque<std::unique_ptr<StreamEvent>> pending_events;
while (element->ElementState() != STATE_QUIT) {
- if (element->ElementState() == STATE_PAUSE) {
- // TODO: park with a condition variable or something?
- vTaskDelay(1000);
+ // First, we pull events from our input queue into pending_events. This
+ // keeps us responsive to any events that need to be handled immediately.
+ // Then we check if there's any events to flush downstream.
+ // Then we pass anything requiring processing to the element.
+
+ bool has_work_to_do =
+ (!pending_events.empty() || element->HasUnflushedOutput() ||
+ element->HasUnprocessedInput()) &&
+ !element->IsOverBuffered();
+
+ // If we have no new events to process and the element has nothing left to
+ // do, then just delay forever waiting for a new event.
+ TickType_t ticks_to_wait = has_work_to_do ? 0 : portMAX_DELAY;
+
+ StreamEvent* event_ptr = nullptr;
+ bool has_event =
+ xQueueReceive(element->InputEventQueue(), &event_ptr, ticks_to_wait);
+
+ if (has_event && event_ptr != nullptr) {
+ std::unique_ptr<StreamEvent> event(event_ptr);
+ if (event->tag == StreamEvent::CHUNK_NOTIFICATION) {
+ element->OnChunkProcessed();
+ } else {
+ // This isn't an event that needs to be actioned immediately. Add it
+ // to our work queue.
+ pending_events.push_back(std::move(event));
+ }
+ // Loop again, so that we service all incoming events before doing our
+ // possibly expensive processing.
continue;
}
- cpp::result<size_t, AudioProcessingError> process_res;
-
- // If this element has an input stream, then our top priority is
- // processing any chunks from it. Try doing this first, then fall back to
- // the other cases.
- bool has_received_message = false;
- ChunkReadResult chunk_res = chunk_reader.ReadChunkFromStream(
- [&](cpp::span<std::byte> data) -> std::optional<size_t> {
- process_res = element->ProcessChunk(data);
- if (process_res.has_value()) {
- return process_res.value();
- } else {
- return {};
- }
- },
- 0);
-
- if (chunk_res == CHUNK_PROCESSING_ERROR ||
- chunk_res == CHUNK_DECODING_ERROR) {
- ESP_LOGE(kTag, "failed to process chunk");
- break; // TODO.
- } else if (chunk_res == CHUNK_STREAM_ENDED) {
- has_received_message = true;
+ // We have no new events. Next, see if there's anything that needs to be
+ // flushed.
+ if (element->HasUnflushedOutput() && !element->FlushBufferedOutput()) {
+ // We had things to flush, but couldn't send it all. This probably
+ // implies that the downstream element is having issues servicing its
+ // input queue, so hold off for a moment before retrying.
+ ESP_LOGW(kTag, "failed to flush buffered output");
+ vTaskDelay(pdMS_TO_TICKS(100));
+ continue;
}
- if (has_received_message) {
- auto message = chunk_reader.GetLastMessage();
- MessageType type = ReadMessageType(message);
- if (type == TYPE_STREAM_INFO) {
- auto parse_res = ReadMessage<StreamInfo>(&StreamInfo::Parse, message);
- if (parse_res.has_error()) {
- ESP_LOGE(kTag, "failed to parse stream info");
- break; // TODO.
- }
- auto info_res = element->ProcessStreamInfo(parse_res.value());
- if (info_res.has_error()) {
- ESP_LOGE(kTag, "failed to process stream info");
- break; // TODO.
- }
+ if (element->HasUnprocessedInput()) {
+ auto process_res = element->Process();
+ if (!process_res.has_error() || process_res.error() != OUT_OF_DATA) {
+ // TODO: log!
}
+ continue;
}
- // Chunk reading must have timed out, or we don't have an input stream.
- ElementState state = element->ElementState();
- if (state == STATE_PAUSE) {
- element->PrepareForPause();
-
- vTaskSuspend(NULL);
+ // The element ran out of data, so now it's time to let it process more
+ // input.
+ while (!pending_events.empty()) {
+ auto event = std::move(pending_events.front());
+ pending_events.pop_front();
- // Zzzzzz...
-
- // When we wake up, skip straight to the start of the loop again.
- continue;
- } else if (state == STATE_QUIT) {
- break;
- }
+ if (event->tag == StreamEvent::STREAM_INFO) {
+ auto process_res = element->ProcessStreamInfo(*event->stream_info);
+ if (process_res.has_error()) {
+ // TODO(jacqueline)
+ ESP_LOGE(kTag, "failed to process stream info");
+ }
+ } else if (event->tag == StreamEvent::CHUNK_DATA) {
+ StreamEvent* callback = new StreamEvent();
+ callback->source = element->InputEventQueue();
+ callback->tag = StreamEvent::CHUNK_NOTIFICATION;
+ if (!xQueueSend(event->source, callback, 0)) {
+ // TODO: log? crash? hmm.
+ pending_events.push_front(std::move(event));
+ continue;
+ }
- // Signal the element to do any of its idle tasks.
- auto process_error = element->ProcessIdle();
- if (process_error.has_error()) {
- auto err = process_error.error();
- if (err == OUT_OF_DATA) {
- // If we ran out of data, then place ourselves into the pause state.
- // We will be woken up when there's something to do.
- element->ElementState(STATE_PAUSE);
- continue;
- } else {
- ESP_LOGE(kTag, "failed to process idle");
- break; // TODO.
+ auto process_chunk_res =
+ element->ProcessChunk(event->chunk_data.bytes);
+ if (process_chunk_res.has_error()) {
+ // TODO(jacqueline)
+ ESP_LOGE(kTag, "failed to process chunk");
+ continue;
+ }
}
}
}