diff --git a/CHANGELOG.md b/CHANGELOG.md index 0530868a..d90671c6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ - Change `MicroOcpp::ChargePointStatus` into C-style enum ([#309](https://github.com/matth-x/MicroOcpp/pull/309)) - Connector lock disabled by default per `MO_ENABLE_CONNECTOR_LOCK` ([#312](https://github.com/matth-x/MicroOcpp/pull/312)) +- Relaxed temporal order of non-tx-related operations ([#345](https://github.com/matth-x/MicroOcpp/pull/345)) +- Use pseudo-GUIDs as messageId ([#345](https://github.com/matth-x/MicroOcpp/pull/345)) ### Added @@ -16,10 +18,13 @@ - Build flag `MO_REPORT_NOERROR` to report error recovery ([#331](https://github.com/matth-x/MicroOcpp/pull/331)) - Support for `parentIdTag` ([#344](https://github.com/matth-x/MicroOcpp/pull/344)) - Input validation for unsigned int Configs ([#344](https://github.com/matth-x/MicroOcpp/pull/344)) +- Support for TransactionMessageAttempts/-RetryInterval ([#345](https://github.com/matth-x/MicroOcpp/pull/345)) ### Removed - ESP32 built-in HTTP OTA ([#313](https://github.com/matth-x/MicroOcpp/pull/313)) +- Operation store (files op-*.jsn and opstore.jsn) ([#345](https://github.com/matth-x/MicroOcpp/pull/345)) +- Explicit tracking of txNr (file txstore.jsn) ([#345](https://github.com/matth-x/MicroOcpp/pull/345)) ### Fixed diff --git a/CMakeLists.txt b/CMakeLists.txt index a034a1ce..8036f3fa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -22,8 +22,6 @@ set(MO_SRC src/MicroOcpp/Core/Request.cpp src/MicroOcpp/Core/Connection.cpp src/MicroOcpp/Core/Time.cpp - src/MicroOcpp/Core/RequestQueueStorageStrategy.cpp - src/MicroOcpp/Core/RequestStore.cpp src/MicroOcpp/Operations/Authorize.cpp src/MicroOcpp/Operations/BootNotification.cpp src/MicroOcpp/Operations/CancelReservation.cpp diff --git a/src/MicroOcpp.cpp b/src/MicroOcpp.cpp index 77a6c904..bfde4634 100644 --- a/src/MicroOcpp.cpp +++ b/src/MicroOcpp.cpp @@ -289,7 +289,7 @@ void mocpp_initialize(Connection& connection, const char *bootNotificationCreden new ConnectorsCommon(*context, MO_NUMCONNECTORS, filesystem))); std::vector> connectors; for (unsigned int connectorId = 0; connectorId < MO_NUMCONNECTORS; connectorId++) { - connectors.emplace_back(new Connector(*context, connectorId)); + connectors.emplace_back(new Connector(*context, filesystem, connectorId)); } model.setConnectors(std::move(connectors)); model.setHeartbeatService(std::unique_ptr( diff --git a/src/MicroOcpp/Core/Context.cpp b/src/MicroOcpp/Core/Context.cpp index 4d651557..f3efd3ea 100644 --- a/src/MicroOcpp/Core/Context.cpp +++ b/src/MicroOcpp/Core/Context.cpp @@ -12,10 +12,8 @@ using namespace MicroOcpp; Context::Context(Connection& connection, std::shared_ptr filesystem, uint16_t bootNr, ProtocolVersion version) - : connection(connection), model{version, bootNr}, reqQueue{operationRegistry, &model, filesystem} { + : connection(connection), model{version, bootNr}, reqQueue{connection, operationRegistry} { - preBootQueue = std::unique_ptr(new RequestQueue(operationRegistry, &model, nullptr)); //pre boot queue doesn't need persistency - preBootQueue->setConnection(connection); } Context::~Context() { @@ -24,37 +22,16 @@ Context::~Context() { void Context::loop() { connection.loop(); - - if (preBootQueue) { - preBootQueue->loop(connection); - } else { - reqQueue.loop(connection); - } - + reqQueue.loop(); model.loop(); } -void Context::activatePostBootCommunication() { - //switch from pre boot connection to normal connetion - reqQueue.setConnection(connection); - preBootQueue.reset(); -} - void Context::initiateRequest(std::unique_ptr op) { - if (op) { - reqQueue.sendRequest(std::move(op)); - } -} - -void Context::initiatePreBootOperation(std::unique_ptr op) { - if (op) { - if (preBootQueue) { - preBootQueue->sendRequest(std::move(op)); - } else { - //not in pre boot mode anymore - initiate normally - initiateRequest(std::move(op)); - } + if (!op) { + MO_DBG_ERR("invalid arg"); + return; } + reqQueue.sendRequest(std::move(op)); } Model& Context::getModel() { @@ -73,6 +50,10 @@ Connection& Context::getConnection() { return connection; } +RequestQueue& Context::getRequestQueue() { + return reqQueue; +} + void Context::setFtpClient(std::unique_ptr ftpClient) { this->ftpClient = std::move(ftpClient); } diff --git a/src/MicroOcpp/Core/Context.h b/src/MicroOcpp/Core/Context.h index 1e87c0b3..eb8c0aef 100644 --- a/src/MicroOcpp/Core/Context.h +++ b/src/MicroOcpp/Core/Context.h @@ -25,8 +25,6 @@ class Context { Model model; RequestQueue reqQueue; - std::unique_ptr preBootQueue; - std::unique_ptr ftpClient; public: @@ -35,12 +33,7 @@ class Context { void loop(); - void activatePostBootCommunication(); - void initiateRequest(std::unique_ptr op); - - //for BootNotification and TriggerMessage: initiate operations before the first BootNotification was accepted (pre-boot mode) - void initiatePreBootOperation(std::unique_ptr op); Model& getModel(); @@ -50,6 +43,8 @@ class Context { Connection& getConnection(); + RequestQueue& getRequestQueue(); + void setFtpClient(std::unique_ptr ftpClient); FtpClient *getFtpClient(); }; diff --git a/src/MicroOcpp/Core/Operation.cpp b/src/MicroOcpp/Core/Operation.cpp index 386a43e6..63d76a20 100644 --- a/src/MicroOcpp/Core/Operation.cpp +++ b/src/MicroOcpp/Core/Operation.cpp @@ -17,14 +17,6 @@ const char* Operation::getOperationType(){ return "CustomOperation"; } -void Operation::initiate(StoredOperationHandler *rpcData) { - //called after initiateRequest(anyMsg) -} - -bool Operation::restore(StoredOperationHandler *rpcData) { - return false; -} - std::unique_ptr Operation::createReq() { MO_DBG_ERR("Unsupported operation: createReq() is not implemented"); return createEmptyDocument(); diff --git a/src/MicroOcpp/Core/Operation.h b/src/MicroOcpp/Core/Operation.h index e8079333..5da9aa56 100644 --- a/src/MicroOcpp/Core/Operation.h +++ b/src/MicroOcpp/Core/Operation.h @@ -18,10 +18,8 @@ #ifndef MO_OPERATION_H #define MO_OPERATION_H -#include #include - -#include +#include namespace MicroOcpp { @@ -35,10 +33,6 @@ class Operation { virtual const char* getOperationType(); - virtual void initiate(StoredOperationHandler *rpcData); - - virtual bool restore(StoredOperationHandler *rpcData); - /** * Create the payload for the respective OCPP message * diff --git a/src/MicroOcpp/Core/Request.cpp b/src/MicroOcpp/Core/Request.cpp index 6c5444d1..a2cad9b8 100644 --- a/src/MicroOcpp/Core/Request.cpp +++ b/src/MicroOcpp/Core/Request.cpp @@ -6,7 +6,6 @@ #include #include #include -#include #include #include @@ -14,26 +13,31 @@ #include #include -int unique_id_counter = 1000000; - -using namespace MicroOcpp; - -Request::Request(std::unique_ptr msg) : operation(std::move(msg)) { +namespace MicroOcpp { + unsigned int g_randSeed = 1394827383; + void writeRandomNonsecure(unsigned char *buf, size_t len) { + g_randSeed += mocpp_tick_ms(); + const unsigned int a = 16807; + const unsigned int m = 2147483647; + for (size_t i = 0; i < len; i++) { + g_randSeed = (a * g_randSeed) % m; + buf[i] = g_randSeed; + } + } } -Request::Request() { +using namespace MicroOcpp; +Request::Request(std::unique_ptr msg) : operation(std::move(msg)) { + timeout_start = mocpp_tick_ms(); + debugRequest_start = mocpp_tick_ms(); } Request::~Request(){ } -void Request::setOperation(std::unique_ptr msg){ - operation = std::move(msg); -} - Operation *Request::getOperation(){ return operation.get(); } @@ -54,41 +58,46 @@ void Request::executeTimeout() { timed_out = true; } -unsigned int Request::getTrialNo() { - return trialNo; -} - -void Request::setMessageID(const std::string &id){ +void Request::setMessageID(const char *id){ if (!messageID.empty()){ - MO_DBG_WARN("MessageID is set twice or is set after first usage!"); + MO_DBG_ERR("messageID already defined"); } messageID = id; } -const char *Request::getMessageID() { - return messageID.c_str(); -} +Request::CreateRequestResult Request::createRequest(DynamicJsonDocument& requestJson) { -std::unique_ptr Request::createRequest(){ + if (messageID.empty()) { + unsigned char random [18]; + char guuid [sizeof(random) * 2 + 1]; + + writeRandomNonsecure(random, sizeof(random)); + + for (size_t i = 0; i < sizeof(random); i++) { + snprintf(guuid + i * 2, 3, "%02x", random[i]); + } + guuid[8] = guuid[13] = guuid[18] = guuid[23] = '-'; + messageID = guuid; + } /* * Create the OCPP message */ auto requestPayload = operation->createReq(); if (!requestPayload) { - return nullptr; + return CreateRequestResult::Failure; } /* * Create OCPP-J Remote Procedure Call header */ size_t json_buffsize = JSON_ARRAY_SIZE(4) + (messageID.length() + 1) + requestPayload->capacity(); - auto requestJson = std::unique_ptr(new DynamicJsonDocument(json_buffsize)); + requestJson = DynamicJsonDocument(json_buffsize); - requestJson->add(MESSAGE_TYPE_CALL); //MessageType - requestJson->add(messageID); //Unique message ID - requestJson->add(operation->getOperationType()); //Action - requestJson->add(*requestPayload); //Payload + requestJson.add(MESSAGE_TYPE_CALL); //MessageType + requestJson.add(messageID); //Unique message ID + requestJson.add(operation->getOperationType()); //Action + requestJson.add(*requestPayload); //Payload if (MO_DBG_LEVEL >= MO_DL_DEBUG && mocpp_tick_ms() - debugRequest_start >= 10000) { //print contents on the console debugRequest_start = mocpp_tick_ms(); @@ -96,7 +105,7 @@ std::unique_ptr Request::createRequest(){ char *buf = new char[1024]; size_t len = 0; if (buf) { - len = serializeJson(*requestJson, buf, 1024); + len = serializeJson(requestJson, buf, 1024); } if (!buf || len < 1) { @@ -108,9 +117,7 @@ std::unique_ptr Request::createRequest(){ delete[] buf; } - trialNo++; - - return requestJson; + return CreateRequestResult::Success; } bool Request::receiveResponse(JsonArray response){ @@ -163,10 +170,14 @@ bool Request::receiveResponse(JsonArray response){ } -bool Request::receiveRequest(JsonArray request){ +bool Request::receiveRequest(JsonArray request) { + + if (!request[1].is()) { + MO_DBG_ERR("malformatted msgId"); + return false; + } - std::string msgId = request[1]; - setMessageID(msgId); + setMessageID(request[1].as()); /* * Hand the payload over to the Request object @@ -182,12 +193,7 @@ bool Request::receiveRequest(JsonArray request){ return true; //success } -std::unique_ptr Request::createResponse(){ - - /* - * Create the OCPP message - */ - std::unique_ptr response = nullptr; +Request::CreateResponseResult Request::createResponse(DynamicJsonDocument& response) { bool operationFailure = operation->getErrorCode() != nullptr; @@ -196,18 +202,18 @@ std::unique_ptr Request::createResponse(){ std::unique_ptr payload = operation->createConf(); if (!payload) { - return nullptr; //confirmation message still pending + return CreateResponseResult::Pending; //confirmation message still pending } /* * Create OCPP-J Remote Procedure Call header */ - size_t json_buffsize = JSON_ARRAY_SIZE(3) + (messageID.length() + 1) + payload->capacity(); - response = std::unique_ptr(new DynamicJsonDocument(json_buffsize)); + size_t json_buffsize = JSON_ARRAY_SIZE(3) + payload->capacity(); + response = DynamicJsonDocument(json_buffsize); - response->add(MESSAGE_TYPE_CALLRESULT); //MessageType - response->add(messageID); //Unique message ID - response->add(*payload); //Payload + response.add(MESSAGE_TYPE_CALLRESULT); //MessageType + response.add(messageID.c_str()); //Unique message ID + response.add(*payload); //Payload if (onSendConfListener) { onSendConfListener(payload->as()); @@ -217,127 +223,23 @@ std::unique_ptr Request::createResponse(){ const char *errorCode = operation->getErrorCode(); const char *errorDescription = operation->getErrorDescription(); - std::unique_ptr errorDetails = std::unique_ptr(operation->getErrorDetails()); - if (!errorCode) { //catch corner case when payload is null but errorCode is not set too! - errorCode = "GenericError"; - errorDescription = "Could not create payload (createConf() returns Null)"; - errorDetails = std::unique_ptr(createEmptyDocument()); - } + std::unique_ptr errorDetails = operation->getErrorDetails(); /* * Create OCPP-J Remote Procedure Call header */ size_t json_buffsize = JSON_ARRAY_SIZE(5) - + (messageID.length() + 1) - + strlen(errorCode) + 1 - + strlen(errorDescription) + 1 + errorDetails->capacity(); - response = std::unique_ptr(new DynamicJsonDocument(json_buffsize)); - - response->add(MESSAGE_TYPE_CALLERROR); //MessageType - response->add(messageID); //Unique message ID - response->add(errorCode); - response->add(errorDescription); - response->add(*errorDetails); //Error description - } - - return response; -} - -void Request::initiate(std::unique_ptr opStorage) { - - timeout_start = mocpp_tick_ms(); - debugRequest_start = mocpp_tick_ms(); - - //assign messageID - char id_str [16] = {'\0'}; - sprintf(id_str, "%d", unique_id_counter++); - messageID = std::string {id_str}; - - if (operation) { + response = DynamicJsonDocument(json_buffsize); - /* - * Create OCPP-J Remote Procedure Call header storage entry - */ - opStore = std::move(opStorage); - - if (opStore) { - size_t json_buffsize = JSON_ARRAY_SIZE(3) + (messageID.length() + 1); - auto rpcData = std::unique_ptr(new DynamicJsonDocument(json_buffsize)); - - rpcData->add(MESSAGE_TYPE_CALL); //MessageType - rpcData->add(messageID); //Unique message ID - rpcData->add(operation->getOperationType()); //Action - - opStore->setRpcData(std::move(rpcData)); - } - - operation->initiate(opStore.get()); - - if (opStore) { - opStore->clearBuffer(); - } - } else { - MO_DBG_ERR("Missing operation instance"); - } -} - -bool Request::restore(std::unique_ptr opStorage, Model *model) { - if (!opStorage) { - MO_DBG_ERR("invalid argument"); - return false; - } - - opStore = std::move(opStorage); - - auto rpcData = opStore->getRpcData(); - if (!rpcData) { - MO_DBG_ERR("corrupted storage"); - return false; - } - - messageID = (*rpcData)[1] | std::string(); - std::string opType = (*rpcData)[2] | std::string(); - if (messageID.empty() || opType.empty()) { - MO_DBG_ERR("corrupted storage"); - messageID.clear(); - return false; - } - - int parsedMessageID = -1; - if (sscanf(messageID.c_str(), "%d", &parsedMessageID) == 1) { - if (parsedMessageID > unique_id_counter) { - MO_DBG_DEBUG("restore unique_id_counter with %d", parsedMessageID); - unique_id_counter = parsedMessageID + 1; //next unique value is parsedId + 1 - } - } else { - MO_DBG_ERR("cannot set unique msgID counter"); - //skip this step but don't abort restore + response.add(MESSAGE_TYPE_CALLERROR); //MessageType + response.add(messageID.c_str()); //Unique message ID + response.add(errorCode); + response.add(errorDescription); + response.add(*errorDetails); //Error description } - timeout_period = 0; //disable timeout by default for restored msgs - - if (!strcmp(opType.c_str(), "StartTransaction") && model) { //TODO this will get a nicer solution - operation = std::unique_ptr(new Ocpp16::StartTransaction(*model, nullptr)); - } else if (!strcmp(opType.c_str(), "StopTransaction") && model) { - operation = std::unique_ptr(new Ocpp16::StopTransaction(*model, nullptr)); - } - - if (!operation) { - MO_DBG_ERR("cannot create msg"); - return false; - } - - bool success = operation->restore(opStore.get()); - opStore->clearBuffer(); - - if (success) { - MO_DBG_DEBUG("restored opNr %i: %s", opStore->getOpNr(), operation->getOperationType()); - } else { - MO_DBG_ERR("restore opNr %i error", opStore->getOpNr()); - } - - return success; + return CreateResponseResult::Success; } void Request::setOnReceiveConfListener(OnReceiveConfListener onReceiveConf){ @@ -376,3 +278,11 @@ void Request::setOnAbortListener(OnAbortListener onAbort) { const char *Request::getOperationType() { return operation ? operation->getOperationType() : "UNDEFINED"; } + +void Request::setRequestSent() { + requestSent = true; +} + +bool Request::isRequestSent() { + return requestSent; +} diff --git a/src/MicroOcpp/Core/Request.h b/src/MicroOcpp/Core/Request.h index ebdeac30..300d0d79 100644 --- a/src/MicroOcpp/Core/Request.h +++ b/src/MicroOcpp/Core/Request.h @@ -17,13 +17,12 @@ namespace MicroOcpp { class Operation; class Model; -class StoredOperationHandler; class Request { private: std::string messageID {}; std::unique_ptr operation; - void setMessageID(const std::string &id); + void setMessageID(const char *id); OnReceiveConfListener onReceiveConfListener = [] (JsonObject payload) {}; OnReceiveReqListener onReceiveReqListener = [] (JsonObject payload) {}; OnSendConfListener onSendConfListener = [] (JsonObject payload) {}; @@ -34,28 +33,22 @@ class Request { unsigned long timeout_start = 0; unsigned long timeout_period = 40000; bool timed_out = false; - - unsigned int trialNo = 0; unsigned long debugRequest_start = 0; - std::unique_ptr opStore; + bool requestSent = false; public: Request(std::unique_ptr msg); - Request(); - ~Request(); - void setOperation(std::unique_ptr msg); Operation *getOperation(); void setTimeout(unsigned long timeout); //0 = disable timeout bool isTimeoutExceeded(); - void executeTimeout(); //call Timeout handler - - unsigned int getTrialNo(); //how many times createRequest() has been tried (used for retry behavior) + void executeTimeout(); //call Timeout Listener + void setOnTimeoutListener(OnTimeoutListener onTimeout); /** * Sends the message(s) that belong to the OCPP Operation. This function puts a JSON message on the lower protocol layer. @@ -65,7 +58,11 @@ class Request { * This function is usually called multiple times by the Arduino loop(). On first call, the request is initially sent. In the * succeeding calls, the implementers decide to either resend the request, or do nothing as the operation is still pending. */ - std::unique_ptr createRequest(); + enum class CreateRequestResult { + Success, + Failure + }; + CreateRequestResult createRequest(DynamicJsonDocument& out); /** * Decides if message belongs to this operation instance and if yes, proccesses it. Receives both Confirmations and Errors @@ -86,24 +83,17 @@ class Request { * message. Returns true on success, false otherwise. Returns also true if a CallError has successfully * been sent */ - std::unique_ptr createResponse(); + enum class CreateResponseResult { + Success, + Pending, + Failure + }; - void initiate(std::unique_ptr opStorage); + CreateResponseResult createResponse(DynamicJsonDocument& out); - bool restore(std::unique_ptr opStorage, Model *model); - - StoredOperationHandler *getStorageHandler() {return opStore.get();} - - void setOnReceiveConfListener(OnReceiveConfListener onReceiveConf); - - /** - * Sets a Listener that is called after this machine processed a request by the communication counterpart - */ - void setOnReceiveReqListener(OnReceiveReqListener onReceiveReq); - - void setOnSendConfListener(OnSendConfListener onSendConf); - - void setOnTimeoutListener(OnTimeoutListener onTimeout); + void setOnReceiveConfListener(OnReceiveConfListener onReceiveConf); //listener executed when we received the .conf() to a .req() we sent + void setOnReceiveReqListener(OnReceiveReqListener onReceiveReq); //listener executed when we receive a .req() + void setOnSendConfListener(OnSendConfListener onSendConf); //listener executed when we send a .conf() to a .req() we received void setOnReceiveErrorListener(OnReceiveErrorListener onReceiveError); @@ -120,8 +110,10 @@ class Request { */ void setOnAbortListener(OnAbortListener onAbort); - const char *getMessageID(); const char *getOperationType(); + + void setRequestSent(); + bool isRequestSent(); }; } //end namespace MicroOcpp diff --git a/src/MicroOcpp/Core/RequestQueue.cpp b/src/MicroOcpp/Core/RequestQueue.cpp index b5ca5049..8f8cc3cb 100644 --- a/src/MicroOcpp/Core/RequestQueue.cpp +++ b/src/MicroOcpp/Core/RequestQueue.cpp @@ -2,12 +2,13 @@ // Copyright Matthias Akstaller 2019 - 2024 // MIT License +#include + #include #include #include #include #include -#include #include #include @@ -17,135 +18,238 @@ size_t removePayload(const char *src, size_t src_size, char *dst, size_t dst_siz using namespace MicroOcpp; -RequestQueue::RequestQueue(OperationRegistry& operationRegistry, Model *baseModel, std::shared_ptr filesystem) - : operationRegistry(operationRegistry) { - - if (filesystem) { - initiatedRequests.reset(new PersistentRequestQueue(baseModel, filesystem)); - } else { - initiatedRequests.reset(new VolatileRequestQueue()); +VolatileRequestQueue::VolatileRequestQueue(unsigned int priority) : priority{priority} { + +} + +VolatileRequestQueue::~VolatileRequestQueue() = default; + +void VolatileRequestQueue::loop() { + + /* + * Drop timed out operations + */ + size_t i = 0; + while (i < len) { + size_t index = (front + i) % MO_REQUEST_CACHE_MAXSIZE; + auto& request = requests[index]; + + if (request->isTimeoutExceeded()) { + MO_DBG_INFO("operation timeout: %s", request->getOperationType()); + request->executeTimeout(); + + if (index == front) { + requests[front].reset(); + front = (front + 1) % MO_REQUEST_CACHE_MAXSIZE; + len--; + } else { + requests[index].reset(); + for (size_t i = (index + MO_REQUEST_CACHE_MAXSIZE - front) % MO_REQUEST_CACHE_MAXSIZE; i < len - 1; i++) { + requests[(front + i) % MO_REQUEST_CACHE_MAXSIZE] = std::move(requests[(front + i + 1) % MO_REQUEST_CACHE_MAXSIZE]); + } + len--; + } + } else { + i++; + } + } +} + +unsigned int VolatileRequestQueue::getFrontRequestOpNr() { + if (len == 0) { + return NoOperation; + } + + return priority; +} + +std::unique_ptr VolatileRequestQueue::fetchFrontRequest() { + if (len == 0) { + return nullptr; + } + + std::unique_ptr result = std::move(requests[front]); + front = (front + 1) % MO_REQUEST_CACHE_MAXSIZE; + len--; + + MO_DBG_VERBOSE("front %zu len %zu", front, len); + + return result; +} + +bool VolatileRequestQueue::pushRequestBack(std::unique_ptr request) { + + // Don't queue up multiple StatusNotification messages for the same connectorId + if (strcmp(request->getOperationType(), "StatusNotification") == 0) + { + size_t i = 0; + while (i < len) { + size_t index = (front + i) % MO_REQUEST_CACHE_MAXSIZE; + + if (strcmp(requests[index]->getOperationType(), "StatusNotification")!= 0) + { + i++; + continue; + } + auto new_status_notification = static_cast(request->getOperation()); + auto old_status_notification = static_cast(requests[index]->getOperation()); + if (old_status_notification->getConnectorId() == new_status_notification->getConnectorId()) { + requests[index].reset(); + for (size_t i = (index + MO_REQUEST_CACHE_MAXSIZE - front) % MO_REQUEST_CACHE_MAXSIZE; i < len - 1; i++) { + requests[(front + i) % MO_REQUEST_CACHE_MAXSIZE] = std::move(requests[(front + i + 1) % MO_REQUEST_CACHE_MAXSIZE]); + } + len--; + } else { + i++; + } + } + } + + if (len >= MO_REQUEST_CACHE_MAXSIZE) { + MO_DBG_INFO("Drop cached operation (cache full): %s", requests[front]->getOperationType()); + requests[front]->executeTimeout(); + requests[front].reset(); + front = (front + 1) % MO_REQUEST_CACHE_MAXSIZE; + len--; } + + requests[(front + len) % MO_REQUEST_CACHE_MAXSIZE] = std::move(request); + len++; + return true; } -void RequestQueue::setConnection(Connection& sock) { +RequestQueue::RequestQueue(Connection& connection, OperationRegistry& operationRegistry) + : connection(connection), operationRegistry(operationRegistry) { + ReceiveTXTcallback callback = [this] (const char *payload, size_t length) { return this->receiveMessage(payload, length); }; - sock.setReceiveTXTcallback(callback); + connection.setReceiveTXTcallback(callback); + + memset(sendQueues, 0, sizeof(sendQueues)); + addSendQueue(&defaultSendQueue); + addSendQueue(&preBootSendQueue); } -void RequestQueue::loop(Connection& ocppSock) { +void RequestQueue::loop() { /* - * Sort out timed out operations + * Check if front request timed out */ - initiatedRequests->drop_if([] (std::unique_ptr& op) -> bool { - bool timed_out = op->isTimeoutExceeded(); - if (timed_out) { - MO_DBG_INFO("operation timeout: %s", op->getOperationType()); - op->executeTimeout(); - } - return timed_out; - }); + if (sendReqFront && sendReqFront->isTimeoutExceeded()) { + MO_DBG_INFO("operation timeout: %s", sendReqFront->getOperationType()); + sendReqFront->executeTimeout(); + sendReqFront.reset(); + } + + if (recvReqFront && recvReqFront->isTimeoutExceeded()) { + MO_DBG_INFO("operation timeout: %s", recvReqFront->getOperationType()); + recvReqFront->executeTimeout(); + recvReqFront.reset(); + } + + defaultSendQueue.loop(); + preBootSendQueue.loop(); + + if (!connection.isConnected()) { + return; + } /** - * Send and dequeue a pending confirmation message, if existing. If the first operation is awaiting, - * try with the subsequent operations. + * Send and dequeue a pending confirmation message, if existing * * If a message has been sent, terminate this loop() function. */ - for (auto received = receivedRequests.begin(); received != receivedRequests.end(); ++received) { - - auto response = (*received)->createResponse(); - if (response) { + if (!recvReqFront) { + recvReqFront = recvQueue.fetchFrontRequest(); + } + + if (recvReqFront) { + + DynamicJsonDocument response {0}; + auto ret = recvReqFront->createResponse(response); + + if (ret == Request::CreateResponseResult::Success) { std::string out; - serializeJson(*response, out); + serializeJson(response, out); - bool success = ocppSock.sendTXT(out.c_str(), out.length()); + bool success = connection.sendTXT(out.c_str(), out.length()); if (success) { MO_DBG_TRAFFIC_OUT(out.c_str()); - receivedRequests.erase(received); + recvReqFront.reset(); } return; - } //else: There will be another attempt to send this conf message in a future loop call. - // Go on with the next element in the queue. + } //else: There will be another attempt to send this conf message in a future loop call } /** * Send pending req message */ - auto initedOp = initiatedRequests->front(); - - if (!initedOp) { - //queue empty - return; - } - //check backoff time + if (!sendReqFront) { - if (initedOp->getTrialNo() == 0) { - //first trial -> send immediately - sendBackoffPeriod = 0; - } + unsigned int minOpNr = RequestEmitter::NoOperation; + size_t index = MO_NUM_REQUEST_QUEUES; + for (size_t i = 0; i < MO_NUM_REQUEST_QUEUES && sendQueues[i]; i++) { + auto opNr = sendQueues[i]->getFrontRequestOpNr(); + if (opNr < minOpNr) { + minOpNr = opNr; + index = i; + } + } - if (sockTrackLastConnected != ocppSock.getLastConnected()) { - //connection active (again) -> send immediately - sendBackoffPeriod = std::min(sendBackoffPeriod, 1000UL); + if (index < MO_NUM_REQUEST_QUEUES) { + sendReqFront = sendQueues[index]->fetchFrontRequest(); + } } - sockTrackLastConnected = ocppSock.getLastConnected(); - if (mocpp_tick_ms() - sendBackoffTime < sendBackoffPeriod) { - //still in backoff period - return; - } + if (sendReqFront && !sendReqFront->isRequestSent()) { - auto request = initedOp->createRequest(); + DynamicJsonDocument request {0}; + auto ret = sendReqFront->createRequest(request); - if (!request) { - //request not ready yet or OOM - return; - } + if (ret == Request::CreateRequestResult::Success) { - //send request - std::string out; - serializeJson(*request, out); + //send request + std::string out; + serializeJson(request, out); - bool success = ocppSock.sendTXT(out.c_str(), out.length()); + bool success = connection.sendTXT(out.c_str(), out.length()); - if (success) { - MO_DBG_TRAFFIC_OUT(out.c_str()); + if (success) { + MO_DBG_TRAFFIC_OUT(out.c_str()); + sendReqFront->setRequestSent(); //mask as sent and wait for response / timeout + } - //update backoff time - sendBackoffTime = mocpp_tick_ms(); - sendBackoffPeriod = std::min(sendBackoffPeriod + BACKOFF_PERIOD_INCREMENT, BACKOFF_PERIOD_MAX); + return; + } } } void RequestQueue::sendRequest(std::unique_ptr op){ - if (!op) { - MO_DBG_ERR("Called with null. Ignore"); - return; - } + defaultSendQueue.pushRequestBack(std::move(op)); +} - // Don't queue up multiple StatusNotification messages for the same connectorId - if (strcmp(op->getOperationType(), "StatusNotification") == 0) - { - auto new_status_notification = static_cast(op->getOperation()); - initiatedRequests->drop_if([&new_status_notification] (const std::unique_ptr& operation) { - if (strcmp(operation->getOperationType(), "StatusNotification")!= 0) - { - return false; - } - auto old_status_notification = static_cast(operation->getOperation()); - return old_status_notification->getConnectorId() == new_status_notification->getConnectorId(); - }); +void RequestQueue::sendRequestPreBoot(std::unique_ptr op){ + preBootSendQueue.pushRequestBack(std::move(op)); +} + +void RequestQueue::addSendQueue(RequestEmitter* sendQueue) { + for (size_t i = 0; i < MO_NUM_REQUEST_QUEUES; i++) { + if (!sendQueues[i]) { + sendQueues[i] = sendQueue; + return; + } } + MO_DBG_ERR("exceeded sendQueue capacity"); +} - initiatedRequests->push_back(std::move(op)); +unsigned int RequestQueue::getNextOpNr() { + return nextOpNr++; } bool RequestQueue::receiveMessage(const char* payload, size_t length) { @@ -241,26 +345,11 @@ bool RequestQueue::receiveMessage(const char* payload, size_t length) { */ void RequestQueue::receiveResponse(JsonArray json) { - bool success = false; - - initiatedRequests->drop_if( - [&json, &success] (std::unique_ptr& operation) { - bool match = operation->receiveResponse(json); - if (match) { - success = true; - //operation will be deleted by the surrounding drop_if - } - return match; - }); //executes in order and drops every operation where predicate(op) == true - - if (!success) { - //didn't find matching Request - if (json[0] == MESSAGE_TYPE_CALLERROR) { - MO_DBG_DEBUG("Received CALLERROR did not abort a pending operation"); - } else { - MO_DBG_WARN("Received response doesn't match any pending operation"); - } + if (!sendReqFront || !sendReqFront->receiveResponse(json)) { + MO_DBG_WARN("Received response doesn't match pending operation"); } + + sendReqFront.reset(); } void RequestQueue::receiveRequest(JsonArray json) { @@ -274,7 +363,7 @@ void RequestQueue::receiveRequest(JsonArray json) { void RequestQueue::receiveRequest(JsonArray json, std::unique_ptr op) { op->receiveRequest(json); //execute the operation - receivedRequests.push_back(std::move(op)); //enqueue so loop() plans conf sending + recvQueue.pushRequestBack(std::move(op)); //enqueue so loop() plans conf sending } /* diff --git a/src/MicroOcpp/Core/RequestQueue.h b/src/MicroOcpp/Core/RequestQueue.h index 1c1f576d..7eba5355 100644 --- a/src/MicroOcpp/Core/RequestQueue.h +++ b/src/MicroOcpp/Core/RequestQueue.h @@ -5,46 +5,87 @@ #ifndef MO_REQUESTQUEUE_H #define MO_REQUESTQUEUE_H -#include +#include + +#include -#include #include #include +#ifndef MO_REQUEST_CACHE_MAXSIZE +#define MO_REQUEST_CACHE_MAXSIZE 10 +#endif + +#ifndef MO_NUM_REQUEST_QUEUES +#define MO_NUM_REQUEST_QUEUES 5 +#endif + namespace MicroOcpp { -class OperationRegistry; -class Model; class Connection; +class OperationRegistry; class Request; -class FilesystemAdapter; + +class RequestEmitter { +public: + static const unsigned int NoOperation = std::numeric_limits::max(); + + virtual unsigned int getFrontRequestOpNr() = 0; //return OpNr of front request or NoOperation if queue is empty + virtual std::unique_ptr fetchFrontRequest() = 0; +}; + +class VolatileRequestQueue : public RequestEmitter { +private: + std::unique_ptr requests [MO_REQUEST_CACHE_MAXSIZE]; + size_t front = 0, len = 0; + const unsigned int priority; +public: + VolatileRequestQueue(unsigned int priority = 1); + ~VolatileRequestQueue(); + void loop(); + + unsigned int getFrontRequestOpNr() override; + std::unique_ptr fetchFrontRequest() override; + + bool pushRequestBack(std::unique_ptr request); +}; class RequestQueue { private: + Connection& connection; OperationRegistry& operationRegistry; - - std::unique_ptr initiatedRequests; - std::deque> receivedRequests; + RequestEmitter* sendQueues [MO_NUM_REQUEST_QUEUES]; + VolatileRequestQueue defaultSendQueue {1}; + VolatileRequestQueue preBootSendQueue {0}; + std::unique_ptr sendReqFront; + + VolatileRequestQueue recvQueue; + std::unique_ptr recvReqFront; + + bool receiveMessage(const char* payload, size_t length); //receive from server: either a request or response void receiveRequest(JsonArray json); void receiveRequest(JsonArray json, std::unique_ptr op); void receiveResponse(JsonArray json); - unsigned long sendBackoffTime = 0; - unsigned long sendBackoffPeriod = 0; unsigned long sockTrackLastConnected = 0; - const unsigned long BACKOFF_PERIOD_MAX = 1048576; - const unsigned long BACKOFF_PERIOD_INCREMENT = BACKOFF_PERIOD_MAX / 4; + + unsigned int nextOpNr = 10; //Nr 0 - 9 reservered for internal purposes public: - RequestQueue(OperationRegistry& operationRegistry, Model *baseModel, std::shared_ptr filesystem = nullptr); + RequestQueue() = delete; + RequestQueue(const RequestQueue&) = delete; + RequestQueue(const RequestQueue&&) = delete; - void setConnection(Connection& sock); + RequestQueue(Connection& connection, OperationRegistry& operationRegistry); - void loop(Connection& ocppSock); + void loop(); //polls all reqQueues and decides which request to send (if any) - void sendRequest(std::unique_ptr o); //send an OCPP operation request to the server - - bool receiveMessage(const char* payload, size_t length); //receive from server: either a request or response + void sendRequest(std::unique_ptr request); //send an OCPP operation request to the server; adds request to default queue + void sendRequestPreBoot(std::unique_ptr request); //send an OCPP operation request to the server; adds request to preBootQueue + + void addSendQueue(RequestEmitter* sendQueue); + + unsigned int getNextOpNr(); }; } //end namespace MicroOcpp diff --git a/src/MicroOcpp/Core/RequestQueueStorageStrategy.cpp b/src/MicroOcpp/Core/RequestQueueStorageStrategy.cpp deleted file mode 100644 index 26e24aaa..00000000 --- a/src/MicroOcpp/Core/RequestQueueStorageStrategy.cpp +++ /dev/null @@ -1,180 +0,0 @@ -// matth-x/MicroOcpp -// Copyright Matthias Akstaller 2019 - 2024 -// MIT License - -#include -#include -#include - -#include -#include -#include - -#include - -#define MO_OPERATIONCACHE_MAXSIZE 10 - -using namespace MicroOcpp; - -VolatileRequestQueue::VolatileRequestQueue() { - -} - -VolatileRequestQueue::~VolatileRequestQueue() { - -} - -Request *VolatileRequestQueue::front() { - if (!queue.empty()) { - return queue.front().get(); - } else { - return nullptr; - } -} - -void VolatileRequestQueue::pop_front() { - queue.pop_front(); -} - -void VolatileRequestQueue::push_back(std::unique_ptr op) { - - op->initiate(nullptr); - - if (queue.size() >= MO_OPERATIONCACHE_MAXSIZE) { - MO_DBG_WARN("unsafe number of cached operations"); - } - - queue.push_back(std::move(op)); -} - -void VolatileRequestQueue::drop_if(std::function&)> pred) { - queue.erase(std::remove_if(queue.begin(), queue.end(), pred), queue.end()); -} - -PersistentRequestQueue::PersistentRequestQueue(Model *baseModel, std::shared_ptr filesystem) - : opStore(filesystem), baseModel(baseModel) { } - -PersistentRequestQueue::~PersistentRequestQueue() { - -} - -void PersistentRequestQueue::pop_front() { - - if (head && head->getStorageHandler() && head->getStorageHandler()->getOpNr() >= 0) { - opStore.advanceOpNr(head->getStorageHandler()->getOpNr()); - MO_DBG_DEBUG("advanced %i to %u", head->getStorageHandler()->getOpNr(), opStore.getOpBegin()); - } - - head.reset(); -} - -Request *PersistentRequestQueue::front() { - - if (head) { - // head still loaded - return head.get(); - } - - // check if there are any more operations - if (tailCache.empty() && opStore.getOpBegin() == opStore.getOpEnd()) { - // no more operations - return nullptr; - } - - unsigned int nextOpNr = opStore.getOpBegin(); - - /* - * Find next operation to take as front. Two cases: - * A) [front, tailCache] contains all operations stored on this device - * B) [front, tailCache] does not contain all operations. The next operation after front is not present - * in the cache. Fetch the next operation from the flash to front - */ - - auto found = std::find_if(tailCache.begin(), tailCache.end(), - [nextOpNr] (std::unique_ptr& op) { - return op->getStorageHandler() && - op->getStorageHandler()->getOpNr() >= 0 && - (unsigned int) op->getStorageHandler()->getOpNr() == nextOpNr; - }); - - if (found != tailCache.end()) { - //cache hit -> case A) -> don't load from flash but just take the next element from tail - head = std::move(tailCache.front()); - tailCache.pop_front(); - } else { - //cache miss -> case B) or A) -> try to fetch operation from flash (check for case B)) or take first cached element as front - - std::unique_ptr fetched; - - unsigned int range = (opStore.getOpEnd() + MO_MAX_OPNR - nextOpNr) % MO_MAX_OPNR; - for (size_t i = 0; i < range; i++) { - auto storageHandler = opStore.makeOpHandler(); - bool exists = storageHandler->restore(nextOpNr); - if (exists) { - //case B) -> load operation from flash and take it as front element - - fetched = makeRequest(); - - bool success = fetched->restore(std::move(storageHandler), baseModel); - - if (success) { - //loaded operation from flash and will place it at head position of the queue - break; - } - - MO_DBG_ERR("could not restore operation"); - fetched.reset(); - opStore.advanceOpNr(nextOpNr); - nextOpNr = opStore.getOpBegin(); - } else { - //didn't find anything at this slot. Try next slot - nextOpNr++; - nextOpNr %= MO_MAX_OPNR; - } - } - - if (fetched) { - //found operation in flash -> case B) - head = std::move(fetched); - MO_DBG_DEBUG("restored operation from flash"); - } else { - //no operation anymore in flash -> case A) -> take next queued operation in tailCache - if (tailCache.empty()) { - //no operations anymore - } else { - head = std::move(tailCache.front()); - tailCache.pop_front(); - } - } - } - - MO_DBG_VERBOSE("reloaded head"); - - if (!head) { - MO_DBG_VERBOSE("illegal state"); - } - - return head.get(); -} - -void PersistentRequestQueue::push_back(std::unique_ptr op) { - - op->initiate(opStore.makeOpHandler()); - - if (tailCache.size() >= MO_OPERATIONCACHE_MAXSIZE) { - MO_DBG_INFO("Replace cached operation (cache full): %s", tailCache.front()->getOperationType()); - tailCache.front()->executeTimeout(); - tailCache.pop_front(); - } - - tailCache.push_back(std::move(op)); -} - -void PersistentRequestQueue::drop_if(std::function&)> pred) { - - while (head && pred(head)) { - pop_front(); - } - - tailCache.erase(std::remove_if(tailCache.begin(), tailCache.end(), pred), tailCache.end()); -} diff --git a/src/MicroOcpp/Core/RequestQueueStorageStrategy.h b/src/MicroOcpp/Core/RequestQueueStorageStrategy.h deleted file mode 100644 index 458b00d0..00000000 --- a/src/MicroOcpp/Core/RequestQueueStorageStrategy.h +++ /dev/null @@ -1,70 +0,0 @@ -// matth-x/MicroOcpp -// Copyright Matthias Akstaller 2019 - 2024 -// MIT License - -#ifndef MO_REQUESTQUEUESTORAGESTRATEGY_H -#define MO_REQUESTQUEUESTORAGESTRATEGY_H - -#include - -#include -#include -#include - -namespace MicroOcpp { - -class FilesystemAdapter; -class Request; -class Model; - -class RequestQueueStorageStrategy { -public: - virtual ~RequestQueueStorageStrategy() = default; - - virtual Request *front() = 0; - virtual void pop_front() = 0; - - virtual void push_back(std::unique_ptr op) = 0; - - virtual void drop_if(std::function&)> pred) = 0; //drops operations from this queue where pred(operation) == true. Executes pred in order -}; - -class VolatileRequestQueue : public RequestQueueStorageStrategy { -private: - std::deque> queue; -public: - VolatileRequestQueue(); - ~VolatileRequestQueue(); - - Request *front() override; - void pop_front() override; - - void push_back(std::unique_ptr op) override; - - void drop_if(std::function&)> pred) override; //drops operations from this queue where pred(operation) == true. Executes pred in order -}; - -class PersistentRequestQueue : public RequestQueueStorageStrategy { -private: - RequestStore opStore; - Model *baseModel; - - std::unique_ptr head; - std::deque> tailCache; -public: - - PersistentRequestQueue(Model *baseModel, std::shared_ptr filesystem); - ~PersistentRequestQueue(); - - Request *front() override; - void pop_front() override; - - void push_back(std::unique_ptr op) override; - - void drop_if(std::function&)> pred) override; //drops operations from this queue where pred(operation) == true. Executes pred in order - -}; - -} - -#endif diff --git a/src/MicroOcpp/Core/RequestStore.cpp b/src/MicroOcpp/Core/RequestStore.cpp deleted file mode 100644 index 601eb34b..00000000 --- a/src/MicroOcpp/Core/RequestStore.cpp +++ /dev/null @@ -1,203 +0,0 @@ -// matth-x/MicroOcpp -// Copyright Matthias Akstaller 2019 - 2024 -// MIT License - -#include -#include -#include -#include -#include - -#define MO_OPSTORE_FN MO_FILENAME_PREFIX "opstore.jsn" - -#define MO_OPHISTORY_SIZE 3 - -using namespace MicroOcpp; - -bool StoredOperationHandler::commit() { - if (isPersistent) { - MO_DBG_ERR("cannot call two times"); - return false; - } - if (!filesystem) { - MO_DBG_DEBUG("filesystem"); - return false; - } - - if (!rpc || !payload) { - MO_DBG_ERR("unitialized"); - return false; - } - - opNr = context.reserveOpNr(); - - char fn [MO_MAX_PATH_SIZE] = {'\0'}; - auto ret = snprintf(fn, MO_MAX_PATH_SIZE, MO_FILENAME_PREFIX "op" "-%u.jsn", opNr); - if (ret < 0 || ret >= MO_MAX_PATH_SIZE) { - MO_DBG_ERR("fn error: %i", ret); - return false; - } - - DynamicJsonDocument doc {JSON_OBJECT_SIZE(2) + rpc->capacity() + payload->capacity()}; - doc["rpc"] = *rpc; - doc["payload"] = *payload; - - if (!FilesystemUtils::storeJson(filesystem, fn, doc)) { - MO_DBG_ERR("FS error"); - return false; - } - - isPersistent = true; - return true; -} - -bool StoredOperationHandler::restore(unsigned int opNrToLoad) { - if (isPersistent) { - MO_DBG_ERR("cannot restore after commit"); - return false; - } - if (!filesystem) { - MO_DBG_DEBUG("filesystem"); - return false; - } - - opNr = opNrToLoad; - - char fn [MO_MAX_PATH_SIZE] = {'\0'}; - auto ret = snprintf(fn, MO_MAX_PATH_SIZE, MO_FILENAME_PREFIX "op" "-%u.jsn", opNr); - if (ret < 0 || ret >= MO_MAX_PATH_SIZE) { - MO_DBG_ERR("fn error: %i", ret); - return false; - } - - size_t msize; - if (filesystem->stat(fn, &msize) != 0) { - MO_DBG_VERBOSE("operation %u does not exist", opNr); - return false; - } - - auto doc = FilesystemUtils::loadJson(filesystem, fn); - if (!doc) { - MO_DBG_ERR("FS error"); - return false; - } - - JsonVariant rpc_restore = (*doc)["rpc"]; - JsonVariant payload_restore = (*doc)["payload"]; - - rpc = std::unique_ptr(new DynamicJsonDocument(rpc_restore.memoryUsage())); - payload = std::unique_ptr(new DynamicJsonDocument(payload_restore.memoryUsage())); - - *rpc = rpc_restore; - *payload = payload_restore; - - isPersistent = true; - return true; -} - -RequestStore::RequestStore(std::shared_ptr filesystem) : filesystem(filesystem) { - opBeginInt = declareConfiguration(MO_CONFIG_EXT_PREFIX "opBegin", 0, MO_OPSTORE_FN, false, false, false); - configuration_load(MO_OPSTORE_FN); - - if (!opBeginInt || opBeginInt->getInt() < 0) { - MO_DBG_ERR("init failure"); - } else if (filesystem) { - opEnd = opBeginInt->getInt(); - - unsigned int misses = 0; - unsigned int i = opEnd; - while (misses < 3) { - char fn [MO_MAX_PATH_SIZE] = {'\0'}; - auto ret = snprintf(fn, MO_MAX_PATH_SIZE, MO_FILENAME_PREFIX "op" "-%u.jsn", i); - if (ret < 0 || ret >= MO_MAX_PATH_SIZE) { - MO_DBG_ERR("fn error: %i", ret); - misses++; - i = (i + 1) % MO_MAX_OPNR; - continue; - } - - size_t msize; - if (filesystem->stat(fn, &msize) != 0) { - MO_DBG_DEBUG("operation %u does not exist", i); - misses++; - i = (i + 1) % MO_MAX_OPNR; - continue; - } - - //file exists - misses = 0; - i = (i + 1) % MO_MAX_OPNR; - opEnd = i; - } - } -} - -std::unique_ptr RequestStore::makeOpHandler() { - return std::unique_ptr(new StoredOperationHandler(*this, filesystem)); -} - -unsigned int RequestStore::reserveOpNr() { - MO_DBG_DEBUG("reserved opNr %u", opEnd); - auto res = opEnd; - opEnd++; - opEnd %= MO_MAX_OPNR; - return res; -} - -void RequestStore::advanceOpNr(unsigned int oldOpNr) { - if (!opBeginInt || opBeginInt->getInt() < 0) { - MO_DBG_ERR("init failure"); - return; - } - - if (oldOpNr != (unsigned int) opBeginInt->getInt()) { - if ((oldOpNr + MO_MAX_OPNR - (unsigned int) opBeginInt->getInt()) % MO_MAX_OPNR < 100) { - MO_DBG_ERR("synchronization failure - try to fix"); - } else { - MO_DBG_ERR("synchronization failure"); - return; - } - } - - unsigned int opNr = (oldOpNr + 1) % MO_MAX_OPNR; - - //delete range [opBeginInt->getInt() ... opNr) - - unsigned int rangeSize = (opNr + MO_MAX_OPNR - (unsigned int) opBeginInt->getInt()) % MO_MAX_OPNR; - - MO_DBG_DEBUG("delete %u operations", rangeSize); - - for (unsigned int i = 0; i < rangeSize; i++) { - unsigned int op = ((unsigned int) opBeginInt->getInt() + i + MO_MAX_OPNR - MO_OPHISTORY_SIZE) % MO_MAX_OPNR; - - char fn [MO_MAX_PATH_SIZE] = {'\0'}; - auto ret = snprintf(fn, MO_MAX_PATH_SIZE, MO_FILENAME_PREFIX "op" "-%u.jsn", op); - if (ret < 0 || ret >= MO_MAX_PATH_SIZE) { - MO_DBG_ERR("fn error: %i", ret); - break; - } - - size_t msize; - if (filesystem->stat(fn, &msize) != 0) { - MO_DBG_DEBUG("operation %u does not exist", i); - continue; - } - - bool success = filesystem->remove(fn); - if (!success) { - MO_DBG_ERR("error deleting %s", fn); - } - } - - MO_DBG_DEBUG("advance opBegin: %u", opNr); - opBeginInt->setInt(opNr); - configuration_save(); -} - -unsigned int RequestStore::getOpBegin() { - if (!opBeginInt || opBeginInt->getInt() < 0) { - MO_DBG_ERR("invalid state"); - return 0; - } - return (unsigned int) opBeginInt->getInt(); -} diff --git a/src/MicroOcpp/Core/RequestStore.h b/src/MicroOcpp/Core/RequestStore.h deleted file mode 100644 index b31f0cd5..00000000 --- a/src/MicroOcpp/Core/RequestStore.h +++ /dev/null @@ -1,70 +0,0 @@ -// matth-x/MicroOcpp -// Copyright Matthias Akstaller 2019 - 2024 -// MIT License - -#ifndef MO_REQUESTSTORE_H -#define MO_REQUESTSTORE_H - -#include -#include -#include - -#define MO_MAX_OPNR 10000 - -namespace MicroOcpp { - -class RequestStore; -class FilesystemAdapter; -class Configuration; - -class StoredOperationHandler { -private: - RequestStore& context; - int opNr = -1; - std::shared_ptr filesystem; - - std::unique_ptr rpc; - std::unique_ptr payload; - - bool isPersistent = false; - -public: - StoredOperationHandler(RequestStore& context, std::shared_ptr filesystem) : context(context), filesystem(filesystem) {} - - void setRpcData(std::unique_ptr rpc) {this->rpc = std::move(rpc);} - void setPayload(std::unique_ptr payload) {this->payload = std::move(payload);} - - std::unique_ptr getRpcData() {return std::move(rpc);} - std::unique_ptr getPayload() {return std::move(payload);} - - bool commit(); - void clearBuffer() {rpc.reset(); payload.reset();} - - bool restore(unsigned int opNr); - - int getOpNr() {return isPersistent ? opNr : -1;} -}; - -class RequestStore { -private: - std::shared_ptr filesystem; - std::shared_ptr opBeginInt; //Tx-related operations are stored; index of the first pending operation - unsigned int opEnd = 0; //one place after last number - -public: - RequestStore() = delete; - RequestStore(std::shared_ptr filesystem); - - std::unique_ptr makeOpHandler(); - std::unique_ptr fetchOpHandler(unsigned int opNr); - - unsigned int reserveOpNr(); - void advanceOpNr(unsigned int oldOpNr); - - unsigned int getOpBegin(); - unsigned int getOpEnd() {return opEnd;} -}; - -} - -#endif diff --git a/src/MicroOcpp/Core/SimpleRequestFactory.cpp b/src/MicroOcpp/Core/SimpleRequestFactory.cpp index 1fa5bd5b..dc0f38c2 100644 --- a/src/MicroOcpp/Core/SimpleRequestFactory.cpp +++ b/src/MicroOcpp/Core/SimpleRequestFactory.cpp @@ -11,18 +11,11 @@ std::unique_ptr makeRequest(std::unique_ptr operation){ if (operation == nullptr) { return nullptr; } - auto request = makeRequest(); - request->setOperation(std::move(operation)); - return request; + return std::unique_ptr(new Request(std::move(operation))); } std::unique_ptr makeRequest(Operation *operation) { return makeRequest(std::unique_ptr(operation)); } -std::unique_ptr makeRequest(){ - auto result = std::unique_ptr(new Request()); - return result; -} - } //end namespace MicroOcpp diff --git a/src/MicroOcpp/Core/SimpleRequestFactory.h b/src/MicroOcpp/Core/SimpleRequestFactory.h index 20f84727..45c7e0a0 100644 --- a/src/MicroOcpp/Core/SimpleRequestFactory.h +++ b/src/MicroOcpp/Core/SimpleRequestFactory.h @@ -5,17 +5,14 @@ #ifndef MO_SIMPLEREQUESTFACTORY_H #define MO_SIMPLEREQUESTFACTORY_H -#include -#include #include -#include + +#include namespace MicroOcpp { class Operation; -std::unique_ptr makeRequest(); - std::unique_ptr makeRequest(std::unique_ptr op); std::unique_ptr makeRequest(Operation *op); //takes ownership of op diff --git a/src/MicroOcpp/Model/Boot/BootService.cpp b/src/MicroOcpp/Model/Boot/BootService.cpp index cd89ca66..bf485ad3 100644 --- a/src/MicroOcpp/Model/Boot/BootService.cpp +++ b/src/MicroOcpp/Model/Boot/BootService.cpp @@ -63,11 +63,6 @@ void BootService::loop() { storeBootStats(filesystem, bootstats); } - if (!activatedPostBootCommunication && status == RegistrationStatus::Accepted) { - context.activatePostBootCommunication(); - activatedPostBootCommunication = true; - } - if (!activatedModel && (status == RegistrationStatus::Accepted || preBootTransactionsBool->getBool())) { context.getModel().activateTasks(); activatedModel = true; @@ -87,7 +82,7 @@ void BootService::loop() { */ auto bootNotification = makeRequest(new Ocpp16::BootNotification(context.getModel(), getChargePointCredentials())); bootNotification->setTimeout(interval_s * 1000UL); - context.initiatePreBootOperation(std::move(bootNotification)); + context.getRequestQueue().sendRequestPreBoot(std::move(bootNotification)); lastBootNotification = mocpp_tick_ms(); } diff --git a/src/MicroOcpp/Model/Boot/BootService.h b/src/MicroOcpp/Model/Boot/BootService.h index 263e7055..e00559bd 100644 --- a/src/MicroOcpp/Model/Boot/BootService.h +++ b/src/MicroOcpp/Model/Boot/BootService.h @@ -48,7 +48,6 @@ class BootService { std::shared_ptr preBootTransactionsBool; bool activatedModel = false; - bool activatedPostBootCommunication = false; unsigned long firstExecutionTimestamp = 0; bool executedFirstTime = false; diff --git a/src/MicroOcpp/Model/ConnectorBase/Connector.cpp b/src/MicroOcpp/Model/ConnectorBase/Connector.cpp index 5fc1175b..db137d26 100644 --- a/src/MicroOcpp/Model/ConnectorBase/Connector.cpp +++ b/src/MicroOcpp/Model/ConnectorBase/Connector.cpp @@ -33,8 +33,10 @@ using namespace MicroOcpp; -Connector::Connector(Context& context, int connectorId) - : context(context), model(context.getModel()), connectorId{connectorId} { +Connector::Connector(Context& context, std::shared_ptr filesystem, unsigned int connectorId) + : context(context), model(context.getModel()), filesystem(filesystem), connectorId(connectorId) { + + context.getRequestQueue().addSendQueue(this); //register at RequestQueue as Request emitter snprintf(availabilityBoolKey, sizeof(availabilityBoolKey), MO_CONFIG_EXT_PREFIX "AVAIL_CONN_%d", connectorId); availabilityBool = declareConfiguration(availabilityBoolKey, true, MO_KEYVALUE_FN, false, false, false); @@ -65,12 +67,57 @@ Connector::Connector(Context& context, int connectorId) txStartOnPowerPathClosedBool = declareConfiguration(MO_CONFIG_EXT_PREFIX "TxStartOnPowerPathClosed", false); + transactionMessageAttemptsInt = declareConfiguration("TransactionMessageAttempts", 3); + transactionMessageRetryIntervalInt = declareConfiguration("TransactionMessageRetryInterval", 60); + if (!availabilityBool) { MO_DBG_ERR("Cannot declare availabilityBool"); } + char txFnamePrefix [30]; + snprintf(txFnamePrefix, sizeof(txFnamePrefix), "tx-%u-", connectorId); + size_t txFnamePrefixLen = strlen(txFnamePrefix); + + unsigned int txNrPivot = std::numeric_limits::max(); + + filesystem->ftw_root([this, txFnamePrefix, txFnamePrefixLen, &txNrPivot] (const char *fname) { + if (!strncmp(fname, txFnamePrefix, txFnamePrefixLen)) { + unsigned int parsedTxNr = 0; + for (size_t i = txFnamePrefixLen; fname[i] >= '0' && fname[i] <= '9'; i++) { + parsedTxNr *= 10; + parsedTxNr += fname[i] - '0'; + } + + if (txNrPivot == std::numeric_limits::max()) { + txNrPivot = parsedTxNr; + txNrBegin = parsedTxNr; + txNrBack = (parsedTxNr + 1) % MAX_TX_CNT; + return 0; + } + + if ((parsedTxNr + MAX_TX_CNT - txNrPivot) % MAX_TX_CNT < MAX_TX_CNT / 2) { + //parsedTxNr is after pivot point + if ((parsedTxNr + 1 + MAX_TX_CNT - txNrPivot) % MAX_TX_CNT > (txNrBack + MAX_TX_CNT - txNrPivot) % MAX_TX_CNT) { + txNrBack = (parsedTxNr + 1) % MAX_TX_CNT; + } + } else if ((txNrPivot + MAX_TX_CNT - parsedTxNr) % MAX_TX_CNT < MAX_TX_CNT / 2) { + //parsedTxNr is before pivot point + if ((txNrPivot + MAX_TX_CNT - parsedTxNr) % MAX_TX_CNT > (txNrPivot + MAX_TX_CNT - txNrBegin) % MAX_TX_CNT) { + txNrBegin = parsedTxNr; + } + } + + MO_DBG_DEBUG("found %s%u.jsn - Internal range from %u to %u (exclusive)", txFnamePrefix, parsedTxNr, txNrBegin, txNrBack); + } + return 0; + }); + + MO_DBG_DEBUG("found %u transactions for connector %u. Internal range from %u to %u (exclusive)", (txNrBack + MAX_TX_CNT - txNrBegin) % MAX_TX_CNT, connectorId, txNrBegin, txNrBack); + txNrFront = txNrBegin; + if (model.getTransactionStore()) { - transaction = model.getTransactionStore()->getLatestTransaction(connectorId); + unsigned int txNrLatest = (txNrBack + MAX_TX_CNT - 1) % MAX_TX_CNT; //txNr of the most recent tx on flash + transaction = model.getTransactionStore()->getTransaction(connectorId, txNrLatest); //returns nullptr if txNrLatest does not exist on flash } else { MO_DBG_ERR("must initialize TxStore before Connector"); } @@ -201,11 +248,12 @@ void Connector::loop() { } } - if (transaction && transaction->isAborted() && MO_TX_CLEAN_ABORTED) { - //If the transaction is aborted (invalidated before started), delete all artifacts from flash + if (transaction && ((transaction->isAborted() && MO_TX_CLEAN_ABORTED) || (transaction->isSilent() && transaction->getStopSync().isRequested()))) { + //If the transaction is aborted (invalidated before started) or is silent and has stopped. Delete all artifacts from flash //This is an optimization. The memory management will attempt to remove those files again later bool removed = true; if (auto mService = model.getMeteringService()) { + mService->abortTxMeterData(connectorId); removed &= mService->removeTxMeterData(connectorId, transaction->getTxNr()); } @@ -214,20 +262,26 @@ void Connector::loop() { } if (removed) { - model.getTransactionStore()->setTxEnd(connectorId, transaction->getTxNr()); //roll back creation of last tx entry + if (txNrFront == txNrBack) { + txNrFront = transaction->getTxNr(); + } + txNrBack = transaction->getTxNr(); //roll back creation of last tx entry } - MO_DBG_DEBUG("collect aborted transaction %u-%u %s", connectorId, transaction->getTxNr(), removed ? "" : "failure"); + MO_DBG_DEBUG("collect aborted or silent transaction %u-%u %s", connectorId, transaction->getTxNr(), removed ? "" : "failure"); + MO_DBG_VERBOSE("txNrBegin=%u, txNrFront=%u, txNrBack=%u", txNrBegin, txNrFront, txNrBack); transaction = nullptr; } if (transaction && transaction->isAborted()) { MO_DBG_DEBUG("collect aborted transaction %u-%u", connectorId, transaction->getTxNr()); + MO_DBG_VERBOSE("txNrBegin=%u, txNrFront=%u, txNrBack=%u", txNrBegin, txNrFront, txNrBack); transaction = nullptr; } if (transaction && transaction->getStopSync().isRequested()) { MO_DBG_DEBUG("collect obsolete transaction %u-%u", connectorId, transaction->getTxNr()); + MO_DBG_VERBOSE("txNrBegin=%u, txNrFront=%u, txNrBack=%u", txNrBegin, txNrFront, txNrBack); transaction = nullptr; } @@ -300,36 +354,27 @@ void Connector::loop() { transaction->setStartBootNr(model.getBootNr()); } - updateTxNotification(TxNotification::StartTx); + transaction->getStartSync().setRequested(); + transaction->getStartSync().setOpNr(context.getRequestQueue().getNextOpNr()); if (transaction->isSilent()) { MO_DBG_INFO("silent Transaction: omit StartTx"); - transaction->getStartSync().setRequested(); transaction->getStartSync().confirm(); - transaction->commit(); - return; + } else { + //normal transaction, record txMeterData + if (model.getMeteringService()) { + model.getMeteringService()->beginTxMeterData(transaction.get()); + } } transaction->commit(); - if (model.getMeteringService()) { - model.getMeteringService()->beginTxMeterData(transaction.get()); - } - - auto startTx = makeRequest(new Ocpp16::StartTransaction(model, transaction)); - startTx->setTimeout(0); - startTx->setOnReceiveConfListener([this] (JsonObject response) { - //fetch authorization status from StartTransaction.conf() for user notification + updateTxNotification(TxNotification::StartTx); - const char* idTagInfoStatus = response["idTagInfo"]["status"] | "_Undefined"; - if (strcmp(idTagInfoStatus, "Accepted")) { - updateTxNotification(TxNotification::DeAuthorized); - } - }); - context.initiateRequest(std::move(startTx)); + //fetchFrontRequest will create the StartTransaction and pass it to the message sender return; } - } else { + } else { //stop tx? if (!transaction->isActive() && @@ -337,20 +382,6 @@ void Connector::loop() { //stop transaction MO_DBG_INFO("Session mngt: trigger StopTransaction"); - - if (transaction->isSilent()) { - MO_DBG_INFO("silent Transaction: omit StopTx"); - updateTxNotification(TxNotification::StopTx); - transaction->getStopSync().setRequested(); - transaction->getStopSync().confirm(); - if (auto mService = model.getMeteringService()) { - mService->removeTxMeterData(connectorId, transaction->getTxNr()); - } - model.getTransactionStore()->remove(connectorId, transaction->getTxNr()); - model.getTransactionStore()->setTxEnd(connectorId, transaction->getTxNr()); - transaction = nullptr; - return; - } auto meteringService = model.getMeteringService(); if (transaction->getMeterStop() < 0 && meteringService) { @@ -367,25 +398,24 @@ void Connector::loop() { transaction->setStopBootNr(model.getBootNr()); } - transaction->commit(); + transaction->getStopSync().setRequested(); + transaction->getStopSync().setOpNr(context.getRequestQueue().getNextOpNr()); - updateTxNotification(TxNotification::StopTx); - - std::shared_ptr stopTxData; - - if (meteringService) { - stopTxData = meteringService->endTxMeterData(transaction.get()); + if (transaction->isSilent()) { + MO_DBG_INFO("silent Transaction: omit StopTx"); + transaction->getStopSync().confirm(); + } else { + //normal transaction, record txMeterData + if (model.getMeteringService()) { + model.getMeteringService()->endTxMeterData(transaction.get()); + } } - std::unique_ptr stopTx; + transaction->commit(); - if (stopTxData) { - stopTx = makeRequest(new Ocpp16::StopTransaction(model, std::move(transaction), stopTxData->retrieveStopTxData())); - } else { - stopTx = makeRequest(new Ocpp16::StopTransaction(model, std::move(transaction))); - } - stopTx->setTimeout(0); - context.initiateRequest(std::move(stopTx)); + updateTxNotification(TxNotification::StopTx); + + //fetchFrontRequest will create the StopTransaction and pass it to the message sender return; } } @@ -516,14 +546,14 @@ const char *Connector::getErrorCode() { std::shared_ptr Connector::allocateTransaction() { - decltype(allocateTransaction()) tx; + std::shared_ptr tx; - //clean possible aorted tx - auto txr = model.getTransactionStore()->getTxEnd(connectorId); - auto txsize = model.getTransactionStore()->size(connectorId); - for (decltype(txsize) i = 0; i < txsize; i++) { + //clean possible aborted tx + unsigned int txr = txNrBack; + unsigned int txSize = (txNrBack + MAX_TX_CNT - txNrBegin) % MAX_TX_CNT; + for (unsigned int i = 0; i < txSize; i++) { txr = (txr + MAX_TX_CNT - 1) % MAX_TX_CNT; //decrement by 1 - + auto tx = model.getTransactionStore()->getTransaction(connectorId, txr); //check if dangling silent tx, aborted tx, or corrupted entry (tx == null) if (!tx || tx->isSilent() || (tx->isAborted() && MO_TX_CLEAN_ABORTED)) { @@ -536,7 +566,10 @@ std::shared_ptr Connector::allocateTransaction() { removed &= model.getTransactionStore()->remove(connectorId, txr); } if (removed) { - model.getTransactionStore()->setTxEnd(connectorId, txr); + if (txNrFront == txNrBack) { + txNrFront = txr; + } + txNrBack = txr; MO_DBG_WARN("deleted dangling silent or aborted tx for new transaction"); } else { MO_DBG_ERR("memory corruption"); @@ -548,16 +581,20 @@ std::shared_ptr Connector::allocateTransaction() { } } + txSize = (txNrBack + MAX_TX_CNT - txNrBegin) % MAX_TX_CNT; //refresh after cleaning txs + //try to create new transaction - tx = model.getTransactionStore()->createTransaction(connectorId); + if (txSize < MO_TXRECORD_SIZE) { + tx = model.getTransactionStore()->createTransaction(connectorId, txNrBack); + } if (!tx) { //could not create transaction - now, try to replace tx history entry - auto txl = model.getTransactionStore()->getTxBegin(connectorId); - auto txsize = model.getTransactionStore()->size(connectorId); + unsigned int txl = txNrBegin; + txSize = (txNrBack + MAX_TX_CNT - txNrBegin) % MAX_TX_CNT; - for (decltype(txsize) i = 0; i < txsize; i++) { + for (unsigned int i = 0; i < txSize; i++) { if (tx) { //success, finished here @@ -568,7 +605,7 @@ std::shared_ptr Connector::allocateTransaction() { auto txhist = model.getTransactionStore()->getTransaction(connectorId, txl); //oldest entry, now check if it's history and can be removed or corrupted entry - if (!txhist || txhist->isCompleted() || txhist->isAborted()) { + if (!txhist || txhist->isCompleted() || txhist->isAborted() || (txhist->isSilent() && txhist->getStopSync().isRequested())) { //yes, remove bool removed = true; if (auto mService = model.getMeteringService()) { @@ -578,10 +615,14 @@ std::shared_ptr Connector::allocateTransaction() { removed &= model.getTransactionStore()->remove(connectorId, txl); } if (removed) { - model.getTransactionStore()->setTxBegin(connectorId, (txl + 1) % MAX_TX_CNT); + txNrBegin = (txl + 1) % MAX_TX_CNT; + if (txNrFront == txl) { + txNrFront = txNrBegin; + } MO_DBG_DEBUG("deleted tx history entry for new transaction"); + MO_DBG_VERBOSE("txNrBegin=%u, txNrFront=%u, txNrBack=%u", txNrBegin, txNrFront, txNrBack); - tx = model.getTransactionStore()->createTransaction(connectorId); + tx = model.getTransactionStore()->createTransaction(connectorId, txNrBack); } else { MO_DBG_ERR("memory corruption"); break; @@ -601,7 +642,7 @@ std::shared_ptr Connector::allocateTransaction() { //couldn't create normal transaction -> check if to start charging without real transaction if (silentOfflineTransactionsBool && silentOfflineTransactionsBool->getBool()) { //try to handle charging session without sending StartTx or StopTx to the server - tx = model.getTransactionStore()->createTransaction(connectorId, true); + tx = model.getTransactionStore()->createTransaction(connectorId, txNrBack, true); if (tx) { MO_DBG_DEBUG("created silent transaction"); @@ -609,6 +650,21 @@ std::shared_ptr Connector::allocateTransaction() { } } + if (tx) { + //clean meter data which could still be here from a rolled-back transaction + if (auto mService = model.getMeteringService()) { + if (!mService->removeTxMeterData(connectorId, tx->getTxNr())) { + MO_DBG_ERR("memory corruption"); + } + } + } + + if (tx) { + txNrBack = (txNrBack + 1) % MAX_TX_CNT; + MO_DBG_DEBUG("advance txNrBack %u-%u", connectorId, txNrBack); + MO_DBG_VERBOSE("txNrBegin=%u, txNrFront=%u, txNrBack=%u", txNrBegin, txNrFront, txNrBack); + } + return tx; } @@ -1019,3 +1075,197 @@ void Connector::updateTxNotification(TxNotification event) { txNotificationOutput(transaction.get(), event); } } + +unsigned int Connector::getFrontRequestOpNr() { + + /* + * Advance front transaction? + */ + + unsigned int txSize = (txNrBack + MAX_TX_CNT - txNrFront) % MAX_TX_CNT; + + if (transactionFront && txSize == 0) { + //catch edge case where txBack has been rolled back and txFront was equal to txBack + MO_DBG_DEBUG("collect front transaction %u-%u after tx rollback", connectorId, transactionFront->getTxNr()); + MO_DBG_VERBOSE("txNrBegin=%u, txNrFront=%u, txNrBack=%u", txNrBegin, txNrFront, txNrBack); + transactionFront = nullptr; + } + + for (unsigned int i = 0; i < txSize; i++) { + + if (!transactionFront) { + transactionFront = model.getTransactionStore()->getTransaction(connectorId, txNrFront); + + #if MO_DBG_LEVEL >= MO_DL_VERBOSE + if (transactionFront) + { + MO_DBG_VERBOSE("load front transaction %u-%u", connectorId, transactionFront->getTxNr()); + } + #endif + } + + if (transactionFront && (transactionFront->isAborted() || transactionFront->isCompleted() || transactionFront->isSilent())) { + //advance front + MO_DBG_DEBUG("collect front transaction %u-%u", connectorId, transactionFront->getTxNr()); + transactionFront = nullptr; + txNrFront = (txNrFront + 1) % MAX_TX_CNT; + MO_DBG_VERBOSE("txNrBegin=%u, txNrFront=%u, txNrBack=%u", txNrBegin, txNrFront, txNrBack); + } else { + //front is accurate. Done here + break; + } + } + + if (transactionFront) { + if (transactionFront->getStartSync().isRequested() && !transactionFront->getStartSync().isConfirmed()) { + return transactionFront->getStartSync().getOpNr(); + } + + if (transactionFront->getStopSync().isRequested() && !transactionFront->getStopSync().isConfirmed()) { + return transactionFront->getStopSync().getOpNr(); + } + } + + return NoOperation; +} + +std::unique_ptr Connector::fetchFrontRequest() { + + if (transactionFront && !transactionFront->isSilent()) { + if (transactionFront->getStartSync().isRequested() && !transactionFront->getStartSync().isConfirmed()) { + //send StartTx? + + bool cancelStartTx = false; + + if (transactionFront->getStartTimestamp() < MIN_TIME && + transactionFront->getStartBootNr() != model.getBootNr()) { + //time not set, cannot be restored anymore -> invalid tx + MO_DBG_ERR("cannot recover tx from previus run"); + + cancelStartTx = true; + } + + if ((int)transactionFront->getStartSync().getAttemptNr() >= transactionMessageAttemptsInt->getInt()) { + MO_DBG_WARN("exceeded TransactionMessageAttempts. Discard transaction"); + + cancelStartTx = true; + } + + if (cancelStartTx) { + transactionFront->setSilent(); + transactionFront->setInactive(); + transactionFront->commit(); + + //clean up possible tx records + if (auto mSerivce = model.getMeteringService()) { + mSerivce->removeTxMeterData(connectorId, transactionFront->getTxNr()); + } + //next getFrontRequestOpNr() call will collect transactionFront + return nullptr; + } + + Timestamp nextAttempt = transactionFront->getStartSync().getAttemptTime() + + transactionFront->getStartSync().getAttemptNr() * transactionMessageRetryIntervalInt->getInt(); + + if (nextAttempt > model.getClock().now()) { + return nullptr; + } + + transactionFront->getStartSync().advanceAttemptNr(); + transactionFront->getStartSync().setAttemptTime(model.getClock().now()); + transactionFront->commit(); + + auto startTx = makeRequest(new Ocpp16::StartTransaction(model, transactionFront)); + startTx->setOnReceiveConfListener([this] (JsonObject response) { + //fetch authorization status from StartTransaction.conf() for user notification + + const char* idTagInfoStatus = response["idTagInfo"]["status"] | "_Undefined"; + if (strcmp(idTagInfoStatus, "Accepted")) { + updateTxNotification(TxNotification::DeAuthorized); + } + }); + auto transactionFront_capture = transactionFront; + startTx->setOnAbortListener([this, transactionFront_capture] () { + //shortcut to the attemptNr check above. Relevant if other operations block the queue while this StartTx is timing out + if (transactionFront_capture && (int)transactionFront_capture->getStartSync().getAttemptNr() >= transactionMessageAttemptsInt->getInt()) { + MO_DBG_WARN("exceeded TransactionMessageAttempts. Discard transaction"); + + transactionFront_capture->setSilent(); + transactionFront_capture->setInactive(); + transactionFront_capture->commit(); + + //clean up possible tx records + if (auto mSerivce = model.getMeteringService()) { + mSerivce->removeTxMeterData(connectorId, transactionFront_capture->getTxNr()); + } + //next getFrontRequestOpNr() call will collect transactionFront + } + }); + + return startTx; + } + + if (transactionFront->getStopSync().isRequested() && !transactionFront->getStopSync().isConfirmed()) { + //send StopTx? + + if ((int)transactionFront->getStopSync().getAttemptNr() >= transactionMessageAttemptsInt->getInt()) { + MO_DBG_WARN("exceeded TransactionMessageAttempts. Discard transaction"); + + transactionFront->setSilent(); + + //clean up possible tx records + if (auto mSerivce = model.getMeteringService()) { + mSerivce->removeTxMeterData(connectorId, transactionFront->getTxNr()); + } + //next getFrontRequestOpNr() call will collect transactionFront + return nullptr; + } + + Timestamp nextAttempt = transactionFront->getStopSync().getAttemptTime() + + transactionFront->getStopSync().getAttemptNr() * transactionMessageRetryIntervalInt->getInt(); + + if (nextAttempt > model.getClock().now()) { + return nullptr; + } + + transactionFront->getStopSync().advanceAttemptNr(); + transactionFront->getStopSync().setAttemptTime(model.getClock().now()); + transactionFront->commit(); + + std::shared_ptr stopTxData; + + if (auto meteringService = model.getMeteringService()) { + stopTxData = meteringService->getStopTxMeterData(transactionFront.get()); + } + + std::unique_ptr stopTx; + + if (stopTxData) { + stopTx = makeRequest(new Ocpp16::StopTransaction(model, transactionFront, stopTxData->retrieveStopTxData())); + } else { + stopTx = makeRequest(new Ocpp16::StopTransaction(model, transactionFront)); + } + auto transactionFront_capture = transactionFront; + stopTx->setOnAbortListener([this, transactionFront_capture] () { + //shortcut to the attemptNr check above. Relevant if other operations block the queue while this StopTx is timing out + if ((int)transactionFront_capture->getStopSync().getAttemptNr() >= transactionMessageAttemptsInt->getInt()) { + MO_DBG_WARN("exceeded TransactionMessageAttempts. Discard transaction"); + + transactionFront_capture->setSilent(); + transactionFront_capture->setInactive(); + transactionFront_capture->commit(); + + //clean up possible tx records + if (auto mSerivce = model.getMeteringService()) { + mSerivce->removeTxMeterData(connectorId, transactionFront_capture->getTxNr()); + } + //next getFrontRequestOpNr() call will collect transactionFront + } + }); + + return stopTx; + } + } + + return nullptr; +} diff --git a/src/MicroOcpp/Model/ConnectorBase/Connector.h b/src/MicroOcpp/Model/ConnectorBase/Connector.h index 08828f2d..637313e0 100644 --- a/src/MicroOcpp/Model/ConnectorBase/Connector.h +++ b/src/MicroOcpp/Model/ConnectorBase/Connector.h @@ -9,13 +9,21 @@ #include #include #include +#include #include +#include #include #include #include #include +#ifndef MO_TXRECORD_SIZE +#define MO_TXRECORD_SIZE 4 //no. of tx to hold on flash storage +#endif + +#define MAX_TX_CNT 100000U //upper limit of txNr (internal usage). Must be at least 2*MO_TXRECORD_SIZE+1 + #ifndef MO_REPORT_NOERROR #define MO_REPORT_NOERROR 0 #endif @@ -27,12 +35,13 @@ class Model; class Operation; class Transaction; -class Connector { +class Connector : public RequestEmitter { private: Context& context; Model& model; + std::shared_ptr filesystem; - const int connectorId; + const unsigned int connectorId; std::shared_ptr transaction; @@ -79,9 +88,18 @@ class Connector { std::shared_ptr txStartOnPowerPathClosedBool; // this postpones the tx start point to when evReadyInput becomes true + std::shared_ptr transactionMessageAttemptsInt; + std::shared_ptr transactionMessageRetryIntervalInt; + bool trackLoopExecute = false; //if loop has been executed once + + unsigned int txNrBegin = 0; //oldest (historical) transaction on flash. Has no function, but is useful for error diagnosis + unsigned int txNrFront = 0; //oldest transaction which is still queued to be sent to the server + unsigned int txNrBack = 0; //one position behind newest transaction + + std::shared_ptr transactionFront; public: - Connector(Context& context, int connectorId); + Connector(Context& context, std::shared_ptr filesystem, unsigned int connectorId); Connector(const Connector&) = delete; Connector(Connector&&) = delete; Connector& operator=(const Connector&) = delete; @@ -136,6 +154,9 @@ class Connector { void setTxNotificationOutput(std::function txNotificationOutput); void updateTxNotification(TxNotification event); + + unsigned int getFrontRequestOpNr() override; + std::unique_ptr fetchFrontRequest() override; }; } //end namespace MicroOcpp diff --git a/src/MicroOcpp/Model/Metering/MeterStore.cpp b/src/MicroOcpp/Model/Metering/MeterStore.cpp index f3ebe2bc..d5f7041a 100644 --- a/src/MicroOcpp/Model/Metering/MeterStore.cpp +++ b/src/MicroOcpp/Model/Metering/MeterStore.cpp @@ -137,12 +137,12 @@ bool TransactionMeterData::restore(MeterValueBuilder& mvBuilder) { txData.push_back(std::move(mv)); - mvCount = i; i++; + mvCount = i; misses = 0; } - MO_DBG_DEBUG("Restored %zu meter values", txData.size()); + MO_DBG_DEBUG("Restored %zu meter values from sd-%u-%u-0 to %u (exclusive)", txData.size(), connectorId, txNr, mvCount); return true; } @@ -210,7 +210,7 @@ std::shared_ptr MeterStore::getTxMeterData(MeterValueBuild txMeterData.push_back(tx); - MO_DBG_DEBUG("Added txNr %u, now holding %zu txs", txNr, txMeterData.size()); + MO_DBG_DEBUG("Added txNr %u, now holding %zu sds", txNr, txMeterData.size()); return tx; } diff --git a/src/MicroOcpp/Model/Metering/MeterStore.h b/src/MicroOcpp/Model/Metering/MeterStore.h index d0f5af74..3f14e47c 100644 --- a/src/MicroOcpp/Model/Metering/MeterStore.h +++ b/src/MicroOcpp/Model/Metering/MeterStore.h @@ -19,7 +19,7 @@ class TransactionMeterData { const unsigned int connectorId; //assignment to Transaction object const unsigned int txNr; //assignment to Transaction object - unsigned int mvCount = 0; //nr of saved meter values + unsigned int mvCount = 0; //nr of saved meter values, including gaps bool finalized = false; //if true, this is read-only std::shared_ptr filesystem; diff --git a/src/MicroOcpp/Model/Metering/MeteringConnector.cpp b/src/MicroOcpp/Model/Metering/MeteringConnector.cpp index 3e642f87..aa1f4dcc 100644 --- a/src/MicroOcpp/Model/Metering/MeteringConnector.cpp +++ b/src/MicroOcpp/Model/Metering/MeteringConnector.cpp @@ -24,14 +24,14 @@ MeteringConnector::MeteringConnector(Model& model, int connectorId, MeterStore& declareConfiguration("MeterValuesSampledDataMaxLength", 8, CONFIGURATION_VOLATILE, true); meterValueCacheSizeInt = declareConfiguration(MO_CONFIG_EXT_PREFIX "MeterValueCacheSize", 1); meterValueSampleIntervalInt = declareConfiguration("MeterValueSampleInterval", 60); - + auto stopTxnSampledDataString = declareConfiguration("StopTxnSampledData", ""); declareConfiguration("StopTxnSampledDataMaxLength", 8, CONFIGURATION_VOLATILE, true); - + auto meterValuesAlignedDataString = declareConfiguration("MeterValuesAlignedData", ""); declareConfiguration("MeterValuesAlignedDataMaxLength", 8, CONFIGURATION_VOLATILE, true); clockAlignedDataIntervalInt = declareConfiguration("ClockAlignedDataInterval", 0); - + auto stopTxnAlignedDataString = declareConfiguration("StopTxnAlignedData", ""); meterValuesInTxOnlyBool = declareConfiguration(MO_CONFIG_EXT_PREFIX "MeterValuesInTxOnly", true); @@ -66,12 +66,12 @@ std::unique_ptr MeteringConnector::loop() { if (transaction != model.getConnector(connectorId)->getTransaction()) { transaction = model.getConnector(connectorId)->getTransaction(); } - + if (transaction && transaction->isRunning() && !transaction->isSilent()) { //check during transaction if (!stopTxnData || stopTxnData->getTxNr() != transaction->getTxNr()) { - MO_DBG_WARN("reload stopTxnData"); + MO_DBG_WARN("reload stopTxnData, %s, for tx-%u-%u", stopTxnData ? "replace" : "first time", connectorId, transaction->getTxNr()); //reload (e.g. after power cut during transaction) stopTxnData = meterStore.getTxMeterData(*stopTxnSampledDataBuilder, transaction.get()); } @@ -108,9 +108,8 @@ std::unique_ptr MeteringConnector::loop() { stopTxnData->addTxData(std::move(alignedStopTx)); } } - } - + Timestamp midnightBase = Timestamp(2010,0,0,0,0,0); auto intervall = timestampNow - midnightBase; intervall %= 3600 * 24; @@ -143,7 +142,7 @@ std::unique_ptr MeteringConnector::loop() { } } lastSampleTime = mocpp_tick_ms(); - } + } } if (clockAlignedDataIntervalInt->getInt() < 1 && meterValueSampleIntervalInt->getInt() < 1) { @@ -216,6 +215,10 @@ std::shared_ptr MeteringConnector::endTxMeterData(Transact return std::move(stopTxnData); } +void MeteringConnector::abortTxMeterData() { + stopTxnData.reset(); +} + std::shared_ptr MeteringConnector::getStopTxMeterData(Transaction *transaction) { auto txData = meterStore.getTxMeterData(*stopTxnSampledDataBuilder, transaction); diff --git a/src/MicroOcpp/Model/Metering/MeteringConnector.h b/src/MicroOcpp/Model/Metering/MeteringConnector.h index f5f304da..c40075f5 100644 --- a/src/MicroOcpp/Model/Metering/MeteringConnector.h +++ b/src/MicroOcpp/Model/Metering/MeteringConnector.h @@ -70,6 +70,8 @@ class MeteringConnector { std::shared_ptr endTxMeterData(Transaction *transaction); + void abortTxMeterData(); + std::shared_ptr getStopTxMeterData(Transaction *transaction); bool existsSampler(const char *measurand, size_t len); diff --git a/src/MicroOcpp/Model/Metering/MeteringService.cpp b/src/MicroOcpp/Model/Metering/MeteringService.cpp index af8700f6..bff296d1 100644 --- a/src/MicroOcpp/Model/Metering/MeteringService.cpp +++ b/src/MicroOcpp/Model/Metering/MeteringService.cpp @@ -160,6 +160,16 @@ std::shared_ptr MeteringService::endTxMeterData(Transactio return connector->endTxMeterData(transaction); } +void MeteringService::abortTxMeterData(unsigned int connectorId) { + if (connectorId >= connectors.size()) { + MO_DBG_ERR("connectorId is out of bounds"); + return; + } + auto& connector = connectors[connectorId]; + + connector->abortTxMeterData(); +} + std::shared_ptr MeteringService::getStopTxMeterData(Transaction *transaction) { if (!transaction) { MO_DBG_ERR("invalid argument"); diff --git a/src/MicroOcpp/Model/Metering/MeteringService.h b/src/MicroOcpp/Model/Metering/MeteringService.h index 5249a3e3..76f8038d 100644 --- a/src/MicroOcpp/Model/Metering/MeteringService.h +++ b/src/MicroOcpp/Model/Metering/MeteringService.h @@ -40,6 +40,8 @@ class MeteringService { std::shared_ptr endTxMeterData(Transaction *transaction); //use return value to keep data in cache + void abortTxMeterData(unsigned int connectorId); //call this to free resources if txMeterData record is not ended normally. Does not remove files + std::shared_ptr getStopTxMeterData(Transaction *transaction); //prefer endTxMeterData when possible bool removeTxMeterData(unsigned int connectorId, unsigned int txNr); diff --git a/src/MicroOcpp/Model/Transactions/Transaction.h b/src/MicroOcpp/Model/Transactions/Transaction.h index 74997059..4413102a 100644 --- a/src/MicroOcpp/Model/Transactions/Transaction.h +++ b/src/MicroOcpp/Model/Transactions/Transaction.h @@ -26,11 +26,22 @@ class SendStatus { private: bool requested = false; bool confirmed = false; + + unsigned int opNr = 0; + unsigned int attemptNr = 0; + Timestamp attemptTime = MIN_TIME; public: void setRequested() {this->requested = true;} bool isRequested() {return requested;} void confirm() {confirmed = true;} bool isConfirmed() {return confirmed;} + void setOpNr(unsigned int opNr) {this->opNr = opNr;} + unsigned int getOpNr() {return opNr;} + void advanceAttemptNr() {attemptNr++;} + void setAttemptNr(unsigned int attemptNr) {this->attemptNr = attemptNr;} + unsigned int getAttemptNr() {return attemptNr;} + const Timestamp& getAttemptTime() {return attemptTime;} + void setAttemptTime(const Timestamp& timestamp) {attemptTime = timestamp;} }; class Transaction { diff --git a/src/MicroOcpp/Model/Transactions/TransactionDeserialize.cpp b/src/MicroOcpp/Model/Transactions/TransactionDeserialize.cpp index bb951f82..eb3ff237 100644 --- a/src/MicroOcpp/Model/Transactions/TransactionDeserialize.cpp +++ b/src/MicroOcpp/Model/Transactions/TransactionDeserialize.cpp @@ -16,6 +16,15 @@ bool serializeSendStatus(SendStatus& status, JsonObject out) { if (status.isConfirmed()) { out["confirmed"] = true; } + out["opNr"] = status.getOpNr(); + if (status.getAttemptNr() != 0) { + out["attemptNr"] = status.getAttemptNr(); + } + if (status.getAttemptTime() > MIN_TIME) { + char attemptTime [JSONDATE_LENGTH + 1]; + status.getAttemptTime().toJsonString(attemptTime, sizeof(attemptTime)); + out["attemptTime"] = attemptTime; + } return true; } @@ -26,6 +35,19 @@ bool deserializeSendStatus(SendStatus& status, JsonObject in) { if (in["confirmed"] | false) { status.confirm(); } + unsigned int opNr = in["opNr"] | (unsigned int)0; + if (opNr >= 10) { //10 is first valid tx-related opNr + status.setOpNr(opNr); + } + status.setAttemptNr(in["attemptNr"] | (unsigned int)0); + if (in.containsKey("attemptTime")) { + Timestamp attemptTime; + if (!attemptTime.setTime(in["attemptTime"] | "_Invalid")) { + MO_DBG_ERR("deserialization error"); + return false; + } + status.setAttemptTime(attemptTime); + } return true; } diff --git a/src/MicroOcpp/Model/Transactions/TransactionService.cpp b/src/MicroOcpp/Model/Transactions/TransactionService.cpp index 41c5d0e8..163e252a 100644 --- a/src/MicroOcpp/Model/Transactions/TransactionService.cpp +++ b/src/MicroOcpp/Model/Transactions/TransactionService.cpp @@ -327,6 +327,12 @@ void TransactionService::Evse::loop() { auto txEventRequest = makeRequest(new Ocpp201::TransactionEvent(context.getModel(), txEvent)); txEventRequest->setTimeout(0); context.initiateRequest(std::move(txEventRequest)); + + if (txEvent->eventType == TransactionEventData::Type::Started) { + transaction->started = true; + } else if (txEvent->eventType == TransactionEventData::Type::Ended) { + transaction->stopped = true; + } } } diff --git a/src/MicroOcpp/Model/Transactions/TransactionStore.cpp b/src/MicroOcpp/Model/Transactions/TransactionStore.cpp index 7be7ea79..ccb48333 100644 --- a/src/MicroOcpp/Model/Transactions/TransactionStore.cpp +++ b/src/MicroOcpp/Model/Transactions/TransactionStore.cpp @@ -3,39 +3,21 @@ // MIT License #include -#include #include -#include -#include -#include #include #include -#include - using namespace MicroOcpp; -#define MO_TXSTORE_META_FN MO_FILENAME_PREFIX "txstore.jsn" - ConnectorTransactionStore::ConnectorTransactionStore(TransactionStore& context, unsigned int connectorId, std::shared_ptr filesystem) : context(context), connectorId(connectorId), filesystem(filesystem) { - snprintf(txBeginKey, sizeof(txBeginKey), MO_TXSTORE_TXBEGIN_KEY "%u", connectorId); - txBeginInt = declareConfiguration(txBeginKey, 0, MO_TXSTORE_META_FN, false, false, false); - - snprintf(txEndKey, sizeof(txEndKey), MO_TXSTORE_TXEND_KEY "%u", connectorId); - txEndInt = declareConfiguration(txEndKey, 0, MO_TXSTORE_META_FN, false, false, false); } ConnectorTransactionStore::~ConnectorTransactionStore() { - if (txBeginInt->getKey() == txBeginKey) { - txBeginInt->setKey(nullptr); - } - if (txEndInt->getKey() == txEndKey) { - txEndInt->setKey(nullptr); - } + } std::shared_ptr ConnectorTransactionStore::getTransaction(unsigned int txNr) { @@ -100,38 +82,23 @@ std::shared_ptr ConnectorTransactionStore::getTransaction(unsigned } //before adding new entry, clean cache - transactions.erase(std::remove_if(transactions.begin(), transactions.end(), - [](std::weak_ptr tx) { - return tx.expired(); - }), - transactions.end()); + cached = transactions.begin(); + while (cached != transactions.end()) { + if (cached->expired()) { + //collect outdated cache reference + cached = transactions.erase(cached); + } else { + cached++; + } + } transactions.push_back(transaction); return transaction; } -std::shared_ptr ConnectorTransactionStore::createTransaction(bool silent) { - - if (!txBeginInt || txBeginInt->getInt() < 0 || !txEndInt || txEndInt->getInt() < 0) { - MO_DBG_ERR("memory corruption"); - return nullptr; - } - - //check if maximum number of queued tx already reached - if ((txEndInt->getInt() + MAX_TX_CNT - txBeginInt->getInt()) % MAX_TX_CNT >= MO_TXRECORD_SIZE) { - //limit reached +std::shared_ptr ConnectorTransactionStore::createTransaction(unsigned int txNr, bool silent) { - if (!silent) { - //normal tx -> abort - return nullptr; - } - //special case: silent tx -> create tx anyway, but should be deleted immediately after charging session - } - - auto transaction = std::make_shared(*this, connectorId, (unsigned int) txEndInt->getInt(), silent); - - txEndInt->setInt((txEndInt->getInt() + 1) % MAX_TX_CNT); - configuration_save(); + auto transaction = std::make_shared(*this, connectorId, txNr, silent); if (!commit(transaction.get())) { MO_DBG_ERR("FS error"); @@ -139,27 +106,20 @@ std::shared_ptr ConnectorTransactionStore::createTransaction(bool s } //before adding new entry, clean cache - transactions.erase(std::remove_if(transactions.begin(), transactions.end(), - [](std::weak_ptr tx) { - return tx.expired(); - }), - transactions.end()); + auto cached = transactions.begin(); + while (cached != transactions.end()) { + if (cached->expired()) { + //collect outdated cache reference + cached = transactions.erase(cached); + } else { + cached++; + } + } transactions.push_back(transaction); return transaction; } -std::shared_ptr ConnectorTransactionStore::getLatestTransaction() { - if (!txEndInt || txEndInt->getInt() < 0) { - MO_DBG_ERR("memory corruption"); - return nullptr; - } - - unsigned int latest = ((unsigned int) txEndInt->getInt() + MAX_TX_CNT - 1) % MAX_TX_CNT; - - return getTransaction(latest); -} - bool ConnectorTransactionStore::commit(Transaction *transaction) { if (!filesystem) { @@ -214,69 +174,12 @@ bool ConnectorTransactionStore::remove(unsigned int txNr) { return filesystem->remove(fn); } -int ConnectorTransactionStore::getTxBegin() { - if (!txBeginInt || txBeginInt->getInt() < 0) { - MO_DBG_ERR("memory corruption"); - return -1; - } - - return txBeginInt->getInt(); -} - -int ConnectorTransactionStore::getTxEnd() { - if (!txEndInt || txEndInt->getInt() < 0) { - MO_DBG_ERR("memory corruption"); - return -1; - } - - return txEndInt->getInt(); -} - -void ConnectorTransactionStore::setTxBegin(unsigned int txNr) { - if (!txBeginInt || txBeginInt->getInt() < 0) { - MO_DBG_ERR("memory corruption"); - return; - } - - txBeginInt->setInt(txNr); - configuration_save(); -} - -void ConnectorTransactionStore::setTxEnd(unsigned int txNr) { - if (!txBeginInt || txBeginInt->getInt() < 0 || !txEndInt || txEndInt->getInt() < 0) { - MO_DBG_ERR("memory corruption"); - return; - } - - txEndInt->setInt(txNr); - configuration_save(); -} - -unsigned int ConnectorTransactionStore::size() { - if (!txBeginInt || txBeginInt->getInt() < 0 || !txEndInt || txEndInt->getInt() < 0) { - MO_DBG_ERR("memory corruption"); - return 0; - } - - return (txEndInt->getInt() + MAX_TX_CNT - txBeginInt->getInt()) % MAX_TX_CNT; -} - TransactionStore::TransactionStore(unsigned int nConnectors, std::shared_ptr filesystem) { for (unsigned int i = 0; i < nConnectors; i++) { connectors.push_back(std::unique_ptr( new ConnectorTransactionStore(*this, i, filesystem))); } - - configuration_load(MO_TXSTORE_META_FN); -} - -std::shared_ptr TransactionStore::getLatestTransaction(unsigned int connectorId) { - if (connectorId >= connectors.size()) { - MO_DBG_ERR("Invalid connectorId"); - return nullptr; - } - return connectors[connectorId]->getLatestTransaction(); } bool TransactionStore::commit(Transaction *transaction) { @@ -300,12 +203,12 @@ std::shared_ptr TransactionStore::getTransaction(unsigned int conne return connectors[connectorId]->getTransaction(txNr); } -std::shared_ptr TransactionStore::createTransaction(unsigned int connectorId, bool silent) { +std::shared_ptr TransactionStore::createTransaction(unsigned int connectorId, unsigned int txNr, bool silent) { if (connectorId >= connectors.size()) { MO_DBG_ERR("Invalid connectorId"); return nullptr; } - return connectors[connectorId]->createTransaction(silent); + return connectors[connectorId]->createTransaction(txNr, silent); } bool TransactionStore::remove(unsigned int connectorId, unsigned int txNr) { @@ -315,43 +218,3 @@ bool TransactionStore::remove(unsigned int connectorId, unsigned int txNr) { } return connectors[connectorId]->remove(txNr); } - -int TransactionStore::getTxBegin(unsigned int connectorId) { - if (connectorId >= connectors.size()) { - MO_DBG_ERR("Invalid connectorId"); - return -1; - } - return connectors[connectorId]->getTxBegin(); -} - -int TransactionStore::getTxEnd(unsigned int connectorId) { - if (connectorId >= connectors.size()) { - MO_DBG_ERR("Invalid connectorId"); - return -1; - } - return connectors[connectorId]->getTxEnd(); -} - -void TransactionStore::setTxBegin(unsigned int connectorId, unsigned int txNr) { - if (connectorId >= connectors.size()) { - MO_DBG_ERR("Invalid connectorId"); - return; - } - return connectors[connectorId]->setTxBegin(txNr); -} - -void TransactionStore::setTxEnd(unsigned int connectorId, unsigned int txNr) { - if (connectorId >= connectors.size()) { - MO_DBG_ERR("Invalid connectorId"); - return; - } - return connectors[connectorId]->setTxEnd(txNr); -} - -unsigned int TransactionStore::size(unsigned int connectorId) { - if (connectorId >= connectors.size()) { - MO_DBG_ERR("Invalid connectorId"); - return 0; - } - return connectors[connectorId]->size(); -} diff --git a/src/MicroOcpp/Model/Transactions/TransactionStore.h b/src/MicroOcpp/Model/Transactions/TransactionStore.h index 100a2ab1..9f080dcd 100644 --- a/src/MicroOcpp/Model/Transactions/TransactionStore.h +++ b/src/MicroOcpp/Model/Transactions/TransactionStore.h @@ -5,19 +5,11 @@ #ifndef MO_TRANSACTIONSTORE_H #define MO_TRANSACTIONSTORE_H -#include -#include -#include +#include #include -#define MAX_TX_CNT 100000U - -#ifndef MO_TXRECORD_SIZE -#define MO_TXRECORD_SIZE 4 //no. of tx to hold on flash storage -#endif - -#define MO_TXSTORE_TXBEGIN_KEY "txBegin_" -#define MO_TXSTORE_TXEND_KEY "txEnd_" +#include +#include namespace MicroOcpp { @@ -29,11 +21,6 @@ class ConnectorTransactionStore { const unsigned int connectorId; std::shared_ptr filesystem; - std::shared_ptr txBeginInt; //if txNr < txBegin, tx has been safely deleted - char txBeginKey [sizeof(MO_TXSTORE_TXBEGIN_KEY "xxx") + 1]; //"xxx": placeholder for connectorId - - std::shared_ptr txEndInt; - char txEndKey [sizeof(MO_TXSTORE_TXEND_KEY "xxx") + 1]; std::deque> transactions; @@ -44,21 +31,13 @@ class ConnectorTransactionStore { ConnectorTransactionStore& operator=(const ConnectorTransactionStore&) = delete; ~ConnectorTransactionStore(); - - std::shared_ptr getLatestTransaction(); + bool commit(Transaction *transaction); std::shared_ptr getTransaction(unsigned int txNr); - std::shared_ptr createTransaction(bool silent = false); + std::shared_ptr createTransaction(unsigned int txNr, bool silent = false); bool remove(unsigned int txNr); - - int getTxBegin(); - int getTxEnd(); - void setTxBegin(unsigned int txNr); - void setTxEnd(unsigned int txNr); - - unsigned int size(); }; class TransactionStore { @@ -67,20 +46,12 @@ class TransactionStore { public: TransactionStore(unsigned int nConnectors, std::shared_ptr filesystem); - std::shared_ptr getLatestTransaction(unsigned int connectorId); bool commit(Transaction *transaction); std::shared_ptr getTransaction(unsigned int connectorId, unsigned int txNr); - std::shared_ptr createTransaction(unsigned int connectorId, bool silent = false); + std::shared_ptr createTransaction(unsigned int connectorId, unsigned int txNr, bool silent = false); bool remove(unsigned int connectorId, unsigned int txNr); - - int getTxBegin(unsigned int connectorId); - int getTxEnd(unsigned int connectorId); - void setTxBegin(unsigned int connectorId, unsigned int txNr); - void setTxEnd(unsigned int connectorId, unsigned int txNr); - - unsigned int size(unsigned int connectorId); }; } diff --git a/src/MicroOcpp/Operations/CustomOperation.cpp b/src/MicroOcpp/Operations/CustomOperation.cpp index 6f035e62..ddd5ab32 100644 --- a/src/MicroOcpp/Operations/CustomOperation.cpp +++ b/src/MicroOcpp/Operations/CustomOperation.cpp @@ -9,12 +9,8 @@ using MicroOcpp::Ocpp16::CustomOperation; CustomOperation::CustomOperation(const char *operationType, std::function ()> fn_createReq, std::function fn_processConf, - std::function fn_processErr, - std::function fn_initiate, - std::function fn_restore) : + std::function fn_processErr) : operationType{operationType}, - fn_initiate{fn_initiate}, - fn_restore{fn_restore}, fn_createReq{fn_createReq}, fn_processConf{fn_processConf}, fn_processErr{fn_processErr} { @@ -44,20 +40,6 @@ const char* CustomOperation::getOperationType() { return operationType.c_str(); } -void CustomOperation::initiate(StoredOperationHandler *opStore) { - if (fn_initiate) { - fn_initiate(opStore); - } -} - -bool CustomOperation::restore(StoredOperationHandler *opStore) { - if (fn_restore) { - return fn_restore(opStore); - } else { - return false; - } -} - std::unique_ptr CustomOperation::createReq() { return fn_createReq(); } diff --git a/src/MicroOcpp/Operations/CustomOperation.h b/src/MicroOcpp/Operations/CustomOperation.h index 334aa98e..88193fc8 100644 --- a/src/MicroOcpp/Operations/CustomOperation.h +++ b/src/MicroOcpp/Operations/CustomOperation.h @@ -16,8 +16,6 @@ namespace Ocpp16 { class CustomOperation : public Operation { private: std::string operationType; - std::function fn_initiate; //optional - std::function fn_restore; //optional std::function ()> fn_createReq; std::function fn_processConf; std::function fn_processErr; //optional @@ -32,9 +30,7 @@ class CustomOperation : public Operation { CustomOperation(const char *operationType, std::function ()> fn_createReq, std::function fn_processConf, - std::function fn_processErr = nullptr, - std::function fn_initiate = nullptr, - std::function fn_restore = nullptr); + std::function fn_processErr = nullptr); //for operations receied from remote CustomOperation(const char *operationType, @@ -48,10 +44,6 @@ class CustomOperation : public Operation { const char* getOperationType() override; - void initiate(StoredOperationHandler *opStore) override; - - bool restore(StoredOperationHandler *opStore) override; - std::unique_ptr createReq() override; void processConf(JsonObject payload) override; diff --git a/src/MicroOcpp/Operations/StartTransaction.cpp b/src/MicroOcpp/Operations/StartTransaction.cpp index c9378996..ccd5d89e 100644 --- a/src/MicroOcpp/Operations/StartTransaction.cpp +++ b/src/MicroOcpp/Operations/StartTransaction.cpp @@ -4,7 +4,6 @@ #include #include -#include #include #include #include @@ -27,84 +26,6 @@ const char* StartTransaction::getOperationType() { return "StartTransaction"; } -void StartTransaction::initiate(StoredOperationHandler *opStore) { - if (!transaction || transaction->getStartSync().isRequested()) { - MO_DBG_ERR("initialization error"); - return; - } - - auto payload = std::unique_ptr(new DynamicJsonDocument(JSON_OBJECT_SIZE(2))); - (*payload)["connectorId"] = transaction->getConnectorId(); - (*payload)["txNr"] = transaction->getTxNr(); - - if (opStore) { - opStore->setPayload(std::move(payload)); - opStore->commit(); - } - - transaction->getStartSync().setRequested(); - - transaction->commit(); - - MO_DBG_INFO("StartTransaction initiated"); -} - -bool StartTransaction::restore(StoredOperationHandler *opStore) { - if (!opStore) { - MO_DBG_ERR("invalid argument"); - return false; - } - - auto payload = opStore->getPayload(); - if (!payload) { - MO_DBG_ERR("memory corruption"); - return false; - } - - int connectorId = (*payload)["connectorId"] | -1; - int txNr = (*payload)["txNr"] | -1; - if (connectorId < 0 || txNr < 0) { - MO_DBG_ERR("record incomplete"); - return false; - } - - auto txStore = model.getTransactionStore(); - - if (!txStore) { - MO_DBG_ERR("invalid state"); - return false; - } - - transaction = txStore->getTransaction(connectorId, txNr); - if (!transaction) { - MO_DBG_ERR("referential integrity violation"); - - //clean up possible tx records - if (auto mSerivce = model.getMeteringService()) { - mSerivce->removeTxMeterData(connectorId, txNr); - } - return false; - } - - if (transaction->getStartTimestamp() < MIN_TIME && - transaction->getStartBootNr() != model.getBootNr()) { - //time not set, cannot be restored anymore -> invalid tx - MO_DBG_ERR("cannot recover tx from previus run"); - - //clean up possible tx records - if (auto mSerivce = model.getMeteringService()) { - mSerivce->removeTxMeterData(connectorId, txNr); - } - - transaction->setSilent(); - transaction->setInactive(); - transaction->commit(); - return false; - } - - return true; -} - std::unique_ptr StartTransaction::createReq() { auto doc = std::unique_ptr(new DynamicJsonDocument( diff --git a/src/MicroOcpp/Operations/StartTransaction.h b/src/MicroOcpp/Operations/StartTransaction.h index e6a2f0f3..e01d0461 100644 --- a/src/MicroOcpp/Operations/StartTransaction.h +++ b/src/MicroOcpp/Operations/StartTransaction.h @@ -29,10 +29,6 @@ class StartTransaction : public Operation { const char* getOperationType() override; - void initiate(StoredOperationHandler *opStore) override; - - bool restore(StoredOperationHandler *opStore) override; - std::unique_ptr createReq() override; void processConf(JsonObject payload) override; diff --git a/src/MicroOcpp/Operations/StopTransaction.cpp b/src/MicroOcpp/Operations/StopTransaction.cpp index e2df5442..49b9b029 100644 --- a/src/MicroOcpp/Operations/StopTransaction.cpp +++ b/src/MicroOcpp/Operations/StopTransaction.cpp @@ -4,7 +4,6 @@ #include #include -#include #include #include #include @@ -29,85 +28,6 @@ const char* StopTransaction::getOperationType() { return "StopTransaction"; } -void StopTransaction::initiate(StoredOperationHandler *opStore) { - if (!transaction || transaction->getStopSync().isRequested()) { - MO_DBG_ERR("initialization error"); - return; - } - - auto payload = std::unique_ptr(new DynamicJsonDocument(JSON_OBJECT_SIZE(2))); - (*payload)["connectorId"] = transaction->getConnectorId(); - (*payload)["txNr"] = transaction->getTxNr(); - - if (opStore) { - opStore->setPayload(std::move(payload)); - opStore->commit(); - } - - transaction->getStopSync().setRequested(); - - transaction->commit(); - - MO_DBG_INFO("StopTransaction initiated"); -} - -bool StopTransaction::restore(StoredOperationHandler *opStore) { - if (!opStore) { - MO_DBG_ERR("invalid argument"); - return false; - } - - auto payload = opStore->getPayload(); - if (!payload) { - MO_DBG_ERR("memory corruption"); - return false; - } - - int connectorId = (*payload)["connectorId"] | -1; - int txNr = (*payload)["txNr"] | -1; - if (connectorId < 0 || txNr < 0) { - MO_DBG_ERR("record incomplete"); - return false; - } - - auto txStore = model.getTransactionStore(); - - if (!txStore) { - MO_DBG_ERR("invalid state"); - return false; - } - - transaction = txStore->getTransaction(connectorId, txNr); - if (!transaction) { - MO_DBG_ERR("referential integrity violation"); - - //clean up possible tx records - if (auto mSerivce = model.getMeteringService()) { - mSerivce->removeTxMeterData(connectorId, txNr); - } - return false; - } - - if (transaction->isSilent()) { - //transaction has been set silent after initializing StopTx - discard operation record - MO_DBG_WARN("tx has been set silent - discard StopTx"); - - //clean up possible tx records - if (auto mSerivce = model.getMeteringService()) { - mSerivce->removeTxMeterData(connectorId, txNr); - } - return false; - } - - if (auto mSerivce = model.getMeteringService()) { - if (auto txData = mSerivce->getStopTxMeterData(transaction.get())) { - transactionData = txData->retrieveStopTxData(); - } - } - - return true; -} - std::unique_ptr StopTransaction::createReq() { /* diff --git a/src/MicroOcpp/Operations/StopTransaction.h b/src/MicroOcpp/Operations/StopTransaction.h index 9d3bea4a..9f617e9d 100644 --- a/src/MicroOcpp/Operations/StopTransaction.h +++ b/src/MicroOcpp/Operations/StopTransaction.h @@ -34,10 +34,6 @@ class StopTransaction : public Operation { const char* getOperationType() override; - void initiate(StoredOperationHandler *opStore) override; - - bool restore(StoredOperationHandler *opStore) override; - std::unique_ptr createReq() override; void processConf(JsonObject payload) override; diff --git a/src/MicroOcpp/Operations/TransactionEvent.cpp b/src/MicroOcpp/Operations/TransactionEvent.cpp index 2ff45e13..92a7e686 100644 --- a/src/MicroOcpp/Operations/TransactionEvent.cpp +++ b/src/MicroOcpp/Operations/TransactionEvent.cpp @@ -23,37 +23,6 @@ const char* TransactionEvent::getOperationType() { return "TransactionEvent"; } -void TransactionEvent::initiate(StoredOperationHandler *opStore) { - if (!txEvent || !txEvent->transaction || !txEvent->transaction) { - MO_DBG_ERR("initialization error"); - return; - } - - auto transaction = txEvent->transaction; - - if (txEvent->eventType == TransactionEventData::Type::Started) { - if (transaction->started) { - MO_DBG_ERR("initialization error"); - return; - } - - transaction->started = true; - } - - if (txEvent->eventType == TransactionEventData::Type::Ended) { - if (transaction->stopped) { - MO_DBG_ERR("initialization error"); - return; - } - - transaction->stopped = true; - } - - //commit operation and tx - - MO_DBG_INFO("TransactionEvent initiated"); -} - std::unique_ptr TransactionEvent::createReq() { auto doc = std::unique_ptr(new DynamicJsonDocument( JSON_OBJECT_SIZE(12) + //total of 12 fields diff --git a/src/MicroOcpp/Operations/TransactionEvent.h b/src/MicroOcpp/Operations/TransactionEvent.h index ea8ef856..dcedb63e 100644 --- a/src/MicroOcpp/Operations/TransactionEvent.h +++ b/src/MicroOcpp/Operations/TransactionEvent.h @@ -31,8 +31,6 @@ class TransactionEvent : public Operation { const char* getOperationType() override; - void initiate(StoredOperationHandler *opStore) override; - std::unique_ptr createReq() override; void processConf(JsonObject payload) override; diff --git a/src/MicroOcpp/Operations/TriggerMessage.cpp b/src/MicroOcpp/Operations/TriggerMessage.cpp index 8177aecb..7c246e3d 100644 --- a/src/MicroOcpp/Operations/TriggerMessage.cpp +++ b/src/MicroOcpp/Operations/TriggerMessage.cpp @@ -35,11 +35,11 @@ void TriggerMessage::processReq(JsonObject payload) { if (connectorId < 0) { auto nConnectors = mService->getNumConnectors(); for (decltype(nConnectors) cId = 0; cId < nConnectors; cId++) { - context.initiatePreBootOperation(mService->takeTriggeredMeterValues(cId)); + context.getRequestQueue().sendRequestPreBoot(mService->takeTriggeredMeterValues(cId)); statusMessage = "Accepted"; } } else if (connectorId < mService->getNumConnectors()) { - context.initiatePreBootOperation(mService->takeTriggeredMeterValues(connectorId)); + context.getRequestQueue().sendRequestPreBoot(mService->takeTriggeredMeterValues(connectorId)); statusMessage = "Accepted"; } else { errorCode = "PropertyConstraintViolation"; @@ -66,13 +66,13 @@ void TriggerMessage::processReq(JsonObject payload) { statusNotification->setTimeout(60000); - context.initiatePreBootOperation(std::move(statusNotification)); + context.getRequestQueue().sendRequestPreBoot(std::move(statusNotification)); statusMessage = "Accepted"; } } else { auto msg = context.getOperationRegistry().deserializeOperation(requestedMessage); if (msg) { - context.initiatePreBootOperation(std::move(msg)); + context.getRequestQueue().sendRequestPreBoot(std::move(msg)); statusMessage = "Accepted"; } else { statusMessage = "NotImplemented"; diff --git a/tests/ChargingSessions.cpp b/tests/ChargingSessions.cpp index 6975fea6..bff8838e 100644 --- a/tests/ChargingSessions.cpp +++ b/tests/ChargingSessions.cpp @@ -803,5 +803,283 @@ TEST_CASE( "Charging sessions" ) { } + SECTION("TransactionMessageAttempts-/RetryInterval") { + + /* + * Scenarios: + * - final failure to send txMsg after tx terminated + * - normal communication restored after a final failure + * - StartTx fails finally during tx + * - StartTx works but StopTx fails finally after tx terminated + * - sends attempts fail until final attempt succeeds + * - after reboot, continue attempting + */ + + declareConfiguration("TransactionMessageAttempts", 1)->setInt(1); + + bool checkProcessedStartTx = false; + bool checkProcessedStopTx = false; + unsigned int txId = 1000; + + /* + * - final failure to send txMsg after tx terminated + */ + + getOcppContext()->getOperationRegistry().registerOperation("StartTransaction", [&checkProcessedStartTx, &txId] () { + return new Ocpp16::CustomOperation("StartTransaction", + [&checkProcessedStartTx] (JsonObject payload) { + //receive req + checkProcessedStartTx = true; + }, + [&txId] () { + //create conf + auto doc = std::unique_ptr(new DynamicJsonDocument(JSON_OBJECT_SIZE(1) + JSON_OBJECT_SIZE(2))); + JsonObject payload = doc->to(); + + JsonObject idTagInfo = payload.createNestedObject("idTagInfo"); + idTagInfo["status"] = "Accepted"; + payload["transactionId"] = txId++; + return doc; + });}); + + getOcppContext()->getOperationRegistry().registerOperation("StopTransaction", [&checkProcessedStopTx] () { + return new Ocpp16::CustomOperation("StopTransaction", + [&checkProcessedStopTx] (JsonObject payload) { + //receive req + checkProcessedStopTx = true; + }, + [] () { + //create conf + return createEmptyDocument(); + });}); + + loopback.setOnline(false); + + REQUIRE( !ocppPermitsCharge() ); + + beginTransaction_authorized("mIdTag"); + loop(); + REQUIRE( ocppPermitsCharge() ); + + endTransaction(); + loop(); + REQUIRE( !ocppPermitsCharge() ); + + mtime += 10 * 60 * 1000; //jump 10 minutes into future + + loopback.setOnline(true); + loop(); + + REQUIRE( !checkProcessedStartTx ); + REQUIRE( !checkProcessedStopTx ); + + /* + * - normal communication restored after a final failure + */ + checkProcessedStartTx = false; + checkProcessedStopTx = false; + + beginTransaction_authorized("mIdTag"); + loop(); + REQUIRE( ocppPermitsCharge() ); + REQUIRE( checkProcessedStartTx ); + + endTransaction(); + loop(); + REQUIRE( !ocppPermitsCharge() ); + + REQUIRE( checkProcessedStopTx ); + + /* + * - StartTx fails finally during tx + */ + + checkProcessedStartTx = false; + checkProcessedStopTx = false; + + loopback.setOnline(false); + + REQUIRE( !ocppPermitsCharge() ); + + beginTransaction_authorized("mIdTag"); + loop(); + REQUIRE( ocppPermitsCharge() ); + + mtime += 10 * 60 * 1000; //jump 10 minutes into future + loop(); + REQUIRE( !ocppPermitsCharge() ); + + loopback.setOnline(true); + loop(); + + REQUIRE( !checkProcessedStartTx ); + REQUIRE( !checkProcessedStopTx ); + + /* + * - StartTx works but StopTx fails finally after tx terminated + */ + + checkProcessedStartTx = false; + checkProcessedStopTx = false; + + beginTransaction_authorized("mIdTag"); + loop(); + REQUIRE( ocppPermitsCharge() ); + REQUIRE( checkProcessedStartTx ); + + loopback.setOnline(false); + + endTransaction(); + loop(); + mtime += 10 * 60 * 1000; //jump 10 minutes into future + + loopback.setOnline(true); + loop(); + REQUIRE( !checkProcessedStopTx ); + + /* + * - sends attempts fail until final attempt succeeds + */ + + const size_t NUM_ATTEMPTS = 3; + const int RETRY_INTERVAL_SECS = 3600; + + declareConfiguration("TransactionMessageAttempts", 0)->setInt(NUM_ATTEMPTS); + declareConfiguration("TransactionMessageRetryInterval", 0)->setInt(RETRY_INTERVAL_SECS); + + configuration_save(); + + checkProcessedStartTx = false; + checkProcessedStopTx = false; + + unsigned int attemptNr = 0; + + getOcppContext()->getOperationRegistry().registerOperation("StartTransaction", [&checkProcessedStartTx, &txId, &attemptNr] () { + return new Ocpp16::CustomOperation("StartTransaction", + [&attemptNr] (JsonObject payload) { + //receive req + attemptNr++; + }, + [&txId] () { + //create conf + auto doc = std::unique_ptr(new DynamicJsonDocument(JSON_OBJECT_SIZE(1) + JSON_OBJECT_SIZE(2))); + JsonObject payload = doc->to(); + + JsonObject idTagInfo = payload.createNestedObject("idTagInfo"); + idTagInfo["status"] = "Accepted"; + payload["transactionId"] = txId++; + return doc; + }, + [&attemptNr] () { + //ErrorCode for CALLERROR + return attemptNr < NUM_ATTEMPTS ? "InternalError" : (const char*)nullptr; + });}); + + beginTransaction_authorized("mIdTag"); + loop(); + REQUIRE( ocppPermitsCharge() ); + REQUIRE( attemptNr == 1 ); + + mtime += (unsigned long)RETRY_INTERVAL_SECS * 1000; + loop(); + REQUIRE( ocppPermitsCharge() ); + REQUIRE( attemptNr == 2 ); + + mtime += 2 * (unsigned long)RETRY_INTERVAL_SECS * 1000; + loop(); + REQUIRE( ocppPermitsCharge() ); + REQUIRE( attemptNr == 3 ); + + mtime += 100 * (unsigned long)RETRY_INTERVAL_SECS * 1000; + loop(); + REQUIRE( ocppPermitsCharge() ); + REQUIRE( attemptNr == 3 ); //no further retry after third and successful attempt + + endTransaction(); + loop(); + REQUIRE( !ocppPermitsCharge() ); + REQUIRE( attemptNr == 3 ); + REQUIRE( checkProcessedStopTx ); + + /* + * - after reboot, continue attempting + */ + + getOcppContext()->getModel().getClock().setTime(BASE_TIME); //reset system time to have roughly the same time after reboot + + checkProcessedStartTx = false; + checkProcessedStopTx = false; + attemptNr = 0; + + beginTransaction_authorized("mIdTag"); + loop(); + REQUIRE( ocppPermitsCharge() ); + REQUIRE( attemptNr == 1 ); + + mocpp_deinitialize(); + + mocpp_initialize(loopback, ChargerCredentials("test-runner1234")); + + getOcppContext()->getModel().getClock().setTime(BASE_TIME); + + getOcppContext()->getOperationRegistry().registerOperation("StartTransaction", [&checkProcessedStartTx, &txId, &attemptNr] () { + return new Ocpp16::CustomOperation("StartTransaction", + [&attemptNr] (JsonObject payload) { + //receive req + attemptNr++; + }, + [&txId] () { + //create conf + auto doc = std::unique_ptr(new DynamicJsonDocument(JSON_OBJECT_SIZE(1) + JSON_OBJECT_SIZE(2))); + JsonObject payload = doc->to(); + + JsonObject idTagInfo = payload.createNestedObject("idTagInfo"); + idTagInfo["status"] = "Accepted"; + payload["transactionId"] = txId++; + return doc; + }, + [&attemptNr] () { + //ErrorCode for CALLERROR + return attemptNr < NUM_ATTEMPTS ? "InternalError" : (const char*)nullptr; + });}); + + getOcppContext()->getOperationRegistry().registerOperation("StopTransaction", [&checkProcessedStopTx] () { + return new Ocpp16::CustomOperation("StopTransaction", + [&checkProcessedStopTx] (JsonObject payload) { + //receive req + checkProcessedStopTx = true; + }, + [] () { + //create conf + return createEmptyDocument(); + });}); + + loop(); + + REQUIRE( ocppPermitsCharge() ); + REQUIRE( attemptNr == 1 ); + + mtime += (unsigned long)RETRY_INTERVAL_SECS * 1000; + loop(); + REQUIRE( ocppPermitsCharge() ); + REQUIRE( attemptNr == 2 ); + + mtime += 2 * (unsigned long)RETRY_INTERVAL_SECS * 1000; + loop(); + REQUIRE( ocppPermitsCharge() ); + REQUIRE( attemptNr == 3 ); + + mtime += 100 * (unsigned long)RETRY_INTERVAL_SECS * 1000; + loop(); + REQUIRE( ocppPermitsCharge() ); + REQUIRE( attemptNr == 3 ); //no further retry after third and successful attempt + + endTransaction(); + loop(); + REQUIRE( !ocppPermitsCharge() ); + REQUIRE( attemptNr == 3 ); + REQUIRE( checkProcessedStopTx ); + } + mocpp_deinitialize(); } diff --git a/tests/ConfigurationBehavior.cpp b/tests/ConfigurationBehavior.cpp index 3b3ea50f..1d345c3d 100644 --- a/tests/ConfigurationBehavior.cpp +++ b/tests/ConfigurationBehavior.cpp @@ -112,12 +112,12 @@ TEST_CASE( "Configuration Behavior" ) { SECTION("set true") { configBool->setBool(true); - beginTransaction("mIdTag"); + beginTransaction("mIdTag_invalid"); loop(); REQUIRE(connector->getStatus() == ChargePointStatus_Available); - beginTransaction_authorized("mIdTag"); + beginTransaction_authorized("mIdTag_invalid2"); loop(); REQUIRE(connector->getStatus() == ChargePointStatus_Available); diff --git a/tests/Metering.cpp b/tests/Metering.cpp index cec57084..63d744ba 100644 --- a/tests/Metering.cpp +++ b/tests/Metering.cpp @@ -223,6 +223,8 @@ TEST_CASE("Metering") { auto StopTxnSampledDataString = declareConfiguration("StopTxnSampledData", "", CONFIGURATION_FN); StopTxnSampledDataString->setString("Energy.Active.Import.Register"); + configuration_save(); + loop(); model.getClock().setTime(BASE_TIME); @@ -244,6 +246,9 @@ TEST_CASE("Metering") { setOnReceiveRequest("StopTransaction", [base, &checkProcessed] (JsonObject payload) { checkProcessed = true; + + REQUIRE(payload["transactionData"].size() >= 2); + Timestamp t0, t1; t0.setTime(payload["transactionData"][0]["timestamp"] | ""); t1.setTime(payload["transactionData"][1]["timestamp"] | ""); diff --git a/tests/Transactions.cpp b/tests/Transactions.cpp index 8bd88d30..bf071e73 100644 --- a/tests/Transactions.cpp +++ b/tests/Transactions.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include "./catch2/catch.hpp" #include "./helpers/testHelper.h"