summaryrefslogtreecommitdiff
path: root/src/audio/audio_task.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/audio/audio_task.cpp')
-rw-r--r--src/audio/audio_task.cpp165
1 files changed, 59 insertions, 106 deletions
diff --git a/src/audio/audio_task.cpp b/src/audio/audio_task.cpp
index ad0834e2..a125548a 100644
--- a/src/audio/audio_task.cpp
+++ b/src/audio/audio_task.cpp
@@ -4,6 +4,8 @@
#include <cstdint>
+#include "cbor_decoder.hpp"
+#include "chunk.hpp"
#include "esp-idf/components/cbor/tinycbor/src/cbor.h"
#include "esp_heap_caps.h"
#include "freertos/portmacro.h"
@@ -13,132 +15,83 @@
#include "audio_element.hpp"
#include "include/audio_element.hpp"
#include "stream_message.hpp"
+#include "tasks.hpp"
namespace audio {
static const TickType_t kCommandWaitTicks = 1;
static const TickType_t kIdleTaskDelay = 1;
+static const size_t kChunkBufferSize = kMaxChunkSize * 1.5;
-void audio_task(void* args) {
+auto StartAudioTask(const std::string& name,
+ std::shared_ptr<IAudioElement>& element) -> void {
+ AudioTaskArgs* args = new AudioTaskArgs(element);
+ xTaskCreate(&AudioTaskMain, name.c_str(), element->StackSizeBytes(), args,
+ kTaskPriorityAudio, NULL);
+}
+
+void AudioTaskMain(void* args) {
AudioTaskArgs* real_args = reinterpret_cast<AudioTaskArgs*>(args);
- std::shared_ptr<IAudioElement> element = real_args->element;
+ std::shared_ptr<IAudioElement> element = std::move(real_args->element);
delete real_args;
- MessageBufferHandle_t *stream = element->InputBuffer();
-
- uint8_t* message_buffer =
- (uint8_t*)heap_caps_malloc(kFrameSize, MALLOC_CAP_SPIRAM);
+ ChunkReader chunk_reader = ChunkReader(element->InputBuffer());
while (1) {
- BaseType_t rtos_res;
- IAudioElement::ProcessResult result;
-
-
- size_t message_size = 0;
- if (message_buffer != nullptr) {
- // TODO: tune delay.
- message_size = xMessageBufferReceive(stream, &message_buffer, kFrameSize, portMAX_DELAY);
- }
-
- if (message_size == 0) {
- element->ProcessIdle();
- continue;
- }
-
- // We got a valid message. Check what kind it is so that we know how to
- // process it.
- CborParser parser;
- CborValue value;
- cbor_parser_init(message_buffer, message_size, &parser, &value);
-
- MessageType message_type;
- if (!cbor_value_is_integer(&value) || !cbor_value_get_integer(&value, &message_type)) {
- // We weren't able to parse the message type. This is really bad, so just
- // abort.
- break; // TODO.
- }
-
- if (message_type == STREAM_INFO) {
- errs = StreamInfo::Create(message_buffer, message_size).map(element->ProcessStreamInfo);
- if (errs.has_error) {
- // TODO;
+ cpp::result<size_t, IAudioElement::StreamError> process_res;
+
+ // If this element has an input stream, then our top priority is processing
+ // any chunks from it. Try doing this first, then fall back to the other
+ // cases.
+ bool has_received_message = false;
+ if (stream != nullptr) {
+ EncodeReadResult chunk_res = chunk_reader.ReadChunkFromStream(
+ [&](uint8_t* data, std::size_t length) -> std::optional<size_t> {
+ process_res = element->ProcessChunk(data, length);
+ if (process_res.has_value()) {
+ return process_res.value();
+ } else {
+ return {};
+ }
+ },
+ element->IdleTimeout());
+
+ if (chunk_res == CHUNK_PROCESSING_ERROR ||
+ chunk_res == CHUNK_DECODING_ERROR) {
+ break; // TODO.
+ } else if (chunk_res == CHUNK_STREAM_ENDED) {
+ has_received_message = true;
}
- } else if (message_type == CHUNK_HEADER) {
- } else {
- // TODO.
}
- cbor_value_
- if (!xQueueReceive(commands, &command, wait_time)) {
- if (bytes_in_stream > 0) {
- size_t read_length = std::min(kMaxFrameSize - leftover_data, bytes_in_stream);
- xStreamBufferReceive(stream, &frame_buffer + leftover_data, read_length, 0);
-
- uint8_t *data_in = frame_buffer;
- result = element->ProcessData(&data_in, read_length);
- if (result == IAudioElement::ERROR) {
- break;
- }
-
- if (result == IAudioElement::LEFTOVER_DATA) {
- leftover_data = frame_buffer + read_length - data_in;
- memmove(frame_buffer, data_in, leftover_data);
- } else {
- leftover_data = 0;
- }
- } else {
- result = element->ProcessIdle();
- if (result == IAudioElement::ERROR) {
- break;
- }
- if (result == IAudioElement::OUTPUT_FULL) {
- vTaskDelay(kIdleTaskDelay);
- }
- }
- } else {
- if (command.type == IAudioElement::SEQUENCE_NUMBER) {
- if (command.sequence_number > current_sequence_number) {
- current_sequence_number = command.sequence_number;
- bytes_in_stream = 0;
- }
- } else if (command.type == IAudioElement::READ) {
- assert(command.read_size <= kFrameSize);
- assert(stream != NULL);
-
- if (command.sequence_number == current_sequence_number) {
- bytes_in_stream += command.read_size;
- } else {
- // This data is for a different stream, so just discard it.
- xStreamBufferReceive(stream, &frame_buffer, command.read_size, 0);
- }
- } else if (command.type == IAudioElement::ELEMENT) {
- assert(command.data != NULL);
- if (command.sequence_number == current_sequence_number) {
- if (bytes_in_stream > 0) {
- // We're not ready to handle this yet, so put it back.
- xQueueSendToFront(commands, &command, kMaxWaitTicks);
- } else {
- result = element->ProcessElementCommand(command.data);
- if (result == IAudioElement::ERROR) {
- break;
- }
- if (result == IAudioElement::OUTPUT_FULL) {
- // TODO: what does this mean lol
- }
- }
- } else {
- element->SkipElementCommand(command.data);
- }
- } else if (command.type == IAudioElement::QUIT) {
+ if (has_received_message) {
+ auto& [buffer, length] = chunk_reader.GetLastMessage();
+ auto decoder_res = cbor::ArrayDecoder::Create(buffer, length);
+ if (decoder_res.has_error()) {
+ // TODO.
break;
}
+ auto decoder = decoder_res.value();
+ MessageType message_type = decoder->NextValue();
+ if (message_type == TYPE_STREAM_INFO) {
+ element->ProcessStreamInfo(StreamInfo(decoder->Iterator()););
+ }
+ }
+
+ // TODO: Do any out of band reading, such a a pause command, here.
+
+ // Chunk reading must have timed out, or we don't have an input stream.
+ // Signal the element to do any of its idle tasks.
+ process_res = element->ProcessIdle();
+ if (process_res.has_error()) {
+ break; // TODO.
}
}
- element = nullptr;
- free(frame_buffer);
+ element.clear();
+ free(chunk_buffer_);
- xTaskDelete(NULL);
+ vTaskDelete(NULL);
}
} // namespace audio