summaryrefslogtreecommitdiff
path: root/src/audio/audio_task.cpp
blob: ce6d724eb1078c0f9c72158ed4e6c3c6280f6c22 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#include "audio_task.hpp"

#include <stdlib.h>

#include <cstdint>
#include <deque>
#include <memory>

#include "cbor.h"
#include "esp_heap_caps.h"
#include "esp_log.h"
#include "freertos/portmacro.h"
#include "freertos/projdefs.h"
#include "freertos/queue.h"
#include "span.hpp"

#include "arena.hpp"
#include "audio_element.hpp"
#include "chunk.hpp"
#include "stream_event.hpp"
#include "stream_info.hpp"
#include "stream_message.hpp"
#include "sys/_stdint.h"
#include "tasks.hpp"

namespace audio {

static const char* kTag = "task";

auto StartAudioTask(const std::string& name,
                    std::optional<BaseType_t> core_id,
                    std::shared_ptr<IAudioElement> element) -> void {
  auto task_handle = std::make_unique<TaskHandle_t>();

  // Newly created task will free this.
  AudioTaskArgs* args = new AudioTaskArgs{.element = element};

  ESP_LOGI(kTag, "starting audio task %s", name.c_str());
  if (core_id) {
    xTaskCreatePinnedToCore(&AudioTaskMain, name.c_str(),
                            element->StackSizeBytes(), args, kTaskPriorityAudio,
                            task_handle.get(), *core_id);
  } else {
    xTaskCreate(&AudioTaskMain, name.c_str(), element->StackSizeBytes(), args,
                kTaskPriorityAudio, task_handle.get());
  }
}

void AudioTaskMain(void* args) {
  // Nest the body within an additional scope to ensure that destructors are
  // called before the task quits.
  {
    AudioTaskArgs* real_args = reinterpret_cast<AudioTaskArgs*>(args);
    std::shared_ptr<IAudioElement> element = std::move(real_args->element);
    delete real_args;

    // Queue of events that we have received on our input queue, but not yet
    // processed.
    std::deque<std::unique_ptr<StreamEvent>> pending_events;

    // TODO(jacqueline): quit event
    while (true) {
      // First, we pull events from our input queue into pending_events. This
      // keeps us responsive to any events that need to be handled immediately.
      // Then we check if there's any events to flush downstream.
      // Then we pass anything requiring processing to the element.

      bool has_work_to_do =
          (!pending_events.empty() || element->HasUnflushedOutput() ||
           element->HasUnprocessedInput()) &&
          !element->IsOverBuffered();

      if (has_work_to_do) {
        ESP_LOGD(kTag, "checking for events");
      } else {
        ESP_LOGD(kTag, "waiting for events");
      }

      // If we have no new events to process and the element has nothing left to
      // do, then just delay forever waiting for a new event.
      TickType_t ticks_to_wait = has_work_to_do ? 0 : portMAX_DELAY;

      StreamEvent* new_event = nullptr;
      bool has_event =
          xQueueReceive(element->InputEventQueue(), &new_event, ticks_to_wait);

      if (has_event) {
        if (new_event->tag == StreamEvent::UNINITIALISED) {
          ESP_LOGE(kTag, "discarding invalid event!!");
        } else if (new_event->tag == StreamEvent::CHUNK_NOTIFICATION) {
          delete new_event;
        } else if (new_event->tag == StreamEvent::LOG_STATUS) {
          element->ProcessLogStatus();
          if (element->OutputEventQueue() != nullptr) {
            xQueueSendToFront(element->OutputEventQueue(), &new_event, 0);
          } else {
            delete new_event;
          }
        } else {
          // This isn't an event that needs to be actioned immediately. Add it
          // to our work queue.
          pending_events.emplace_back(new_event);
          ESP_LOGD(kTag, "deferring event");
        }
        // Loop again, so that we service all incoming events before doing our
        // possibly expensive processing.
        continue;
      }

      if (element->HasUnflushedOutput()) {
        ESP_LOGD(kTag, "flushing output");
      }

      // We have no new events. Next, see if there's anything that needs to be
      // flushed.
      if (element->HasUnflushedOutput() && !element->FlushBufferedOutput()) {
        // We had things to flush, but couldn't send it all. This probably
        // implies that the downstream element is having issues servicing its
        // input queue, so hold off for a moment before retrying.
        ESP_LOGW(kTag, "failed to flush buffered output");
        vTaskDelay(pdMS_TO_TICKS(100));
        continue;
      }

      if (element->HasUnprocessedInput()) {
        ESP_LOGD(kTag, "processing input events");
        element->Process();
        continue;
      }

      // The element ran out of data, so now it's time to let it process more
      // input.
      while (!pending_events.empty()) {
        std::unique_ptr<StreamEvent> event;
        pending_events.front().swap(event);
        pending_events.pop_front();
        ESP_LOGD(kTag, "processing event, tag %i", event->tag);

        if (event->tag == StreamEvent::STREAM_INFO) {
          ESP_LOGD(kTag, "processing stream info");

          element->ProcessStreamInfo(*event->stream_info);

        } else if (event->tag == StreamEvent::ARENA_CHUNK) {
          ESP_LOGD(kTag, "processing arena data");

          memory::ArenaRef ref(event->arena_chunk);
          auto callback =
              StreamEvent::CreateChunkNotification(element->InputEventQueue());
          if (!xQueueSend(event->source, &callback, 0)) {
            ESP_LOGW(kTag, "failed to send chunk notif");
            continue;
          }

          // TODO(jacqueline): Consider giving the element a full ArenaRef here,
          // so that it can hang on to it and potentially save an alloc+copy.
          element->ProcessChunk({ref.ptr.start, ref.ptr.used_size});

          // TODO: think about whether to do the whole queue
          break;
        }
      }
    }
  }
  vTaskDelete(NULL);
}

}  // namespace audio