From 1cc98659469c6e8fc73f1de8b2e6c710b4f45000 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 5 Feb 2021 01:40:29 +0100 Subject: [PATCH 1/2] DPL: improve reliability of --driver-client-backend ws:// - Flush messages after handshake - Fix handling of splitted frames - Move to `ControlServiceHelpers::processCommand()` --- Framework/Core/src/ControlService.cxx | 2 + Framework/Core/src/ControlServiceHelpers.cxx | 8 ++-- Framework/Core/src/DPLWebSocket.cxx | 6 ++- Framework/Core/src/DPLWebSocket.h | 6 ++- Framework/Core/src/HTTPParser.cxx | 26 ++++++++++++- Framework/Core/src/HTTPParser.h | 7 ++++ Framework/Core/src/WSDriverClient.cxx | 5 ++- Framework/Core/src/WSDriverClient.h | 1 + Framework/Core/src/runDataProcessing.cxx | 40 +++----------------- Framework/Core/test/test_HTTPParser.cxx | 16 +++++++- 10 files changed, 72 insertions(+), 45 deletions(-) diff --git a/Framework/Core/src/ControlService.cxx b/Framework/Core/src/ControlService.cxx index adea456534706..f303e21af05fa 100644 --- a/Framework/Core/src/ControlService.cxx +++ b/Framework/Core/src/ControlService.cxx @@ -55,6 +55,7 @@ void ControlService::readyToQuit(QuitRequest what) mDriverClient.tell("CONTROL_ACTION: READY_TO_QUIT_ME"); break; } + mDriverClient.flushPending(); } void ControlService::notifyStreamingState(StreamingState state) @@ -73,6 +74,7 @@ void ControlService::notifyStreamingState(StreamingState state) default: throw std::runtime_error("Unknown streaming state"); } + mDriverClient.flushPending(); } } // namespace o2::framework diff --git a/Framework/Core/src/ControlServiceHelpers.cxx b/Framework/Core/src/ControlServiceHelpers.cxx index 37e1f51c742e2..f22e3be89ddd5 100644 --- a/Framework/Core/src/ControlServiceHelpers.cxx +++ b/Framework/Core/src/ControlServiceHelpers.cxx @@ -35,13 +35,13 @@ void ControlServiceHelpers::processCommand(std::vector& infos, std::string const& command, std::string const& arg) { - auto doToMatchingPid = [](std::vector& infos, int pid, auto lambda) { + auto doToMatchingPid = [](std::vector& infos, pid_t pid, auto lambda) { for (auto& deviceInfo : infos) { if (deviceInfo.pid == pid) { - lambda(deviceInfo); - break; + return lambda(deviceInfo); } } + LOGP(error, "Command received for pid {} which does not exists.", pid); }; LOGP(debug2, "Found control command {} from pid {} with argument {}.", command, pid, arg); if (command == "QUIT" && arg == "ALL") { @@ -59,6 +59,8 @@ void ControlServiceHelpers::processCommand(std::vector& infos, } else if (command == "NOTIFY_STREAMING_STATE" && arg == "EOS") { // FIXME: this should really be a policy... doToMatchingPid(infos, pid, [](DeviceInfo& info) { info.streamingState = StreamingState::EndOfStreaming; }); + } else { + LOGP(error, "Unknown command {} with argument {}", command, arg); } }; diff --git a/Framework/Core/src/DPLWebSocket.cxx b/Framework/Core/src/DPLWebSocket.cxx index ca1d43d77abe3..499cfbc3950e9 100644 --- a/Framework/Core/src/DPLWebSocket.cxx +++ b/Framework/Core/src/DPLWebSocket.cxx @@ -191,10 +191,11 @@ void websocket_client_callback(uv_stream_t* stream, ssize_t nread, const uv_buf_ } // FIXME: mNonce should be random -WSDPLClient::WSDPLClient(uv_stream_t* s, DeviceSpec const& spec) +WSDPLClient::WSDPLClient(uv_stream_t* s, DeviceSpec const& spec, std::function handshake) : mStream{s}, mNonce{"dGhlIHNhbXBsZSBub25jZQ=="}, - mSpec{spec} + mSpec{spec}, + mHandshake{handshake} { s->data = this; uv_read_start((uv_stream_t*)s, (uv_alloc_cb)my_alloc_cb, websocket_client_callback); @@ -257,6 +258,7 @@ void WSDPLClient::endHeaders() LOG(INFO) << "Correctly handshaken websocket connection."; /// Create an appropriate reply mHandshaken = true; + mHandshake(); } void ws_client_write_callback(uv_write_t* h, int status) diff --git a/Framework/Core/src/DPLWebSocket.h b/Framework/Core/src/DPLWebSocket.h index 1776cb812d45c..777acd6b3ceeb 100644 --- a/Framework/Core/src/DPLWebSocket.h +++ b/Framework/Core/src/DPLWebSocket.h @@ -15,6 +15,7 @@ #include #include #include +#include class uv_stream_s; @@ -53,7 +54,9 @@ struct WSDPLHandler : public HTTPParser { struct WSDPLClient : public HTTPParser { /// @a stream where the communication happens and @a spec of the device connecting /// to the driver. - WSDPLClient(uv_stream_t* stream, DeviceSpec const& spec); + /// @a spec the DeviceSpec associated with this client + /// @a handshake a callback to invoke whenever we have a successful handshake + WSDPLClient(uv_stream_t* stream, DeviceSpec const& spec, std::function handshake); void replyVersion(std::string_view const& s) override; void replyCode(std::string_view const& s) override; void header(std::string_view const& k, std::string_view const& v) override; @@ -72,6 +75,7 @@ struct WSDPLClient : public HTTPParser { std::string mNonce; DeviceSpec const& mSpec; bool mHandshaken = false; + std::function mHandshake; uv_stream_t* mStream = nullptr; std::map mHeaders; }; diff --git a/Framework/Core/src/HTTPParser.cxx b/Framework/Core/src/HTTPParser.cxx index 717c5162e8fc0..5664d2fdb8e5b 100644 --- a/Framework/Core/src/HTTPParser.cxx +++ b/Framework/Core/src/HTTPParser.cxx @@ -88,7 +88,19 @@ void encode_websocket_frames(std::vector& outputs, char const* src, si void decode_websocket(char* start, size_t size, WebSocketHandler& handler) { - char* cur = start; + // Handle the case the previous message was cut in half + // by the I/O stack. + char* cur = start + handler.remainingSize; + if (handler.remainingSize) { + assert(handler.pendingBuffer); + memcpy(handler.pendingBuffer + handler.pendingSize, start, handler.remainingSize); + size_t pendingProcessingSize = handler.pendingSize + handler.remainingSize; + handler.remainingSize = 0; + // One recursion should be enough. + decode_websocket(handler.pendingBuffer, pendingProcessingSize, handler); + delete[] handler.pendingBuffer; + handler.pendingBuffer = nullptr; + } handler.beginChunk(); while (cur - start < size) { WebSocketFrameTiny* header = (WebSocketFrameTiny*)cur; @@ -106,9 +118,19 @@ void decode_websocket(char* start, size_t size, WebSocketHandler& handler) payloadSize = headerSmall->len64; headerSize = 2 + 8 + (header->mask ? 4 : 0); } + size_t availableSize = size - (cur - start); + /// FIXME: handle the case in which the header itself is cut + /// apart. + if (availableSize < payloadSize + headerSize) { + handler.remainingSize = payloadSize + headerSize - availableSize; + handler.pendingSize = availableSize; + handler.pendingBuffer = new char[payloadSize + headerSize]; + memcpy(handler.pendingBuffer, cur, availableSize); + break; + } if (header->mask) { int32_t mask = *(int32_t*)(cur + headerSize - 4); - memunmask(cur, payloadSize, mask); + memunmask(cur + headerSize, payloadSize, mask); } handler.frame(cur + headerSize, payloadSize); cur += headerSize + payloadSize; diff --git a/Framework/Core/src/HTTPParser.h b/Framework/Core/src/HTTPParser.h index c880557d6bb86..4b76cf47113ca 100644 --- a/Framework/Core/src/HTTPParser.h +++ b/Framework/Core/src/HTTPParser.h @@ -102,6 +102,13 @@ struct WebSocketHandler { virtual void endFragmentation() {} /// FIXME: not implemented virtual void control(char const* frame, size_t s) {} + + /// Bytes which are still to be received for the previous, half delivered frame. + size_t remainingSize = 0; + /// Bytes which are already there from the previous, half delivered frame. + size_t pendingSize = 0; + /// A buffer large enough to contain the next frame to be processed. + char* pendingBuffer = nullptr; }; /// Decoder for websocket data. For now we assume that the frame was not split. However multiple diff --git a/Framework/Core/src/WSDriverClient.cxx b/Framework/Core/src/WSDriverClient.cxx index ebe8c1765a536..264af70115259 100644 --- a/Framework/Core/src/WSDriverClient.cxx +++ b/Framework/Core/src/WSDriverClient.cxx @@ -25,7 +25,10 @@ void on_connect(uv_connect_t* connection, int status) return; } WSDriverClient* client = (WSDriverClient*)connection->data; - client->setDPLClient(std::make_unique(connection->handle, client->spec())); + auto onHandshake = [client]() { + client->flushPending(); + }; + client->setDPLClient(std::make_unique(connection->handle, client->spec(), onHandshake)); client->sendHandshake(); } diff --git a/Framework/Core/src/WSDriverClient.h b/Framework/Core/src/WSDriverClient.h index 3e8a5dab0bd00..b817a3d1ea4b1 100644 --- a/Framework/Core/src/WSDriverClient.h +++ b/Framework/Core/src/WSDriverClient.h @@ -12,6 +12,7 @@ #include "Framework/DriverClient.h" #include +#include #include #include #include diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index 0c42f17745aad..fa6284f56c532 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -449,38 +449,6 @@ void updateMetricsNames(DriverInfo& driverInfo, std::vector c driverInfo.availableMetrics.swap(result); } -void processCommand(DeviceInfos& infos, - pid_t pid, - std::string const& command, - std::string const& arg) -{ - auto doToMatchingPid = [](std::vector& infos, int pid, auto lambda) { - for (auto& deviceInfo : infos) { - if (deviceInfo.pid == pid) { - lambda(deviceInfo); - break; - } - } - }; - LOGP(info, "Found control command {} from pid {} with argument {}.", command, pid, arg); - if (command == "QUIT" && arg == "ALL") { - for (auto& deviceInfo : infos) { - deviceInfo.readyToQuit = true; - } - } else if (command == "QUIT" && arg == "ME") { - doToMatchingPid(infos, pid, [](DeviceInfo& info) { info.readyToQuit = true; }); - } else if (command == "NOTIFY_STREAMING_STATE" && arg == "IDLE") { - // FIXME: this should really be a policy... - doToMatchingPid(infos, pid, [](DeviceInfo& info) { info.readyToQuit = true; info.streamingState = StreamingState::Idle; }); - } else if (command == "NOTIFY_STREAMING_STATE" && arg == "STREAMING") { - // FIXME: this should really be a policy... - doToMatchingPid(infos, pid, [](DeviceInfo& info) { info.streamingState = StreamingState::Streaming; }); - } else if (command == "NOTIFY_STREAMING_STATE" && arg == "EOS") { - // FIXME: this should really be a policy... - doToMatchingPid(infos, pid, [](DeviceInfo& info) { info.streamingState = StreamingState::EndOfStreaming; }); - } -}; - /// An handler for a websocket message stream. struct ControlWebSocketHandler : public WebSocketHandler { ControlWebSocketHandler(DriverServerContext& context) @@ -528,6 +496,7 @@ struct ControlWebSocketHandler : public WebSocketHandler { ParsedConfigMatch configMatch; ParsedMetricMatch metricMatch; + LOG(debug3) << "Data received: " << std::string_view(frame, s); if (DeviceMetricsHelper::parseMetric(token, metricMatch)) { // We use this callback to cache which metrics are needed to provide a // the DataRelayer view. @@ -536,15 +505,16 @@ struct ControlWebSocketHandler : public WebSocketHandler { didProcessMetric = true; didHaveNewMetric |= hasNewMetric; } else if (ControlServiceHelpers::parseControl(token, match)) { - LOG(debug2) << "Found a command, processing for pid " << mPid; + LOG(error) << "Found a command, processing for pid " << mPid; assert(mContext.infos); - processCommand(*mContext.infos, mPid, match[1].str(), match[2].str()); + ControlServiceHelpers::processCommand(*mContext.infos, mPid, match[1].str(), match[2].str()); } else if (DeviceConfigHelper::parseConfig(std::string{" "} + token, configMatch)) { LOG(debug2) << "Found configuration information for pid " << mPid; assert(mContext.infos); DeviceConfigHelper::processConfig(configMatch, (*mContext.infos)[mIndex]); + } else { + LOG(error) << "Unexpected control data: " << std::string_view(frame, s); } - LOG(debug3) << "Data received: " << std::string_view(frame, s); } /// FIXME: not implemented diff --git a/Framework/Core/test/test_HTTPParser.cxx b/Framework/Core/test/test_HTTPParser.cxx index 46204de8bfc5d..a8405bc69d17d 100644 --- a/Framework/Core/test/test_HTTPParser.cxx +++ b/Framework/Core/test/test_HTTPParser.cxx @@ -88,7 +88,7 @@ class TestWSHandler : public WebSocketHandler std::vector mSize; void frame(const char* f, size_t s) final { - mFrame.push_back(f); + mFrame.push_back(strdup(f)); mSize.push_back(s); } }; @@ -212,6 +212,20 @@ BOOST_AUTO_TEST_CASE(HTTPParser1) BOOST_REQUIRE_EQUAL(std::string(handler.mFrame[1], handler.mSize[1] - 1), std::string(buffer2)); } { + // Decode a frame which is split in two. + char* buffer = strdup("hello websockets!1"); + std::vector encoded; + encode_websocket_frames(encoded, buffer, strlen(buffer) + 1, WebSocketOpCode::Binary, 0); + BOOST_REQUIRE_EQUAL(encoded.size(), 1); + + TestWSHandler handler; + decode_websocket(encoded[0].base, encoded[0].len / 2, handler); + decode_websocket(encoded[0].base + encoded[0].len / 2, encoded[0].len - encoded[0].len / 2, handler); + BOOST_REQUIRE_EQUAL(handler.mFrame.size(), 1); + BOOST_REQUIRE_EQUAL(handler.mSize.size(), 1); + BOOST_REQUIRE_EQUAL(std::string(handler.mFrame[0], handler.mSize[0] - 1), std::string(buffer)); + } + {} { std::string checkRequest = "GET /chat HTTP/1.1\r\n" "Upgrade: websocket\r\n" From cb8e431b56c0172a2fc2da82ad840bc5f942222c Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 5 Feb 2021 07:59:25 +0100 Subject: [PATCH 2/2] Update HTTPParser.cxx --- Framework/Core/src/HTTPParser.cxx | 1 + 1 file changed, 1 insertion(+) diff --git a/Framework/Core/src/HTTPParser.cxx b/Framework/Core/src/HTTPParser.cxx index 5664d2fdb8e5b..2a77eb5a906ad 100644 --- a/Framework/Core/src/HTTPParser.cxx +++ b/Framework/Core/src/HTTPParser.cxx @@ -14,6 +14,7 @@ #include "SHA1.h" #include "Base64.h" #include +#include using namespace o2::framework::internal; namespace o2::framework