From a7df2855889055976956a58d2a36f23626371ee9 Mon Sep 17 00:00:00 2001 From: jacqueline Date: Wed, 23 Nov 2022 17:15:06 +1100 Subject: Mostly done pipeline arch. Now onto cleanup and building. --- src/audio/chunk.cpp | 142 +++++++++++++++++++++++++++++++--------------------- 1 file changed, 85 insertions(+), 57 deletions(-) (limited to 'src/audio/chunk.cpp') diff --git a/src/audio/chunk.cpp b/src/audio/chunk.cpp index 40564069..a157b946 100644 --- a/src/audio/chunk.cpp +++ b/src/audio/chunk.cpp @@ -1,14 +1,21 @@ #include "chunk.hpp" -#include "cbor_encoder.hpp" -#include "cbor_decoder.hpp" #include +#include #include +#include "cbor_decoder.hpp" +#include "cbor_encoder.hpp" #include "esp-idf/components/cbor/tinycbor/src/cbor.h" #include "stream_message.hpp" namespace audio { +// TODO: tune. +static const std::size_t kMaxChunkSize = 512; + +// TODO: tune +static const std::size_t kWorkingBufferSize = kMaxChunkSize * 1.5; + /* * The amount of space to allocate for the first chunk's header. After the first * chunk, we have a more concrete idea of the header's size and can allocate @@ -17,15 +24,16 @@ namespace audio { // TODO: measure how big headers tend to be to pick a better value. static const size_t kInitialHeaderSize = 32; -auto WriteChunksToStream(MessageBufferHandle_t *stream, uint8_t *working_buffer, size_t working_buffer_length, std::function callback, TickType_t max_wait) -> EncodeWriteResult { - +auto WriteChunksToStream(MessageBufferHandle_t* stream, + uint8_t* working_buffer, + size_t working_buffer_length, + std::function callback, + TickType_t max_wait) -> EncodeWriteResult { size_t header_size = kInitialHeaderSize; while (1) { // First, ask the callback for some data to write. - size_t chunk_size = - callback( - working_buffer + header_size, - working_buffer_length - header_size); + size_t chunk_size = callback(working_buffer + header_size, + working_buffer_length - header_size); if (chunk_size == 0) { // They had nothing for us, so bail out. @@ -33,7 +41,8 @@ auto WriteChunksToStream(MessageBufferHandle_t *stream, uint8_t *working_buffer, } // Put together a header. - cbor::Encoder encoder(cbor::CONTAINER_ARRAY, 3, working_buffer, working_buffer_length); + cbor::Encoder encoder(cbor::CONTAINER_ARRAY, 3, working_buffer, + working_buffer_length); encoder.WriteUnsigned(TYPE_CHUNK_HEADER); encoder.WriteUnsigned(header_size); encoder.WriteUnsigned(chunk_size); @@ -52,9 +61,8 @@ auto WriteChunksToStream(MessageBufferHandle_t *stream, uint8_t *working_buffer, // Try to write to the buffer. Note the return type here will be either 0 or // header_size + chunk_size, as MessageBuffer doesn't allow partial writes. - size_t actual_write_size = - xMessageBufferSend( - *stream, working_buffer, header_size + chunk_size, max_wait); + size_t actual_write_size = xMessageBufferSend( + *stream, working_buffer, header_size + chunk_size, max_wait); header_size = new_header_size; @@ -67,58 +75,78 @@ auto WriteChunksToStream(MessageBufferHandle_t *stream, uint8_t *working_buffer, } } -auto ReadChunksFromStream(MessageBufferHandle_t *stream, uint8_t *working_buffer, size_t working_buffer_length, std::function callback, TickType_t max_wait) -> EncodeReadResult { - // Spillover if the previous iteration did not consume all of the input. - size_t leftover_bytes = 0; - while (1) { - // First, wait for a message to arrive over the buffer. - size_t read_size = - xMessageBufferReceive( - *stream, working_buffer + leftover_bytes, working_buffer_length - leftover_bytes, max_wait); +ChunkReader::ChunkReader(MessageBufferHandle_t* stream) : stream_(stream) { + working_buffer_ = heap_caps_malloc(kWorkingBufferSize, MALLOC_CAP_SPIRAM); +}; - if (read_size == 0) { - return CHUNK_READ_TIMEOUT; - } +ChunkReader::~ChunkReader() { + free(working_buffer_); +} - auto decoder = cbor::MapDecoder::Create(working_buffer + leftover_bytes, read_size); - if (decoder.has_error()) { - // Weird; this implies someone is shoving invalid data into the buffer. - return CHUNK_DECODING_ERROR; - } +auto ChunkReader::Reset() -> void { + leftover_bytes_ = 0; + last_message_size_ = 0; +} - MessageType type = decoder.value().ParseUnsigned().value_or(TYPE_UNKNOWN); - if (type != TYPE_CHUNK_HEADER) { - // This message wasn't for us, so put it in a consistent place and let the - // caller handle it. - memmove(working_buffer, working_buffer + leftover_bytes, read_size); - return CHUNK_STREAM_ENDED; - } +auto ChunkReader::GetLastMessage() -> std::pair { + return std::make_pair(working_buffer_ + leftover_bytes_, last_message_size_); +} - // Work the size and position of the chunk. - header_length = decoder.ParseUnsigned().value_or(0); - chunk_length = decoder.ParseUnsigned().value_or(0); - if (decoder.Failed()) { - return CHUNK_DECODING_ERROR; - } +auto ChunkReader::ReadChunkFromStream( + std::function(uint8_t*, size_t)> callback, + TickType_t max_wait) -> EncodeReadResult { + // First, wait for a message to arrive over the buffer. + last_message_size_ = + xMessageBufferReceive(*stream_, working_buffer_ + leftover_bytes_, + kWorkingBufferSize - leftover_bytes_, max_wait); - // Now we need to stick the end of the last chunk (if it exists) onto the - // front of the new chunk. Do it this way around bc we assume the old chunk - // is shorter, and therefore faster to move. - uint8_t *combined_buffer = working_buffer + header_length - leftover_bytes; - size_t combined_buffer_size = leftover_bytes + chunk_length; - if (leftover_bytes > 0) { - memmove(combined_buffer, working_buffer, leftover_bytes); - } + if (last_message_size_ == 0) { + return CHUNK_READ_TIMEOUT; + } - // Tell the callback about the new data. - size_t amount_processed = callback(combined_buffer, combined_buffer_size); + auto decoder = cbor::MapDecoder::Create(working_buffer_ + leftover_bytes_, + last_message_size_); + if (decoder.has_error()) { + // Weird; this implies someone is shoving invalid data into the buffer. + return CHUNK_DECODING_ERROR; + } - // Prepare for the next iteration. - leftover_bytes = combined_buffer_size - amount_processed; - if (leftover_bytes > 0) { - memmove(working_buffer, combined_buffer + amount_processed, leftover_bytes); - } + MessageType type = decoder.value().ParseUnsigned().value_or(TYPE_UNKNOWN); + if (type != TYPE_CHUNK_HEADER) { + // This message wasn't for us, so let the caller handle it. + Reset(); + return CHUNK_STREAM_ENDED; + } + + // Work the size and position of the chunk. + header_length = decoder.ParseUnsigned().value_or(0); + chunk_length = decoder.ParseUnsigned().value_or(0); + if (decoder.Failed()) { + return CHUNK_DECODING_ERROR; + } + + // Now we need to stick the end of the last chunk (if it exists) onto the + // front of the new chunk. Do it this way around bc we assume the old chunk + // is shorter, and therefore faster to move. + uint8_t* combined_buffer = working_buffer_ + header_length - leftover_bytes_; + size_t combined_buffer_size = leftover_bytes_ + chunk_length; + if (leftover_bytes_ > 0) { + memmove(combined_buffer, working_buffer_, leftover_bytes_); + } + + // Tell the callback about the new data. + std::optional amount_processed = + callback(combined_buffer, combined_buffer_size); + if (!amount_processed) { + return CHUNK_PROCESSING_ERROR; + } + + // Prepare for the next iteration. + leftover_bytes_ = combined_buffer_size - amount_processed.value(); + if (leftover_bytes_ > 0) { + memmove(working_buffer_, combined_buffer + amount_processed.value(), + leftover_bytes_); } } -} // namespace audio +} // namespace audio -- cgit v1.2.3