1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
|
#include "chunk.hpp"
#include <string.h>
#include <cstddef>
#include <cstdint>
#include "cbor_decoder.hpp"
#include "cbor_encoder.hpp"
#include "esp-idf/components/cbor/tinycbor/src/cbor.h"
#include "stream_message.hpp"
namespace audio {
// TODO: tune.
static const std::size_t kMaxChunkSize = 512;
// TODO: tune
static const std::size_t kWorkingBufferSize = kMaxChunkSize * 1.5;
/*
* The amount of space to allocate for the first chunk's header. After the first
* chunk, we have a more concrete idea of the header's size and can allocate
* space for future headers more compactly.
*/
// TODO: measure how big headers tend to be to pick a better value.
static const size_t kInitialHeaderSize = 32;
auto WriteChunksToStream(MessageBufferHandle_t* stream,
uint8_t* working_buffer,
size_t working_buffer_length,
std::function<size_t(uint8_t*, size_t)> callback,
TickType_t max_wait) -> EncodeWriteResult {
size_t header_size = kInitialHeaderSize;
while (1) {
// First, ask the callback for some data to write.
size_t chunk_size = callback(working_buffer + header_size,
working_buffer_length - header_size);
if (chunk_size == 0) {
// They had nothing for us, so bail out.
return CHUNK_OUT_OF_DATA;
}
// Put together a header.
cbor::Encoder encoder(cbor::CONTAINER_ARRAY, 3, working_buffer,
working_buffer_length);
encoder.WriteUnsigned(TYPE_CHUNK_HEADER);
encoder.WriteUnsigned(header_size);
encoder.WriteUnsigned(chunk_size);
size_t new_header_size = header_size;
cpp::result<size_t, CborError> encoder_res = encoder.Finish();
if (encoder_res.has_error()) {
return CHUNK_ENCODING_ERROR;
} else {
// We can now tune the space to allocate for the header to be closer to
// its actual size. We pad this by 2 bytes to allow extra space for the
// chunk size and header size fields to each spill over into another byte
// each.
new_header_size = encoder_res.value() + 2;
}
// Try to write to the buffer. Note the return type here will be either 0 or
// header_size + chunk_size, as MessageBuffer doesn't allow partial writes.
size_t actual_write_size = xMessageBufferSend(
*stream, working_buffer, header_size + chunk_size, max_wait);
header_size = new_header_size;
if (actual_write_size == 0) {
// We failed to write in time, so bail out. This is techinically data loss
// unless the caller wants to go and parse our working buffer, but we
// assume the caller has a good reason to time us out.
return CHUNK_WRITE_TIMEOUT;
}
}
}
ChunkReader::ChunkReader(MessageBufferHandle_t* stream) : stream_(stream) {
working_buffer_ = heap_caps_malloc(kWorkingBufferSize, MALLOC_CAP_SPIRAM);
};
ChunkReader::~ChunkReader() {
free(working_buffer_);
}
auto ChunkReader::Reset() -> void {
leftover_bytes_ = 0;
last_message_size_ = 0;
}
auto ChunkReader::GetLastMessage() -> std::pair<uint8_t*, size_t> {
return std::make_pair(working_buffer_ + leftover_bytes_, last_message_size_);
}
auto ChunkReader::ReadChunkFromStream(
std::function<std::optional<size_t>(uint8_t*, size_t)> callback,
TickType_t max_wait) -> EncodeReadResult {
// First, wait for a message to arrive over the buffer.
last_message_size_ =
xMessageBufferReceive(*stream_, working_buffer_ + leftover_bytes_,
kWorkingBufferSize - leftover_bytes_, max_wait);
if (last_message_size_ == 0) {
return CHUNK_READ_TIMEOUT;
}
auto decoder = cbor::MapDecoder::Create(working_buffer_ + leftover_bytes_,
last_message_size_);
if (decoder.has_error()) {
// Weird; this implies someone is shoving invalid data into the buffer.
return CHUNK_DECODING_ERROR;
}
MessageType type = decoder.value().ParseUnsigned().value_or(TYPE_UNKNOWN);
if (type != TYPE_CHUNK_HEADER) {
// This message wasn't for us, so let the caller handle it.
Reset();
return CHUNK_STREAM_ENDED;
}
// Work the size and position of the chunk.
header_length = decoder.ParseUnsigned().value_or(0);
chunk_length = decoder.ParseUnsigned().value_or(0);
if (decoder.Failed()) {
return CHUNK_DECODING_ERROR;
}
// Now we need to stick the end of the last chunk (if it exists) onto the
// front of the new chunk. Do it this way around bc we assume the old chunk
// is shorter, and therefore faster to move.
uint8_t* combined_buffer = working_buffer_ + header_length - leftover_bytes_;
size_t combined_buffer_size = leftover_bytes_ + chunk_length;
if (leftover_bytes_ > 0) {
memmove(combined_buffer, working_buffer_, leftover_bytes_);
}
// Tell the callback about the new data.
std::optional<size_t> amount_processed =
callback(combined_buffer, combined_buffer_size);
if (!amount_processed) {
return CHUNK_PROCESSING_ERROR;
}
// Prepare for the next iteration.
leftover_bytes_ = combined_buffer_size - amount_processed.value();
if (leftover_bytes_ > 0) {
memmove(working_buffer_, combined_buffer + amount_processed.value(),
leftover_bytes_);
}
}
} // namespace audio
|