Skip to content

Commit 72b7311

Browse files
Fix the operation timeout is not honored for GetSchema requests (#383)
1 parent 6eb228e commit 72b7311

13 files changed

Lines changed: 140 additions & 29 deletions

lib/ClientConfiguration.cc

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19+
#include <chrono>
1920
#include <stdexcept>
2021

2122
#include "ClientConfigurationImpl.h"
@@ -61,11 +62,13 @@ Authentication& ClientConfiguration::getAuth() const { return *impl_->authentica
6162
const AuthenticationPtr& ClientConfiguration::getAuthPtr() const { return impl_->authenticationPtr; }
6263

6364
ClientConfiguration& ClientConfiguration::setOperationTimeoutSeconds(int timeout) {
64-
impl_->operationTimeoutSeconds = timeout;
65+
impl_->operationTimeout = std::chrono::seconds(timeout);
6566
return *this;
6667
}
6768

68-
int ClientConfiguration::getOperationTimeoutSeconds() const { return impl_->operationTimeoutSeconds; }
69+
int ClientConfiguration::getOperationTimeoutSeconds() const {
70+
return std::chrono::duration_cast<std::chrono::seconds>(impl_->operationTimeout).count();
71+
}
6972

7073
ClientConfiguration& ClientConfiguration::setIOThreads(int threads) {
7174
impl_->ioThreads = threads;

lib/ClientConfigurationImpl.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,16 @@
2121

2222
#include <pulsar/ClientConfiguration.h>
2323

24+
#include <chrono>
25+
2426
namespace pulsar {
2527

2628
struct ClientConfigurationImpl {
2729
AuthenticationPtr authenticationPtr{AuthFactory::Disabled()};
2830
uint64_t memoryLimit{0ull};
2931
int ioThreads{1};
3032
int connectionsPerBroker{1};
31-
int operationTimeoutSeconds{30};
33+
std::chrono::nanoseconds operationTimeout{30L * 1000 * 1000 * 1000};
3234
int messageListenerThreads{1};
3335
int concurrentLookupRequest{50000};
3436
int maxLookupRedirects{20};

lib/ClientConnection.cc

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include <fstream>
2525

2626
#include "AsioDefines.h"
27+
#include "ClientImpl.h"
2728
#include "Commands.h"
2829
#include "ConnectionPool.h"
2930
#include "ConsumerImpl.h"
@@ -163,7 +164,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
163164
const ClientConfiguration& clientConfiguration,
164165
const AuthenticationPtr& authentication, const std::string& clientVersion,
165166
ConnectionPool& pool, size_t poolIndex)
166-
: operationsTimeout_(std::chrono::seconds(clientConfiguration.getOperationTimeoutSeconds())),
167+
: operationsTimeout_(ClientImpl::getOperationTimeout(clientConfiguration)),
167168
authentication_(authentication),
168169
serverProtocolVersion_(proto::ProtocolVersion_MIN),
169170
executor_(executor),
@@ -1278,6 +1279,7 @@ void ClientConnection::close(Result result, bool detach) {
12781279
auto pendingConsumerStatsMap = std::move(pendingConsumerStatsMap_);
12791280
auto pendingGetLastMessageIdRequests = std::move(pendingGetLastMessageIdRequests_);
12801281
auto pendingGetNamespaceTopicsRequests = std::move(pendingGetNamespaceTopicsRequests_);
1282+
auto pendingGetSchemaRequests = std::move(pendingGetSchemaRequests_);
12811283

12821284
numOfPendingLookupRequest_ = 0;
12831285

@@ -1342,6 +1344,9 @@ void ClientConnection::close(Result result, bool detach) {
13421344
for (auto& kv : pendingGetNamespaceTopicsRequests) {
13431345
kv.second.setFailed(result);
13441346
}
1347+
for (auto& kv : pendingGetSchemaRequests) {
1348+
kv.second.promise.setFailed(result);
1349+
}
13451350
}
13461351

13471352
bool ClientConnection::isClosed() const { return state_ == Disconnected; }
@@ -1430,6 +1435,7 @@ Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(
14301435
Future<Result, SchemaInfo> ClientConnection::newGetSchema(const std::string& topicName,
14311436
const std::string& version, uint64_t requestId) {
14321437
Lock lock(mutex_);
1438+
14331439
Promise<Result, SchemaInfo> promise;
14341440
if (isClosed()) {
14351441
lock.unlock();
@@ -1438,8 +1444,27 @@ Future<Result, SchemaInfo> ClientConnection::newGetSchema(const std::string& top
14381444
return promise.getFuture();
14391445
}
14401446

1441-
pendingGetSchemaRequests_.insert(std::make_pair(requestId, promise));
1447+
auto timer = executor_->createDeadlineTimer();
1448+
pendingGetSchemaRequests_.emplace(requestId, GetSchemaRequest{promise, timer});
14421449
lock.unlock();
1450+
1451+
auto weakSelf = weak_from_this();
1452+
timer->expires_from_now(operationsTimeout_);
1453+
timer->async_wait([this, weakSelf, requestId](const ASIO_ERROR& ec) {
1454+
auto self = weakSelf.lock();
1455+
if (!self) {
1456+
return;
1457+
}
1458+
Lock lock(mutex_);
1459+
auto it = pendingGetSchemaRequests_.find(requestId);
1460+
if (it != pendingGetSchemaRequests_.end()) {
1461+
auto promise = std::move(it->second.promise);
1462+
pendingGetSchemaRequests_.erase(it);
1463+
lock.unlock();
1464+
promise.setFailed(ResultTimeout);
1465+
}
1466+
});
1467+
14431468
sendCommand(Commands::newGetSchema(topicName, version, requestId));
14441469
return promise.getFuture();
14451470
}
@@ -1867,7 +1892,7 @@ void ClientConnection::handleGetSchemaResponse(const proto::CommandGetSchemaResp
18671892
Lock lock(mutex_);
18681893
auto it = pendingGetSchemaRequests_.find(response.request_id());
18691894
if (it != pendingGetSchemaRequests_.end()) {
1870-
Promise<Result, SchemaInfo> getSchemaPromise = it->second;
1895+
Promise<Result, SchemaInfo> getSchemaPromise = it->second.promise;
18711896
pendingGetSchemaRequests_.erase(it);
18721897
lock.unlock();
18731898

lib/ClientConnection.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
#include <functional>
4343
#include <memory>
4444
#include <string>
45+
#include <unordered_map>
4546
#include <vector>
4647

4748
#include "AsioTimer.h"
@@ -224,6 +225,11 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
224225
DeadlineTimerPtr timer;
225226
};
226227

228+
struct GetSchemaRequest {
229+
Promise<Result, SchemaInfo> promise;
230+
DeadlineTimerPtr timer;
231+
};
232+
227233
/*
228234
* handler for connectAsync
229235
* creates a ConnectionPtr which has a valid ClientConnection object
@@ -363,7 +369,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
363369
typedef std::map<long, Promise<Result, NamespaceTopicsPtr>> PendingGetNamespaceTopicsMap;
364370
PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_;
365371

366-
typedef std::map<long, Promise<Result, SchemaInfo>> PendingGetSchemaMap;
372+
typedef std::unordered_map<uint64_t, GetSchemaRequest> PendingGetSchemaMap;
367373
PendingGetSchemaMap pendingGetSchemaRequests_;
368374

369375
mutable std::mutex mutex_;

lib/ClientImpl.cc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <pulsar/ClientConfiguration.h>
2222
#include <pulsar/Version.h>
2323

24+
#include <chrono>
2425
#include <random>
2526
#include <sstream>
2627

@@ -109,7 +110,7 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
109110
}
110111

111112
lookupServicePtr_ = RetryableLookupService::create(
112-
underlyingLookupServicePtr, clientConfiguration_.getOperationTimeoutSeconds(), ioExecutorProvider_);
113+
underlyingLookupServicePtr, clientConfiguration_.impl_->operationTimeout, ioExecutorProvider_);
113114
}
114115

115116
ClientImpl::~ClientImpl() { shutdown(); }
@@ -768,4 +769,8 @@ std::string ClientImpl::getClientVersion(const ClientConfiguration& clientConfig
768769
return oss.str();
769770
}
770771

772+
std::chrono::nanoseconds ClientImpl::getOperationTimeout(const ClientConfiguration& clientConfiguration) {
773+
return clientConfiguration.impl_->operationTimeout;
774+
}
775+
771776
} /* namespace pulsar */

lib/ClientImpl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
125125

126126
ConnectionPool& getConnectionPool() noexcept { return pool_; }
127127

128+
static std::chrono::nanoseconds getOperationTimeout(const ClientConfiguration& clientConfiguration);
129+
128130
friend class PulsarFriend;
129131

130132
private:

lib/Future.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ class Promise {
141141
Future<Result, Type> getFuture() const { return Future<Result, Type>{state_}; }
142142

143143
private:
144-
const InternalStatePtr<Result, Type> state_;
144+
InternalStatePtr<Result, Type> state_;
145145
};
146146

147147
} // namespace pulsar

lib/RetryableLookupService.h

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
#pragma once
2020

21+
#include <chrono>
22+
2123
#include "LookupDataResult.h"
2224
#include "LookupService.h"
2325
#include "NamespaceName.h"
@@ -81,15 +83,15 @@ class RetryableLookupService : public LookupService {
8183
RetryableOperationCachePtr<NamespaceTopicsPtr> namespaceLookupCache_;
8284
RetryableOperationCachePtr<SchemaInfo> getSchemaCache_;
8385

84-
RetryableLookupService(std::shared_ptr<LookupService> lookupService, int timeoutSeconds,
86+
RetryableLookupService(std::shared_ptr<LookupService> lookupService, TimeDuration timeout,
8587
ExecutorServiceProviderPtr executorProvider)
8688
: lookupService_(lookupService),
87-
lookupCache_(RetryableOperationCache<LookupResult>::create(executorProvider, timeoutSeconds)),
89+
lookupCache_(RetryableOperationCache<LookupResult>::create(executorProvider, timeout)),
8890
partitionLookupCache_(
89-
RetryableOperationCache<LookupDataResultPtr>::create(executorProvider, timeoutSeconds)),
91+
RetryableOperationCache<LookupDataResultPtr>::create(executorProvider, timeout)),
9092
namespaceLookupCache_(
91-
RetryableOperationCache<NamespaceTopicsPtr>::create(executorProvider, timeoutSeconds)),
92-
getSchemaCache_(RetryableOperationCache<SchemaInfo>::create(executorProvider, timeoutSeconds)) {}
93+
RetryableOperationCache<NamespaceTopicsPtr>::create(executorProvider, timeout)),
94+
getSchemaCache_(RetryableOperationCache<SchemaInfo>::create(executorProvider, timeout)) {}
9395
};
9496

9597
} // namespace pulsar

lib/RetryableOperation.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
#include <algorithm>
2424
#include <atomic>
25+
#include <chrono>
2526
#include <functional>
2627
#include <memory>
2728

@@ -40,11 +41,11 @@ class RetryableOperation : public std::enable_shared_from_this<RetryableOperatio
4041
explicit PassKey() {}
4142
};
4243

43-
RetryableOperation(const std::string& name, std::function<Future<Result, T>()>&& func, int timeoutSeconds,
44-
DeadlineTimerPtr timer)
44+
RetryableOperation(const std::string& name, std::function<Future<Result, T>()>&& func,
45+
TimeDuration timeout, DeadlineTimerPtr timer)
4546
: name_(name),
4647
func_(std::move(func)),
47-
timeout_(std::chrono::seconds(timeoutSeconds)),
48+
timeout_(timeout),
4849
backoff_(std::chrono::milliseconds(100), timeout_ + timeout_, std::chrono::milliseconds(0)),
4950
timer_(timer) {}
5051

lib/RetryableOperationCache.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
#pragma once
2020

21+
#include <chrono>
2122
#include <mutex>
2223
#include <unordered_map>
2324

@@ -40,8 +41,8 @@ class RetryableOperationCache : public std::enable_shared_from_this<RetryableOpe
4041
explicit PassKey() {}
4142
};
4243

43-
RetryableOperationCache(ExecutorServiceProviderPtr executorProvider, int timeoutSeconds)
44-
: executorProvider_(executorProvider), timeoutSeconds_(timeoutSeconds) {}
44+
RetryableOperationCache(ExecutorServiceProviderPtr executorProvider, TimeDuration timeout)
45+
: executorProvider_(executorProvider), timeout_(timeout) {}
4546

4647
using Self = RetryableOperationCache<T>;
4748

@@ -69,7 +70,7 @@ class RetryableOperationCache : public std::enable_shared_from_this<RetryableOpe
6970
return promise.getFuture();
7071
}
7172

72-
auto operation = RetryableOperation<T>::create(key, std::move(func), timeoutSeconds_, timer);
73+
auto operation = RetryableOperation<T>::create(key, std::move(func), timeout_, timer);
7374
auto future = operation->run();
7475
operations_[key] = operation;
7576
lock.unlock();
@@ -106,7 +107,7 @@ class RetryableOperationCache : public std::enable_shared_from_this<RetryableOpe
106107

107108
private:
108109
ExecutorServiceProviderPtr executorProvider_;
109-
const int timeoutSeconds_;
110+
const TimeDuration timeout_;
110111

111112
std::unordered_map<std::string, std::shared_ptr<RetryableOperation<T>>> operations_;
112113
mutable std::mutex mutex_;

0 commit comments

Comments
 (0)