1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
#include "audio_task.hpp"
#include <stdlib.h>
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <variant>
#include "audio_sink.hpp"
#include "cbor.h"
#include "dac.hpp"
#include "esp_err.h"
#include "esp_heap_caps.h"
#include "esp_log.h"
#include "freertos/portmacro.h"
#include "freertos/projdefs.h"
#include "freertos/queue.h"
#include "pipeline.hpp"
#include "span.hpp"
#include "arena.hpp"
#include "audio_element.hpp"
#include "chunk.hpp"
#include "stream_event.hpp"
#include "stream_info.hpp"
#include "stream_message.hpp"
#include "sys/_stdint.h"
#include "tasks.hpp"
namespace audio {
namespace task {
static const char* kTag = "task";
void AudioTaskMain(std::unique_ptr<Pipeline> pipeline, IAudioSink* sink) {
std::optional<StreamInfo::Format> output_format;
std::vector<Pipeline*> elements = pipeline->GetIterationOrder();
std::size_t max_inputs =
(*std::max_element(elements.begin(), elements.end(),
[](Pipeline const* first, Pipeline const* second) {
return first->NumInputs() < second->NumInputs();
}))
->NumInputs();
// We need to be able to simultaneously map all of an element's inputs, plus
// its output. So preallocate that many ranges.
std::vector<MappableRegion<kPipelineBufferSize>> in_regions(max_inputs);
MappableRegion<kPipelineBufferSize> out_region;
std::for_each(in_regions.begin(), in_regions.end(),
[](const auto& region) { assert(region.is_valid); });
assert(out_region.is_valid);
// Each element has exactly one output buffer.
std::vector<HimemAlloc<kPipelineBufferSize>> buffers(elements.size());
std::vector<StreamInfo> buffer_infos(buffers.size());
std::for_each(buffers.begin(), buffers.end(),
[](const HimemAlloc<kPipelineBufferSize>& alloc) {
assert(alloc.is_valid);
});
while (1) {
for (int i = 0; i < elements.size(); i++) {
std::vector<RawStream> raw_in_streams;
elements.at(i)->InStreams(&in_regions, &raw_in_streams);
RawStream raw_out_stream = elements.at(i)->OutStream(&out_region);
// Crop the input and output streams to the ranges that are safe to
// touch. For the input streams, this is the region that contains
// data. For the output stream, this is the region that does *not*
// already contain data.
std::vector<InputStream> in_streams;
std::for_each(raw_in_streams.begin(), raw_in_streams.end(),
[&](RawStream& s) { in_streams.emplace_back(&s); });
OutputStream out_stream(&raw_out_stream);
elements.at(i)->OutputElement()->Process(in_streams, &out_stream);
std::for_each(in_regions.begin(), in_regions.end(),
[](auto&& r) { r.Unmap(); });
out_region.Unmap();
}
RawStream raw_sink_stream = elements.front()->OutStream(&out_region);
InputStream sink_stream(&raw_sink_stream);
if (sink_stream.info().bytes_in_stream == 0) {
out_region.Unmap();
vTaskDelay(pdMS_TO_TICKS(100));
continue;
}
if (!output_format || output_format != sink_stream.info().format) {
// The format of the stream within the sink stream has changed. We
// need to reconfigure the sink, but shouldn't do so until we've fully
// drained the current buffer.
if (xStreamBufferIsEmpty(sink->buffer())) {
ESP_LOGI(kTag, "reconfiguring dac");
output_format = sink_stream.info().format;
sink->Configure(*output_format);
}
}
// We've reconfigured the sink, or it was already configured correctly.
// Send through some data.
if (output_format == sink_stream.info().format &&
!std::holds_alternative<std::monostate>(*output_format)) {
std::size_t sent =
xStreamBufferSend(sink->buffer(), sink_stream.data().data(),
sink_stream.data().size_bytes(), 0);
if (sent > 0) {
ESP_LOGI(
kTag, "sunk %u bytes out of %u (%d %%)", sent,
sink_stream.info().bytes_in_stream,
(int)(((float)sent / (float)sink_stream.info().bytes_in_stream) *
100));
}
sink_stream.consume(sent);
}
out_region.Unmap();
}
}
auto StartPipeline(Pipeline* pipeline, IAudioSink* sink) -> void {
ESP_LOGI(kTag, "starting audio pipeline task");
tasks::StartPersistent<tasks::Type::kAudio>(
[=]() { AudioTaskMain(std::unique_ptr<Pipeline>(pipeline), sink); });
}
} // namespace task
} // namespace audio
|