summaryrefslogtreecommitdiff
path: root/src/audio/audio_task.cpp
diff options
context:
space:
mode:
authorjacqueline <me@jacqueline.id.au>2022-11-22 17:05:02 +1100
committerjacqueline <me@jacqueline.id.au>2022-11-22 17:05:02 +1100
commit9176ef187227ffb56c249c5f321cd1bf50d4cfcc (patch)
treea846c8fc4e5788e97d6fca43c2807c4bf0ae0214 /src/audio/audio_task.cpp
parent9f8cfaa7a8abd885785830e03d7c417e856b8a22 (diff)
downloadtangara-fw-9176ef187227ffb56c249c5f321cd1bf50d4cfcc.tar.gz
Add cbor wrapper, and chunk streaming util
Diffstat (limited to 'src/audio/audio_task.cpp')
-rw-r--r--src/audio/audio_task.cpp147
1 files changed, 96 insertions, 51 deletions
diff --git a/src/audio/audio_task.cpp b/src/audio/audio_task.cpp
index 86eb4f4a..ad0834e2 100644
--- a/src/audio/audio_task.cpp
+++ b/src/audio/audio_task.cpp
@@ -4,6 +4,7 @@
#include <cstdint>
+#include "esp-idf/components/cbor/tinycbor/src/cbor.h"
#include "esp_heap_caps.h"
#include "freertos/portmacro.h"
#include "freertos/queue.h"
@@ -11,6 +12,7 @@
#include "audio_element.hpp"
#include "include/audio_element.hpp"
+#include "stream_message.hpp"
namespace audio {
@@ -22,71 +24,114 @@ void audio_task(void* args) {
std::shared_ptr<IAudioElement> element = real_args->element;
delete real_args;
- QueueHandle_t commands = element->InputCommandQueue();
- StreamBufferHandle_t stream = element->InputBuffer();
+ MessageBufferHandle_t *stream = element->InputBuffer();
- // TODO: think about overflow.
- uint8_t current_sequence_number;
- uint8_t* frame_buffer =
+ uint8_t* message_buffer =
(uint8_t*)heap_caps_malloc(kFrameSize, MALLOC_CAP_SPIRAM);
while (1) {
- IAudioElement::Command command;
- ProcessResult result;
+ BaseType_t rtos_res;
+ IAudioElement::ProcessResult result;
- if (!xQueueReceive(commands, &command, kCommandWaitTicks)) {
- result = element->ProcessIdle();
- if (result == IAudioElement::ERROR) {
- break;
- }
- if (result == IAudioElement::OUTPUT_FULL) {
- vTaskDelay(kIdleTaskDelay);
- }
- continue;
- };
- if (command.type == IAudioElement::SEQUENCE_NUMBER) {
- if (command.sequence_number > current_sequence_number) {
- current_sequence_number = command.sequence_number;
- }
- continue;
+ size_t message_size = 0;
+ if (message_buffer != nullptr) {
+ // TODO: tune delay.
+ message_size = xMessageBufferReceive(stream, &message_buffer, kFrameSize, portMAX_DELAY);
}
- if (command.type == IAudioElement::READ) {
- assert(command.read_size <= kFrameSize);
- assert(stream != NULL);
- xStreamBufferReceive(stream, &frame_buffer, command.read_size, 0);
-
- if (command.sequence_number == current_sequence_number) {
- result = element->ProcessData(frame_buffer, command.read_size);
- if (result == IAudioElement::ERROR) {
- break;
- }
- if (result == IAudioElement::OUTPUT_FULL) {
- // TODO: Do we care about this? could just park indefinitely.
- }
- }
-
+ if (message_size == 0) {
+ element->ProcessIdle();
continue;
}
- if (command.type == IAudioElement::ELEMENT) {
- assert(command.data != NULL);
- if (command.sequence_number == current_sequence_number) {
- result = element->ProcessElementCommand(command.data);
- if (result == IAudioElement::ERROR) {
- break;
- }
- if (result == IAudioElement::OUTPUT_FULL) {
- // TODO: what does this mean lol
- }
- } else {
- element->SkipElementCommand(command.data);
+ // We got a valid message. Check what kind it is so that we know how to
+ // process it.
+ CborParser parser;
+ CborValue value;
+ cbor_parser_init(message_buffer, message_size, &parser, &value);
+
+ MessageType message_type;
+ if (!cbor_value_is_integer(&value) || !cbor_value_get_integer(&value, &message_type)) {
+ // We weren't able to parse the message type. This is really bad, so just
+ // abort.
+ break; // TODO.
+ }
+
+ if (message_type == STREAM_INFO) {
+ errs = StreamInfo::Create(message_buffer, message_size).map(element->ProcessStreamInfo);
+ if (errs.has_error) {
+ // TODO;
}
+ } else if (message_type == CHUNK_HEADER) {
+ } else {
+ // TODO.
}
- if (command.type == IAudioElement::QUIT) {
- break;
+ cbor_value_
+ if (!xQueueReceive(commands, &command, wait_time)) {
+ if (bytes_in_stream > 0) {
+ size_t read_length = std::min(kMaxFrameSize - leftover_data, bytes_in_stream);
+ xStreamBufferReceive(stream, &frame_buffer + leftover_data, read_length, 0);
+
+ uint8_t *data_in = frame_buffer;
+ result = element->ProcessData(&data_in, read_length);
+ if (result == IAudioElement::ERROR) {
+ break;
+ }
+
+ if (result == IAudioElement::LEFTOVER_DATA) {
+ leftover_data = frame_buffer + read_length - data_in;
+ memmove(frame_buffer, data_in, leftover_data);
+ } else {
+ leftover_data = 0;
+ }
+ } else {
+ result = element->ProcessIdle();
+ if (result == IAudioElement::ERROR) {
+ break;
+ }
+ if (result == IAudioElement::OUTPUT_FULL) {
+ vTaskDelay(kIdleTaskDelay);
+ }
+ }
+ } else {
+ if (command.type == IAudioElement::SEQUENCE_NUMBER) {
+ if (command.sequence_number > current_sequence_number) {
+ current_sequence_number = command.sequence_number;
+ bytes_in_stream = 0;
+ }
+ } else if (command.type == IAudioElement::READ) {
+ assert(command.read_size <= kFrameSize);
+ assert(stream != NULL);
+
+ if (command.sequence_number == current_sequence_number) {
+ bytes_in_stream += command.read_size;
+ } else {
+ // This data is for a different stream, so just discard it.
+ xStreamBufferReceive(stream, &frame_buffer, command.read_size, 0);
+ }
+ } else if (command.type == IAudioElement::ELEMENT) {
+ assert(command.data != NULL);
+ if (command.sequence_number == current_sequence_number) {
+ if (bytes_in_stream > 0) {
+ // We're not ready to handle this yet, so put it back.
+ xQueueSendToFront(commands, &command, kMaxWaitTicks);
+ } else {
+ result = element->ProcessElementCommand(command.data);
+ if (result == IAudioElement::ERROR) {
+ break;
+ }
+ if (result == IAudioElement::OUTPUT_FULL) {
+ // TODO: what does this mean lol
+ }
+ }
+ } else {
+ element->SkipElementCommand(command.data);
+ }
+ } else if (command.type == IAudioElement::QUIT) {
+ break;
+ }
}
}