Skip to content

Commit cc83de2

Browse files
committed
[fix] Remove closed producers and consumers from client
Fixes apache#55 ### Motivation 1. When a producer or consumer is closed, the reference is still stored in `ClientImpl`. If a client kept creating producers or consumers, the memory usage would not reduce. 2. When the `HandlerBase::connection_` field is modified, the `removeProducer` or `removeConsumer` method is not called. Then these producers and consumers will be cached in the connection until the connection is closed. ### Modifications In `ClientImpl`, use `SynchronizedHashMap` to store references of producers and consumers. The key is the pointer, the value is the `weak_ptr` objects. Then, add `cleanupProducer` and `cleanupConsumer` methods to remove a specific reference for a given pointer. To call these two methods, this PR does a refactoring that all `ClientImpl` references in producer or consumer are changed to a trivial reference `ClientImpl&` instead of a `weak_ptr`. It's because the lifetime of `ClientImpl` should be longer than them. Using `weak_ptr` is redundant and will increase many repeat codes like: ```c++ auto client = client_.lock(); if (client) { client->cleanupProducer(this); } ``` After that, unify the `shutdown` implementations for producers and consumers, which resets some states, cancels the timers, and unregister itself from both `ClientImpl` and `ClientConnection`. Add adding customized non-null callback in `closeAsync` and `unsubscribeAsync`. For `ClientConnection`, add a `beforeConnectionChange` virtual method to `HandlerBase`, which is called before `connection_` field is modified. And disallow the direct access to `connection_` from derived classes. ### Verifications `ClientCloseTest` is added to verify the following cases: - a single topic - a partitioned topic (multiple topics) - a partitioned topic with regex subscription `ClientCloseTest` is to verify these handlers are removed from `ClientImpl` and `ClientConnection`. `testShutdown` is to verify the handlers will be closed after the owned `Client` instance destroyed.
1 parent 7f7653b commit cc83de2

34 files changed

Lines changed: 537 additions & 430 deletions

.github/workflows/ci-pr-validation.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ jobs:
6868
run: ./pulsar-test-service-start.sh
6969

7070
- name: Run unit tests
71-
run: ./run-unit-tests.sh
71+
run: RETRY_FAILED=3 ./run-unit-tests.sh
7272

7373
- name: Stop Pulsar service
7474
run: ./pulsar-test-service-stop.sh

lib/AckGroupingTrackerDisabled.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "HandlerBase.h"
2323
#include "PulsarApi.pb.h"
2424
#include <pulsar/MessageId.h>
25+
#include "LogUtils.h"
2526

2627
namespace pulsar {
2728

lib/AckGroupingTrackerEnabled.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ namespace pulsar {
3333

3434
DECLARE_LOG_OBJECT();
3535

36-
AckGroupingTrackerEnabled::AckGroupingTrackerEnabled(ClientImplPtr clientPtr,
37-
const HandlerBasePtr& handlerPtr, uint64_t consumerId,
38-
long ackGroupingTimeMs, long ackGroupingMaxSize)
36+
AckGroupingTrackerEnabled::AckGroupingTrackerEnabled(ClientImpl& client, const HandlerBasePtr& handlerPtr,
37+
uint64_t consumerId, long ackGroupingTimeMs,
38+
long ackGroupingMaxSize)
3939
: AckGroupingTracker(),
4040
handlerWeakPtr_(handlerPtr),
4141
consumerId_(consumerId),
@@ -46,7 +46,7 @@ AckGroupingTrackerEnabled::AckGroupingTrackerEnabled(ClientImplPtr clientPtr,
4646
rmutexPendingIndAcks_(),
4747
ackGroupingTimeMs_(ackGroupingTimeMs),
4848
ackGroupingMaxSize_(ackGroupingMaxSize),
49-
executor_(clientPtr->getIOExecutorProvider()->get()),
49+
executor_(client.getIOExecutorProvider()->get()),
5050
timer_(),
5151
mutexTimer_() {
5252
LOG_DEBUG("ACK grouping is enabled, grouping time " << ackGroupingTimeMs << "ms, grouping max size "

lib/AckGroupingTrackerEnabled.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,13 @@ class AckGroupingTrackerEnabled : public AckGroupingTracker {
4141

4242
/**
4343
* Constructing ACK grouping tracker for peresistent topics.
44-
* @param[in] clientPtr pointer to client object.
44+
* @param[in] client the Client object.
4545
* @param[in] handlerPtr the shared pointer to connection handler.
4646
* @param[in] consumerId consumer ID that this tracker belongs to.
4747
* @param[in] ackGroupingTimeMs ACK grouping time window in milliseconds.
4848
* @param[in] ackGroupingMaxSize max. number of ACK requests can be grouped.
4949
*/
50-
AckGroupingTrackerEnabled(ClientImplPtr clientPtr, const HandlerBasePtr& handlerPtr, uint64_t consumerId,
50+
AckGroupingTrackerEnabled(ClientImpl& client, const HandlerBasePtr& handlerPtr, uint64_t consumerId,
5151
long ackGroupingTimeMs, long ackGroupingMaxSize);
5252

5353
void start() override;

lib/ClientConnection.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
314314
typedef std::map<long, Promise<Result, NamespaceTopicsPtr>> PendingGetNamespaceTopicsMap;
315315
PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_;
316316

317-
std::mutex mutex_;
317+
mutable std::mutex mutex_;
318318
typedef std::unique_lock<std::mutex> Lock;
319319

320320
// Pending buffers to write on the socket

lib/ClientImpl.cc

Lines changed: 59 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -170,10 +170,10 @@ void ClientImpl::handleCreateProducer(const Result result, const LookupDataResul
170170
if (!result) {
171171
ProducerImplBasePtr producer;
172172
if (partitionMetadata->getPartitions() > 0) {
173-
producer = std::make_shared<PartitionedProducerImpl>(shared_from_this(), topicName,
173+
producer = std::make_shared<PartitionedProducerImpl>(*this, topicName,
174174
partitionMetadata->getPartitions(), conf);
175175
} else {
176-
producer = std::make_shared<ProducerImpl>(shared_from_this(), *topicName, conf);
176+
producer = std::make_shared<ProducerImpl>(*this, *topicName, conf);
177177
}
178178
producer->getProducerCreatedFuture().addListener(
179179
std::bind(&ClientImpl::handleProducerCreated, shared_from_this(), std::placeholders::_1,
@@ -189,9 +189,15 @@ void ClientImpl::handleCreateProducer(const Result result, const LookupDataResul
189189
void ClientImpl::handleProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr,
190190
CreateProducerCallback callback, ProducerImplBasePtr producer) {
191191
if (result == ResultOk) {
192-
Lock lock(mutex_);
193-
producers_.push_back(producer);
194-
lock.unlock();
192+
auto pair = producers_.emplace(producer.get(), producer);
193+
if (!pair.second) {
194+
auto existingProducer = pair.first->second.lock();
195+
LOG_ERROR("Unexpected existing producer at the same address: "
196+
<< pair.first->first << ", producer: "
197+
<< (existingProducer ? existingProducer->getProducerName() : "(null)"));
198+
callback(ResultUnknownError, {});
199+
return;
200+
}
195201
callback(result, Producer(producer));
196202
} else {
197203
callback(result, {});
@@ -236,14 +242,23 @@ void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDat
236242
return;
237243
}
238244

239-
ReaderImplPtr reader = std::make_shared<ReaderImpl>(shared_from_this(), topicName->toString(), conf,
245+
ReaderImplPtr reader = std::make_shared<ReaderImpl>(*this, topicName->toString(), conf,
240246
getListenerExecutorProvider()->get(), callback);
241247
ConsumerImplBasePtr consumer = reader->getConsumer().lock();
242248
auto self = shared_from_this();
243249
reader->start(startMessageId, [this, self](const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
244-
Lock lock(mutex_);
245-
consumers_.push_back(weakConsumerPtr);
246-
lock.unlock();
250+
auto consumer = weakConsumerPtr.lock();
251+
if (consumer) {
252+
auto pair = consumers_.emplace(consumer.get(), consumer);
253+
if (!pair.second) {
254+
auto existingConsumer = pair.first->second.lock();
255+
LOG_ERROR("Unexpected existing consumer at the same address: "
256+
<< pair.first->first
257+
<< ", consumer: " << (existingConsumer ? existingConsumer->getName() : "(null)"));
258+
}
259+
} else {
260+
LOG_ERROR("Unexpected case: the consumer is somehow expired");
261+
}
247262
});
248263
}
249264

@@ -286,7 +301,7 @@ void ClientImpl::createPatternMultiTopicsConsumer(const Result result, const Nam
286301
PatternMultiTopicsConsumerImpl::topicsPatternFilter(*topics, pattern);
287302

288303
consumer = std::make_shared<PatternMultiTopicsConsumerImpl>(
289-
shared_from_this(), regexPattern, *matchTopics, subscriptionName, conf, lookupServicePtr_);
304+
*this, regexPattern, *matchTopics, subscriptionName, conf, lookupServicePtr_);
290305

291306
consumer->getConsumerCreatedFuture().addListener(
292307
std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1,
@@ -324,7 +339,7 @@ void ClientImpl::subscribeAsync(const std::vector<std::string>& topics, const st
324339
}
325340

326341
ConsumerImplBasePtr consumer = std::make_shared<MultiTopicsConsumerImpl>(
327-
shared_from_this(), topics, subscriptionName, topicNamePtr, conf, lookupServicePtr_);
342+
*this, topics, subscriptionName, topicNamePtr, conf, lookupServicePtr_);
328343

329344
consumer->getConsumerCreatedFuture().addListener(std::bind(&ClientImpl::handleConsumerCreated,
330345
shared_from_this(), std::placeholders::_1,
@@ -374,12 +389,12 @@ void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr
374389
callback(ResultInvalidConfiguration, Consumer());
375390
return;
376391
}
377-
consumer = std::make_shared<MultiTopicsConsumerImpl>(shared_from_this(), topicName,
392+
consumer = std::make_shared<MultiTopicsConsumerImpl>(*this, topicName,
378393
partitionMetadata->getPartitions(),
379394
subscriptionName, conf, lookupServicePtr_);
380395
} else {
381-
auto consumerImpl = std::make_shared<ConsumerImpl>(
382-
shared_from_this(), topicName->toString(), subscriptionName, conf, topicName->isPersistent());
396+
auto consumerImpl = std::make_shared<ConsumerImpl>(*this, topicName->toString(), subscriptionName,
397+
conf, topicName->isPersistent());
383398
consumerImpl->setPartitionIndex(topicName->getPartitionIndex());
384399
consumer = consumerImpl;
385400
}
@@ -397,9 +412,15 @@ void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr
397412
void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
398413
SubscribeCallback callback, ConsumerImplBasePtr consumer) {
399414
if (result == ResultOk) {
400-
Lock lock(mutex_);
401-
consumers_.push_back(consumer);
402-
lock.unlock();
415+
auto pair = consumers_.emplace(consumer.get(), consumer);
416+
if (!pair.second) {
417+
auto existingConsumer = pair.first->second.lock();
418+
LOG_ERROR("Unexpected existing consumer at the same address: "
419+
<< pair.first->first
420+
<< ", consumer: " << (existingConsumer ? existingConsumer->getName() : "(null)"));
421+
callback(ResultUnknownError, {});
422+
return;
423+
}
403424
callback(result, Consumer(consumer));
404425
} else {
405426
callback(result, {});
@@ -478,12 +499,11 @@ void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, GetPartiti
478499

479500
void ClientImpl::closeAsync(CloseCallback callback) {
480501
Lock lock(mutex_);
481-
ProducersList producers(producers_);
482-
ConsumersList consumers(consumers_);
483-
484-
if (state_ != Open && callback) {
502+
if (state_ != Open) {
485503
lock.unlock();
486-
callback(ResultAlreadyClosed);
504+
if (callback) {
505+
callback(ResultAlreadyClosed);
506+
}
487507
return;
488508
}
489509
// Set the state to Closing so that no producers could get added
@@ -492,12 +512,15 @@ void ClientImpl::closeAsync(CloseCallback callback) {
492512

493513
memoryLimitController_.close();
494514

515+
auto producers = producers_.move();
516+
auto consumers = consumers_.move();
517+
495518
SharedInt numberOfOpenHandlers = std::make_shared<int>(producers.size() + consumers.size());
496519
LOG_INFO("Closing Pulsar client with " << producers.size() << " producers and " << consumers.size()
497520
<< " consumers");
498521

499-
for (ProducersList::iterator it = producers.begin(); it != producers.end(); ++it) {
500-
ProducerImplBasePtr producer = it->lock();
522+
for (auto&& kv : producers) {
523+
ProducerImplBasePtr producer = kv.second.lock();
501524
if (producer && !producer->isClosed()) {
502525
producer->closeAsync(std::bind(&ClientImpl::handleClose, shared_from_this(),
503526
std::placeholders::_1, numberOfOpenHandlers, callback));
@@ -507,8 +530,8 @@ void ClientImpl::closeAsync(CloseCallback callback) {
507530
}
508531
}
509532

510-
for (ConsumersList::iterator it = consumers.begin(); it != consumers.end(); ++it) {
511-
ConsumerImplBasePtr consumer = it->lock();
533+
for (auto&& kv : consumers) {
534+
ConsumerImplBasePtr consumer = kv.second.lock();
512535
if (consumer && !consumer->isClosed()) {
513536
consumer->closeAsync(std::bind(&ClientImpl::handleClose, shared_from_this(),
514537
std::placeholders::_1, numberOfOpenHandlers, callback));
@@ -562,23 +585,18 @@ void ClientImpl::handleClose(Result result, SharedInt numberOfOpenHandlers, Resu
562585
}
563586

564587
void ClientImpl::shutdown() {
565-
Lock lock(mutex_);
566-
ProducersList producers;
567-
ConsumersList consumers;
568-
569-
producers.swap(producers_);
570-
consumers.swap(consumers_);
571-
lock.unlock();
588+
auto producers = producers_.move();
589+
auto consumers = consumers_.move();
572590

573-
for (ProducersList::iterator it = producers.begin(); it != producers.end(); ++it) {
574-
ProducerImplBasePtr producer = it->lock();
591+
for (auto&& kv : producers) {
592+
ProducerImplBasePtr producer = kv.second.lock();
575593
if (producer) {
576594
producer->shutdown();
577595
}
578596
}
579597

580-
for (ConsumersList::iterator it = consumers.begin(); it != consumers.end(); ++it) {
581-
ConsumerImplBasePtr consumer = it->lock();
598+
for (auto&& kv : consumers) {
599+
ConsumerImplBasePtr consumer = kv.second.lock();
582600
if (consumer) {
583601
consumer->shutdown();
584602
}
@@ -631,26 +649,24 @@ uint64_t ClientImpl::newRequestId() {
631649
}
632650

633651
uint64_t ClientImpl::getNumberOfProducers() {
634-
Lock lock(mutex_);
635652
uint64_t numberOfAliveProducers = 0;
636-
for (const auto& producer : producers_) {
653+
producers_.forEachValue([&numberOfAliveProducers](const ProducerImplBaseWeakPtr& producer) {
637654
const auto& producerImpl = producer.lock();
638655
if (producerImpl) {
639656
numberOfAliveProducers += producerImpl->getNumberOfConnectedProducer();
640657
}
641-
}
658+
});
642659
return numberOfAliveProducers;
643660
}
644661

645662
uint64_t ClientImpl::getNumberOfConsumers() {
646-
Lock lock(mutex_);
647663
uint64_t numberOfAliveConsumers = 0;
648-
for (const auto& consumer : consumers_) {
664+
consumers_.forEachValue([&numberOfAliveConsumers](const ConsumerImplBaseWeakPtr& consumer) {
649665
const auto consumerImpl = consumer.lock();
650666
if (consumerImpl) {
651667
numberOfAliveConsumers += consumerImpl->getNumberOfConnectedConsumer();
652668
}
653-
}
669+
});
654670
return numberOfAliveConsumers;
655671
}
656672

lib/ClientImpl.h

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include <atomic>
3232
#include <vector>
3333
#include "ServiceNameResolver.h"
34+
#include "SynchronizedHashMap.h"
3435

3536
namespace pulsar {
3637

@@ -91,6 +92,11 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
9192
ExecutorServiceProviderPtr getListenerExecutorProvider();
9293
ExecutorServiceProviderPtr getPartitionListenerExecutorProvider();
9394
LookupServicePtr getLookup();
95+
96+
void cleanupProducer(ProducerImplBase* address) { producers_.remove(address); }
97+
98+
void cleanupConsumer(ConsumerImplBase* address) { consumers_.remove(address); }
99+
94100
friend class PulsarFriend;
95101

96102
private:
@@ -147,11 +153,8 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
147153
uint64_t consumerIdGenerator_;
148154
uint64_t requestIdGenerator_;
149155

150-
typedef std::vector<ProducerImplBaseWeakPtr> ProducersList;
151-
ProducersList producers_;
152-
153-
typedef std::vector<ConsumerImplBaseWeakPtr> ConsumersList;
154-
ConsumersList consumers_;
156+
SynchronizedHashMap<ProducerImplBase*, ProducerImplBaseWeakPtr> producers_;
157+
SynchronizedHashMap<ConsumerImplBase*, ConsumerImplBaseWeakPtr> consumers_;
155158

156159
std::atomic<Result> closingError;
157160

lib/ConnectionPool.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ class PULSAR_PUBLIC ConnectionPool {
7777
std::mutex mutex_;
7878
std::atomic_bool closed_{false};
7979

80-
friend class ConnectionPoolTest;
80+
friend class PulsarFriend;
8181
};
8282
} // namespace pulsar
8383
#endif //_PULSAR_CONNECTION_POOL_HEADER_

0 commit comments

Comments
 (0)