Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/brpc/load_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <gflags/gflags.h>
#include "brpc/reloadable_flags.h"
#include "brpc/load_balancer.h"
#include "brpc/socket.h"


namespace brpc {
Expand All @@ -34,6 +35,15 @@ BRPC_VALIDATE_GFLAG(show_lb_in_vars, PassValidate);
// For assigning unique names for lb.
static butil::static_atomic<int> g_lb_counter = BUTIL_STATIC_ATOMIC_INIT(0);

bool LoadBalancer::IsServerAvailable(SocketId id, SocketUniquePtr* out) {
SocketUniquePtr ptr;
bool res = Socket::Address(id, &ptr) == 0 && ptr->IsAvailable();
if (res) {
*out = std::move(ptr);
}
return res;
}

void SharedLoadBalancer::DescribeLB(std::ostream& os, void* arg) {
(static_cast<SharedLoadBalancer*>(arg))->Describe(os, DescribeOptions());
}
Expand Down
4 changes: 4 additions & 0 deletions src/brpc/load_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ class LoadBalancer : public NonConstDescribable, public Destroyable {

protected:
virtual ~LoadBalancer() { }

// Returns true and set `out' if the server is available (not failed, not logoff).
// Otherwise, returns false.
static bool IsServerAvailable(SocketId id, SocketUniquePtr* out);
};

DECLARE_bool(show_lb_in_vars);
Expand Down
3 changes: 1 addition & 2 deletions src/brpc/policy/consistent_hashing_load_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,7 @@ int ConsistentHashingLoadBalancer::SelectServer(
for (size_t i = 0; i < s->size(); ++i) {
if (((i + 1) == s->size() // always take last chance
|| !ExcludedServers::IsExcluded(in.excluded, choice->server_sock.id))
&& Socket::Address(choice->server_sock.id, out->ptr) == 0
&& (*out->ptr)->IsAvailable()) {
&& IsServerAvailable(choice->server_sock.id, out->ptr)) {
return 0;
} else {
if (++choice == s->end()) {
Expand Down
3 changes: 1 addition & 2 deletions src/brpc/policy/locality_aware_load_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,7 @@ int LocalityAwareLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out)
if (index < n) {
continue;
}
} else if (Socket::Address(info.server_id, out->ptr) == 0
&& (*out->ptr)->IsAvailable()) {
} else if (IsServerAvailable(info.server_id, out->ptr)) {
if ((ntry + 1) == n // Instead of fail with EHOSTDOWN, we prefer
// choosing the server again.
|| !ExcludedServers::IsExcluded(in.excluded, info.server_id)) {
Expand Down
3 changes: 1 addition & 2 deletions src/brpc/policy/randomized_load_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,7 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
const SocketId id = s->server_list[offset].id;
if (((i + 1) == n // always take last chance
|| !ExcludedServers::IsExcluded(in.excluded, id))
&& Socket::Address(id, out->ptr) == 0
&& (*out->ptr)->IsAvailable()) {
&& IsServerAvailable(id, out->ptr)) {
// We found an available server
return 0;
}
Expand Down
3 changes: 1 addition & 2 deletions src/brpc/policy/round_robin_load_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,7 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
const SocketId id = s->server_list[tls.offset].id;
if (((i + 1) == n // always take last chance
|| !ExcludedServers::IsExcluded(in.excluded, id))
&& Socket::Address(id, out->ptr) == 0
&& (*out->ptr)->IsAvailable()) {
&& IsServerAvailable(id, out->ptr)) {
s.tls() = tls;
return 0;
}
Expand Down
25 changes: 10 additions & 15 deletions src/brpc/policy/weighted_randomized_load_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,6 @@ size_t WeightedRandomizedLoadBalancer::RemoveServersInBatch(
return _db_servers.Modify(BatchRemove, servers);
}

bool WeightedRandomizedLoadBalancer::IsServerAvailable(SocketId id, SocketUniquePtr* out) {
return Socket::Address(id, out) == 0 && (*out)->IsAvailable();
}

int WeightedRandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
butil::DoublyBufferedData<Servers>::ScopedPtr s;
if (_db_servers.Read(&s) != 0) {
Expand All @@ -144,13 +140,13 @@ int WeightedRandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut*
continue;
}
random_traversed.insert(id);
if (0 == IsServerAvailable(id, out->ptr)) {
if (IsServerAvailable(id, out->ptr)) {
// An available server is found.
return 0;
}
}

if (random_traversed.size() == n) {
if (random_traversed.size() < n) {
// Try to traverse the remaining servers to find an available server.
uint32_t offset = butil::fast_rand_less_than(n);
uint32_t stride = bthread::prime_offset();
Expand All @@ -161,19 +157,18 @@ int WeightedRandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut*
continue;
}
if (IsServerAvailable(id, out->ptr)) {
// An available server is found.
return 0;
if (!ExcludedServers::IsExcluded(in.excluded, id)) {
// Prioritize servers that are not excluded.
return 0;
}
}
}
}

if (NULL != out->ptr) {
// Use the excluded but available server.
return 0;
}

// After traversing the whole server list, no available server is found.
return EHOSTDOWN;
// Returns EHOSTDOWN, if no available server is found
// after traversing the whole server list.
// Otherwise, returns 0 with a available excluded server.
return NULL == out->ptr ? EHOSTDOWN : 0;
}

LoadBalancer* WeightedRandomizedLoadBalancer::New(
Expand Down
3 changes: 1 addition & 2 deletions src/brpc/policy/weighted_randomized_load_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class WeightedRandomizedLoadBalancer : public LoadBalancer {
void Describe(std::ostream& os, const DescribeOptions&) override;

struct Server {
Server(SocketId s_id = 0, uint32_t s_w = 0, uint64_t s_c_w_s = 0)
explicit Server(SocketId s_id = 0, uint32_t s_w = 0, uint64_t s_c_w_s = 0)
: id(s_id), weight(s_w), current_weight_sum(s_c_w_s) {}
SocketId id;
uint32_t weight;
Expand All @@ -61,7 +61,6 @@ class WeightedRandomizedLoadBalancer : public LoadBalancer {
static bool Remove(Servers& bg, const ServerId& id);
static size_t BatchAdd(Servers& bg, const std::vector<ServerId>& servers);
static size_t BatchRemove(Servers& bg, const std::vector<ServerId>& servers);
static bool IsServerAvailable(SocketId id, SocketUniquePtr* out);

butil::DoublyBufferedData<Servers> _db_servers;
};
Expand Down
14 changes: 13 additions & 1 deletion test/brpc_load_balancer_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ struct SelectArg {
};

void* select_server(void* arg) {
SelectArg *sa = (SelectArg *)arg;
SelectArg *sa = (SelectArg*)arg;
brpc::LoadBalancer* c = sa->lb;
brpc::SocketUniquePtr ptr;
CountMap *selected_count = new CountMap;
Expand Down Expand Up @@ -951,6 +951,7 @@ TEST_F(LoadBalancerTest, weighted_randomized) {
brpc::policy::WeightedRandomizedLoadBalancer wrlb;
size_t valid_weight_num = 4;

std::vector<brpc::SocketId> ids;
// Add server to selected list. The server with invalid weight will be skipped.
for (size_t i = 0; i < ARRAY_SIZE(servers); ++i) {
const char *addr = servers[i];
Expand All @@ -961,6 +962,7 @@ TEST_F(LoadBalancerTest, weighted_randomized) {
options.remote_side = dummy;
options.user = new SaveRecycle;
ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
ids.emplace_back(id.id);
id.tag = weight[i];
if (i < valid_weight_num) {
int weight_num = 0;
Expand Down Expand Up @@ -1010,6 +1012,16 @@ TEST_F(LoadBalancerTest, weighted_randomized) {
// actual_rate <= expect_rate * 2
ASSERT_LE(actual_rate, expect_rate * 2);
}

for (size_t i = 1; i < ids.size(); ++i) {
brpc::Socket::SetFailed(ids[i]);
}
select_result.clear();
for (int i = 0; i < run_times; ++i) {
EXPECT_EQ(0, wrlb.SelectServer(in, &out));
// The only choice is servers[0].
ASSERT_STREQ(butil::endpoint2str(ptr->remote_side()).c_str(), servers[0]);
}
}

TEST_F(LoadBalancerTest, health_check_no_valid_server) {
Expand Down
Loading