Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Framework/Core/src/ControlService.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ void ControlService::readyToQuit(QuitRequest what)
mDriverClient.tell("CONTROL_ACTION: READY_TO_QUIT_ME");
break;
}
mDriverClient.flushPending();
}

void ControlService::notifyStreamingState(StreamingState state)
Expand All @@ -73,6 +74,7 @@ void ControlService::notifyStreamingState(StreamingState state)
default:
throw std::runtime_error("Unknown streaming state");
}
mDriverClient.flushPending();
}

} // namespace o2::framework
8 changes: 5 additions & 3 deletions Framework/Core/src/ControlServiceHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ void ControlServiceHelpers::processCommand(std::vector<DeviceInfo>& infos,
std::string const& command,
std::string const& arg)
{
auto doToMatchingPid = [](std::vector<DeviceInfo>& infos, int pid, auto lambda) {
auto doToMatchingPid = [](std::vector<DeviceInfo>& infos, pid_t pid, auto lambda) {
for (auto& deviceInfo : infos) {
if (deviceInfo.pid == pid) {
lambda(deviceInfo);
break;
return lambda(deviceInfo);
}
}
LOGP(error, "Command received for pid {} which does not exists.", pid);
};
LOGP(debug2, "Found control command {} from pid {} with argument {}.", command, pid, arg);
if (command == "QUIT" && arg == "ALL") {
Expand All @@ -59,6 +59,8 @@ void ControlServiceHelpers::processCommand(std::vector<DeviceInfo>& infos,
} else if (command == "NOTIFY_STREAMING_STATE" && arg == "EOS") {
// FIXME: this should really be a policy...
doToMatchingPid(infos, pid, [](DeviceInfo& info) { info.streamingState = StreamingState::EndOfStreaming; });
} else {
LOGP(error, "Unknown command {} with argument {}", command, arg);
}
};

Expand Down
6 changes: 4 additions & 2 deletions Framework/Core/src/DPLWebSocket.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,11 @@ void websocket_client_callback(uv_stream_t* stream, ssize_t nread, const uv_buf_
}

// FIXME: mNonce should be random
WSDPLClient::WSDPLClient(uv_stream_t* s, DeviceSpec const& spec)
WSDPLClient::WSDPLClient(uv_stream_t* s, DeviceSpec const& spec, std::function<void()> handshake)
: mStream{s},
mNonce{"dGhlIHNhbXBsZSBub25jZQ=="},
mSpec{spec}
mSpec{spec},
mHandshake{handshake}
{
s->data = this;
uv_read_start((uv_stream_t*)s, (uv_alloc_cb)my_alloc_cb, websocket_client_callback);
Expand Down Expand Up @@ -257,6 +258,7 @@ void WSDPLClient::endHeaders()
LOG(INFO) << "Correctly handshaken websocket connection.";
/// Create an appropriate reply
mHandshaken = true;
mHandshake();
}

void ws_client_write_callback(uv_write_t* h, int status)
Expand Down
6 changes: 5 additions & 1 deletion Framework/Core/src/DPLWebSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <memory>
#include <string>
#include <map>
#include <functional>

class uv_stream_s;

Expand Down Expand Up @@ -53,7 +54,9 @@ struct WSDPLHandler : public HTTPParser {
struct WSDPLClient : public HTTPParser {
/// @a stream where the communication happens and @a spec of the device connecting
/// to the driver.
WSDPLClient(uv_stream_t* stream, DeviceSpec const& spec);
/// @a spec the DeviceSpec associated with this client
/// @a handshake a callback to invoke whenever we have a successful handshake
WSDPLClient(uv_stream_t* stream, DeviceSpec const& spec, std::function<void()> handshake);
void replyVersion(std::string_view const& s) override;
void replyCode(std::string_view const& s) override;
void header(std::string_view const& k, std::string_view const& v) override;
Expand All @@ -72,6 +75,7 @@ struct WSDPLClient : public HTTPParser {
std::string mNonce;
DeviceSpec const& mSpec;
bool mHandshaken = false;
std::function<void()> mHandshake;
uv_stream_t* mStream = nullptr;
std::map<std::string, std::string> mHeaders;
};
Expand Down
27 changes: 25 additions & 2 deletions Framework/Core/src/HTTPParser.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "SHA1.h"
#include "Base64.h"
#include <regex>
#include <cassert>

using namespace o2::framework::internal;
namespace o2::framework
Expand Down Expand Up @@ -88,7 +89,19 @@ void encode_websocket_frames(std::vector<uv_buf_t>& outputs, char const* src, si

void decode_websocket(char* start, size_t size, WebSocketHandler& handler)
{
char* cur = start;
// Handle the case the previous message was cut in half
// by the I/O stack.
char* cur = start + handler.remainingSize;
if (handler.remainingSize) {
assert(handler.pendingBuffer);
memcpy(handler.pendingBuffer + handler.pendingSize, start, handler.remainingSize);
size_t pendingProcessingSize = handler.pendingSize + handler.remainingSize;
handler.remainingSize = 0;
// One recursion should be enough.
decode_websocket(handler.pendingBuffer, pendingProcessingSize, handler);
delete[] handler.pendingBuffer;
handler.pendingBuffer = nullptr;
}
handler.beginChunk();
while (cur - start < size) {
WebSocketFrameTiny* header = (WebSocketFrameTiny*)cur;
Expand All @@ -106,9 +119,19 @@ void decode_websocket(char* start, size_t size, WebSocketHandler& handler)
payloadSize = headerSmall->len64;
headerSize = 2 + 8 + (header->mask ? 4 : 0);
}
size_t availableSize = size - (cur - start);
/// FIXME: handle the case in which the header itself is cut
/// apart.
if (availableSize < payloadSize + headerSize) {
handler.remainingSize = payloadSize + headerSize - availableSize;
handler.pendingSize = availableSize;
handler.pendingBuffer = new char[payloadSize + headerSize];
memcpy(handler.pendingBuffer, cur, availableSize);
break;
}
if (header->mask) {
int32_t mask = *(int32_t*)(cur + headerSize - 4);
memunmask(cur, payloadSize, mask);
memunmask(cur + headerSize, payloadSize, mask);
}
handler.frame(cur + headerSize, payloadSize);
cur += headerSize + payloadSize;
Expand Down
7 changes: 7 additions & 0 deletions Framework/Core/src/HTTPParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ struct WebSocketHandler {
virtual void endFragmentation() {}
/// FIXME: not implemented
virtual void control(char const* frame, size_t s) {}

/// Bytes which are still to be received for the previous, half delivered frame.
size_t remainingSize = 0;
/// Bytes which are already there from the previous, half delivered frame.
size_t pendingSize = 0;
/// A buffer large enough to contain the next frame to be processed.
char* pendingBuffer = nullptr;
};

/// Decoder for websocket data. For now we assume that the frame was not split. However multiple
Expand Down
5 changes: 4 additions & 1 deletion Framework/Core/src/WSDriverClient.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ void on_connect(uv_connect_t* connection, int status)
return;
}
WSDriverClient* client = (WSDriverClient*)connection->data;
client->setDPLClient(std::make_unique<WSDPLClient>(connection->handle, client->spec()));
auto onHandshake = [client]() {
client->flushPending();
};
client->setDPLClient(std::make_unique<WSDPLClient>(connection->handle, client->spec(), onHandshake));
client->sendHandshake();
}

Expand Down
1 change: 1 addition & 0 deletions Framework/Core/src/WSDriverClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "Framework/DriverClient.h"
#include <uv.h>
#include <functional>
#include <memory>
#include <string>
#include <vector>
Expand Down
40 changes: 5 additions & 35 deletions Framework/Core/src/runDataProcessing.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -449,38 +449,6 @@ void updateMetricsNames(DriverInfo& driverInfo, std::vector<DeviceMetricsInfo> c
driverInfo.availableMetrics.swap(result);
}

void processCommand(DeviceInfos& infos,
pid_t pid,
std::string const& command,
std::string const& arg)
{
auto doToMatchingPid = [](std::vector<DeviceInfo>& infos, int pid, auto lambda) {
for (auto& deviceInfo : infos) {
if (deviceInfo.pid == pid) {
lambda(deviceInfo);
break;
}
}
};
LOGP(info, "Found control command {} from pid {} with argument {}.", command, pid, arg);
if (command == "QUIT" && arg == "ALL") {
for (auto& deviceInfo : infos) {
deviceInfo.readyToQuit = true;
}
} else if (command == "QUIT" && arg == "ME") {
doToMatchingPid(infos, pid, [](DeviceInfo& info) { info.readyToQuit = true; });
} else if (command == "NOTIFY_STREAMING_STATE" && arg == "IDLE") {
// FIXME: this should really be a policy...
doToMatchingPid(infos, pid, [](DeviceInfo& info) { info.readyToQuit = true; info.streamingState = StreamingState::Idle; });
} else if (command == "NOTIFY_STREAMING_STATE" && arg == "STREAMING") {
// FIXME: this should really be a policy...
doToMatchingPid(infos, pid, [](DeviceInfo& info) { info.streamingState = StreamingState::Streaming; });
} else if (command == "NOTIFY_STREAMING_STATE" && arg == "EOS") {
// FIXME: this should really be a policy...
doToMatchingPid(infos, pid, [](DeviceInfo& info) { info.streamingState = StreamingState::EndOfStreaming; });
}
};

/// An handler for a websocket message stream.
struct ControlWebSocketHandler : public WebSocketHandler {
ControlWebSocketHandler(DriverServerContext& context)
Expand Down Expand Up @@ -528,6 +496,7 @@ struct ControlWebSocketHandler : public WebSocketHandler {
ParsedConfigMatch configMatch;
ParsedMetricMatch metricMatch;

LOG(debug3) << "Data received: " << std::string_view(frame, s);
if (DeviceMetricsHelper::parseMetric(token, metricMatch)) {
// We use this callback to cache which metrics are needed to provide a
// the DataRelayer view.
Expand All @@ -536,15 +505,16 @@ struct ControlWebSocketHandler : public WebSocketHandler {
didProcessMetric = true;
didHaveNewMetric |= hasNewMetric;
} else if (ControlServiceHelpers::parseControl(token, match)) {
LOG(debug2) << "Found a command, processing for pid " << mPid;
LOG(error) << "Found a command, processing for pid " << mPid;
assert(mContext.infos);
processCommand(*mContext.infos, mPid, match[1].str(), match[2].str());
ControlServiceHelpers::processCommand(*mContext.infos, mPid, match[1].str(), match[2].str());
} else if (DeviceConfigHelper::parseConfig(std::string{" "} + token, configMatch)) {
LOG(debug2) << "Found configuration information for pid " << mPid;
assert(mContext.infos);
DeviceConfigHelper::processConfig(configMatch, (*mContext.infos)[mIndex]);
} else {
LOG(error) << "Unexpected control data: " << std::string_view(frame, s);
}
LOG(debug3) << "Data received: " << std::string_view(frame, s);
}

/// FIXME: not implemented
Expand Down
16 changes: 15 additions & 1 deletion Framework/Core/test/test_HTTPParser.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class TestWSHandler : public WebSocketHandler
std::vector<size_t> mSize;
void frame(const char* f, size_t s) final
{
mFrame.push_back(f);
mFrame.push_back(strdup(f));
mSize.push_back(s);
}
};
Expand Down Expand Up @@ -212,6 +212,20 @@ BOOST_AUTO_TEST_CASE(HTTPParser1)
BOOST_REQUIRE_EQUAL(std::string(handler.mFrame[1], handler.mSize[1] - 1), std::string(buffer2));
}
{
// Decode a frame which is split in two.
char* buffer = strdup("hello websockets!1");
std::vector<uv_buf_t> encoded;
encode_websocket_frames(encoded, buffer, strlen(buffer) + 1, WebSocketOpCode::Binary, 0);
BOOST_REQUIRE_EQUAL(encoded.size(), 1);

TestWSHandler handler;
decode_websocket(encoded[0].base, encoded[0].len / 2, handler);
decode_websocket(encoded[0].base + encoded[0].len / 2, encoded[0].len - encoded[0].len / 2, handler);
BOOST_REQUIRE_EQUAL(handler.mFrame.size(), 1);
BOOST_REQUIRE_EQUAL(handler.mSize.size(), 1);
BOOST_REQUIRE_EQUAL(std::string(handler.mFrame[0], handler.mSize[0] - 1), std::string(buffer));
}
{} {
std::string checkRequest =
"GET /chat HTTP/1.1\r\n"
"Upgrade: websocket\r\n"
Expand Down