summaryrefslogtreecommitdiff
path: root/src/audio/chunk.cpp
diff options
context:
space:
mode:
authorjacqueline <me@jacqueline.id.au>2023-01-26 15:02:57 +1100
committerjacqueline <me@jacqueline.id.au>2023-01-26 15:02:57 +1100
commitf6dcd845fc80da4e3043146e4362258dd8e0c0a1 (patch)
tree91410899a83fcc2dfb1eb3ab4837a11e401366fc /src/audio/chunk.cpp
parente7f926e2c376ccd4f4a4d6f4b104f3c23b0059dc (diff)
downloadtangara-fw-f6dcd845fc80da4e3043146e4362258dd8e0c0a1.tar.gz
Switch from MessageBuffer to Queue for pipeline comms
Diffstat (limited to 'src/audio/chunk.cpp')
-rw-r--r--src/audio/chunk.cpp139
1 files changed, 23 insertions, 116 deletions
diff --git a/src/audio/chunk.cpp b/src/audio/chunk.cpp
index b37fdfed..baf2aba5 100644
--- a/src/audio/chunk.cpp
+++ b/src/audio/chunk.cpp
@@ -14,130 +14,37 @@
namespace audio {
-static const char* kTag = "chunk";
+ChunkReader::ChunkReader(std::size_t chunk_size)
+ : raw_working_buffer_(static_cast<std::byte*>(
+ heap_caps_malloc(chunk_size * 1.5, MALLOC_CAP_SPIRAM))),
+ working_buffer_(raw_working_buffer_, chunk_size * 1.5) {}
-ChunkWriter::ChunkWriter(StreamBuffer* buffer)
- : stream_(buffer), leftover_bytes_(0) {}
-
-ChunkWriter::~ChunkWriter() {}
-
-auto ChunkWriter::Reset() -> void {
- leftover_bytes_ = 0;
-}
-
-auto ChunkWriter::WriteChunkToStream(
- std::function<size_t(cpp::span<std::byte>)> callback,
- TickType_t max_wait) -> ChunkWriteResult {
- cpp::span<std::byte> write_buffer = stream_->WriteBuffer();
- // First, write out our chunk header so we know how much space to give to
- // the callback.
- auto header_size = WriteTypeOnlyMessage(TYPE_CHUNK_HEADER, write_buffer);
- if (header_size.has_error()) {
- return CHUNK_ENCODING_ERROR;
- }
-
- // Now we can ask the callback to fill the remaining space. If the previous
- // call to this method timed out, then we may already have the data we need
- // in our write buffer.
- size_t chunk_size;
- if (leftover_bytes_ > 0) {
- chunk_size = leftover_bytes_;
- } else {
- chunk_size = std::invoke(
- callback,
- write_buffer.subspan(header_size.value(),
- write_buffer.size() - header_size.value()));
- }
-
- if (chunk_size == 0) {
- // They had nothing for us, so bail out.
- return CHUNK_OUT_OF_DATA;
- }
-
- // Try to write to the buffer. Note the return type here will be either 0 or
- // header_size + chunk_size, as MessageBuffer doesn't allow partial writes.
- size_t intended_write_size = header_size.value() + chunk_size;
- ESP_LOGI(kTag, "writing chunk of size %d", intended_write_size);
- size_t actual_write_size =
- xMessageBufferSend(stream_->Handle(), write_buffer.data(),
- intended_write_size, max_wait);
-
- if (actual_write_size == 0) {
- leftover_bytes_ = chunk_size;
- return CHUNK_WRITE_TIMEOUT;
- } else {
- leftover_bytes_ = 0;
- }
-
- return CHUNK_WRITE_OKAY;
+ChunkReader::~ChunkReader() {
+ free(raw_working_buffer_);
}
-ChunkReader::ChunkReader(StreamBuffer* stream) : stream_(stream) {}
-
-ChunkReader::~ChunkReader() {}
-
-auto ChunkReader::Reset() -> void {
+auto ChunkReader::HandleNewData(cpp::span<std::byte> data)
+ -> cpp::span<std::byte> {
+ // Copy the new data onto the front for anything that was left over from the
+ // last portion. Note: this could be optimised for the '0 leftover bytes'
+ // case, which technically shouldn't need a copy.
+ cpp::span<std::byte> new_data_dest = working_buffer_.subspan(leftover_bytes_);
+ std::copy(data.begin(), data.end(), new_data_dest.begin());
+ last_data_in_working_buffer_ =
+ working_buffer_.first(leftover_bytes_ + data.size());
leftover_bytes_ = 0;
- last_message_size_ = 0;
-}
-
-auto ChunkReader::GetLastMessage() -> cpp::span<std::byte> {
- return stream_->ReadBuffer().subspan(leftover_bytes_, last_message_size_);
+ return last_data_in_working_buffer_;
}
-auto ChunkReader::ReadChunkFromStream(
- std::function<std::optional<size_t>(cpp::span<std::byte>)> callback,
- TickType_t max_wait) -> ChunkReadResult {
- // First, wait for a message to arrive over the buffer.
- cpp::span<std::byte> new_data_dest = stream_->ReadBuffer().last(
- stream_->ReadBuffer().size() - leftover_bytes_);
- ESP_LOGI(kTag, "reading chunk of size %d", new_data_dest.size());
- last_message_size_ = xMessageBufferReceive(
- stream_->Handle(), new_data_dest.data(), new_data_dest.size(), max_wait);
-
- if (last_message_size_ == 0) {
- return CHUNK_READ_TIMEOUT;
- }
-
- cpp::span<std::byte> new_data = GetLastMessage();
- MessageType type = ReadMessageType(new_data);
-
- if (type != TYPE_CHUNK_HEADER) {
- // This message wasn't for us, so let the caller handle it.
- Reset();
- return CHUNK_STREAM_ENDED;
- }
-
- // Work the size and position of the chunk.
- auto chunk_data = GetAdditionalData(new_data);
-
- // Now we need to stick the end of the last chunk (if it exists) onto the
- // front of the new chunk. Do it this way around bc we assume the old chunk
- // is shorter, and therefore faster to move.
- cpp::span<std::byte> leftover_data =
- stream_->ReadBuffer().first(leftover_bytes_);
- cpp::span<std::byte> combined_data(chunk_data.data() - leftover_data.size(),
- leftover_data.size() + chunk_data.size());
- if (leftover_bytes_ > 0) {
- std::copy_backward(leftover_data.begin(), leftover_data.end(),
- combined_data.begin());
- }
-
- // Tell the callback about the new data.
- std::optional<size_t> amount_processed = std::invoke(callback, combined_data);
- if (!amount_processed) {
- return CHUNK_PROCESSING_ERROR;
- }
-
- // Prepare for the next iteration.
- leftover_bytes_ = combined_data.size() - amount_processed.value();
+auto ChunkReader::HandleLeftovers(std::size_t bytes_used) -> void {
+ leftover_bytes_ = last_data_in_working_buffer_.size() - bytes_used;
if (leftover_bytes_ > 0) {
- std::copy(combined_data.begin() + amount_processed.value(),
- combined_data.end(), stream_->ReadBuffer().begin());
- return CHUNK_LEFTOVER_DATA;
+ auto data_to_keep = last_data_in_working_buffer_.last(leftover_bytes_);
+ // Copy backwards, since if more than half of the data was unused then the
+ // source and destination will overlap.
+ std::copy_backward(data_to_keep.begin(), data_to_keep.end(),
+ working_buffer_.begin());
}
-
- return CHUNK_READ_OKAY;
}
} // namespace audio