summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/audio/CMakeLists.txt1
-rw-r--r--src/audio/audio_decoder.cpp111
-rw-r--r--src/audio/audio_element.cpp59
-rw-r--r--src/audio/audio_playback.cpp36
-rw-r--r--src/audio/audio_task.cpp146
-rw-r--r--src/audio/chunk.cpp139
-rw-r--r--src/audio/fatfs_audio_input.cpp154
-rw-r--r--src/audio/i2s_audio_output.cpp12
-rw-r--r--src/audio/include/audio_decoder.hpp10
-rw-r--r--src/audio/include/audio_element.hpp63
-rw-r--r--src/audio/include/audio_playback.hpp6
-rw-r--r--src/audio/include/chunk.hpp70
-rw-r--r--src/audio/include/fatfs_audio_input.hpp16
-rw-r--r--src/audio/include/i2s_audio_output.hpp11
-rw-r--r--src/audio/include/stream_event.hpp57
-rw-r--r--src/audio/include/stream_info.hpp12
-rw-r--r--src/audio/stream_buffer.cpp9
-rw-r--r--src/audio/stream_event.cpp75
-rw-r--r--src/audio/stream_message.cpp1
19 files changed, 470 insertions, 518 deletions
diff --git a/src/audio/CMakeLists.txt b/src/audio/CMakeLists.txt
index 4986a7da..57dae1cd 100644
--- a/src/audio/CMakeLists.txt
+++ b/src/audio/CMakeLists.txt
@@ -2,6 +2,7 @@ idf_component_register(
SRCS "audio_decoder.cpp" "audio_task.cpp" "chunk.cpp" "fatfs_audio_input.cpp"
"stream_info.cpp" "stream_message.cpp" "i2s_audio_output.cpp"
"stream_buffer.cpp" "audio_playback.cpp" "audio_element_handle.cpp"
+ "stream_event.cpp" "audio_element.cpp"
INCLUDE_DIRS "include"
REQUIRES "codecs" "drivers" "cbor" "result" "tasks" "span")
diff --git a/src/audio/audio_decoder.cpp b/src/audio/audio_decoder.cpp
index 0b3d9878..8ef90905 100644
--- a/src/audio/audio_decoder.cpp
+++ b/src/audio/audio_decoder.cpp
@@ -4,6 +4,7 @@
#include <cstddef>
#include <cstdint>
+#include <memory>
#include "freertos/FreeRTOS.h"
@@ -14,19 +15,30 @@
#include "audio_element.hpp"
#include "chunk.hpp"
#include "fatfs_audio_input.hpp"
-
-static const char* kTag = "DEC";
+#include "stream_info.hpp"
namespace audio {
+static const std::size_t kSamplesPerChunk = 256;
+
AudioDecoder::AudioDecoder() : IAudioElement(), stream_info_({}) {}
AudioDecoder::~AudioDecoder() {}
+auto AudioDecoder::HasUnprocessedInput() -> bool {
+ return !needs_more_input_ || has_samples_to_send_;
+}
+
auto AudioDecoder::ProcessStreamInfo(const StreamInfo& info)
-> cpp::result<void, AudioProcessingError> {
stream_info_ = info;
+ if (info.ChunkSize()) {
+ chunk_reader_.emplace(info.ChunkSize().value());
+ } else {
+ // TODO.
+ }
+
// Reuse the existing codec if we can. This will help with gapless playback,
// since we can potentially just continue to decode as we were before,
// without any setup overhead.
@@ -42,71 +54,66 @@ auto AudioDecoder::ProcessStreamInfo(const StreamInfo& info)
return cpp::fail(UNSUPPORTED_STREAM);
}
+ // TODO: defer until first header read, so we can give better info about
+ // sample rate, chunk size, etc.
+ auto downstream_info = StreamEvent::CreateStreamInfo(
+ input_events_, std::make_unique<StreamInfo>(info));
+ downstream_info->stream_info->BitsPerSample(32);
+ downstream_info->stream_info->SampleRate(48'000);
+ chunk_size_ = 128;
+ downstream_info->stream_info->ChunkSize(chunk_size_);
+
+ SendOrBufferEvent(std::move(downstream_info));
+
return {};
}
auto AudioDecoder::ProcessChunk(const cpp::span<std::byte>& chunk)
-> cpp::result<size_t, AudioProcessingError> {
- if (current_codec_ == nullptr) {
+ if (current_codec_ == nullptr || !chunk_reader_) {
// Should never happen, but fail explicitly anyway.
return cpp::fail(UNSUPPORTED_STREAM);
}
- current_codec_->SetInput(chunk);
-
- bool has_samples_to_send = false;
- bool needs_more_input = false;
- std::optional<codecs::ICodec::ProcessingError> error = std::nullopt;
- while (1) {
- ChunkWriteResult res = chunk_writer_->WriteChunkToStream(
- [&](cpp::span<std::byte> buffer) -> std::size_t {
- std::size_t bytes_written = 0;
- // Continue filling up the output buffer so long as we have samples
- // leftover, or are able to synthesize more samples from the input.
- while (has_samples_to_send || !needs_more_input) {
- if (!has_samples_to_send) {
- auto result = current_codec_->ProcessNextFrame();
- has_samples_to_send = true;
- if (result.has_error()) {
- error = result.error();
- // End our output stream immediately if the codec barfed.
- return 0;
- } else {
- needs_more_input = result.value();
- }
- } else {
- auto result = current_codec_->WriteOutputSamples(
- buffer.last(buffer.size() - bytes_written));
- bytes_written += result.first;
- has_samples_to_send = !result.second;
- }
- }
- return bytes_written;
- },
- // TODO
- portMAX_DELAY);
-
- switch (res) {
- case CHUNK_WRITE_OKAY:
- break;
- case CHUNK_WRITE_TIMEOUT:
- case CHUNK_OUT_OF_DATA:
+ current_codec_->SetInput(chunk_reader_->HandleNewData(chunk));
+
+ return {};
+}
+
+auto AudioDecoder::Process() -> cpp::result<void, AudioProcessingError> {
+ if (has_samples_to_send_) {
+ // Writing samples is relatively quick (it's just a bunch of memcopy's), so
+ // do them all at once.
+ while (has_samples_to_send_ && !IsOverBuffered()) {
+ auto buffer = StreamEvent::CreateChunkData(input_events_, chunk_size_);
+ auto write_res =
+ current_codec_->WriteOutputSamples(buffer->chunk_data.bytes);
+ buffer->chunk_data.bytes =
+ buffer->chunk_data.bytes.first(write_res.first);
+ has_samples_to_send_ = !write_res.second;
+
+ if (!SendOrBufferEvent(std::move(buffer))) {
return {};
- default:
- return cpp::fail(IO_ERROR);
+ }
}
+ // We will process the next frame during the next call to this method.
+ return {};
}
- if (error) {
- ESP_LOGE(kTag, "Codec encountered error %d", error.value());
- return cpp::fail(IO_ERROR);
- }
+ if (!needs_more_input_) {
+ auto res = current_codec_->ProcessNextFrame();
+ if (res.has_error()) {
+ // todo
+ return {};
+ }
+ needs_more_input_ = res.value();
+ has_samples_to_send_ = true;
- return current_codec_->GetInputPosition();
-}
+ if (needs_more_input_) {
+ chunk_reader_->HandleLeftovers(current_codec_->GetInputPosition());
+ }
+ }
-auto AudioDecoder::ProcessIdle() -> cpp::result<void, AudioProcessingError> {
- // Not used; we delay forever when waiting on IO.
return {};
}
diff --git a/src/audio/audio_element.cpp b/src/audio/audio_element.cpp
new file mode 100644
index 00000000..e1623c36
--- /dev/null
+++ b/src/audio/audio_element.cpp
@@ -0,0 +1,59 @@
+#include "audio_element.hpp"
+
+namespace audio {
+
+IAudioElement::IAudioElement()
+ : input_events_(xQueueCreate(kEventQueueSize, sizeof(StreamEvent))),
+ output_events_(nullptr),
+ unprocessed_output_chunks_(0),
+ buffered_output_(),
+ current_state_(STATE_RUN) {}
+
+IAudioElement::~IAudioElement() {
+ // Ensure we don't leak any memory from events leftover in the queue.
+ while (uxQueueSpacesAvailable(input_events_) < kEventQueueSize) {
+ StreamEvent* event;
+ if (xQueueReceive(input_events_, &event, 0)) {
+ free(event);
+ } else {
+ break;
+ }
+ }
+ // Technically there's a race here if someone is still adding to the queue,
+ // but hopefully the whole pipeline is stopped if an element is being
+ // destroyed.
+ vQueueDelete(input_events_);
+}
+
+auto IAudioElement::SendOrBufferEvent(std::unique_ptr<StreamEvent> event)
+ -> bool {
+ if (event->tag == StreamEvent::CHUNK_DATA) {
+ unprocessed_output_chunks_++;
+ }
+ if (!buffered_output_.empty()) {
+ // To ensure we send data in order, don't try to send if we've already
+ // failed to send something.
+ buffered_output_.push_back(std::move(event));
+ return false;
+ }
+ StreamEvent* raw_event = event.release();
+ if (!xQueueSend(output_events_, raw_event, 0)) {
+ buffered_output_.emplace_front(raw_event);
+ return false;
+ }
+ return true;
+}
+
+auto IAudioElement::FlushBufferedOutput() -> bool {
+ while (!buffered_output_.empty()) {
+ StreamEvent* raw_event = buffered_output_.front().release();
+ buffered_output_.pop_front();
+ if (!xQueueSend(output_events_, raw_event, 0)) {
+ buffered_output_.emplace_front(raw_event);
+ return false;
+ }
+ }
+ return true;
+}
+
+} // namespace audio
diff --git a/src/audio/audio_playback.cpp b/src/audio/audio_playback.cpp
index 7462b4f6..fae096d0 100644
--- a/src/audio/audio_playback.cpp
+++ b/src/audio/audio_playback.cpp
@@ -18,9 +18,6 @@
namespace audio {
-// TODO: idk
-static const std::size_t kMinElementBufferSize = 1024;
-
auto AudioPlayback::create(drivers::GpioExpander* expander,
std::shared_ptr<drivers::SdStorage> storage)
-> cpp::result<std::unique_ptr<AudioPlayback>, Error> {
@@ -37,8 +34,6 @@ auto AudioPlayback::create(drivers::GpioExpander* expander,
auto playback = std::make_unique<AudioPlayback>();
// Configure the pipeline
- source->InputBuffer(&playback->stream_start_);
- sink->OutputBuffer(&playback->stream_end_);
playback->ConnectElements(source.get(), codec.get());
playback->ConnectElements(codec.get(), sink.get());
@@ -52,9 +47,7 @@ auto AudioPlayback::create(drivers::GpioExpander* expander,
return playback;
}
-// TODO(jacqueline): think about sizes
-AudioPlayback::AudioPlayback()
- : stream_start_(128, 256), stream_end_(128, 256) {}
+AudioPlayback::AudioPlayback() {}
AudioPlayback::~AudioPlayback() {
for (auto& element : element_handles_) {
@@ -63,33 +56,16 @@ AudioPlayback::~AudioPlayback() {
}
auto AudioPlayback::Play(const std::string& filename) -> void {
- StreamInfo info;
- info.Path(filename);
-
- std::array<std::byte, 128> dest;
- auto len = WriteMessage(
- TYPE_STREAM_INFO, [&](auto enc) { return info.Encode(enc); }, dest);
+ auto info = std::make_unique<StreamInfo>();
+ info->Path(filename);
+ auto event = StreamEvent::CreateStreamInfo(nullptr, std::move(info));
- if (len.has_error()) {
- // TODO.
- return;
- }
-
- // TODO: short delay, return error on fail
- xMessageBufferSend(*stream_start_.Handle(), dest.data(), len.value(),
- portMAX_DELAY);
+ xQueueSend(input_handle_, event.release(), portMAX_DELAY);
}
auto AudioPlayback::ConnectElements(IAudioElement* src, IAudioElement* sink)
-> void {
- std::size_t chunk_size =
- std::max(src->InputMinChunkSize(), sink->InputMinChunkSize());
- std::size_t buffer_size = std::max(kMinElementBufferSize, chunk_size * 2);
-
- auto buffer = std::make_unique<StreamBuffer>(chunk_size, buffer_size);
- src->OutputBuffer(buffer.get());
- sink->OutputBuffer(buffer.get());
- element_buffers_.push_back(std::move(buffer));
+ src->OutputEventQueue(sink->InputEventQueue());
}
} // namespace audio
diff --git a/src/audio/audio_task.cpp b/src/audio/audio_task.cpp
index e3641973..f0a01b6c 100644
--- a/src/audio/audio_task.cpp
+++ b/src/audio/audio_task.cpp
@@ -3,6 +3,7 @@
#include <stdlib.h>
#include <cstdint>
+#include <deque>
#include <memory>
#include "audio_element_handle.hpp"
@@ -10,19 +11,21 @@
#include "esp_heap_caps.h"
#include "esp_log.h"
#include "freertos/portmacro.h"
+#include "freertos/projdefs.h"
#include "freertos/queue.h"
-#include "freertos/stream_buffer.h"
#include "span.hpp"
#include "audio_element.hpp"
#include "chunk.hpp"
+#include "stream_event.hpp"
#include "stream_info.hpp"
#include "stream_message.hpp"
+#include "sys/_stdint.h"
#include "tasks.hpp"
namespace audio {
- static const char *kTag = "task";
+static const char* kTag = "task";
auto StartAudioTask(const std::string& name,
std::shared_ptr<IAudioElement> element)
@@ -47,84 +50,91 @@ void AudioTaskMain(void* args) {
std::shared_ptr<IAudioElement> element = std::move(real_args->element);
delete real_args;
- ChunkReader chunk_reader = ChunkReader(element->InputBuffer());
+ // Queue of events that we have received on our input queue, but not yet
+ // processed.
+ std::deque<std::unique_ptr<StreamEvent>> pending_events;
while (element->ElementState() != STATE_QUIT) {
- if (element->ElementState() == STATE_PAUSE) {
- // TODO: park with a condition variable or something?
- vTaskDelay(1000);
+ // First, we pull events from our input queue into pending_events. This
+ // keeps us responsive to any events that need to be handled immediately.
+ // Then we check if there's any events to flush downstream.
+ // Then we pass anything requiring processing to the element.
+
+ bool has_work_to_do =
+ (!pending_events.empty() || element->HasUnflushedOutput() ||
+ element->HasUnprocessedInput()) &&
+ !element->IsOverBuffered();
+
+ // If we have no new events to process and the element has nothing left to
+ // do, then just delay forever waiting for a new event.
+ TickType_t ticks_to_wait = has_work_to_do ? 0 : portMAX_DELAY;
+
+ StreamEvent* event_ptr = nullptr;
+ bool has_event =
+ xQueueReceive(element->InputEventQueue(), &event_ptr, ticks_to_wait);
+
+ if (has_event && event_ptr != nullptr) {
+ std::unique_ptr<StreamEvent> event(event_ptr);
+ if (event->tag == StreamEvent::CHUNK_NOTIFICATION) {
+ element->OnChunkProcessed();
+ } else {
+ // This isn't an event that needs to be actioned immediately. Add it
+ // to our work queue.
+ pending_events.push_back(std::move(event));
+ }
+ // Loop again, so that we service all incoming events before doing our
+ // possibly expensive processing.
continue;
}
- cpp::result<size_t, AudioProcessingError> process_res;
-
- // If this element has an input stream, then our top priority is
- // processing any chunks from it. Try doing this first, then fall back to
- // the other cases.
- bool has_received_message = false;
- ChunkReadResult chunk_res = chunk_reader.ReadChunkFromStream(
- [&](cpp::span<std::byte> data) -> std::optional<size_t> {
- process_res = element->ProcessChunk(data);
- if (process_res.has_value()) {
- return process_res.value();
- } else {
- return {};
- }
- },
- 0);
-
- if (chunk_res == CHUNK_PROCESSING_ERROR ||
- chunk_res == CHUNK_DECODING_ERROR) {
- ESP_LOGE(kTag, "failed to process chunk");
- break; // TODO.
- } else if (chunk_res == CHUNK_STREAM_ENDED) {
- has_received_message = true;
+ // We have no new events. Next, see if there's anything that needs to be
+ // flushed.
+ if (element->HasUnflushedOutput() && !element->FlushBufferedOutput()) {
+ // We had things to flush, but couldn't send it all. This probably
+ // implies that the downstream element is having issues servicing its
+ // input queue, so hold off for a moment before retrying.
+ ESP_LOGW(kTag, "failed to flush buffered output");
+ vTaskDelay(pdMS_TO_TICKS(100));
+ continue;
}
- if (has_received_message) {
- auto message = chunk_reader.GetLastMessage();
- MessageType type = ReadMessageType(message);
- if (type == TYPE_STREAM_INFO) {
- auto parse_res = ReadMessage<StreamInfo>(&StreamInfo::Parse, message);
- if (parse_res.has_error()) {
- ESP_LOGE(kTag, "failed to parse stream info");
- break; // TODO.
- }
- auto info_res = element->ProcessStreamInfo(parse_res.value());
- if (info_res.has_error()) {
- ESP_LOGE(kTag, "failed to process stream info");
- break; // TODO.
- }
+ if (element->HasUnprocessedInput()) {
+ auto process_res = element->Process();
+ if (!process_res.has_error() || process_res.error() != OUT_OF_DATA) {
+ // TODO: log!
}
+ continue;
}
- // Chunk reading must have timed out, or we don't have an input stream.
- ElementState state = element->ElementState();
- if (state == STATE_PAUSE) {
- element->PrepareForPause();
-
- vTaskSuspend(NULL);
+ // The element ran out of data, so now it's time to let it process more
+ // input.
+ while (!pending_events.empty()) {
+ auto event = std::move(pending_events.front());
+ pending_events.pop_front();
- // Zzzzzz...
-
- // When we wake up, skip straight to the start of the loop again.
- continue;
- } else if (state == STATE_QUIT) {
- break;
- }
+ if (event->tag == StreamEvent::STREAM_INFO) {
+ auto process_res = element->ProcessStreamInfo(*event->stream_info);
+ if (process_res.has_error()) {
+ // TODO(jacqueline)
+ ESP_LOGE(kTag, "failed to process stream info");
+ }
+ } else if (event->tag == StreamEvent::CHUNK_DATA) {
+ StreamEvent* callback = new StreamEvent();
+ callback->source = element->InputEventQueue();
+ callback->tag = StreamEvent::CHUNK_NOTIFICATION;
+ if (!xQueueSend(event->source, callback, 0)) {
+ // TODO: log? crash? hmm.
+ pending_events.push_front(std::move(event));
+ continue;
+ }
- // Signal the element to do any of its idle tasks.
- auto process_error = element->ProcessIdle();
- if (process_error.has_error()) {
- auto err = process_error.error();
- if (err == OUT_OF_DATA) {
- // If we ran out of data, then place ourselves into the pause state.
- // We will be woken up when there's something to do.
- element->ElementState(STATE_PAUSE);
- continue;
- } else {
- ESP_LOGE(kTag, "failed to process idle");
- break; // TODO.
+ auto process_chunk_res =
+ element->ProcessChunk(event->chunk_data.bytes);
+ if (process_chunk_res.has_error()) {
+ // TODO(jacqueline)
+ ESP_LOGE(kTag, "failed to process chunk");
+ continue;
+ }
}
}
}
diff --git a/src/audio/chunk.cpp b/src/audio/chunk.cpp
index b37fdfed..baf2aba5 100644
--- a/src/audio/chunk.cpp
+++ b/src/audio/chunk.cpp
@@ -14,130 +14,37 @@
namespace audio {
-static const char* kTag = "chunk";
+ChunkReader::ChunkReader(std::size_t chunk_size)
+ : raw_working_buffer_(static_cast<std::byte*>(
+ heap_caps_malloc(chunk_size * 1.5, MALLOC_CAP_SPIRAM))),
+ working_buffer_(raw_working_buffer_, chunk_size * 1.5) {}
-ChunkWriter::ChunkWriter(StreamBuffer* buffer)
- : stream_(buffer), leftover_bytes_(0) {}
-
-ChunkWriter::~ChunkWriter() {}
-
-auto ChunkWriter::Reset() -> void {
- leftover_bytes_ = 0;
-}
-
-auto ChunkWriter::WriteChunkToStream(
- std::function<size_t(cpp::span<std::byte>)> callback,
- TickType_t max_wait) -> ChunkWriteResult {
- cpp::span<std::byte> write_buffer = stream_->WriteBuffer();
- // First, write out our chunk header so we know how much space to give to
- // the callback.
- auto header_size = WriteTypeOnlyMessage(TYPE_CHUNK_HEADER, write_buffer);
- if (header_size.has_error()) {
- return CHUNK_ENCODING_ERROR;
- }
-
- // Now we can ask the callback to fill the remaining space. If the previous
- // call to this method timed out, then we may already have the data we need
- // in our write buffer.
- size_t chunk_size;
- if (leftover_bytes_ > 0) {
- chunk_size = leftover_bytes_;
- } else {
- chunk_size = std::invoke(
- callback,
- write_buffer.subspan(header_size.value(),
- write_buffer.size() - header_size.value()));
- }
-
- if (chunk_size == 0) {
- // They had nothing for us, so bail out.
- return CHUNK_OUT_OF_DATA;
- }
-
- // Try to write to the buffer. Note the return type here will be either 0 or
- // header_size + chunk_size, as MessageBuffer doesn't allow partial writes.
- size_t intended_write_size = header_size.value() + chunk_size;
- ESP_LOGI(kTag, "writing chunk of size %d", intended_write_size);
- size_t actual_write_size =
- xMessageBufferSend(stream_->Handle(), write_buffer.data(),
- intended_write_size, max_wait);
-
- if (actual_write_size == 0) {
- leftover_bytes_ = chunk_size;
- return CHUNK_WRITE_TIMEOUT;
- } else {
- leftover_bytes_ = 0;
- }
-
- return CHUNK_WRITE_OKAY;
+ChunkReader::~ChunkReader() {
+ free(raw_working_buffer_);
}
-ChunkReader::ChunkReader(StreamBuffer* stream) : stream_(stream) {}
-
-ChunkReader::~ChunkReader() {}
-
-auto ChunkReader::Reset() -> void {
+auto ChunkReader::HandleNewData(cpp::span<std::byte> data)
+ -> cpp::span<std::byte> {
+ // Copy the new data onto the front for anything that was left over from the
+ // last portion. Note: this could be optimised for the '0 leftover bytes'
+ // case, which technically shouldn't need a copy.
+ cpp::span<std::byte> new_data_dest = working_buffer_.subspan(leftover_bytes_);
+ std::copy(data.begin(), data.end(), new_data_dest.begin());
+ last_data_in_working_buffer_ =
+ working_buffer_.first(leftover_bytes_ + data.size());
leftover_bytes_ = 0;
- last_message_size_ = 0;
-}
-
-auto ChunkReader::GetLastMessage() -> cpp::span<std::byte> {
- return stream_->ReadBuffer().subspan(leftover_bytes_, last_message_size_);
+ return last_data_in_working_buffer_;
}
-auto ChunkReader::ReadChunkFromStream(
- std::function<std::optional<size_t>(cpp::span<std::byte>)> callback,
- TickType_t max_wait) -> ChunkReadResult {
- // First, wait for a message to arrive over the buffer.
- cpp::span<std::byte> new_data_dest = stream_->ReadBuffer().last(
- stream_->ReadBuffer().size() - leftover_bytes_);
- ESP_LOGI(kTag, "reading chunk of size %d", new_data_dest.size());
- last_message_size_ = xMessageBufferReceive(
- stream_->Handle(), new_data_dest.data(), new_data_dest.size(), max_wait);
-
- if (last_message_size_ == 0) {
- return CHUNK_READ_TIMEOUT;
- }
-
- cpp::span<std::byte> new_data = GetLastMessage();
- MessageType type = ReadMessageType(new_data);
-
- if (type != TYPE_CHUNK_HEADER) {
- // This message wasn't for us, so let the caller handle it.
- Reset();
- return CHUNK_STREAM_ENDED;
- }
-
- // Work the size and position of the chunk.
- auto chunk_data = GetAdditionalData(new_data);
-
- // Now we need to stick the end of the last chunk (if it exists) onto the
- // front of the new chunk. Do it this way around bc we assume the old chunk
- // is shorter, and therefore faster to move.
- cpp::span<std::byte> leftover_data =
- stream_->ReadBuffer().first(leftover_bytes_);
- cpp::span<std::byte> combined_data(chunk_data.data() - leftover_data.size(),
- leftover_data.size() + chunk_data.size());
- if (leftover_bytes_ > 0) {
- std::copy_backward(leftover_data.begin(), leftover_data.end(),
- combined_data.begin());
- }
-
- // Tell the callback about the new data.
- std::optional<size_t> amount_processed = std::invoke(callback, combined_data);
- if (!amount_processed) {
- return CHUNK_PROCESSING_ERROR;
- }
-
- // Prepare for the next iteration.
- leftover_bytes_ = combined_data.size() - amount_processed.value();
+auto ChunkReader::HandleLeftovers(std::size_t bytes_used) -> void {
+ leftover_bytes_ = last_data_in_working_buffer_.size() - bytes_used;
if (leftover_bytes_ > 0) {
- std::copy(combined_data.begin() + amount_processed.value(),
- combined_data.end(), stream_->ReadBuffer().begin());
- return CHUNK_LEFTOVER_DATA;
+ auto data_to_keep = last_data_in_working_buffer_.last(leftover_bytes_);
+ // Copy backwards, since if more than half of the data was unused then the
+ // source and destination will overlap.
+ std::copy_backward(data_to_keep.begin(), data_to_keep.end(),
+ working_buffer_.begin());
}
-
- return CHUNK_READ_OKAY;
}
} // namespace audio
diff --git a/src/audio/fatfs_audio_input.cpp b/src/audio/fatfs_audio_input.cpp
index 7f11805e..29e03784 100644
--- a/src/audio/fatfs_audio_input.cpp
+++ b/src/audio/fatfs_audio_input.cpp
@@ -1,5 +1,6 @@
#include "fatfs_audio_input.hpp"
+#include <algorithm>
#include <cstdint>
#include <memory>
#include <string>
@@ -10,33 +11,26 @@
#include "audio_element.hpp"
#include "chunk.hpp"
#include "stream_buffer.hpp"
+#include "stream_event.hpp"
#include "stream_message.hpp"
static const char* kTag = "SRC";
namespace audio {
-static const TickType_t kServiceInterval = pdMS_TO_TICKS(50);
-
-static const std::size_t kFileBufferSize = 1024 * 128;
-static const std::size_t kMinFileReadSize = 1024 * 4;
+// 32KiB to match the minimum himen region size.
+static const std::size_t kChunkSize = 1024;
FatfsAudioInput::FatfsAudioInput(std::shared_ptr<drivers::SdStorage> storage)
: IAudioElement(),
storage_(storage),
- raw_file_buffer_(static_cast<std::byte*>(
- heap_caps_malloc(kFileBufferSize, MALLOC_CAP_SPIRAM))),
- file_buffer_(raw_file_buffer_, kFileBufferSize),
- file_buffer_read_pos_(file_buffer_.begin()),
- file_buffer_write_pos_(file_buffer_.begin()),
current_file_(),
- is_file_open_(false),
- chunk_writer_(nullptr) {
- // TODO: create our chunk writer whenever the output buffer changes.
-}
+ is_file_open_(false) {}
+
+FatfsAudioInput::~FatfsAudioInput() {}
-FatfsAudioInput::~FatfsAudioInput() {
- free(raw_file_buffer_);
+auto FatfsAudioInput::HasUnprocessedInput() -> bool {
+ return is_file_open_;
}
auto FatfsAudioInput::ProcessStreamInfo(const StreamInfo& info)
@@ -57,17 +51,12 @@ auto FatfsAudioInput::ProcessStreamInfo(const StreamInfo& info)
is_file_open_ = true;
- auto write_size =
- WriteMessage(TYPE_STREAM_INFO,
- std::bind(&StreamInfo::Encode, info, std::placeholders::_1),
- output_buffer_->WriteBuffer());
+ std::unique_ptr<StreamInfo> new_info = std::make_unique<StreamInfo>(info);
+ new_info->ChunkSize(kChunkSize);
- if (write_size.has_error()) {
- return cpp::fail(IO_ERROR);
- } else {
- xMessageBufferSend(output_buffer_, output_buffer_->WriteBuffer().data(),
- write_size.value(), portMAX_DELAY);
- }
+ auto event =
+ StreamEvent::CreateStreamInfo(input_events_, std::move(new_info));
+ SendOrBufferEvent(std::move(event));
return {};
}
@@ -77,110 +66,29 @@ auto FatfsAudioInput::ProcessChunk(const cpp::span<std::byte>& chunk)
return cpp::fail(UNSUPPORTED_STREAM);
}
-auto FatfsAudioInput::GetRingBufferDistance() const -> size_t {
- if (file_buffer_read_pos_ == file_buffer_write_pos_) {
- return 0;
- }
- if (file_buffer_read_pos_ < file_buffer_write_pos_) {
- return file_buffer_write_pos_ - file_buffer_read_pos_;
- }
- return
- // Read position to end of buffer.
- (file_buffer_.end() - file_buffer_read_pos_)
- // Start of buffer to write position.
- + (file_buffer_write_pos_ - file_buffer_.begin());
-}
-
-auto FatfsAudioInput::ProcessIdle() -> cpp::result<void, AudioProcessingError> {
- // First, see if we're able to fill up the input buffer with any more of the
- // file's contents.
+auto FatfsAudioInput::Process() -> cpp::result<void, AudioProcessingError> {
if (is_file_open_) {
- size_t ringbuf_distance = GetRingBufferDistance();
- if (file_buffer_.size() - ringbuf_distance > kMinFileReadSize) {
- size_t read_size;
- if (file_buffer_write_pos_ < file_buffer_read_pos_) {
- // Don't worry about the start of buffer -> read pos size; we can get to
- // it next iteration.
- read_size = file_buffer_read_pos_ - file_buffer_write_pos_;
- } else {
- read_size = file_buffer_.begin() - file_buffer_write_pos_;
- }
-
- ESP_LOGI(kTag, "reading up to %d bytes", (int)read_size);
-
- UINT bytes_read = 0;
- FRESULT result =
- f_read(&current_file_, std::addressof(file_buffer_write_pos_),
- read_size, &bytes_read);
- if (result != FR_OK) {
- ESP_LOGE(kTag, "file I/O error %d", result);
- return cpp::fail(IO_ERROR);
- }
-
- ESP_LOGI(kTag, "actual read size %d bytes", (int)bytes_read);
-
- if (f_eof(&current_file_)) {
- f_close(&current_file_);
- is_file_open_ = false;
-
- // TODO: open the next file?
- }
-
- file_buffer_write_pos_ += bytes_read;
- if (file_buffer_write_pos_ == file_buffer_.end()) {
- file_buffer_write_pos_ = file_buffer_.begin();
- }
+ auto dest_event = StreamEvent::CreateChunkData(input_events_, kChunkSize);
+ UINT bytes_read = 0;
+
+ FRESULT result =
+ f_read(&current_file_, dest_event->chunk_data.raw_bytes.get(),
+ kChunkSize, &bytes_read);
+ if (result != FR_OK) {
+ ESP_LOGE(kTag, "file I/O error %d", result);
+ return cpp::fail(IO_ERROR);
}
- } else if (GetRingBufferDistance() == 0) {
- // We have no file open, and no data waiting to be written. We're out of
- // stuff to do, so signal a pause.
- return cpp::fail(OUT_OF_DATA);
- }
- // Now stream data into the output buffer until it's full.
- while (GetRingBufferDistance() > 0) {
- ESP_LOGI(kTag, "writing up to %d bytes", (int)GetRingBufferDistance());
- ChunkWriteResult result = chunk_writer_->WriteChunkToStream(
- [&](cpp::span<std::byte> d) { return SendChunk(d); }, kServiceInterval);
-
- switch (result) {
- case CHUNK_WRITE_OKAY:
- break;
- case CHUNK_WRITE_TIMEOUT:
- case CHUNK_OUT_OF_DATA:
- // Both of these are fine; we will pick back up where we left off in
- // the next idle call.
- return {};
- default:
- return cpp::fail(IO_ERROR);
+ dest_event->chunk_data.bytes =
+ dest_event->chunk_data.bytes.first(bytes_read);
+ SendOrBufferEvent(std::move(dest_event));
+
+ if (f_eof(&current_file_)) {
+ f_close(&current_file_);
+ is_file_open_ = false;
}
}
-
- // We've finished writing out chunks, but there may be more of the file to
- // read. Return, and begin again in the next idle call.
return {};
}
-auto FatfsAudioInput::SendChunk(cpp::span<std::byte> dest) -> size_t {
- if (file_buffer_read_pos_ == file_buffer_write_pos_) {
- return 0;
- }
- std::size_t chunk_size;
- if (file_buffer_read_pos_ > file_buffer_write_pos_) {
- chunk_size = file_buffer_.end() - file_buffer_read_pos_;
- } else {
- chunk_size = file_buffer_write_pos_ - file_buffer_read_pos_;
- }
- chunk_size = std::min(chunk_size, dest.size());
-
- cpp::span<std::byte> source(file_buffer_read_pos_, chunk_size);
- std::copy(source.begin(), source.end(), dest.begin());
-
- file_buffer_read_pos_ = file_buffer_read_pos_ + chunk_size;
- if (file_buffer_read_pos_ == file_buffer_.end()) {
- file_buffer_read_pos_ = file_buffer_.begin();
- }
- return chunk_size;
-}
-
} // namespace audio
diff --git a/src/audio/i2s_audio_output.cpp b/src/audio/i2s_audio_output.cpp
index d853a06f..d041f726 100644
--- a/src/audio/i2s_audio_output.cpp
+++ b/src/audio/i2s_audio_output.cpp
@@ -92,22 +92,12 @@ auto I2SAudioOutput::ProcessChunk(const cpp::span<std::byte>& chunk)
return dac_->WriteData(chunk, portMAX_DELAY);
}
-auto I2SAudioOutput::IdleTimeout() const -> TickType_t {
- return kIdleTimeBeforeMute;
-}
-
-auto I2SAudioOutput::ProcessIdle() -> cpp::result<void, AudioProcessingError> {
+auto I2SAudioOutput::Process() -> cpp::result<void, AudioProcessingError> {
// TODO(jacqueline): Consider powering down the dac completely maybe?
SetSoftMute(true);
return {};
}
-auto I2SAudioOutput::PrepareForPause() -> void {
- // TODO(jacqueline): We ideally want to ensure we have enough samples in the
- // DMA buffer here, so that soft mute can work properly.
- SetSoftMute(true);
-}
-
auto I2SAudioOutput::SetVolume(uint8_t volume) -> void {
volume_ = volume;
if (!is_soft_muted_) {
diff --git a/src/audio/include/audio_decoder.hpp b/src/audio/include/audio_decoder.hpp
index 0a2df76d..a2591d25 100644
--- a/src/audio/include/audio_decoder.hpp
+++ b/src/audio/include/audio_decoder.hpp
@@ -4,6 +4,7 @@
#include <cstdint>
#include <memory>
+#include "chunk.hpp"
#include "ff.h"
#include "span.hpp"
@@ -30,11 +31,13 @@ class AudioDecoder : public IAudioElement {
return 1024;
}
+ auto HasUnprocessedInput() -> bool override;
+
auto ProcessStreamInfo(const StreamInfo& info)
-> cpp::result<void, AudioProcessingError> override;
auto ProcessChunk(const cpp::span<std::byte>& chunk)
-> cpp::result<std::size_t, AudioProcessingError> override;
- auto ProcessIdle() -> cpp::result<void, AudioProcessingError> override;
+ auto Process() -> cpp::result<void, AudioProcessingError> override;
AudioDecoder(const AudioDecoder&) = delete;
AudioDecoder& operator=(const AudioDecoder&) = delete;
@@ -42,8 +45,11 @@ class AudioDecoder : public IAudioElement {
private:
std::unique_ptr<codecs::ICodec> current_codec_;
std::optional<StreamInfo> stream_info_;
+ std::optional<ChunkReader> chunk_reader_;
- std::unique_ptr<ChunkWriter> chunk_writer_;
+ std::size_t chunk_size_;
+ bool has_samples_to_send_;
+ bool needs_more_input_;
};
} // namespace audio
diff --git a/src/audio/include/audio_element.hpp b/src/audio/include/audio_element.hpp
index eb700180..8827a0c3 100644
--- a/src/audio/include/audio_element.hpp
+++ b/src/audio/include/audio_element.hpp
@@ -2,6 +2,8 @@
#include <atomic>
#include <cstdint>
+#include <deque>
+#include <memory>
#include "freertos/FreeRTOS.h"
@@ -12,6 +14,7 @@
#include "span.hpp"
#include "stream_buffer.hpp"
+#include "stream_event.hpp"
#include "stream_info.hpp"
#include "types.hpp"
@@ -36,6 +39,8 @@ enum AudioProcessingError {
OUT_OF_DATA,
};
+static const size_t kEventQueueSize = 8;
+
/*
* One indepedentent part of an audio pipeline. Each element has an input and
* output message stream, and is responsible for taking data from the input
@@ -51,38 +56,38 @@ enum AudioProcessingError {
*/
class IAudioElement {
public:
- IAudioElement()
- : input_buffer_(nullptr), output_buffer_(nullptr), state_(STATE_RUN) {}
- virtual ~IAudioElement() {}
+ IAudioElement();
+ virtual ~IAudioElement();
/*
* Returns the stack size in bytes that this element requires. This should
* be tuned according to the observed stack size of each element, as different
- * elements have fairly different stack requirements.
+ * elements have fairly different stack requirements (particular decoders).
*/
virtual auto StackSizeBytes() const -> std::size_t { return 2048; };
- /*
- * How long to wait for new data on the input stream before triggering a call
- * to ProcessIdle(). If this is portMAX_DELAY (the default), then ProcessIdle
- * will never be called.
- */
- virtual auto IdleTimeout() const -> TickType_t { return 10; }
-
virtual auto InputMinChunkSize() const -> std::size_t { return 0; }
/* Returns this element's input buffer. */
- auto InputBuffer() const -> StreamBuffer* { return input_buffer_; }
+ auto InputEventQueue() const -> QueueHandle_t { return input_events_; }
/* Returns this element's output buffer. */
- auto OutputBuffer() const -> StreamBuffer* { return output_buffer_; }
+ auto OutputEventQueue() const -> QueueHandle_t { return output_events_; }
+
+ auto OutputEventQueue(const QueueHandle_t q) -> void { output_events_ = q; }
- auto InputBuffer(StreamBuffer* b) -> void { input_buffer_ = b; }
+ auto HasUnflushedOutput() -> bool { return !buffered_output_.empty(); }
- auto OutputBuffer(StreamBuffer* b) -> void { output_buffer_ = b; }
+ virtual auto HasUnprocessedInput() -> bool = 0;
- auto ElementState() const -> ElementState { return state_; }
- auto ElementState(enum ElementState e) -> void { state_ = e; }
+ auto IsOverBuffered() -> bool { return unprocessed_output_chunks_ > 4; }
+
+ auto FlushBufferedOutput() -> bool;
+
+ auto ElementState() const -> ElementState { return current_state_; }
+ auto ElementState(enum ElementState e) -> void { current_state_ = e; }
+
+ virtual auto OnChunkProcessed() -> void { unprocessed_output_chunks_--; }
/*
* Called when a StreamInfo message is received. Used to configure this
@@ -105,14 +110,26 @@ class IAudioElement {
* time. This could be used to synthesize output, or to save memory by
* releasing unused resources.
*/
- virtual auto ProcessIdle() -> cpp::result<void, AudioProcessingError> = 0;
-
- virtual auto PrepareForPause() -> void{};
+ virtual auto Process() -> cpp::result<void, AudioProcessingError> = 0;
protected:
- StreamBuffer* input_buffer_;
- StreamBuffer* output_buffer_;
- std::atomic<enum ElementState> state_;
+ auto SendOrBufferEvent(std::unique_ptr<StreamEvent> event) -> bool;
+
+ // Queue for events coming into this element. Owned by us.
+ QueueHandle_t input_events_;
+ // Queue for events going into the next element. Not owned by us, may be null
+ // if we're not yet in a pipeline.
+ // FIXME: it would be nicer if this was non-nullable.
+ QueueHandle_t output_events_;
+
+ // The number of output chunks that we have generated, but have not yet been
+ // processed by the next element in the pipeline. This includes any chunks
+ // that are currently help in buffered_output_.
+ int unprocessed_output_chunks_;
+ // Output events that have been generated, but are yet to be sent downstream.
+ std::deque<std::unique_ptr<StreamEvent>> buffered_output_;
+
+ enum ElementState current_state_;
};
} // namespace audio
diff --git a/src/audio/include/audio_playback.hpp b/src/audio/include/audio_playback.hpp
index bffc3f02..f05ca327 100644
--- a/src/audio/include/audio_playback.hpp
+++ b/src/audio/include/audio_playback.hpp
@@ -43,11 +43,9 @@ class AudioPlayback {
private:
auto ConnectElements(IAudioElement* src, IAudioElement* sink) -> void;
-
- StreamBuffer stream_start_;
- StreamBuffer stream_end_;
- std::vector<std::unique_ptr<StreamBuffer>> element_buffers_;
std::vector<std::unique_ptr<AudioElementHandle>> element_handles_;
+
+ QueueHandle_t input_handle_;
};
} // namespace audio
diff --git a/src/audio/include/chunk.hpp b/src/audio/include/chunk.hpp
index 5c7e73dd..6154ab25 100644
--- a/src/audio/include/chunk.hpp
+++ b/src/audio/include/chunk.hpp
@@ -2,6 +2,7 @@
#include <cstddef>
#include <cstdint>
+#include <memory>
#include <optional>
#include <string>
@@ -17,68 +18,12 @@
namespace audio {
-enum ChunkWriteResult {
- // Returned when the callback does not write any data.
- CHUNK_WRITE_OKAY,
- // Returned when the callback does not write any data.
- CHUNK_OUT_OF_DATA,
- // Returned when there is an error encoding a chunk header using cbor.
- CHUNK_ENCODING_ERROR,
- // Returned when max_wait expires without room in the stream buffer becoming
- // available.
- CHUNK_WRITE_TIMEOUT,
-};
-
-class ChunkWriter {
- public:
- explicit ChunkWriter(StreamBuffer* buffer);
- ~ChunkWriter();
-
- auto Reset() -> void;
-
- auto GetLastMessage() -> cpp::span<std::byte>;
-
- /*
- * Invokes the given callback to receive data, breaks the received data up
- * into chunks with headers, and writes those chunks to the given output
- * stream.
- *
- * The callback will be invoked with a byte buffer and its size. The callback
- * should write as much data as it can to this buffer, and then return the
- * number of bytes it wrote. Return a value of 0 to indicate that there is no
- * more input to read.
- */
- auto WriteChunkToStream(std::function<size_t(cpp::span<std::byte>)> callback,
- TickType_t max_wait) -> ChunkWriteResult;
-
- private:
- StreamBuffer* stream_;
- std::size_t leftover_bytes_ = 0;
-};
-
-enum ChunkReadResult {
- CHUNK_READ_OKAY,
- // Returned when the chunk was read successfully, but the consumer did not
- // use all of the data.
- CHUNK_LEFTOVER_DATA,
- // Returned an error in parsing the cbor-encoded header.
- CHUNK_DECODING_ERROR,
- // Returned when max_wait expired before any data was read.
- CHUNK_READ_TIMEOUT,
- // Returned when a non-chunk message is received.
- CHUNK_STREAM_ENDED,
- // Returned when the processing callback does not return a value.
- CHUNK_PROCESSING_ERROR,
-};
-
class ChunkReader {
public:
- explicit ChunkReader(StreamBuffer* buffer);
+ explicit ChunkReader(std::size_t chunk_size);
~ChunkReader();
- auto Reset() -> void;
-
- auto GetLastMessage() -> cpp::span<std::byte>;
+ auto HandleLeftovers(std::size_t bytes_used) -> void;
/*
* Reads chunks of data from the given input stream, and invokes the given
@@ -92,14 +37,13 @@ class ChunkReader {
* If this function encounters a message in the stream that is not a chunk, it
* will place the message at the start of the working_buffer and then return.
*/
- auto ReadChunkFromStream(
- std::function<std::optional<std::size_t>(cpp::span<std::byte>)> callback,
- TickType_t max_wait) -> ChunkReadResult;
+ auto HandleNewData(cpp::span<std::byte> data) -> cpp::span<std::byte>;
private:
- StreamBuffer* stream_;
+ std::byte* raw_working_buffer_;
+ cpp::span<std::byte> working_buffer_;
+ cpp::span<std::byte> last_data_in_working_buffer_;
std::size_t leftover_bytes_ = 0;
- std::size_t last_message_size_ = 0;
};
} // namespace audio
diff --git a/src/audio/include/fatfs_audio_input.hpp b/src/audio/include/fatfs_audio_input.hpp
index f3704f1d..5625d941 100644
--- a/src/audio/include/fatfs_audio_input.hpp
+++ b/src/audio/include/fatfs_audio_input.hpp
@@ -22,32 +22,22 @@ class FatfsAudioInput : public IAudioElement {
explicit FatfsAudioInput(std::shared_ptr<drivers::SdStorage> storage);
~FatfsAudioInput();
+ auto HasUnprocessedInput() -> bool override;
+
auto ProcessStreamInfo(const StreamInfo& info)
-> cpp::result<void, AudioProcessingError> override;
auto ProcessChunk(const cpp::span<std::byte>& chunk)
-> cpp::result<std::size_t, AudioProcessingError> override;
- auto ProcessIdle() -> cpp::result<void, AudioProcessingError> override;
-
- auto SendChunk(cpp::span<std::byte> dest) -> size_t;
+ auto Process() -> cpp::result<void, AudioProcessingError> override;
FatfsAudioInput(const FatfsAudioInput&) = delete;
FatfsAudioInput& operator=(const FatfsAudioInput&) = delete;
private:
- auto GetRingBufferDistance() const -> size_t;
-
std::shared_ptr<drivers::SdStorage> storage_;
- std::byte* raw_file_buffer_;
- cpp::span<std::byte> file_buffer_;
- cpp::span<std::byte>::iterator file_buffer_read_pos_;
- cpp::span<std::byte>::iterator pending_read_pos_;
- cpp::span<std::byte>::iterator file_buffer_write_pos_;
-
FIL current_file_;
bool is_file_open_;
-
- std::unique_ptr<ChunkWriter> chunk_writer_;
};
} // namespace audio
diff --git a/src/audio/include/i2s_audio_output.hpp b/src/audio/include/i2s_audio_output.hpp
index 75a3be76..4fbcad49 100644
--- a/src/audio/include/i2s_audio_output.hpp
+++ b/src/audio/include/i2s_audio_output.hpp
@@ -22,19 +22,14 @@ class I2SAudioOutput : public IAudioElement {
std::unique_ptr<drivers::AudioDac> dac);
~I2SAudioOutput();
- auto InputMinChunkSize() const -> std::size_t override {
- // TODO(jacqueline): work out a good value here. Maybe similar to the total
- // DMA buffer size?
- return 128;
- }
+ // TODO.
+ auto HasUnprocessedInput() -> bool override { return false; }
- auto IdleTimeout() const -> TickType_t override;
auto ProcessStreamInfo(const StreamInfo& info)
-> cpp::result<void, AudioProcessingError> override;
auto ProcessChunk(const cpp::span<std::byte>& chunk)
-> cpp::result<std::size_t, AudioProcessingError> override;
- auto ProcessIdle() -> cpp::result<void, AudioProcessingError> override;
- auto PrepareForPause() -> void override;
+ auto Process() -> cpp::result<void, AudioProcessingError> override;
I2SAudioOutput(const I2SAudioOutput&) = delete;
I2SAudioOutput& operator=(const I2SAudioOutput&) = delete;
diff --git a/src/audio/include/stream_event.hpp b/src/audio/include/stream_event.hpp
new file mode 100644
index 00000000..4dfdab41
--- /dev/null
+++ b/src/audio/include/stream_event.hpp
@@ -0,0 +1,57 @@
+#pragma once
+
+#include <memory>
+
+#include "freertos/FreeRTOS.h"
+#include "freertos/queue.h"
+
+#include "span.hpp"
+#include "stream_info.hpp"
+
+namespace audio {
+
+struct StreamEvent {
+ static auto CreateStreamInfo(QueueHandle_t source,
+ std::unique_ptr<StreamInfo> payload)
+ -> std::unique_ptr<StreamEvent>;
+ static auto CreateChunkData(QueueHandle_t source, std::size_t chunk_size)
+ -> std::unique_ptr<StreamEvent>;
+ static auto CreateChunkNotification(QueueHandle_t source)
+ -> std::unique_ptr<StreamEvent>;
+
+ StreamEvent();
+ ~StreamEvent();
+ StreamEvent(StreamEvent&&);
+
+ QueueHandle_t source;
+
+ enum {
+ UNINITIALISED,
+ STREAM_INFO,
+ CHUNK_DATA,
+ CHUNK_NOTIFICATION,
+ } tag;
+
+ union {
+ std::unique_ptr<StreamInfo> stream_info;
+
+ // Scott Meyers says:
+ // `About the only situation I can conceive of when a std::unique_ptr<T[]>
+ // would make sense would be when you’re using a C-like API that returns a
+ // raw pointer to a heap array that you assume ownership of.`
+ // :-)
+
+ struct {
+ std::unique_ptr<std::byte*> raw_bytes;
+ cpp::span<std::byte> bytes;
+ } chunk_data;
+
+ // FIXME: It would be nice to also support a pointer to himem data here, to
+ // save a little ordinary heap space.
+ };
+
+ StreamEvent(const StreamEvent&) = delete;
+ StreamEvent& operator=(const StreamEvent&) = delete;
+};
+
+} // namespace audio
diff --git a/src/audio/include/stream_info.hpp b/src/audio/include/stream_info.hpp
index 45f10fc6..ed3096bb 100644
--- a/src/audio/include/stream_info.hpp
+++ b/src/audio/include/stream_info.hpp
@@ -7,6 +7,7 @@
#include "cbor.h"
#include "result.hpp"
+#include "sys/_stdint.h"
namespace audio {
@@ -24,14 +25,24 @@ class StreamInfo {
auto Channels() const -> const std::optional<uint8_t>& { return channels_; }
+ auto BitsPerSample(uint8_t bpp) -> void { bits_per_sample_ = bpp; }
+
auto BitsPerSample() const -> const std::optional<uint8_t>& {
return bits_per_sample_;
}
+ auto SampleRate(uint16_t rate) -> void { sample_rate_ = rate; }
+
auto SampleRate() const -> const std::optional<uint16_t>& {
return sample_rate_;
}
+ auto ChunkSize() const -> const std::optional<std::size_t>& {
+ return chunk_size_;
+ }
+
+ auto ChunkSize(std::size_t s) -> void { chunk_size_ = s; }
+
auto Encode(CborEncoder& enc) -> std::optional<CborError>;
private:
@@ -39,6 +50,7 @@ class StreamInfo {
std::optional<uint8_t> channels_;
std::optional<uint8_t> bits_per_sample_;
std::optional<uint16_t> sample_rate_;
+ std::optional<size_t> chunk_size_;
};
} // namespace audio
diff --git a/src/audio/stream_buffer.cpp b/src/audio/stream_buffer.cpp
index 9bdeaa72..71ab3965 100644
--- a/src/audio/stream_buffer.cpp
+++ b/src/audio/stream_buffer.cpp
@@ -17,10 +17,11 @@ StreamBuffer::StreamBuffer(std::size_t chunk_size, std::size_t buffer_size)
raw_output_chunk_(static_cast<std::byte*>(
heap_caps_malloc(chunk_size, MALLOC_CAP_SPIRAM))),
output_chunk_(raw_output_chunk_, chunk_size) {
- assert(input_chunk_.size() <= buffer_size);
- assert(output_chunk_.size() <= buffer_size);
- ESP_LOGI("streambuf", "created buffer of chunk size %d, total size %d", chunk_size, buffer_size);
- }
+ assert(input_chunk_.size() <= buffer_size);
+ assert(output_chunk_.size() <= buffer_size);
+ ESP_LOGI("streambuf", "created buffer of chunk size %d, total size %d",
+ chunk_size, buffer_size);
+}
StreamBuffer::~StreamBuffer() {
vMessageBufferDelete(handle_);
diff --git a/src/audio/stream_event.cpp b/src/audio/stream_event.cpp
new file mode 100644
index 00000000..0a116297
--- /dev/null
+++ b/src/audio/stream_event.cpp
@@ -0,0 +1,75 @@
+#include "stream_event.hpp"
+#include <cstddef>
+#include <memory>
+
+namespace audio {
+
+auto StreamEvent::CreateStreamInfo(QueueHandle_t source,
+ std::unique_ptr<StreamInfo> payload)
+ -> std::unique_ptr<StreamEvent> {
+ auto event = std::make_unique<StreamEvent>();
+ event->tag = StreamEvent::STREAM_INFO;
+ event->source = source;
+ event->stream_info = std::move(payload);
+ return event;
+}
+
+auto StreamEvent::CreateChunkData(QueueHandle_t source, std::size_t chunk_size)
+ -> std::unique_ptr<StreamEvent> {
+ auto event = std::make_unique<StreamEvent>();
+ event->tag = StreamEvent::CHUNK_DATA;
+ event->source = source;
+
+ auto raw_bytes =
+ static_cast<std::byte*>(heap_caps_malloc(chunk_size, MALLOC_CAP_SPIRAM));
+
+ event->chunk_data.raw_bytes = std::make_unique<std::byte*>(raw_bytes);
+ event->chunk_data.bytes = cpp::span<std::byte>(raw_bytes, chunk_size);
+
+ return event;
+}
+
+auto StreamEvent::CreateChunkNotification(QueueHandle_t source)
+ -> std::unique_ptr<StreamEvent> {
+ auto event = std::make_unique<StreamEvent>();
+ event->tag = StreamEvent::CHUNK_NOTIFICATION;
+ event->source = source;
+ return event;
+}
+
+StreamEvent::StreamEvent() : tag(StreamEvent::UNINITIALISED) {}
+
+StreamEvent::~StreamEvent() {
+ switch (tag) {
+ case UNINITIALISED:
+ break;
+ case STREAM_INFO:
+ stream_info.reset();
+ break;
+ case CHUNK_DATA:
+ chunk_data.raw_bytes.reset();
+ break;
+ case CHUNK_NOTIFICATION:
+ break;
+ }
+}
+
+StreamEvent::StreamEvent(StreamEvent&& other) {
+ tag = other.tag;
+ source = other.source;
+ switch (tag) {
+ case UNINITIALISED:
+ break;
+ case STREAM_INFO:
+ stream_info = std::move(other.stream_info);
+ break;
+ case CHUNK_DATA:
+ chunk_data = std::move(other.chunk_data);
+ break;
+ case CHUNK_NOTIFICATION:
+ break;
+ }
+ other.tag = StreamEvent::UNINITIALISED;
+}
+
+} // namespace audio
diff --git a/src/audio/stream_message.cpp b/src/audio/stream_message.cpp
index 055e7e96..0f3b56e9 100644
--- a/src/audio/stream_message.cpp
+++ b/src/audio/stream_message.cpp
@@ -3,7 +3,6 @@
#include <cstdint>
#include "cbor.h"
-#include "esp-idf/components/cbor/tinycbor/src/cbor.h"
#include "span.hpp"
namespace audio {