From c7098b6c422b8677a2a4dd3533a9c6501c655759 Mon Sep 17 00:00:00 2001 From: Bernhard Kirchen Date: Sat, 30 Dec 2023 16:45:56 +0100 Subject: [PATCH] Fix: thread-safety and dynamic memory for MessageOutput (#567) this commit re-introduces the changes from #418, which were effectively reverted with d49481097 (merge commit introducing TaskScheduler). these adjustments are important to guarantee unmangled log messages and more importantly, to guarantee that all messages from a particular component are printed to the web console, which most people use to copy messages from when reporting issues. * use dynamic memory to allow handling of arbitrary message lenghts. * keep a message buffer for every task so no task ever mangles the message of another task. * every complete line is written to the serial console and moved to a line buffer for sending them through the websocket. * the websocket is always fed complete lines. * make sure to feed only as many lines as possible to the websocket handler, so that no lines are dropped. * lock all MessageOutput state against concurrent access. * respect HardwareSerial buffer size: the MessageOutput class buffers whole lines of output printed by any task in order to avoid mangling of text. that means we hand over full lines to the HardwareSerial instance, which might be too much in one call to write(buffer, size). we now check the return value of write(buffer, size) and call the function again with the part of the message that could not yet be written by HardwareSerial. --- include/MessageOutput.h | 23 ++++++---- src/MessageOutput.cpp | 98 +++++++++++++++++++++++++++++++---------- 2 files changed, 89 insertions(+), 32 deletions(-) diff --git a/include/MessageOutput.h b/include/MessageOutput.h index 94f915a5d..fc06f385c 100644 --- a/include/MessageOutput.h +++ b/include/MessageOutput.h @@ -2,12 +2,13 @@ #pragma once #include -#include -#include #include +#include +#include #include - -#define BUFFER_SIZE 500 +#include +#include +#include class MessageOutputClass : public Print { public: @@ -21,13 +22,19 @@ class MessageOutputClass : public Print { Task _loopTask; + using message_t = std::vector; + + // we keep a buffer for every task and only write complete lines to the + // serial output and then move them to be pushed through the websocket. + // this way we prevent mangling of messages from different contexts. + std::unordered_map _task_messages; + std::queue _lines; + AsyncWebSocket* _ws = nullptr; - char _buffer[BUFFER_SIZE]; - uint16_t _buff_pos = 0; - uint32_t _lastSend = 0; - bool _forceSend = false; std::mutex _msgLock; + + void serialWrite(message_t const& m); }; extern MessageOutputClass MessageOutput; \ No newline at end of file diff --git a/src/MessageOutput.cpp b/src/MessageOutput.cpp index f602bee15..027ce20c8 100644 --- a/src/MessageOutput.cpp +++ b/src/MessageOutput.cpp @@ -2,10 +2,9 @@ /* * Copyright (C) 2022-2023 Thomas Basler and others */ +#include #include "MessageOutput.h" -#include - MessageOutputClass MessageOutput; void MessageOutputClass::init(Scheduler& scheduler) @@ -18,46 +17,97 @@ void MessageOutputClass::init(Scheduler& scheduler) void MessageOutputClass::register_ws_output(AsyncWebSocket* output) { + std::lock_guard lock(_msgLock); + _ws = output; } +void MessageOutputClass::serialWrite(MessageOutputClass::message_t const& m) +{ + // on ESP32-S3, Serial.flush() blocks until a serial console is attached. + // operator bool() of HWCDC returns false if the device is not attached to + // a USB host. in general it makes sense to skip writing entirely if the + // default serial port is not ready. + if (!Serial) { return; } + + size_t written = 0; + while (written < m.size()) { + written += Serial.write(m.data() + written, m.size() - written); + } + Serial.flush(); +} + size_t MessageOutputClass::write(uint8_t c) { - if (_buff_pos < BUFFER_SIZE) { - std::lock_guard lock(_msgLock); - _buffer[_buff_pos] = c; - _buff_pos++; - } else { - _forceSend = true; + std::lock_guard lock(_msgLock); + + auto res = _task_messages.emplace(xTaskGetCurrentTaskHandle(), message_t()); + auto iter = res.first; + auto& message = iter->second; + + message.push_back(c); + + if (c == '\n') { + serialWrite(message); + _lines.emplace(std::move(message)); + _task_messages.erase(iter); } - return Serial.write(c); + return 1; } -size_t MessageOutputClass::write(const uint8_t* buffer, size_t size) +size_t MessageOutputClass::write(const uint8_t *buffer, size_t size) { std::lock_guard lock(_msgLock); - if (_buff_pos + size < BUFFER_SIZE) { - memcpy(&_buffer[_buff_pos], buffer, size); - _buff_pos += size; + + auto res = _task_messages.emplace(xTaskGetCurrentTaskHandle(), message_t()); + auto iter = res.first; + auto& message = iter->second; + + message.reserve(message.size() + size); + + for (size_t idx = 0; idx < size; ++idx) { + uint8_t c = buffer[idx]; + + message.push_back(c); + + if (c == '\n') { + serialWrite(message); + _lines.emplace(std::move(message)); + message.clear(); + message.reserve(size - idx - 1); + } } - _forceSend = true; - return Serial.write(buffer, size); + if (message.empty()) { _task_messages.erase(iter); } + + return size; } void MessageOutputClass::loop() { - // Send data via websocket if either time is over or buffer is full - if (_forceSend || (millis() - _lastSend > 1000)) { - std::lock_guard lock(_msgLock); - if (_ws && _buff_pos > 0) { - _ws->textAll(_buffer, _buff_pos); - _buff_pos = 0; + std::lock_guard lock(_msgLock); + + // clean up (possibly filled) buffers of deleted tasks + auto map_iter = _task_messages.begin(); + while (map_iter != _task_messages.end()) { + if (eTaskGetState(map_iter->first) == eDeleted) { + map_iter = _task_messages.erase(map_iter); + continue; } - if (_forceSend) { - _buff_pos = 0; + + ++map_iter; + } + + if (!_ws) { + while (!_lines.empty()) { + _lines.pop(); // do not hog memory } - _forceSend = false; + return; + } + + while (!_lines.empty() && _ws->availableForWriteAll()) { + _ws->textAll(std::make_shared(std::move(_lines.front()))); + _lines.pop(); } } \ No newline at end of file