summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorjacqueline <me@jacqueline.id.au>2023-01-12 14:28:52 +1100
committerjacqueline <me@jacqueline.id.au>2023-01-12 14:28:52 +1100
commit2056cad0ab7b805f0ed5629b100b50f8ea9e127e (patch)
tree1e8385d48e18551240e9ef9683b8696292f8d760 /src
parent01be69eca1fa89c047fc29f5cb0ea8ba0898dad1 (diff)
downloadtangara-fw-2056cad0ab7b805f0ed5629b100b50f8ea9e127e.tar.gz
WIP
Diffstat (limited to 'src')
-rw-r--r--src/audio/audio_decoder.cpp62
-rw-r--r--src/audio/audio_playback.cpp10
-rw-r--r--src/audio/audio_task.cpp43
-rw-r--r--src/audio/chunk.cpp78
-rw-r--r--src/audio/fatfs_audio_input.cpp36
-rw-r--r--src/audio/i2s_audio_output.cpp6
-rw-r--r--src/audio/include/audio_decoder.hpp2
-rw-r--r--src/audio/include/audio_element.hpp18
-rw-r--r--src/audio/include/audio_playback.hpp9
-rw-r--r--src/audio/include/audio_task.hpp4
-rw-r--r--src/audio/include/chunk.hpp40
-rw-r--r--src/audio/include/fatfs_audio_input.hpp2
-rw-r--r--src/audio/include/i2s_audio_output.hpp1
-rw-r--r--src/audio/include/stream_buffer.hpp22
14 files changed, 233 insertions, 100 deletions
diff --git a/src/audio/audio_decoder.cpp b/src/audio/audio_decoder.cpp
index 872b7ead..88ddc323 100644
--- a/src/audio/audio_decoder.cpp
+++ b/src/audio/audio_decoder.cpp
@@ -57,35 +57,45 @@ auto AudioDecoder::ProcessChunk(const cpp::span<std::byte>& chunk)
bool has_samples_to_send = false;
bool needs_more_input = false;
std::optional<codecs::ICodec::ProcessingError> error = std::nullopt;
- WriteChunksToStream(
- output_buffer_,
- [&](cpp::span<std::byte> buffer) -> std::size_t {
- std::size_t bytes_written = 0;
- // Continue filling up the output buffer so long as we have samples
- // leftover, or are able to synthesize more samples from the input.
- while (has_samples_to_send || !needs_more_input) {
- if (!has_samples_to_send) {
- auto result = current_codec_->ProcessNextFrame();
- has_samples_to_send = true;
- if (result.has_error()) {
- error = result.error();
- // End our output stream immediately if the codec barfed.
- return 0;
+ while (1) {
+ ChunkWriteResult res = chunk_writer_.WriteChunkToStream(
+ [&](cpp::span<std::byte> buffer) -> std::size_t {
+ std::size_t bytes_written = 0;
+ // Continue filling up the output buffer so long as we have samples
+ // leftover, or are able to synthesize more samples from the input.
+ while (has_samples_to_send || !needs_more_input) {
+ if (!has_samples_to_send) {
+ auto result = current_codec_->ProcessNextFrame();
+ has_samples_to_send = true;
+ if (result.has_error()) {
+ error = result.error();
+ // End our output stream immediately if the codec barfed.
+ return 0;
+ } else {
+ needs_more_input = result.value();
+ }
} else {
- needs_more_input = result.value();
+ auto result = current_codec_->WriteOutputSamples(
+ buffer.last(buffer.size() - bytes_written));
+ bytes_written += result.first;
+ has_samples_to_send = !result.second;
}
- } else {
- auto result = current_codec_->WriteOutputSamples(
- buffer.last(buffer.size() - bytes_written));
- bytes_written += result.first;
- has_samples_to_send = !result.second;
}
- }
- return bytes_written;
- },
- // This element doesn't support any kind of out of band commands, so we
- // can just suspend the whole task if the output buffer fills up.
- portMAX_DELAY);
+ return bytes_written;
+ },
+ // TODO
+ portMAX_DELAY);
+
+ switch (res) {
+ case CHUNK_WRITE_OKAY:
+ break;
+ case CHUNK_WRITE_TIMEOUT:
+ case CHUNK_OUT_OF_DATA:
+ return {};
+ default:
+ return cpp::fail(IO_ERROR);
+ }
+ }
if (error) {
ESP_LOGE(kTag, "Codec encountered error %d", error.value());
diff --git a/src/audio/audio_playback.cpp b/src/audio/audio_playback.cpp
index 7b8418d7..bcc3ad04 100644
--- a/src/audio/audio_playback.cpp
+++ b/src/audio/audio_playback.cpp
@@ -43,9 +43,9 @@ auto AudioPlayback::create(drivers::GpioExpander* expander,
playback->ConnectElements(codec.get(), sink.get());
// Launch!
- StartAudioTask("src", source);
- StartAudioTask("dec", codec);
- StartAudioTask("sink", sink);
+ playback->element_handles_.push_back(StartAudioTask("src", source));
+ playback->element_handles_.push_back(StartAudioTask("dec", codec));
+ playback->element_handles_.push_back(StartAudioTask("sink", sink));
return playback;
}
@@ -55,7 +55,9 @@ AudioPlayback::AudioPlayback()
: stream_start_(128, 128), stream_end_(128, 128) {}
AudioPlayback::~AudioPlayback() {
- // TODO(jacqueline): signal the end of all things, and maybe wait for it?
+ for (auto& element : element_handles_) {
+ element->Quit();
+ }
}
auto AudioPlayback::Play(const std::string& filename) -> void {
diff --git a/src/audio/audio_task.cpp b/src/audio/audio_task.cpp
index 3512c96f..079ae852 100644
--- a/src/audio/audio_task.cpp
+++ b/src/audio/audio_task.cpp
@@ -3,9 +3,12 @@
#include <stdlib.h>
#include <cstdint>
+#include <memory>
+#include "audio_element_handle.hpp"
#include "cbor.h"
#include "esp_heap_caps.h"
+#include "esp_log.h"
#include "freertos/portmacro.h"
#include "freertos/queue.h"
#include "freertos/stream_buffer.h"
@@ -20,10 +23,17 @@
namespace audio {
auto StartAudioTask(const std::string& name,
- std::shared_ptr<IAudioElement> element) -> void {
+ std::shared_ptr<IAudioElement> element)
+ -> std::unique_ptr<AudioElementHandle> {
+ auto task_handle = std::make_unique<TaskHandle_t>();
+
+ // Newly created task will free this.
AudioTaskArgs* args = new AudioTaskArgs{.element = element};
+
xTaskCreate(&AudioTaskMain, name.c_str(), element->StackSizeBytes(), args,
- kTaskPriorityAudio, NULL);
+ kTaskPriorityAudio, task_handle.get());
+
+ return std::make_unique<AudioElementHandle>(std::move(task_handle), element);
}
void AudioTaskMain(void* args) {
@@ -32,9 +42,16 @@ void AudioTaskMain(void* args) {
std::shared_ptr<IAudioElement> element = std::move(real_args->element);
delete real_args;
+ char tag[] = "task";
ChunkReader chunk_reader = ChunkReader(element->InputBuffer());
- while (1) {
+ while (element->ElementState() != STATE_QUIT) {
+ if (element->ElementState() == STATE_PAUSE) {
+ // TODO: park with a condition variable or something?
+ vTaskDelay(100);
+ continue;
+ }
+
cpp::result<size_t, AudioProcessingError> process_res;
// If this element has an input stream, then our top priority is
@@ -54,6 +71,7 @@ void AudioTaskMain(void* args) {
if (chunk_res == CHUNK_PROCESSING_ERROR ||
chunk_res == CHUNK_DECODING_ERROR) {
+ ESP_LOGE(tag, "failed to process chunk");
break; // TODO.
} else if (chunk_res == CHUNK_STREAM_ENDED) {
has_received_message = true;
@@ -65,21 +83,36 @@ void AudioTaskMain(void* args) {
if (type == TYPE_STREAM_INFO) {
auto parse_res = ReadMessage<StreamInfo>(&StreamInfo::Parse, message);
if (parse_res.has_error()) {
+ ESP_LOGE(tag, "failed to parse stream info");
break; // TODO.
}
auto info_res = element->ProcessStreamInfo(parse_res.value());
if (info_res.has_error()) {
+ ESP_LOGE(tag, "failed to process stream info");
break; // TODO.
}
}
}
- // TODO: Do any out of band reading, such a a pause command, here.
-
// Chunk reading must have timed out, or we don't have an input stream.
+ ElementState state = element->ElementState();
+ if (state == STATE_PAUSE) {
+ element->PrepareForPause();
+
+ vTaskSuspend(NULL);
+
+ // Zzzzzz...
+
+ // When we wake up, skip straight to the start of the loop again.
+ continue;
+ } else if (state == STATE_QUIT) {
+ break;
+ }
+
// Signal the element to do any of its idle tasks.
auto process_error = element->ProcessIdle();
if (process_error.has_error()) {
+ ESP_LOGE(tag, "failed to process idle");
break; // TODO.
}
}
diff --git a/src/audio/chunk.cpp b/src/audio/chunk.cpp
index fbd795d9..5f8f3148 100644
--- a/src/audio/chunk.cpp
+++ b/src/audio/chunk.cpp
@@ -13,42 +13,58 @@
namespace audio {
-auto WriteChunksToStream(StreamBuffer* stream,
- std::function<size_t(cpp::span<std::byte>)> callback,
- TickType_t max_wait) -> ChunkWriteResult {
- cpp::span<std::byte> write_buffer = stream->WriteBuffer();
- while (1) {
- // First, write out our chunk header so we know how much space to give to
- // the callback.
- auto header_size = WriteTypeOnlyMessage(TYPE_CHUNK_HEADER, write_buffer);
- if (header_size.has_error()) {
- return CHUNK_ENCODING_ERROR;
- }
-
- // Now we can ask the callback to fill the remaining space.
- size_t chunk_size = std::invoke(
+ChunkWriter::ChunkWriter(StreamBuffer* buffer)
+ : stream_(buffer), leftover_bytes_(0) {}
+
+ChunkWriter::~ChunkWriter() {}
+
+auto ChunkWriter::Reset() -> void {
+ leftover_bytes_ = 0;
+}
+
+auto ChunkWriter::WriteChunkToStream(
+ std::function<size_t(cpp::span<std::byte>)> callback,
+ TickType_t max_wait) -> ChunkWriteResult {
+ cpp::span<std::byte> write_buffer = stream_->WriteBuffer();
+ // First, write out our chunk header so we know how much space to give to
+ // the callback.
+ auto header_size = WriteTypeOnlyMessage(TYPE_CHUNK_HEADER, write_buffer);
+ if (header_size.has_error()) {
+ return CHUNK_ENCODING_ERROR;
+ }
+
+ // Now we can ask the callback to fill the remaining space. If the previous
+ // call to this method timed out, then we may already have the data we need
+ // in our write buffer.
+ size_t chunk_size;
+ if (leftover_bytes_ > 0) {
+ chunk_size = leftover_bytes_;
+ } else {
+ chunk_size = std::invoke(
callback,
write_buffer.subspan(header_size.value(),
write_buffer.size() - header_size.value()));
+ }
- if (chunk_size == 0) {
- // They had nothing for us, so bail out.
- return CHUNK_OUT_OF_DATA;
- }
-
- // Try to write to the buffer. Note the return type here will be either 0 or
- // header_size + chunk_size, as MessageBuffer doesn't allow partial writes.
- size_t actual_write_size =
- xMessageBufferSend(stream->Handle(), write_buffer.data(),
- header_size.value() + chunk_size, max_wait);
-
- if (actual_write_size == 0) {
- // We failed to write in time, so bail out. This is techinically data loss
- // unless the caller wants to go and parse our working buffer, but we
- // assume the caller has a good reason to time us out.
- return CHUNK_WRITE_TIMEOUT;
- }
+ if (chunk_size == 0) {
+ // They had nothing for us, so bail out.
+ return CHUNK_OUT_OF_DATA;
}
+
+ // Try to write to the buffer. Note the return type here will be either 0 or
+ // header_size + chunk_size, as MessageBuffer doesn't allow partial writes.
+ size_t actual_write_size =
+ xMessageBufferSend(stream_->Handle(), write_buffer.data(),
+ header_size.value() + chunk_size, max_wait);
+
+ if (actual_write_size == 0) {
+ leftover_bytes_ = chunk_size;
+ return CHUNK_WRITE_TIMEOUT;
+ } else {
+ leftover_bytes_ = 0;
+ }
+
+ return CHUNK_WRITE_OKAY;
}
ChunkReader::ChunkReader(StreamBuffer* stream) : stream_(stream) {}
diff --git a/src/audio/fatfs_audio_input.cpp b/src/audio/fatfs_audio_input.cpp
index 9e8c5243..bc5be42a 100644
--- a/src/audio/fatfs_audio_input.cpp
+++ b/src/audio/fatfs_audio_input.cpp
@@ -126,25 +126,25 @@ auto FatfsAudioInput::ProcessIdle() -> cpp::result<void, AudioProcessingError> {
}
// Now stream data into the output buffer until it's full.
- pending_read_pos_ = file_buffer_read_pos_;
- ChunkWriteResult result = WriteChunksToStream(
- output_buffer_, [&](cpp::span<std::byte> d) { return SendChunk(d); },
- kServiceInterval);
-
- switch (result) {
- case CHUNK_WRITE_TIMEOUT:
- case CHUNK_OUT_OF_DATA:
- // Both of these are fine; SendChunk keeps track of where it's up to
- // internally, so we will pick back up where we left off.
- return {};
- default:
- return cpp::fail(IO_ERROR);
+ while (1) {
+ ChunkWriteResult result = chunk_writer_.WriteChunkToStream(
+ [&](cpp::span<std::byte> d) { return SendChunk(d); }, kServiceInterval);
+
+ switch (result) {
+ case CHUNK_WRITE_OKAY:
+ break;
+ case CHUNK_WRITE_TIMEOUT:
+ case CHUNK_OUT_OF_DATA:
+ // Both of these are fine; we will pick back up where we left off in
+ // the next idle call.
+ return {};
+ default:
+ return cpp::fail(IO_ERROR);
+ }
}
}
auto FatfsAudioInput::SendChunk(cpp::span<std::byte> dest) -> size_t {
- file_buffer_read_pos_ = pending_read_pos_;
-
if (file_buffer_read_pos_ == file_buffer_write_pos_) {
return 0;
}
@@ -159,9 +159,9 @@ auto FatfsAudioInput::SendChunk(cpp::span<std::byte> dest) -> size_t {
cpp::span<std::byte> source(file_buffer_read_pos_, chunk_size);
std::copy(source.begin(), source.end(), dest.begin());
- pending_read_pos_ = file_buffer_read_pos_ + chunk_size;
- if (pending_read_pos_ == file_buffer_.end()) {
- pending_read_pos_ = file_buffer_.begin();
+ file_buffer_read_pos_ = file_buffer_read_pos_ + chunk_size;
+ if (file_buffer_read_pos_ == file_buffer_.end()) {
+ file_buffer_read_pos_ = file_buffer_.begin();
}
return chunk_size;
}
diff --git a/src/audio/i2s_audio_output.cpp b/src/audio/i2s_audio_output.cpp
index a51d6aa5..d853a06f 100644
--- a/src/audio/i2s_audio_output.cpp
+++ b/src/audio/i2s_audio_output.cpp
@@ -102,6 +102,12 @@ auto I2SAudioOutput::ProcessIdle() -> cpp::result<void, AudioProcessingError> {
return {};
}
+auto I2SAudioOutput::PrepareForPause() -> void {
+ // TODO(jacqueline): We ideally want to ensure we have enough samples in the
+ // DMA buffer here, so that soft mute can work properly.
+ SetSoftMute(true);
+}
+
auto I2SAudioOutput::SetVolume(uint8_t volume) -> void {
volume_ = volume;
if (!is_soft_muted_) {
diff --git a/src/audio/include/audio_decoder.hpp b/src/audio/include/audio_decoder.hpp
index eaef2f8c..9c0626db 100644
--- a/src/audio/include/audio_decoder.hpp
+++ b/src/audio/include/audio_decoder.hpp
@@ -42,6 +42,8 @@ class AudioDecoder : public IAudioElement {
private:
std::unique_ptr<codecs::ICodec> current_codec_;
std::optional<StreamInfo> stream_info_;
+
+ ChunkWriter chunk_writer_;
};
} // namespace audio
diff --git a/src/audio/include/audio_element.hpp b/src/audio/include/audio_element.hpp
index 590889bd..ec6d6e80 100644
--- a/src/audio/include/audio_element.hpp
+++ b/src/audio/include/audio_element.hpp
@@ -1,10 +1,11 @@
#pragma once
+#include <atomic>
#include <cstdint>
-#include "chunk.hpp"
#include "freertos/FreeRTOS.h"
+#include "chunk.hpp"
#include "freertos/message_buffer.h"
#include "freertos/portmacro.h"
#include "result.hpp"
@@ -16,6 +17,12 @@
namespace audio {
+enum ElementState {
+ STATE_RUN,
+ STATE_PAUSE,
+ STATE_QUIT,
+};
+
/*
* Errors that may be returned by any of the Process* methods of an audio
* element.
@@ -42,7 +49,8 @@ enum AudioProcessingError {
*/
class IAudioElement {
public:
- IAudioElement() : input_buffer_(nullptr), output_buffer_(nullptr) {}
+ IAudioElement()
+ : input_buffer_(nullptr), output_buffer_(nullptr), state_(STATE_RUN) {}
virtual ~IAudioElement() {}
/*
@@ -71,6 +79,9 @@ class IAudioElement {
auto OutputBuffer(StreamBuffer* b) -> void { output_buffer_ = b; }
+ auto ElementState() const -> ElementState { return state_; }
+ auto ElementState(enum ElementState e) -> void { state_ = e; }
+
/*
* Called when a StreamInfo message is received. Used to configure this
* element in preperation for incoming chunks.
@@ -94,9 +105,12 @@ class IAudioElement {
*/
virtual auto ProcessIdle() -> cpp::result<void, AudioProcessingError> = 0;
+ virtual auto PrepareForPause() -> void{};
+
protected:
StreamBuffer* input_buffer_;
StreamBuffer* output_buffer_;
+ std::atomic<enum ElementState> state_;
};
} // namespace audio
diff --git a/src/audio/include/audio_playback.hpp b/src/audio/include/audio_playback.hpp
index 9a7c5fc0..bffc3f02 100644
--- a/src/audio/include/audio_playback.hpp
+++ b/src/audio/include/audio_playback.hpp
@@ -6,6 +6,7 @@
#include <vector>
#include "audio_element.hpp"
+#include "audio_element_handle.hpp"
#include "esp_err.h"
#include "gpio_expander.hpp"
#include "result.hpp"
@@ -16,7 +17,8 @@
namespace audio {
/*
- * TODO.
+ * Creates and links together audio elements into a pipeline. This is the main
+ * entrypoint to playing audio on the system.
*/
class AudioPlayback {
public:
@@ -29,6 +31,10 @@ class AudioPlayback {
AudioPlayback();
~AudioPlayback();
+ /*
+ * Begins playing the file at the given FatFS path. This will interrupt any
+ * currently in-progress playback.
+ */
auto Play(const std::string& filename) -> void;
// Not copyable or movable.
@@ -41,6 +47,7 @@ class AudioPlayback {
StreamBuffer stream_start_;
StreamBuffer stream_end_;
std::vector<std::unique_ptr<StreamBuffer>> element_buffers_;
+ std::vector<std::unique_ptr<AudioElementHandle>> element_handles_;
};
} // namespace audio
diff --git a/src/audio/include/audio_task.hpp b/src/audio/include/audio_task.hpp
index ca75fbd2..9a76ea7e 100644
--- a/src/audio/include/audio_task.hpp
+++ b/src/audio/include/audio_task.hpp
@@ -3,6 +3,7 @@
#include <memory>
#include "audio_element.hpp"
+#include "audio_element_handle.hpp"
namespace audio {
@@ -11,7 +12,8 @@ struct AudioTaskArgs {
};
auto StartAudioTask(const std::string& name,
- std::shared_ptr<IAudioElement> element) -> void;
+ std::shared_ptr<IAudioElement> element)
+ -> std::unique_ptr<AudioElementHandle>;
void AudioTaskMain(void* args);
diff --git a/src/audio/include/chunk.hpp b/src/audio/include/chunk.hpp
index d55e5d9d..5c7e73dd 100644
--- a/src/audio/include/chunk.hpp
+++ b/src/audio/include/chunk.hpp
@@ -19,6 +19,8 @@ namespace audio {
enum ChunkWriteResult {
// Returned when the callback does not write any data.
+ CHUNK_WRITE_OKAY,
+ // Returned when the callback does not write any data.
CHUNK_OUT_OF_DATA,
// Returned when there is an error encoding a chunk header using cbor.
CHUNK_ENCODING_ERROR,
@@ -27,18 +29,32 @@ enum ChunkWriteResult {
CHUNK_WRITE_TIMEOUT,
};
-/*
- * Invokes the given callback to receive data, breaks the received data up into
- * chunks with headers, and writes those chunks to the given output stream.
- *
- * The callback will be invoked with a byte buffer and its size. The callback
- * should write as much data as it can to this buffer, and then return the
- * number of bytes it wrote. Return a value of 0 to indicate that there is no
- * more input to read.
- */
-auto WriteChunksToStream(StreamBuffer* stream,
- std::function<size_t(cpp::span<std::byte>)> callback,
- TickType_t max_wait) -> ChunkWriteResult;
+class ChunkWriter {
+ public:
+ explicit ChunkWriter(StreamBuffer* buffer);
+ ~ChunkWriter();
+
+ auto Reset() -> void;
+
+ auto GetLastMessage() -> cpp::span<std::byte>;
+
+ /*
+ * Invokes the given callback to receive data, breaks the received data up
+ * into chunks with headers, and writes those chunks to the given output
+ * stream.
+ *
+ * The callback will be invoked with a byte buffer and its size. The callback
+ * should write as much data as it can to this buffer, and then return the
+ * number of bytes it wrote. Return a value of 0 to indicate that there is no
+ * more input to read.
+ */
+ auto WriteChunkToStream(std::function<size_t(cpp::span<std::byte>)> callback,
+ TickType_t max_wait) -> ChunkWriteResult;
+
+ private:
+ StreamBuffer* stream_;
+ std::size_t leftover_bytes_ = 0;
+};
enum ChunkReadResult {
CHUNK_READ_OKAY,
diff --git a/src/audio/include/fatfs_audio_input.hpp b/src/audio/include/fatfs_audio_input.hpp
index 21c729be..040b2b54 100644
--- a/src/audio/include/fatfs_audio_input.hpp
+++ b/src/audio/include/fatfs_audio_input.hpp
@@ -46,6 +46,8 @@ class FatfsAudioInput : public IAudioElement {
FIL current_file_;
bool is_file_open_;
+
+ ChunkWriter chunk_writer_;
};
} // namespace audio
diff --git a/src/audio/include/i2s_audio_output.hpp b/src/audio/include/i2s_audio_output.hpp
index 9e59f8fd..75a3be76 100644
--- a/src/audio/include/i2s_audio_output.hpp
+++ b/src/audio/include/i2s_audio_output.hpp
@@ -34,6 +34,7 @@ class I2SAudioOutput : public IAudioElement {
auto ProcessChunk(const cpp::span<std::byte>& chunk)
-> cpp::result<std::size_t, AudioProcessingError> override;
auto ProcessIdle() -> cpp::result<void, AudioProcessingError> override;
+ auto PrepareForPause() -> void override;
I2SAudioOutput(const I2SAudioOutput&) = delete;
I2SAudioOutput& operator=(const I2SAudioOutput&) = delete;
diff --git a/src/audio/include/stream_buffer.hpp b/src/audio/include/stream_buffer.hpp
index cfb4bf9d..3853a53f 100644
--- a/src/audio/include/stream_buffer.hpp
+++ b/src/audio/include/stream_buffer.hpp
@@ -10,13 +10,35 @@
namespace audio {
+/*
+ * A collection of the buffers required for two IAudioElement implementations to
+ * stream data between each other.
+ *
+ * Currently, we use a FreeRTOS MessageBuffer to hold the byte stream, and also
+ * maintain two chunk-sized buffers for the elements to stage their read and
+ * write operations (as MessageBuffer copies the given data into its memory
+ * space). A future optimisation here could be to instead post himem memory
+ * addresses to the message buffer, and then maintain address spaces into which
+ * we map these messages, rather than 'real' allocated buffers as we do now.
+ */
class StreamBuffer {
public:
explicit StreamBuffer(std::size_t chunk_size, std::size_t buffer_size);
~StreamBuffer();
+ /* Returns the handle for the underlying message buffer. */
auto Handle() -> MessageBufferHandle_t* { return &handle_; }
+
+ /*
+ * Returns a chunk-sized staging buffer that should be used *only* by the
+ * reader (sink) element.
+ */
auto ReadBuffer() -> cpp::span<std::byte> { return input_chunk_; }
+
+ /*
+ * Returns a chunk-sized staging buffer that should be used *only* by the
+ * writer (source) element.
+ */
auto WriteBuffer() -> cpp::span<std::byte> { return output_chunk_; }
StreamBuffer(const StreamBuffer&) = delete;