diff options
| author | jacqueline <me@jacqueline.id.au> | 2022-11-23 17:15:06 +1100 |
|---|---|---|
| committer | jacqueline <me@jacqueline.id.au> | 2022-11-23 17:15:06 +1100 |
| commit | a7df2855889055976956a58d2a36f23626371ee9 (patch) | |
| tree | 16e180e57f84474acaeb1893208cc07e278af6f4 /src/audio/audio_task.cpp | |
| parent | dfa9ab6e04689b99267092e016a91d9254f94cd8 (diff) | |
| download | tangara-fw-a7df2855889055976956a58d2a36f23626371ee9.tar.gz | |
Mostly done pipeline arch. Now onto cleanup and building.
Diffstat (limited to 'src/audio/audio_task.cpp')
| -rw-r--r-- | src/audio/audio_task.cpp | 165 |
1 files changed, 59 insertions, 106 deletions
diff --git a/src/audio/audio_task.cpp b/src/audio/audio_task.cpp index ad0834e2..a125548a 100644 --- a/src/audio/audio_task.cpp +++ b/src/audio/audio_task.cpp @@ -4,6 +4,8 @@ #include <cstdint> +#include "cbor_decoder.hpp" +#include "chunk.hpp" #include "esp-idf/components/cbor/tinycbor/src/cbor.h" #include "esp_heap_caps.h" #include "freertos/portmacro.h" @@ -13,132 +15,83 @@ #include "audio_element.hpp" #include "include/audio_element.hpp" #include "stream_message.hpp" +#include "tasks.hpp" namespace audio { static const TickType_t kCommandWaitTicks = 1; static const TickType_t kIdleTaskDelay = 1; +static const size_t kChunkBufferSize = kMaxChunkSize * 1.5; -void audio_task(void* args) { +auto StartAudioTask(const std::string& name, + std::shared_ptr<IAudioElement>& element) -> void { + AudioTaskArgs* args = new AudioTaskArgs(element); + xTaskCreate(&AudioTaskMain, name.c_str(), element->StackSizeBytes(), args, + kTaskPriorityAudio, NULL); +} + +void AudioTaskMain(void* args) { AudioTaskArgs* real_args = reinterpret_cast<AudioTaskArgs*>(args); - std::shared_ptr<IAudioElement> element = real_args->element; + std::shared_ptr<IAudioElement> element = std::move(real_args->element); delete real_args; - MessageBufferHandle_t *stream = element->InputBuffer(); - - uint8_t* message_buffer = - (uint8_t*)heap_caps_malloc(kFrameSize, MALLOC_CAP_SPIRAM); + ChunkReader chunk_reader = ChunkReader(element->InputBuffer()); while (1) { - BaseType_t rtos_res; - IAudioElement::ProcessResult result; - - - size_t message_size = 0; - if (message_buffer != nullptr) { - // TODO: tune delay. - message_size = xMessageBufferReceive(stream, &message_buffer, kFrameSize, portMAX_DELAY); - } - - if (message_size == 0) { - element->ProcessIdle(); - continue; - } - - // We got a valid message. Check what kind it is so that we know how to - // process it. - CborParser parser; - CborValue value; - cbor_parser_init(message_buffer, message_size, &parser, &value); - - MessageType message_type; - if (!cbor_value_is_integer(&value) || !cbor_value_get_integer(&value, &message_type)) { - // We weren't able to parse the message type. This is really bad, so just - // abort. - break; // TODO. - } - - if (message_type == STREAM_INFO) { - errs = StreamInfo::Create(message_buffer, message_size).map(element->ProcessStreamInfo); - if (errs.has_error) { - // TODO; + cpp::result<size_t, IAudioElement::StreamError> process_res; + + // If this element has an input stream, then our top priority is processing + // any chunks from it. Try doing this first, then fall back to the other + // cases. + bool has_received_message = false; + if (stream != nullptr) { + EncodeReadResult chunk_res = chunk_reader.ReadChunkFromStream( + [&](uint8_t* data, std::size_t length) -> std::optional<size_t> { + process_res = element->ProcessChunk(data, length); + if (process_res.has_value()) { + return process_res.value(); + } else { + return {}; + } + }, + element->IdleTimeout()); + + if (chunk_res == CHUNK_PROCESSING_ERROR || + chunk_res == CHUNK_DECODING_ERROR) { + break; // TODO. + } else if (chunk_res == CHUNK_STREAM_ENDED) { + has_received_message = true; } - } else if (message_type == CHUNK_HEADER) { - } else { - // TODO. } - cbor_value_ - if (!xQueueReceive(commands, &command, wait_time)) { - if (bytes_in_stream > 0) { - size_t read_length = std::min(kMaxFrameSize - leftover_data, bytes_in_stream); - xStreamBufferReceive(stream, &frame_buffer + leftover_data, read_length, 0); - - uint8_t *data_in = frame_buffer; - result = element->ProcessData(&data_in, read_length); - if (result == IAudioElement::ERROR) { - break; - } - - if (result == IAudioElement::LEFTOVER_DATA) { - leftover_data = frame_buffer + read_length - data_in; - memmove(frame_buffer, data_in, leftover_data); - } else { - leftover_data = 0; - } - } else { - result = element->ProcessIdle(); - if (result == IAudioElement::ERROR) { - break; - } - if (result == IAudioElement::OUTPUT_FULL) { - vTaskDelay(kIdleTaskDelay); - } - } - } else { - if (command.type == IAudioElement::SEQUENCE_NUMBER) { - if (command.sequence_number > current_sequence_number) { - current_sequence_number = command.sequence_number; - bytes_in_stream = 0; - } - } else if (command.type == IAudioElement::READ) { - assert(command.read_size <= kFrameSize); - assert(stream != NULL); - - if (command.sequence_number == current_sequence_number) { - bytes_in_stream += command.read_size; - } else { - // This data is for a different stream, so just discard it. - xStreamBufferReceive(stream, &frame_buffer, command.read_size, 0); - } - } else if (command.type == IAudioElement::ELEMENT) { - assert(command.data != NULL); - if (command.sequence_number == current_sequence_number) { - if (bytes_in_stream > 0) { - // We're not ready to handle this yet, so put it back. - xQueueSendToFront(commands, &command, kMaxWaitTicks); - } else { - result = element->ProcessElementCommand(command.data); - if (result == IAudioElement::ERROR) { - break; - } - if (result == IAudioElement::OUTPUT_FULL) { - // TODO: what does this mean lol - } - } - } else { - element->SkipElementCommand(command.data); - } - } else if (command.type == IAudioElement::QUIT) { + if (has_received_message) { + auto& [buffer, length] = chunk_reader.GetLastMessage(); + auto decoder_res = cbor::ArrayDecoder::Create(buffer, length); + if (decoder_res.has_error()) { + // TODO. break; } + auto decoder = decoder_res.value(); + MessageType message_type = decoder->NextValue(); + if (message_type == TYPE_STREAM_INFO) { + element->ProcessStreamInfo(StreamInfo(decoder->Iterator());); + } + } + + // TODO: Do any out of band reading, such a a pause command, here. + + // Chunk reading must have timed out, or we don't have an input stream. + // Signal the element to do any of its idle tasks. + process_res = element->ProcessIdle(); + if (process_res.has_error()) { + break; // TODO. } } - element = nullptr; - free(frame_buffer); + element.clear(); + free(chunk_buffer_); - xTaskDelete(NULL); + vTaskDelete(NULL); } } // namespace audio |
